Skip to content

Commit

Permalink
Remove enum wrapper for last remaining w2w "Batch" RPC (MystenLabs#5365)
Browse files Browse the repository at this point in the history
  • Loading branch information
aschran authored Oct 19, 2022
1 parent 2f71fcd commit 69ef705
Show file tree
Hide file tree
Showing 13 changed files with 61 additions and 65 deletions.
16 changes: 8 additions & 8 deletions narwhal/network/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use std::time::Duration;
use tokio::{runtime::Handle, task::JoinHandle};
use types::{
Batch, BatchDigest, FetchCertificatesRequest, FetchCertificatesResponse, PrimaryMessage,
PrimaryToPrimaryClient, PrimaryToWorkerClient, RequestBatchRequest, WorkerBatchRequest,
WorkerBatchResponse, WorkerDeleteBatchesMessage, WorkerMessage, WorkerOthersBatchMessage,
PrimaryToPrimaryClient, PrimaryToWorkerClient, RequestBatchRequest, WorkerBatchMessage,
WorkerBatchRequest, WorkerBatchResponse, WorkerDeleteBatchesMessage, WorkerOthersBatchMessage,
WorkerOurBatchMessage, WorkerPrimaryMessage, WorkerReconfigureMessage,
WorkerSynchronizeMessage, WorkerToPrimaryClient, WorkerToWorkerClient,
};
Expand Down Expand Up @@ -378,32 +378,32 @@ impl ReliableNetwork<WorkerOthersBatchMessage> for P2pNetwork {
// Worker-to-Worker
//

impl UnreliableNetwork<WorkerMessage> for P2pNetwork {
impl UnreliableNetwork<WorkerBatchMessage> for P2pNetwork {
type Response = ();
fn unreliable_send(
&mut self,
peer: NetworkPublicKey,
message: &WorkerMessage,
message: &WorkerBatchMessage,
) -> Result<JoinHandle<Result<anemo::Response<()>>>> {
let message = message.to_owned();
let f =
move |peer| async move { WorkerToWorkerClient::new(peer).send_message(message).await };
move |peer| async move { WorkerToWorkerClient::new(peer).report_batch(message).await };
self.unreliable_send(peer, f)
}
}

#[async_trait]
impl ReliableNetwork<WorkerMessage> for P2pNetwork {
impl ReliableNetwork<WorkerBatchMessage> for P2pNetwork {
type Response = ();
async fn send(
&mut self,
peer: NetworkPublicKey,
message: &WorkerMessage,
message: &WorkerBatchMessage,
) -> CancelOnDropHandler<Result<anemo::Response<()>>> {
let message = message.to_owned();
let f = move |peer| {
let message = message.clone();
async move { WorkerToWorkerClient::new(peer).send_message(message).await }
async move { WorkerToWorkerClient::new(peer).report_batch(message).await }
};

self.send(peer, f).await
Expand Down
20 changes: 10 additions & 10 deletions narwhal/test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use types::{
Batch, BatchDigest, Certificate, CertificateDigest, ConsensusStore, FetchCertificatesRequest,
FetchCertificatesResponse, Header, HeaderBuilder, PrimaryMessage, PrimaryToPrimary,
PrimaryToPrimaryServer, PrimaryToWorker, PrimaryToWorkerServer, RequestBatchRequest,
RequestBatchResponse, Round, SequenceNumber, Transaction, Vote, WorkerBatchRequest,
WorkerBatchResponse, WorkerDeleteBatchesMessage, WorkerMessage, WorkerReconfigureMessage,
RequestBatchResponse, Round, SequenceNumber, Transaction, Vote, WorkerBatchMessage,
WorkerBatchRequest, WorkerBatchResponse, WorkerDeleteBatchesMessage, WorkerReconfigureMessage,
WorkerSynchronizeMessage, WorkerToWorker, WorkerToWorkerServer,
};

Expand Down Expand Up @@ -275,7 +275,7 @@ impl PrimaryToWorker for PrimaryToWorkerMockServer {
}

pub struct WorkerToWorkerMockServer {
msg_sender: Sender<WorkerMessage>,
batch_sender: Sender<WorkerBatchMessage>,
batch_request_sender: Sender<WorkerBatchRequest>,
}

Expand All @@ -284,15 +284,15 @@ impl WorkerToWorkerMockServer {
keypair: NetworkKeyPair,
address: Multiaddr,
) -> (
Receiver<WorkerMessage>,
Receiver<WorkerBatchMessage>,
Receiver<WorkerBatchRequest>,
anemo::Network,
) {
let addr = network::multiaddr_to_address(&address).unwrap();
let (msg_sender, msg_receiver) = channel(1);
let (batch_sender, batch_receiver) = channel(1);
let (batch_request_sender, batch_request_receiver) = channel(1);
let service = WorkerToWorkerServer::new(Self {
msg_sender,
batch_sender,
batch_request_sender,
});

Expand All @@ -303,19 +303,19 @@ impl WorkerToWorkerMockServer {
.start(routes)
.unwrap();
info!("starting network on: {}", network.local_addr());
(msg_receiver, batch_request_receiver, network)
(batch_receiver, batch_request_receiver, network)
}
}

#[async_trait]
impl WorkerToWorker for WorkerToWorkerMockServer {
async fn send_message(
async fn report_batch(
&self,
request: anemo::Request<WorkerMessage>,
request: anemo::Request<WorkerBatchMessage>,
) -> Result<anemo::Response<()>, anemo::rpc::Status> {
let message = request.into_body();

self.msg_sender.send(message).await.unwrap();
self.batch_sender.send(message).await.unwrap();

Ok(anemo::Response::new(()))
}
Expand Down
6 changes: 4 additions & 2 deletions narwhal/types/benches/batch_digest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use criterion::{
use fastcrypto::hash::Hash;
use narwhal_types as types;
use rand::Rng;
use types::{serialized_batch_digest, Batch, WorkerMessage};
use types::{serialized_batch_digest, Batch, WorkerBatchMessage};

pub fn batch_digest(c: &mut Criterion) {
let mut digest_group = c.benchmark_group("Batch digests");
Expand All @@ -21,7 +21,9 @@ pub fn batch_digest(c: &mut Criterion) {
.collect::<Vec<u8>>()
};
let batch = Batch((0..size).map(|_| tx_gen()).collect::<Vec<_>>());
let message = WorkerMessage::Batch(batch.clone());
let message = WorkerBatchMessage {
batch: batch.clone(),
};
let serialized_batch = bincode::serialize(&message).unwrap();

digest_group.throughput(Throughput::Bytes(512 * size as u64));
Expand Down
6 changes: 3 additions & 3 deletions narwhal/types/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,9 @@ fn build_anemo_services(out_dir: &Path) {
.attributes(automock_attribute)
.method(
anemo_build::manual::Method::builder()
.name("send_message")
.route_name("SendMessage")
.request_type("crate::WorkerMessage")
.name("report_batch")
.route_name("ReportBatch")
.request_type("crate::WorkerBatchMessage")
.response_type("()")
.codec_path("anemo::rpc::codec::BincodeCodec")
.build(),
Expand Down
16 changes: 8 additions & 8 deletions narwhal/types/src/tests/batch_serde.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::{serialized_batch_digest, Batch, WorkerMessage};
use crate::{serialized_batch_digest, Batch, WorkerBatchMessage};
use fastcrypto::hash::Hash;
use proptest::arbitrary::Arbitrary;
use serde_test::{assert_tokens, Token};
Expand Down Expand Up @@ -65,16 +65,16 @@ fn test_bincode_serde_batch() {
fn test_bincode_serde_batch_message() {
let tx = || vec![1; 5];

let txes: WorkerMessage = WorkerMessage::Batch(Batch((0..2).map(|_| tx()).collect()));
let txes = WorkerBatchMessage {
batch: Batch((0..2).map(|_| tx()).collect()),
};

let txes_bytes = bincode::serialize(&txes).unwrap();

// We expect the difference with the above test will be the enum variant above on 4 bytes,
// see https://github.com/bincode-org/bincode/blob/75a2e0bc9d35cfa7537633b07a9307bf71da84b5/src/features/serde/ser.rs#L212-L224

// Variant index 0 (4 bytes), Length-prefix 2, length-prefix 5, 11111, length-prefix 5, 11111
// We expect this will be the same as the above.
// Length-prefix 2, length-prefix 5, 11111, length-prefix 5, 11111
let expected_bytes =
hex::decode("0000000002000000000000000500000000000000010101010105000000000000000101010101")
hex::decode("02000000000000000500000000000000010101010105000000000000000101010101")
.unwrap();

assert_eq!(
Expand All @@ -92,7 +92,7 @@ proptest::proptest! {
batch in Batch::arbitrary()
) {
let digest = batch.digest();
let message = WorkerMessage::Batch(batch);
let message = WorkerBatchMessage{batch};
let serialized = bincode::serialize(&message).expect("Failed to serialize our own batch");
let digest_from_serialized = serialized_batch_digest(&serialized).expect("Failed to hash serialized batch");
assert_eq!(digest, digest_from_serialized);
Expand Down
10 changes: 4 additions & 6 deletions narwhal/types/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ use thiserror::Error;
#[path = "tests/batch_serde.rs"]
mod batch_serde;

/// Unsolicited messages exchanged between workers.
#[allow(clippy::large_enum_variant)]
/// Used by workers to send a new batch.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum WorkerMessage {
/// Used by workers to send a new batch.
Batch(Batch),
pub struct WorkerBatchMessage {
pub batch: Batch,
}

/// Used by workers to request batches from other workers.
Expand Down Expand Up @@ -54,7 +52,7 @@ pub struct RequestBatchResponse {
/// TODO: update batch hashing to reflect hashing fixed sequences of transactions, see #87.
pub fn serialized_batch_digest<K: AsRef<[u8]>>(sbm: K) -> Result<BatchDigest, DigestError> {
let sbm = sbm.as_ref();
let mut offset = 4; // skip the enum variant selector
let mut offset = 0;
let num_transactions = u64::from_le_bytes(
sbm[offset..offset + 8]
.try_into()
Expand Down
34 changes: 15 additions & 19 deletions narwhal/worker/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ use tokio::sync::watch;
use tracing::{debug, error, info, trace, warn};
use types::{
metered_channel::Sender, Batch, BatchDigest, PrimaryToWorker, ReconfigureNotification,
RequestBatchRequest, RequestBatchResponse, WorkerBatchRequest, WorkerBatchResponse,
WorkerDeleteBatchesMessage, WorkerMessage, WorkerOthersBatchMessage, WorkerReconfigureMessage,
WorkerSynchronizeMessage, WorkerToWorker, WorkerToWorkerClient,
RequestBatchRequest, RequestBatchResponse, WorkerBatchMessage, WorkerBatchRequest,
WorkerBatchResponse, WorkerDeleteBatchesMessage, WorkerOthersBatchMessage,
WorkerReconfigureMessage, WorkerSynchronizeMessage, WorkerToWorker, WorkerToWorkerClient,
};

#[cfg(test)]
Expand All @@ -39,25 +39,21 @@ pub struct WorkerReceiverHandler {

#[async_trait]
impl WorkerToWorker for WorkerReceiverHandler {
async fn send_message(
async fn report_batch(
&self,
request: anemo::Request<WorkerMessage>,
request: anemo::Request<WorkerBatchMessage>,
) -> Result<anemo::Response<()>, anemo::rpc::Status> {
let message = request.into_body();
match message {
WorkerMessage::Batch(batch) => {
let digest = batch.digest();
self.store.write(digest, batch).await;
self.tx_others_batch
.send(WorkerOthersBatchMessage {
digest,
worker_id: self.id,
})
.await
}
}
.map(|_| anemo::Response::new(()))
.map_err(|e| anemo::rpc::Status::internal(e.to_string()))
let digest = message.batch.digest();
self.store.write(digest, message.batch).await;
self.tx_others_batch
.send(WorkerOthersBatchMessage {
digest,
worker_id: self.id,
})
.await
.map(|_| anemo::Response::new(()))
.map_err(|e| anemo::rpc::Status::internal(e.to_string()))
}
async fn request_batches(
&self,
Expand Down
2 changes: 1 addition & 1 deletion narwhal/worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ mod primary_connector;
mod quorum_waiter;
mod worker;

pub use crate::worker::{Worker, WorkerMessage};
pub use crate::worker::Worker;
4 changes: 2 additions & 2 deletions narwhal/worker/src/quorum_waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::{sync::watch, task::JoinHandle};
use types::{
error::DagError,
metered_channel::{Receiver, Sender},
Batch, BatchDigest, ReconfigureNotification, WorkerMessage, WorkerOurBatchMessage,
Batch, BatchDigest, ReconfigureNotification, WorkerBatchMessage, WorkerOurBatchMessage,
};

#[cfg(test)]
Expand Down Expand Up @@ -94,7 +94,7 @@ impl QuorumWaiter {
.map(|(name, info)| (name, info.name))
.collect();
let (primary_names, worker_names): (Vec<_>, _) = workers.into_iter().unzip();
let message = WorkerMessage::Batch(batch.clone());
let message = WorkerBatchMessage{batch: batch.clone()};
let handlers = self.network.broadcast(worker_names, &message).await;

// Collect all the handlers to receive acknowledgements.
Expand Down
4 changes: 2 additions & 2 deletions narwhal/worker/src/tests/handlers_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ async fn synchronize() {
}
#[async_trait]
impl WorkerToWorker for MockWorkerToWorker {
async fn send_message(
async fn report_batch(
&self,
_request: anemo::Request<WorkerMessage>,
_request: anemo::Request<WorkerBatchMessage>,
) -> Result<anemo::Response<()>, anemo::rpc::Status> {
unimplemented!();
}
Expand Down
5 changes: 3 additions & 2 deletions narwhal/worker/src/tests/quorum_waiter_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use super::*;
use crate::worker::WorkerMessage;
use test_utils::{batch, test_network, CommitteeFixture, WorkerToWorkerMockServer};

#[tokio::test]
Expand Down Expand Up @@ -36,7 +35,9 @@ async fn wait_for_quorum() {

// Make a batch.
let batch = batch();
let message = WorkerMessage::Batch(batch.clone());
let message = WorkerBatchMessage {
batch: batch.clone(),
};

// Spawn enough listeners to acknowledge our batches.
let mut listener_handles = Vec::new();
Expand Down
2 changes: 1 addition & 1 deletion narwhal/worker/src/tests/worker_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async fn handle_clients_transactions() {
for worker in fixture.authorities().skip(1).map(|a| a.worker(worker_id)) {
let mut mock_server = MockWorkerToWorker::new();
mock_server
.expect_send_message()
.expect_report_batch()
.returning(|_| Ok(anemo::Response::new(())));
let routes = anemo::Router::new().add_rpc_service(WorkerToWorkerServer::new(mock_server));
peer_networks.push(worker.new_network(routes));
Expand Down
1 change: 0 additions & 1 deletion narwhal/worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ pub mod worker_tests;
pub const CHANNEL_CAPACITY: usize = 1_000;

use crate::metrics::{Metrics, WorkerEndpointMetrics, WorkerMetrics};
pub use types::WorkerMessage;

pub struct Worker {
/// The public key of this authority.
Expand Down

0 comments on commit 69ef705

Please sign in to comment.