Skip to content

Commit

Permalink
More tests for new request based statement distribution (paritytech#2875
Browse files Browse the repository at this point in the history
)

* More test coverage.

* Preserve peer order.

* Better test coverage.

* Even more test coverage.

* Add doc comment to `IndexMap`.

* Fix flaky test.

* Review remarks.

* Review remarks.
  • Loading branch information
eskimor authored Apr 12, 2021
1 parent 76bfd9f commit 31acae0
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 21 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions node/network/statement-distribution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures-timer = "3.0.2"
225 changes: 204 additions & 21 deletions node/network/statement-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use polkadot_node_network_protocol::{

use futures::{channel::mpsc, future::RemoteHandle, prelude::*};
use futures::channel::oneshot;
use indexmap::IndexSet;
use indexmap::{IndexSet, IndexMap, map::Entry as IEntry};

use std::collections::{HashMap, HashSet, hash_map::Entry};

Expand Down Expand Up @@ -508,7 +508,10 @@ enum LargeStatementStatus {
struct FetchingInfo {
/// All peers that send us a `LargeStatement` or a `Valid` statement for the given
/// `CandidateHash`, together with their originally sent messages.
available_peers: HashMap<PeerId, Vec<protocol_v1::StatementDistributionMessage>>,
///
/// We use an `IndexMap` here to preserve the ordering of peers sending us messages. This is
/// desirable because we reward first sending peers with reputation.
available_peers: IndexMap<PeerId, Vec<protocol_v1::StatementDistributionMessage>>,
/// Peers left to try in case the background task needs it.
peers_to_try: Vec<PeerId>,
/// Sender for sending fresh peers to the fetching task in case of failure.
Expand Down Expand Up @@ -1057,22 +1060,24 @@ async fn retrieve_statement_from_message<'a>(
match occupied.get_mut() {
LargeStatementStatus::Fetching(info) => {

let is_new_peer = !info.available_peers.contains_key(&peer);
let is_large_statement = message.is_large_statement();

match info.available_peers.entry(peer) {
Entry::Occupied(mut occupied) => {
occupied.get_mut().push(message);
}
Entry::Vacant(vacant) => {
vacant.insert(vec![message]);
}
}
let is_new_peer =
match info.available_peers.entry(peer) {
IEntry::Occupied(mut occupied) => {
occupied.get_mut().push(message);
false
}
IEntry::Vacant(vacant) => {
vacant.insert(vec![message]);
true
}
};

if is_new_peer & is_large_statement {
info.peers_to_try.push(peer);
// Answer any pending request for more peers:
if let Some(sender) = std::mem::take(&mut info.peer_sender) {
if let Some(sender) = info.peer_sender.take() {
let to_send = std::mem::take(&mut info.peers_to_try);
if let Err(peers) = sender.send(to_send) {
// Requester no longer interested for now, might want them
Expand Down Expand Up @@ -1181,7 +1186,7 @@ async fn launch_request(
return None
}
let available_peers = {
let mut m = HashMap::new();
let mut m = IndexMap::new();
m.insert(peer, vec![protocol_v1::StatementDistributionMessage::LargeStatement(meta)]);
m
};
Expand Down Expand Up @@ -1950,6 +1955,7 @@ impl metrics::Metrics for Metrics {

#[cfg(test)]
mod tests {
use std::time::Duration;
use parity_scale_codec::{Decode, Encode};
use super::*;
use std::sync::Arc;
Expand All @@ -1959,9 +1965,10 @@ mod tests {
use polkadot_primitives::v1::{CommittedCandidateReceipt, ValidationCode};
use assert_matches::assert_matches;
use futures::executor::{self, block_on};
use futures_timer::Delay;
use sp_keystore::{CryptoStore, SyncCryptoStorePtr, SyncCryptoStore};
use sc_keystore::LocalKeystore;
use polkadot_node_network_protocol::{view, ObservedRole};
use polkadot_node_network_protocol::{view, ObservedRole, request_response::Recipient};
use polkadot_subsystem::{jaeger, ActivatedLeaf};
use polkadot_node_network_protocol::request_response::{
Requests,
Expand Down Expand Up @@ -2699,6 +2706,7 @@ mod tests {

#[test]
fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing() {
sp_tracing::try_init_simple();
let hash_a = Hash::repeat_byte(1);
let hash_b = Hash::repeat_byte(2);

Expand All @@ -2712,6 +2720,8 @@ mod tests {

let peer_a = PeerId::random();
let peer_b = PeerId::random();
let peer_c = PeerId::random();
let peer_bad = PeerId::random();

let validators = vec![
Sr25519Keyring::Alice.public().into(),
Expand Down Expand Up @@ -2780,6 +2790,16 @@ mod tests {
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full)
)
}).await;
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_c.clone(), ObservedRole::Full)
)
}).await;
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_bad.clone(), ObservedRole::Full)
)
}).await;

handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
Expand All @@ -2792,8 +2812,19 @@ mod tests {
NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![hash_a])
)
}).await;
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer_c.clone(), view![hash_a])
)
}).await;
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer_bad.clone(), view![hash_a])
)
}).await;

// receive a seconded statement from peer A. it should be propagated onwards to peer B and to
// receive a seconded statement from peer A, which does not provide the request data,
// then get that data from peer C. It should be propagated onwards to peer B and to
// candidate backing.
let statement = {
let signing_context = SigningContext {
Expand All @@ -2815,7 +2846,7 @@ mod tests {
).await.ok().flatten().expect("should be signed")
};

let metadata =
let metadata =
protocol_v1::StatementDistributionMessage::Statement(hash_a, statement.clone()).get_metadata();

handle.send(FromOverseer::Communication {
Expand All @@ -2842,6 +2873,150 @@ mod tests {
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
assert_eq!(outgoing.peer, Recipient::Peer(peer_a));
// Just drop request - should trigger error.
}
);

// There is a race between request handler asking for more peers and processing of the
// coming `PeerMessage`s, we want the request handler to ask first here for better test
// coverage:
Delay::new(Duration::from_millis(20)).await;

handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_c.clone(),
protocol_v1::StatementDistributionMessage::LargeStatement(metadata.clone()),
)
)
}).await;

// Malicious peer:
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_bad.clone(),
protocol_v1::StatementDistributionMessage::LargeStatement(metadata.clone()),
)
)
}).await;

// Let c fail once too:
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(
mut reqs, IfDisconnected::ImmediateError
)
) => {
let reqs = reqs.pop().unwrap();
let outgoing = match reqs {
Requests::StatementFetching(outgoing) => outgoing,
_ => panic!("Unexpected request"),
};
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
assert_eq!(outgoing.peer, Recipient::Peer(peer_c));
}
);

// a fails again:
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(
mut reqs, IfDisconnected::ImmediateError
)
) => {
let reqs = reqs.pop().unwrap();
let outgoing = match reqs {
Requests::StatementFetching(outgoing) => outgoing,
_ => panic!("Unexpected request"),
};
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
// On retry, we should have reverse order:
assert_eq!(outgoing.peer, Recipient::Peer(peer_a));
}
);

// Send invalid response (all other peers have been tried now):
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(
mut reqs, IfDisconnected::ImmediateError
)
) => {
let reqs = reqs.pop().unwrap();
let outgoing = match reqs {
Requests::StatementFetching(outgoing) => outgoing,
_ => panic!("Unexpected request"),
};
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
assert_eq!(outgoing.peer, Recipient::Peer(peer_bad));
let bad_candidate = {
let mut bad = candidate.clone();
bad.descriptor.para_id = 0xeadbeaf.into();
bad
};
let response = StatementFetchingResponse::Statement(bad_candidate);
outgoing.pending_response.send(Ok(response.encode())).unwrap();
}
);

// Should get punished and never tried again:
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(p, r)
) if p == peer_bad && r == COST_WRONG_HASH => {}
);

// a is tried again (retried in reverse order):
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(
mut reqs, IfDisconnected::ImmediateError
)
) => {
let reqs = reqs.pop().unwrap();
let outgoing = match reqs {
Requests::StatementFetching(outgoing) => outgoing,
_ => panic!("Unexpected request"),
};
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
// On retry, we should have reverse order:
assert_eq!(outgoing.peer, Recipient::Peer(peer_a));
}
);

// c succeeds now:
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(
mut reqs, IfDisconnected::ImmediateError
)
) => {
let reqs = reqs.pop().unwrap();
let outgoing = match reqs {
Requests::StatementFetching(outgoing) => outgoing,
_ => panic!("Unexpected request"),
};
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
// On retry, we should have reverse order:
assert_eq!(outgoing.peer, Recipient::Peer(peer_c));
let response = StatementFetchingResponse::Statement(candidate.clone());
outgoing.pending_response.send(Ok(response.encode())).unwrap();
}
Expand All @@ -2851,7 +3026,14 @@ mod tests {
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(p, r)
) if p == peer_a && r == BENEFIT_VALID_RESPONSE => {}
) if p == peer_a && r == COST_FETCH_FAIL => {}
);

assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(p, r)
) if p == peer_c && r == BENEFIT_VALID_RESPONSE => {}
);

assert_matches!(
Expand All @@ -2869,17 +3051,18 @@ mod tests {
);


// Now messages should go out:
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
recipients,
mut recipients,
protocol_v1::ValidationProtocol::StatementDistribution(
protocol_v1::StatementDistributionMessage::LargeStatement(meta)
),
)
) => {
assert_eq!(recipients, vec![peer_b.clone()]);
assert_eq!(recipients.sort(), vec![peer_b.clone(), peer_c.clone()].sort());
assert_eq!(meta.relay_parent, hash_a);
assert_eq!(meta.candidate_hash, statement.payload().candidate_hash());
assert_eq!(meta.signed_by, statement.validator_index());
Expand All @@ -2889,7 +3072,7 @@ mod tests {

// Now that it has the candidate it should answer requests accordingly (even after a
// failed request):

// Failing request first:
let (pending_response, response_rx) = oneshot::channel();
let inner_req = StatementFetchingRequest {
Expand All @@ -2906,7 +3089,7 @@ mod tests {
response_rx.await.unwrap().result,
Err(()) => {}
);

// Now the working one:
let (pending_response, response_rx) = oneshot::channel();
let inner_req = StatementFetchingRequest {
Expand Down
5 changes: 5 additions & 0 deletions node/network/statement-distribution/src/requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ pub async fn fetch(
// All our peers failed us - try getting new ones before trying again:
match try_get_new_peers(relay_parent, candidate_hash, &mut sender, &span).await {
Ok(Some(mut peers)) => {
tracing::trace!(
target: LOG_TARGET,
?peers,
"Received new peers."
);
// New arrivals will be tried first:
new_peers.append(&mut peers);
}
Expand Down

0 comments on commit 31acae0

Please sign in to comment.