Skip to content

Commit

Permalink
[server + bench] Now start the batch maker, and integrate into bench (M…
Browse files Browse the repository at this point in the history
…ystenLabs#1154)

* Start the batch subsystem on authority server spawn
* Added stream based rather than channel based batch facility

Co-authored-by: George Danezis <[email protected]>
  • Loading branch information
gdanezis and George Danezis authored Mar 31, 2022
1 parent fa90884 commit 0b58459
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 28 deletions.
21 changes: 13 additions & 8 deletions network_utils/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,18 @@ pub trait RwChannel<'a> {
}

/// The result of spawning a server is oneshot channel to kill it and a handle to track completion.
pub struct SpawnedServer {
pub struct SpawnedServer<S> {
state: Arc<S>,
tx_cancellation: futures::channel::oneshot::Sender<()>,
handle: tokio::task::JoinHandle<Result<(), std::io::Error>>,
local_addr: SocketAddr,
}

impl SpawnedServer {
impl<S> SpawnedServer<S> {
pub fn state(&self) -> &Arc<S> {
&self.state
}

pub async fn join(self) -> Result<(), std::io::Error> {
// Note that dropping `self.complete` would terminate the server.
self.handle.await??;
Expand Down Expand Up @@ -85,9 +90,9 @@ pub async fn connect(
/// Run a server for this protocol and the given message handler.
pub async fn spawn_server<S>(
address: &str,
state: S,
state: Arc<S>,
buffer_size: usize,
) -> Result<SpawnedServer, std::io::Error>
) -> Result<SpawnedServer<S>, std::io::Error>
where
S: MessageHandler<TcpDataStream> + Send + Sync + 'static,
{
Expand All @@ -104,11 +109,12 @@ where

let handle = tokio::spawn(run_tcp_server(
listener,
state,
state.clone(),
rx_cancellation,
buffer_size,
));
Ok(SpawnedServer {
state,
tx_cancellation,
handle,
local_addr,
Expand Down Expand Up @@ -187,14 +193,13 @@ impl<'a> RwChannel<'a> for TcpDataStream {
// Server implementation for TCP.
async fn run_tcp_server<S>(
listener: TcpListener,
state: S,
state: Arc<S>,
mut exit_future: futures::channel::oneshot::Receiver<()>,
_buffer_size: usize,
) -> Result<(), std::io::Error>
where
S: MessageHandler<TcpDataStream> + Send + Sync + 'static,
{
let guarded_state = Arc::new(state);
loop {
let stream;

Expand All @@ -206,7 +211,7 @@ where
}
}

let guarded_state = guarded_state.clone();
let guarded_state = state.clone();
tokio::spawn(async move {
let framed = TcpDataStream::from_tcp_stream(stream, _buffer_size);
guarded_state.handle_messages(framed).await
Expand Down
2 changes: 1 addition & 1 deletion network_utils/src/unit_tests/transport_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async fn test_server() -> Result<(usize, usize), std::io::Error> {
let counter = Arc::new(AtomicUsize::new(0));
let mut received = 0;

let server = spawn_server(&address, TestService::new(counter.clone()), 100).await?;
let server = spawn_server(&address, Arc::new(TestService::new(counter.clone())), 100).await?;

let mut client = connect(address.clone(), 1000).await?;
client.write_data(b"abcdef").await?;
Expand Down
62 changes: 60 additions & 2 deletions sui/src/microbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ use std::collections::{HashSet, VecDeque};
use std::time::{Duration, Instant};
use structopt::StructOpt;
use sui_adapter::genesis;
use sui_core::authority_client::AuthorityClient;
use sui_core::{authority::*, authority_server::AuthorityServer};
use sui_network::{network::NetworkClient, transport};
use sui_types::batch::UpdateItem;
use sui_types::crypto::{get_key_pair, AuthoritySignature, KeyPair, PublicKeyBytes, Signature};
use sui_types::SUI_FRAMEWORK_ADDRESS;
use sui_types::{base_types::*, committee::*, messages::*, object::Object, serialize::*};
Expand Down Expand Up @@ -260,7 +262,10 @@ impl ClientServerBenchmark {
(state, transactions)
}

async fn spawn_server(&self, state: AuthorityState) -> transport::SpawnedServer {
async fn spawn_server(
&self,
state: AuthorityState,
) -> transport::SpawnedServer<AuthorityServer> {
let server = AuthorityServer::new(self.host.clone(), self.port, self.buffer_size, state);
server.spawn().await.unwrap()
}
Expand Down Expand Up @@ -292,6 +297,59 @@ impl ClientServerBenchmark {
Duration::from_micros(self.send_timeout_us),
Duration::from_micros(self.recv_timeout_us),
);

// We spawn a second client that listens to the batch interface
let client_batch = NetworkClient::new(
self.host.clone(),
self.port,
self.buffer_size,
Duration::from_micros(self.send_timeout_us),
Duration::from_micros(self.recv_timeout_us),
);

let _batch_client_handle = tokio::task::spawn(async move {
let authority_client = AuthorityClient::new(client_batch);

let mut start = 0;

loop {
let receiver = authority_client
.handle_batch_streaming_as_stream(BatchInfoRequest {
start,
end: start + 10_000,
})
.await;

if let Err(e) = &receiver {
error!("Listener error: {:?}", e);
break;
}
let mut receiver = receiver.unwrap();

info!("Start batch listener at sequence: {}.", start);
while let Some(item) = receiver.next().await {
match item {
Ok(BatchInfoResponseItem(UpdateItem::Transaction((
_tx_seq,
_tx_digest,
)))) => {
start = _tx_seq + 1;
}
Ok(BatchInfoResponseItem(UpdateItem::Batch(_signed_batch))) => {
info!(
"Client received batch up to sequence {}",
_signed_batch.batch.next_sequence_number
);
}
Err(err) => {
error!("{:?}", err);
break;
}
}
}
}
});

info!("Sending requests.");
if self.single_operation {
// Send batches one by one
Expand Down Expand Up @@ -354,7 +412,7 @@ fn make_transfer_transaction(

SingleTransactionKind::Call(MoveCall {
package: framework_obj_ref,
module: ident_str!("GAS").to_owned(),
module: ident_str!("SUI").to_owned(),
function: ident_str!("transfer").to_owned(),
type_arguments: Vec::new(),
object_arguments: vec![object_ref],
Expand Down
2 changes: 1 addition & 1 deletion sui/src/sui_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl SuiCommand {
}

pub struct SuiNetwork {
pub spawned_authorities: Vec<SpawnedServer>,
pub spawned_authorities: Vec<SpawnedServer<AuthorityServer>>,
}

impl SuiNetwork {
Expand Down
2 changes: 1 addition & 1 deletion sui_core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub use authority_store::{AuthorityStore, GatewayStore};

pub mod authority_notifier;

const MAX_ITEMS_LIMIT: u64 = 10_000;
const MAX_ITEMS_LIMIT: u64 = 100_000;
const BROADCAST_CAPACITY: usize = 10_000;

/// a Trait object for `signature::Signer` that is:
Expand Down
59 changes: 57 additions & 2 deletions sui_core/src/authority_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

use async_trait::async_trait;
use futures::channel::mpsc::{channel, Receiver};
use futures::SinkExt;
use futures::Stream;
use futures::{SinkExt, StreamExt};
use std::io;
use sui_network::network::{parse_recv_bytes, NetworkClient};
use sui_network::transport::TcpDataStream;
use sui_types::batch::UpdateItem;
use sui_types::{error::SuiError, messages::*, serialize::*};

Expand Down Expand Up @@ -141,7 +143,7 @@ impl AuthorityAPI for AuthorityClient {
loop {
let next_data = tcp_stream.read_data().await.transpose();
let data_result = parse_recv_bytes(next_data);
match deserialize_batch_info(data_result) {
match data_result.and_then(deserialize_batch_info) {
Ok(batch_info_response_item) => {
// send to the caller via the channel
let _ = tx_output.send(Ok(batch_info_response_item.clone())).await;
Expand Down Expand Up @@ -172,3 +174,56 @@ impl AuthorityAPI for AuthorityClient {
Ok(tr_output)
}
}

impl AuthorityClient {
/// Handle Batch information requests for this authority.
pub async fn handle_batch_streaming_as_stream(
&self,
request: BatchInfoRequest,
) -> Result<impl Stream<Item = Result<BatchInfoResponseItem, SuiError>>, io::Error> {
let tcp_stream = self
.0
.connect_for_stream(serialize_batch_request(&request))
.await?;

let mut error_count = 0;
let TcpDataStream { framed_read, .. } = tcp_stream;

let stream = framed_read
.map(|item| {
item
// Convert io error to SuiCLient error
.map_err(|err| SuiError::ClientIoError {
error: format!("io error: {:?}", err),
})
// If no error try to deserialize
.and_then(|bytes| match deserialize_message(&bytes[..]) {
Ok(SerializedMessage::Error(error)) => Err(SuiError::ClientIoError {
error: format!("io error: {:?}", error),
}),
Ok(message) => Ok(message),
Err(_) => Err(SuiError::InvalidDecoding),
})
// If deserialized try to parse as Batch Item
.and_then(deserialize_batch_info)
})
// Establish conditions to stop taking from the stream
.take_while(move |item| {
let flag = match item {
Ok(BatchInfoResponseItem(UpdateItem::Batch(signed_batch))) => {
signed_batch.batch.next_sequence_number < request.end
}
Ok(BatchInfoResponseItem(UpdateItem::Transaction((seq, _digest)))) => {
*seq < request.end
}
Err(_e) => {
// TODO: record e
error_count += 1;
error_count < MAX_ERRORS
}
};
futures::future::ready(flag)
});
Ok(stream)
}
}
14 changes: 10 additions & 4 deletions sui_core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ mod server_tests;
set it, or a dynamic mechanism to adapt it according to observed workload.
*/
const CHUNK_SIZE: usize = 36;
const MIN_BATCH_SIZE: u64 = 1000;
const MAX_DELAY_MILLIS: u64 = 5_000; // 5 sec

pub struct AuthorityServer {
server: NetworkServer,
Expand Down Expand Up @@ -61,7 +63,6 @@ impl AuthorityServer {
max_delay: Duration,
) -> SuiResult<tokio::task::JoinHandle<SuiResult<()>>> {
// Start the batching subsystem, and register the handles with the authority.
// let last_batch = self.state.init_batches_from_database()?;
let local_server = self.clone();

let _batch_join_handle = tokio::task::spawn(async move {
Expand All @@ -74,12 +75,17 @@ impl AuthorityServer {
Ok(_batch_join_handle)
}

pub async fn spawn(self) -> Result<SpawnedServer, io::Error> {
pub async fn spawn(self) -> Result<SpawnedServer<AuthorityServer>, io::Error> {
let address = format!("{}:{}", self.server.base_address, self.server.base_port);
let buffer_size = self.server.buffer_size;
let guarded_state = Arc::new(self);

// Launch server for the appropriate protocol.
spawn_server(&address, self, buffer_size).await
// Start the batching subsystem
let _join_handle = guarded_state
.spawn_batch_subsystem(MIN_BATCH_SIZE, Duration::from_millis(MAX_DELAY_MILLIS))
.await;

spawn_server(&address, guarded_state, buffer_size).await
}

async fn handle_batch_streaming<'a, 'b, A>(
Expand Down
11 changes: 4 additions & 7 deletions sui_types/src/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,11 @@ pub fn deserialize_transaction_info(
}

pub fn deserialize_batch_info(
message: Result<SerializedMessage, SuiError>,
message: SerializedMessage,
) -> Result<BatchInfoResponseItem, SuiError> {
match message {
Ok(message) => match message {
SerializedMessage::BatchInfoResp(resp) => Ok(*resp),
SerializedMessage::Error(error) => Err(*error),
_ => Err(SuiError::UnexpectedMessage),
},
Err(e) => Err(e),
SerializedMessage::BatchInfoResp(resp) => Ok(*resp),
SerializedMessage::Error(error) => Err(*error),
_ => Err(SuiError::UnexpectedMessage),
}
}
4 changes: 2 additions & 2 deletions test_utils/src/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl Sequencer {
let input_server = InputServer { tx_input };
sui_network::transport::spawn_server(
&sequencer.input_address.to_string(),
input_server,
Arc::new(input_server),
sequencer.buffer_size,
)
.await
Expand All @@ -73,7 +73,7 @@ impl Sequencer {
let subscriber_server = SubscriberServer::new(tx_subscriber, store);
sui_network::transport::spawn_server(
&sequencer.subscriber_address.to_string(),
subscriber_server,
Arc::new(subscriber_server),
sequencer.buffer_size,
)
.await
Expand Down

0 comments on commit 0b58459

Please sign in to comment.