Skip to content

Commit

Permalink
indexer-alt: sequential pipeline concurrent update/delete (MystenLabs…
Browse files Browse the repository at this point in the history
…#20383)

## Description

Change how updates and deletes are handled in the sequential pipelines
that need to delete rows such that the inserts and deletes can run
concurrently with each other -- every little helps. This is based on a
comment from @gegaowp on an earlier PR.

## Test plan

Run the indexer locally on the first 100,000 checkpoints.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
amnn authored Nov 22, 2024
1 parent c57d97c commit 7510d6a
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 47 deletions.
50 changes: 26 additions & 24 deletions crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
use anyhow::{anyhow, bail, ensure};
use diesel::{upsert::excluded, ExpressionMethods};
use diesel_async::RunQueryDsl;
use futures::future::try_join_all;
use futures::future::{try_join_all, Either};
use sui_types::{
base_types::ObjectID, effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData,
object::Owner,
Expand Down Expand Up @@ -159,30 +159,32 @@ impl Handler for SumCoinBalances {
}
}

let update_chunks = updates.chunks(UPDATE_CHUNK_ROWS).map(|chunk| {
diesel::insert_into(sum_coin_balances::table)
.values(chunk)
.on_conflict(sum_coin_balances::object_id)
.do_update()
.set((
sum_coin_balances::object_version
.eq(excluded(sum_coin_balances::object_version)),
sum_coin_balances::owner_id.eq(excluded(sum_coin_balances::owner_id)),
sum_coin_balances::coin_balance.eq(excluded(sum_coin_balances::coin_balance)),
))
.execute(conn)
let update_chunks = updates.chunks(UPDATE_CHUNK_ROWS).map(Either::Left);
let delete_chunks = deletes.chunks(DELETE_CHUNK_ROWS).map(Either::Right);

let futures = update_chunks.chain(delete_chunks).map(|chunk| match chunk {
Either::Left(update) => Either::Left(
diesel::insert_into(sum_coin_balances::table)
.values(update)
.on_conflict(sum_coin_balances::object_id)
.do_update()
.set((
sum_coin_balances::object_version
.eq(excluded(sum_coin_balances::object_version)),
sum_coin_balances::owner_id.eq(excluded(sum_coin_balances::owner_id)),
sum_coin_balances::coin_balance
.eq(excluded(sum_coin_balances::coin_balance)),
))
.execute(conn),
),

Either::Right(delete) => Either::Right(
diesel::delete(sum_coin_balances::table)
.filter(sum_coin_balances::object_id.eq_any(delete.iter().cloned()))
.execute(conn),
),
});

let updated: usize = try_join_all(update_chunks).await?.into_iter().sum();

let delete_chunks = deletes.chunks(DELETE_CHUNK_ROWS).map(|chunk| {
diesel::delete(sum_coin_balances::table)
.filter(sum_coin_balances::object_id.eq_any(chunk.iter().cloned()))
.execute(conn)
});

let deleted: usize = try_join_all(delete_chunks).await?.into_iter().sum();

Ok(updated + deleted)
Ok(try_join_all(futures).await?.into_iter().sum())
}
}
47 changes: 24 additions & 23 deletions crates/sui-indexer-alt/src/handlers/sum_obj_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
use anyhow::{anyhow, ensure};
use diesel::{upsert::excluded, ExpressionMethods};
use diesel_async::RunQueryDsl;
use futures::future::try_join_all;
use futures::future::{try_join_all, Either};
use sui_types::{
base_types::ObjectID, effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData,
object::Owner,
Expand Down Expand Up @@ -156,29 +156,30 @@ impl Handler for SumObjTypes {
}
}

let update_chunks = updates.chunks(UPDATE_CHUNK_ROWS).map(|chunk| {
diesel::insert_into(sum_obj_types::table)
.values(chunk)
.on_conflict(sum_obj_types::object_id)
.do_update()
.set((
sum_obj_types::object_version.eq(excluded(sum_obj_types::object_version)),
sum_obj_types::owner_kind.eq(excluded(sum_obj_types::owner_kind)),
sum_obj_types::owner_id.eq(excluded(sum_obj_types::owner_id)),
))
.execute(conn)
let update_chunks = updates.chunks(UPDATE_CHUNK_ROWS).map(Either::Left);
let delete_chunks = deletes.chunks(DELETE_CHUNK_ROWS).map(Either::Right);

let futures = update_chunks.chain(delete_chunks).map(|chunk| match chunk {
Either::Left(update) => Either::Left(
diesel::insert_into(sum_obj_types::table)
.values(update)
.on_conflict(sum_obj_types::object_id)
.do_update()
.set((
sum_obj_types::object_version.eq(excluded(sum_obj_types::object_version)),
sum_obj_types::owner_kind.eq(excluded(sum_obj_types::owner_kind)),
sum_obj_types::owner_id.eq(excluded(sum_obj_types::owner_id)),
))
.execute(conn),
),

Either::Right(delete) => Either::Right(
diesel::delete(sum_obj_types::table)
.filter(sum_obj_types::object_id.eq_any(delete.iter().cloned()))
.execute(conn),
),
});

let updated: usize = try_join_all(update_chunks).await?.into_iter().sum();

let delete_chunks = deletes.chunks(DELETE_CHUNK_ROWS).map(|chunk| {
diesel::delete(sum_obj_types::table)
.filter(sum_obj_types::object_id.eq_any(chunk.iter().cloned()))
.execute(conn)
});

let deleted: usize = try_join_all(delete_chunks).await?.into_iter().sum();

Ok(updated + deleted)
Ok(try_join_all(futures).await?.into_iter().sum())
}
}

0 comments on commit 7510d6a

Please sign in to comment.