Skip to content

Commit

Permalink
Remove sending witness-ack message to self and forward message to sel…
Browse files Browse the repository at this point in the history
…f+chunk producer (near#11494)

Related issues: near#11491, near#11487
State witness is distributed to chunk validators in two phases:
Phase 1) Chunk producer sends one partial state witness to each of the
other validators.
Phase 2) Each validator forwards the partial state witness it receives
to all the validators.

Note that the chunk producer immediately sends chunk-endorsement to the
block producer, so it does not need to receive and validate witnesses.
Given that, this change includes the following changes:
1) [Phase 1] Make the chunk producer NOT send partial state witness to
itself.
2) [Phase 2] Make the chunk validator NOT forward partial state witness to (a)
itself and (b) the chunk producer.
3) Do not send witness ack message to the same originating chunk
producer (since ack is mainly used for network roundtrip measurements).

The only functional change here is (2-b); sending a message to self
is a no-op, since the network stack drops messages routed to the same
validator. However, first, I think explicitly not sending messages makes
it clearer to understand what messages we send and to where. Also, it
will not create errors or confusion if, in the future, we decide to not
drop such messages at the network layer but actually process them.
Second, test-loop gives warnings when there is a message to self, when
simulating the network stack. I am changing the warnings to assertions
to keep this as an invariant.

**Update:** Change (1) also eliminates the following warning that we see
in the logs: `Received duplicate or redundant partial state witness
part.` This would only happen (contrary to what is said above) if the
network layer does not actually drop the message to self?
  • Loading branch information
tayfunelmas authored Jun 10, 2024
1 parent 9a72275 commit fe5a71c
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 115 deletions.
19 changes: 19 additions & 0 deletions chain/client/src/stateless_validation/chunk_validator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use near_chain::{Block, Chain};
use near_chain_primitives::Error;
use near_epoch_manager::EpochManagerAdapter;
use near_network::types::{NetworkRequests, PeerManagerMessageRequest};
use near_o11y::log_assert;
use near_primitives::sharding::ShardChunkHeader;
use near_primitives::stateless_validation::{
ChunkEndorsement, ChunkStateWitness, ChunkStateWitnessAck, ChunkStateWitnessSize,
Expand Down Expand Up @@ -282,6 +283,24 @@ impl Client {
}

fn send_state_witness_ack(&self, witness: &ChunkStateWitness) {
// Chunk producers should not receive state witness from themselves.
log_assert!(
self.validator_signer.is_some(),
"Received a chunk state witness but this is not a validator node. Witness={:?}",
witness
);
// In production PartialWitnessActor does not forward a state witness to the chunk producer that
// produced the witness. However some tests bypass PartialWitnessActor, thus when a chunk producer
// receives its own state witness, we log a warning instead of panicking.
// TODO: Make sure all tests run with "test_features" and panic for non-test builds.
if self.validator_signer.as_ref().unwrap().validator_id() == &witness.chunk_producer {
tracing::warn!(
"Validator {:?} received state witness from itself. Witness={:?}",
self.validator_signer.as_ref().unwrap().validator_id(),
witness
);
return;
}
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::ChunkStateWitnessAck(
witness.chunk_producer.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,33 +127,15 @@ impl PartialWitnessActor {
) -> Result<(), Error> {
let DistributeStateWitnessRequest { epoch_id, chunk_header, state_witness } = msg;

let chunk_validators = self
.epoch_manager
.get_chunk_validator_assignments(
&epoch_id,
chunk_header.shard_id(),
chunk_header.height_created(),
)?
.ordered_chunk_validators();

tracing::debug!(
target: "client",
chunk_hash=?chunk_header.chunk_hash(),
?chunk_validators,
"distribute_chunk_state_witness",
);

let witness_bytes = compress_witness(&state_witness)?;

// Record the witness in order to match the incoming acks for measuring round-trip times.
// See process_chunk_state_witness_ack for the handling of the ack messages.
self.state_witness_tracker.record_witness_sent(
&state_witness,
witness_bytes.size_bytes(),
chunk_validators.len(),
);

self.send_state_witness_parts(epoch_id, chunk_header, witness_bytes, chunk_validators)?;
self.send_state_witness_parts(epoch_id, chunk_header, witness_bytes)?;

Ok(())
}
Expand All @@ -164,13 +146,28 @@ impl PartialWitnessActor {
epoch_id: EpochId,
chunk_header: ShardChunkHeader,
witness_bytes: EncodedChunkStateWitness,
chunk_validators: Vec<AccountId>,
) -> Vec<(AccountId, PartialEncodedStateWitness)> {
) -> Result<Vec<(AccountId, PartialEncodedStateWitness)>, Error> {
let chunk_validators = self
.epoch_manager
.get_chunk_validator_assignments(
&epoch_id,
chunk_header.shard_id(),
chunk_header.height_created(),
)?
.ordered_chunk_validators();

tracing::debug!(
target: "client",
chunk_hash=?chunk_header.chunk_hash(),
?chunk_validators,
"generate_state_witness_parts",
);

// Break the state witness into parts using Reed Solomon encoding.
let encoder = self.encoders.entry(chunk_validators.len());
let (parts, encoded_length) = encoder.encode(&witness_bytes);

chunk_validators
Ok(chunk_validators
.iter()
.zip_eq(parts)
.enumerate()
Expand All @@ -187,7 +184,7 @@ impl PartialWitnessActor {
);
(chunk_validator.clone(), partial_witness)
})
.collect_vec()
.collect_vec())
}

// Break the state witness into parts and send each part to the corresponding chunk validator owner.
Expand All @@ -198,49 +195,80 @@ impl PartialWitnessActor {
epoch_id: EpochId,
chunk_header: ShardChunkHeader,
witness_bytes: EncodedChunkStateWitness,
chunk_validators: Vec<AccountId>,
) -> Result<(), Error> {
// Capture these values first, as the sources are consumed before calling record_witness_sent.
let chunk_hash = chunk_header.chunk_hash();
let witness_size_in_bytes = witness_bytes.size_bytes();

// Record time taken to encode the state witness parts.
let shard_id_label = chunk_header.shard_id().to_string();
let encode_timer = metrics::PARTIAL_WITNESS_ENCODE_TIME
.with_label_values(&[shard_id_label.as_str()])
.start_timer();
let validator_witness_tuple = self.generate_state_witness_parts(
epoch_id,
chunk_header,
witness_bytes,
chunk_validators.clone(),
);
let mut validator_witness_tuple =
self.generate_state_witness_parts(epoch_id, chunk_header, witness_bytes)?;
encode_timer.observe_duration();

// Since we can't send network message to ourselves, we need to send the PartialEncodedStateWitnessForward
// message for our part.
if let Some((_, partial_witness)) = validator_witness_tuple
if let Some(index) = validator_witness_tuple
.iter()
.find(|(validator, _)| validator == self.my_signer.validator_id())
.position(|(validator, _)| validator == self.my_signer.validator_id())
{
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedStateWitnessForward(
chunk_validators,
partial_witness.clone(),
),
));
// This also removes this validator from the list, since we do not need to send our own witness part to self.
let (_, partial_witness) = validator_witness_tuple.swap_remove(index);
self.forward_state_witness_part(partial_witness)?;
}

// Record the witness in order to match the incoming acks for measuring round-trip times.
// See process_chunk_state_witness_ack for the handling of the ack messages.
self.state_witness_tracker.record_witness_sent(
chunk_hash,
witness_size_in_bytes,
validator_witness_tuple.len(),
);

// Send the parts to the corresponding chunk validator owners.
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple),
));
Ok(())
}

/// Handles the state witness ack message from the chunk validator.
/// It computes the round-trip time between sending the state witness and receiving
/// the ack message and updates the corresponding metric with it.
/// Currently we do not raise an error for handling of witness-ack messages,
/// as it is used only for tracking some networking metrics.
pub fn handle_chunk_state_witness_ack(&mut self, witness_ack: ChunkStateWitnessAck) {
self.state_witness_tracker.on_witness_ack_received(witness_ack);
/// Forwards a partial state witness part to the chunk validators assigned to its chunk.
///
/// The recipients are the ordered chunk validators that the epoch manager returns for the
/// part's (epoch, shard, height), excluding:
/// 1) the current validator (`self.my_signer`), and
/// 2) the chunk producer that originally generated the witness part.
///
/// All recipients are passed in a single `PartialEncodedStateWitnessForward` network request.
///
/// # Errors
/// Returns an error if the epoch manager fails to look up either the chunk producer or the
/// chunk validator assignments for the part.
fn forward_state_witness_part(
&self,
partial_witness: PartialEncodedStateWitness,
) -> Result<(), Error> {
// Note the argument order: height comes before shard here, unlike in
// `get_chunk_validator_assignments` below.
let chunk_producer = self.epoch_manager.get_chunk_producer(
partial_witness.epoch_id(),
partial_witness.height_created(),
partial_witness.shard_id(),
)?;
let ordered_chunk_validators = self
.epoch_manager
.get_chunk_validator_assignments(
partial_witness.epoch_id(),
partial_witness.shard_id(),
partial_witness.height_created(),
)?
.ordered_chunk_validators();
// Forward the witness part to the chunk validators except for the following:
// (1) the current validator and (2) the validator that produced the chunk and the witness.
let target_chunk_validators = ordered_chunk_validators
.into_iter()
.filter(|validator| {
validator != self.my_signer.validator_id() && *validator != chunk_producer
})
.collect();
// The part itself is moved into the request; it is not used after this point.
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedStateWitnessForward(
target_chunk_validators,
partial_witness,
),
));
Ok(())
}

/// Function to handle receiving partial_encoded_state_witness message from chunk producer.
Expand All @@ -258,18 +286,7 @@ impl PartialWitnessActor {
.store_partial_encoded_state_witness(partial_witness.clone())?;

// Forward the part to all the chunk validators.
let chunk_validators = self
.epoch_manager
.get_chunk_validator_assignments(
partial_witness.epoch_id(),
partial_witness.shard_id(),
partial_witness.height_created(),
)?
.ordered_chunk_validators();

self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedStateWitnessForward(chunk_validators, partial_witness),
));
self.forward_state_witness_part(partial_witness)?;

Ok(())
}
Expand Down Expand Up @@ -397,6 +414,15 @@ impl PartialWitnessActor {

Ok(())
}

/// Handles the state witness ack message from the chunk validator.
///
/// The ack is passed to the state witness tracker, which computes the round-trip time
/// between sending the state witness and receiving the ack message and updates the
/// corresponding metric with it.
///
/// This function does not return a `Result`: currently we do not raise an error for handling
/// of witness-ack messages, as they are used only for tracking some networking metrics.
pub fn handle_chunk_state_witness_ack(&mut self, witness_ack: ChunkStateWitnessAck) {
// Delegate to the tracker; nothing else is done with the ack here.
self.state_witness_tracker.on_witness_ack_received(witness_ack);
}
}

fn compress_witness(witness: &ChunkStateWitness) -> Result<EncodedChunkStateWitness, Error> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use near_async::time::Instant;
use near_chain::chain::ChunkStateWitnessMessage;
use near_chain::Error;
use near_epoch_manager::EpochManagerAdapter;
use near_o11y::log_assert_fail;
use near_primitives::stateless_validation::{
ChunkProductionKey, ChunkStateWitness, ChunkStateWitnessSize, EncodedChunkStateWitness,
PartialEncodedStateWitness,
Expand Down Expand Up @@ -63,13 +64,7 @@ impl CacheEntry {

// Check if the part is already present.
if self.parts[part_ord].is_some() {
tracing::warn!(
target: "client",
?shard_id,
?height_created,
?part_ord,
"Received duplicate or redundant partial state witness part."
);
log_assert_fail!("Received duplicate or redundant partial state witness part. shard_id={shard_id:?}, height_created={height_created:?}, part_ord={part_ord:?}");
return None;
}

Expand Down
19 changes: 10 additions & 9 deletions chain/client/src/stateless_validation/state_witness_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use bytesize::ByteSize;
use lru::LruCache;
use near_async::time::Clock;
use near_primitives::sharding::ChunkHash;
use near_primitives::stateless_validation::{ChunkStateWitness, ChunkStateWitnessAck};
use near_primitives::stateless_validation::ChunkStateWitnessAck;
use s3::creds::time::ext::InstantExt as _;
use std::hash::Hash;

Expand All @@ -24,8 +24,8 @@ struct ChunkStateWitnessKey {
}

impl ChunkStateWitnessKey {
pub fn new(witness: &ChunkStateWitness) -> Self {
Self { chunk_hash: witness.chunk_header.chunk_hash() }
pub fn new(chunk_hash: ChunkHash) -> Self {
Self { chunk_hash }
}
}

Expand Down Expand Up @@ -57,11 +57,11 @@ impl ChunkStateWitnessTracker {
/// Adds a new witness message to track.
pub fn record_witness_sent(
&mut self,
witness: &ChunkStateWitness,
chunk_hash: ChunkHash,
witness_size_in_bytes: usize,
num_validators: usize,
) -> () {
let key = ChunkStateWitnessKey::new(witness);
let key = ChunkStateWitnessKey::new(chunk_hash);
tracing::trace!(target: "state_witness_tracker", witness_key=?key,
size=witness_size_in_bytes, "Recording state witness sent.");
self.witnesses.put(
Expand Down Expand Up @@ -111,9 +111,9 @@ impl ChunkStateWitnessTracker {
#[cfg(test)]
fn get_record_for_witness(
&mut self,
witness: &ChunkStateWitness,
witness: &near_primitives::stateless_validation::ChunkStateWitness,
) -> Option<&ChunkStateWitnessRecord> {
let key = ChunkStateWitnessKey::new(witness);
let key = ChunkStateWitnessKey::new(witness.chunk_header.chunk_hash());
self.witnesses.get(&key)
}
}
Expand Down Expand Up @@ -147,6 +147,7 @@ mod state_witness_tracker_tests {
use super::*;
use near_async::time::{Duration, FakeClock, Utc};
use near_primitives::hash::hash;
use near_primitives::stateless_validation::ChunkStateWitness;
use near_primitives::types::ShardId;

const NUM_VALIDATORS: usize = 3;
Expand All @@ -157,7 +158,7 @@ mod state_witness_tracker_tests {
let clock = dummy_clock();
let mut tracker = ChunkStateWitnessTracker::new(clock.clock());

tracker.record_witness_sent(&witness, 4321, NUM_VALIDATORS);
tracker.record_witness_sent(witness.chunk_header.compute_hash(), 4321, NUM_VALIDATORS);
clock.advance(Duration::milliseconds(3444));

// Ack received from all "except for one".
Expand All @@ -176,7 +177,7 @@ mod state_witness_tracker_tests {
let clock = dummy_clock();
let mut tracker = ChunkStateWitnessTracker::new(clock.clock());

tracker.record_witness_sent(&witness, 4321, NUM_VALIDATORS);
tracker.record_witness_sent(witness.chunk_header.compute_hash(), 4321, NUM_VALIDATORS);
clock.advance(Duration::milliseconds(3444));

// Ack received from all.
Expand Down
Loading

0 comments on commit fe5a71c

Please sign in to comment.