Skip to content

Commit

Permalink
Revert "Remove BatchV1 and related code gating BatchV2" (MystenLabs#1…
Browse files Browse the repository at this point in the history
…3098)

Causes panics when upgrading validators because RocksDB schema will not
be updated until epoch change
  • Loading branch information
arun-koshy authored Jul 21, 2023
1 parent 54fc41a commit d43f12c
Show file tree
Hide file tree
Showing 16 changed files with 794 additions and 135 deletions.
51 changes: 46 additions & 5 deletions crates/sui-core/src/consensus_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ use eyre::WrapErr;
use mysten_metrics::monitored_scope;
use prometheus::{register_int_counter_with_registry, IntCounter, Registry};
use std::sync::Arc;
use sui_protocol_config::ProtocolConfig;

use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::checkpoints::CheckpointServiceNotify;
use crate::transaction_manager::TransactionManager;
use async_trait::async_trait;
use narwhal_types::BatchAPI;
use narwhal_types::{validate_batch_version, BatchAPI};
use narwhal_worker::TransactionValidator;
use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKind};
use tap::TapFallible;
Expand Down Expand Up @@ -60,9 +61,17 @@ impl TransactionValidator for SuiTxValidator {
Ok(())
}

async fn validate_batch(&self, b: &narwhal_types::Batch) -> Result<(), Self::Error> {
async fn validate_batch(
&self,
b: &narwhal_types::Batch,
protocol_config: &ProtocolConfig,
) -> Result<(), Self::Error> {
let _scope = monitored_scope("ValidateBatch");

// TODO: Remove once we have upgraded to protocol version 12.
validate_batch_version(b, protocol_config)
.map_err(|err| eyre::eyre!(format!("Invalid Batch: {err}")))?;

let txs = b
.transactions()
.iter()
Expand Down Expand Up @@ -163,7 +172,7 @@ mod tests {
consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics},
};

use narwhal_test_utils::latest_protocol_version;
use narwhal_test_utils::{get_protocol_config, latest_protocol_version};
use narwhal_types::Batch;
use narwhal_worker::TransactionValidator;
use sui_types::signature::GenericSignature;
Expand Down Expand Up @@ -221,7 +230,9 @@ mod tests {
.collect();

let batch = Batch::new(transaction_bytes, latest_protocol_config);
let res_batch = validator.validate_batch(&batch).await;
let res_batch = validator
.validate_batch(&batch, latest_protocol_config)
.await;
assert!(res_batch.is_ok(), "{res_batch:?}");

let bogus_transaction_bytes: Vec<_> = certificates
Expand All @@ -237,7 +248,37 @@ mod tests {
.collect();

let batch = Batch::new(bogus_transaction_bytes, latest_protocol_config);
let res_batch = validator.validate_batch(&batch).await;
let res_batch = validator
.validate_batch(&batch, latest_protocol_config)
.await;
assert!(res_batch.is_err());

// TODO: Remove once we have upgraded to protocol version 12.
// protocol version 11 should only support BatchV1
let protocol_config_v11 = &get_protocol_config(11);
let batch_v1 = Batch::new(vec![], protocol_config_v11);

// Case #1: Receive BatchV1 and network has not upgraded to 12 so we are okay
let res_batch = validator
.validate_batch(&batch_v1, protocol_config_v11)
.await;
assert!(res_batch.is_ok());
// Case #2: Receive BatchV1 but network has upgraded to 12 so we fail because we expect BatchV2
let res_batch = validator
.validate_batch(&batch_v1, latest_protocol_config)
.await;
assert!(res_batch.is_err());

let batch_v2 = Batch::new(vec![], latest_protocol_config);
// Case #3: Receive BatchV2 but network is still in v11 so we fail because we expect BatchV1
let res_batch = validator
.validate_batch(&batch_v2, protocol_config_v11)
.await;
assert!(res_batch.is_err());
// Case #4: Receive BatchV2 and network is upgraded to 12 so we are okay
let res_batch = validator
.validate_batch(&batch_v2, latest_protocol_config)
.await;
assert!(res_batch.is_ok());
}
}
82 changes: 48 additions & 34 deletions narwhal/executor/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct Inner {
authority_id: AuthorityIdentifier,
worker_cache: WorkerCache,
committee: Committee,
_protocol_config: ProtocolConfig,
protocol_config: ProtocolConfig,
client: NetworkClient,
metrics: Arc<ExecutorMetrics>,
}
Expand Down Expand Up @@ -121,7 +121,7 @@ async fn create_and_run_subscriber(
authority_id: AuthorityIdentifier,
worker_cache: WorkerCache,
committee: Committee,
_protocol_config: ProtocolConfig,
protocol_config: ProtocolConfig,
rx_shutdown: ConditionalBroadcastReceiver,
rx_sequence: metered_channel::Receiver<CommittedSubDag>,
client: NetworkClient,
Expand All @@ -136,7 +136,7 @@ async fn create_and_run_subscriber(
inner: Arc::new(Inner {
authority_id,
committee,
_protocol_config,
protocol_config,
worker_cache,
client,
metrics,
Expand Down Expand Up @@ -376,49 +376,63 @@ impl Subscriber {
}

fn record_fetched_batch_metrics(inner: &Inner, batch: &Batch, digest: &BatchDigest) {
let metadata = batch.versioned_metadata();
if let Some(received_at) = metadata.received_at() {
let remote_duration = received_at.elapsed().as_secs_f64();
debug!(
"Batch was fetched for execution after being received from another worker {}s ago.",
remote_duration
);
inner
.metrics
.batch_execution_local_latency
.with_label_values(&["other"])
.observe(remote_duration);
} else {
let local_duration = batch
// TODO: Remove once we have upgraded to protocol version 12.
if inner.protocol_config.narwhal_versioned_metadata() {
let metadata = batch.versioned_metadata();
if let Some(received_at) = metadata.received_at() {
let remote_duration = received_at.elapsed().as_secs_f64();
debug!(
"Batch was fetched for execution after being received from another worker {}s ago.",
remote_duration
);
inner
.metrics
.batch_execution_local_latency
.with_label_values(&["other"])
.observe(remote_duration);
} else {
let local_duration = batch
.versioned_metadata()
.created_at()
.elapsed()
.as_secs_f64();
debug!(
"Batch was fetched for execution after being created locally {}s ago.",
local_duration
);
inner
.metrics
.batch_execution_local_latency
.with_label_values(&["own"])
.observe(local_duration);
};

let batch_fetch_duration = batch
.versioned_metadata()
.created_at()
.elapsed()
.as_secs_f64();
inner
.metrics
.batch_execution_latency
.observe(batch_fetch_duration);
debug!(
"Batch was fetched for execution after being created locally {}s ago.",
local_duration
"Batch {:?} took {} seconds since it has been created to when it has been fetched for execution",
digest,
batch_fetch_duration,
);
} else {
let batch_fetch_duration = batch.metadata().created_at.elapsed().as_secs_f64();
inner
.metrics
.batch_execution_local_latency
.with_label_values(&["own"])
.observe(local_duration);
};

let batch_fetch_duration = batch
.versioned_metadata()
.created_at()
.elapsed()
.as_secs_f64();
inner
.metrics
.batch_execution_latency
.observe(batch_fetch_duration);
debug!(
.batch_execution_latency
.observe(batch_fetch_duration);
debug!(
"Batch {:?} took {} seconds since it has been created to when it has been fetched for execution",
digest,
batch_fetch_duration,
);
}
}
}

Expand Down
19 changes: 18 additions & 1 deletion narwhal/network/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use parking_lot::RwLock;
use tokio::{select, time::sleep};
use types::{
error::LocalClientError, FetchBatchesRequest, FetchBatchesResponse, PrimaryToWorker,
WorkerOthersBatchMessage, WorkerOwnBatchMessage, WorkerSynchronizeMessage, WorkerToPrimary,
WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerOwnBatchMessage,
WorkerSynchronizeMessage, WorkerToPrimary,
};

use crate::traits::{PrimaryToWorkerClient, WorkerToPrimaryClient};
Expand Down Expand Up @@ -171,6 +172,22 @@ impl PrimaryToWorkerClient for NetworkClient {

#[async_trait]
impl WorkerToPrimaryClient for NetworkClient {
// TODO: Remove once we have upgraded to protocol version 12.
async fn report_our_batch(
&self,
request: WorkerOurBatchMessage,
) -> Result<(), LocalClientError> {
let c = self.get_worker_to_primary_handler().await?;
select! {
resp = c.report_our_batch(Request::new(request)) => {
resp.map_err(|e| LocalClientError::Internal(format!("{e:?}")))?;
Ok(())
},
() = self.shutdown_notify.wait() => {
Err(LocalClientError::ShuttingDown)
},
}
}
async fn report_own_batch(
&self,
request: WorkerOwnBatchMessage,
Expand Down
9 changes: 8 additions & 1 deletion narwhal/network/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use crypto::NetworkPublicKey;
use types::{
error::LocalClientError, FetchBatchesRequest, FetchBatchesResponse, FetchCertificatesRequest,
FetchCertificatesResponse, RequestBatchesRequest, RequestBatchesResponse,
WorkerOthersBatchMessage, WorkerOwnBatchMessage, WorkerSynchronizeMessage,
WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerOwnBatchMessage,
WorkerSynchronizeMessage,
};

pub trait ReliableNetwork<Request: Clone + Send + Sync> {
Expand Down Expand Up @@ -59,6 +60,12 @@ pub trait PrimaryToWorkerClient {

#[async_trait]
pub trait WorkerToPrimaryClient {
// TODO: Remove once we have upgraded to protocol version 12.
async fn report_our_batch(
&self,
request: WorkerOurBatchMessage,
) -> Result<(), LocalClientError>;

async fn report_own_batch(
&self,
request: WorkerOwnBatchMessage,
Expand Down
14 changes: 10 additions & 4 deletions narwhal/node/src/generate_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use std::{fs::File, io::Write};
use structopt::{clap::arg_enum, StructOpt};
use types::{
Batch, BatchDigest, Certificate, CertificateDigest, Header, HeaderDigest, HeaderV1Builder,
MetadataV1, VersionedMetadata, WorkerOthersBatchMessage, WorkerOwnBatchMessage,
WorkerSynchronizeMessage,
Metadata, MetadataV1, VersionedMetadata, WorkerOthersBatchMessage, WorkerOurBatchMessage,
WorkerOwnBatchMessage, WorkerSynchronizeMessage,
};

#[allow(clippy::mutable_key_type)]
Expand Down Expand Up @@ -112,7 +112,12 @@ fn get_registry() -> Result<Registry> {
);
tracer.trace_value(&mut samples, &worker_index)?;

let own_batch = WorkerOwnBatchMessage {
let our_batch = WorkerOurBatchMessage {
digest: BatchDigest([0u8; 32]),
worker_id: 0,
metadata: Metadata { created_at: 0 },
};
let our_batch_v2 = WorkerOwnBatchMessage {
digest: BatchDigest([0u8; 32]),
worker_id: 0,
metadata: VersionedMetadata::V1(MetadataV1 {
Expand All @@ -130,7 +135,8 @@ fn get_registry() -> Result<Registry> {
is_certified: true,
};

tracer.trace_value(&mut samples, &own_batch)?;
tracer.trace_value(&mut samples, &our_batch)?;
tracer.trace_value(&mut samples, &our_batch_v2)?;
tracer.trace_value(&mut samples, &others_batch)?;
tracer.trace_value(&mut samples, &sync)?;

Expand Down
18 changes: 18 additions & 0 deletions narwhal/node/tests/staged/narwhal.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ AuthorityIdentifier:
Batch:
ENUM:
0:
V1:
NEWTYPE:
TYPENAME: BatchV1
1:
V2:
NEWTYPE:
TYPENAME: BatchV2
Expand All @@ -12,6 +16,13 @@ BatchDigest:
TUPLEARRAY:
CONTENT: U8
SIZE: 32
BatchV1:
STRUCT:
- transactions:
SEQ:
SEQ: U8
- metadata:
TYPENAME: Metadata
BatchV2:
STRUCT:
- transactions:
Expand Down Expand Up @@ -102,6 +113,13 @@ WorkerOthersBatchMessage:
- digest:
TYPENAME: BatchDigest
- worker_id: U32
WorkerOurBatchMessage:
STRUCT:
- digest:
TYPENAME: BatchDigest
- worker_id: U32
- metadata:
TYPENAME: Metadata
WorkerOwnBatchMessage:
STRUCT:
- digest:
Expand Down
30 changes: 29 additions & 1 deletion narwhal/primary/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use types::{
FetchCertificatesResponse, Header, HeaderAPI, MetadataAPI, PreSubscribedBroadcastSender,
PrimaryToPrimary, PrimaryToPrimaryServer, RequestVoteRequest, RequestVoteResponse, Round,
SendCertificateRequest, SendCertificateResponse, Vote, VoteInfoAPI, WorkerOthersBatchMessage,
WorkerOwnBatchMessage, WorkerToPrimary, WorkerToPrimaryServer,
WorkerOurBatchMessage, WorkerOwnBatchMessage, WorkerToPrimary, WorkerToPrimaryServer,
};

#[cfg(any(test))]
Expand Down Expand Up @@ -1015,6 +1015,34 @@ struct WorkerReceiverHandler {

#[async_trait]
impl WorkerToPrimary for WorkerReceiverHandler {
// TODO: Remove once we have upgraded to protocol version 12.
async fn report_our_batch(
&self,
request: anemo::Request<WorkerOurBatchMessage>,
) -> Result<anemo::Response<()>, anemo::rpc::Status> {
let message = request.into_body();

let (tx_ack, rx_ack) = oneshot::channel();
let response = self
.tx_our_digests
.send(OurDigestMessage {
digest: message.digest,
worker_id: message.worker_id,
timestamp: message.metadata.created_at,
ack_channel: Some(tx_ack),
})
.await
.map(|_| anemo::Response::new(()))
.map_err(|e| anemo::rpc::Status::internal(e.to_string()))?;

// If we are ok, then wait for the ack
rx_ack
.await
.map_err(|e| anemo::rpc::Status::internal(e.to_string()))?;

Ok(response)
}

async fn report_own_batch(
&self,
request: anemo::Request<WorkerOwnBatchMessage>,
Expand Down
Loading

0 comments on commit d43f12c

Please sign in to comment.