Improve comments, error handling, and retry delay handling
mvines committed Sep 8, 2021
1 parent d3aa9bc, commit 007fb3a
Showing 1 changed file with 44 additions and 31 deletions.
client/src/tpu_client.rs: 75 changes (44 additions & 31 deletions)
@@ -41,7 +41,7 @@ pub const MAX_FANOUT_SLOTS: u64 = 100;
 #[derive(Clone, Debug)]
 pub struct TpuClientConfig {
     /// The range of upcoming slots to include when determining which
-    /// leaders to send transactions to (min: 1, max: 100)
+    /// leaders to send transactions to (min: 1, max: `MAX_FANOUT_SLOTS`)
     pub fanout_slots: u64,
 }

@@ -63,13 +63,14 @@ pub struct TpuClient {
 }
 
 impl TpuClient {
-    /// Serializes and sends a transaction to the current leader's TPU port
+    /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
+    /// size
     pub fn send_transaction(&self, transaction: &Transaction) -> bool {
         let wire_transaction = serialize(transaction).expect("serialization should succeed");
         self.send_wire_transaction(&wire_transaction)
     }
 
-    /// Sends a transaction to the current leader's TPU port
+    /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
     pub fn send_wire_transaction(&self, wire_transaction: &[u8]) -> bool {
         let mut sent = false;
         for tpu_address in self
@@ -119,14 +120,14 @@ struct LeaderTpuCache {
 }
 
 impl LeaderTpuCache {
-    fn new(rpc_client: &RpcClient, first_slot: Slot) -> Self {
-        let leaders = Self::fetch_slot_leaders(rpc_client, first_slot).unwrap_or_default();
-        let leader_tpu_map = Self::fetch_cluster_tpu_sockets(rpc_client).unwrap_or_default();
-        Self {
+    fn new(rpc_client: &RpcClient, first_slot: Slot) -> Result<Self> {
+        let leaders = Self::fetch_slot_leaders(rpc_client, first_slot)?;
+        let leader_tpu_map = Self::fetch_cluster_tpu_sockets(rpc_client)?;
+        Ok(Self {
             first_slot,
             leaders,
             leader_tpu_map,
-        }
+        })
     }
 
     // Last slot that has a cached leader pubkey
@@ -144,7 +145,13 @@ impl LeaderTpuCache {
                     if leader_set.insert(*leader) {
                         leader_sockets.push(*tpu_socket);
                     }
+                } else {
+                    // The leader is probably delinquent
+                    trace!("TPU not available for leader {}", leader);
                 }
+            } else {
+                // Overran the local leader schedule cache
+                warn!("Leader not known for slot {}", leader_slot);
             }
         }
         leader_sockets
@@ -245,7 +252,7 @@ impl LeaderTpuService {
         let start_slot = rpc_client.get_max_shred_insert_slot()?;
 
         let recent_slots = RecentLeaderSlots::new(start_slot);
-        let leader_tpu_cache = Arc::new(RwLock::new(LeaderTpuCache::new(&rpc_client, start_slot)));
+        let leader_tpu_cache = Arc::new(RwLock::new(LeaderTpuCache::new(&rpc_client, start_slot)?));
 
         let subscription = if !websocket_url.is_empty() {
             let recent_slots = recent_slots.clone();
@@ -317,42 +324,48 @@ impl LeaderTpuService {
                 break;
             }
 
+            // Sleep a few slots before checking if leader cache needs to be refreshed again
+            std::thread::sleep(Duration::from_millis(sleep_ms));
+            sleep_ms = 1000;
+
             // Refresh cluster TPU ports every 5min in case validators restart with new port configuration
             // or new validators come online
             if last_cluster_refresh.elapsed() > Duration::from_secs(5 * 60) {
-                if let Ok(leader_tpu_map) = LeaderTpuCache::fetch_cluster_tpu_sockets(&rpc_client) {
-                    leader_tpu_cache.write().unwrap().leader_tpu_map = leader_tpu_map;
-                    last_cluster_refresh = Instant::now();
-                } else {
-                    sleep_ms = 100;
-                    continue;
+                match LeaderTpuCache::fetch_cluster_tpu_sockets(&rpc_client) {
+                    Ok(leader_tpu_map) => {
+                        leader_tpu_cache.write().unwrap().leader_tpu_map = leader_tpu_map;
+                        last_cluster_refresh = Instant::now();
+                    }
+                    Err(err) => {
+                        warn!("Failed to fetch cluster tpu sockets: {}", err);
+                        sleep_ms = 100;
+                    }
                 }
             }
 
-            // Sleep a few slots before checking if leader cache needs to be refreshed again
-            std::thread::sleep(Duration::from_millis(sleep_ms));
-
-            let current_slot = recent_slots.estimated_current_slot();
-            if current_slot
+            let estimated_current_slot = recent_slots.estimated_current_slot();
+            if estimated_current_slot
                 >= leader_tpu_cache
                     .read()
                     .unwrap()
                     .last_slot()
                     .saturating_sub(MAX_FANOUT_SLOTS)
             {
-                if let Ok(slot_leaders) =
-                    LeaderTpuCache::fetch_slot_leaders(&rpc_client, current_slot)
-                {
-                    let mut leader_tpu_cache = leader_tpu_cache.write().unwrap();
-                    leader_tpu_cache.first_slot = current_slot;
-                    leader_tpu_cache.leaders = slot_leaders;
-                } else {
-                    sleep_ms = 100;
-                    continue;
+                match LeaderTpuCache::fetch_slot_leaders(&rpc_client, estimated_current_slot) {
+                    Ok(slot_leaders) => {
+                        let mut leader_tpu_cache = leader_tpu_cache.write().unwrap();
+                        leader_tpu_cache.first_slot = estimated_current_slot;
+                        leader_tpu_cache.leaders = slot_leaders;
+                    }
+                    Err(err) => {
+                        warn!(
+                            "Failed to fetch slot leaders (current estimated slot: {}): {}",
+                            estimated_current_slot, err
+                        );
+                        sleep_ms = 100;
+                    }
                 }
             }
-
-            sleep_ms = 1000;
         }
     }
 }
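
Two illustrative sketches follow; neither is part of the commit itself. First, a minimal, self-contained Rust sketch of the retry-delay pattern used by the reworked run loop: sleep for the full default interval at the top of each pass, reset the delay, and shorten it to a quick retry only when a refresh attempt fails. The fetch_something helper, its argument, and the loop bound are hypothetical stand-ins for the cache-refresh calls in the diff.

use std::{thread, time::Duration};

// Hypothetical fallible refresh call, standing in for calls like
// LeaderTpuCache::fetch_cluster_tpu_sockets or fetch_slot_leaders.
fn fetch_something(attempt: u32) -> Result<u64, String> {
    if attempt == 0 {
        Err("rpc temporarily unavailable".to_string())
    } else {
        Ok(42)
    }
}

fn main() {
    let mut sleep_ms: u64 = 1000;
    for attempt in 0..3 {
        // Sleep first, then restore the default delay for the next pass.
        thread::sleep(Duration::from_millis(sleep_ms));
        sleep_ms = 1000;

        match fetch_something(attempt) {
            Ok(value) => {
                // Refresh succeeded; keep the default delay.
                println!("refreshed: {}", value);
            }
            Err(err) => {
                // Log and retry sooner, without skipping the rest of the pass.
                eprintln!("refresh failed: {}", err);
                sleep_ms = 100;
            }
        }
    }
}

Compared with the old "if let ... else { sleep_ms = 100; continue; }" arms, the match form logs the error and lets the rest of the pass run, so a failed cluster-socket refresh no longer skips the slot-leader refresh in the same iteration.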

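Second, a rough sketch of how calling code might exercise the fanout behavior described by the updated doc comments, assuming the solana-client API of this era (notably TpuClient::new(rpc_client, websocket_url, config) and RpcClient::get_recent_blockhash, neither of which appears in this diff). The endpoint URLs and the payer/recipient keypairs are placeholders, and the payer would need funding on a real cluster.

use std::sync::Arc;

use solana_client::{
    rpc_client::RpcClient,
    tpu_client::{TpuClient, TpuClientConfig},
};
use solana_sdk::{
    signature::{Keypair, Signer},
    system_transaction,
};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder endpoints; point these at a real cluster.
    let rpc_client = Arc::new(RpcClient::new("http://127.0.0.1:8899".to_string()));
    let websocket_url = "ws://127.0.0.1:8900";

    // Target the leaders of the next 12 upcoming slots; per the doc comment
    // above, the valid range is 1 to MAX_FANOUT_SLOTS.
    let config = TpuClientConfig { fanout_slots: 12 };
    let tpu_client = TpuClient::new(rpc_client.clone(), websocket_url, config)?;

    // Hypothetical transfer between throwaway keypairs.
    let payer = Keypair::new();
    let recipient = Keypair::new();
    let (recent_blockhash, _fee_calculator) = rpc_client.get_recent_blockhash()?;
    let tx = system_transaction::transfer(&payer, &recipient.pubkey(), 1_000, recent_blockhash);

    // Serialize and send to the current and upcoming leader TPUs according to
    // the fanout size (see the doc comment change above); returns a bool
    // rather than a Result.
    if tpu_client.send_transaction(&tx) {
        println!("transaction sent to leader TPUs");
    }
    Ok(())
}

Raising fanout_slots widens the window of upcoming leaders that receive the transaction, at the cost of more duplicate sends.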