# [narwhal] implement range fetching of missing certificates (MystenLabs#5058)
The partially implemented certificate range sync logic is reworked and completed as _"range fetch"_ in this PR. The goal is to improve on the existing recursive causal completion approach in the following areas:
- Improved node resilience, by speeding up the certificate catch-up process.
- Performance that is easier to reason about and easier to instrument.

### High level approach
1. When authority X receives a primary message containing certificate C from authority Y, and C has missing parents at X, X forwards C to its certificate waiter.
2. The certificate waiter records C's round number as authority Y's target round, then kicks off a task to fetch certificates from other authorities. It ignores C if it already knows an equal or higher round number from Y.
3. Each authority has at most one in-flight certificate-fetching task at a time.
4. Within a fetch task, requests are sent to other authorities in random order at 5s intervals. The first response received is processed.
5. Each fetch request contains the **last committed round** of each authority, according to the requester's internal consensus, and the maximum number of certificates to send back.
6. Certificates from the fetch response are sent to the primary core for verification and processing.
7. After a response is processed, another fetch task is kicked off.
8. At the beginning of a fetch task, it checks whether the target rounds are already fully satisfied by the local certificate store; if so, no fetch requests are sent out. (A sketch of this loop follows the list.)
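
The steps above boil down to one request/response type pair and a single fetch loop. Here is a minimal Rust sketch of steps 3–5 and 8; the names `FetchCertificatesRequest` and `FetchCertificatesResponse` come from this PR, but the struct fields, the `send_fetch` helper, and the placeholder types are illustrative assumptions rather than the actual implementation.

```rust
use std::collections::BTreeMap;
use std::time::Duration;

use futures::stream::{FuturesUnordered, StreamExt};
use rand::seq::SliceRandom;

// Placeholder types standing in for Narwhal's real ones.
type PublicKey = Vec<u8>;
type Round = u64;
#[derive(Clone, Debug)]
struct Certificate;

// Step 5: the request carries the requester's last committed round per
// authority, plus a cap on the response size. Field names are assumptions.
struct FetchCertificatesRequest {
    exclusive_lower_bounds: BTreeMap<PublicKey, Round>,
    max_items: usize,
}

struct FetchCertificatesResponse {
    certificates: Vec<Certificate>,
}

// Stand-in for the real RPC, which the PR routes through
// `PrimaryToPrimaryRpc::fetch_certificates` on `P2pNetwork`.
async fn send_fetch(
    _peer: &PublicKey,
    _request: &FetchCertificatesRequest,
) -> Result<FetchCertificatesResponse, String> {
    Err("no transport in this sketch".into())
}

// One fetch task (steps 3 and 4). The real task would first check the local
// certificate store and return early if the target rounds are already
// satisfied (step 8).
async fn run_fetch_task(
    peers: Vec<PublicKey>,
    request: FetchCertificatesRequest,
) -> Option<FetchCertificatesResponse> {
    let mut order = peers;
    order.shuffle(&mut rand::thread_rng()); // random peer order (step 4)
    let mut remaining = order.iter();
    let mut inflight = FuturesUnordered::new();
    if let Some(peer) = remaining.next() {
        inflight.push(send_fetch(peer, &request));
    }
    let mut interval = tokio::time::interval(Duration::from_secs(5));
    interval.tick().await; // the first tick fires immediately; skip it
    loop {
        if inflight.is_empty() && remaining.len() == 0 {
            return None; // every peer was tried and none answered
        }
        tokio::select! {
            // Fan out to one more peer every 5s while no one has answered.
            _ = interval.tick() => {
                if let Some(peer) = remaining.next() {
                    inflight.push(send_fetch(peer, &request));
                }
            }
            // The first response received wins (step 4); its certificates
            // then go to the primary core for verification (step 6).
            Some(result) = inflight.next(), if !inflight.is_empty() => {
                if let Ok(resp) = result {
                    return Some(resp);
                }
                // A failed request falls through; others stay in flight.
            }
        }
    }
}
```

Staggering the requests bounds the fan-out load on peers while still letting a slow first peer be overtaken by a faster later one.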

### Most significant changes
- `certificate_waiter.rs` contains the fetcher logic for fetching certificates by range.
- `primary.rs` contains the logic for fulfilling a request to fetch a range of certificates.
- `core.rs` contains changes to
  - Avoid starting causal completion on certificates with missing parents in `process_header()`.
  - Remove `process_header()`'s logic to retry requesting parent certificates if they are still missing locally.
  - Remove gc checks from `process_certificate()` and `process_header()`, because they are already performed in `sanitize_certificate()` and `sanitize_header()` respectively.
- `header_waiter.rs` is converted from a component performing the critical work of retrieving parent certificates into a best-effort one. Removing header-waiter-initiated retries reduces the load on the system, and letting senders drop messages mitigates the deadlock that arises when both the primary message and header loopback channels are clogged (a sketch of this pattern follows the list). In the future, the header's proposer should be responsible for driving the process of obtaining votes, instead of relying on the voter's header waiter.
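
The deadlock mitigation hinges on senders never blocking on a full channel. Below is a minimal sketch of that pattern using tokio's bounded channels; the function is illustrative, not the PR's actual sender wrapper.

```rust
use tokio::sync::mpsc::{error::TrySendError, Sender};

// Best-effort send: if the receiver's channel is clogged, drop the message
// instead of blocking. Losing a header-waiter message only delays progress
// (range fetch or the proposer drives recovery), whereas blocking could
// deadlock when the primary message and header loopback channels are both
// full and each side is waiting on the other.
fn send_best_effort<T>(tx: &Sender<T>, msg: T) {
    match tx.try_send(msg) {
        Ok(()) => {}
        Err(TrySendError::Full(_dropped)) => {
            // Channel full: drop the message and rely on retries elsewhere.
        }
        Err(TrySendError::Closed(_dropped)) => {
            // Receiver is gone, e.g. during shutdown; nothing to do.
        }
    }
}
```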

### Less significant changes (but touch more files)
- Add `FetchCertificatesRequest`/`FetchCertificatesResponse` and a handler, establishing a request-response pattern for fetching certificates.
- Add a more efficient way to read the latest round of an origin from the certificate store (sketched after this list).
- Remove previous range sync logic that is no longer used.
- Rename some variables representing the same channels to be consistent across files.
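
Keying the new index by `(PublicKey, Round)` places all certificates of one origin contiguously in key order, so the origin's latest round is a single backward seek instead of a scan. The sketch below models the column family with a `BTreeMap`; the real store would achieve the same with a reverse RocksDB iterator, and the function name is an assumption.

```rust
use std::collections::BTreeMap;
use std::ops::Bound;

// Placeholder types standing in for Narwhal's real ones.
type PublicKey = Vec<u8>;
type Round = u64;
type CertificateDigest = [u8; 32];

// Latest round of `origin`, using the (PublicKey, Round) key order: all of
// one origin's entries are adjacent, so the last key in the origin's range
// is its highest round. O(log n), no full scan over rounds.
fn last_round_of(
    index: &BTreeMap<(PublicKey, Round), CertificateDigest>,
    origin: &PublicKey,
) -> Option<(Round, CertificateDigest)> {
    index
        .range((
            Bound::Included((origin.clone(), Round::MIN)),
            Bound::Included((origin.clone(), Round::MAX)),
        ))
        .next_back() // backward seek from the origin's largest possible key
        .map(|((_, round), digest)| (*round, *digest))
}
```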
mwtian authored Oct 18, 2022
1 parent d863344 · commit 27492fe
Showing 34 changed files with 1,409 additions and 1,112 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

19 changes: 14 additions & 5 deletions narwhal/consensus/src/tests/bullshark_tests.rs
@@ -38,20 +38,29 @@ pub fn make_consensus_store(store_path: &std::path::Path) -> Arc<ConsensusStore>
 pub fn make_certificate_store(store_path: &std::path::Path) -> CertificateStore {
     const CERTIFICATES_CF: &str = "certificates";
     const CERTIFICATE_ID_BY_ROUND_CF: &str = "certificate_id_by_round";
+    const CERTIFICATE_ID_BY_ORIGIN_CF: &str = "certificate_id_by_origin";

     let rocksdb = rocks::open_cf(
         store_path,
         None,
-        &[CERTIFICATES_CF, CERTIFICATE_ID_BY_ROUND_CF],
+        &[
+            CERTIFICATES_CF,
+            CERTIFICATE_ID_BY_ROUND_CF,
+            CERTIFICATE_ID_BY_ORIGIN_CF,
+        ],
     )
     .expect("Failed creating database");

-    let (certificate_map, certificate_id_by_round_map) = reopen!(&rocksdb,
+    let (certificate_map, certificate_id_by_round_map, certificate_id_by_origin_map) = reopen!(&rocksdb,
         CERTIFICATES_CF;<CertificateDigest, Certificate>,
-        CERTIFICATE_ID_BY_ROUND_CF;<(Round, CertificateDigest), u8>
-    );
+        CERTIFICATE_ID_BY_ROUND_CF;<(Round, PublicKey), CertificateDigest>,
+        CERTIFICATE_ID_BY_ORIGIN_CF;<(PublicKey, Round), CertificateDigest>);

-    CertificateStore::new(certificate_map, certificate_id_by_round_map)
+    CertificateStore::new(
+        certificate_map,
+        certificate_id_by_round_map,
+        certificate_id_by_origin_map,
+    )
 }

 // Run for 4 dag rounds in ideal conditions (all nodes reference all other nodes). We should commit
19 changes: 14 additions & 5 deletions narwhal/consensus/src/tests/tusk_tests.rs
@@ -36,20 +36,29 @@ pub fn make_consensus_store(store_path: &std::path::Path) -> Arc<ConsensusStore>
 pub fn make_certificate_store(store_path: &std::path::Path) -> CertificateStore {
     const CERTIFICATES_CF: &str = "certificates";
     const CERTIFICATE_ID_BY_ROUND_CF: &str = "certificate_id_by_round";
+    const CERTIFICATE_ID_BY_ORIGIN_CF: &str = "certificate_id_by_origin";

     let rocksdb = rocks::open_cf(
         store_path,
         None,
-        &[CERTIFICATES_CF, CERTIFICATE_ID_BY_ROUND_CF],
+        &[
+            CERTIFICATES_CF,
+            CERTIFICATE_ID_BY_ROUND_CF,
+            CERTIFICATE_ID_BY_ORIGIN_CF,
+        ],
     )
     .expect("Failed creating database");

-    let (certificate_map, certificate_id_by_round_map) = reopen!(&rocksdb,
+    let (certificate_map, certificate_id_by_round_map, certificate_id_by_origin_map) = reopen!(&rocksdb,
         CERTIFICATES_CF;<CertificateDigest, Certificate>,
-        CERTIFICATE_ID_BY_ROUND_CF;<(Round, CertificateDigest), u8>
-    );
+        CERTIFICATE_ID_BY_ROUND_CF;<(Round, PublicKey), CertificateDigest>,
+        CERTIFICATE_ID_BY_ORIGIN_CF;<(PublicKey, Round), CertificateDigest>);

-    CertificateStore::new(certificate_map, certificate_id_by_round_map)
+    CertificateStore::new(
+        certificate_map,
+        certificate_id_by_round_map,
+        certificate_id_by_origin_map,
+    )
 }

 // Run for 4 dag rounds in ideal conditions (all nodes reference all other nodes). We should commit
5 changes: 4 additions & 1 deletion narwhal/network/src/lib.rs
@@ -22,7 +22,10 @@ pub use crate::{
     bounded_executor::BoundedExecutor,
     p2p::P2pNetwork,
     retry::RetryConfig,
-    traits::{Lucky, LuckyNetwork, PrimaryToWorkerRpc, ReliableNetwork, UnreliableNetwork},
+    traits::{
+        Lucky, LuckyNetwork, PrimaryToPrimaryRpc, PrimaryToWorkerRpc, ReliableNetwork,
+        UnreliableNetwork,
+    },
 };

 /// This adapter will make a [`tokio::task::JoinHandle`] abort its handled task when the handle is dropped.
31 changes: 26 additions & 5 deletions narwhal/network/src/p2p.rs
@@ -1,7 +1,7 @@
 // Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0

-use crate::traits::PrimaryToWorkerRpc;
+use crate::traits::{PrimaryToPrimaryRpc, PrimaryToWorkerRpc};
 use crate::{
     traits::{Lucky, ReliableNetwork, UnreliableNetwork},
     BoundedExecutor, CancelOnDropHandler, RetryConfig, MAX_TASK_CONCURRENCY,
@@ -16,10 +16,11 @@ use std::collections::HashMap;
 use std::time::Duration;
 use tokio::{runtime::Handle, task::JoinHandle};
 use types::{
-    Batch, BatchDigest, PrimaryMessage, PrimaryToPrimaryClient, PrimaryToWorkerClient,
-    RequestBatchRequest, WorkerBatchRequest, WorkerBatchResponse, WorkerDeleteBatchesMessage,
-    WorkerMessage, WorkerPrimaryMessage, WorkerReconfigureMessage, WorkerSynchronizeMessage,
-    WorkerToPrimaryClient, WorkerToWorkerClient,
+    Batch, BatchDigest, FetchCertificatesRequest, FetchCertificatesResponse, PrimaryMessage,
+    PrimaryToPrimaryClient, PrimaryToWorkerClient, RequestBatchRequest, WorkerBatchRequest,
+    WorkerBatchResponse, WorkerDeleteBatchesMessage, WorkerMessage, WorkerPrimaryMessage,
+    WorkerReconfigureMessage, WorkerSynchronizeMessage, WorkerToPrimaryClient,
+    WorkerToWorkerClient,
 };

 fn default_executor() -> BoundedExecutor {
@@ -199,6 +200,26 @@ impl ReliableNetwork<PrimaryMessage> for P2pNetwork {
     }
 }

+#[async_trait]
+impl PrimaryToPrimaryRpc for P2pNetwork {
+    async fn fetch_certificates(
+        &self,
+        peer: &NetworkPublicKey,
+        request: FetchCertificatesRequest,
+    ) -> Result<FetchCertificatesResponse> {
+        let peer_id = PeerId(peer.0.to_bytes());
+        let peer = self
+            .network
+            .peer(peer_id)
+            .ok_or_else(|| format_err!("Network has no connection with peer {peer_id}"))?;
+        let response = PrimaryToPrimaryClient::new(peer)
+            .fetch_certificates(request)
+            .await
+            .map_err(|e| format_err!("Network error {:?}", e))?;
+        Ok(response.into_body())
+    }
+}
+
 //
 // Primary-to-Worker
 //
11 changes: 10 additions & 1 deletion narwhal/network/src/traits.rs
@@ -6,7 +6,7 @@ use async_trait::async_trait;
 use crypto::NetworkPublicKey;
 use rand::prelude::{SliceRandom, SmallRng};
 use tokio::task::JoinHandle;
-use types::{Batch, BatchDigest};
+use types::{Batch, BatchDigest, FetchCertificatesRequest, FetchCertificatesResponse};

 pub trait UnreliableNetwork<Request: Clone + Send + Sync> {
     type Response: Clone + Send + Sync;
@@ -92,6 +92,15 @@ pub trait ReliableNetwork<Request: Clone + Send + Sync> {
     }
 }

+#[async_trait]
+pub trait PrimaryToPrimaryRpc {
+    async fn fetch_certificates(
+        &self,
+        peer: &NetworkPublicKey,
+        request: FetchCertificatesRequest,
+    ) -> Result<FetchCertificatesResponse>;
+}
+
 #[async_trait]
 pub trait PrimaryToWorkerRpc {
     async fn request_batch(
21 changes: 15 additions & 6 deletions narwhal/node/src/lib.rs
@@ -16,7 +16,7 @@ use network::P2pNetwork;
 use primary::{NetworkModel, PayloadToken, Primary, PrimaryChannelMetrics};
 use prometheus::{IntGauge, Registry};
 use std::sync::Arc;
-use storage::{CertificateStore, CertificateToken, ProposerKey, ProposerStore};
+use storage::{CertificateStore, ProposerKey, ProposerStore};
 use store::{
     reopen,
     rocks::{open_cf, DBMap},
@@ -54,6 +54,7 @@ impl NodeStorage {
     const HEADERS_CF: &'static str = "headers";
     const CERTIFICATES_CF: &'static str = "certificates";
     const CERTIFICATE_ID_BY_ROUND_CF: &'static str = "certificate_id_by_round";
+    const CERTIFICATE_ID_BY_ORIGIN_CF: &'static str = "certificate_id_by_origin";
     const PAYLOAD_CF: &'static str = "payload";
     const BATCHES_CF: &'static str = "batches";
     const LAST_COMMITTED_CF: &'static str = "last_committed";
@@ -71,6 +72,7 @@
                 Self::HEADERS_CF,
                 Self::CERTIFICATES_CF,
                 Self::CERTIFICATE_ID_BY_ROUND_CF,
+                Self::CERTIFICATE_ID_BY_ORIGIN_CF,
                 Self::PAYLOAD_CF,
                 Self::BATCHES_CF,
                 Self::LAST_COMMITTED_CF,
@@ -86,6 +88,7 @@
             header_map,
             certificate_map,
             certificate_id_by_round_map,
+            certificate_id_by_origin_map,
             payload_map,
             batch_map,
             last_committed_map,
@@ -96,7 +99,8 @@
             Self::VOTES_CF;<PublicKey, RoundVoteDigestPair>,
             Self::HEADERS_CF;<HeaderDigest, Header>,
             Self::CERTIFICATES_CF;<CertificateDigest, Certificate>,
-            Self::CERTIFICATE_ID_BY_ROUND_CF;<(Round, CertificateDigest), CertificateToken>,
+            Self::CERTIFICATE_ID_BY_ROUND_CF;<(Round, PublicKey), CertificateDigest>,
+            Self::CERTIFICATE_ID_BY_ORIGIN_CF;<(PublicKey, Round), CertificateDigest>,
             Self::PAYLOAD_CF;<(BatchDigest, WorkerId), PayloadToken>,
             Self::BATCHES_CF;<BatchDigest, Batch>,
             Self::LAST_COMMITTED_CF;<PublicKey, Round>,
@@ -107,7 +111,11 @@
         let proposer_store = ProposerStore::new(last_proposed_map);
         let vote_digest_store = Store::new(votes_map);
         let header_store = Store::new(header_map);
-        let certificate_store = CertificateStore::new(certificate_map, certificate_id_by_round_map);
+        let certificate_store = CertificateStore::new(
+            certificate_map,
+            certificate_id_by_round_map,
+            certificate_id_by_origin_map,
+        );
         let payload_store = Store::new(payload_map);
         let batch_store = Store::new(batch_map);
         let consensus_store = Arc::new(ConsensusStore::new(last_committed_map, sequence_map));
@@ -187,7 +195,7 @@ impl Node {
         let mut handles = Vec::new();
         let (rx_executor_network, tx_executor_network) = oneshot::channel();
         let (dag, network_model) = if !internal_consensus {
-            debug!("Consensus is disabled: the primary will run w/o Tusk");
+            debug!("Consensus is disabled: the primary will run w/o Bullshark");
             let consensus_metrics = Arc::new(ConsensusMetrics::new(registry));
             let (handle, dag) = Dag::new(&committee.load(), rx_new_certificates, consensus_metrics);

@@ -242,9 +250,10 @@
             store.proposer_store.clone(),
             store.payload_store.clone(),
             store.vote_digest_store.clone(),
+            store.consensus_store.clone(),
             tx_new_certificates,
-            /* rx_consensus */ rx_consensus,
-            /* dag */ dag,
+            rx_consensus,
+            dag,
             network_model,
             tx_reconfigure,
             tx_consensus,
3 changes: 0 additions & 3 deletions narwhal/primary/src/block_synchronizer/mock.rs
@@ -51,9 +51,6 @@ impl MockBlockSynchronizerCore {
             tokio::select! {
                 Some(command) = self.rx_commands.recv() => {
                     match command {
-                        Command::SynchronizeRange { .. } => {
-                            todo!("MockBlockSynchronizerCore for Command::SynchronizeRange is unimplemented!")
-                        }
                         Command::SynchronizeBlockHeaders { block_ids, respond_to } => {
                             let (times, results) = self
                                 .block_headers_expected_requests