[indexer-alt] Fix first_checkpoint option (MystenLabs#20079)
## Description 

The "first_checkpoint" option is broken in two cases.
If first_checkpoint is larger than watermark + 1, a sequential pipeline
loses liveness, because it always expects watermark + 1 as the next
checkpoint; a concurrent pipeline can never update its watermark, for a
similar reason.
This PR adds a check when registering the pipeline that rejects such a
configuration.
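
The stall described above can be illustrated with a toy model. This is only a sketch, not the indexer's committer: `commit_in_order` and its `BTreeMap` buffer are hypothetical names, and the assumption is simply that a sequential committer will only commit the checkpoint immediately after its watermark.

```rust
use std::collections::BTreeMap;

/// Toy sequential committer (hypothetical, for illustration only): commits
/// buffered checkpoints strictly in order, starting from `watermark + 1`,
/// and returns the watermark reached.
pub fn commit_in_order(watermark: u64, pending: &mut BTreeMap<u64, Vec<u8>>) -> u64 {
    let mut hi = watermark;
    // Only the checkpoint immediately after the watermark may be committed.
    while pending.remove(&(hi + 1)).is_some() {
        hi += 1;
    }
    hi
}
```

With a watermark of 100 and ingestion starting at checkpoint 105, checkpoint 101 never arrives, so the buffered data is never committed and the watermark never moves.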

If first_checkpoint is smaller than watermark + 1, the intention must be
to backfill.
However, the sequential pipeline ignores any data that is below the
watermark.
This PR fixes that by allowing data to be committed even when it is
below the watermark.
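
The two cases can be summarized as a standalone sketch of the registration check. The `Watermark` struct and `check_first_checkpoint` function below are simplified, hypothetical stand-ins for the types in the diff, and the comparison is widened to `i128` here (an assumption of this sketch, not what the PR does) so that the cast cannot wrap.

```rust
/// Hypothetical stand-in for the committer watermark; only the field used
/// by the check is modelled.
#[derive(Debug, Clone, Copy)]
pub struct Watermark {
    pub checkpoint_hi_inclusive: i64,
}

/// Returns `Ok(())` when the override leaves no gap after the watermark.
/// If either value is absent there is nothing to compare, so the check passes.
pub fn check_first_checkpoint(
    pipeline: &str,
    watermark: Option<Watermark>,
    first_checkpoint: Option<u64>,
) -> Result<(), String> {
    if let (Some(w), Some(first)) = (watermark, first_checkpoint) {
        // The next checkpoint the pipeline expects is watermark + 1.
        let next_expected = i128::from(w.checkpoint_hi_inclusive) + 1;
        if i128::from(first) > next_expected {
            return Err(format!(
                "For pipeline {}, first checkpoint override {} is ahead of watermark {}",
                pipeline, first, w.checkpoint_hi_inclusive
            ));
        }
    }
    Ok(())
}
```

With a watermark of 100, an override of 101 (the tip) or 50 (a backfill) passes, while 102 is rejected because checkpoint 101 would be skipped.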

## Test plan 

CI.
Probably need to add tests too.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
lxfind authored Nov 4, 2024
1 parent a0d4c68 commit c538507
Showing 4 changed files with 38 additions and 2 deletions.
34 changes: 33 additions & 1 deletion crates/sui-indexer-alt/src/lib.rs
@@ -3,7 +3,7 @@

 use std::{collections::BTreeSet, net::SocketAddr, sync::Arc};

-use anyhow::{Context, Result};
+use anyhow::{ensure, Context, Result};
 use db::{Db, DbConfig};
 use ingestion::{IngestionConfig, IngestionService};
 use metrics::{IndexerMetrics, MetricsService};
@@ -145,6 +145,13 @@ impl Indexer {
             return Ok(());
         };

+        // For a concurrent pipeline, if skip_watermark is set, we don't really care about the
+        // watermark consistency. first_checkpoint can be anything since we don't update watermark,
+        // and writes should be idempotent.
+        if !self.pipeline_config.skip_watermark {
+            self.check_first_checkpoint_consistency::<H>(&watermark)?;
+        }
+
         let (processor, collector, committer, watermark) = concurrent::pipeline::<H>(
             watermark,
             self.pipeline_config.clone(),
@@ -180,6 +187,10 @@ impl Indexer {
             return Ok(());
         };

+        // For a sequential pipeline, data must be written in the order of checkpoints.
+        // Hence, we do not allow the first_checkpoint override to be in arbitrary positions.
+        self.check_first_checkpoint_consistency::<H>(&watermark)?;
+
         let (checkpoint_rx, watermark_tx) = self.ingestion_service.subscribe();

         let (processor, committer) = sequential::pipeline::<H>(
@@ -199,6 +210,27 @@ impl Indexer {
         Ok(())
     }

+    /// Checks that the first checkpoint override is consistent with the watermark for the pipeline.
+    /// If the watermark does not exist, the override can be anything. If the watermark exists, the
+    /// override must not leave any gap in the data: it can be in the past, or at the tip of the
+    /// network, but not in the future.
+    fn check_first_checkpoint_consistency<P: Processor>(
+        &self,
+        watermark: &Option<CommitterWatermark>,
+    ) -> Result<()> {
+        if let (Some(watermark), Some(first_checkpoint)) = (watermark, self.first_checkpoint) {
+            ensure!(
+                first_checkpoint as i64 <= watermark.checkpoint_hi_inclusive + 1,
+                "For pipeline {}, first checkpoint override {} is too far ahead of watermark {}. This could create gaps in the data.",
+                P::NAME,
+                first_checkpoint,
+                watermark.checkpoint_hi_inclusive,
+            );
+        }
+
+        Ok(())
+    }
+
     /// Start ingesting checkpoints. Ingestion either starts from the configured
     /// `first_checkpoint`, or it is calculated based on the watermarks of all active pipelines.
     /// Ingestion will stop after consuming the configured `last_checkpoint`, if one is provided,
2 changes: 2 additions & 0 deletions crates/sui-indexer-alt/src/pipeline/concurrent/watermark.rs
@@ -185,6 +185,8 @@ pub(super) fn watermark<H: Handler + 'static>(
     .with_label_values(&[H::NAME])
     .start_timer();

+// TODO: If initial_watermark is empty, when we update watermark
+// for the first time, we should also update the low watermark.
 match watermark.update(&mut conn).await {
     // If there's an issue updating the watermark, log it but keep going,
     // it's OK for the watermark to lag from a correctness perspective.
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt/src/pipeline/mod.rs
@@ -54,7 +54,7 @@ pub struct PipelineConfig {

     /// Avoid writing to the watermark table
     #[arg(long)]
-    skip_watermark: bool,
+    pub skip_watermark: bool,
 }

 /// Processed values associated with a single checkpoint. This is an internal type used to
2 changes: 2 additions & 0 deletions crates/sui-indexer-alt/src/pipeline/sequential/committer.rs
@@ -240,6 +240,8 @@ pub(super) fn committer<H: Handler + 'static>(
 // single transaction. The handler's `commit` implementation is responsible for
 // chunking up the writes into a manageable size.
 let affected = conn.transaction::<_, anyhow::Error, _>(|conn| async {
+    // TODO: If initial_watermark is empty, when we update watermark
+    // for the first time, we should also update the low watermark.
     watermark.update(conn).await?;
     H::commit(&batch, conn).await
 }.scope_boxed()).await;