Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: throttle polling for status #3221

Draft
wants to merge 4 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
"knex": "^2.5.1",
"least-recent": "^1.0.3",
"level": "^8.0.1",
"lodash-es": "^4.17.21",
"lodash.clonedeep": "^4.5.0",
"mapmoize": "^1.2.1",
"multiformats": "^13.0.0",
Expand All @@ -101,6 +102,7 @@
"@ceramicnetwork/ipfs-daemon": "^5.10.0-rc.0",
"@databases/pg-test": "^3.1.2",
"@didtools/cacao": "^3.0.0",
"@types/lodash-es": "^4.17.12",
"@types/node": "^18.0.3",
"csv-parser": "^3.0.0",
"did-resolver": "^4.0.1",
Expand Down
88 changes: 81 additions & 7 deletions packages/core/src/anchor/anchor-processing-loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,19 @@ import type { DiagnosticsLogger } from '@ceramicnetwork/common'
import type { NamedTaskQueue } from '../state-management/named-task-queue.js'
import type { StreamID } from '@ceramicnetwork/streamid'
import { TimeableMetric, SinceField } from '@ceramicnetwork/observability'
import { ModelMetrics, Observable, Counter } from '@ceramicnetwork/model-metrics'
import { ModelMetrics, Counter } from '@ceramicnetwork/model-metrics'
import throttle from 'lodash-es/throttle'
import { CID, Version } from 'multiformats'
import { interval } from 'rxjs'
import { startWith } from 'rxjs/operators'
import { RemoteCAS } from './ethereum/remote-cas.js'

const METRICS_REPORTING_INTERVAL_MS = 10000 // 10 second reporting interval

const DEFAULT_CONCURRENCY = 25

const CAS_REQUEST_POLLING_INTERVAL_MS = 1000 / 6 // 1000 ms divided by 6 calls

/**
* Get anchor request entries from AnchorRequestStore one by one. For each entry, get CAS response,
* and handle the response via `eventHandler.handle`.
Expand All @@ -28,6 +35,59 @@ export class AnchorProcessingLoop {
readonly #anchorStoreQueue: NamedTaskQueue
readonly #anchorPollingMetrics: TimeableMetric

readonly #cas: CASClient
// This function is throttled to limit its execution frequency to no more than once every CAS_REQUEST_POLLING_INTERVAL_MS.
// It attempts to get the status of an anchor request from the CAS. If the request is not found, it logs a warning,
// builds a new CAR file for the request, and submits a new request to the CAS.
// The function is configured to execute only at the leading edge of the interval,
// meaning it will execute immediately when called, but subsequent calls
// within the CAS_REQUEST_POLLING_INTERVAL_MS will be ignored, ensuring that the CAS is not overwhelmed with too many
// frequent requests and helping to manage system resources efficiently.
throttledGetStatusForRequest = throttle(
async (
streamId: StreamID,
cid: CID<unknown, number, number, Version>,
cas: CASClient,
logger: DiagnosticsLogger,
eventHandler: AnchorLoopHandler
) => {
return cas.getStatusForRequest(streamId, cid).catch(async (error) => {
logger.warn(`No request present on CAS for ${cid} of ${streamId}: ${error}`)
const requestCAR = await eventHandler.buildRequestCar(streamId, cid)
return cas.create(new AnchorRequestCarFileReader(requestCAR))
})
},
CAS_REQUEST_POLLING_INTERVAL_MS, // Set the maximum frequency of function execution
{ trailing: false } // Execute only at the leading edge of the interval
)
intervalSubscription: any

// This method dynamically adjusts the polling interval based on the current rate of create requests.
// It calculates a new interval using a square root function to moderate the change rate, ensuring the interval
// remains within predefined maximum and minimum bounds. The adjusted interval is then applied to throttle
// the `throttledGetStatusForRequest` function, which controls the frequency of status checks and request submissions
// to the CAS, enhancing system responsiveness and stability.
private adjustPollingInterval(): void {
const maxInterval = 200 // maximum interval in ms
const minInterval = 5 // minimum interval in ms
if (this.#cas instanceof RemoteCAS) {
const currentRate = this.#cas.getCreateRequestRate()
let newInterval = 1000 / Math.sqrt(currentRate + 1)
newInterval = Math.min(Math.max(newInterval, minInterval), maxInterval)

this.throttledGetStatusForRequest = throttle(this.throttledGetStatusForRequest, newInterval, {
trailing: false,
})
} else {
// Handle the case where #cas is not an instance of RemoteCAS
console.warn('CAS client does not support dynamic rate adjustment.')
// Using minimum interval
this.throttledGetStatusForRequest = throttle(this.throttledGetStatusForRequest, minInterval, {
trailing: false,
})
}
}

constructor(
batchSize: number,
cas: CASClient,
Expand All @@ -42,7 +102,7 @@ export class AnchorProcessingLoop {
'anchorRequestAge',
METRICS_REPORTING_INTERVAL_MS
)

this.#cas = cas
const concurrency =
Number(process.env.CERAMIC_ANCHOR_POLLING_CONCURRENCY) || DEFAULT_CONCURRENCY
this.#loop = new ProcessingLoop(
Expand All @@ -55,11 +115,13 @@ export class AnchorProcessingLoop {
`Loading pending anchor metadata for Stream ${streamId} from AnchorRequestStore`
)
const entry = await store.load(streamId)
const event = await cas.getStatusForRequest(streamId, entry.cid).catch(async (error) => {
logger.warn(`No request present on CAS for ${entry.cid} of ${streamId}: ${error}`)
const requestCAR = await eventHandler.buildRequestCar(streamId, entry.cid)
return cas.create(new AnchorRequestCarFileReader(requestCAR))
})
const event = await this.throttledGetStatusForRequest(
streamId,
entry.cid,
this.#cas,
logger,
eventHandler
)
const isTerminal = await eventHandler.handle(event)
logger.verbose(
`Anchor event with status ${event.status} for commit CID ${entry.cid} of Stream ${streamId} handled successfully`
Expand Down Expand Up @@ -102,13 +164,25 @@ export class AnchorProcessingLoop {
start(): void {
this.#anchorPollingMetrics.startPublishingStats()
void this.#loop.start()

// Set up an interval to adjust the polling interval every 10 minutes (600000 milliseconds)
const subscription = interval(600000)
.pipe(
startWith(0) // to start immediately
)
.subscribe(() => {
this.adjustPollingInterval()
})

this.intervalSubscription = subscription
}

/**
* Stop looping.
*/
async stop(): Promise<void> {
this.#anchorPollingMetrics.stopPublishingStats()
this.intervalSubscription?.unsubscribe()
return this.#loop.stop()
}
}
7 changes: 7 additions & 0 deletions packages/core/src/anchor/anchor-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ export interface CASClient {
* Abort any fetch requests to CAS.
*/
close(): Promise<void>

/**
* Calculates the rate of anchor request creations over the last 15 minutes.
* This rate is computed as the total number of requests created divided by 15,
* providing the average number of requests per minute within this time frame.
*/
getCreateRequestRate?(): number
}

export class NotSingleChainError extends Error {
Expand Down
19 changes: 19 additions & 0 deletions packages/core/src/anchor/ethereum/remote-cas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export class RemoteCAS implements CASClient {
// does not cause this counter to increment.
#numFailedRequests: number
#firstFailedRequestDate: Date | null
private createRequestTimestamps: number[] = []

constructor(logger: DiagnosticsLogger, anchorServiceUrl: string, sendRequest: FetchRequest) {
this.#logger = logger
Expand Down Expand Up @@ -140,6 +141,7 @@ export class RemoteCAS implements CASClient {
*/
async create(carFileReader: AnchorRequestCarFileReader): Promise<AnchorEvent> {
const response = await firstValueFrom(this.create$(carFileReader))
this.incrementCreateRequestCount()
return parseResponse(carFileReader.streamId, carFileReader.tip, response)
}

Expand Down Expand Up @@ -207,6 +209,23 @@ export class RemoteCAS implements CASClient {
return parseResponse(streamId, tip, response)
}

private incrementCreateRequestCount(): void {
this.createRequestTimestamps.push(Date.now())
}

public getCreateRequestRate(): number {
const currentTime = Date.now()
const oneMinuteAgo = currentTime - 600000 // 1000 * 60 * 60 * 10

const recentTimestamps = this.createRequestTimestamps.filter(
(timestamp) => timestamp > oneMinuteAgo
)
this.createRequestTimestamps = recentTimestamps // Update the array to only hold recent timestamps

// get the average of the rates in the last 15 minutes
return recentTimestamps.length / 10
}

async close() {
this.#stopSignal.next()
this.#stopSignal.complete()
Expand Down