Skip to content

Commit

Permalink
Sui v0.15.0 version bump (MystenLabs#5765)
Browse files Browse the repository at this point in the history
  • Loading branch information
ebmifa authored and lanvidr committed Nov 3, 2022
1 parent 2188f13 commit 6332dd5
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 25 deletions.
24 changes: 21 additions & 3 deletions narwhal/network/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
traits::{Lucky, ReliableNetwork, UnreliableNetwork},
BoundedExecutor, CancelOnDropHandler, RetryConfig, MAX_TASK_CONCURRENCY,
};
use anemo::PeerId;
use anemo::{Peer, PeerId, Response};
use anyhow::format_err;
use anyhow::Result;
use async_trait::async_trait;
Expand All @@ -17,10 +17,12 @@ use std::time::Duration;
use tokio::{runtime::Handle, task::JoinHandle};
use types::{
Batch, BatchDigest, FetchCertificatesRequest, FetchCertificatesResponse,
GetCertificatesRequest, GetCertificatesResponse, PrimaryMessage, PrimaryToPrimaryClient,
GetCertificatesRequest, GetCertificatesResponse, LatestHeaderRequest, LatestHeaderResponse,
PrimaryMessage, PrimaryToPrimaryClient, PrimaryToPrimaryClient, PrimaryToWorkerClient,
PrimaryToWorkerClient, RequestBatchRequest, WorkerBatchMessage, WorkerDeleteBatchesMessage,
WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerReconfigureMessage,
WorkerSynchronizeMessage, WorkerToPrimaryClient, WorkerToWorkerClient,
WorkerSynchronizeMessage, WorkerToPrimaryClient, WorkerToPrimaryClient, WorkerToWorkerClient,
WorkerToWorkerClient,
};

fn default_executor() -> BoundedExecutor {
Expand Down Expand Up @@ -236,6 +238,22 @@ impl PrimaryToPrimaryRpc for anemo::Network {
.map_err(|e| format_err!("Network error {:?}", e))?;
Ok(response.into_body())
}

async fn get_latest_header(
&self,
peer: &NetworkPublicKey,
request: LatestHeaderRequest,
) -> Result<LatestHeaderResponse> {
let peer_id = PeerId(peer.0.to_bytes());
let peer = self
.peer(peer_id)
.ok_or_else(|| format_err!("Network has no connection with peer {peer_id}"))?;
let response = PrimaryToPrimaryClient::new(peer)
.get_latest_header(request)
.await
.map_err(|e| format_err!("Network error {:?}", e))?;
Ok(response.into_body())
}
}

//
Expand Down
8 changes: 7 additions & 1 deletion narwhal/network/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use rand::prelude::{SliceRandom, SmallRng};
use tokio::task::JoinHandle;
use types::{
Batch, BatchDigest, FetchCertificatesRequest, FetchCertificatesResponse,
GetCertificatesRequest, GetCertificatesResponse,
GetCertificatesRequest, GetCertificatesResponse, LatestHeaderRequest, LatestHeaderResponse,
};

pub trait UnreliableNetwork<Request: Clone + Send + Sync> {
Expand Down Expand Up @@ -107,6 +107,12 @@ pub trait PrimaryToPrimaryRpc {
peer: &NetworkPublicKey,
request: FetchCertificatesRequest,
) -> Result<FetchCertificatesResponse>;

async fn get_latest_header(
&self,
peer: &NetworkPublicKey,
request: LatestHeaderRequest,
) -> Result<LatestHeaderResponse>;
}

#[async_trait]
Expand Down
70 changes: 62 additions & 8 deletions narwhal/primary/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@ use crate::{
primary::PrimaryMessage,
synchronizer::Synchronizer,
};

use async_recursion::async_recursion;
use config::{Committee, Epoch, SharedWorkerCache};
use crypto::{PublicKey, Signature};
use crypto::{NetworkPublicKey, PublicKey, Signature};
use fastcrypto::{hash::Hash as _, SignatureService};
use network::{CancelOnDropHandler, P2pNetwork, ReliableNetwork};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use network::{
CancelOnDropHandler, P2pNetwork, PrimaryToPrimaryRpc, ReliableNetwork, UnreliableNetwork,
};
use std::time::Duration;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
Expand All @@ -21,23 +27,21 @@ use std::{
use storage::CertificateStore;
use store::Store;
use sui_metrics::spawn_monitored_task;
use tokio::{sync::watch, task::JoinHandle};
use tokio::{sync::watch, task::JoinHandle, time};
use tracing::{debug, error, info, instrument, trace, warn};
use types::{
ensure,
error::{DagError, DagError::StoreError, DagResult},
metered_channel::{Receiver, Sender},
Certificate, Header, HeaderDigest, ReconfigureNotification, Round, RoundVoteDigestPair,
Timestamp, Vote,
Certificate, Header, HeaderDigest, LatestHeaderRequest, PrimaryToPrimaryClient,
ReconfigureNotification, Round, RoundVoteDigestPair, Timestamp, Vote,
};

#[cfg(test)]
#[path = "tests/core_tests.rs"]
pub mod core_tests;

// TODO: enable below.
// Rejects a header if it requires catching up the following number of rounds.
// const MAX_HEADER_ROUND_CATCHUP_THRESHOLD: u64 = 20;
const RECOVERY_REQUEST_TIMEOUT_SECS: u64 = 60;

pub struct Core {
/// The public key of this primary.
Expand Down Expand Up @@ -181,6 +185,56 @@ impl Core {

self.highest_received_round = last_round_number;
self.highest_processed_round = last_round_number;

// Get latest header from all peers and process it.
let peers: Vec<NetworkPublicKey> = self
.committee
.others_primaries(&self.name)
.into_iter()
.map(|(_, _, network_key)| network_key)
.collect();

let network = P2pNetwork::new(self.network.network().clone());
let mut header_futures = FuturesUnordered::new();
let request = LatestHeaderRequest {};
for peer in peers.iter() {
let network = network.network();
let request = request.clone();

header_futures.push(async move {
let _ = &network;
network.get_latest_header(peer, request).await
});
}
let request_interval = Duration::from_secs(RECOVERY_REQUEST_TIMEOUT_SECS);
let interval = Box::pin(time::sleep(request_interval));
loop {
tokio::select! {
res = header_futures.next() => {
match res {
Some(Ok(response)) => {
if let Some(header) = response.header {
self.process_header(&header);
}
}
Some(Err(e)) => {
error!(
"failed to get latest header from peer as recovery on startup: {:?}",
e
)
}
None => {
break;
}
}
}
_ = &mut interval => {
debug!("timeout was passed when requesting recovery header from peer");
break;
}
}
}

self
}

Expand Down
26 changes: 22 additions & 4 deletions narwhal/primary/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ use types::{
metered_channel::{channel_with_total, Receiver, Sender},
BatchDigest, Certificate, CertificateDigest, FetchCertificatesRequest,
FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse, Header,
HeaderDigest, PayloadAvailabilityRequest, PayloadAvailabilityResponse, PrimaryToPrimary,
PrimaryToPrimaryServer, ReconfigureNotification, Round, RoundVoteDigestPair,
WorkerInfoResponse, WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerToPrimary,
WorkerToPrimaryServer,
HeaderDigest, LatestHeaderRequest, LatestHeaderResponse, PayloadAvailabilityRequest,
PayloadAvailabilityResponse, PrimaryToPrimary, PrimaryToPrimary, PrimaryToPrimary,
PrimaryToPrimaryServer, PrimaryToPrimaryServer, PrimaryToPrimaryServer,
ReconfigureNotification, Round, RoundVoteDigestPair, WorkerInfoResponse,
WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerToPrimary, WorkerToPrimary,
WorkerToPrimary, WorkerToPrimaryServer, WorkerToPrimaryServer, WorkerToPrimaryServer,
};

#[cfg(any(test))]
Expand Down Expand Up @@ -198,6 +200,7 @@ impl Primary {
tx_primary_messages: tx_primary_messages.clone(),
certificate_store: certificate_store.clone(),
payload_store: payload_store.clone(),
proposer_store: proposer_store.clone(),
});
let worker_service = WorkerToPrimaryServer::new(WorkerReceiverHandler {
tx_our_digests,
Expand Down Expand Up @@ -516,6 +519,7 @@ struct PrimaryReceiverHandler {
tx_primary_messages: Sender<PrimaryMessage>,
certificate_store: CertificateStore,
payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
proposer_store: ProposerStore,
}

#[allow(clippy::result_large_err)]
Expand Down Expand Up @@ -670,6 +674,20 @@ impl PrimaryToPrimary for PrimaryReceiverHandler {
payload_availability: result,
}))
}

async fn get_latest_header(
&self,
_request: anemo::Request<LatestHeaderRequest>,
) -> Result<anemo::Response<LatestHeaderResponse>, anemo::rpc::Status> {
let latest_header = self.proposer_store.get_last_proposed().map_err(|e| {
anemo::rpc::Status::internal(format!(
"error fetching latest proposed header from store: {e}"
))
})?;
Ok(anemo::Response::new(LatestHeaderResponse {
header: latest_header,
}))
}
}

/// Defines how the network receiver handles incoming workers messages.
Expand Down
3 changes: 2 additions & 1 deletion narwhal/primary/src/proposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use tracing::{debug, info};
use types::{
error::{DagError, DagResult},
metered_channel::{Receiver, Sender},
BatchDigest, Certificate, Header, ReconfigureNotification, Round, Timestamp, TimestampMs,
BatchDigest, Certificate, Header, PrimaryMessage, ReconfigureNotification, Round, Timestamp,
TimestampMs,
};

#[cfg(test)]
Expand Down
13 changes: 11 additions & 2 deletions narwhal/primary/src/tests/certificate_waiter_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ use tokio::{
use types::{
BatchDigest, Certificate, CertificateDigest, FetchCertificatesRequest,
FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse, Header,
HeaderDigest, Metadata, PayloadAvailabilityRequest, PayloadAvailabilityResponse,
PrimaryMessage, PrimaryToPrimary, PrimaryToPrimaryServer, ReconfigureNotification, Round,
HeaderDigest, LatestHeaderRequest, LatestHeaderResponse, Metadata, PayloadAvailabilityRequest,
PayloadAvailabilityResponse, PrimaryMessage, PrimaryToPrimary, PrimaryToPrimary,
PrimaryToPrimaryServer, PrimaryToPrimaryServer, ReconfigureNotification, Round,
};

struct FetchCertificateProxy {
Expand Down Expand Up @@ -66,12 +67,20 @@ impl PrimaryToPrimary for FetchCertificateProxy {
self.response.lock().await.recv().await.unwrap(),
))
}

async fn get_payload_availability(
&self,
_request: anemo::Request<PayloadAvailabilityRequest>,
) -> Result<anemo::Response<PayloadAvailabilityResponse>, anemo::rpc::Status> {
unimplemented!()
}

async fn get_latest_header(
&self,
_request: anemo::Request<LatestHeaderRequest>,
) -> Result<anemo::Response<LatestHeaderResponse>, anemo::rpc::Status> {
unimplemented!()
}
}

async fn verify_certificates_in_store(
Expand Down
5 changes: 4 additions & 1 deletion narwhal/primary/src/tests/primary_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::{
sync::Arc,
time::Duration,
};
use storage::CertificateStore;
use storage::{CertificateStore, ProposerStore};
use store::rocks::DBMap;
use store::Store;
use test_utils::{temp_dir, CommitteeFixture};
Expand Down Expand Up @@ -268,6 +268,7 @@ async fn test_fetch_certificates_handler() {
tx_primary_messages,
certificate_store: certificate_store.clone(),
payload_store: payload_store.clone(),
proposer_store: ProposerStore::new_for_tests(),
};

let mut current_round: Vec<_> = Certificate::genesis(&committee)
Expand Down Expand Up @@ -402,6 +403,7 @@ async fn test_process_payload_availability_success() {
tx_primary_messages,
certificate_store: certificate_store.clone(),
payload_store: payload_store.clone(),
proposer_store: ProposerStore::new_for_tests(),
};

// GIVEN some mock certificates
Expand Down Expand Up @@ -516,6 +518,7 @@ async fn test_process_payload_availability_when_failures() {
tx_primary_messages,
certificate_store: certificate_store.clone(),
payload_store: payload_store.clone(),
proposer_store: ProposerStore::new_for_tests(),
};

// AND some mock certificates
Expand Down
20 changes: 15 additions & 5 deletions narwhal/test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ use tracing::info;
use types::{
Batch, BatchDigest, Certificate, CertificateDigest, ConsensusStore, FetchCertificatesRequest,
FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse, Header,
HeaderBuilder, PayloadAvailabilityRequest, PayloadAvailabilityResponse, PrimaryMessage,
PrimaryToPrimary, PrimaryToPrimaryServer, PrimaryToWorker, PrimaryToWorkerServer,
RequestBatchRequest, RequestBatchResponse, Round, SequenceNumber, Transaction, Vote,
WorkerBatchMessage, WorkerDeleteBatchesMessage, WorkerReconfigureMessage,
WorkerSynchronizeMessage, WorkerToWorker, WorkerToWorkerServer,
HeaderBuilder, HeaderBuilder, LatestHeaderRequest, LatestHeaderResponse,
PayloadAvailabilityRequest, PayloadAvailabilityResponse, PrimaryMessage, PrimaryToPrimary,
PrimaryToPrimary, PrimaryToPrimaryServer, PrimaryToPrimaryServer, PrimaryToWorker,
PrimaryToWorker, PrimaryToWorkerServer, PrimaryToWorkerServer, RequestBatchRequest,
RequestBatchResponse, Round, SequenceNumber, Transaction, Vote, WorkerBatchMessage,
WorkerDeleteBatchesMessage, WorkerReconfigureMessage, WorkerSynchronizeMessage, WorkerToWorker,
WorkerToWorker, WorkerToWorkerServer, WorkerToWorkerServer,
};

pub mod cluster;
Expand Down Expand Up @@ -210,12 +212,20 @@ impl PrimaryToPrimary for PrimaryToPrimaryMockServer {
) -> Result<anemo::Response<FetchCertificatesResponse>, anemo::rpc::Status> {
unimplemented!()
}

async fn get_payload_availability(
&self,
_request: anemo::Request<PayloadAvailabilityRequest>,
) -> Result<anemo::Response<PayloadAvailabilityResponse>, anemo::rpc::Status> {
unimplemented!()
}

async fn get_latest_header(
&self,
_request: anemo::Request<LatestHeaderRequest>,
) -> Result<anemo::Response<LatestHeaderResponse>, anemo::rpc::Status> {
unimplemented!()
}
}

pub struct PrimaryToWorkerMockServer {
Expand Down
10 changes: 10 additions & 0 deletions narwhal/types/build.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use rustversion::{beta, nightly, stable};
use std::{
env,
path::{Path, PathBuf},
Expand Down Expand Up @@ -86,6 +87,15 @@ fn build_anemo_services(out_dir: &Path) {
.codec_path("anemo::rpc::codec::BincodeCodec")
.build(),
)
.method(
anemo_build::manual::Method::builder()
.name("get_latest_header")
.route_name("GetLatestHeader")
.request_type("crate::LatestHeaderRequest")
.response_type("crate::LatestHeaderResponse")
.codec_path("anemo::rpc::codec::BincodeCodec")
.build(),
)
.build();

let primary_to_worker = anemo_build::manual::Service::builder()
Expand Down
8 changes: 8 additions & 0 deletions narwhal/types/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,14 @@ impl PayloadAvailabilityResponse {
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LatestHeaderRequest {}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct LatestHeaderResponse {
pub header: Option<Header>,
}

/// Message to reconfigure worker tasks. This message must be sent by a trusted source.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum ReconfigureNotification {
Expand Down

0 comments on commit 6332dd5

Please sign in to comment.