Skip to content

Commit

Permalink
Event Store Initial Implementation (MystenLabs#2507)
Browse files Browse the repository at this point in the history
  • Loading branch information
Evan Chan authored Jun 15, 2022
1 parent ea0c9ec commit 74d519b
Show file tree
Hide file tree
Showing 8 changed files with 1,116 additions and 63 deletions.
316 changes: 297 additions & 19 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions crates/sui-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@ futures = "0.3.21"
flexstr = "^0.9"
rand = "0.7.3"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.80"
tokio = { version = "1.17.0", features = ["full", "tracing"] }
rocksdb = "0.18.0"
tracing = "0.1.34"
sqlx = { version = "0.5", features = [ "runtime-tokio-rustls", "sqlite" ] }
strum = "^0.24"
strum_macros = "^0.24"

sui-types = { path = "../sui-types" }

Expand All @@ -26,6 +30,7 @@ workspace-hack = { path = "../workspace-hack"}
move-core-types = { git = "https://github.com/move-language/move", rev = "c2949bc7967de5b93f0850ce4987fc06c529f9f2", features = ["address20"] }

[dev-dependencies]
bcs = "0.1.3"
tempfile = "3.3.0"
num_cpus = "1.13.1"
pretty_assertions = "1.2.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,20 @@
//! Events are also archived into checkpoints so this API should support that as well.
//!
use async_trait::async_trait;
use move_core_types::language_storage::ModuleId;
use move_core_types::value::MoveValue;
use serde_json::Value;
use sui_types::base_types::{ObjectID, TransactionDigest};
use sui_types::event::{EventEnvelope, EventType};

pub mod sql;

use flexstr::SharedStr;

/// One event pulled out from the EventStore
#[allow(unused)]
#[derive(Clone, Debug, PartialEq)]
pub struct StoredEvent {
/// UTC timestamp in milliseconds
timestamp: u64,
Expand All @@ -28,84 +33,111 @@ pub struct StoredEvent {
tx_digest: Option<TransactionDigest>,
/// The variant name from SuiEvent, eg MoveEvent, Publish, etc.
event_type: SharedStr,
/// Object ID of the Move package generating the event
/// Package ID if available
package_id: Option<ObjectID>,
/// Module name of the Move package generating the event
module_name: Option<SharedStr>,
/// Function name that produced the event, for Move Events
function_name: Option<SharedStr>,
/// Object ID of NewObject, DeleteObject, package being published, or object being transferred
object_id: Option<ObjectID>,
/// Individual event fields. As much as possible these should be deconstructed and flattened,
/// ie `{'obj': {'fieldA': 'A', 'fieldB': 'B'}}` should really be broken down to
/// `[('obj.fieldA', 'A'), ('obj.fieldB', 'B')]
///
/// There is no guarantee of ordering in the fields.
///
/// ## Common field names
/// * `object_id` - used by TransferObject, DeleteObject
/// * `version` - used by TransferObject
/// * `destination` - address, in hex bytes, used by TransferObject
/// * `type` - used by TransferObject (TransferType - Coin, ToAddress, ToObject)
fields: Vec<(SharedStr, EventValue)>, // Change this to something based on CBOR for binary values, or our own value types for efficiency
}

/// Enum for different types of values returnable from events in the EventStore
// This is distinct from MoveValue because we want to explicitly represent (and translate)
// blobs and strings, allowing us to use more efficient representations.
#[derive(Clone, Debug, PartialEq)]
pub enum EventValue {
Move(MoveValue),
/// Efficient string representation, no allocation for small strings
String(SharedStr),
/// Arbitrary-length blob. Please use MoveValue::Address for ObjectIDs and similar things.
BinaryBlob(Vec<u8>),
Json(Value),
}

/// An EventStore supports event ingestion and flexible event querying
/// One can think of events as logs. They represent a log of what is happening to Sui.
/// Thus, all different kinds of events fit on a timeline, and one should be able to query for
/// different types of events that happen over that timeline.
trait EventStore<EventIt>
where
EventIt: Iterator<Item = StoredEvent>,
{
#[async_trait]
trait EventStore {
type EventIt: IntoIterator<Item = StoredEvent>;

/// Adds events to the EventStore.
/// Semantics: events are appended, no deduplication is done.
fn add_events(
async fn add_events(
&self,
events: &[EventEnvelope],
checkpoint_num: u64,
) -> Result<(), EventStoreError>;

/// Queries for events emitted by a given transaction, returned in order emitted
/// NOTE: Not all events come from transactions
fn events_for_transaction(&self, digest: TransactionDigest)
-> Result<EventIt, EventStoreError>;
async fn events_for_transaction(
&self,
digest: TransactionDigest,
) -> Result<Self::EventIt, EventStoreError>;

/// Queries for all events of a certain EventType within a given time window.
/// Will return at most limit of the most recent events within the window, sorted in ascending time.
fn events_by_type(
/// Will return at most limit of the most recent events within the window, sorted in descending time.
async fn events_by_type(
&self,
start_time: u64,
end_time: u64,
event_type: EventType,
limit: usize,
) -> Result<EventIt, EventStoreError>;
) -> Result<Self::EventIt, EventStoreError>;

/// Generic event iteration bounded by time. Return in ingestion order.
fn event_iterator(&self, start_time: u64, end_time: u64) -> Result<EventIt, EventStoreError>;
/// start_time is inclusive and end_time is exclusive.
async fn event_iterator(
&self,
start_time: u64,
end_time: u64,
limit: usize,
) -> Result<Self::EventIt, EventStoreError>;

/// Generic event iteration bounded by checkpoint number. Return in ingestion order.
/// Checkpoint numbers are inclusive on both ends.
fn events_by_checkpoint(
async fn events_by_checkpoint(
&self,
start_checkpoint: u64,
end_checkpoint: u64,
) -> Result<EventIt, EventStoreError>;
limit: usize,
) -> Result<Self::EventIt, EventStoreError>;

/// Queries all Move events belonging to a certain Module ID within a given time window.
/// Will return at most limit of the most recent events within the window, sorted in ascending time.
fn events_by_module_id(
/// Will return at most limit of the most recent events within the window, sorted in descending time.
async fn events_by_module_id(
&self,
start_time: u64,
end_time: u64,
module: ModuleId,
limit: usize,
) -> Result<EventIt, EventStoreError>;
) -> Result<Self::EventIt, EventStoreError>;
}

#[derive(Debug)]
pub enum EventStoreError {
GenericError(Box<dyn std::error::Error>),
SqlError(sqlx::Error),
LimitTooHigh(usize),
}

impl From<sqlx::Error> for EventStoreError {
fn from(err: sqlx::Error) -> Self {
EventStoreError::SqlError(err)
}
}
Loading

0 comments on commit 74d519b

Please sign in to comment.