indexer: make event commit atomic and append only (MystenLabs#7524)
### problem and theory
I found that the event indexer's throughput drops as the events table grows. One reason is that today, on every event commit, we de-dup as shown below, and this check gets more and more expensive as the table grows:
```
.on_conflict((transaction_digest, event_sequence))
.do_nothing()
```
The reason we need this de-dup is that events and the event log are committed in two steps (so the write is not atomic): first the events, then the event log. The indexer itself can stop or crash in between, so to avoid writing duplicate events to the events table on restart, we de-dup on insert.
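
For context, here is a minimal sketch of the two write paths, assuming Diesel 2, a bare `PgConnection` instead of the crate's pooled connection, and the `events` schema / `NewEvent` model from this crate; the function names are illustrative, not the PR's actual code:
```
use diesel::pg::PgConnection;
use diesel::prelude::*;

use crate::models::events::NewEvent;
use crate::schema::events;
use crate::schema::events::{event_sequence, transaction_digest};

// Before this PR: tolerate replayed batches by letting Postgres skip rows
// that collide on (transaction_digest, event_sequence). The conflict check
// leans on a unique index that keeps growing with the table.
fn insert_with_dedup(conn: &mut PgConnection, new_events: Vec<NewEvent>) -> QueryResult<usize> {
    diesel::insert_into(events::table)
        .values(&new_events)
        .on_conflict((transaction_digest, event_sequence))
        .do_nothing()
        .execute(conn)
}

// After this PR: plain append-only insert with no conflict target; duplicates
// are avoided by committing the resume cursor atomically with the events
// (see the solution section below).
fn insert_append_only(conn: &mut PgConnection, new_events: Vec<NewEvent>) -> QueryResult<usize> {
    diesel::insert_into(events::table)
        .values(&new_events)
        .execute(conn)
}
```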

### solution in this PR
This PR merges the events and event_logs tables so that committing events takes a single DB write and is therefore atomic. With that, we no longer have to rely on DB-level de-dup and can make event writes append-only.

This should mitigate, if not resolve, the event throughput issue.
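
A rough sketch of the merged write path under the same assumptions (the `EventID` type comes from `sui_types`); the real `commit_events` in this PR also handles the case where the page has no next cursor by deriving one from the last event, which is omitted here:
```
use diesel::pg::PgConnection;
use diesel::prelude::*;
use diesel::result::Error;
use sui_types::event::EventID;

use crate::models::events::NewEvent;
use crate::schema::events;

// The page's next cursor is stamped onto the last event row of the batch,
// then the whole batch is appended in one read-write transaction, so the
// events and the resume cursor can never diverge across a crash.
fn commit_batch_atomically(
    conn: &mut PgConnection,
    mut new_events: Vec<NewEvent>,
    next_cursor: EventID,
) -> Result<usize, Error> {
    if let Some(last_event) = new_events.last_mut() {
        last_event.next_cursor_transaction_digest = Some(next_cursor.tx_digest.base58_encode());
        last_event.next_cursor_event_sequence = Some(next_cursor.event_seq);
    }
    conn.build_transaction()
        .read_write()
        .run::<_, Error, _>(|conn| {
            diesel::insert_into(events::table)
                .values(&new_events)
                .execute(conn)
        })
}
```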

### Test
- run the indexer against a local validator and make sure events are populated
- stop the indexer, check the DB, and make sure that the last event row indeed carries the next-cursor info (see the sketch after this list)
- restart the indexer and make sure it resumes from that cursor and proceeds with new events
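
For the second check, a hypothetical Diesel spot check could look like the sketch below (a plain `SELECT ... ORDER BY id DESC LIMIT 1` against the events table shows the same thing); the `Event` model and the next-cursor columns are the ones added in this PR:
```
use diesel::pg::PgConnection;
use diesel::prelude::*;

use crate::models::events::Event;
use crate::schema::events::dsl::{events, id};

// Hypothetical spot check: after stopping the indexer, the most recently
// written event row should carry the next-cursor columns.
fn assert_last_event_has_cursor(conn: &mut PgConnection) -> QueryResult<()> {
    let last_event: Option<Event> = events.order(id.desc()).first(conn).optional()?;
    let last_event = last_event.expect("events table should not be empty after a run");
    assert!(last_event.next_cursor_transaction_digest.is_some());
    assert!(last_event.next_cursor_event_sequence.is_some());
    Ok(())
}
```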
gegaowp authored Jan 19, 2023
1 parent a21e19d commit df15ffe
Showing 8 changed files with 118 additions and 131 deletions.
5 changes: 3 additions & 2 deletions crates/sui-indexer/migrations/2022-11-18-220045_events/up.sql
@@ -1,10 +1,11 @@
CREATE TABLE EVENTS (
id BIGSERIAL PRIMARY KEY,
-- below 2 are from Event ID, tx_digest and event_seq
transaction_digest VARCHAR(255),
transaction_digest VARCHAR(255) NOT NULL,
event_sequence BIGINT NOT NULL,
event_time TIMESTAMP,
event_type VARCHAR NOT NULL,
event_content VARCHAR NOT NULL,
UNIQUE (transaction_digest, event_sequence)
next_cursor_transaction_digest VARCHAR(255),
next_cursor_event_sequence BIGINT
);
1 change: 0 additions & 1 deletion crates/sui-indexer/migrations/2022-11-22-235415_logs/down.sql
@@ -1,2 +1 @@
DROP TABLE event_logs;
DROP TABLE transaction_logs;
9 changes: 0 additions & 9 deletions crates/sui-indexer/migrations/2022-11-22-235415_logs/up.sql
@@ -1,16 +1,7 @@
CREATE TABLE event_logs (
id SERIAL PRIMARY KEY,
next_cursor_tx_dig TEXT,
next_cursor_event_seq BIGINT
);

CREATE TABLE transaction_logs (
id SERIAL PRIMARY KEY,
next_cursor_tx_digest TEXT
);

INSERT INTO event_logs (id, next_cursor_tx_dig, next_cursor_event_seq) VALUES
(1, NULL, NULL);

INSERT INTO transaction_logs (id, next_cursor_tx_digest) VALUES
(1, NULL);
79 changes: 43 additions & 36 deletions crates/sui-indexer/src/handlers/event_handler.rs
@@ -9,12 +9,11 @@ use sui_sdk::SuiClient;
use sui_types::event::EventID;
use sui_types::query::EventQuery;
use tokio::time::sleep;
use tracing::info;
use tracing::{error, info};

use sui_indexer::errors::IndexerError;
use sui_indexer::metrics::IndexerEventHandlerMetrics;
use sui_indexer::models::event_logs::{commit_event_log, read_event_log};
use sui_indexer::models::events::commit_events;
use sui_indexer::models::events::{commit_events, read_last_event};
use sui_indexer::{get_pg_pool_connection, PgConnectionPool};

const EVENT_PAGE_SIZE: usize = 100;
@@ -44,22 +43,34 @@ impl EventHandler {
let mut pg_pool_conn = get_pg_pool_connection(self.pg_connection_pool.clone())?;

let mut next_cursor = None;
let event_log = read_event_log(&mut pg_pool_conn)?;
let (tx_dig_opt, event_seq_opt) = (
event_log.next_cursor_tx_dig,
event_log.next_cursor_event_seq,
);
if let (Some(tx_dig), Some(event_seq)) = (tx_dig_opt, event_seq_opt) {
let tx_digest = tx_dig.parse().map_err(|e| {
IndexerError::TransactionDigestParsingError(format!(
"Failed parsing transaction digest {:?} with error: {:?}",
tx_dig, e
))
})?;
next_cursor = Some(EventID {
tx_digest,
event_seq,
});
let last_event_opt = read_last_event(&mut pg_pool_conn)?;
if let Some(last_event) = last_event_opt {
match (
last_event.next_cursor_transaction_digest,
last_event.next_cursor_event_sequence,
) {
(Some(tx_digest_str), Some(event_seq)) => {
let tx_digest = tx_digest_str.parse().map_err(|e| {
IndexerError::TransactionDigestParsingError(format!(
"Failed parsing transaction digest {:?} with error: {:?}",
tx_digest_str, e
))
})?;
next_cursor = Some(EventID {
tx_digest,
event_seq,
});
}
(Some(_), None) => {
error!("Last event was found but it has no next cursor event sequence, this should never happen!");
}
(None, Some(_)) => {
error!("Last event was found but it has no next cursor tx digest, this should never happen!");
}
(None, None) => {
error!("Last event was found but it has no next cursor tx digest and no next cursor event sequence, this should never happen!");
}
}
}

loop {
@@ -76,26 +87,22 @@
self.event_handler_metrics
.total_events_received
.inc_by(event_count as u64);
commit_events(&mut pg_pool_conn, event_page.clone())?;
// Event page's next cursor can be None when latest event page is reached,
// if we use the None cursor to read events, it will start from genesis,
// thus here we do not commit / use the None cursor.
// This will cause duplicate run of the current batch, but will not cause
// duplicate rows b/c of the uniqueness restriction of the table.
if let Some(next_cursor_val) = event_page.next_cursor.clone() {
commit_event_log(
&mut pg_pool_conn,
Some(next_cursor_val.tx_digest.base58_encode()),
Some(next_cursor_val.event_seq),
)?;
let commit_result = commit_events(&mut pg_pool_conn, event_page.clone())?;

if let Some((commit_count, next_cursor_val)) = commit_result {
next_cursor = Some(next_cursor_val);

self.event_handler_metrics.total_event_page_committed.inc();
self.event_handler_metrics
.total_events_processed
.inc_by(event_count as u64);
next_cursor = Some(next_cursor_val);
.inc_by(commit_count as u64);
info!(
"Committed {} events, next cursor: {:?}",
commit_count, next_cursor
);
}
self.event_handler_metrics.total_event_page_committed.inc();
// sleep when the event page has been the latest page
if event_count < EVENT_PAGE_SIZE || event_page.next_cursor.is_none() {

if event_page.next_cursor.is_none() {
sleep(Duration::from_secs_f32(0.1)).await;
}
}
58 changes: 0 additions & 58 deletions crates/sui-indexer/src/models/event_logs.rs

This file was deleted.

83 changes: 69 additions & 14 deletions crates/sui-indexer/src/models/events.rs
@@ -4,23 +4,25 @@
use crate::errors::IndexerError;
use crate::schema::events;
use crate::schema::events::dsl::{events as events_table, id};
use crate::schema::events::{event_sequence, transaction_digest};
use crate::utils::log_errors_to_pg;
use crate::PgPoolConnection;

use chrono::NaiveDateTime;
use diesel::prelude::*;
use diesel::result::Error;
use sui_json_rpc_types::{EventPage, SuiEvent, SuiEventEnvelope};
use sui_types::event::EventID;

#[derive(Queryable, Debug)]
pub struct Event {
pub id: i64,
pub transaction_digest: Option<String>,
pub transaction_digest: String,
pub event_sequence: i64,
pub event_time: Option<NaiveDateTime>,
pub event_type: String,
pub event_content: String,
pub next_cursor_transaction_digest: Option<String>,
pub next_cursor_event_sequence: Option<i64>,
}

#[derive(Debug, Insertable)]
@@ -31,6 +33,8 @@ pub struct NewEvent {
pub event_time: Option<NaiveDateTime>,
pub event_type: String,
pub event_content: String,
pub next_cursor_transaction_digest: Option<String>,
pub next_cursor_event_sequence: Option<i64>,
}

pub fn read_events(
@@ -56,6 +60,23 @@ pub fn read_events(
})
}

pub fn read_last_event(pg_pool_conn: &mut PgPoolConnection) -> Result<Option<Event>, IndexerError> {
let event_read_result: Result<Option<Event>, Error> = pg_pool_conn
.build_transaction()
.read_only()
.run::<_, Error, _>(|conn| {
events_table
.order(id.desc())
.limit(1)
.load::<Event>(conn)
.map(|mut events| events.pop())
});

event_read_result.map_err(|e| {
IndexerError::PostgresReadError(format!("Failed reading last event with error: {:?}", e))
})
}

// NOTE: no need to retry here b/c errors here are not transient,
// instead we write them to PG tables for debugging purposes.
pub fn event_to_new_event(e: SuiEventEnvelope) -> Result<NewEvent, IndexerError> {
@@ -78,40 +99,74 @@ pub fn event_to_new_event(e: SuiEventEnvelope) -> Result<NewEvent, IndexerError>
event_time: Some(timestamp),
event_type: e.event.get_event_type(),
event_content: event_json,
next_cursor_transaction_digest: None,
next_cursor_event_sequence: None,
})
}

pub fn commit_events(
pg_pool_conn: &mut PgPoolConnection,
event_page: EventPage,
) -> Result<usize, IndexerError> {
) -> Result<Option<(usize, EventID)>, IndexerError> {
let events = event_page.data;
let mut errors = vec![];
let new_events: Vec<NewEvent> = events
let mut new_events: Vec<NewEvent> = events
.into_iter()
.map(event_to_new_event)
.filter_map(|r| r.map_err(|e| errors.push(e)).ok())
.collect();

log_errors_to_pg(pg_pool_conn, errors);

// No op when there is no more than 1 event, which has been left
// as next cursor from last iteration.
if new_events.len() <= 1 {
return Ok(None);
}
let next_cursor: EventID;
if let Some(next_cursor_val) = event_page.next_cursor {
next_cursor = next_cursor_val.clone();
// unwrap is safe because we already checked the length of new_events
let mut last_event = new_events.pop().unwrap();
last_event.next_cursor_transaction_digest = Some(next_cursor_val.tx_digest.base58_encode());
last_event.next_cursor_event_sequence = Some(next_cursor_val.event_seq);
new_events.push(last_event);
} else {
// unwrap here are safe because we already checked the length of new_events
let next_cursor_event = new_events.pop().unwrap();
let mut last_event = new_events.pop().unwrap();
last_event.next_cursor_transaction_digest =
Some(next_cursor_event.transaction_digest.clone());
last_event.next_cursor_event_sequence = Some(next_cursor_event.event_sequence);
new_events.push(last_event);

let tx_digest = next_cursor_event.transaction_digest.parse().map_err(|e| {
IndexerError::TransactionDigestParsingError(format!(
"Failed parsing transaction digest {:?} with error: {:?}",
next_cursor_event.transaction_digest, e
))
})?;
next_cursor = EventID {
tx_digest,
event_seq: next_cursor_event.event_sequence,
};
}

let event_commit_result: Result<usize, Error> = pg_pool_conn
.build_transaction()
.read_write()
.run::<_, Error, _>(|conn| {
diesel::insert_into(events::table)
.values(&new_events)
.on_conflict((transaction_digest, event_sequence))
.do_nothing()
.execute(conn)
});

event_commit_result.map_err(|e| {
IndexerError::PostgresWriteError(format!(
"Failed writing events to PostgresDB with events {:?} and error: {:?}",
new_events, e
))
})
event_commit_result
.map(|commit_row_count| Some((commit_row_count, next_cursor)))
.map_err(|e| {
IndexerError::PostgresWriteError(format!(
"Failed writing events to PostgresDB with events {:?} and error: {:?}",
new_events, e
))
})
}

pub fn events_to_sui_events(
1 change: 0 additions & 1 deletion crates/sui-indexer/src/models/mod.rs
@@ -6,7 +6,6 @@ pub mod addresses;
pub mod checkpoint_logs;
pub mod checkpoints;
pub mod error_logs;
pub mod event_logs;
pub mod events;
pub mod object_logs;
pub mod objects;
13 changes: 3 additions & 10 deletions crates/sui-indexer/src/schema.rs
@@ -46,22 +46,16 @@
}
}

diesel::table! {
event_logs (id) {
id -> Int4,
next_cursor_tx_dig -> Nullable<Text>,
next_cursor_event_seq -> Nullable<Int8>,
}
}

diesel::table! {
events (id) {
id -> Int8,
transaction_digest -> Nullable<Varchar>,
transaction_digest -> Varchar,
event_sequence -> Int8,
event_time -> Nullable<Timestamp>,
event_type -> Varchar,
event_content -> Varchar,
next_cursor_transaction_digest -> Nullable<Varchar>,
next_cursor_event_sequence -> Nullable<Int8>,
}
}

@@ -176,7 +170,6 @@ diesel::allow_tables_to_appear_in_same_query!(
checkpoint_logs,
checkpoints,
error_logs,
event_logs,
events,
object_event_logs,
object_events,