Skip to content

Commit

Permalink
removes erroneous uses of Arc<...> from retransmit stage
Browse files Browse the repository at this point in the history
  • Loading branch information
behzadnouri committed Aug 17, 2021
1 parent 8198a7e commit 6e41333
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 21 deletions.
2 changes: 1 addition & 1 deletion core/benches/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {
let retransmitter_handles = retransmitter(
Arc::new(sockets),
bank_forks,
&leader_schedule_cache,
leader_schedule_cache,
cluster_info,
packet_receiver,
Arc::default(), // solana_rpc::max_slots::MaxSlots
Expand Down
32 changes: 16 additions & 16 deletions core/src/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ fn retransmit(
shreds_received: &Mutex<ShredFilterAndHasher>,
max_slots: &MaxSlots,
first_shreds_received: &Mutex<BTreeSet<Slot>>,
rpc_subscriptions: &Option<Arc<RpcSubscriptions>>,
rpc_subscriptions: Option<&RpcSubscriptions>,
) -> Result<()> {
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
let r_lock = r.lock().unwrap();
Expand Down Expand Up @@ -380,7 +380,7 @@ fn retransmit(
pub fn retransmitter(
sockets: Arc<Vec<UdpSocket>>,
bank_forks: Arc<RwLock<BankForks>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
cluster_info: Arc<ClusterInfo>,
r: Arc<Mutex<PacketReceiver>>,
max_slots: Arc<MaxSlots>,
Expand Down Expand Up @@ -430,7 +430,7 @@ pub fn retransmitter(
&shreds_received,
&max_slots,
&first_shreds_received,
&rpc_subscriptions,
rpc_subscriptions.as_deref(),
) {
match e {
Error::RecvTimeout(RecvTimeoutError::Disconnected) => break,
Expand All @@ -448,7 +448,7 @@ pub fn retransmitter(
.collect()
}

pub struct RetransmitStage {
pub(crate) struct RetransmitStage {
thread_hdls: Vec<JoinHandle<()>>,
window_service: WindowService,
cluster_slots_service: ClusterSlotsService,
Expand All @@ -457,15 +457,15 @@ pub struct RetransmitStage {
impl RetransmitStage {
#[allow(clippy::new_ret_no_self)]
#[allow(clippy::too_many_arguments)]
pub fn new(
pub(crate) fn new(
bank_forks: Arc<RwLock<BankForks>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
blockstore: Arc<Blockstore>,
cluster_info: &Arc<ClusterInfo>,
cluster_info: Arc<ClusterInfo>,
retransmit_sockets: Arc<Vec<UdpSocket>>,
repair_socket: Arc<UdpSocket>,
verified_receiver: Receiver<Vec<Packets>>,
exit: &Arc<AtomicBool>,
exit: Arc<AtomicBool>,
cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
epoch_schedule: EpochSchedule,
cfg: Option<Arc<AtomicBool>>,
Expand All @@ -475,7 +475,7 @@ impl RetransmitStage {
verified_vote_receiver: VerifiedVoteReceiver,
repair_validators: Option<HashSet<Pubkey>>,
completed_data_sets_sender: CompletedDataSetsSender,
max_slots: &Arc<MaxSlots>,
max_slots: Arc<MaxSlots>,
rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
duplicate_slots_sender: Sender<Slot>,
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
Expand All @@ -486,10 +486,10 @@ impl RetransmitStage {
let thread_hdls = retransmitter(
retransmit_sockets,
bank_forks.clone(),
leader_schedule_cache,
leader_schedule_cache.clone(),
cluster_info.clone(),
retransmit_receiver,
Arc::clone(max_slots),
max_slots,
rpc_subscriptions,
);

Expand All @@ -508,17 +508,17 @@ impl RetransmitStage {
epoch_schedule,
duplicate_slots_reset_sender,
repair_validators,
cluster_info: cluster_info.clone(),
cluster_info,
cluster_slots,
};
let window_service = WindowService::new(
blockstore,
verified_receiver,
retransmit_sender,
repair_socket,
exit.clone(),
exit,
repair_info,
leader_schedule_cache.clone(),
leader_schedule_cache,
move |id, shred, working_bank, last_root| {
let is_connected = cfg
.as_ref()
Expand Down Expand Up @@ -547,7 +547,7 @@ impl RetransmitStage {
}
}

pub fn join(self) -> thread::Result<()> {
pub(crate) fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls {
thread_hdl.join()?;
}
Expand Down Expand Up @@ -620,7 +620,7 @@ mod tests {
let _t_retransmit = retransmitter(
retransmit_socket,
bank_forks,
&leader_schedule_cache,
leader_schedule_cache,
cluster_info,
Arc::new(Mutex::new(retransmit_receiver)),
Arc::default(), // MaxSlots
Expand Down
8 changes: 4 additions & 4 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,13 @@ impl Tvu {
unbounded();
let retransmit_stage = RetransmitStage::new(
bank_forks.clone(),
leader_schedule_cache,
leader_schedule_cache.clone(),
blockstore.clone(),
cluster_info,
cluster_info.clone(),
Arc::new(retransmit_sockets),
repair_socket,
verified_receiver,
exit,
exit.clone(),
cluster_slots_update_receiver,
*bank_forks.read().unwrap().working_bank().epoch_schedule(),
cfg,
Expand All @@ -191,7 +191,7 @@ impl Tvu {
verified_vote_receiver,
tvu_config.repair_validators,
completed_data_sets_sender,
max_slots,
max_slots.clone(),
Some(rpc_subscriptions.clone()),
duplicate_slots_sender,
ancestor_hashes_replay_update_receiver,
Expand Down

0 comments on commit 6e41333

Please sign in to comment.