Track max checkpoint in big query tables (MystenLabs#15670)
## Description 

With this PR, we can report the max checkpoint watermark in BigQuery tables, which will be used to set up alerts.
## Test Plan 

Ran in test data pipelines and confirmed it works.
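
The diff below passes a `Box<dyn MaxCheckpointReader>` into `AnalyticsProcessor::new` and polls `max_checkpoint()` on an interval to set the new `max_checkpoint_on_store` gauge. The trait definition itself is in a file not rendered in this view; the following is a minimal sketch of what that interface looks like given its usage here (the `Send + Sync` bounds and the `NoOpCheckpointReader` fallback are assumptions, not part of this diff):

```rust
use anyhow::Result;
use async_trait::async_trait;

/// Reads the highest checkpoint sequence number currently visible in the
/// destination store (e.g. a BigQuery table) so it can be exported as a gauge.
#[async_trait]
pub trait MaxCheckpointReader: Send + Sync {
    async fn max_checkpoint(&self) -> Result<i64>;
}

/// Fallback for pipelines that have no queryable destination; -1 signals
/// "no data" to the dashboard/alerting side. (Illustrative only.)
pub struct NoOpCheckpointReader;

#[async_trait]
impl MaxCheckpointReader for NoOpCheckpointReader {
    async fn max_checkpoint(&self) -> Result<i64> {
        Ok(-1)
    }
}
```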
sadhansood authored Jan 18, 2024
1 parent 67cfca8 commit f91e54e
Showing 6 changed files with 290 additions and 16 deletions.
108 changes: 100 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions crates/sui-analytics-indexer/Cargo.toml
@@ -51,6 +51,7 @@ sui-json-rpc-types.workspace = true
sui-package-resolver.workspace = true
workspace-hack.workspace = true
simulacrum.workspace = true
gcp-bigquery-client = "0.18.0"

[dev-dependencies]

9 changes: 8 additions & 1 deletion crates/sui-analytics-indexer/src/analytics_metrics.rs
@@ -1,7 +1,6 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
#![allow(dead_code)]

use prometheus::{
register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, IntCounterVec,
IntGaugeVec, Registry,
@@ -11,6 +10,7 @@ use prometheus::{
pub struct AnalyticsMetrics {
pub total_received: IntCounterVec,
pub last_uploaded_checkpoint: IntGaugeVec,
pub max_checkpoint_on_store: IntGaugeVec,
}

impl AnalyticsMetrics {
@@ -30,6 +30,13 @@ impl AnalyticsMetrics {
registry,
)
.unwrap(),
max_checkpoint_on_store: register_int_gauge_vec_with_registry!(
"max_checkpoint_on_store",
"Max checkpoint on the db table.",
&["data_type"],
registry,
)
.unwrap(),
}
}
}
46 changes: 43 additions & 3 deletions crates/sui-analytics-indexer/src/analytics_processor.rs
@@ -5,15 +5,15 @@ use std::fs;
use std::ops::Range;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};

use anyhow::Context;
use anyhow::Result;
use object_store::path::Path;
use object_store::DynObjectStore;
use serde::Serialize;
use tokio::sync::{mpsc, oneshot};
use tracing::info;
use tracing::{error, info};

use sui_indexer::framework::Handler;
use sui_rest_api::CheckpointData;
@@ -24,7 +24,9 @@ use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use crate::analytics_metrics::AnalyticsMetrics;
use crate::handlers::AnalyticsHandler;
use crate::writers::AnalyticsWriter;
use crate::{AnalyticsIndexerConfig, FileMetadata, ParquetSchema, EPOCH_DIR_PREFIX};
use crate::{
AnalyticsIndexerConfig, FileMetadata, MaxCheckpointReader, ParquetSchema, EPOCH_DIR_PREFIX,
};

pub struct AnalyticsProcessor<S: Serialize + ParquetSchema> {
handler: Box<dyn AnalyticsHandler<S>>,
@@ -37,6 +39,8 @@ pub struct AnalyticsProcessor<S: Serialize + ParquetSchema> {
sender: mpsc::Sender<FileMetadata>,
#[allow(dead_code)]
kill_sender: oneshot::Sender<()>,
#[allow(dead_code)]
max_checkpoint_sender: oneshot::Sender<()>,
}

#[async_trait::async_trait]
@@ -92,6 +96,7 @@ impl<S: Serialize + ParquetSchema + 'static> AnalyticsProcessor<S> {
pub async fn new(
handler: Box<dyn AnalyticsHandler<S>>,
writer: Box<dyn AnalyticsWriter<S>>,
max_checkpoint_reader: Box<dyn MaxCheckpointReader>,
next_checkpoint_seq_num: CheckpointSequenceNumber,
metrics: AnalyticsMetrics,
config: AnalyticsIndexerConfig,
@@ -116,6 +121,13 @@ impl<S: Serialize + ParquetSchema + 'static> AnalyticsProcessor<S> {
receiver,
kill_receiver,
cloned_metrics,
name.clone(),
));
let (max_checkpoint_sender, max_checkpoint_receiver) = oneshot::channel::<()>();
tokio::task::spawn(Self::setup_max_checkpoint_metrics_updates(
max_checkpoint_reader,
metrics.clone(),
max_checkpoint_receiver,
name,
));
Ok(Self {
@@ -126,6 +138,7 @@ impl<S: Serialize + ParquetSchema + 'static> AnalyticsProcessor<S> {
last_commit_instant: Instant::now(),
kill_sender,
sender,
max_checkpoint_sender,
metrics,
config,
})
@@ -227,6 +240,33 @@ impl<S: Serialize + ParquetSchema + 'static> AnalyticsProcessor<S> {
Ok(())
}

async fn setup_max_checkpoint_metrics_updates(
max_checkpoint_reader: Box<dyn MaxCheckpointReader>,
analytics_metrics: AnalyticsMetrics,
mut recv: oneshot::Receiver<()>,
handler_name: String,
) -> Result<()> {
let mut interval = tokio::time::interval(Duration::from_secs(300));
loop {
tokio::select! {
_ = &mut recv => break,
_ = interval.tick() => {
let max_checkpoint = max_checkpoint_reader.max_checkpoint().await;
if let Ok(max_checkpoint) = max_checkpoint {
analytics_metrics
.max_checkpoint_on_store
.with_label_values(&[&handler_name])
.set(max_checkpoint);
} else {
error!("Failed to read max checkpoint for {} with err: {}", handler_name, max_checkpoint.unwrap_err());
}

}
}
}
Ok(())
}

async fn sync_file_to_remote(
dir: PathBuf,
path: Path,

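Since the Cargo.toml change adds `gcp-bigquery-client = "0.18.0"`, the BigQuery-backed reader presumably issues a `SELECT MAX(...)` over the destination table. Below is a hedged sketch of such an implementation of the `MaxCheckpointReader` interface sketched above; the struct and field names are made up here, the client-construction and result-set calls may differ between versions of the crate, and the actual implementation in this commit is in a file not rendered in this view.

```rust
use anyhow::Result;
use async_trait::async_trait;
use gcp_bigquery_client::model::query_request::QueryRequest;
use gcp_bigquery_client::Client;

/// Illustrative BigQuery-backed reader; names and construction details are
/// assumptions, not taken from this commit.
pub struct BqMaxCheckpointReader {
    client: Client,
    project_id: String,
    table: String,  // fully qualified table, e.g. "dataset.checkpoints"
    column: String, // checkpoint sequence number column
}

impl BqMaxCheckpointReader {
    pub async fn new(key_path: &str, project_id: &str, table: &str, column: &str) -> Result<Self> {
        // Constructor signature varies across gcp-bigquery-client versions.
        let client = Client::from_service_account_key_file(key_path).await?;
        Ok(Self {
            client,
            project_id: project_id.to_string(),
            table: table.to_string(),
            column: column.to_string(),
        })
    }
}

#[async_trait]
impl MaxCheckpointReader for BqMaxCheckpointReader {
    async fn max_checkpoint(&self) -> Result<i64> {
        let query = format!("SELECT MAX({}) FROM `{}`", self.column, self.table);
        let mut rows = self
            .client
            .job()
            .query(&self.project_id, QueryRequest::new(query))
            .await?;
        if rows.next_row() {
            // MAX(...) is NULL on an empty table; report -1 in that case.
            Ok(rows.get_i64(0)?.unwrap_or(-1))
        } else {
            Ok(-1)
        }
    }
}
```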