[data ingestion] change remote reader implementation (MystenLabs#16469)
This PR makes the remote fetch implementation more similar to the indexer implementation: remote checkpoints are now streamed by a background fetcher task into a bounded channel instead of being fetched in inline batches.
phoenix-o authored Mar 13, 2024
1 parent 58e3a06 commit b6f48fa
Showing 3 changed files with 98 additions and 48 deletions.
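
The core of the change in reader.rs below is a push-style remote fetcher: a background task walks checkpoint sequence numbers, keeps a bounded number of fetches in flight, and feeds the results into a bounded channel that the reader drains without blocking. The following is a minimal, self-contained sketch of that pattern using plain tokio and futures; `fetch_checkpoint`, `start_fetcher`, and the other names here are illustrative placeholders, not the crate's actual API.

// Illustrative sketch only: a background task streams fetches into a bounded
// channel; the consumer drains it with try_recv. `fetch_checkpoint` is a stub
// standing in for the real object-store read.
use futures::StreamExt;
use tokio::sync::mpsc::{self, error::TryRecvError};

// Placeholder for the remote read (an HTTP/object-store GET in the real reader).
async fn fetch_checkpoint(seq: u64) -> Result<Vec<u8>, String> {
    Ok(seq.to_le_bytes().to_vec())
}

fn start_fetcher(start: u64, batch_size: usize) -> mpsc::Receiver<Result<Vec<u8>, String>> {
    let (sender, receiver) = mpsc::channel(batch_size);
    tokio::spawn(async move {
        // Fetch checkpoints in order, with at most `batch_size` requests in flight.
        let mut stream = futures::stream::iter(start..u64::MAX)
            .map(fetch_checkpoint)
            .buffered(batch_size);
        while let Some(item) = stream.next().await {
            // Stop when the consumer drops its end of the channel.
            if sender.send(item).await.is_err() {
                break;
            }
        }
    });
    receiver
}

#[tokio::main]
async fn main() {
    let mut receiver = start_fetcher(0, 10);
    // Give the background task a moment to fill the channel.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    let mut fetched = Vec::new();
    // Non-blocking drain, mirroring the try_recv loop in the new remote_fetch.
    loop {
        match receiver.try_recv() {
            Ok(Ok(bytes)) => fetched.push(bytes),
            Ok(Err(err)) => {
                eprintln!("transient fetch error: {err}");
                break;
            }
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
        }
    }
    println!("drained {} checkpoints without blocking", fetched.len());
}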
1 change: 1 addition & 0 deletions Cargo.lock

(Cargo.lock is a generated file; its diff is not rendered.)

1 change: 1 addition & 0 deletions crates/sui-data-ingestion-core/Cargo.toml
@@ -24,6 +24,7 @@ sui-storage.workspace = true
sui-types.workspace = true
url.workspace = true
tempfile.workspace = true
+tap.workspace = true

[dev-dependencies]
rand.workspace = true
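The only Cargo.toml change is the new tap dependency; reader.rs below uses it solely for the Pipe extension trait, which lets an ordinary function call become a postfix step in a method chain (`.pipe(futures::stream::iter)`). A tiny example of what `.pipe` does, assuming the tap crate's `Pipe` trait; the values are arbitrary:

use tap::pipe::Pipe;

fn main() {
    // `x.pipe(f)` is exactly `f(x)`: it lets a free function join a method chain,
    // which is how the reader wraps an iterator of fetch futures into a stream.
    let sum_of_squares = (1..=5u64)
        .map(|n| n * n)
        .pipe(|squares| squares.sum::<u64>());
    assert_eq!(sum_of_squares, 55);
    println!("sum of squares: {sum_of_squares}");
}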
144 changes: 96 additions & 48 deletions crates/sui-data-ingestion-core/src/reader.rs
@@ -3,22 +3,25 @@

use crate::executor::MAX_CHECKPOINTS_IN_PROGRESS;
use anyhow::Result;
-use futures::future::try_join_all;
+use futures::StreamExt;
+use mysten_metrics::spawn_monitored_task;
use notify::RecursiveMode;
use notify::Watcher;
use object_store::path::Path;
-use object_store::{parse_url_opts, ObjectStore};
+use object_store::{parse_url_opts, ObjectStore, RetryConfig};
use std::ffi::OsString;
use std::fs;
use std::path::PathBuf;
use std::time::Duration;
use sui_storage::blob::Blob;
use sui_types::full_checkpoint_content::CheckpointData;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
+use tap::pipe::Pipe;
use tokio::sync::mpsc;
+use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::oneshot;
use tokio::time::timeout;
-use tracing::{debug, info};
+use tracing::{debug, error, info};
use url::Url;

pub(crate) const ENV_VAR_LOCAL_READ_TIMEOUT_MS: &str = "LOCAL_READ_TIMEOUT_MS";
@@ -28,12 +31,14 @@ pub(crate) const ENV_VAR_LOCAL_READ_TIMEOUT_MS: &str = "LOCAL_READ_TIMEOUT_MS";
/// This implementation is push-based and utilizes the inotify API.
pub struct CheckpointReader {
path: PathBuf,
-    remote_store: Option<Box<dyn ObjectStore>>,
+    remote_store_url: Option<String>,
+    remote_store_options: Vec<(String, String)>,
remote_read_batch_size: usize,
current_checkpoint_number: CheckpointSequenceNumber,
last_pruned_watermark: CheckpointSequenceNumber,
checkpoint_sender: mpsc::Sender<CheckpointData>,
processed_receiver: mpsc::Receiver<CheckpointSequenceNumber>,
+    remote_fetcher_receiver: Option<mpsc::Receiver<Result<CheckpointData>>>,
exit_receiver: oneshot::Receiver<()>,
}

@@ -61,31 +66,86 @@ impl CheckpointReader {
Ok(checkpoints)
}

-    async fn remote_fetch(&self) -> Result<Vec<CheckpointData>> {
+    fn exceeds_capacity(&self, checkpoint_number: CheckpointSequenceNumber) -> bool {
+        (MAX_CHECKPOINTS_IN_PROGRESS as u64 + self.last_pruned_watermark) <= checkpoint_number
+    }

+    async fn remote_fetch_checkpoint(
+        store: &dyn ObjectStore,
+        checkpoint_number: CheckpointSequenceNumber,
+    ) -> Result<CheckpointData> {
+        let path = Path::from(format!("{}.chk", checkpoint_number));
+        let response = store.get(&path).await?;
+        let bytes = response.bytes().await?;
+        Blob::from_bytes::<CheckpointData>(&bytes)
+    }

+    fn start_remote_fetcher(&mut self) -> mpsc::Receiver<Result<CheckpointData>> {
+        let batch_size = self.remote_read_batch_size;
+        let start_checkpoint = self.current_checkpoint_number;
+        let (sender, receiver) = mpsc::channel(batch_size);
+        let url = self
+            .remote_store_url
+            .clone()
+            .expect("remote store url must be set");
+        let store = if self.remote_store_options.is_empty() {
+            let retry_config = RetryConfig {
+                max_retries: 0,
+                retry_timeout: Duration::from_secs(10),
+                ..Default::default()
+            };
+            let http_store = object_store::http::HttpBuilder::new()
+                .with_url(url)
+                .with_retry(retry_config)
+                .build()
+                .expect("failed to parse remote store config");
+            Box::new(http_store)
+        } else {
+            parse_url_opts(
+                &Url::parse(&url).expect("failed to parse remote store url"),
+                self.remote_store_options.clone(),
+            )
+            .expect("failed to parse remote store config")
+            .0
+        };

+        spawn_monitored_task!(async move {
+            let mut checkpoint_stream = (start_checkpoint..u64::MAX)
+                .map(|checkpoint_number| Self::remote_fetch_checkpoint(&store, checkpoint_number))
+                .pipe(futures::stream::iter)
+                .buffered(batch_size);

+            while let Some(checkpoint) = checkpoint_stream.next().await {
+                if sender.send(checkpoint).await.is_err() {
+                    break;
+                }
+            }
+        });
+        receiver
+    }

+    fn remote_fetch(&mut self) -> Vec<CheckpointData> {
        let mut checkpoints = vec![];
-        if let Some(ref store) = self.remote_store {
-            let limit = std::cmp::min(
-                self.current_checkpoint_number + self.remote_read_batch_size as u64,
-                self.last_pruned_watermark + MAX_CHECKPOINTS_IN_PROGRESS as u64,
-            );
-            let futures =
-                (self.current_checkpoint_number..limit).map(|checkpoint_number| async move {
-                    let path = Path::from(format!("{}.chk", checkpoint_number));
-                    match store.get(&path).await {
-                        Ok(resp) => resp.bytes().await.map(Some),
-                        Err(err) if err.to_string().contains("404") => Ok(None),
-                        Err(err) => Err(err),
-                    }
-                });
-            for bytes in try_join_all(futures).await? {
-                if bytes.is_none() {
+        if self.remote_fetcher_receiver.is_none() {
+            self.remote_fetcher_receiver = Some(self.start_remote_fetcher());
+        }
+        while !self.exceeds_capacity(self.current_checkpoint_number + checkpoints.len() as u64) {
+            match self.remote_fetcher_receiver.as_mut().unwrap().try_recv() {
+                Ok(Ok(checkpoint)) => checkpoints.push(checkpoint),
+                Ok(Err(err)) => {
+                    error!("remote reader transient error {:?}", err);
+                    self.remote_fetcher_receiver = None;
+                    break;
+                }
-                let checkpoint = Blob::from_bytes::<CheckpointData>(&bytes.unwrap())?;
-                checkpoints.push(checkpoint);
+                Err(TryRecvError::Disconnected) => {
+                    error!("remote reader channel disconnect error");
+                    self.remote_fetcher_receiver = None;
+                    break;
+                }
+                Err(TryRecvError::Empty) => break,
            }
        }
-        Ok(checkpoints)
+        checkpoints
}

async fn sync(&mut self) -> Result<()> {
@@ -97,10 +157,15 @@ impl CheckpointReader {
})
.await?;

-        if checkpoints.is_empty()
-            || checkpoints[0].checkpoint_summary.sequence_number > self.current_checkpoint_number
+        if self.remote_store_url.is_some()
+            && (checkpoints.is_empty()
+                || checkpoints[0].checkpoint_summary.sequence_number
+                    > self.current_checkpoint_number)
        {
-            checkpoints = self.remote_fetch().await?;
+            checkpoints = self.remote_fetch();
+        } else {
+            // cancel remote fetcher execution because local reader has made progress
+            self.remote_fetcher_receiver = None;
        }

info!(
@@ -112,9 +177,7 @@ impl CheckpointReader {
checkpoint.checkpoint_summary.sequence_number,
self.current_checkpoint_number
);
-            if (MAX_CHECKPOINTS_IN_PROGRESS as u64 + self.last_pruned_watermark)
-                <= checkpoint.checkpoint_summary.sequence_number
-            {
+            if self.exceeds_capacity(checkpoint.checkpoint_summary.sequence_number) {
break;
}
self.checkpoint_sender.send(checkpoint).await?;
@@ -161,31 +224,16 @@ impl CheckpointReader {
let (checkpoint_sender, checkpoint_recv) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
let (processed_sender, processed_receiver) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
let (exit_sender, exit_receiver) = oneshot::channel();
-        let remote_store = remote_store_url.map(|url| {
-            if remote_store_options.is_empty() {
-                let builder = object_store::http::HttpBuilder::new().with_url(url);
-                Box::new(
-                    builder
-                        .build()
-                        .expect("failed to parse remote store config"),
-                )
-            } else {
-                parse_url_opts(
-                    &Url::parse(&url).expect("failed to parse remote store url"),
-                    remote_store_options,
-                )
-                .expect("failed to parse remote store config")
-                .0
-            }
-        });
        let reader = Self {
            path,
-            remote_store,
+            remote_store_url,
+            remote_store_options,
current_checkpoint_number: starting_checkpoint_number,
last_pruned_watermark: starting_checkpoint_number,
checkpoint_sender,
processed_receiver,
remote_read_batch_size,
+            remote_fetcher_receiver: None,
exit_receiver,
};
(reader, checkpoint_recv, processed_sender, exit_sender)
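For context on the tuple returned above (reader, checkpoint_recv, processed_sender, exit_sender): checkpoints flow out to the consumer over one channel, processed sequence numbers flow back to advance the pruning watermark, and a oneshot signals shutdown. A rough, self-contained sketch of that handshake, with a u64 standing in for CheckpointData and a spawned task standing in for the reader; the names and setup here are illustrative assumptions, not the crate's actual API:

use tokio::sync::{mpsc, oneshot};

#[tokio::main]
async fn main() {
    let (checkpoint_sender, mut checkpoint_recv) = mpsc::channel::<u64>(100);
    let (processed_sender, mut processed_receiver) = mpsc::channel::<u64>(100);
    let (exit_sender, mut exit_receiver) = oneshot::channel::<()>();

    // Stand-in for the reader: emit a few "checkpoints", then track the watermark
    // reported back by the consumer until an exit signal arrives.
    let reader = tokio::spawn(async move {
        for seq in 0..5u64 {
            if checkpoint_sender.send(seq).await.is_err() {
                return;
            }
        }
        drop(checkpoint_sender);
        loop {
            tokio::select! {
                Some(watermark) = processed_receiver.recv() => {
                    println!("reader: watermark advanced to {watermark}");
                }
                _ = &mut exit_receiver => break,
                else => break,
            }
        }
    });

    // Consumer side: process each checkpoint, acknowledge it, then request exit.
    while let Some(seq) = checkpoint_recv.recv().await {
        println!("consumer: processed checkpoint {seq}");
        let _ = processed_sender.send(seq).await;
    }
    let _ = exit_sender.send(());
    let _ = reader.await;
}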
