Skip to content

Commit

Permalink
add WS transaction subscription to JSONRPC (MystenLabs#11432)
Browse files Browse the repository at this point in the history
## Description 

Added `suix_subscribeTransaction`, user can subscribe to transaction
effects stream using `TransactionFilter` object

## Test Plan 

Unit test
  • Loading branch information
patrickkuo authored Apr 29, 2023
1 parent 8440912 commit 695e7f4
Show file tree
Hide file tree
Showing 18 changed files with 277 additions and 101 deletions.
27 changes: 14 additions & 13 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ use prometheus::{
};
use serde::de::DeserializeOwned;
use serde::Serialize;
use sui_config::transaction_deny_config::TransactionDenyConfig;
use sui_types::metrics::LimitsMetrics;
use sui_types::{is_system_package, TypeTag};
use tap::{TapFallible, TapOptional};
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::oneshot;
Expand All @@ -47,10 +44,12 @@ use sui_config::genesis::Genesis;
use sui_config::node::{
AuthorityStorePruningConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
};
use sui_config::transaction_deny_config::TransactionDenyConfig;
use sui_framework::{BuiltInFramework, SystemPackage};
use sui_json_rpc_types::{
Checkpoint, DevInspectResults, DryRunTransactionBlockResponse, EventFilter, SuiEvent,
SuiMoveValue, SuiObjectDataFilter, SuiTransactionBlockData, SuiTransactionBlockEvents,
SuiMoveValue, SuiObjectDataFilter, SuiTransactionBlockData, SuiTransactionBlockEffects,
SuiTransactionBlockEvents, TransactionFilter,
};
use sui_macros::{fail_point, fail_point_async};
use sui_protocol_config::{ProtocolConfig, SupportedProtocolVersions};
Expand All @@ -72,8 +71,8 @@ use sui_types::messages_checkpoint::{
CheckpointSequenceNumber, CheckpointSummary, CheckpointTimestamp, VerifiedCheckpoint,
};
use sui_types::messages_checkpoint::{CheckpointRequest, CheckpointResponse};
use sui_types::metrics::LimitsMetrics;
use sui_types::object::{MoveObject, Owner, PastObjectRead, OBJECT_START_VERSION};
use sui_types::query::TransactionFilter;
use sui_types::storage::{ObjectKey, ObjectStore, WriteKind};
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
use sui_types::sui_system_state::SuiSystemState;
Expand All @@ -92,6 +91,7 @@ use sui_types::{
object::{Object, ObjectFormatOptions, ObjectRead},
SUI_SYSTEM_ADDRESS,
};
use sui_types::{is_system_package, TypeTag};
use typed_store::Map;

use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard};
Expand All @@ -103,7 +103,7 @@ use crate::authority::epoch_start_configuration::EpochStartConfiguration;
use crate::checkpoints::checkpoint_executor::CheckpointExecutor;
use crate::checkpoints::CheckpointStore;
use crate::epoch::committee_store::CommitteeStore;
use crate::event_handler::EventHandler;
use crate::event_handler::SubscriptionHandler;
use crate::execution_driver::execution_process;
use crate::module_cache_metrics::ResolverMetrics;
use crate::stake_aggregator::StakeAggregator;
Expand Down Expand Up @@ -486,7 +486,7 @@ pub struct AuthorityState {

pub indexes: Option<Arc<IndexStore>>,

pub event_handler: Arc<EventHandler>,
pub subscription_handler: Arc<SubscriptionHandler>,
pub(crate) checkpoint_store: Arc<CheckpointStore>,

committee_store: Arc<CommitteeStore>,
Expand Down Expand Up @@ -1456,7 +1456,7 @@ impl AuthorityState {
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<Option<DynamicFieldInfo>> {
// Skip if not a move object
let Some(move_object) = o.data.try_as_move().cloned() else {
let Some(move_object) = o.data.try_as_move().cloned() else {
return Ok(None);
};
// We only index dynamic field objects
Expand Down Expand Up @@ -1552,12 +1552,13 @@ impl AuthorityState {
.await
.tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
.tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"));

let effects: SuiTransactionBlockEffects = effects.clone().try_into()?;
// Emit events
if res.is_ok() {
self.event_handler
.process_events(
&effects.clone().try_into()?,
self.subscription_handler
.process_tx(
certificate.data().transaction_data(),
&effects,
&SuiTransactionBlockEvents::try_from(
events.clone(),
*tx_digest,
Expand Down Expand Up @@ -1781,7 +1782,7 @@ impl AuthorityState {
epoch_store: ArcSwap::new(epoch_store.clone()),
database: store,
indexes,
event_handler: Arc::new(EventHandler::default()),
subscription_handler: Arc::new(SubscriptionHandler::default()),
checkpoint_store,
committee_store,
transaction_manager,
Expand Down
47 changes: 35 additions & 12 deletions crates/sui-core/src/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,40 @@
use tokio_stream::Stream;
use tracing::{error, instrument, trace};

use sui_json_rpc_types::{EventFilter, SuiTransactionBlockEffects, SuiTransactionBlockEvents};
use crate::streamer::Streamer;
use sui_json_rpc_types::{
EffectsWithInput, EventFilter, SuiTransactionBlockEffects, SuiTransactionBlockEvents,
TransactionFilter,
};
use sui_json_rpc_types::{SuiEvent, SuiTransactionBlockEffectsAPI};
use sui_types::error::SuiResult;

use crate::streamer::Streamer;
use sui_types::messages::TransactionData;

#[cfg(test)]
#[path = "unit_tests/event_handler_tests.rs"]
mod event_handler_tests;

pub const EVENT_DISPATCH_BUFFER_SIZE: usize = 1000;

pub struct EventHandler {
event_streamer: Streamer<SuiEvent, EventFilter>,
pub struct SubscriptionHandler {
event_streamer: Streamer<SuiEvent, SuiEvent, EventFilter>,
transaction_streamer: Streamer<EffectsWithInput, SuiTransactionBlockEffects, TransactionFilter>,
}

impl Default for EventHandler {
impl Default for SubscriptionHandler {
fn default() -> Self {
let streamer = Streamer::spawn(EVENT_DISPATCH_BUFFER_SIZE);
Self {
event_streamer: streamer,
event_streamer: Streamer::spawn(EVENT_DISPATCH_BUFFER_SIZE),
transaction_streamer: Streamer::spawn(EVENT_DISPATCH_BUFFER_SIZE),
}
}
}

impl EventHandler {
#[instrument(level = "debug", skip_all, fields(tx_digest=?effects.transaction_digest()), err)]
pub async fn process_events(
impl SubscriptionHandler {
#[instrument(level = "debug", skip_all, fields(tx_digest = ? effects.transaction_digest()), err)]
pub async fn process_tx(
&self,
input: &TransactionData,
effects: &SuiTransactionBlockEffects,
events: &SuiTransactionBlockEvents,
) -> SuiResult {
Expand All @@ -42,6 +47,17 @@ impl EventHandler {
"Finished writing events to event store"
);

if let Err(e) = self
.transaction_streamer
.send(EffectsWithInput {
input: input.clone(),
effects: effects.clone(),
})
.await
{
error!(error =? e, "Failed to send transaction to dispatch");
}

// serially dispatch event processing to honor events' orders.
for event in events.data.clone() {
if let Err(e) = self.event_streamer.send(event).await {
Expand All @@ -51,7 +67,14 @@ impl EventHandler {
Ok(())
}

pub fn subscribe(&self, filter: EventFilter) -> impl Stream<Item = SuiEvent> {
pub fn subscribe_events(&self, filter: EventFilter) -> impl Stream<Item = SuiEvent> {
self.event_streamer.subscribe(filter)
}

pub fn subscribe_transactions(
&self,
filter: TransactionFilter,
) -> impl Stream<Item = SuiTransactionBlockEffects> {
self.transaction_streamer.subscribe(filter)
}
}
17 changes: 9 additions & 8 deletions crates/sui-core/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ type Subscribers<T, F> = Arc<RwLock<BTreeMap<String, (Sender<T>, F)>>>;

/// The Streamer splits a mpsc channel into multiple mpsc channels using the subscriber's `Filter<T>` object.
/// Data will be sent to the subscribers in parallel and the subscription will be dropped if it received a send error.
pub struct Streamer<T, F: Filter<T>> {
pub struct Streamer<T, S, F: Filter<T>> {
streamer_queue: Sender<T>,
subscribers: Subscribers<T, F>,
subscribers: Subscribers<S, F>,
}

impl<T, F> Streamer<T, F>
impl<T, S, F> Streamer<T, S, F>
where
T: Clone + Debug + Send + Sync + 'static,
S: From<T> + Clone + Debug + Send + Sync + 'static,
T: Clone + Send + Sync + 'static,
F: Filter<T> + Clone + Send + Sync + 'static + Clone,
{
pub fn spawn(buffer: usize) -> Self {
Expand All @@ -46,15 +47,15 @@ where
streamer
}

async fn send_to_all_subscribers(subscribers: Subscribers<T, F>, data: T) {
async fn send_to_all_subscribers(subscribers: Subscribers<S, F>, data: T) {
for (id, (subscriber, filter)) in subscribers.read().clone() {
if !(filter.matches(&data)) {
continue;
}
let data = data.clone();
let subscribers = subscribers.clone();
spawn_monitored_task!(async move {
match subscriber.send(data).await {
match subscriber.send(data.into()).await {
Ok(_) => {
debug!("Sending Move event to subscriber [{id}].")
}
Expand All @@ -68,8 +69,8 @@ where
}

/// Subscribe to the data stream filtered by the filter object.
pub fn subscribe(&self, filter: F) -> impl Stream<Item = T> {
let (tx, rx) = mpsc::channel::<T>(EVENT_DISPATCH_BUFFER_SIZE);
pub fn subscribe(&self, filter: F) -> impl Stream<Item = S> {
let (tx, rx) = mpsc::channel::<S>(EVENT_DISPATCH_BUFFER_SIZE);
self.subscribers
.write()
.insert(ObjectID::random().to_string(), (tx, filter));
Expand Down
25 changes: 18 additions & 7 deletions crates/sui-indexer/src/apis/indexer_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use jsonrpsee::types::SubscriptionResult;
use jsonrpsee::{RpcModule, SubscriptionSink};

use move_core_types::identifier::Identifier;
use sui_core::event_handler::EventHandler;
use sui_core::event_handler::SubscriptionHandler;
use sui_json_rpc::api::{
validate_limit, IndexerApiClient, IndexerApiServer, QUERY_MAX_RESULT_LIMIT,
};
Expand All @@ -21,36 +21,35 @@ use sui_json_rpc::SuiRpcModule;
use sui_json_rpc_types::{
DynamicFieldPage, EventFilter, EventPage, ObjectsPage, Page, SuiObjectDataFilter,
SuiObjectResponse, SuiObjectResponseQuery, SuiTransactionBlockResponseQuery,
TransactionBlocksPage,
TransactionBlocksPage, TransactionFilter,
};
use sui_open_rpc::Module;
use sui_types::base_types::{ObjectID, SuiAddress};
use sui_types::digests::TransactionDigest;
use sui_types::dynamic_field::DynamicFieldName;
use sui_types::event::EventID;
use sui_types::query::TransactionFilter;

use crate::errors::IndexerError;
use crate::store::IndexerStore;

pub(crate) struct IndexerApi<S> {
state: S,
fullnode: HttpClient,
event_handler: Arc<EventHandler>,
subscription_handler: Arc<SubscriptionHandler>,
migrated_methods: Vec<String>,
}

impl<S: IndexerStore> IndexerApi<S> {
pub fn new(
state: S,
fullnode_client: HttpClient,
event_handler: Arc<EventHandler>,
event_handler: Arc<SubscriptionHandler>,
migrated_methods: Vec<String>,
) -> Self {
Self {
state,
fullnode: fullnode_client,
event_handler,
subscription_handler: event_handler,
migrated_methods,
}
}
Expand Down Expand Up @@ -434,7 +433,19 @@ where
}

fn subscribe_event(&self, sink: SubscriptionSink, filter: EventFilter) -> SubscriptionResult {
spawn_subscription(sink, self.event_handler.subscribe(filter));
spawn_subscription(sink, self.subscription_handler.subscribe_events(filter));
Ok(())
}

fn subscribe_transaction(
&self,
sink: SubscriptionSink,
filter: TransactionFilter,
) -> SubscriptionResult {
spawn_subscription(
sink,
self.subscription_handler.subscribe_transactions(filter),
);
Ok(())
}

Expand Down
10 changes: 6 additions & 4 deletions crates/sui-indexer/src/handlers/checkpoint_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tokio::task::JoinHandle;
use tracing::{error, info, warn};

use mysten_metrics::spawn_monitored_task;
use sui_core::event_handler::EventHandler;
use sui_core::event_handler::SubscriptionHandler;
use sui_json_rpc::api::{GovernanceReadApiClient, ReadApiClient};
use sui_json_rpc_types::{
OwnedObjectRef, SuiGetPastObjectRequest, SuiObjectData, SuiObjectDataOptions, SuiRawData,
Expand All @@ -26,6 +26,7 @@ use sui_json_rpc_types::{
use sui_sdk::error::Error;
use sui_types::base_types::{ObjectID, SequenceNumber};
use sui_types::committee::EpochId;
use sui_types::messages::SenderSignedData;
use sui_types::messages_checkpoint::{CheckpointCommitment, CheckpointSequenceNumber};
use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary;
use sui_types::SUI_SYSTEM_ADDRESS;
Expand Down Expand Up @@ -56,7 +57,7 @@ const EPOCH_QUEUE_LIMIT: usize = 2;
pub struct CheckpointHandler<S> {
state: S,
http_client: HttpClient,
event_handler: Arc<EventHandler>,
event_handler: Arc<SubscriptionHandler>,
metrics: IndexerMetrics,
config: IndexerConfig,
checkpoint_sender: Arc<Mutex<Sender<TemporaryCheckpointStore>>>,
Expand All @@ -75,7 +76,7 @@ where
pub fn new(
state: S,
http_client: HttpClient,
event_handler: Arc<EventHandler>,
event_handler: Arc<SubscriptionHandler>,
metrics: IndexerMetrics,
config: &IndexerConfig,
) -> Self {
Expand Down Expand Up @@ -397,8 +398,9 @@ where
for checkpoint in &downloaded_checkpoints {
let ws_guard = self.metrics.subscription_process_latency.start_timer();
for tx in &checkpoint.transactions {
let data: SenderSignedData = bcs::from_bytes(&tx.raw_transaction)?;
self.event_handler
.process_events(&tx.effects, &tx.events)
.process_tx(data.transaction_data(), &tx.effects, &tx.events)
.await?;
}
ws_guard.stop_and_record();
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use errors::IndexerError;
use handlers::checkpoint_handler::CheckpointHandler;
use mysten_metrics::{spawn_monitored_task, RegistryService};
use store::IndexerStore;
use sui_core::event_handler::EventHandler;
use sui_core::event_handler::SubscriptionHandler;
use sui_json_rpc::{JsonRpcServerBuilder, ServerHandle, CLIENT_SDK_TYPE_HEADER};
use sui_sdk::{SuiClient, SuiClientBuilder};

Expand Down Expand Up @@ -192,7 +192,7 @@ impl Indexer {
"Sui indexer of version {:?} started...",
env!("CARGO_PKG_VERSION")
);
let event_handler = Arc::new(EventHandler::default());
let event_handler = Arc::new(SubscriptionHandler::default());

if config.rpc_server_worker && config.fullnode_sync_worker {
info!("Starting indexer with both fullnode sync and RPC server");
Expand Down Expand Up @@ -403,7 +403,7 @@ pub async fn get_async_pg_pool_connection(
pub async fn build_json_rpc_server<S: IndexerStore + Sync + Send + 'static + Clone>(
prometheus_registry: &Registry,
state: S,
event_handler: Arc<EventHandler>,
event_handler: Arc<SubscriptionHandler>,
config: &IndexerConfig,
custom_runtime: Option<Handle>,
) -> Result<ServerHandle, IndexerError> {
Expand Down
Loading

0 comments on commit 695e7f4

Please sign in to comment.