Skip to content

Commit 7dfb82f

Browse files
authored
feat(store-indexer): add latest stored block number to metrics (latticexyz#2740)
1 parent 82ada7a commit 7dfb82f

File tree

3 files changed

+98
-40
lines changed

3 files changed

+98
-40
lines changed

packages/store-indexer/bin/postgres-indexer.ts

+25-18
Original file line numberDiff line numberDiff line change
@@ -46,25 +46,30 @@ const { storageAdapter, tables } = await createStorageAdapter({ database, public
4646

4747
let startBlock = env.START_BLOCK;
4848

49-
// Resume from latest block stored in DB. This will throw if the DB doesn't exist yet, so we wrap in a try/catch and ignore the error.
50-
// TODO: query if the DB exists instead of try/catch
51-
try {
52-
const chainState = await database
53-
.select()
54-
.from(tables.configTable)
55-
.where(eq(tables.configTable.chainId, chainId))
56-
.limit(1)
57-
.execute()
58-
// Get the first record in a way that returns a possible `undefined`
59-
// TODO: move this to `.findFirst` after upgrading drizzle or `rows[0]` after enabling `noUncheckedIndexedAccess: true`
60-
.then((rows) => rows.find(() => true));
61-
62-
if (chainState?.blockNumber != null) {
63-
startBlock = chainState.blockNumber + 1n;
64-
console.log("resuming from block number", startBlock);
49+
async function getLatestStoredBlockNumber(): Promise<bigint | undefined> {
50+
// Fetch latest block stored in DB. This will throw if the DB doesn't exist yet, so we wrap in a try/catch and ignore the error.
51+
// TODO: query if the DB exists instead of try/catch
52+
try {
53+
const chainState = await database
54+
.select()
55+
.from(tables.configTable)
56+
.where(eq(tables.configTable.chainId, chainId))
57+
.limit(1)
58+
.execute()
59+
// Get the first record in a way that returns a possible `undefined`
60+
// TODO: move this to `.findFirst` after upgrading drizzle or `rows[0]` after enabling `noUncheckedIndexedAccess: true`
61+
.then((rows) => rows.find(() => true));
62+
63+
return chainState?.blockNumber;
64+
} catch (error) {
65+
// ignore errors for now
6566
}
66-
} catch (error) {
67-
// ignore errors for now
67+
}
68+
69+
const latestStoredBlockNumber = await getLatestStoredBlockNumber();
70+
if (latestStoredBlockNumber != null) {
71+
startBlock = latestStoredBlockNumber + 1n;
72+
console.log("resuming from block number", startBlock);
6873
}
6974

7075
const { latestBlockNumber$, storedBlockLogs$ } = await createStoreSync({
@@ -111,6 +116,8 @@ if (env.HEALTHCHECK_HOST != null || env.HEALTHCHECK_PORT != null) {
111116
metrics({
112117
isHealthy: () => true,
113118
isReady: () => isCaughtUp,
119+
getLatestStoredBlockNumber,
120+
followBlockTag: env.FOLLOW_BLOCK_TAG,
114121
}),
115122
);
116123
server.use(helloWorld());

packages/store-indexer/bin/sqlite-indexer.ts

+42-21
Original file line numberDiff line numberDiff line change
@@ -48,29 +48,48 @@ const database = drizzle(new Database(env.SQLITE_FILENAME));
4848

4949
let startBlock = env.START_BLOCK;
5050

51-
// Resume from latest block stored in DB. This will throw if the DB doesn't exist yet, so we wrap in a try/catch and ignore the error.
52-
try {
53-
const currentChainStates = database.select().from(chainState).where(eq(chainState.chainId, chainId)).all();
54-
// TODO: replace this type workaround with `noUncheckedIndexedAccess: true` when we can fix all the issues related (https://github.com/latticexyz/mud/issues/1212)
55-
const currentChainState: (typeof currentChainStates)[number] | undefined = currentChainStates[0];
56-
57-
if (currentChainState != null) {
58-
if (currentChainState.schemaVersion != schemaVersion) {
59-
console.log(
60-
"schema version changed from",
61-
currentChainState.schemaVersion,
62-
"to",
63-
schemaVersion,
64-
"recreating database",
65-
);
66-
fs.truncateSync(env.SQLITE_FILENAME);
67-
} else if (currentChainState.lastUpdatedBlockNumber != null) {
68-
console.log("resuming from block number", currentChainState.lastUpdatedBlockNumber + 1n);
69-
startBlock = currentChainState.lastUpdatedBlockNumber + 1n;
51+
async function getCurrentChainState(): Promise<
52+
| {
53+
schemaVersion: number;
54+
chainId: number;
55+
lastUpdatedBlockNumber: bigint | null;
56+
lastError: string | null;
7057
}
58+
| undefined
59+
> {
60+
// This will throw if the DB doesn't exist yet, so we wrap in a try/catch and ignore the error.
61+
try {
62+
const currentChainStates = database.select().from(chainState).where(eq(chainState.chainId, chainId)).all();
63+
// TODO: replace this type workaround with `noUncheckedIndexedAccess: true` when we can fix all the issues related (https://github.com/latticexyz/mud/issues/1212)
64+
const currentChainState: (typeof currentChainStates)[number] | undefined = currentChainStates[0];
65+
return currentChainState;
66+
} catch (error) {
67+
// ignore errors, this is optional
68+
}
69+
}
70+
71+
async function getLatestStoredBlockNumber(): Promise<bigint | undefined> {
72+
const currentChainState = await getCurrentChainState();
73+
return currentChainState?.lastUpdatedBlockNumber ?? undefined;
74+
}
75+
76+
const currentChainState = await getCurrentChainState();
77+
if (currentChainState) {
78+
// Reset the db if the version changed
79+
if (currentChainState.schemaVersion != schemaVersion) {
80+
console.log(
81+
"schema version changed from",
82+
currentChainState.schemaVersion,
83+
"to",
84+
schemaVersion,
85+
"recreating database",
86+
);
87+
fs.truncateSync(env.SQLITE_FILENAME);
88+
} else if (currentChainState.lastUpdatedBlockNumber != null) {
89+
// Resume from latest block stored in DB. This will throw if the DB doesn't exist yet, so we wrap in a try/catch and ignore the error.
90+
console.log("resuming from block number", currentChainState.lastUpdatedBlockNumber + 1n);
91+
startBlock = currentChainState.lastUpdatedBlockNumber + 1n;
7192
}
72-
} catch (error) {
73-
// ignore errors, this is optional
7493
}
7594

7695
const { latestBlockNumber$, storedBlockLogs$ } = await syncToSqlite({
@@ -112,6 +131,8 @@ server.use(
112131
metrics({
113132
isHealthy: () => true,
114133
isReady: () => isCaughtUp,
134+
getLatestStoredBlockNumber,
135+
followBlockTag: env.FOLLOW_BLOCK_TAG,
115136
}),
116137
);
117138
server.use(helloWorld());

packages/store-indexer/src/koa-middleware/metrics.ts

+31-1
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,19 @@ import promClient from "prom-client";
44
type MetricsOptions = {
55
isHealthy?: () => boolean;
66
isReady?: () => boolean;
7+
getLatestStoredBlockNumber?: () => Promise<bigint | undefined>;
8+
followBlockTag?: "latest" | "safe" | "finalized";
79
};
810

911
/**
1012
* Middleware to add Prometheus metrics endpoints
1113
*/
12-
export function metrics({ isHealthy, isReady }: MetricsOptions = {}): Middleware {
14+
export function metrics({
15+
isHealthy,
16+
isReady,
17+
getLatestStoredBlockNumber,
18+
followBlockTag,
19+
}: MetricsOptions = {}): Middleware {
1320
promClient.collectDefaultMetrics();
1421
if (isHealthy != null) {
1522
new promClient.Gauge({
@@ -31,6 +38,29 @@ export function metrics({ isHealthy, isReady }: MetricsOptions = {}): Middleware
3138
});
3239
}
3340

41+
if (getLatestStoredBlockNumber != null) {
42+
new promClient.Gauge({
43+
name: "latest_stored_block_number",
44+
help: "Latest block number stored in the database",
45+
async collect(): Promise<void> {
46+
this.set(Number(await getLatestStoredBlockNumber()));
47+
},
48+
});
49+
}
50+
51+
if (followBlockTag != null) {
52+
const blockTagGauge = new promClient.Gauge({
53+
name: "follow_block_tag",
54+
help: "Block tag the indexer is following (0 = finalized, 1 = safe, 2 = latest)",
55+
});
56+
const blockTagToValue = {
57+
finalized: 0,
58+
safe: 1,
59+
latest: 2,
60+
};
61+
blockTagGauge.set(blockTagToValue[followBlockTag]);
62+
}
63+
3464
return async function metricsMiddleware(ctx, next): Promise<void> {
3565
if (ctx.path === "/metrics") {
3666
ctx.status = 200;

0 commit comments

Comments
 (0)