[data ingestion] update docs (MystenLabs#16595)
update existing docs:
* add more examples
* fix diagrams
* add documentation for a helper function to setup a single pipeline
workflow

---------

Co-authored-by: ronny-mysten <[email protected]>
phoenix-o and ronny-mysten authored Mar 11, 2024
1 parent 5e5345a commit bacd3d4
Showing 1 changed file with 33 additions and 5 deletions: docs/content/guides/developer/advanced/custom-indexer.mdx
@@ -34,7 +34,7 @@ The most straightforward stream source is to subscribe to a remote store of checkpoint

```mermaid
flowchart LR
-A("fa:fa-cloud Cloud storage (S3, GCP)");
+A("fa:fa-cloud Cloud storage(S3, GCP)");
B[("fa:fa-gears
Indexer
daemon")];
@@ -64,7 +64,7 @@ checkpoint-executor-config:
```mermaid
flowchart LR
subgraph Sui
-A("fa:fa-server Full node binary");
+A("fa:fa-server Full node");
A-->B("fa:fa-folder Local directory");
B-->C[("fa:fa-gears
Indexer
@@ -85,10 +85,36 @@ flowchart LR
Specify both a local and a remote store as a fallback to ensure constant data flow. The framework always prioritizes locally available checkpoint data over remote data. This is useful when you want to start using your own Full node for data ingestion but still need to backfill historical data, or when you want a failover.
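The fallback rule described above can be pictured as: for each checkpoint, try the local directory first and only fetch from the remote store on a miss. The sketch below is an illustrative model in plain Rust, not the framework's code; the types and the in-memory maps are assumptions standing in for the real directory and cloud-storage reads.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the two checkpoint sources; the real
// framework reads *.chk files from a directory and objects from
// cloud storage.
struct LocalDir { files: HashMap<u64, Vec<u8>> }
struct RemoteStore { objects: HashMap<u64, Vec<u8>> }

// Locally available data always wins; the remote store is only a
// fallback for backfill or failover.
fn fetch_checkpoint(local: &LocalDir, remote: &RemoteStore, seq: u64) -> Option<Vec<u8>> {
    local.files.get(&seq)
        .or_else(|| remote.objects.get(&seq))
        .cloned()
}

fn main() {
    let mut local = LocalDir { files: HashMap::new() };
    let mut remote = RemoteStore { objects: HashMap::new() };
    local.files.insert(1, b"local copy".to_vec());
    remote.objects.insert(2, b"remote copy".to_vec());
    // Checkpoint 1 is served locally; checkpoint 2 falls back to remote.
    assert!(fetch_checkpoint(&local, &remote, 1).is_some());
    assert!(fetch_checkpoint(&local, &remote, 2).is_some());
    assert!(fetch_checkpoint(&local, &remote, 3).is_none());
}
```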


-## Example
+## Examples

-Here's a basic example of a custom data ingestion worker.
+The Sui data ingestion framework provides a helper function to quickly bootstrap an indexer workflow.
```rust
use anyhow::Result;
use async_trait::async_trait;
use sui_data_ingestion_core::{setup_single_workflow, Worker};
use sui_types::full_checkpoint_content::CheckpointData;

struct CustomWorker;

#[async_trait]
impl Worker for CustomWorker {
    async fn process_checkpoint(&self, checkpoint: CheckpointData) -> Result<()> {
        // custom processing logic
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let (executor, term_sender) = setup_single_workflow(
        CustomWorker,
        "https://checkpoints.mainnet.sui.io".to_string(),
        0, /* initial checkpoint number */
        5, /* concurrency */
    ).await?;
    executor.await?;
    Ok(())
}
```
This is suitable for setups with a single ingestion pipeline where progress tracking is managed outside of the framework.
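Managing progress outside of the framework means the indexer asks an external store where to resume and reports each committed watermark itself. A minimal sketch of that idea in plain Rust follows; the `ProgressStore` trait shape, the synchronous methods, and the in-memory backing map are illustrative assumptions, not the framework's API.

```rust
use std::collections::HashMap;

// Hypothetical shape of an externally managed progress store: `load`
// answers "where do I resume?", `save` records a committed watermark.
trait ProgressStore {
    fn load(&self, task: &str) -> u64;
    fn save(&mut self, task: &str, checkpoint: u64);
}

struct InMemoryProgress { watermarks: HashMap<String, u64> }

impl ProgressStore for InMemoryProgress {
    fn load(&self, task: &str) -> u64 {
        // Resume from checkpoint 0 if the task has never run.
        *self.watermarks.get(task).unwrap_or(&0)
    }
    fn save(&mut self, task: &str, checkpoint: u64) {
        self.watermarks.insert(task.to_string(), checkpoint);
    }
}

fn main() {
    let mut progress = InMemoryProgress { watermarks: HashMap::new() };
    // First run: nothing saved yet, start from checkpoint 0.
    assert_eq!(progress.load("custom"), 0);
    // After committing checkpoint 42, a restart resumes from there.
    progress.save("custom", 42);
    assert_eq!(progress.load("custom"), 42);
}
```

A real deployment would back this with a database or file so that progress survives restarts.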

For more complex setups, refer to the following example:
```rust
struct CustomWorker;

```

@@ -130,5 +156,7 @@ The data ingestion executor can run multiple workflows simultaneously. For each

The concurrency parameter specifies how many threads the workflow uses. Having a concurrency value greater than 1 is helpful when tasks are idempotent and can be processed in parallel and out of order. The executor only updates the progress/watermark to a certain checkpoint when all preceding checkpoints are processed.
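The watermark rule in the paragraph above can be sketched in plain Rust: checkpoints may finish out of order under concurrency, but the reported watermark only advances across a contiguous run of completed checkpoints. This is an illustrative model, not the executor's actual implementation.

```rust
use std::collections::BTreeSet;

// Watermark only advances when all preceding checkpoints are done.
struct Watermark {
    next: u64,           // first checkpoint not yet fully committed
    done: BTreeSet<u64>, // finished checkpoints at or above `next`
}

impl Watermark {
    fn new(start: u64) -> Self {
        Watermark { next: start, done: BTreeSet::new() }
    }

    // Record a finished checkpoint, advance over any contiguous run,
    // and return the current watermark (everything below is processed).
    fn complete(&mut self, seq: u64) -> u64 {
        self.done.insert(seq);
        while self.done.remove(&self.next) {
            self.next += 1;
        }
        self.next
    }
}

fn main() {
    let mut w = Watermark::new(0);
    // With concurrency > 1, checkpoints 0..=3 may finish out of order.
    assert_eq!(w.complete(2), 0); // 0 and 1 still pending
    assert_eq!(w.complete(0), 1); // only 0 is contiguous
    assert_eq!(w.complete(1), 3); // 0, 1, 2 all done now
    assert_eq!(w.complete(3), 4); // watermark catches up
}
```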

-Find more examples of custom ingestion pipelines in the [Sui repository](https://github.com/MystenLabs/sui/tree/main/crates/sui-data-ingestion/src/workers).
+Find more examples of custom ingestion pipelines in the Sui repository:
* Sui data ingestion daemon that runs internal [pipelines](https://github.com/MystenLabs/sui/tree/main/crates/sui-data-ingestion/src/).
* Sui Name Service's custom [indexer](https://github.com/MystenLabs/sui/tree/main/crates/suins-indexer/src).
