Skip to content

Commit

Permalink
Give streamer::receiver() threads unique names (#35369)
Browse files Browse the repository at this point in the history
The name was previously hard-coded to solReceiver. The use of the same
name makes it hard to figure out which thread is which when these
threads are handling many services (Gossip, Tvu, etc).
  • Loading branch information
steviez authored Mar 1, 2024
1 parent 564a9f7 commit 7d6f1d5
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 6 deletions.
1 change: 1 addition & 0 deletions bench-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ fn main() -> Result<()> {
let (s_reader, r_reader) = unbounded();
read_channels.push(r_reader);
read_threads.push(receiver(
"solRcvrBenStrmr".to_string(),
Arc::new(read),
exit.clone(),
s_reader,
Expand Down
12 changes: 9 additions & 3 deletions core/src/fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,10 @@ impl FetchStage {
let tpu_threads: Vec<_> = if tpu_enable_udp {
tpu_sockets
.into_iter()
.map(|socket| {
.enumerate()
.map(|(i, socket)| {
streamer::receiver(
format!("solRcvrTpu{i:02}"),
socket,
exit.clone(),
sender.clone(),
Expand All @@ -180,8 +182,10 @@ impl FetchStage {
let tpu_forwards_threads: Vec<_> = if tpu_enable_udp {
tpu_forwards_sockets
.into_iter()
.map(|socket| {
.enumerate()
.map(|(i, socket)| {
streamer::receiver(
format!("solRcvrTpuFwd{i:02}"),
socket,
exit.clone(),
forward_sender.clone(),
Expand All @@ -200,8 +204,10 @@ impl FetchStage {
let tpu_vote_stats = Arc::new(StreamerReceiveStats::new("tpu_vote_receiver"));
let tpu_vote_threads: Vec<_> = tpu_vote_sockets
.into_iter()
.map(|socket| {
.enumerate()
.map(|(i, socket)| {
streamer::receiver(
format!("solRcvrTpuVot{i:02}"),
socket,
exit.clone(),
vote_sender.clone(),
Expand Down
2 changes: 2 additions & 0 deletions core/src/repair/ancestor_hashes_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ impl AncestorHashesService {
let outstanding_requests = Arc::<RwLock<OutstandingAncestorHashesRepairs>>::default();
let (response_sender, response_receiver) = unbounded();
let t_receiver = streamer::receiver(
"solRcvrAncHash".to_string(),
ancestor_hashes_request_socket.clone(),
exit.clone(),
response_sender.clone(),
Expand Down Expand Up @@ -1294,6 +1295,7 @@ mod test {

// Set up repair request receiver threads
let t_request_receiver = streamer::receiver(
"solRcvrTest".to_string(),
Arc::new(responder_node.sockets.serve_repair),
exit.clone(),
requests_sender,
Expand Down
1 change: 1 addition & 0 deletions core/src/repair/serve_repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ impl ServeRepairService {
serve_repair_socket.local_addr().unwrap()
);
let t_receiver = streamer::receiver(
"solRcvrServeRep".to_string(),
serve_repair_socket.clone(),
exit.clone(),
request_sender,
Expand Down
9 changes: 7 additions & 2 deletions core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ impl ShredFetchStage {

#[allow(clippy::too_many_arguments)]
fn packet_modifier(
receiver_thread_name: &'static str,
sockets: Vec<Arc<UdpSocket>>,
exit: Arc<AtomicBool>,
sender: Sender<PacketBatch>,
Expand All @@ -161,9 +162,11 @@ impl ShredFetchStage {
let (packet_sender, packet_receiver) = unbounded();
let streamers = sockets
.into_iter()
.map(|s| {
.enumerate()
.map(|(i, socket)| {
streamer::receiver(
s,
format!("{receiver_thread_name}{i:02}"),
socket,
exit.clone(),
packet_sender.clone(),
recycler.clone(),
Expand Down Expand Up @@ -211,6 +214,7 @@ impl ShredFetchStage {
let recycler = PacketBatchRecycler::warmed(100, 1024);

let (mut tvu_threads, tvu_filter) = Self::packet_modifier(
"solRcvrShred",
sockets,
exit.clone(),
sender.clone(),
Expand All @@ -224,6 +228,7 @@ impl ShredFetchStage {
);

let (repair_receiver, repair_handler) = Self::packet_modifier(
"solRcvrShredRep",
vec![repair_socket.clone()],
exit.clone(),
sender.clone(),
Expand Down
1 change: 1 addition & 0 deletions gossip/src/gossip_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ impl GossipService {
);
let socket_addr_space = *cluster_info.socket_addr_space();
let t_receiver = streamer::receiver(
"solRcvrGossip".to_string(),
gossip_socket.clone(),
exit.clone(),
request_sender,
Expand Down
4 changes: 3 additions & 1 deletion streamer/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ fn recv_loop(
}

pub fn receiver(
thread_name: String,
socket: Arc<UdpSocket>,
exit: Arc<AtomicBool>,
packet_batch_sender: PacketBatchSender,
Expand All @@ -169,7 +170,7 @@ pub fn receiver(
let res = socket.set_read_timeout(Some(Duration::new(1, 0)));
assert!(res.is_ok(), "streamer::receiver set_read_timeout error");
Builder::new()
.name("solReceiver".to_string())
.name(thread_name)
.spawn(move || {
let _ = recv_loop(
&socket,
Expand Down Expand Up @@ -480,6 +481,7 @@ mod test {
let (s_reader, r_reader) = unbounded();
let stats = Arc::new(StreamerReceiveStats::new("test"));
let t_receiver = receiver(
"solRcvrTest".to_string(),
Arc::new(read),
exit.clone(),
s_reader,
Expand Down

0 comments on commit 7d6f1d5

Please sign in to comment.