Skip to content

Commit

Permalink
[feat] introducing the Mysticeti Transactions client and consumer. (M…
Browse files Browse the repository at this point in the history
…ystenLabs#15666)

## Description 

Introduces the `TransactionsClient` to be used by the Consensus client
(SUI) to submit transactions to, and also the `TransactionsConsumer` to
be used by `Core` to fetch/pull transactions to be included in a future
block proposal

## Test Plan 

CI

---
If your changes are not user-facing and do not break anything, you can
skip the following section. Otherwise, please briefly describe what has
changed under the Release Notes section.

### Type of Change (Check all that apply)

- [ ] protocol change
- [ ] user-visible impact
- [ ] breaking change for a client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
akichidis authored Jan 18, 2024
1 parent 5411a18 commit f6f87c6
Show file tree
Hide file tree
Showing 10 changed files with 308 additions and 24 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions consensus/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ prometheus.workspace = true
rand.workspace = true
serde.workspace = true
sui-protocol-config.workspace = true
tap.workspace = true
thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true
Expand Down
21 changes: 21 additions & 0 deletions consensus/core/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,27 @@ pub type BlockTimestampMs = u64;
/// Round number of a block.
pub type Round = u32;

/// The transaction serialised bytes
#[derive(Clone, Eq, PartialEq, Serialize, Deserialize, Default, Debug)]
pub(crate) struct Transaction {
data: bytes::Bytes,
}

#[allow(dead_code)]
impl Transaction {
pub fn new(data: Vec<u8>) -> Self {
Self { data: data.into() }
}

pub fn data(&self) -> &[u8] {
&self.data
}

pub fn into_data(self) -> Vec<u8> {
self.data.to_vec()
}
}

/// A block includes references to previous round blocks and transactions that the validator
/// considers valid.
/// Well behaved validators produce at most one block per round, but malicious validators can
Expand Down
48 changes: 47 additions & 1 deletion consensus/core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ use sui_protocol_config::ProtocolConfig;

use crate::metrics::Metrics;

#[cfg(test)]
use crate::metrics::test_metrics;

/// Context contains per-epoch configuration and metrics shared by all components
/// of this authority.
#[allow(dead_code)]
Expand All @@ -24,8 +27,8 @@ pub(crate) struct Context {
pub metrics: Arc<Metrics>,
}

#[allow(dead_code)]
impl Context {
#[allow(dead_code)]
pub(crate) fn new(
own_index: AuthorityIndex,
committee: Committee,
Expand All @@ -41,4 +44,47 @@ impl Context {
metrics,
}
}

#[cfg(test)]
pub(crate) fn new_for_test() -> Self {
let (committee, _) = Committee::new_for_test(0, vec![1, 1, 1, 1]);
let metrics = test_metrics();
Context::new(
AuthorityIndex::new_for_test(0),
committee,
Parameters::default(),
ProtocolConfig::get_for_min_version(),
metrics,
)
}

#[cfg(test)]
pub(crate) fn with_committee(mut self, committee: Committee) -> Self {
self.committee = committee;
self
}

#[cfg(test)]
pub(crate) fn with_authority_index(mut self, authority: AuthorityIndex) -> Self {
self.own_index = authority;
self
}

#[cfg(test)]
pub(crate) fn with_parameters(mut self, parameters: Parameters) -> Self {
self.parameters = parameters;
self
}

#[cfg(test)]
pub(crate) fn with_protocol_config(mut self, protocol_config: ProtocolConfig) -> Self {
self.protocol_config = protocol_config;
self
}

#[cfg(test)]
pub(crate) fn with_metrics(mut self, metrics: Arc<Metrics>) -> Self {
self.metrics = metrics;
self
}
}
31 changes: 12 additions & 19 deletions consensus/core/src/core_thread.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use mysten_metrics::{metered_channel, monitored_scope};
use std::{collections::HashSet, fmt::Debug, sync::Arc, thread};

use mysten_metrics::monitored_scope;
use thiserror::Error;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::{mpsc, oneshot};
use tracing::warn;

use crate::{
Expand All @@ -15,10 +14,11 @@ use crate::{
core::Core,
core_thread::CoreError::Shutdown,
};
const CORE_THREAD_COMMANDS_CHANNEL_SIZE: usize = 32;

#[allow(unused)]
pub(crate) struct CoreThreadDispatcherHandle {
sender: mpsc::Sender<CoreThreadCommand>,
sender: metered_channel::Sender<CoreThreadCommand>,
join_handle: thread::JoinHandle<()>,
}

Expand All @@ -34,7 +34,7 @@ impl CoreThreadDispatcherHandle {
#[allow(unused)]
struct CoreThread {
core: Core,
receiver: mpsc::Receiver<CoreThreadCommand>,
receiver: metered_channel::Receiver<CoreThreadCommand>,
context: Arc<Context>,
}

Expand Down Expand Up @@ -66,7 +66,7 @@ impl CoreThread {
#[derive(Clone)]
#[allow(dead_code)]
pub(crate) struct CoreThreadDispatcher {
sender: mpsc::WeakSender<CoreThreadCommand>,
sender: metered_channel::WeakSender<CoreThreadCommand>,
context: Arc<Context>,
}

Expand All @@ -88,7 +88,11 @@ pub(crate) enum CoreError {
#[allow(unused)]
impl CoreThreadDispatcher {
pub fn start(core: Core, context: Arc<Context>) -> (Self, CoreThreadDispatcherHandle) {
let (sender, receiver) = mpsc::channel(32);
let (sender, receiver) = metered_channel::channel_with_total(
CORE_THREAD_COMMANDS_CHANNEL_SIZE,
&context.metrics.channel_metrics.core_thread,
&context.metrics.channel_metrics.core_thread_total,
);
let core_thread = CoreThread {
core,
receiver,
Expand Down Expand Up @@ -148,21 +152,10 @@ impl CoreThreadDispatcher {
mod test {
use super::*;
use crate::context::Context;
use crate::metrics::test_metrics;
use consensus_config::{AuthorityIndex, Committee, Parameters};
use sui_protocol_config::ProtocolConfig;

#[tokio::test]
async fn test_core_thread() {
let (committee, _) = Committee::new_for_test(0, vec![1, 1, 1, 1]);
let metrics = test_metrics();
let context = Arc::new(Context::new(
AuthorityIndex::new_for_test(0),
committee,
Parameters::default(),
ProtocolConfig::get_for_min_version(),
metrics,
));
let context = Arc::new(Context::new_for_test());

let core = Core::new(context.clone());
let (core_dispatcher, handle) = CoreThreadDispatcher::start(core, context);
Expand Down
1 change: 1 addition & 0 deletions consensus/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ mod metrics;
mod stake_aggregator;
mod storage;
mod threshold_clock;
mod transactions_client;
mod validator;
49 changes: 46 additions & 3 deletions consensus/core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
// SPDX-License-Identifier: Apache-2.0

use prometheus::{
register_histogram_with_registry, register_int_counter_with_registry, Histogram, IntCounter,
Registry,
register_histogram_with_registry, register_int_counter_with_registry,
register_int_gauge_with_registry, Histogram, IntCounter, IntGauge, Registry,
};
use std::sync::Arc;

Expand All @@ -15,12 +15,17 @@ const LATENCY_SEC_BUCKETS: &[f64] = &[

pub(crate) struct Metrics {
pub node_metrics: NodeMetrics,
pub channel_metrics: ChannelMetrics,
}

pub(crate) fn initialise_metrics(registry: Registry) -> Arc<Metrics> {
let node_metrics = NodeMetrics::new(&registry);
let channel_metrics = ChannelMetrics::new(&registry);

Arc::new(Metrics { node_metrics })
Arc::new(Metrics {
node_metrics,
channel_metrics,
})
}

#[cfg(test)]
Expand Down Expand Up @@ -73,3 +78,41 @@ impl NodeMetrics {
}
}
}

pub(crate) struct ChannelMetrics {
/// occupancy of the channel from TransactionsClient to TransactionsConsumer
pub tx_transactions_submit: IntGauge,
/// total received on channel from TransactionsClient to TransactionsConsumer
pub tx_transactions_submit_total: IntCounter,
/// occupancy of the CoreThread commands channel
pub core_thread: IntGauge,
/// total received on the CoreThread commands channel
pub core_thread_total: IntCounter,
}

impl ChannelMetrics {
pub fn new(registry: &Registry) -> Self {
Self {
tx_transactions_submit: register_int_gauge_with_registry!(
"tx_transactions_submit",
"occupancy of the channel from the `TransactionsClient` to the `TransactionsConsumer`",
registry
).unwrap(),
tx_transactions_submit_total: register_int_counter_with_registry!(
"tx_transactions_submit_total",
"total received on channel from the `TransactionsClient` to the `TransactionsConsumer`",
registry
).unwrap(),
core_thread: register_int_gauge_with_registry!(
"core_thread",
"occupancy of the `CoreThread` commands channel",
registry
).unwrap(),
core_thread_total: register_int_counter_with_registry!(
"core_thread_total",
"total received on the `CoreThread` commands channel",
registry
).unwrap(),
}
}
}
122 changes: 122 additions & 0 deletions consensus/core/src/transactions_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::block::Transaction;
use crate::context::Context;
use mysten_metrics::metered_channel;
use mysten_metrics::metered_channel::channel_with_total;
use std::sync::Arc;
use tap::tap::TapFallible;
use thiserror::Error;
use tracing::error;

/// Maximum number of transactions to be fetched per request of `next`
const MAX_FETCHED_TRANSACTIONS: usize = 100;
/// The maximum number of transactions pending to the queue to be pulled for block proposal
const MAX_PENDING_TRANSACTIONS: usize = 2_000;

/// The TransactionsConsumer is responsible for fetching the next transactions to be included for the block proposals.
/// The transactions are submitted to a channel which is shared between the TransactionsConsumer and the TransactionsClient
/// and are pulled every time the `next` method is called.
#[allow(dead_code)]
pub(crate) struct TransactionsConsumer {
tx_receiver: metered_channel::Receiver<Transaction>,
max_fetched_per_request: usize,
}

#[allow(dead_code)]
impl TransactionsConsumer {
pub(crate) fn new(tx_receiver: metered_channel::Receiver<Transaction>) -> Self {
Self {
tx_receiver,
max_fetched_per_request: MAX_FETCHED_TRANSACTIONS,
}
}

pub(crate) fn with_max_fetched_per_request(mut self, max_fetched_per_request: usize) -> Self {
self.max_fetched_per_request = max_fetched_per_request;
self
}

// Attempts to fetch the next transactions that have been submitted for sequence.
pub(crate) fn next(&mut self) -> Vec<Transaction> {
let mut transactions = Vec::new();
while let Ok(transaction) = self.tx_receiver.try_recv() {
transactions.push(transaction);

if transactions.len() >= self.max_fetched_per_request {
break;
}
}
transactions
}
}

#[derive(Clone)]
#[allow(dead_code)]
pub struct TransactionsClient {
sender: metered_channel::Sender<Transaction>,
}

#[derive(Error, Debug)]
pub enum ClientError {
#[error("Failed to submit transaction to consensus: {0}")]
SubmitError(String),
}

#[allow(dead_code)]
impl TransactionsClient {
pub(crate) fn new(context: Arc<Context>) -> (Self, metered_channel::Receiver<Transaction>) {
let (sender, receiver) = channel_with_total(
MAX_PENDING_TRANSACTIONS,
&context.metrics.channel_metrics.tx_transactions_submit,
&context.metrics.channel_metrics.tx_transactions_submit_total,
);

(Self { sender }, receiver)
}

// Submits a transaction to be sequenced.
pub async fn submit(&self, transaction: Vec<u8>) -> Result<(), ClientError> {
self.sender
.send(Transaction::new(transaction))
.await
.tap_err(|e| error!("Submit transaction failed with {:?}", e))
.map_err(|e| ClientError::SubmitError(e.to_string()))
}
}

#[cfg(test)]
mod tests {
use crate::context::Context;
use crate::transactions_client::{TransactionsClient, TransactionsConsumer};
use std::sync::Arc;

#[tokio::test]
async fn basic_submit_and_consume() {
let context = Arc::new(Context::new_for_test());
let (client, tx_receiver) = TransactionsClient::new(context);
let mut consumer = TransactionsConsumer::new(tx_receiver);

// submit some transactions
for i in 0..3 {
let transaction =
bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
client
.submit(transaction)
.await
.expect("Shouldn't submit successfully transaction")
}

// now pull the transactions from the consumer
let transactions = consumer.next();
assert_eq!(transactions.len(), 3);

for (i, transaction) in transactions.iter().enumerate() {
let t: String = bcs::from_bytes(transaction.data()).unwrap();
assert_eq!(format!("transaction {i}").to_string(), t);
}

// try to pull again transactions, result should be empty
assert!(consumer.next().is_empty());
}
}
Loading

0 comments on commit f6f87c6

Please sign in to comment.