sync: only restart peers not doing finality related requests (paritytech#7322)

* sync: only restart peers not doing finality related requests

* sync: add test for sync restart

* sync: add better docs to restart method
andresilva authored Oct 16, 2020
1 parent cefcd99 commit bb79f65
Showing 1 changed file with 122 additions and 9 deletions: client/network/src/protocol/sync.rs
@@ -1510,20 +1510,38 @@ impl<B: BlockT> ChainSync<B> {
 		self.pending_requests.set_all();
 	}
 
-	/// Restart the sync process.
-	fn restart<'a>(&'a mut self) -> impl Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>> + 'a {
+	/// Restart the sync process. This will reset all pending block requests and return an iterator
+	/// of new block requests to make to peers. Peers that were downloading finality data (i.e.
+	/// their state was `DownloadingJustification` or `DownloadingFinalityProof`) are unaffected and
+	/// will stay in the same state.
+	fn restart<'a>(
+		&'a mut self,
+	) -> impl Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>> + 'a {
 		self.blocks.clear();
 		let info = self.client.info();
 		self.best_queued_hash = info.best_hash;
 		self.best_queued_number = std::cmp::max(info.best_number, self.best_imported_number);
 		self.pending_requests.set_all();
 		debug!(target:"sync", "Restarted with {} ({})", self.best_queued_number, self.best_queued_hash);
 		let old_peers = std::mem::take(&mut self.peers);
+
 		old_peers.into_iter().filter_map(move |(id, p)| {
+			// peers that were downloading justifications or finality proofs
+			// should be kept in that state.
+			match p.state {
+				PeerSyncState::DownloadingJustification(_)
+				| PeerSyncState::DownloadingFinalityProof(_) => {
+					self.peers.insert(id, p);
+					return None;
+				}
+				_ => {}
+			}
+
+			// handle peers that were in other states.
 			match self.new_peer(id.clone(), p.best_hash, p.best_number) {
 				Ok(None) => None,
 				Ok(Some(x)) => Some(Ok((id, x))),
-				Err(e) => Some(Err(e))
+				Err(e) => Some(Err(e)),
 			}
 		})
 	}
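
The key trick in this hunk is `std::mem::take(&mut self.peers)`: moving the whole peer map out of `self` lets the `filter_map` closure consume the old entries while re-inserting the finality-downloading ones into the now-empty live map, without fighting the borrow checker. Below is a minimal, self-contained sketch of that take-and-reinsert pattern; the `Sync` and `PeerState` names are simplified stand-ins for illustration, not the real Substrate types:

use std::collections::HashMap;

// Stand-ins for PeerId and PeerSyncState, just to make the sketch compile.
type PeerId = u32;

#[derive(Debug, PartialEq)]
enum PeerState {
    DownloadingBlocks,
    DownloadingJustification,
}

struct Sync {
    peers: HashMap<PeerId, PeerState>,
}

impl Sync {
    /// Drain the peer map, silently keeping finality-related peers and
    /// yielding everyone else so the caller can issue fresh requests.
    fn restart(&mut self) -> impl Iterator<Item = PeerId> + '_ {
        // Move the map out of `self` so the closure below can both consume
        // the old entries and re-insert into the (now empty) live map.
        let old_peers = std::mem::take(&mut self.peers);

        old_peers.into_iter().filter_map(move |(id, state)| {
            if state == PeerState::DownloadingJustification {
                // Finality peers keep their state and stay registered.
                self.peers.insert(id, state);
                None
            } else {
                // Everyone else gets reset and reported to the caller.
                Some(id)
            }
        })
    }
}

fn main() {
    let mut sync = Sync {
        peers: HashMap::from([
            (1, PeerState::DownloadingBlocks),
            (2, PeerState::DownloadingJustification),
            (3, PeerState::DownloadingBlocks),
        ]),
    };

    let mut to_restart: Vec<_> = sync.restart().collect();
    to_restart.sort();

    assert_eq!(to_restart, vec![1, 3]); // block-downloading peers are reset
    assert_eq!(sync.peers.len(), 1);    // the justification peer was kept
}

The real method yields `Result<(PeerId, BlockRequest<B>), BadPeer>` items rather than bare ids, since `new_peer` can fail for a misbehaving peer, but the shape of the pattern is the same.
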
@@ -1792,15 +1810,15 @@ fn validate_blocks<Block: BlockT>(blocks: &Vec<message::BlockData<Block>>, who:

 #[cfg(test)]
 mod test {
-	use super::*;
-	use super::message::FromBlock;
-	use substrate_test_runtime_client::{
-		runtime::Block,
-		DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
-	};
-	use sp_blockchain::HeaderBackend;
+	use super::*;
+	use sc_block_builder::BlockBuilderProvider;
+	use sp_blockchain::HeaderBackend;
+	use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
+	use substrate_test_runtime_client::{
+		runtime::{Block, Hash},
+		ClientBlockImportExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
+	};
 
 	#[test]
 	fn processes_empty_response_on_justification_request_for_unknown_block() {
@@ -1879,4 +1897,99 @@ mod test {
 			})
 		);
 	}
+
+	#[test]
+	fn restart_doesnt_affect_peers_downloading_finality_data() {
+		let mut client = Arc::new(TestClientBuilder::new().build());
+		let info = client.info();
+
+		let mut sync = ChainSync::new(
+			Roles::AUTHORITY,
+			client.clone(),
+			&info,
+			None,
+			Box::new(DefaultBlockAnnounceValidator),
+			1,
+		);
+
+		let peer_id1 = PeerId::random();
+		let peer_id2 = PeerId::random();
+		let peer_id3 = PeerId::random();
+		let peer_id4 = PeerId::random();
+
+		let mut new_blocks = |n| {
+			for _ in 0..n {
+				let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
+				client.import(BlockOrigin::Own, block.clone()).unwrap();
+			}
+
+			let info = client.info();
+			(info.best_hash, info.best_number)
+		};
+
+		let (b1_hash, b1_number) = new_blocks(50);
+		let (b2_hash, b2_number) = new_blocks(10);
+
+		// add 2 peers at blocks that we don't have locally
+		sync.new_peer(peer_id1.clone(), Hash::random(), 42).unwrap();
+		sync.new_peer(peer_id2.clone(), Hash::random(), 10).unwrap();
+
+		// we will send block requests to these peers
+		// for these blocks we don't know about
+		assert!(sync.block_requests().all(|(p, _)| { *p == peer_id1 || *p == peer_id2 }));
+
+		// add a new peer at a known block
+		sync.new_peer(peer_id3.clone(), b1_hash, b1_number).unwrap();
+
+		// we request a justification for a block we have locally
+		sync.request_justification(&b1_hash, b1_number);
+
+		// the justification request should be scheduled to the
+		// new peer which is at the given block
+		assert!(sync.justification_requests().any(|(p, r)| {
+			p == peer_id3
+				&& r.fields == BlockAttributes::JUSTIFICATION
+				&& r.from == message::FromBlock::Hash(b1_hash)
+				&& r.to == None
+		}));
+
+		assert_eq!(
+			sync.peers.get(&peer_id3).unwrap().state,
+			PeerSyncState::DownloadingJustification(b1_hash),
+		);
+
+		// add another peer at a known later block
+		sync.new_peer(peer_id4.clone(), b2_hash, b2_number).unwrap();
+
+		// we request a finality proof for a block we have locally
+		sync.request_finality_proof(&b2_hash, b2_number);
+
+		// the finality proof request should be scheduled to peer 4
+		// which is at that block
+		assert!(
+			sync.finality_proof_requests().any(|(p, r)| { p == peer_id4 && r.block == b2_hash })
+		);
+
+		assert_eq!(
+			sync.peers.get(&peer_id4).unwrap().state,
+			PeerSyncState::DownloadingFinalityProof(b2_hash),
+		);
+
+		// we restart the sync state
+		let block_requests = sync.restart();
+
+		// which should make us send out block requests to the first two peers
+		assert!(block_requests.map(|r| r.unwrap()).all(|(p, _)| { p == peer_id1 || p == peer_id2 }));
+
+		// peer 3 and 4 should be unaffected as they were downloading finality data
+		assert_eq!(
+			sync.peers.get(&peer_id3).unwrap().state,
+			PeerSyncState::DownloadingJustification(b1_hash),
+		);
+
+		assert_eq!(
+			sync.peers.get(&peer_id4).unwrap().state,
+			PeerSyncState::DownloadingFinalityProof(b2_hash),
+		);
+	}
 }
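
In the test above, the iterator returned by `restart` is drained with `map(|r| r.unwrap())`; the item type is a `Result` because a restarted peer can be reported as a `BadPeer` instead of getting a fresh block request. A sketch of the consumption pattern that item type implies, with simplified stand-in types (not the actual `client/network` definitions):

// Simplified stand-ins; the real types live in client/network.
#[derive(Debug)]
struct PeerId(u8);
#[derive(Debug)]
struct BlockRequest {
    from: u64, // block number to start downloading at
}
#[derive(Debug)]
struct BadPeer(PeerId);

/// Walk the results produced by a `restart`-style iterator: send a new
/// block request to every healthy peer, disconnect the bad ones.
fn dispatch(results: impl Iterator<Item = Result<(PeerId, BlockRequest), BadPeer>>) {
    for result in results {
        match result {
            Ok((peer, request)) => {
                println!("requesting blocks from #{} via {:?}", request.from, peer);
            }
            Err(BadPeer(peer)) => {
                println!("disconnecting misbehaving peer {:?}", peer);
            }
        }
    }
}

fn main() {
    let results = vec![
        Ok((PeerId(1), BlockRequest { from: 51 })),
        Err(BadPeer(PeerId(2))),
    ];
    dispatch(results.into_iter());
}

Returning `BadPeer` instead of disconnecting inline leaves the actual peer-set mutation to the caller, which is part of what makes `restart` easy to exercise in isolation, as the test above does.
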
