From 035104913fd7b7cf8c680129731fc66e6c2806c3 Mon Sep 17 00:00:00 2001 From: Brandon Williams Date: Fri, 29 Apr 2022 15:31:52 -0700 Subject: [PATCH] network: convert to grpc --- Cargo.lock | 242 ++++++- Cargo.toml | 3 +- .../sui-network}/Cargo.toml | 18 +- crates/sui-network/README.md | 11 + crates/sui-network/proto/common.proto | 12 + crates/sui-network/proto/validator.proto | 19 + crates/sui-network/src/api.rs | 33 + .../sui-network/src/generated/sui.common.rs | 9 + .../src/generated/sui.validator.rs | 668 ++++++++++++++++++ .../sui-network}/src/lib.rs | 6 +- crates/sui-network/src/network.rs | 109 +++ crates/sui-network/tests/bootstrap.rs | 66 ++ doc/src/build/install.md | 2 +- network_utils/src/network.rs | 240 ------- network_utils/src/transport.rs | 239 ------- .../src/unit_tests/transport_tests.rs | 107 --- sui/Cargo.toml | 3 +- sui/src/benchmark.rs | 44 +- sui/src/benchmark/bench_types.rs | 10 +- sui/src/benchmark/load_generator.rs | 64 +- sui/src/benchmark/validator_preparer.rs | 39 +- sui/src/config.rs | 39 +- sui/src/gateway_config.rs | 38 +- sui/src/sui_commands.rs | 63 +- sui/tests/shared_objects_tests.rs | 96 ++- sui_core/Cargo.toml | 4 +- sui_core/src/authority_client.rs | 203 +++--- sui_core/src/authority_server.rs | 603 ++++++++-------- sui_core/src/consensus_adapter.rs | 51 +- sui_core/src/unit_tests/server_tests.rs | 196 ++--- test_utils/Cargo.toml | 2 +- test_utils/src/authority.rs | 25 +- test_utils/src/lib.rs | 12 +- test_utils/src/sequencer.rs | 437 ------------ 34 files changed, 1939 insertions(+), 1774 deletions(-) rename {network_utils => crates/sui-network}/Cargo.toml (50%) create mode 100644 crates/sui-network/README.md create mode 100644 crates/sui-network/proto/common.proto create mode 100644 crates/sui-network/proto/validator.proto create mode 100644 crates/sui-network/src/api.rs create mode 100644 crates/sui-network/src/generated/sui.common.rs create mode 100644 crates/sui-network/src/generated/sui.validator.rs rename {network_utils 
=> crates/sui-network}/src/lib.rs (56%) create mode 100644 crates/sui-network/src/network.rs create mode 100644 crates/sui-network/tests/bootstrap.rs delete mode 100644 network_utils/src/network.rs delete mode 100644 network_utils/src/transport.rs delete mode 100644 network_utils/src/unit_tests/transport_tests.rs delete mode 100644 test_utils/src/sequencer.rs diff --git a/Cargo.lock b/Cargo.lock index 27358bdcbf5ba..50a86fe5ddd52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -261,6 +261,27 @@ dependencies = [ "syn", ] +[[package]] +name = "async-stream" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e" +dependencies = [ + "async-stream-impl", + "futures-core", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-trait" version = "0.1.53" @@ -770,6 +791,15 @@ dependencies = [ "bitflags", ] +[[package]] +name = "cmake" +version = "0.1.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8ad8cef104ac57b68b89df3208164d228503abbdce70f6880ffa3d970e7443a" +dependencies = [ + "cc", +] + [[package]] name = "codespan" version = "0.11.1" @@ -1464,7 +1494,7 @@ dependencies = [ "serde 1.0.137", "thiserror", "tokio", - "tokio-util", + "tokio-util 0.7.1", "tracing", "typed-store 0.1.0 (git+https://github.com/mystenlabs/mysten-infra.git?rev=808de09203d147b43d59114b8afd9e51cbcf5778)", "types", @@ -1748,7 +1778,7 @@ dependencies = [ "indexmap", "slab", "tokio", - "tokio-util", + "tokio-util 0.7.1", "tracing", ] @@ -1910,6 +1940,18 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "ident_case" version = "1.0.1" @@ -2069,7 +2111,7 @@ dependencies = [ "thiserror", "tokio", "tokio-rustls", - "tokio-util", + "tokio-util 0.7.1", "tracing", "webpki-roots", ] @@ -2189,7 +2231,7 @@ dependencies = [ "serde_json", "soketto", "tokio", - "tokio-util", + "tokio-util 0.7.1", "tracing", ] @@ -3097,6 +3139,12 @@ dependencies = [ "smallvec", ] +[[package]] +name = "multimap" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + [[package]] name = "name_variant" version = "1.0.0" @@ -3132,7 +3180,7 @@ dependencies = [ "rand 0.7.3", "thiserror", "tokio", - "tokio-util", + "tokio-util 0.7.1", "tracing", ] @@ -3178,7 +3226,7 @@ dependencies = [ "rand 0.7.3", "thiserror", "tokio", - "tokio-util", + "tokio-util 0.7.1", "tracing", "tracing-log", "tracing-subscriber 0.3.11", @@ -3743,6 +3791,16 @@ dependencies = [ "output_vt100", ] +[[package]] +name = "prettyplease" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9e07e3a46d0771a8a06b5f4441527802830b43e679ba12f44960f48dd4c6803" +dependencies = [ + "proc-macro2", + "syn", +] + [[package]] name = "primary" version = "0.1.0" @@ -3764,7 +3822,7 @@ dependencies = [ "serde 1.0.137", "thiserror", "tokio", - "tokio-util", + "tokio-util 0.7.1", "tracing", "typed-store 0.1.0 (git+https://github.com/mystenlabs/mysten-infra.git?rev=808de09203d147b43d59114b8afd9e51cbcf5778)", "types", @@ -3813,6 +3871,61 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "prost" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a07b0857a71a8cb765763950499cae2413c3f9cede1133478c43600d9e146890" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] 
+name = "prost-build" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "120fbe7988713f39d780a58cf1a7ef0d7ef66c6d87e5aa3438940c05357929f4" +dependencies = [ + "bytes", + "cfg-if 1.0.0", + "cmake", + "heck 0.4.0", + "itertools", + "lazy_static 1.4.0", + "log", + "multimap", + "petgraph 0.6.0", + "prost", + "prost-types", + "regex", + "tempfile", + "which", +] + +[[package]] +name = "prost-derive" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b670f45da57fb8542ebdbb6105a925fe571b67f9e7ed9f47a06a84e72b4e7cc" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-types" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d0a014229361011dc8e69c8a1ec6c2e8d0f2af7c91e3ea3f5b2170298461e68" +dependencies = [ + "bytes", + "prost", +] + [[package]] name = "ptree" version = "0.4.0" @@ -4809,7 +4922,7 @@ dependencies = [ "tempfile", "test_utils", "tokio", - "tokio-util", + "tokio-util 0.7.1", "toml", "tracing", "tracing-subscriber 0.3.11", @@ -4890,14 +5003,17 @@ dependencies = [ [[package]] name = "sui-network" -version = "0.1.0" +version = "0.0.0" dependencies = [ "async-trait", + "bincode", "bytes", - "futures", - "sui-types", + "prost", + "prost-build", + "serde 1.0.137", "tokio", - "tokio-util", + "tonic", + "tonic-build", "tracing", ] @@ -5046,6 +5162,8 @@ dependencies = [ "tempfile", "test_utils", "tokio", + "tokio-stream", + "tokio-util 0.7.1", "tracing", "typed-store 0.1.0 (git+https://github.com/MystenLabs/mysten-infra?rev=d2976a45420147ad821baae96e6fe4b12215f743)", ] @@ -5165,7 +5283,7 @@ dependencies = [ "sui_core", "tempfile", "tokio", - "tokio-util", + "tokio-util 0.7.1", "tracing", "typed-store 0.1.0 (git+https://github.com/MystenLabs/mysten-infra?rev=d2976a45420147ad821baae96e6fe4b12215f743)", ] @@ -5287,6 +5405,16 @@ dependencies = [ "winapi", ] 
+[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "1.7.0" @@ -5318,6 +5446,21 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util 0.6.9", +] + +[[package]] +name = "tokio-util" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "log", + "pin-project-lite", + "tokio", ] [[package]] @@ -5344,6 +5487,51 @@ dependencies = [ "serde 1.0.137", ] +[[package]] +name = "tonic" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30fb54bf1e446f44d870d260d99957e7d11fb9d0a0f5bd1a662ad1411cc103f9" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64", + "bytes", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "prost-derive", + "tokio", + "tokio-stream", + "tokio-util 0.7.1", + "tower", + "tower-layer", + "tower-service", + "tracing", + "tracing-futures", +] + +[[package]] +name = "tonic-build" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c03447cdc9eaf8feffb6412dcb27baf2db11669a6c4789f29da799aabfb99547" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "quote", + "syn", +] + [[package]] name = "tower" version = "0.4.12" @@ -5352,10 +5540,13 @@ checksum = "9a89fd63ad6adf737582df5db40d286574513c69a11dac5214dc3b5603d6713e" dependencies = [ "futures-core", "futures-util", + "indexmap", "pin-project", "pin-project-lite", + "rand 0.8.5", + "slab", "tokio", - "tokio-util", + "tokio-util 0.7.1", 
"tower-layer", "tower-service", "tracing", @@ -5437,6 +5628,16 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-futures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "pin-project", + "tracing", +] + [[package]] name = "tracing-log" version = "0.1.3" @@ -5857,6 +6058,17 @@ dependencies = [ "webpki", ] +[[package]] +name = "which" +version = "4.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c4fb54e6113b6a8772ee41c3404fb0301ac79604489467e0a9ce1f3e97c24ae" +dependencies = [ + "either", + "lazy_static 1.4.0", + "libc", +] + [[package]] name = "widestring" version = "0.4.3" @@ -5997,7 +6209,7 @@ dependencies = [ "primary", "serde 1.0.137", "tokio", - "tokio-util", + "tokio-util 0.7.1", "tracing", "typed-store 0.1.0 (git+https://github.com/mystenlabs/mysten-infra.git?rev=808de09203d147b43d59114b8afd9e51cbcf5778)", "types", diff --git a/Cargo.toml b/Cargo.toml index d16e9ca1d9cd9..c33c014ed23ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,8 @@ members = [ "sui_programmability/adapter/transactional-tests", "sui_types", "faucet", - "test_utils" + "test_utils", + "crates/sui-network", ] [profile.release] diff --git a/network_utils/Cargo.toml b/crates/sui-network/Cargo.toml similarity index 50% rename from network_utils/Cargo.toml rename to crates/sui-network/Cargo.toml index c93aa41831a82..236efcc30cc35 100644 --- a/network_utils/Cargo.toml +++ b/crates/sui-network/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sui-network" -version = "0.1.0" +version = "0.0.0" authors = ["Mysten Labs "] license = "Apache-2.0" publish = false @@ -8,14 +8,14 @@ edition = "2021" [dependencies] bytes = "1.1.0" -futures = "0.3.21" async-trait = "0.1.53" tokio = { version = "1.17.0", features = ["full"] } -tracing = { version = "0.1.34", features = ["log"] } -tokio-util = { version = "0.7.1", features = 
["codec"] } +tracing = "0.1.34" +tonic = "0.7" +prost = "0.10" +bincode = "1.3.3" +serde = "1.0.136" -sui-types = { path = "../sui_types" } - - -[package.metadata.cargo-udeps.ignore] -normal = ["net2"] +[dev-dependencies] +tonic-build = { version = "0.7", features = [ "prost", "transport" ] } +prost-build = "0.10.1" diff --git a/crates/sui-network/README.md b/crates/sui-network/README.md new file mode 100644 index 0000000000000..b01fa1074ea3c --- /dev/null +++ b/crates/sui-network/README.md @@ -0,0 +1,11 @@ +# sui-network + +## Changing an RPC service + +The general process for changing an RPC service is as follows: +1. Change the corresponding `.proto` file in the `proto` directory +2. Run `cargo test --test bootstrap` to re-run the code generation. + Generated rust files are in the `src/generated` directory. +3. Update any other corresponding logic that would have been affected by + the interface change, e.g. the server implementation of the service or + usages of the generated client. diff --git a/crates/sui-network/proto/common.proto b/crates/sui-network/proto/common.proto new file mode 100644 index 0000000000000..8d87a9efbc575 --- /dev/null +++ b/crates/sui-network/proto/common.proto @@ -0,0 +1,12 @@ +// Copyright (c) 2022, Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +syntax = "proto3"; + +package sui.common; + +// A bincode encoded payload. This is intended to be used in the short-term +// while we don't have good protobuf definitions for sui types +message BincodeEncodedPayload { + bytes payload = 1; +} diff --git a/crates/sui-network/proto/validator.proto b/crates/sui-network/proto/validator.proto new file mode 100644 index 0000000000000..cda0588d9df55 --- /dev/null +++ b/crates/sui-network/proto/validator.proto @@ -0,0 +1,19 @@ +// Copyright (c) 2022, Mysten Labs, Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +syntax = "proto3"; + +package sui.validator; + +import "common.proto"; + +// The Validator interface +service Validator { + rpc Transaction (common.BincodeEncodedPayload) returns (common.BincodeEncodedPayload) {} + rpc ConfirmationTransaction (common.BincodeEncodedPayload) returns (common.BincodeEncodedPayload) {} + rpc ConsensusTransaction (common.BincodeEncodedPayload) returns (common.BincodeEncodedPayload) {} + rpc AccountInfo (common.BincodeEncodedPayload) returns (common.BincodeEncodedPayload) {} + rpc ObjectInfo (common.BincodeEncodedPayload) returns (common.BincodeEncodedPayload) {} + rpc TransactionInfo (common.BincodeEncodedPayload) returns (common.BincodeEncodedPayload) {} + rpc BatchInfo (common.BincodeEncodedPayload) returns (stream common.BincodeEncodedPayload) {} +} diff --git a/crates/sui-network/src/api.rs b/crates/sui-network/src/api.rs new file mode 100644 index 0000000000000..ebae7a6f47c9a --- /dev/null +++ b/crates/sui-network/src/api.rs @@ -0,0 +1,33 @@ +// Copyright (c) 2022, Mysten Labs, Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +#[path = "generated/sui.validator.rs"] +#[rustfmt::skip] +mod validator; + +#[path = "generated/sui.common.rs"] +#[rustfmt::skip] +mod common; + +pub use common::BincodeEncodedPayload; +pub use validator::{ + validator_client::ValidatorClient, + validator_server::{Validator, ValidatorServer}, +}; + +impl BincodeEncodedPayload { + pub fn deserialize(&self) -> Result { + bincode::deserialize(self.payload.as_ref()) + } + + pub fn try_from(value: &T) -> Result { + let payload = bincode::serialize(value)?.into(); + Ok(Self { payload }) + } +} + +impl From for BincodeEncodedPayload { + fn from(payload: bytes::Bytes) -> Self { + Self { payload } + } +} diff --git a/crates/sui-network/src/generated/sui.common.rs b/crates/sui-network/src/generated/sui.common.rs new file mode 100644 index 0000000000000..988bfa3a17766 --- /dev/null +++ b/crates/sui-network/src/generated/sui.common.rs @@ -0,0 +1,9 @@ +// Copyright (c) 2022, Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 +/// A bincode encoded payload. This is intended to be used in the short-term +/// while we don't have good protobuf definitions for sui types +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BincodeEncodedPayload { + #[prost(bytes="bytes", tag="1")] + pub payload: ::prost::bytes::Bytes, +} diff --git a/crates/sui-network/src/generated/sui.validator.rs b/crates/sui-network/src/generated/sui.validator.rs new file mode 100644 index 0000000000000..37dff6bc11355 --- /dev/null +++ b/crates/sui-network/src/generated/sui.validator.rs @@ -0,0 +1,668 @@ +// Copyright (c) 2022, Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 +/// Generated client implementations. 
+pub mod validator_client { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + /// The Validator interface + #[derive(Debug, Clone)] + pub struct ValidatorClient { + inner: tonic::client::Grpc, + } + impl ValidatorClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: std::convert::TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl ValidatorClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> ValidatorClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + Send + Sync, + { + ValidatorClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with `gzip`. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_gzip(mut self) -> Self { + self.inner = self.inner.send_gzip(); + self + } + /// Enable decompressing responses with `gzip`. 
+ #[must_use] + pub fn accept_gzip(mut self) -> Self { + self.inner = self.inner.accept_gzip(); + self + } + pub async fn transaction( + &mut self, + request: impl tonic::IntoRequest, + ) -> Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/sui.validator.Validator/Transaction", + ); + self.inner.unary(request.into_request(), path, codec).await + } + pub async fn confirmation_transaction( + &mut self, + request: impl tonic::IntoRequest, + ) -> Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/sui.validator.Validator/ConfirmationTransaction", + ); + self.inner.unary(request.into_request(), path, codec).await + } + pub async fn consensus_transaction( + &mut self, + request: impl tonic::IntoRequest, + ) -> Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/sui.validator.Validator/ConsensusTransaction", + ); + self.inner.unary(request.into_request(), path, codec).await + } + pub async fn account_info( + &mut self, + request: impl tonic::IntoRequest, + ) -> Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = 
tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/sui.validator.Validator/AccountInfo", + ); + self.inner.unary(request.into_request(), path, codec).await + } + pub async fn object_info( + &mut self, + request: impl tonic::IntoRequest, + ) -> Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/sui.validator.Validator/ObjectInfo", + ); + self.inner.unary(request.into_request(), path, codec).await + } + pub async fn transaction_info( + &mut self, + request: impl tonic::IntoRequest, + ) -> Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/sui.validator.Validator/TransactionInfo", + ); + self.inner.unary(request.into_request(), path, codec).await + } + pub async fn batch_info( + &mut self, + request: impl tonic::IntoRequest, + ) -> Result< + tonic::Response< + tonic::codec::Streaming, + >, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/sui.validator.Validator/BatchInfo", + ); + self.inner.server_streaming(request.into_request(), path, codec).await + } + } +} +/// Generated server implementations. 
+pub mod validator_server { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + ///Generated trait containing gRPC methods that should be implemented for use with ValidatorServer. + #[async_trait] + pub trait Validator: Send + Sync + 'static { + async fn transaction( + &self, + request: tonic::Request, + ) -> Result< + tonic::Response, + tonic::Status, + >; + async fn confirmation_transaction( + &self, + request: tonic::Request, + ) -> Result< + tonic::Response, + tonic::Status, + >; + async fn consensus_transaction( + &self, + request: tonic::Request, + ) -> Result< + tonic::Response, + tonic::Status, + >; + async fn account_info( + &self, + request: tonic::Request, + ) -> Result< + tonic::Response, + tonic::Status, + >; + async fn object_info( + &self, + request: tonic::Request, + ) -> Result< + tonic::Response, + tonic::Status, + >; + async fn transaction_info( + &self, + request: tonic::Request, + ) -> Result< + tonic::Response, + tonic::Status, + >; + ///Server streaming response type for the BatchInfo method. 
+ type BatchInfoStream: futures_core::Stream< + Item = Result, + > + + Send + + 'static; + async fn batch_info( + &self, + request: tonic::Request, + ) -> Result, tonic::Status>; + } + /// The Validator interface + #[derive(Debug)] + pub struct ValidatorServer { + inner: _Inner, + accept_compression_encodings: (), + send_compression_encodings: (), + } + struct _Inner(Arc); + impl ValidatorServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + let inner = _Inner(inner); + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + } + impl tonic::codegen::Service> for ValidatorServer + where + T: Validator, + B: Body + Send + 'static, + B::Error: Into + Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + let inner = self.inner.clone(); + match req.uri().path() { + "/sui.validator.Validator/Transaction" => { + #[allow(non_camel_case_types)] + struct TransactionSvc(pub Arc); + impl< + T: Validator, + > tonic::server::UnaryService< + super::super::common::BincodeEncodedPayload, + > for TransactionSvc { + type Response = super::super::common::BincodeEncodedPayload; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + super::super::common::BincodeEncodedPayload, + >, + ) -> Self::Future { + let inner = self.0.clone(); + let fut = async move { (*inner).transaction(request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = 
self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = TransactionSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/sui.validator.Validator/ConfirmationTransaction" => { + #[allow(non_camel_case_types)] + struct ConfirmationTransactionSvc(pub Arc); + impl< + T: Validator, + > tonic::server::UnaryService< + super::super::common::BincodeEncodedPayload, + > for ConfirmationTransactionSvc { + type Response = super::super::common::BincodeEncodedPayload; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + super::super::common::BincodeEncodedPayload, + >, + ) -> Self::Future { + let inner = self.0.clone(); + let fut = async move { + (*inner).confirmation_transaction(request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = ConfirmationTransactionSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/sui.validator.Validator/ConsensusTransaction" => { + #[allow(non_camel_case_types)] + struct ConsensusTransactionSvc(pub Arc); + impl< + T: Validator, + > tonic::server::UnaryService< + super::super::common::BincodeEncodedPayload, + > for ConsensusTransactionSvc { + type Response = 
super::super::common::BincodeEncodedPayload; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + super::super::common::BincodeEncodedPayload, + >, + ) -> Self::Future { + let inner = self.0.clone(); + let fut = async move { + (*inner).consensus_transaction(request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = ConsensusTransactionSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/sui.validator.Validator/AccountInfo" => { + #[allow(non_camel_case_types)] + struct AccountInfoSvc(pub Arc); + impl< + T: Validator, + > tonic::server::UnaryService< + super::super::common::BincodeEncodedPayload, + > for AccountInfoSvc { + type Response = super::super::common::BincodeEncodedPayload; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + super::super::common::BincodeEncodedPayload, + >, + ) -> Self::Future { + let inner = self.0.clone(); + let fut = async move { + (*inner).account_info(request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = AccountInfoSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); + let res 
= grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/sui.validator.Validator/ObjectInfo" => { + #[allow(non_camel_case_types)] + struct ObjectInfoSvc(pub Arc); + impl< + T: Validator, + > tonic::server::UnaryService< + super::super::common::BincodeEncodedPayload, + > for ObjectInfoSvc { + type Response = super::super::common::BincodeEncodedPayload; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + super::super::common::BincodeEncodedPayload, + >, + ) -> Self::Future { + let inner = self.0.clone(); + let fut = async move { (*inner).object_info(request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = ObjectInfoSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/sui.validator.Validator/TransactionInfo" => { + #[allow(non_camel_case_types)] + struct TransactionInfoSvc(pub Arc); + impl< + T: Validator, + > tonic::server::UnaryService< + super::super::common::BincodeEncodedPayload, + > for TransactionInfoSvc { + type Response = super::super::common::BincodeEncodedPayload; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + super::super::common::BincodeEncodedPayload, + >, + ) -> Self::Future { + let inner = self.0.clone(); + let fut = async move { + (*inner).transaction_info(request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let 
inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = TransactionInfoSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/sui.validator.Validator/BatchInfo" => { + #[allow(non_camel_case_types)] + struct BatchInfoSvc(pub Arc); + impl< + T: Validator, + > tonic::server::ServerStreamingService< + super::super::common::BincodeEncodedPayload, + > for BatchInfoSvc { + type Response = super::super::common::BincodeEncodedPayload; + type ResponseStream = T::BatchInfoStream; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + super::super::common::BincodeEncodedPayload, + >, + ) -> Self::Future { + let inner = self.0.clone(); + let fut = async move { (*inner).batch_info(request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = BatchInfoSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); + let res = grpc.server_streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } + } + } + } + impl Clone for ValidatorServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, 
+ send_compression_encodings: self.send_compression_encodings, + } + } + } + impl Clone for _Inner { + fn clone(&self) -> Self { + Self(self.0.clone()) + } + } + impl std::fmt::Debug for _Inner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.0) + } + } + impl tonic::transport::NamedService for ValidatorServer { + const NAME: &'static str = "sui.validator.Validator"; + } +} diff --git a/network_utils/src/lib.rs b/crates/sui-network/src/lib.rs similarity index 56% rename from network_utils/src/lib.rs rename to crates/sui-network/src/lib.rs index 3b09c71376038..915210676d05e 100644 --- a/network_utils/src/lib.rs +++ b/crates/sui-network/src/lib.rs @@ -1,6 +1,8 @@ -// Copyright (c) 2021, Facebook, Inc. and its affiliates // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +pub mod api; pub mod network; -pub mod transport; + +pub use prost; +pub use tonic; diff --git a/crates/sui-network/src/network.rs b/crates/sui-network/src/network.rs new file mode 100644 index 0000000000000..0cb13ffd6a873 --- /dev/null +++ b/crates/sui-network/src/network.rs @@ -0,0 +1,109 @@ +// Copyright (c) 2021, Facebook, Inc. and its affiliates +// Copyright (c) 2022, Mysten Labs, Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +use std::{ + net::TcpListener, + sync::atomic::{AtomicUsize, Ordering}, +}; + +#[derive(Clone, Debug)] +pub struct NetworkClient { + base_address: String, + base_port: u16, + _buffer_size: usize, + send_timeout: std::time::Duration, + recv_timeout: std::time::Duration, +} + +impl NetworkClient { + pub fn new( + base_address: String, + base_port: u16, + buffer_size: usize, + send_timeout: std::time::Duration, + recv_timeout: std::time::Duration, + ) -> Self { + NetworkClient { + base_address, + base_port, + _buffer_size: buffer_size, + send_timeout, + recv_timeout, + } + } + + pub fn base_address(&self) -> &str { + &self.base_address + } + + pub fn base_port(&self) -> u16 { + self.base_port + } + + pub fn send_timeout(&self) -> std::time::Duration { + self.send_timeout + } + + pub fn recv_timeout(&self) -> std::time::Duration { + self.recv_timeout + } +} + +pub struct NetworkServer { + pub base_address: String, + pub base_port: u16, + pub buffer_size: usize, + // Stats + packets_processed: AtomicUsize, + user_errors: AtomicUsize, +} + +impl NetworkServer { + pub fn new(base_address: String, base_port: u16, buffer_size: usize) -> Self { + Self { + base_address, + base_port, + buffer_size, + packets_processed: AtomicUsize::new(0), + user_errors: AtomicUsize::new(0), + } + } + + pub fn packets_processed(&self) -> usize { + self.packets_processed.load(Ordering::Relaxed) + } + + pub fn increment_packets_processed(&self) { + self.packets_processed.fetch_add(1, Ordering::Relaxed); + } + + pub fn user_errors(&self) -> usize { + self.user_errors.load(Ordering::Relaxed) + } + + pub fn increment_user_errors(&self) { + self.user_errors.fetch_add(1, Ordering::Relaxed); + } +} + +pub struct PortAllocator { + next_port: u16, +} + +impl PortAllocator { + pub fn new(starting_port: u16) -> Self { + Self { + next_port: starting_port, + } + } + pub fn next_port(&mut self) -> Option { + for port in self.next_port..65535 { + if 
TcpListener::bind(("127.0.0.1", port)).is_ok() { + self.next_port = port + 1; + return Some(port); + } + } + None + } +} diff --git a/crates/sui-network/tests/bootstrap.rs b/crates/sui-network/tests/bootstrap.rs new file mode 100644 index 0000000000000..9f52a8c0cdfc5 --- /dev/null +++ b/crates/sui-network/tests/bootstrap.rs @@ -0,0 +1,66 @@ +// Copyright (c) 2022, Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::{ + fs, + path::{Path, PathBuf}, + process::Command, +}; + +type Result = ::std::result::Result>; + +#[test] +fn bootstrap() { + let proto_files = &["proto/validator.proto", "proto/common.proto"]; + let dirs = &["proto"]; + + let out_dir = PathBuf::from(std::env!("CARGO_MANIFEST_DIR")) + .join("src") + .join("generated"); + + // Use `Bytes` instead of `Vec` for bytes fields + let mut config = prost_build::Config::new(); + config.bytes(&["."]); + + tonic_build::configure() + .out_dir(format!("{}", out_dir.display())) + .compile_with_config(config, proto_files, dirs) + .unwrap(); + + prepend_license(&out_dir).unwrap(); + + let status = Command::new("git") + .arg("diff") + .arg("--exit-code") + .arg("--") + .arg(format!("{}", out_dir.display())) + .status() + .unwrap(); + + if !status.success() { + panic!("You should commit the protobuf files"); + } +} + +fn prepend_license(directory: &Path) -> Result<()> { + for entry in fs::read_dir(directory)? { + let entry = entry?; + let path = entry.path(); + if path.is_file() { + prepend_license_to_file(&path)?; + } + } + Ok(()) +} + +const LICENSE_HEADER: &str = "\ +// Copyright (c) 2022, Mysten Labs, Inc. 
+// SPDX-License-Identifier: Apache-2.0 +"; + +fn prepend_license_to_file(file: &Path) -> Result<()> { + let mut contents = fs::read_to_string(file)?; + contents.insert_str(0, LICENSE_HEADER); + fs::write(file, &contents)?; + Ok(()) +} diff --git a/doc/src/build/install.md b/doc/src/build/install.md index cebeeb67aa663..24b5a44aabbb1 100644 --- a/doc/src/build/install.md +++ b/doc/src/build/install.md @@ -54,7 +54,7 @@ You can start exploring Sui's source code by looking into the following primary * [sui_core](https://github.com/MystenLabs/sui/tree/main/sui_core) - authority server and Sui Gateway * [sui_types](https://github.com/MystenLabs/sui/tree/main/sui_types) - coins, gas, and other object types * [explorer](https://github.com/MystenLabs/sui/tree/main/explorer) - object explorer for the Sui network -* [network_utils](https://github.com/MystenLabs/sui/tree/main/network_utils) - networking utilities and related unit tests +* [sui-network](https://github.com/MystenLabs/sui/tree/main/crates/sui-network) - networking interfaces To contribute updates to Sui code, [send pull requests](../contribute/index.md#send-pull-requests) our way. diff --git a/network_utils/src/network.rs b/network_utils/src/network.rs deleted file mode 100644 index 760d9bb532509..0000000000000 --- a/network_utils/src/network.rs +++ /dev/null @@ -1,240 +0,0 @@ -// Copyright (c) 2021, Facebook, Inc. and its affiliates -// Copyright (c) 2022, Mysten Labs, Inc. 
-// SPDX-License-Identifier: Apache-2.0 - -use crate::transport::*; -use bytes::{Bytes, BytesMut}; -use std::{ - net::TcpListener, - sync::atomic::{AtomicUsize, Ordering}, -}; -use sui_types::{error::*, serialize::*}; -use tracing::debug; - -use std::io; - -use futures::stream; -use futures::SinkExt; -use futures::StreamExt; -use tokio::task::JoinError; -use tokio::time; - -#[derive(Clone, Debug)] -pub struct NetworkClient { - base_address: String, - base_port: u16, - buffer_size: usize, - send_timeout: std::time::Duration, - recv_timeout: std::time::Duration, -} - -impl NetworkClient { - pub fn new( - base_address: String, - base_port: u16, - buffer_size: usize, - send_timeout: std::time::Duration, - recv_timeout: std::time::Duration, - ) -> Self { - NetworkClient { - base_address, - base_port, - buffer_size, - send_timeout, - recv_timeout, - } - } - - pub async fn connect_for_stream(&self, buf: Vec) -> Result { - let address = format!("{}:{}", self.base_address, self.base_port); - let mut tcp_stream = connect(address, self.buffer_size).await?; - // Send message - time::timeout(self.send_timeout, tcp_stream.write_data(&buf)).await??; - Ok(tcp_stream) - } - - async fn send_recv_bytes_internal(&self, buf: Vec) -> Result>, io::Error> { - let address = format!("{}:{}", self.base_address, self.base_port); - let mut stream = connect(address, self.buffer_size).await?; - // Send message - time::timeout(self.send_timeout, stream.write_data(&buf)).await??; - // Wait for reply - time::timeout(self.recv_timeout, async { - stream.read_data().await.transpose() - }) - .await? 
- } - - pub async fn send_recv_bytes(&self, buf: Vec) -> Result { - match self.send_recv_bytes_internal(buf).await { - Err(error) => Err(SuiError::ClientIoError { - error: format!("{error}"), - }), - Ok(Some(response)) => { - // Parse reply - match deserialize_message(&response[..]) { - Ok(SerializedMessage::Error(error)) => Err(*error), - Ok(message) => Ok(message), - Err(_) => Err(SuiError::InvalidDecoding), - // _ => Err(SuiError::UnexpectedMessage), - } - } - Ok(None) => Err(SuiError::ClientIoError { - error: "Empty response from authority.".to_string(), - }), - } - } - - pub fn base_address(&self) -> &str { - &self.base_address - } - - pub fn base_port(&self) -> u16 { - self.base_port - } - - async fn batch_send_one_chunk( - &self, - requests: Vec, - _max_in_flight: u64, - ) -> Vec> { - let address = format!("{}:{}", self.base_address, self.base_port); - let stream = connect(address, self.buffer_size) - .await - .expect("Must be able to connect."); - let total = requests.len(); - - let (read_stream, mut write_stream) = (stream.framed_read, stream.framed_write); - - let mut requests = stream::iter(requests.into_iter().map(Ok)); - tokio::spawn(async move { write_stream.send_all(&mut requests).await }); - - let mut received = 0; - let responses: Vec> = read_stream - .take_while(|_buf| { - received += 1; - if received % 5000 == 0 && received > 0 { - debug!("Received {received}"); - } - let xcontinue = received <= total; - futures::future::ready(xcontinue) - }) - .collect() - .await; - - responses - } - - pub fn batch_send( - &self, - requests: Vec, - connections: usize, - max_in_flight: u64, - ) -> impl futures::stream::Stream>, JoinError>> - { - let handles = futures::stream::FuturesUnordered::new(); - - let outer_requests: Vec<_> = requests.into_iter().collect(); - let size = outer_requests.len() / connections; - for chunk in outer_requests[..].chunks(size) { - let requests: Vec<_> = chunk.to_vec(); - let client = self.clone(); - handles.push( - 
tokio::spawn(async move { - debug!( - "Sending TCP requests to {}:{}", - client.base_address, client.base_port, - ); - let responses = client.batch_send_one_chunk(requests, max_in_flight).await; - // .unwrap_or_else(|_| Vec::new()); - debug!( - "Done sending TCP requests to {}:{}", - client.base_address, client.base_port, - ); - responses - }), // .then(|x| async { x.unwrap_or_else(|_| Vec::new()) }), - ); - } - - handles - } -} - -pub struct NetworkServer { - pub base_address: String, - pub base_port: u16, - pub buffer_size: usize, - // Stats - packets_processed: AtomicUsize, - user_errors: AtomicUsize, -} - -impl NetworkServer { - pub fn new(base_address: String, base_port: u16, buffer_size: usize) -> Self { - Self { - base_address, - base_port, - buffer_size, - packets_processed: AtomicUsize::new(0), - user_errors: AtomicUsize::new(0), - } - } - - pub fn packets_processed(&self) -> usize { - self.packets_processed.load(Ordering::Relaxed) - } - - pub fn increment_packets_processed(&self) { - self.packets_processed.fetch_add(1, Ordering::Relaxed); - } - - pub fn user_errors(&self) -> usize { - self.user_errors.load(Ordering::Relaxed) - } - - pub fn increment_user_errors(&self) { - self.user_errors.fetch_add(1, Ordering::Relaxed); - } -} - -pub struct PortAllocator { - next_port: u16, -} - -impl PortAllocator { - pub fn new(starting_port: u16) -> Self { - Self { - next_port: starting_port, - } - } - pub fn next_port(&mut self) -> Option { - for port in self.next_port..65535 { - if TcpListener::bind(("127.0.0.1", port)).is_ok() { - self.next_port = port + 1; - return Some(port); - } - } - None - } -} - -pub fn parse_recv_bytes( - response: Result>, io::Error>, -) -> Result { - match response { - Err(error) => Err(SuiError::ClientIoError { - error: format!("{error}"), - }), - Ok(Some(response)) => { - // Parse reply - match deserialize_message(&response[..]) { - Ok(SerializedMessage::Error(error)) => Err(*error), - Ok(message) => Ok(message), - Err(_) => 
Err(SuiError::InvalidDecoding), - } - } - Ok(None) => Err(SuiError::ClientIoError { - error: "Empty response from authority.".to_string(), - }), - } -} diff --git a/network_utils/src/transport.rs b/network_utils/src/transport.rs deleted file mode 100644 index 0692e518aae8b..0000000000000 --- a/network_utils/src/transport.rs +++ /dev/null @@ -1,239 +0,0 @@ -// Copyright (c) 2021, Facebook, Inc. and its affiliates -// Copyright (c) 2022, Mysten Labs, Inc. -// SPDX-License-Identifier: Apache-2.0 - -use futures::{Sink, SinkExt, Stream, StreamExt}; -use std::io::ErrorKind; -use std::net::SocketAddr; -use std::sync::Arc; -use tokio::net::TcpSocket; -use tokio::net::{TcpListener, TcpStream}; - -use async_trait::async_trait; - -use tracing::info; - -use bytes::{Bytes, BytesMut}; -use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; - -#[cfg(test)] -#[path = "unit_tests/transport_tests.rs"] -mod transport_tests; - -/// Suggested buffer size -pub const DEFAULT_MAX_DATAGRAM_SIZE: usize = 650000; -pub const DEFAULT_MAX_DATAGRAM_SIZE_STR: &str = "650000"; - -/// The handler required to create a service. -#[async_trait] -pub trait MessageHandler { - async fn handle_messages(&self, channel: A) -> (); -} - -/* - The RwChannel connects the low-level networking code here, that handles - TCP streams, ports, accept/connect, and sockets that provide AsyncRead / - AsyncWrite on byte streams, with the higher level logic in AuthorityServer - that handles sequences of Bytes / BytesMut, as framed messages, through - exposing a standard Stream and Sink trait. - - This separation allows us to change the details of the network, transport - and framing, without changing the authority code. It also allows us to test - the authority without using a real network. 
-*/ -pub trait RwChannel<'a> { - type R: 'a + Stream> + Unpin + Send; - type W: 'a + Sink + Unpin + Send; - - fn sink(&mut self) -> &mut Self::W; - fn stream(&mut self) -> &mut Self::R; -} - -/// The result of spawning a server is oneshot channel to kill it and a handle to track completion. -pub struct SpawnedServer { - state: Arc, - tx_cancellation: futures::channel::oneshot::Sender<()>, - handle: tokio::task::JoinHandle>, - local_addr: SocketAddr, -} - -impl SpawnedServer { - pub fn state(&self) -> &Arc { - &self.state - } - - pub async fn join(self) -> Result<(), std::io::Error> { - // Note that dropping `self.complete` would terminate the server. - self.handle.await??; - Ok(()) - } - - pub async fn kill(self) -> Result<(), std::io::Error> { - self.tx_cancellation.send(()).unwrap(); - self.handle.await??; - Ok(()) - } - - pub fn get_port(&self) -> u16 { - self.local_addr.port() - } -} - -/// Create a DataStream for this protocol. -pub async fn connect( - address: String, - max_data_size: usize, -) -> Result { - TcpDataStream::connect(address, max_data_size).await -} - -/// Run a server for this protocol and the given message handler. 
-pub async fn spawn_server( - address: &str, - state: Arc, - buffer_size: usize, -) -> Result, std::io::Error> -where - S: MessageHandler + Send + Sync + 'static, -{ - let (tx_cancellation, rx_cancellation) = futures::channel::oneshot::channel(); - info!(address =% address, "Attempting to spawn server and bind to address..."); - let std_listener = std::net::TcpListener::bind(address).map_err(|e| match e.kind() { - // Wrap custom error to give information about the address that could not be bbound - ErrorKind::AddrInUse => std::io::Error::new( - ErrorKind::AddrInUse, - format!("Address {address} was in use!"), - ), - _ => e, - })?; - - let local_addr = std_listener.local_addr()?; - let host = local_addr.ip(); - let port = local_addr.port(); - info!("Listening to TCP traffic on {host}:{port}"); - // see https://fly.io/blog/the-tokio-1-x-upgrade/#tcplistener-from_std-needs-to-be-set-to-nonblocking - std_listener.set_nonblocking(true)?; - let listener = TcpListener::from_std(std_listener)?; - - let handle = tokio::spawn(run_tcp_server( - listener, - state.clone(), - rx_cancellation, - buffer_size, - )); - Ok(SpawnedServer { - state, - tx_cancellation, - handle, - local_addr, - }) -} - -use tokio::net::tcp::OwnedReadHalf; -use tokio::net::tcp::OwnedWriteHalf; - -/// An implementation of DataStream based on TCP. 
-pub struct TcpDataStream { - pub framed_read: FramedRead, - pub framed_write: FramedWrite, -} - -impl TcpDataStream { - async fn connect(address: String, max_data_size: usize) -> Result { - let addr = match address.parse() { - Ok(addr) => addr, - Err(_) => { - // Maybe it's a host name, try doing lookup first - if let Some(addr) = tokio::net::lookup_host(address.clone()).await?.next() { - addr - } else { - return Err(std::io::Error::new( - ErrorKind::Other, - format!("Could not lookup address {address}"), - )); - } - } - }; - let socket = TcpSocket::new_v4()?; - socket.set_send_buffer_size(max_data_size as u32)?; - socket.set_recv_buffer_size(max_data_size as u32)?; - - let stream = socket.connect(addr).await?; - Ok(TcpDataStream::from_tcp_stream(stream, max_data_size)) - } - - fn from_tcp_stream(stream: TcpStream, max_data_size: usize) -> TcpDataStream { - let (read_half, write_half) = stream.into_split(); - - let framed_read = FramedRead::new( - read_half, - LengthDelimitedCodec::builder() - .max_frame_length(max_data_size) - .new_codec(), - ); - - let framed_write = FramedWrite::new( - write_half, - LengthDelimitedCodec::builder() - .max_frame_length(max_data_size) - .new_codec(), - ); - - Self { - framed_read, - framed_write, - } - } - - // TODO: Eliminate vecs and use Byte, ByteBuf - - pub async fn write_data<'a>(&'a mut self, buffer: &'a [u8]) -> Result<(), std::io::Error> { - self.framed_write.send(buffer.to_vec().into()).await - } - - pub async fn read_data(&mut self) -> Option, std::io::Error>> { - let result = self.framed_read.next().await; - result.map(|v| v.map(|w| w.to_vec())) - } -} - -impl<'a> RwChannel<'a> for TcpDataStream { - type W = FramedWrite; - type R = FramedRead; - - fn sink(&mut self) -> &mut Self::W { - &mut self.framed_write - } - fn stream(&mut self) -> &mut Self::R { - &mut self.framed_read - } -} - -// Server implementation for TCP. 
-async fn run_tcp_server( - listener: TcpListener, - state: Arc, - mut exit_future: futures::channel::oneshot::Receiver<()>, - _buffer_size: usize, -) -> Result<(), std::io::Error> -where - S: MessageHandler + Send + Sync + 'static, -{ - loop { - let stream; - - tokio::select! { - _ = &mut exit_future => break, - result = listener.accept() => { - let (value, _addr) = result?; - stream = value; - } - } - let guarded_state = state.clone(); - tokio::spawn(async move { - let framed = TcpDataStream::from_tcp_stream(stream, _buffer_size); - guarded_state.handle_messages(framed).await - }); - } - Ok(()) -} diff --git a/network_utils/src/unit_tests/transport_tests.rs b/network_utils/src/unit_tests/transport_tests.rs deleted file mode 100644 index 9133d3604e1b6..0000000000000 --- a/network_utils/src/unit_tests/transport_tests.rs +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright (c) 2021, Facebook, Inc. and its affiliates -// Copyright (c) 2022, Mysten Labs, Inc. -// SPDX-License-Identifier: Apache-2.0 - -use super::*; -use std::{ - sync::atomic::{AtomicUsize, Ordering}, - time::Duration, -}; -use tokio::{runtime::Runtime, time::timeout}; -use tracing::error; - -async fn get_new_local_address() -> Result { - let client = tokio::net::TcpSocket::new_v4()?; - client.set_reuseaddr(true)?; - client.bind("127.0.0.1:0".parse().unwrap())?; - Ok(format!("{}", client.local_addr()?)) -} - -struct TestService { - counter: Arc, -} - -impl TestService { - fn new(counter: Arc) -> Self { - TestService { counter } - } - - async fn handle_one_message<'a>(&'a self, buffer: &'a [u8]) -> Option> { - self.counter.fetch_add(buffer.len(), Ordering::Relaxed); - Some(Vec::from(buffer)) - } -} - -#[async_trait] -impl<'a, A> MessageHandler for TestService -where - A: 'static + RwChannel<'a> + Unpin + Send, -{ - async fn handle_messages(&self, mut channel: A) -> () { - loop { - let buffer = match channel.stream().next().await { - Some(Ok(buffer)) => buffer, - Some(Err(err)) => { - // We expect some EOF or 
disconnect error at the end. - error!("Error while reading TCP stream: {err}"); - break; - } - None => { - break; - } - }; - - if let Some(reply) = self.handle_one_message(&buffer[..]).await { - let status = channel.sink().send(reply.into()).await; - if let Err(error) = status { - error!("Failed to send query response: {error}"); - } - }; - } - } -} - -async fn test_server() -> Result<(usize, usize), std::io::Error> { - let address = get_new_local_address().await.unwrap(); - - let counter = Arc::new(AtomicUsize::new(0)); - let mut received = 0; - - let server = spawn_server(&address, Arc::new(TestService::new(counter.clone())), 100).await?; - - let mut client = connect(address.clone(), 1000).await?; - client.write_data(b"abcdef").await?; - received += client.read_data().await.unwrap()?.len(); - client.write_data(b"abcd").await?; - received += client.read_data().await.unwrap()?.len(); - - // Try to read data on the first connection (should fail). - received += timeout(Duration::from_millis(500), client.read_data()) - .await - .unwrap_or_else(|_| Some(Ok(Vec::new()))) - .unwrap()? - .len(); - - // Attempt to gracefully kill server. - server.kill().await?; - - timeout(Duration::from_millis(500), client.write_data(b"abcd")) - .await - .unwrap_or(Ok(()))?; - received += timeout(Duration::from_millis(500), client.read_data()) - .await - .unwrap_or_else(|_| Some(Ok(Vec::new()))) - .unwrap()? - .len(); - - Ok((counter.load(Ordering::Relaxed), received)) -} - -#[test] -fn tcp_server() { - let rt = Runtime::new().unwrap(); - let (processed, received) = rt.block_on(test_server()).unwrap(); - // Active TCP connections are allowed to finish before the server is gracefully killed. 
- assert_eq!(processed, 14); - assert_eq!(received, 14); -} diff --git a/sui/Cargo.toml b/sui/Cargo.toml index b3c3735a02cbe..c739432c85c01 100644 --- a/sui/Cargo.toml +++ b/sui/Cargo.toml @@ -38,7 +38,7 @@ bcs = "0.1.3" sui_core = { path = "../sui_core" } sui-adapter = { path = "../sui_programmability/adapter" } sui-framework = { path = "../sui_programmability/framework" } -sui-network = { path = "../network_utils" } +sui-network = { path = "../crates/sui-network" } sui-types = { path = "../sui_types" } sui-verifier = { path = "../sui_programmability/verifier" } sui-open-rpc = { path = "open_rpc" } @@ -72,7 +72,6 @@ pretty_assertions = "1.2.0" tokio-util = { version = "0.7.1", features = ["codec"] } test_utils = { path = "../test_utils" } -sui-network = { path = "../network_utils" } [features] benchmark = ["narwhal-node/benchmark"] diff --git a/sui/src/benchmark.rs b/sui/src/benchmark.rs index f86233e3ede15..accf4930f4b76 100644 --- a/sui/src/benchmark.rs +++ b/sui/src/benchmark.rs @@ -3,26 +3,31 @@ use futures::{join, StreamExt}; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; -use std::panic; -use std::thread; -use std::{thread::sleep, time::Duration}; +use std::{panic, thread, thread::sleep, time::Duration}; use sui_core::authority_client::{AuthorityAPI, NetworkAuthorityClient}; -use sui_network::network::{NetworkClient, NetworkServer}; -use sui_types::batch::UpdateItem; -use sui_types::messages::{BatchInfoRequest, BatchInfoResponseItem}; -use sui_types::serialize::*; +use sui_network::{ + network::{NetworkClient, NetworkServer}, + tonic, +}; +use sui_types::{ + batch::UpdateItem, + messages::{BatchInfoRequest, BatchInfoResponseItem}, + serialize::*, +}; use tokio::runtime::{Builder, Runtime}; use tracing::{error, info}; pub mod bench_types; pub mod load_generator; pub mod transaction_creator; pub mod validator_preparer; -use crate::benchmark::bench_types::{Benchmark, BenchmarkType}; -use crate::benchmark::load_generator::{ - calculate_throughput, 
check_transaction_response, send_tx_chunks, FixedRateLoadGenerator, +use crate::benchmark::{ + bench_types::{Benchmark, BenchmarkType}, + load_generator::{ + calculate_throughput, check_transaction_response, send_tx_chunks, FixedRateLoadGenerator, + }, + transaction_creator::TransactionCreator, + validator_preparer::ValidatorPreparer, }; -use crate::benchmark::transaction_creator::TransactionCreator; -use crate::benchmark::validator_preparer::ValidatorPreparer; use self::bench_types::{BenchmarkResult, MicroBenchmarkResult, MicroBenchmarkType}; @@ -228,7 +233,20 @@ fn run_latency_microbench( async fn run_follower(network_client: NetworkClient) { // We spawn a second client that listens to the batch interface let _batch_client_handle = tokio::task::spawn(async move { - let authority_client = NetworkAuthorityClient::new(network_client); + let uri = format!( + "http://{}:{}", + network_client.base_address(), + network_client.base_port() + ) + .parse() + .unwrap(); + let channel = tonic::transport::Channel::builder(uri) + .connect_timeout(network_client.send_timeout()) + .timeout(network_client.recv_timeout()) + .connect() + .await + .unwrap(); + let authority_client = NetworkAuthorityClient::with_channel(channel, network_client); let mut start = 0; diff --git a/sui/src/benchmark/bench_types.rs b/sui/src/benchmark/bench_types.rs index 4d666fc213d9d..24ed5df834245 100644 --- a/sui/src/benchmark/bench_types.rs +++ b/sui/src/benchmark/bench_types.rs @@ -3,10 +3,8 @@ use super::load_generator::calculate_throughput; use clap::*; -use std::default::Default; -use std::path::PathBuf; +use std::{default::Default, path::PathBuf}; use strum_macros::EnumString; -use sui_network::transport; #[derive(Debug, Clone, Parser)] #[clap( @@ -18,13 +16,13 @@ pub struct Benchmark { #[clap(long, default_value = "1", global = true)] pub committee_size: usize, /// Timeout for sending queries (us) - #[clap(long, default_value = "40000000", global = true)] + #[clap(long, default_value = 
"400000000", global = true)] pub send_timeout_us: u64, /// Timeout for receiving responses (us) - #[clap(long, default_value = "40000000", global = true)] + #[clap(long, default_value = "400000000", global = true)] pub recv_timeout_us: u64, /// Maximum size of datagrams received and sent (bytes) - #[clap(long, default_value = transport::DEFAULT_MAX_DATAGRAM_SIZE_STR, global = true)] + #[clap(long, default_value = "65000", global = true)] pub buffer_size: usize, /// Number of connections to the server #[clap(long, default_value = "0", global = true)] diff --git a/sui/src/benchmark/load_generator.rs b/sui/src/benchmark/load_generator.rs index 17a60dd8d6fdd..d315fd4124def 100644 --- a/sui/src/benchmark/load_generator.rs +++ b/sui/src/benchmark/load_generator.rs @@ -1,25 +1,27 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -#![deny(warnings)] - use anyhow::Error; -use bytes::{Bytes, BytesMut}; -use futures::channel::mpsc::{channel as MpscChannel, Receiver, Sender as MpscSender}; -use futures::stream::StreamExt; -use futures::SinkExt; - +use bytes::Bytes; +use futures::{ + channel::mpsc::{channel as MpscChannel, Receiver, Sender as MpscSender}, + stream::StreamExt, + SinkExt, +}; use rayon::prelude::*; -use std::io; -use std::sync::Arc; -use std::time::{Duration, Instant}; -use sui_core::authority::*; -use sui_core::authority_server::AuthorityServer; +use std::{ + io, + sync::Arc, + time::{Duration, Instant}, +}; +use sui_core::{ + authority::*, + authority_client::{AuthorityAPI, NetworkAuthorityClient}, + authority_server::{AuthorityServer, AuthorityServerHandle}, +}; use sui_network::network::{NetworkClient, NetworkServer}; -use sui_network::transport; use sui_types::{messages::*, serialize::*}; -use tokio::sync::Notify; -use tokio::time; +use tokio::{sync::Notify, time}; use tracing::{error, info}; pub fn check_transaction_response(reply_message: Result) { @@ -41,15 +43,31 @@ pub fn check_transaction_response(reply_message: Result, 
net_client: NetworkClient, - conn: usize, -) -> (u128, Vec>) { + _conn: usize, +) -> (u128, Vec, io::Error>>) { let time_start = Instant::now(); - let tx_resp = net_client - .batch_send(tx_chunks, conn, 0) - .map(|x| x.unwrap()) - .concat() - .await; + // This probably isn't going to be as fast so we probably want to provide a way to send a batch + // of txns to the authority at a time + let client = NetworkAuthorityClient::new(net_client); + let mut tx_resp = Vec::new(); + for tx in tx_chunks { + let message = deserialize_message(&tx[..]).unwrap(); + let resp = match message { + SerializedMessage::Transaction(transaction) => client + .handle_transaction(*transaction) + .await + .map(|resp| serialize_transaction_info(&resp)) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)), + SerializedMessage::Cert(cert) => client + .handle_confirmation_transaction(ConfirmationTransaction { certificate: *cert }) + .await + .map(|resp| serialize_transaction_info(&resp)) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)), + _ => panic!("unexpected message type"), + }; + tx_resp.push(resp); + } let elapsed = time_start.elapsed().as_micros(); @@ -167,7 +185,7 @@ impl FixedRateLoadGenerator { pub async fn spawn_authority_server( network_server: NetworkServer, state: AuthorityState, -) -> transport::SpawnedServer { +) -> AuthorityServerHandle { // The following two fields are only needed for shared objects (not by this bench). 
let consensus_address = "127.0.0.1:0".parse().unwrap(); let (tx_consensus_listener, _rx_consensus_listener) = tokio::sync::mpsc::channel(1); diff --git a/sui/src/benchmark/validator_preparer.rs b/sui/src/benchmark/validator_preparer.rs index f55271249ee54..5fd8e0fac5fcc 100644 --- a/sui/src/benchmark/validator_preparer.rs +++ b/sui/src/benchmark/validator_preparer.rs @@ -2,29 +2,34 @@ // SPDX-License-Identifier: Apache-2.0 #![allow(clippy::large_enum_variant)] -use crate::benchmark::bench_types::RunningMode; -use crate::benchmark::load_generator::spawn_authority_server; -use crate::config::{AccountConfig, Config, GenesisConfig, ObjectConfig}; + +use crate::{ + benchmark::{bench_types::RunningMode, load_generator::spawn_authority_server}, + config::{AccountConfig, Config, GenesisConfig, ObjectConfig}, +}; use rocksdb::Options; -use std::env; -use std::fs; -use std::panic; -use std::path::{Path, PathBuf}; -use std::process::Child; -use std::process::Command; -use std::sync::Arc; -use std::thread; -use std::{thread::sleep, time::Duration}; +use std::{ + env, fs, panic, + path::{Path, PathBuf}, + process::{Child, Command}, + sync::Arc, + thread, + thread::sleep, + time::Duration, +}; use sui_adapter::genesis; use sui_core::authority::*; use sui_network::network::NetworkServer; -use sui_types::base_types::SuiAddress; -use sui_types::crypto::{random_key_pairs, KeyPair, PublicKeyBytes}; -use sui_types::gas_coin::GasCoin; -use sui_types::object::Object; -use sui_types::{base_types::*, committee::*}; +use sui_types::{ + base_types::{SuiAddress, *}, + committee::*, + crypto::{random_key_pairs, KeyPair, PublicKeyBytes}, + gas_coin::GasCoin, + object::Object, +}; use tokio::runtime::{Builder, Runtime}; use tracing::{error, info}; + const GENESIS_CONFIG_NAME: &str = "genesis_config.json"; pub const VALIDATOR_BINARY_NAME: &str = "validator"; diff --git a/sui/src/config.rs b/sui/src/config.rs index bd2bbebcf9500..a45eefe1e8e03 100644 --- a/sui/src/config.rs +++ 
b/sui/src/config.rs @@ -2,30 +2,31 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::gateway_config::GatewayType; -use crate::keystore::KeystoreType; -use narwhal_config::Committee as ConsensusCommittee; -use narwhal_config::{Authority, PrimaryAddresses, Stake, WorkerAddresses}; +use crate::{gateway_config::GatewayType, keystore::KeystoreType}; +use narwhal_config::{ + Authority, Committee as ConsensusCommittee, PrimaryAddresses, Stake, WorkerAddresses, +}; use narwhal_crypto::ed25519::Ed25519PublicKey; use once_cell::sync::Lazy; -use serde::de::DeserializeOwned; -use serde::{Deserialize, Serialize}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde_json::Value; -use serde_with::hex::Hex; -use serde_with::serde_as; -use std::fmt::Write; -use std::fmt::{Display, Formatter}; -use std::fs::{self, File}; -use std::io::BufReader; -use std::net::{SocketAddr, ToSocketAddrs}; -use std::ops::{Deref, DerefMut}; -use std::path::{Path, PathBuf}; -use std::sync::Mutex; +use serde_with::{hex::Hex, serde_as}; +use std::{ + fmt::{Display, Formatter, Write}, + fs::{self, File}, + io::BufReader, + net::{SocketAddr, ToSocketAddrs}, + ops::{Deref, DerefMut}, + path::{Path, PathBuf}, + sync::Mutex, +}; use sui_framework::DEFAULT_FRAMEWORK_PATH; use sui_network::network::PortAllocator; -use sui_types::base_types::*; -use sui_types::committee::{Committee, EpochId}; -use sui_types::crypto::{get_key_pair, KeyPair, PublicKeyBytes}; +use sui_types::{ + base_types::*, + committee::{Committee, EpochId}, + crypto::{get_key_pair, KeyPair, PublicKeyBytes}, +}; use tracing::log::trace; const DEFAULT_WEIGHT: usize = 1; diff --git a/sui/src/gateway_config.rs b/sui/src/gateway_config.rs index be806b340b3ed..2c5b54f3d579c 100644 --- a/sui/src/gateway_config.rs +++ b/sui/src/gateway_config.rs @@ -1,24 +1,26 @@ // Copyright (c) 2022, Mysten Labs, Inc. 
// SPDX-License-Identifier: Apache-2.0 -use std::collections::BTreeMap; -use std::fmt::Write; -use std::fmt::{Display, Formatter}; -use std::path::PathBuf; -use std::time::Duration; - -use serde::Deserialize; -use serde::Serialize; - -use sui_core::authority_client::NetworkAuthorityClient; -use sui_core::gateway_state::{GatewayClient, GatewayState}; +use crate::{ + config::{AuthorityInfo, Config}, + rpc_gateway_client::RpcGatewayClient, +}; +use serde::{Deserialize, Serialize}; +use std::{ + collections::BTreeMap, + fmt::{Display, Formatter, Write}, + path::PathBuf, + time::Duration, +}; +use sui_core::{ + authority_client::NetworkAuthorityClient, + gateway_state::{GatewayClient, GatewayState}, +}; use sui_network::network::NetworkClient; -use sui_network::transport; -use sui_types::base_types::AuthorityName; -use sui_types::committee::{Committee, EpochId}; - -use crate::config::{AuthorityInfo, Config}; -use crate::rpc_gateway_client::RpcGatewayClient; +use sui_types::{ + base_types::AuthorityName, + committee::{Committee, EpochId}, +}; #[derive(Serialize, Deserialize)] #[serde(rename_all = "lowercase")] @@ -117,7 +119,7 @@ impl Default for GatewayConfig { authorities: vec![], send_timeout: Duration::from_micros(4000000), recv_timeout: Duration::from_micros(4000000), - buffer_size: transport::DEFAULT_MAX_DATAGRAM_SIZE, + buffer_size: 650000, db_folder_path: Default::default(), } } diff --git a/sui/src/sui_commands.rs b/sui/src/sui_commands.rs index 717ff4f2a37e0..26ca89a0e8251 100644 --- a/sui/src/sui_commands.rs +++ b/sui/src/sui_commands.rs @@ -1,12 +1,14 @@ // Copyright (c) 2022, Mysten Labs, Inc. 
// SPDX-License-Identifier: Apache-2.0 -use crate::config::{make_default_narwhal_committee, AuthorityInfo, CONSENSUS_DB_NAME}; -use crate::config::{ - AuthorityPrivateInfo, Config, GenesisConfig, NetworkConfig, PersistedConfig, WalletConfig, +use crate::{ + config::{ + make_default_narwhal_committee, AuthorityInfo, AuthorityPrivateInfo, Config, GenesisConfig, + NetworkConfig, PersistedConfig, WalletConfig, CONSENSUS_DB_NAME, + }, + gateway_config::{GatewayConfig, GatewayType}, + keystore::{Keystore, KeystoreType, SuiKeystore}, + sui_config_dir, SUI_GATEWAY_CONFIG, SUI_NETWORK_CONFIG, SUI_WALLET_CONFIG, }; -use crate::gateway_config::{GatewayConfig, GatewayType}; -use crate::keystore::{Keystore, KeystoreType, SuiKeystore}; -use crate::{sui_config_dir, SUI_GATEWAY_CONFIG, SUI_NETWORK_CONFIG, SUI_WALLET_CONFIG}; use anyhow::{anyhow, bail}; use base64ct::{Base64, Encoding}; use clap::*; @@ -15,30 +17,29 @@ use move_binary_format::CompiledModule; use move_package::BuildConfig; use narwhal_config::{Committee as ConsensusCommittee, Parameters as ConsensusParameters}; use narwhal_crypto::ed25519::Ed25519PublicKey; -use std::collections::BTreeMap; -use std::collections::HashMap; -use std::fs; -use std::path::Path; -use std::path::PathBuf; -use std::sync::Arc; -use std::time::Duration; -use sui_adapter::adapter::generate_package_id; -use sui_adapter::genesis; -use sui_core::authority::{AuthorityState, AuthorityStore}; -use sui_core::authority_active::ActiveAuthority; -use sui_core::authority_client::NetworkAuthorityClient; -use sui_core::authority_server::AuthorityServer; -use sui_core::consensus_adapter::ConsensusListener; +use std::{ + collections::{BTreeMap, HashMap}, + fs, + path::{Path, PathBuf}, + sync::Arc, + time::Duration, +}; +use sui_adapter::{adapter::generate_package_id, genesis}; +use sui_core::{ + authority::{AuthorityState, AuthorityStore}, + authority_active::ActiveAuthority, + authority_client::NetworkAuthorityClient, + authority_server::{AuthorityServer, 
AuthorityServerHandle}, + consensus_adapter::ConsensusListener, +}; use sui_network::network::NetworkClient; -use sui_network::transport::SpawnedServer; -use sui_network::transport::DEFAULT_MAX_DATAGRAM_SIZE; -use sui_types::base_types::decode_bytes_hex; -use sui_types::base_types::encode_bytes_hex; -use sui_types::base_types::{SequenceNumber, SuiAddress, TxContext}; -use sui_types::committee::Committee; -use sui_types::crypto::{random_key_pairs, KeyPair}; -use sui_types::error::SuiResult; -use sui_types::object::Object; +use sui_types::{ + base_types::{decode_bytes_hex, encode_bytes_hex, SequenceNumber, SuiAddress, TxContext}, + committee::Committee, + crypto::{random_key_pairs, KeyPair}, + error::SuiResult, + object::Object, +}; use tokio::sync::mpsc::channel; use tracing::{error, info}; @@ -279,7 +280,7 @@ impl SuiCommand { } pub struct SuiNetwork { - pub spawned_authorities: Vec>, + pub spawned_authorities: Vec, } impl SuiNetwork { @@ -389,7 +390,7 @@ pub async fn genesis( let mut network_config = NetworkConfig { epoch: 0, authorities: vec![], - buffer_size: DEFAULT_MAX_DATAGRAM_SIZE, + buffer_size: 650000, loaded_move_packages: vec![], key_pair: genesis_conf.key_pair, }; diff --git a/sui/tests/shared_objects_tests.rs b/sui/tests/shared_objects_tests.rs index b348245f24798..ea164b94ebd08 100644 --- a/sui/tests/shared_objects_tests.rs +++ b/sui/tests/shared_objects_tests.rs @@ -1,37 +1,25 @@ // Copyright (c) 2022, Mysten Labs, Inc. 
// SPDX-License-Identifier: Apache-2.0 -use bytes::Bytes; -use futures::{sink::SinkExt, stream::StreamExt}; use sui::config::AuthorityPrivateInfo; -use sui_types::base_types::ObjectRef; -use sui_types::error::{SuiError, SuiResult}; -use sui_types::messages::CallArg; -use sui_types::messages::Transaction; -use sui_types::messages::TransactionInfoResponse; -use sui_types::messages::{ConsensusTransaction, ExecutionStatus}; -use sui_types::object::Object; -use sui_types::serialize::{ - deserialize_message, deserialize_transaction_info, serialize_cert, - serialize_consensus_transaction, SerializedMessage, +use sui_core::authority_client::{AuthorityAPI, NetworkAuthorityClient}; +use sui_network::network::NetworkClient; +use sui_types::{ + base_types::ObjectRef, + error::SuiResult, + messages::{ + CallArg, ConfirmationTransaction, ConsensusTransaction, ExecutionStatus, Transaction, + TransactionInfoResponse, + }, + object::Object, +}; +use test_utils::{ + authority::{spawn_test_authorities, test_authority_configs}, + messages::{ + make_certificates, move_transaction, parse_package_ref, publish_move_package_transaction, + test_shared_object_transactions, + }, + objects::{test_gas_objects, test_shared_object}, }; -use test_utils::authority::{spawn_test_authorities, test_authority_configs}; -use test_utils::messages::{make_certificates, move_transaction, publish_move_package_transaction}; -use test_utils::messages::{parse_package_ref, test_shared_object_transactions}; -use test_utils::objects::{test_gas_objects, test_shared_object}; -use tokio::net::TcpStream; -use tokio_util::codec::Framed; -use tokio_util::codec::LengthDelimitedCodec; - -/// Send bytes to a Sui authority. 
-async fn transmit(transaction: Bytes, config: &AuthorityPrivateInfo) -> SerializedMessage { - let authority_address = format!("{}:{}", config.host, config.port); - let stream = TcpStream::connect(authority_address).await.unwrap(); - let mut connection = Framed::new(stream, LengthDelimitedCodec::new()); - - connection.send(transaction).await.unwrap(); - let bytes = connection.next().await.unwrap().unwrap(); - deserialize_message(&bytes[..]).unwrap() -} /// Submit a certificate containing only owned-objects to all authorities. async fn submit_single_owner_transaction( @@ -39,17 +27,32 @@ async fn submit_single_owner_transaction( configs: &[AuthorityPrivateInfo], ) -> Vec { let certificate = make_certificates(vec![transaction]).pop().unwrap(); - let serialized = Bytes::from(serialize_cert(&certificate)); + let txn = ConfirmationTransaction { certificate }; let mut responses = Vec::new(); for config in configs { - let bytes = transmit(serialized.clone(), config).await; - let reply = deserialize_transaction_info(bytes).unwrap(); + let client = get_client(config); + let reply = client + .handle_confirmation_transaction(txn.clone()) + .await + .unwrap(); responses.push(reply); } responses } +fn get_client(config: &AuthorityPrivateInfo) -> NetworkAuthorityClient { + let network_config = NetworkClient::new( + config.host.clone(), + config.port, + 0, + std::time::Duration::from_secs(30), + std::time::Duration::from_secs(30), + ); + + NetworkAuthorityClient::new(network_config) +} + /// Keep submitting the certificates of a shared-object transaction until it is sequenced by /// at least one consensus node. We use the loop since some consensus protocols (like Tusk) /// may drop transactions. The certificate is submitted to every Sui authority. 
@@ -59,31 +62,20 @@ async fn submit_shared_object_transaction( ) -> Vec> { let certificate = make_certificates(vec![transaction]).pop().unwrap(); let message = ConsensusTransaction::UserTransaction(certificate); - let serialized = Bytes::from(serialize_consensus_transaction(&message)); loop { let futures: Vec<_> = configs .iter() - .map(|config| transmit(serialized.clone(), config)) + .map(|config| { + let client = get_client(config); + let txn = message.clone(); + async move { client.handle_consensus_transaction(txn).await } + }) .collect(); let mut replies = Vec::new(); for result in futures::future::join_all(futures).await { - match result { - SerializedMessage::TransactionResp(reply) => { - // We got a reply from the Sui authority. - replies.push(Some(Ok(*reply))); - } - SerializedMessage::Error(error) => match *error { - SuiError::ConsensusConnectionBroken(_) => { - // This is the (confusing, #1489) error message returned by the consensus - // adapter. It means it didn't hear back from consensus and timed out. - replies.push(None); - } - error => replies.push(Some(Err(error))), - }, - message => panic!("Unexpected protocol message: {message:?}"), - } + replies.push(Some(result)) } if replies.iter().any(|x| x.is_some()) { // Remove all `ConsensusConnectionBroken` replies. 
@@ -407,7 +399,9 @@ async fn shared_object_sync() { .await; for reply in replies { match reply { - Err(SuiError::SharedObjectLockingFailure(_)) => (), + // Right now grpc doesn't send back the error message in a recoverable way + // Err(SuiError::SharedObjectLockingFailure(_)) => (), + Err(_) => (), _ => panic!("Unexpected protocol message"), } } diff --git a/sui_core/Cargo.toml b/sui_core/Cargo.toml index f1987367e5330..e16999737de6f 100644 --- a/sui_core/Cargo.toml +++ b/sui_core/Cargo.toml @@ -15,6 +15,8 @@ rand = "0.7.3" bytes = "1.1.0" serde = { version = "1.0.137", features = ["derive"] } tokio = { version = "1.17.0", features = ["full", "tracing"] } +tokio-stream = { version = "0.1.8", features = ["sync", "net"] } +tokio-util = { version = "0.7.1", features = ["codec"] } parking_lot = "0.12.0" itertools = "0.10.3" async-trait = "0.1.53" @@ -30,7 +32,7 @@ schemars = "0.8.8" sui-adapter = { path = "../sui_programmability/adapter" } sui-framework = { path = "../sui_programmability/framework" } -sui-network = { path = "../network_utils" } +sui-network = { path = "../crates/sui-network" } sui-types = { path = "../sui_types" } move-binary-format = { git = "https://github.com/move-language/move", rev = "4e025186af502c931318884df53c11bf34a664bc" } diff --git a/sui_core/src/authority_client.rs b/sui_core/src/authority_client.rs index 982fc8be6f932..7bf90a0a022d8 100644 --- a/sui_core/src/authority_client.rs +++ b/sui_core/src/authority_client.rs @@ -4,16 +4,18 @@ use crate::authority::AuthorityState; use async_trait::async_trait; -use futures::lock::Mutex; -use futures::stream::{self, BoxStream}; -use futures::StreamExt; -use std::collections::VecDeque; -use std::io; -use std::sync::Arc; -use sui_network::network::NetworkClient; -use sui_network::transport::TcpDataStream; -use sui_types::batch::UpdateItem; -use sui_types::{error::SuiError, messages::*, serialize::*}; +use futures::{ + lock::Mutex, + stream::{self, BoxStream}, + StreamExt, TryStreamExt, +}; +use 
std::{collections::VecDeque, io, sync::Arc}; +use sui_network::{ + api::{BincodeEncodedPayload, ValidatorClient}, + network::NetworkClient, + tonic, +}; +use sui_types::{error::SuiError, messages::*}; #[cfg(test)] use sui_types::{ @@ -23,8 +25,6 @@ use sui_types::{ object::Object, }; -static MAX_ERRORS: i32 = 10; - #[async_trait] pub trait AuthorityAPI { /// Initiate a new transaction to a Sui or Primary account. @@ -72,11 +72,40 @@ pub trait AuthorityAPI { pub type BatchInfoResponseItemStream = BoxStream<'static, Result>; #[derive(Clone)] -pub struct NetworkAuthorityClient(NetworkClient); +pub struct NetworkAuthorityClient { + _network_client: NetworkClient, + client: ValidatorClient, +} impl NetworkAuthorityClient { pub fn new(network_client: NetworkClient) -> Self { - Self(network_client) + let uri = format!( + "http://{}:{}", + network_client.base_address(), + network_client.base_port() + ) + .parse() + .unwrap(); + let channel = tonic::transport::Channel::builder(uri) + .connect_timeout(network_client.send_timeout()) + .timeout(network_client.recv_timeout()) + .connect_lazy(); + let client = ValidatorClient::new(channel); + Self { + _network_client: network_client, + client, + } + } + + pub fn with_channel(channel: tonic::transport::Channel, network_client: NetworkClient) -> Self { + Self { + _network_client: network_client, + client: ValidatorClient::new(channel), + } + } + + fn client(&self) -> ValidatorClient { + self.client.clone() } } @@ -87,11 +116,17 @@ impl AuthorityAPI for NetworkAuthorityClient { &self, transaction: Transaction, ) -> Result { + let request = BincodeEncodedPayload::try_from(&transaction).unwrap(); let response = self - .0 - .send_recv_bytes(serialize_transaction(&transaction)) - .await?; - deserialize_transaction_info(response) + .client() + .transaction(request) + .await + .map_err(|_| SuiError::UnexpectedMessage)? 
+ .into_inner(); + + response + .deserialize() + .map_err(|_| SuiError::UnexpectedMessage) } /// Confirm a transfer to a Sui or Primary account. @@ -99,44 +134,72 @@ impl AuthorityAPI for NetworkAuthorityClient { &self, transaction: ConfirmationTransaction, ) -> Result { + let request = BincodeEncodedPayload::try_from(&transaction).unwrap(); let response = self - .0 - .send_recv_bytes(serialize_cert(&transaction.certificate)) - .await?; - deserialize_transaction_info(response) + .client() + .confirmation_transaction(request) + .await + .map_err(|_| SuiError::UnexpectedMessage)? + .into_inner(); + + response + .deserialize() + .map_err(|_| SuiError::UnexpectedMessage) } async fn handle_consensus_transaction( &self, transaction: ConsensusTransaction, ) -> Result { + let request = BincodeEncodedPayload::try_from(&transaction).unwrap(); let response = self - .0 - .send_recv_bytes(serialize_consensus_transaction(&transaction)) - .await?; - deserialize_transaction_info(response) + .client() + .consensus_transaction(request) + .await + .map_err(|e| SuiError::GenericAuthorityError { + error: e.to_string(), + })? + .into_inner(); + + response + .deserialize() + .map_err(|e| SuiError::GenericAuthorityError { + error: e.to_string(), + }) } async fn handle_account_info_request( &self, request: AccountInfoRequest, ) -> Result { + let request = BincodeEncodedPayload::try_from(&request).unwrap(); let response = self - .0 - .send_recv_bytes(serialize_account_info_request(&request)) - .await?; - deserialize_account_info(response) + .client() + .account_info(request) + .await + .map_err(|_| SuiError::UnexpectedMessage)? 
+ .into_inner(); + + response + .deserialize() + .map_err(|_| SuiError::UnexpectedMessage) } async fn handle_object_info_request( &self, request: ObjectInfoRequest, ) -> Result { + let request = BincodeEncodedPayload::try_from(&request).unwrap(); let response = self - .0 - .send_recv_bytes(serialize_object_info_request(&request)) - .await?; - deserialize_object_info(response) + .client() + .object_info(request) + .await + .map_err(|_| SuiError::UnexpectedMessage)? + .into_inner(); + + response + .deserialize() + .map_err(|_| SuiError::UnexpectedMessage) } /// Handle Object information requests for this account. @@ -144,11 +207,17 @@ impl AuthorityAPI for NetworkAuthorityClient { &self, request: TransactionInfoRequest, ) -> Result { + let request = BincodeEncodedPayload::try_from(&request).unwrap(); let response = self - .0 - .send_recv_bytes(serialize_transaction_info_request(&request)) - .await?; - deserialize_transaction_info(response) + .client() + .transaction_info(request) + .await + .map_err(|_| SuiError::UnexpectedMessage)? + .into_inner(); + + response + .deserialize() + .map_err(|_| SuiError::UnexpectedMessage) } /// Handle Batch information requests for this authority. @@ -156,49 +225,23 @@ impl AuthorityAPI for NetworkAuthorityClient { &self, request: BatchInfoRequest, ) -> Result { - let tcp_stream = self - .0 - .connect_for_stream(serialize_batch_request(&request)) - .await?; - - let mut error_count = 0; - let TcpDataStream { framed_read, .. 
} = tcp_stream; - - let stream = framed_read - .map(|item| { - item - // Convert io error to SuiClient error - .map_err(|err| SuiError::ClientIoError { - error: format!("io error: {:?}", err), - }) - // If no error try to deserialize - .and_then(|bytes| match deserialize_message(&bytes[..]) { - Ok(SerializedMessage::Error(error)) => Err(SuiError::ClientIoError { - error: format!("io error: {:?}", error), - }), - Ok(message) => Ok(message), - Err(_) => Err(SuiError::InvalidDecoding), - }) - // If deserialized try to parse as Batch Item - .and_then(deserialize_batch_info) - }) - // Establish conditions to stop taking from the stream - .take_while(move |item| { - let flag = match item { - Ok(BatchInfoResponseItem(UpdateItem::Batch(signed_batch))) => { - signed_batch.batch.next_sequence_number < request.end - } - Ok(BatchInfoResponseItem(UpdateItem::Transaction((seq, _digest)))) => { - *seq < request.end - } - Err(_e) => { - // TODO: record e - error_count += 1; - error_count < MAX_ERRORS - } - }; - futures::future::ready(flag) + let request = BincodeEncodedPayload::try_from(&request).unwrap(); + let response_stream = self + .client() + .batch_info(request) + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))? + .into_inner(); + + let stream = response_stream + .map_err(|_| SuiError::UnexpectedMessage) + .and_then(|item| { + let response_item = item + .deserialize::() + .map_err(|_| SuiError::UnexpectedMessage); + futures::future::ready(response_item) }); + Ok(Box::pin(stream)) } } diff --git a/sui_core/src/authority_server.rs b/sui_core/src/authority_server.rs index 59df1cda4e436..0c897ab36ccc6 100644 --- a/sui_core/src/authority_server.rs +++ b/sui_core/src/authority_server.rs @@ -2,46 +2,65 @@ // Copyright (c) 2022, Mysten Labs, Inc. 
// SPDX-License-Identifier: Apache-2.0 -use crate::authority::AuthorityState; -use futures::{SinkExt, StreamExt}; -use std::collections::VecDeque; -use std::net::SocketAddr; -use std::{io, sync::Arc}; +use crate::{ + authority::AuthorityState, + consensus_adapter::{ConsensusAdapter, ConsensusListenerMessage}, +}; +use async_trait::async_trait; +use futures::{stream::BoxStream, FutureExt, StreamExt, TryStreamExt}; +use std::{io, net::SocketAddr, sync::Arc, time::Duration}; use sui_network::{ + api::{BincodeEncodedPayload, Validator, ValidatorServer}, network::NetworkServer, - transport::{spawn_server, MessageHandler, RwChannel, SpawnedServer}, + tonic, }; use sui_types::{ batch::UpdateItem, crypto::VerificationObligation, error::*, messages::*, serialize::*, }; -use tokio::sync::mpsc::Sender; - -use std::time::Duration; -use tracing::{error, info, warn, Instrument}; - -use crate::consensus_adapter::{ConsensusAdapter, ConsensusListenerMessage}; -use async_trait::async_trait; -use bytes::{Bytes, BytesMut}; -use tokio::sync::broadcast::error::RecvError; +use tokio::{net::TcpListener, sync::mpsc::Sender}; +use tracing::{info, Instrument}; #[cfg(test)] #[path = "unit_tests/server_tests.rs"] mod server_tests; -/* - The number of input chunks the authority will try to process in parallel. - - TODO: provide a configuration parameter to allow authority operators to - set it, or a dynamic mechanism to adapt it according to observed workload. -*/ -const CHUNK_SIZE: usize = 36; const MIN_BATCH_SIZE: u64 = 1000; const MAX_DELAY_MILLIS: u64 = 5_000; // 5 sec +pub struct AuthorityServerHandle { + tx_cancellation: tokio::sync::oneshot::Sender<()>, + local_addr: SocketAddr, + handle: tokio::task::JoinHandle>, +} + +impl AuthorityServerHandle { + pub async fn join(self) -> Result<(), std::io::Error> { + // Note that dropping `self.complete` would terminate the server. + self.handle + .await? 
+ .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + Ok(()) + } + + pub async fn kill(self) -> Result<(), std::io::Error> { + self.tx_cancellation.send(()).unwrap(); + self.handle + .await? + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + Ok(()) + } + + pub fn get_port(&self) -> u16 { + self.local_addr.port() + } +} + pub struct AuthorityServer { server: NetworkServer, pub state: Arc, consensus_adapter: ConsensusAdapter, + min_batch_size: u64, + max_delay: Duration, } impl AuthorityServer { @@ -58,36 +77,35 @@ impl AuthorityServer { buffer_size, state.committee.clone(), tx_consensus_listener, - /* max_delay */ Duration::from_millis(1_000), + /* max_delay */ Duration::from_millis(2_000), ); Self { server: NetworkServer::new(base_address, base_port, buffer_size), state, consensus_adapter, + min_batch_size: MIN_BATCH_SIZE, + max_delay: Duration::from_millis(MAX_DELAY_MILLIS), } } /// Create a batch subsystem, register it with the authority state, and /// launch a task that manages it. Return the join handle of this task. pub async fn spawn_batch_subsystem( - self: &Arc, + &self, min_batch_size: u64, max_delay: Duration, ) -> SuiResult>> { // Start the batching subsystem, and register the handles with the authority. 
- let local_server = self.clone(); - - let _batch_join_handle = tokio::task::spawn(async move { - local_server - .state - .run_batch_service(min_batch_size, max_delay) - .await - }); + let state = self.state.clone(); + let _batch_join_handle = + tokio::task::spawn( + async move { state.run_batch_service(min_batch_size, max_delay).await }, + ); Ok(_batch_join_handle) } - pub async fn spawn(self) -> Result, io::Error> { + pub async fn spawn(self) -> Result { let address = format!("{}:{}", self.server.base_address, self.server.base_port); self.spawn_with_bind_address(&address).await } @@ -95,294 +113,283 @@ impl AuthorityServer { pub async fn spawn_with_bind_address( self, address: &str, - ) -> Result, io::Error> { - let buffer_size = self.server.buffer_size; - let guarded_state = Arc::new(self); - + ) -> Result { // Start the batching subsystem - let _join_handle = guarded_state - .spawn_batch_subsystem(MIN_BATCH_SIZE, Duration::from_millis(MAX_DELAY_MILLIS)) + let _join_handle = self + .spawn_batch_subsystem(self.min_batch_size, self.max_delay) .await; - spawn_server(address, guarded_state, buffer_size).await + let std_listener = std::net::TcpListener::bind(address)?; + + let local_addr = std_listener.local_addr()?; + let host = local_addr.ip(); + let port = local_addr.port(); + info!("Listening to TCP traffic on {host}:{port}"); + // see https://fly.io/blog/the-tokio-1-x-upgrade/#tcplistener-from_std-needs-to-be-set-to-nonblocking + std_listener.set_nonblocking(true)?; + let listener = + tokio_stream::wrappers::TcpListenerStream::new(TcpListener::from_std(std_listener)?); + + let (tx_cancellation, rx_cancellation) = tokio::sync::oneshot::channel(); + let service = tonic::transport::Server::builder() + .add_service(ValidatorServer::new(self)) + .serve_with_incoming_shutdown(listener, rx_cancellation.map(|_| ())); + let handle = AuthorityServerHandle { + tx_cancellation, + local_addr, + handle: tokio::spawn(service), + }; + Ok(handle) } +} - async fn 
handle_batch_streaming<'a, 'b, A>( - &'a self, - request: BatchInfoRequest, - channel: &mut A, - ) -> Result<(), SuiError> - where - A: RwChannel<'b>, - { - // Register a subscriber to not miss any updates - let mut subscriber = self.state.subscribe_batch(); - let message_end = request.end; - - // Get the historical data requested - let (mut items, should_subscribe) = self.state.handle_batch_info_request(request).await?; - - let mut last_seq_sent = 0; - while let Some(item) = items.pop_front() { - // Remember the last transaction sequence number sent - if let UpdateItem::Transaction((seq, _)) = &item { - last_seq_sent = *seq; - } - - // Send all items back to the client - let item = serialize_batch_item(&BatchInfoResponseItem(item)); - channel - .sink() - .send(Bytes::from(item)) - .await - .map_err(|_| SuiError::CannotSendClientMessageError)?; - } +#[async_trait] +impl Validator for AuthorityServer { + async fn transaction( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let mut transaction: Transaction = request + .into_inner() + .deserialize() + .map_err(|e| tonic::Status::invalid_argument(e.to_string()))?; - // No need to send live events. - if !should_subscribe { - return Ok(()); - } + let mut obligation = VerificationObligation::default(); + transaction + .add_tx_sig_to_verification_obligation(&mut obligation) + .map_err(|e| tonic::Status::invalid_argument(e.to_string()))?; + obligation + .verify_all() + .map_err(|e| tonic::Status::invalid_argument(e.to_string()))?; + //TODO This is really really bad, we should have different types for checked transactions + transaction.is_checked = true; + + let tx_digest = transaction.digest(); + + // Enable Trace Propagation across spans/processes using tx_digest + let span = tracing::debug_span!( + "process_tx", + ?tx_digest, + tx_kind = transaction.data.kind_as_str() + ); - // Now we read from the live updates. 
- loop { - match subscriber.recv().await { - Ok(item) => { - let seq = match &item { - UpdateItem::Transaction((seq, _)) => *seq, - UpdateItem::Batch(signed_batch) => signed_batch.batch.next_sequence_number, - }; - - // Do not re-send transactions already sent from the database - if seq <= last_seq_sent { - continue; - } + let info = self + .state + .handle_transaction(transaction) + .instrument(span) + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; - let response = BatchInfoResponseItem(item); - - // Send back the item from the subscription - let resp = serialize_batch_item(&response); - channel - .sink() - .send(Bytes::from(resp)) - .await - .map_err(|_| SuiError::CannotSendClientMessageError)?; - - // We always stop sending at batch boundaries, so that we try to always - // start with a batch and end with a batch to allow signature verification. - if let BatchInfoResponseItem(UpdateItem::Batch(signed_batch)) = &response { - if message_end < signed_batch.batch.next_sequence_number { - break; - } - } - } - Err(RecvError::Closed) => { - // The service closed the channel, so we tell the client. - return Err(SuiError::SubscriptionServiceClosed); - } - Err(RecvError::Lagged(number_skipped)) => { - // We tell the client they are too slow to consume, and - // stop. 
- return Err(SuiError::SubscriptionItemsDroppedError(number_skipped)); - } - } - } + let payload = BincodeEncodedPayload::try_from(&info) + .map_err(|e| tonic::Status::internal(e.to_string()))?; - Ok(()) + Ok(tonic::Response::new(payload)) } - async fn handle_one_message<'a, 'b, A>( - &'a self, - message: SerializedMessage, - channel: &mut A, - ) -> Option> - where - A: RwChannel<'b>, - { - let reply = match message { - SerializedMessage::Transaction(message) => { - let tx_digest = message.digest(); - // Enable Trace Propagation across spans/processes using tx_digest - let span = tracing::debug_span!( - "process_tx", - ?tx_digest, - tx_kind = message.data.kind_as_str() - ); - // No allocations: it's a 'static str! - self.state - .handle_transaction(*message) - .instrument(span) - .await - .map(|info| Some(serialize_transaction_info(&info))) - } - SerializedMessage::Cert(message) => { - let confirmation_transaction = ConfirmationTransaction { - certificate: message.as_ref().clone(), - }; - let tx_digest = *message.digest(); - let span = tracing::debug_span!( - "process_cert", - ?tx_digest, - tx_kind = message.data.kind_as_str() - ); - match self - .state - .handle_confirmation_transaction(confirmation_transaction) - .instrument(span) - .await - { - Ok(info) => { - // Response - Ok(Some(serialize_transaction_info(&info))) - } - Err(error) => Err(error), - } - } - SerializedMessage::AccountInfoReq(message) => self - .state - .handle_account_info_request(*message) - .await - .map(|info| Some(serialize_account_info_response(&info))), - SerializedMessage::ObjectInfoReq(message) => self - .state - .handle_object_info_request(*message) - .await - .map(|info| Some(serialize_object_info_response(&info))), - SerializedMessage::TransactionInfoReq(message) => self - .state - .handle_transaction_info_request(*message) - .await - .map(|info| Some(serialize_transaction_info(&info))), - SerializedMessage::BatchInfoReq(message) => self - .handle_batch_streaming(*message, channel) - 
.await - .map(|_| None), - SerializedMessage::ConsensusTransaction(message) => { - self.consensus_adapter.submit(&message).await.map(Some) - } - - _ => Err(SuiError::UnexpectedMessage), + async fn confirmation_transaction( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let mut transaction: CertifiedTransaction = request + .into_inner() + .deserialize() + .map_err(|e| tonic::Status::invalid_argument(e.to_string()))?; + + let mut obligation = VerificationObligation::default(); + transaction + .add_to_verification_obligation(&self.state.committee, &mut obligation) + .map_err(|e| tonic::Status::invalid_argument(e.to_string()))?; + obligation + .verify_all() + .map_err(|e| tonic::Status::invalid_argument(e.to_string()))?; + //TODO This is really really bad, we should have different types for checked transactions + transaction.is_checked = true; + + let tx_digest = transaction.digest(); + let span = tracing::debug_span!( + "process_cert", + ?tx_digest, + tx_kind = transaction.data.kind_as_str() + ); + + let confirmation_transaction = ConfirmationTransaction { + certificate: transaction, }; - self.server.increment_packets_processed(); + let info = self + .state + .handle_confirmation_transaction(confirmation_transaction) + .instrument(span) + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; - if self.server.packets_processed() % 5000 == 0 { - info!( - "{}:{} has processed {} packets", - self.server.base_address, - self.server.base_port, - self.server.packets_processed() - ); - } + let payload = BincodeEncodedPayload::try_from(&info) + .map_err(|e| tonic::Status::internal(e.to_string()))?; - match reply { - Ok(x) => x, - Err(error) => { - warn!("User query failed: {error}"); - self.server.increment_user_errors(); - Some(serialize_error(&error)) - } - } + Ok(tonic::Response::new(payload)) } - /// For each Transaction and Certificate updates a verification - /// obligation structure, and returns an error either if the collection in the - 
/// obligation went wrong or the verification of the signatures went wrong. - fn batch_verify_one_chunk( + async fn consensus_transaction( &self, - one_chunk: Vec>, - ) -> Result, SuiError> { - let one_chunk: Result, _> = one_chunk.into_iter().collect(); - let one_chunk = one_chunk?; + request: tonic::Request, + ) -> Result, tonic::Status> { + let transaction: ConsensusTransaction = request + .into_inner() + .deserialize() + .map_err(|e| tonic::Status::invalid_argument(e.to_string()))?; + + let info = self + .consensus_adapter + .submit(&transaction) + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; - // Now create a verification obligation - let mut obligation = VerificationObligation::default(); - let load_verification: Result, SuiError> = - one_chunk - .into_iter() - .map(|mut item| { - let (message, _message_bytes) = &mut item; - match message { - SerializedMessage::Transaction(message) => { - message.is_checked = true; - message.add_tx_sig_to_verification_obligation(&mut obligation)?; - } - SerializedMessage::Cert(message) => { - message.is_checked = true; - message.add_to_verification_obligation( - &self.state.committee, - &mut obligation, - )?; - } - _ => {} - }; - Ok(item) - }) - .collect(); - - // Check the obligations and the verification is - let one_chunk = load_verification?; - obligation.verify_all()?; - Ok(one_chunk) + // For some reason the output of consensus changed, we should change it back + let info = deserialize_message(&info[..]).unwrap(); + let info = deserialize_transaction_info(info).unwrap(); + + let payload = BincodeEncodedPayload::try_from(&info) + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + Ok(tonic::Response::new(payload)) } -} -#[async_trait] -impl<'a, A> MessageHandler for AuthorityServer -where - A: 'static + RwChannel<'a> + Unpin + Send, -{ - async fn handle_messages(&self, mut channel: A) -> () { - /* - Take messages in chunks of CHUNK_SIZE and parses them, keeps also the - original bytes for later 
use, and reports any errors. Special care is - taken to turn all errors into SuiError. - */ - - while let Some(one_chunk) = channel - .stream() - .map(|msg_bytes_result| { - msg_bytes_result - .map_err(|_| SuiError::InvalidDecoding) - .and_then(|msg_bytes| { - deserialize_message(&msg_bytes[..]) - .map_err(|_| SuiError::InvalidDecoding) - .map(|msg| (msg, msg_bytes)) - }) - }) - .ready_chunks(CHUNK_SIZE) - .next() + async fn account_info( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let request: AccountInfoRequest = request + .into_inner() + .deserialize() + .map_err(|e| tonic::Status::invalid_argument(e.to_string()))?; + + let response = self + .state + .handle_account_info_request(request) .await - { - /* - Very the signatures of the chunk as a whole - */ - let one_chunk = self.batch_verify_one_chunk(one_chunk); - - /* - If this is an error send back the error and drop the connection. - Here we make the choice to bail out as soon as either any parsing - or signature / committee verification operation fails. The client - should know better than give invalid input. All conditions can be - trivially checked on the client side, so there should be no surprises - here for well behaved clients. - */ - if let Err(err) = one_chunk { - // If the response channel is closed there is no much we can do - // to handle the error result. 
- let _ = channel.sink().send(serialize_error(&err).into()).await; - return; - } - let mut one_chunk = one_chunk.unwrap(); - - // Process each message - while let Some((_message, _buffer)) = one_chunk.pop_front() { - if let Some(reply) = self.handle_one_message(_message, &mut channel).await { - let status = channel.sink().send(reply.into()).await; - if let Err(error) = status { - error!("Failed to send query response: {error}"); + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + let payload = BincodeEncodedPayload::try_from(&response) + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + Ok(tonic::Response::new(payload)) + } + + async fn object_info( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let request: ObjectInfoRequest = request + .into_inner() + .deserialize() + .map_err(|e| tonic::Status::invalid_argument(e.to_string()))?; + + let response = self + .state + .handle_object_info_request(request) + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + let payload = BincodeEncodedPayload::try_from(&response) + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + Ok(tonic::Response::new(payload)) + } + + async fn transaction_info( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let request: TransactionInfoRequest = request + .into_inner() + .deserialize() + .map_err(|e| tonic::Status::invalid_argument(e.to_string()))?; + + let response = self + .state + .handle_transaction_info_request(request) + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + let payload = BincodeEncodedPayload::try_from(&response) + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + Ok(tonic::Response::new(payload)) + } + + type BatchInfoStream = BoxStream<'static, Result>; + + async fn batch_info( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let request: BatchInfoRequest = request + .into_inner() + .deserialize() + .map_err(|e| 
tonic::Status::invalid_argument(e.to_string()))?; + + // Register a subscriber to not miss any updates + let subscriber = self.state.subscribe_batch(); + let message_end = request.end; + + // Get the historical data requested + let (items, should_subscribe) = self + .state + .handle_batch_info_request(request) + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + let last_seq = items + .back() + .map(|item| { + if let UpdateItem::Transaction((seq, _)) = item { + *seq + } else { + 0 + } + }) + .unwrap_or(0); + + let items = futures::stream::iter(items).map(Ok); + let subscriber = tokio_stream::wrappers::BroadcastStream::new(subscriber) + .take_while(move |_| futures::future::ready(should_subscribe)) + .take_while(|item| futures::future::ready(item.is_ok())) + // Do not re-send transactions already sent from the database + .skip_while(move |item| { + let skip = match item { + Ok(item) => { + let seq = match item { + UpdateItem::Transaction((seq, _)) => *seq, + UpdateItem::Batch(signed_batch) => { + signed_batch.batch.next_sequence_number + } + }; + seq <= last_seq } + Err(_) => false, }; - } - } + futures::future::ready(skip) + }) + // We always stop sending at batch boundaries, so that we try to always + // start with a batch and end with a batch to allow signature verification. 
+ .take_while(move |item| { + let take = match item { + Ok(UpdateItem::Batch(signed_batch)) => { + message_end >= signed_batch.batch.next_sequence_number + } + _ => true, + }; + futures::future::ready(take) + }); + + let response = items + .chain(subscriber) + .map_err(|e| tonic::Status::internal(e.to_string())) + .map_ok(|item| { + let item = BatchInfoResponseItem(item); + BincodeEncodedPayload::try_from(&item).expect("serialization should not fail") + }); + + Ok(tonic::Response::new(Box::pin(response))) } } diff --git a/sui_core/src/consensus_adapter.rs b/sui_core/src/consensus_adapter.rs index 2267aa2eff42d..a7fc4fbac9edd 100644 --- a/sui_core/src/consensus_adapter.rs +++ b/sui_core/src/consensus_adapter.rs @@ -3,19 +3,26 @@ use bytes::Bytes; use futures::SinkExt; use narwhal_executor::SubscriberResult; -use std::collections::hash_map::DefaultHasher; -use std::collections::HashMap; -use std::hash::{Hash, Hasher}; -use std::net::SocketAddr; -use sui_network::transport; -use sui_network::transport::{RwChannel, TcpDataStream}; -use sui_types::committee::Committee; -use sui_types::error::{SuiError, SuiResult}; -use sui_types::messages::ConsensusTransaction; -use tokio::sync::mpsc::{Receiver, Sender}; -use tokio::sync::oneshot; -use tokio::task::JoinHandle; -use tokio::time::{timeout, Duration}; +use std::{ + collections::{hash_map::DefaultHasher, HashMap}, + hash::{Hash, Hasher}, + net::SocketAddr, +}; +use sui_types::{ + committee::Committee, + error::{SuiError, SuiResult}, + messages::ConsensusTransaction, +}; +use tokio::{ + net::TcpStream, + sync::{ + mpsc::{Receiver, Sender}, + oneshot, + }, + task::JoinHandle, + time::{timeout, Duration}, +}; +use tokio_util::codec::{FramedWrite, LengthDelimitedCodec}; use tracing::debug; #[cfg(test)] @@ -84,10 +91,21 @@ impl ConsensusAdapter { } /// Attempt to reconnect with a the consensus node. 
- async fn reconnect(address: SocketAddr, buffer_size: usize) -> SuiResult { - transport::connect(address.to_string(), buffer_size) + async fn reconnect( + address: SocketAddr, + buffer_size: usize, + ) -> SuiResult> { + let stream = TcpStream::connect(address) .await - .map_err(|e| SuiError::ConsensusConnectionBroken(e.to_string())) + .map_err(|e| SuiError::ConsensusConnectionBroken(e.to_string()))?; + + let stream = FramedWrite::new( + stream, + LengthDelimitedCodec::builder() + .max_frame_length(buffer_size) + .new_codec(), + ); + Ok(stream) } /// Check if this authority should submit the transaction to consensus. @@ -124,7 +142,6 @@ impl ConsensusAdapter { // does not require to take self as a mutable reference. Self::reconnect(self.consensus_address, self.buffer_size) .await? - .sink() .send(bytes) .await .map_err(|e| SuiError::ConsensusConnectionBroken(e.to_string()))?; diff --git a/sui_core/src/unit_tests/server_tests.rs b/sui_core/src/unit_tests/server_tests.rs index 4fc853b17cc76..3d99134591dbb 100644 --- a/sui_core/src/unit_tests/server_tests.rs +++ b/sui_core/src/unit_tests/server_tests.rs @@ -1,16 +1,19 @@ // Copyright (c) 2022, Mysten Labs, Inc. 
// SPDX-License-Identifier: Apache-2.0 +use super::*; +use crate::{ + authority::authority_tests::init_state_with_object_id, + authority_client::{AuthorityAPI, NetworkAuthorityClient}, +}; use std::sync::Arc; - -use crate::authority::authority_tests::init_state_with_object_id; -use sui_types::base_types::{dbg_addr, dbg_object_id, TransactionDigest}; -use sui_types::object::ObjectFormatOptions; -use sui_types::serialize::{deserialize_message, serialize_object_info_request}; +use sui_network::network::NetworkClient; +use sui_types::{ + base_types::{dbg_addr, dbg_object_id, TransactionDigest}, + object::ObjectFormatOptions, +}; use typed_store::Map; -use super::*; - #[tokio::test] async fn test_start_stop_batch_subsystem() { let sender = dbg_addr(1); @@ -47,56 +50,9 @@ async fn test_start_stop_batch_subsystem() { .expect("Subsystem crashed?"); } -// Some infra to feed the server messages and receive responses. - -use bytes::{Bytes, BytesMut}; -use futures::channel::mpsc::{channel, Receiver, Sender}; -use futures::sink::SinkMapErr; -use futures::{Sink, SinkExt}; - -type SinkSenderErr = - SinkMapErr, fn( as Sink>::Error) -> std::io::Error>; - -struct TestChannel { - reader: Receiver>, - writer: SinkSenderErr, -} - -#[allow(clippy::type_complexity)] // appease clippy, in the tests! 
-impl TestChannel { - pub fn new() -> ( - TestChannel, - (Sender>, Receiver), - ) { - let (outer_tx, inner_rx) = channel(1000); - let (inner_tx, outer_rx) = channel(1000); - - let test_channel = TestChannel { - reader: inner_rx, - writer: inner_tx - .sink_map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "SOme error!")), - }; - - (test_channel, (outer_tx, outer_rx)) - } -} - -impl<'a> RwChannel<'a> for TestChannel { - type R = Receiver>; - type W = SinkSenderErr; - - fn sink(&mut self) -> &mut Self::W { - &mut self.writer - } - fn stream(&mut self) -> &mut Self::R { - &mut self.reader - } -} - //This is the most basic example of how to test the server logic - #[tokio::test] -async fn test_channel_infra() { +async fn test_simple_request() { let sender = dbg_addr(1); let object_id = dbg_object_id(1); let authority_state = init_state_with_object_id(sender, object_id).await; @@ -105,33 +61,33 @@ async fn test_channel_infra() { let consensus_address = "127.0.0.1:0".parse().unwrap(); let (tx_consensus_listener, _rx_consensus_listener) = tokio::sync::mpsc::channel(1); - let server = Arc::new(AuthorityServer::new( + let server = AuthorityServer::new( "127.0.0.1".to_string(), - 999, + 0, 65000, Arc::new(authority_state), consensus_address, tx_consensus_listener, - )); + ); - let (channel, (mut tx, mut rx)) = TestChannel::new(); + let server_handle = server.spawn().await.unwrap(); - let handle = tokio::spawn(async move { - server.handle_messages(channel).await; - }); + let network_config = NetworkClient::new( + server_handle.local_addr.ip().to_string(), + server_handle.local_addr.port(), + 0, + std::time::Duration::from_secs(30), + std::time::Duration::from_secs(30), + ); + + let client = NetworkAuthorityClient::new(network_config); let req = ObjectInfoRequest::latest_object_info_request( object_id, Some(ObjectFormatOptions::default()), ); - let bytes: BytesMut = BytesMut::from(&serialize_object_info_request(&req)[..]); - tx.send(Ok(bytes)).await.expect("Problem 
sending"); - let resp = rx.next().await; - assert!(!resp.unwrap().is_empty()); - - drop(tx); - handle.await.expect("Problem closing task"); + client.handle_object_info_request(req).await.unwrap(); } #[tokio::test] @@ -145,67 +101,65 @@ async fn test_subscription() { let (tx_consensus_listener, _rx_consensus_listener) = tokio::sync::mpsc::channel(1); // Start the batch server - let server = Arc::new(AuthorityServer::new( + let mut server = AuthorityServer::new( "127.0.0.1".to_string(), - 998, + 0, 65000, Arc::new(authority_state), consensus_address, tx_consensus_listener, - )); + ); + server.min_batch_size = 10; + server.max_delay = Duration::from_secs(500); let db = server.state.db().clone(); let db2 = server.state.db().clone(); + let state = server.state.clone(); - let _join = server - .spawn_batch_subsystem(10, Duration::from_secs(500)) - .await - .expect("Problem launching subsystem."); + let server_handle = server.spawn().await.unwrap(); + + let network_config = NetworkClient::new( + server_handle.local_addr.ip().to_string(), + server_handle.local_addr.port(), + 0, + std::time::Duration::from_secs(30), + std::time::Duration::from_secs(30), + ); + + let client = NetworkAuthorityClient::new(network_config); let tx_zero = TransactionDigest::new([0; 32]); for _i in 0u64..105 { - let ticket = server.state.batch_notifier.ticket().expect("all good"); + let ticket = state.batch_notifier.ticket().expect("all good"); db.executed_sequence .insert(&ticket.seq(), &tx_zero) .expect("Failed to write."); } println!("Sent tickets."); - let (channel, (mut tx, mut rx)) = TestChannel::new(); - - let inner_server1 = server.clone(); - let handle1 = tokio::spawn(async move { - inner_server1.handle_messages(channel).await; - }); - println!("Started messahe handling."); // TEST 1: Get historical data let req = BatchInfoRequest { start: 12, end: 34 }; - let bytes: BytesMut = BytesMut::from(&serialize_batch_request(&req)[..]); - tx.send(Ok(bytes)).await.expect("Problem sending"); + let 
mut resp = client.handle_batch_stream(req).await.unwrap(); println!("TEST1: Send request."); let mut num_batches = 0; let mut num_transactions = 0; - while let Some(data) = rx.next().await { - match deserialize_message(&data[..]).expect("Bad response") { - SerializedMessage::BatchInfoResp(resp) => match *resp { - BatchInfoResponseItem(UpdateItem::Batch(signed_batch)) => { - num_batches += 1; - if signed_batch.batch.next_sequence_number >= 34 { - break; - } + while let Some(data) = resp.next().await { + let item = data.unwrap(); + match item { + BatchInfoResponseItem(UpdateItem::Batch(signed_batch)) => { + num_batches += 1; + if signed_batch.batch.next_sequence_number >= 34 { + break; } - BatchInfoResponseItem(UpdateItem::Transaction((_seq, _digest))) => { - num_transactions += 1; - } - }, - _ => { - panic!("Bad response"); + } + BatchInfoResponseItem(UpdateItem::Transaction((_seq, _digest))) => { + num_transactions += 1; } } } @@ -218,15 +172,10 @@ async fn test_subscription() { // Test 2: Get subscription data // Add data in real time - let inner_server2 = server.clone(); + let inner_server2 = state.clone(); let _handle2 = tokio::spawn(async move { for i in 105..150 { - tokio::time::sleep(Duration::from_millis(20)).await; - let ticket = inner_server2 - .state - .batch_notifier - .ticket() - .expect("all good"); + let ticket = inner_server2.batch_notifier.ticket().expect("all good"); db2.executed_sequence .insert(&ticket.seq(), &tx_zero) .expect("Failed to write."); @@ -241,30 +190,25 @@ async fn test_subscription() { end: 112, }; - let bytes: BytesMut = BytesMut::from(&serialize_batch_request(&req)[..]); - tx.send(Ok(bytes)).await.expect("Problem sending"); + let mut resp = client.handle_batch_stream(req).await.unwrap(); println!("TEST2: Send request."); let mut num_batches = 0; let mut num_transactions = 0; - while let Some(data) = rx.next().await { - match deserialize_message(&data[..]).expect("Bad response") { - SerializedMessage::BatchInfoResp(resp) => match 
*resp { - BatchInfoResponseItem(UpdateItem::Batch(signed_batch)) => { - num_batches += 1; - if signed_batch.batch.next_sequence_number >= 112 { - break; - } - } - BatchInfoResponseItem(UpdateItem::Transaction((seq, _digest))) => { - println!("Received {seq}"); - num_transactions += 1; + while let Some(data) = resp.next().await { + let item = data.unwrap(); + match item { + BatchInfoResponseItem(UpdateItem::Batch(signed_batch)) => { + num_batches += 1; + if signed_batch.batch.next_sequence_number >= 112 { + break; } - }, - _ => { - panic!("Bad response"); + } + BatchInfoResponseItem(UpdateItem::Transaction((seq, _digest))) => { + println!("Received {seq}"); + num_transactions += 1; } } } @@ -274,7 +218,5 @@ async fn test_subscription() { println!("TEST2: Finished."); - server.state.batch_notifier.close(); - drop(tx); - handle1.await.expect("Problem closing task"); + state.batch_notifier.close(); } diff --git a/test_utils/Cargo.toml b/test_utils/Cargo.toml index c18db002a1507..384752068f851 100644 --- a/test_utils/Cargo.toml +++ b/test_utils/Cargo.toml @@ -27,5 +27,5 @@ narwhal-config = { git = "https://github.com/MystenLabs/narwhal", rev = "72efe71 sui-types = { path = "../sui_types" } sui_core = { path = "../sui_core" } -sui-network = { path = "../network_utils" } +sui-network = { path = "../crates/sui-network" } sui = { path = "../sui" } diff --git a/test_utils/src/authority.rs b/test_utils/src/authority.rs index 060d2c9145b8c..a9c5cf9ce42c0 100644 --- a/test_utils/src/authority.rs +++ b/test_utils/src/authority.rs @@ -1,19 +1,18 @@ // Copyright (c) 2022, Mysten Labs, Inc. 
// SPDX-License-Identifier: Apache-2.0 -use crate::test_committee; -use crate::test_keys; +use crate::{test_committee, test_keys}; use narwhal_config::Parameters as ConsensusParameters; -use std::path::PathBuf; -use std::sync::Arc; -use sui::config::{make_default_narwhal_committee, AuthorityPrivateInfo, PORT_ALLOCATOR}; -use sui::sui_commands::make_authority; +use std::{path::PathBuf, sync::Arc}; +use sui::{ + config::{make_default_narwhal_committee, AuthorityPrivateInfo, PORT_ALLOCATOR}, + sui_commands::make_authority, +}; use sui_adapter::genesis; -use sui_core::authority::AuthorityState; -use sui_core::authority::AuthorityStore; -use sui_core::authority_server::AuthorityServer; -use sui_network::transport::SpawnedServer; -use sui_types::crypto::KeyPair; -use sui_types::object::Object; +use sui_core::{ + authority::{AuthorityState, AuthorityStore}, + authority_server::AuthorityServerHandle, +}; +use sui_types::{crypto::KeyPair, object::Object}; /// The default network buffer size of a test authority. pub const NETWORK_BUFFER_SIZE: usize = 65_000; @@ -83,7 +82,7 @@ pub async fn spawn_test_authorities( objects: I, authorities: &[AuthorityPrivateInfo], key_pairs: &[KeyPair], -) -> Vec> +) -> Vec where I: IntoIterator + Clone, { diff --git a/test_utils/src/lib.rs b/test_utils/src/lib.rs index 93adf6da0e4f0..fd50262718829 100644 --- a/test_utils/src/lib.rs +++ b/test_utils/src/lib.rs @@ -4,13 +4,13 @@ pub mod authority; pub mod messages; pub mod network; pub mod objects; -pub mod sequencer; -use rand::rngs::StdRng; -use rand::SeedableRng; -use sui_types::base_types::SuiAddress; -use sui_types::committee::Committee; -use sui_types::crypto::{get_key_pair_from_rng, KeyPair}; +use rand::{rngs::StdRng, SeedableRng}; +use sui_types::{ + base_types::SuiAddress, + committee::Committee, + crypto::{get_key_pair_from_rng, KeyPair}, +}; /// The size of the committee used for tests. 
pub const TEST_COMMITTEE_SIZE: usize = 4; diff --git a/test_utils/src/sequencer.rs b/test_utils/src/sequencer.rs deleted file mode 100644 index 2035fb2a1eb94..0000000000000 --- a/test_utils/src/sequencer.rs +++ /dev/null @@ -1,437 +0,0 @@ -// Copyright (c) 2022, Mysten Labs, Inc. -// SPDX-License-Identifier: Apache-2.0 - -use async_trait::async_trait; -use bytes::Bytes; -use futures::stream::futures_unordered::FuturesUnordered; -use futures::stream::StreamExt; -use futures::SinkExt; -use rocksdb::{ColumnFamilyDescriptor, DBCompressionType, DBWithThreadMode, MultiThreaded}; -use std::collections::HashMap; -use std::net::SocketAddr; -use std::path::Path; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; -use std::time::Duration; -use sui_network::transport::{MessageHandler, RwChannel}; -use sui_types::base_types::SequenceNumber; -use sui_types::error::{SuiError, SuiResult}; -use sui_types::messages::ConsensusOutput; -use sui_types::serialize::{deserialize_message, serialize_consensus_output, SerializedMessage}; -use tokio::sync::mpsc::{channel, Receiver, Sender}; -use tokio::time::sleep; -use tracing::log; -use typed_store::rocks::{DBMap, TypedStoreError}; -use typed_store::traits::Map; - -/// A mock single-process sequencer. It is not crash-safe (it has no storage) and should -/// only be used for testing. -pub struct Sequencer { - /// The network address where to receive input messages. - pub input_address: SocketAddr, - /// The network address where to receive subscriber requests. - pub subscriber_address: SocketAddr, - /// The network buffer size. - pub buffer_size: usize, - /// The delay to wait before sequencing a message. This parameter is only required to - /// emulates the consensus' latency. - pub consensus_delay: Duration, -} - -impl Sequencer { - /// Spawn a new sequencer. The sequencer is made of a number of component each running - /// in their own tokio task. 
- pub async fn spawn(sequencer: Self, store_path: &Path) -> SuiResult<()> { - let (tx_input, rx_input) = channel(100); - let (tx_subscriber, rx_subscriber) = channel(100); - - // Load the persistent storage. - let store = Arc::new(SequencerStore::open(store_path, None)); - - // Spawn the sequencer core. - let mut sequencer_core = SequencerCore::new(rx_input, rx_subscriber, store.clone())?; - tokio::spawn(async move { - sequencer_core.run(sequencer.consensus_delay).await; - }); - - // Spawn the server receiving input messages to order. - tokio::spawn(async move { - let input_server = InputServer { tx_input }; - sui_network::transport::spawn_server( - &sequencer.input_address.to_string(), - Arc::new(input_server), - sequencer.buffer_size, - ) - .await - .unwrap() - .join() - .await - .unwrap(); - }); - - // Spawn the server receiving subscribers to the output of the sequencer. - tokio::spawn(async move { - let subscriber_server = SubscriberServer::new(tx_subscriber, store); - sui_network::transport::spawn_server( - &sequencer.subscriber_address.to_string(), - Arc::new(subscriber_server), - sequencer.buffer_size, - ) - .await - .unwrap() - .join() - .await - .unwrap(); - }); - - Ok(()) - } -} - -/// The core of the sequencer, totally ordering input bytes. -pub struct SequencerCore { - /// Receive users' certificates to sequence - rx_input: Receiver, - /// Communicate with subscribers to update with the output of the sequence. - rx_subscriber: Receiver, - /// Persistent storage to hold all consensus outputs. This task is the only one - /// that writes to the store. - store: Arc, - /// The global consensus index. - consensus_index: SequenceNumber, - /// The current number of subscribers. - subscribers_count: usize, -} - -impl SequencerCore { - /// The maximum number of subscribers. - pub const MAX_SUBSCRIBERS: usize = 1_000; - - /// Create a new sequencer core instance. 
- pub fn new( - rx_input: Receiver, - rx_subscriber: Receiver, - store: Arc, - ) -> SuiResult { - let consensus_index = store.get_last_consensus_index()?; - Ok(Self { - rx_input, - rx_subscriber, - store, - consensus_index, - subscribers_count: 0, - }) - } - - /// Simply wait for a fixed delay and then returns the input. - async fn waiter(deliver: Bytes, delay: Duration) -> Bytes { - sleep(delay).await; - deliver - } - - /// Main loop ordering input bytes. - pub async fn run(&mut self, consensus_delay: Duration) { - let mut waiting = FuturesUnordered::new(); - let mut subscribers = HashMap::new(); - loop { - tokio::select! { - // Receive bytes to order. - Some(message) = self.rx_input.recv() => { - waiting.push(Self::waiter(message, consensus_delay)); - }, - - // Receive subscribers to update with the sequencer's output. - Some(message) = self.rx_subscriber.recv() => { - if self.subscribers_count < Self::MAX_SUBSCRIBERS { - let SubscriberMessage(sender, id) = message; - subscribers.insert(id, sender); - self.subscribers_count +=1 ; - } - }, - - // Bytes are ready to be delivered, notify the subscribers. - Some(message) = waiting.next() => { - let output = ConsensusOutput { - message: message.to_vec(), - sequence_number: self.consensus_index, - }; - - // Store the sequenced message. If this fails, we do not notify and subscribers - // and effectively throw away the message. Liveness may be lost. - if let Err(e) = self.store.store_output(&output) { - log::error!("Failed to store consensus output: {e}"); - continue; - } - - // Increment the consensus index. - self.consensus_index = self.consensus_index.increment(); - - // Notify the subscribers of the new output. If a subscriber's channel is full - // (the subscriber is slow), we simply skip this output. The subscriber will - // eventually sync to catch up. 
- let mut to_drop = Vec::new(); - for (id, subscriber) in &subscribers { - if subscriber.is_closed() { - to_drop.push(*id); - continue; - } - if subscriber.capacity() > 0 && subscriber.send(output.clone()).await.is_err() { - to_drop.push(*id); - } - } - - // Cleanup the list subscribers that dropped connection. - for id in to_drop { - subscribers.remove(&id); - self.subscribers_count -= 1; - } - } - } - } - } -} - -/// Define how the network server should handle incoming clients' certificates. This -/// is not got to stream many input transactions (benchmarks) as the task handling the -/// TCP connection blocks until a reply is ready. -struct InputServer { - /// Send user transactions to the sequencer. - pub tx_input: Sender, -} - -#[async_trait] -impl<'a, Stream> MessageHandler for InputServer -where - Stream: 'static + RwChannel<'a> + Unpin + Send, -{ - async fn handle_messages(&self, mut stream: Stream) { - loop { - // Read the user's certificate. - let buffer = match stream.stream().next().await { - Some(Ok(buffer)) => buffer, - Some(Err(e)) => { - log::warn!("Error while reading TCP stream: {e}"); - break; - } - None => { - log::debug!("Connection dropped by the client"); - break; - } - }; - - // Send the certificate to the sequencer. - if self.tx_input.send(buffer.freeze()).await.is_err() { - panic!("Failed to sequence input bytes"); - } - - // Send an acknowledgment to the client. - if stream.sink().send(Bytes::from("Ok")).await.is_err() { - log::debug!("Failed to send ack to client"); - break; - } - } - } -} - -/// Represents the subscriber's unique id. -pub type SubscriberId = usize; - -/// The messages sent by the subscriber server to the sequencer core to notify -/// the core of a new subscriber. -#[derive(Debug)] -pub struct SubscriberMessage(Sender, SubscriberId); - -/// Define how the network server should handle incoming authorities sync requests. -/// The authorities are basically light clients of the sequencer. 
A real consensus -/// implementation would make sure to receive an ack from the authorities and retry -/// sending until the message is delivered. This is safety-critical. -struct SubscriberServer { - /// Notify the sequencer's core of a new subscriber. - pub tx_subscriber: Sender, - /// Count the number of subscribers. - counter: AtomicUsize, - /// The persistent storage. It is only used to help subscribers to sync, this - /// task never writes to the store. - store: Arc, -} - -impl SubscriberServer { - /// The number of pending updates that the subscriber can hold in memory. - pub const CHANNEL_SIZE: usize = 1_000; - - /// Create a new subscriber server. - pub fn new(tx_subscriber: Sender, store: Arc) -> Self { - Self { - tx_subscriber, - counter: AtomicUsize::new(0), - store, - } - } - - /// Helper function loading from store the outputs missed by the subscriber (in the right order). - async fn synchronize<'a, Stream>( - &self, - sequence_number: SequenceNumber, - stream: &mut Stream, - ) -> SuiResult<()> - where - Stream: 'static + RwChannel<'a> + Unpin + Send, - { - // TODO: Loading the missed outputs one by one may not be the most efficient. But we can't - // load them all in memory at once (there may be a lot of missed outputs). We could do - // this in chunks. 
- let consensus_index = self.store.get_last_consensus_index()?; - if sequence_number < consensus_index { - let start: u64 = sequence_number.into(); - let stop: u64 = consensus_index.into(); - for i in start..=stop { - let index = SequenceNumber::from(i); - let message = self.store.get_output(&index)?.unwrap(); - let serialized = serialize_consensus_output(&message); - if stream.sink().send(Bytes::from(serialized)).await.is_err() { - log::debug!("Connection dropped by subscriber"); - break; - } - } - } - Ok(()) - } -} - -#[async_trait] -impl<'a, Stream> MessageHandler for SubscriberServer -where - Stream: 'static + RwChannel<'a> + Unpin + Send, -{ - async fn handle_messages(&self, mut stream: Stream) { - let (tx_output, mut rx_output) = channel(Self::CHANNEL_SIZE); - let subscriber_id = self.counter.fetch_add(1, Ordering::SeqCst); - - // Notify the core of a new subscriber. - self.tx_subscriber - .send(SubscriberMessage(tx_output, subscriber_id)) - .await - .expect("Failed to send new subscriber to core"); - - // Interact with the subscriber. - loop { - tokio::select! { - // Update the subscriber every time a certificate is sequenced. - Some(message) = rx_output.recv() => { - let serialized = serialize_consensus_output(&message); - if stream.sink().send(Bytes::from(serialized)).await.is_err() { - log::debug!("Connection dropped by subscriber"); - break; - } - }, - - // Receive sync requests form the subscriber. 
- Some(buffer) = stream.stream().next() => match buffer { - Ok(buffer) => match deserialize_message(&*buffer) { - Ok(SerializedMessage::ConsensusSync(sync)) => { - if let Err(e) = self.synchronize(sync.sequence_number, &mut stream).await { - log::error!("{e}"); - break; - } - } - Ok(_) => { - log::warn!("{}", SuiError::UnexpectedMessage); - break; - } - Err(e) => { - log::warn!("Failed to deserialize consensus sync request {e}"); - break; - } - }, - Err(e) => { - log::warn!("Error while reading TCP stream: {e}"); - break; - } - } - } - } - } -} - -/// The persistent storage of the sequencer. -pub struct SequencerStore { - /// All sequenced messages indexed by sequence number. - outputs: DBMap, -} - -impl SequencerStore { - /// Open the consensus store. - pub fn open>(path: P, db_options: Option) -> Self { - let row_cache = rocksdb::Cache::new_lru_cache(1_000_000).expect("Cache is ok"); - let mut options = db_options.unwrap_or_default(); - options.set_row_cache(&row_cache); - options.set_table_cache_num_shard_bits(10); - options.set_compression_type(DBCompressionType::None); - - let db = Self::open_cf_opts( - &path, - Some(options.clone()), - &[("last_consensus_index", &options), ("outputs", &options)], - ) - .expect("Cannot open DB."); - - Self { - outputs: DBMap::reopen(&db, Some("outputs")).expect("Cannot open CF."), - } - } - - /// Helper function to open the store. 
- fn open_cf_opts>( - path: P, - db_options: Option, - opt_cfs: &[(&str, &rocksdb::Options)], - ) -> Result>, TypedStoreError> { - let mut options = db_options.unwrap_or_default(); - options.create_if_missing(true); - options.create_missing_column_families(true); - - let mut cfs = DBWithThreadMode::::list_cf(&options, &path) - .ok() - .unwrap_or_default(); - for cf_key in opt_cfs.iter().map(|(name, _)| name) { - let key = (*cf_key).to_owned(); - if !cfs.contains(&key) { - cfs.push(key); - } - } - - Ok(Arc::new( - DBWithThreadMode::::open_cf_descriptors( - &options, - /* primary */ &path.as_ref(), - opt_cfs - .iter() - .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())), - )?, - )) - } - - /// Read the last consensus index from the store. - pub fn get_last_consensus_index(&self) -> SuiResult { - self.outputs - .iter() - .skip_prior_to(&SequenceNumber::MAX)? - .next() - .map_or_else(|| Ok(SequenceNumber::default()), |(s, _)| Ok(s.increment())) - } - - /// Stores a new consensus output. - pub fn store_output(&self, output: &ConsensusOutput) -> SuiResult<()> { - let mut write_batch = self.outputs.batch(); - write_batch = write_batch.insert_batch( - &self.outputs, - std::iter::once((output.sequence_number, output)), - )?; - write_batch.write().map_err(SuiError::from) - } - - /// Load a specific output from storage. - pub fn get_output(&self, index: &SequenceNumber) -> SuiResult> { - self.outputs.get(index).map_err(SuiError::from) - } -}