From e195b70c01c7b92c1a7058c1dee5e212e8924e56 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Mon, 21 Jul 2025 12:42:10 +0200
Subject: [PATCH 1/3] [WIP] Add BucketDataCache.

---
 .../src/storage/BucketDataCache.ts            | 330 ++++++++++++++++++
 .../test/src/bucket_data_cache.test.ts        | 262 ++++++++++++++
 2 files changed, 592 insertions(+)
 create mode 100644 packages/service-core/src/storage/BucketDataCache.ts
 create mode 100644 packages/service-core/test/src/bucket_data_cache.test.ts

diff --git a/packages/service-core/src/storage/BucketDataCache.ts b/packages/service-core/src/storage/BucketDataCache.ts
new file mode 100644
index 000000000..7d369202c
--- /dev/null
+++ b/packages/service-core/src/storage/BucketDataCache.ts
@@ -0,0 +1,330 @@
+import { JSONBig } from '@powersync/service-jsonbig';
+import { LRUCache } from 'lru-cache/min';
+import { OplogEntry } from '../util/protocol-types.js';
+import { InternalOpId } from '../util/utils.js';
+import { BucketDataBatchOptions, SyncBucketDataChunk } from './SyncRulesBucketStorage.js';
+import { ServiceAssertionError } from '@powersync/lib-services-framework';
+import { first } from 'ix/iterable/first.js';
+
+export interface FetchBucketDataArgs {
+  bucket: string;
+  /**
+   * Starting point, inclusive.
+   *
+   * Set to 0n to start from the beginning.
+   */
+  start: InternalOpId;
+  /**
+   * Ending point, inclusive.
+   */
+  end: InternalOpId;
+}
+
+export type FetchData = (batch: FetchBucketDataArgs[]) => Promise<SyncBucketDataChunk[]>;
+
+export interface BucketDataCacheOptions {
+  /**
+   * Upstream bucket data implementation.
+   *
+   * This fetches data for a batch of bucket ranges.
+   */
+  fetchData: FetchData;
+
+  /**
+   * Maximum total size of cached data, in bytes.
+   */
+  maxSize?: number;
+}
+
+const DEFAULT_MAX_SIZE_BYTES = 128 * 1024 * 1024;
+
+/**
+ * The time to live for cached data.
+ *
+ * The primary purpose of the cache is to de-duplicate in-progress requests for the same bucket and checkpoint,
+ * when many clients are syncing the same buckets. For this, we don't need a long TTL.
+ *
+ * To a smaller extent, the cache can also help to reduce data lookups for many initial syncs shortly after each
+ * other, but this is not the primary use case.
+ */
+const TTL_MS = 60_000;
+
+/**
+ * Implements an LRU cache for bucket data requests. Each (bucket, start) request is cached separately,
+ * while the lookups occur in batches.
+ *
+ * In-progress requests are tracked separately, deduplicating concurrent fetches of the same data.
+ *
+ * Generally, we have a number of active clients, each needing to stream data.
+ * We can process a number of requests in parallel, and cache the results.
+ *
+ * We need to optimize processing to:
+ * 1. Avoid fetching the same bucket data multiple times.
+ * 2. Prioritize using cached data first.
+ * 3. Avoid starving certain clients.
+ * 4. Avoid over-fetching data - each client can only process one response at a time.
+ *
+ * General strategy:
+ * 1. Track each client with all its requests.
+ * 2. For each client, track which requests are in progress.
+ * 3. Have a number of workers that can process requests in parallel.
+ * 4. On each worker iteration, pick up to 10 requests. Prioritize clients with the least number of active requests.
+ * 5. Process these requests, cache the results, and notify each client that results are available.
+ */
+export class BucketDataCache {
+  private cache: LRUCache<string, SyncBucketDataChunk>;
+  private inProgress = new Set<string>();
+  private sessions = new Set<ClientSession>();
+
+  private fetchData: FetchData;
+
+  constructor(options: BucketDataCacheOptions) {
+    this.fetchData = options.fetchData;
+
+    this.cache = new LRUCache<string, SyncBucketDataChunk>({
+      max: 10_000,
+      maxSize: options.maxSize ?? DEFAULT_MAX_SIZE_BYTES,
+
+      sizeCalculation: (value: SyncBucketDataChunk) => {
+        // FIXME: this is not correct
+        return value.chunkData.data.length + 10;
+      },
+
+      // We use a TTL so that data can eventually be refreshed
+      // after a compact. This only has effect if the bucket has
+      // not been checked in the meantime.
+      ttl: TTL_MS,
+      ttlResolution: 1_000,
+      allowStale: false
+    });
+  }
+
+  private getRequestBatch(): FetchBucketDataArgs[] {
+    let requests: FetchBucketDataArgs[] = [];
+
+    let firstSession: ClientSession | null = null;
+
+    let numBlocking = 0;
+    let blockingMode = true;
+
+    // Check high-priority (blocking) requests first.
+    while ((numBlocking == 0 && requests.length < 100) || (numBlocking > 0 && requests.length < 20)) {
+      // 1. Get the next client with pending requests.
+      const next = first(this.sessions);
+      if (next == null) {
+        break;
+      }
+      if (firstSession == null) {
+        firstSession = next;
+      } else if (firstSession == next) {
+        if (blockingMode) {
+          blockingMode = false;
+          firstSession = null;
+        } else {
+          break;
+        }
+      }
+      // Move to the end of the queue.
+      this.sessions.delete(next);
+      this.sessions.add(next);
+
+      const request = blockingMode ? this.getBlockingRequest(next) : this.getNonBlockingRequest(next);
+      console.log('request', request);
+      if (request == null) {
+        // No request available for this client, continue to the next one.
+        continue;
+      }
+
+      // 2. Add the request to the batch.
+      const key = makeCacheKey(request);
+      this.inProgress.add(key);
+      requests.push(request);
+    }
+
+    return requests;
+  }
+
+  private async triggerWork() {
+    // TODO: limit concurrency
+    await this.workIteration();
+  }
+
+  private async workIteration() {
+    const requests = this.getRequestBatch();
+    console.log('workIteration', requests);
+    if (requests.length === 0) {
+      return;
+    }
+
+    const results = await this.fetchData(requests);
+    console.log('workIteration results', results);
+    for (const result of results) {
+      const key = makeCacheKey({
+        bucket: result.chunkData.bucket,
+        start: BigInt(result.chunkData.after)
+      });
+      this.cache.set(
+        makeCacheKey({
+          bucket: result.chunkData.bucket,
+          start: BigInt(result.chunkData.after)
+        }),
+        result
+      );
+      this.inProgress.delete(key);
+    }
+
+    for (let session of this.sessions) {
+      // Notify all sessions that data is available.
+      session.notify?.();
+    }
+  }
+
+  /**
+   * Given a set of bucket requests, return an iterator of chunks.
+   *
+   * The iterator may be cancelled at any time.
+   *
+   * The iterator may process buckets in any order - we may prioritize cached buckets.
+   *
+   * Each bucket request may be split into smaller chunks. These chunks will always be in order.
+   *
+   * @param checkpoint the checkpoint to fetch data for
+   * @param dataBuckets current bucket states, used to determine the starting point for each bucket. This may be modified.
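+   *
+   * A rough usage sketch (illustrative only, not part of this patch):
+   *
+   *   for await (const chunk of cache.getBucketData(checkpoint, new Map([['bucket1', 0n]]))) {
+   *     // stream chunk.chunkData to the client
+   *   }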
+   */
+  async *getBucketData(
+    checkpoint: InternalOpId,
+    dataBuckets: Map<string, InternalOpId>,
+    options?: BucketDataBatchOptions
+  ): AsyncIterableIterator<SyncBucketDataChunk> {
+    const request = new ClientSession(checkpoint, dataBuckets, options);
+    console.log('getBucketData', request);
+    this.sessions.add(request);
+
+    while (dataBuckets.size > 0) {
+      console.log('getting next batch', request);
+      const chunk = await this.getNextBatch(request);
+      console.log('got chunk', chunk);
+      if (chunk == null) {
+        break;
+      }
+
+      if (chunk.chunkData.has_more) {
+        request.bucketState.set(chunk.chunkData.bucket, BigInt(chunk.chunkData.next_after));
+      } else {
+        request.bucketState.delete(chunk.chunkData.bucket);
+      }
+      // TODO: sanitize chunk (remove data past the checkpoint)
+      yield chunk;
+    }
+  }
+
+  private getCachedData(session: ClientSession): SyncBucketDataChunk | undefined {
+    for (let [bucket, start] of session.bucketState.entries()) {
+      const key = makeCacheKey({ bucket, start });
+      if (this.cache.has(key)) {
+        return this.cache.get(key)!;
+      }
+    }
+    return undefined;
+  }
+
+  private hasPendingData(session: ClientSession): boolean {
+    for (let [bucket, start] of session.bucketState.entries()) {
+      const key = makeCacheKey({ bucket, start });
+      if (this.hasPendingRequest(key)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private hasPendingRequest(key: string): boolean {
+    if (this.cache.has(key)) {
+      return true;
+    } else if (this.inProgress.has(key)) {
+      return true;
+    }
+    return false;
+  }
+
+  private getBlockingRequest(session: ClientSession): FetchBucketDataArgs | undefined {
+    if (this.hasPendingData(session)) {
+      // Not blocked - data for this session is already cached or in progress.
+      return undefined;
+    }
+    console.log('state', session.bucketState);
+
+    for (let [bucket, start] of session.bucketState.entries()) {
+      const key = makeCacheKey({ bucket, start });
+      if (this.hasPendingRequest(key)) {
+        // This request is already in progress, so skip it.
+        continue;
+      }
+      return { bucket, start, end: session.checkpoint };
+    }
+  }
+
+  private getNonBlockingRequest(session: ClientSession): FetchBucketDataArgs | undefined {
+    for (let [bucket, start] of session.bucketState.entries()) {
+      const key = makeCacheKey({ bucket, start });
+      if (this.hasPendingRequest(key)) {
+        // This request is already in progress or cached, so skip it.
+        continue;
+      }
+      return { bucket, start, end: session.checkpoint };
+    }
+  }
+
+  private async getNextBatch(session: ClientSession): Promise<SyncBucketDataChunk | undefined> {
+    while (session.bucketState.size > 0) {
+      // 1. Check if we have cached data for this request.
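+      // A cache hit here means this (bucket, start) range was already fetched, possibly for another session.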
+      const cachedData = this.getCachedData(session);
+      console.log('cached?', cachedData);
+      if (cachedData) {
+        return cachedData;
+      }
+
+      const promise = session.wait();
+      console.log('triggerWork');
+
+      await this.triggerWork();
+      console.log('waiting for promise');
+      await promise;
+      console.log('got promise');
+      // Now it should be in the cache, so check again
+
+      // TODO: Implement timeout
+    }
+  }
+}
+
+function makeCacheKey(request: Pick<FetchBucketDataArgs, 'bucket' | 'start'>) {
+  return JSON.stringify({
+    b: request.bucket,
+    s: request.start.toString()
+  });
+}
+
+class ClientSession {
+  checkpoint: InternalOpId;
+  bucketState: Map<string, InternalOpId>;
+
+  public notify: (() => void) | undefined;
+
+  constructor(
+    checkpoint: InternalOpId,
+    dataBuckets: Map<string, InternalOpId>,
+    private options?: BucketDataBatchOptions
+  ) {
+    this.checkpoint = checkpoint;
+    this.bucketState = new Map(dataBuckets);
+  }
+
+  wait() {
+    return new Promise<void>((resolve) => {
+      this.notify = () => {
+        resolve();
+      };
+    });
+  }
+}

diff --git a/packages/service-core/test/src/bucket_data_cache.test.ts b/packages/service-core/test/src/bucket_data_cache.test.ts
new file mode 100644
index 000000000..c5a5ffece
--- /dev/null
+++ b/packages/service-core/test/src/bucket_data_cache.test.ts
@@ -0,0 +1,262 @@
+import { BucketDataCache, FetchData, FetchBucketDataArgs } from '@/storage/BucketDataCache.js';
+import { SyncBucketDataChunk } from '@/storage/SyncRulesBucketStorage.js';
+import { InternalOpId } from '@/util/util-index.js';
+import { describe, expect, it, vi } from 'vitest';
+
+function createTestChunk(bucket: string, after: InternalOpId, hasMore: boolean = false): SyncBucketDataChunk {
+  return {
+    chunkData: {
+      bucket,
+      data: [],
+      has_more: hasMore,
+      after: after.toString(),
+      next_after: (after + 1n).toString()
+    },
+    targetOp: after + 1n
+  };
+}
+
+function createMockFetchData(chunks: SyncBucketDataChunk[]): FetchData {
+  return vi.fn(async (batch: FetchBucketDataArgs[]) => {
+    let results = new Map<string, SyncBucketDataChunk>();
+    for (let chunk of chunks) {
+      results.set(`${chunk.chunkData.bucket}:${chunk.chunkData.after}`, chunk);
+    }
+    return batch.map((request) => {
+      const key = `${request.bucket}:${request.start}`;
+      return (
+        results.get(key) ??
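+        // No canned chunk matches: fall back to a synthetic empty chunk covering the full requested range.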
+        {
+          chunkData: {
+            bucket: request.bucket,
+            data: [],
+            has_more: false,
+            after: request.start.toString(),
+            next_after: (request.end + 1n).toString()
+          },
+          targetOp: null
+        }
+      );
+    });
+  });
+}
+
+describe('BucketDataCache', () => {
+  it('should fetch data from one bucket when not cached', async () => {
+    const mockChunk = createTestChunk('bucket1', 100n);
+    const fetchData = createMockFetchData([mockChunk]);
+    const cache = new BucketDataCache({ fetchData });
+
+    const dataBuckets = new Map([['bucket1', 100n]]);
+    const results = [];
+
+    for await (const chunk of cache.getBucketData(200n, dataBuckets)) {
+      results.push(chunk);
+    }
+
+    expect(results).toEqual([mockChunk]);
+    expect(fetchData).toHaveBeenCalledOnce();
+    expect(fetchData).toHaveBeenCalledWith([{ bucket: 'bucket1', start: 100n, end: 200n }]);
+  });
+
+  it('should get multiple chunks from one bucket when not cached', async () => {
+    const chunks = [
+      {
+        chunkData: {
+          bucket: 'bucket1',
+          data: [],
+          has_more: true,
+          after: '100',
+          next_after: '200'
+        },
+        targetOp: null
+      },
+      {
+        chunkData: {
+          bucket: 'bucket1',
+          data: [],
+          has_more: false,
+          after: '200',
+          next_after: '300'
+        },
+        targetOp: null
+      }
+    ];
+    const fetchData = createMockFetchData(chunks);
+    const cache = new BucketDataCache({ fetchData });
+
+    const dataBuckets = new Map([['bucket1', 100n]]);
+    const results = [];
+
+    for await (const chunk of cache.getBucketData(300n, dataBuckets)) {
+      results.push(chunk);
+    }
+
+    expect(results).toEqual(chunks);
+  });
+
+  it('should deduplicate concurrent requests', async () => {
+    let resolvePromise: (() => void) | null = null;
+    const fetchPromise = new Promise<void>((resolve) => {
+      resolvePromise = resolve;
+    });
+
+    const mockChunk = createTestChunk('bucket1', 100n);
+    const fetchData = vi.fn(async (batch: FetchBucketDataArgs[]) => {
+      await fetchPromise;
+      return batch.map(() => mockChunk);
+    });
+
+    const cache = new BucketDataCache({ fetchData });
+
+    const dataBuckets1 = new Map([['bucket1', 100n]]);
+    const dataBuckets2 = new Map([['bucket1', 100n]]);
+
+    const iterator1 = cache.getBucketData(200n, dataBuckets1);
+    const iterator2 = cache.getBucketData(200n, dataBuckets2);
+
+    const promise1 = iterator1.next();
+    const promise2 = iterator2.next();
+
+    resolvePromise!();
+
+    const [result1, result2] = await Promise.all([promise1, promise2]);
+
+    expect(result1.value).toEqual(mockChunk);
+    expect(result2.value).toEqual(mockChunk);
+    expect(fetchData).toHaveBeenCalledOnce();
+  });
+
+  it('should use cached data for subsequent requests', async () => {
+    const mockChunk = createTestChunk('bucket1', 100n);
+    const fetchData = createMockFetchData([mockChunk]);
+    const cache = new BucketDataCache({ fetchData });
+
+    const dataBuckets1 = new Map([['bucket1', 100n]]);
+    const results1 = [];
+    for await (const chunk of cache.getBucketData(200n, dataBuckets1)) {
+      results1.push(chunk);
+    }
+
+    const dataBuckets2 = new Map([['bucket1', 100n]]);
+    const results2 = [];
+    for await (const chunk of cache.getBucketData(200n, dataBuckets2)) {
+      results2.push(chunk);
+    }
+
+    expect(results1).toEqual([mockChunk]);
+    expect(results2).toEqual([mockChunk]);
+    expect(fetchData).toHaveBeenCalledOnce();
+  });
+
+  it('should handle multiple buckets in single request', async () => {
+    const chunk1 = createTestChunk('bucket1', 100n);
+    const chunk2 = createTestChunk('bucket2', 150n);
+    const fetchData = createMockFetchData([chunk1, chunk2]);
+    const cache = new BucketDataCache({ fetchData });
+
+    const dataBuckets = new Map([
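+      // Two buckets with different starting op_ids, streamed in a single session.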
+      ['bucket1', 100n],
+      ['bucket2', 150n]
+    ]);
+    const results = [];
+
+    for await (const chunk of cache.getBucketData(200n, dataBuckets)) {
+      results.push(chunk);
+    }
+
+    expect(results).toHaveLength(2);
+    expect(results).toContainEqual(chunk1);
+    expect(results).toContainEqual(chunk2);
+    expect(fetchData).toHaveBeenCalledOnce();
+  });
+
+  it('should handle bucket with has_more=true', async () => {
+    const chunk1 = createTestChunk('bucket1', 100n, true);
+    const chunk2 = createTestChunk('bucket1', 101n, false);
+
+    const fetchData = vi.fn(async (batch: FetchBucketDataArgs[]) => {
+      const request = batch[0];
+      if (request.start === 100n) {
+        return [chunk1];
+      } else if (request.start === 101n) {
+        return [chunk2];
+      }
+      return [];
+    });
+
+    const cache = new BucketDataCache({ fetchData });
+    const dataBuckets = new Map([['bucket1', 100n]]);
+    const results = [];
+
+    for await (const chunk of cache.getBucketData(200n, dataBuckets)) {
+      results.push(chunk);
+    }
+
+    expect(results).toEqual([chunk1, chunk2]);
+    expect(fetchData).toHaveBeenCalledTimes(2);
+    expect(fetchData).toHaveBeenNthCalledWith(1, [{ bucket: 'bucket1', start: 100n, end: 200n }]);
+    expect(fetchData).toHaveBeenNthCalledWith(2, [{ bucket: 'bucket1', start: 101n, end: 200n }]);
+  });
+
+  it('should handle empty bucket list', async () => {
+    const fetchData = vi.fn();
+    const cache = new BucketDataCache({ fetchData });
+
+    const dataBuckets = new Map();
+    const results = [];
+
+    for await (const chunk of cache.getBucketData(200n, dataBuckets)) {
+      results.push(chunk);
+    }
+
+    expect(results).toEqual([]);
+    expect(fetchData).not.toHaveBeenCalled();
+  });
+
+  it.skip('should respect custom maxSize option', async () => {
+    const fetchData = vi.fn(async (batch: FetchBucketDataArgs[]) => {
+      return batch.map((request) => createTestChunk(request.bucket, request.start));
+    });
+
+    const cache = new BucketDataCache({ fetchData, maxSize: 1 });
+
+    const dataBuckets1 = new Map([['bucket1', 100n]]);
+    const results1 = [];
+    for await (const chunk of cache.getBucketData(200n, dataBuckets1)) {
+      results1.push(chunk);
+    }
+
+    const dataBuckets2 = new Map([['bucket2', 150n]]);
+    const results2 = [];
+    for await (const chunk of cache.getBucketData(200n, dataBuckets2)) {
+      results2.push(chunk);
+    }
+
+    const dataBuckets3 = new Map([['bucket1', 100n]]);
+    const results3 = [];
+    for await (const chunk of cache.getBucketData(200n, dataBuckets3)) {
+      results3.push(chunk);
+    }
+
+    expect(results1).toHaveLength(1);
+    expect(results2).toHaveLength(1);
+    expect(results3).toHaveLength(1);
+    expect(fetchData).toHaveBeenCalledTimes(3);
+  });
+
+  it('should handle fetch errors gracefully', async () => {
+    const fetchData = vi.fn(async () => {
+      throw new Error('Fetch failed');
+    });
+
+    const cache = new BucketDataCache({ fetchData });
+    const dataBuckets = new Map([['bucket1', 100n]]);
+
+    await expect(async () => {
+      for await (const chunk of cache.getBucketData(200n, dataBuckets)) {
+        // Should not reach here
+      }
+    }).rejects.toThrow('Fetch failed');
+  });
+});

From 744da0cb8332f5ea1ab7b0a06ab4f371190cb892 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Tue, 22 Jul 2025 15:03:41 +0200
Subject: [PATCH 2/3] Sanitize chunks.
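
Cached chunks can contain data past the checkpoint a specific client
requested, since chunks are shared between sessions with different
checkpoints. Trim that data before yielding a chunk to the client,
instead of modifying the cached copy.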
---
 .../src/storage/BucketDataCache.ts            | 94 ++++++++++++++-----
 1 file changed, 70 insertions(+), 24 deletions(-)

diff --git a/packages/service-core/src/storage/BucketDataCache.ts b/packages/service-core/src/storage/BucketDataCache.ts
index 7d369202c..1d44c50c3 100644
--- a/packages/service-core/src/storage/BucketDataCache.ts
+++ b/packages/service-core/src/storage/BucketDataCache.ts
@@ -1,10 +1,7 @@
-import { JSONBig } from '@powersync/service-jsonbig';
+import { first } from 'ix/iterable/first.js';
 import { LRUCache } from 'lru-cache/min';
-import { OplogEntry } from '../util/protocol-types.js';
 import { InternalOpId } from '../util/utils.js';
 import { BucketDataBatchOptions, SyncBucketDataChunk } from './SyncRulesBucketStorage.js';
-import { ServiceAssertionError } from '@powersync/lib-services-framework';
-import { first } from 'ix/iterable/first.js';
 
 export interface FetchBucketDataArgs {
   bucket: string;
@@ -65,11 +62,34 @@ const TTL_MS = 60_000;
  * 4. Avoid over-fetching data - each client can only process one response at a time.
  *
  * General strategy:
- * 1. Track each client with all its requests.
- * 2. For each client, track which requests are in progress.
+ * 1. Track each session with all its requests.
+ * 2. Track which requests are in progress and which are cached already.
  * 3. Have a number of workers that can process requests in parallel.
- * 4. On each worker iteration, pick up to 10 requests. Prioritize clients with the least number of active requests.
+ * 4. On each worker iteration, pick a batch of requests, prioritizing clients that are blocked.
  * 5. Process these requests, cache the results, and notify each client that results are available.
+ *
+ * Requests and responses are not always "exact":
+ * 1. A cached response may contain more data than the client requested, so we need to trim it to the checkpoint.
+ * 2. A response may be split into multiple chunks, and we need to cache each chunk separately.
+ * 3. When a chunk partially satisfies a request, we need to add a new request for the next chunk.
+ *
+ * For managing the cache, we have three main states for each item:
+ * 1. Active: A session is actively waiting for the chunk, and will be able to use it right now.
+ * 2. Lookahead: A session has requested the chunk, but cannot use it just yet, since it is still processing previous chunks.
+ * 3. Cached: No sessions are actively waiting for the chunk, but it is cached for future use.
+ *
+ * Active chunks should always take priority, and not be evicted by other chunks. If the cache is full with active chunks,
+ * we should stop fetching new data.
+ *
+ * Generally prioritize lookahead over other cached chunks.
+ * If the cache is full with lookahead chunks, avoid fetching more lookahead data, but still prioritize active requests.
+ *
+ * Next steps:
+ * 1. Error handling
+ * 2. Worker thread management: Implement worker pool / limit concurrency.
+ * 3. Timeout handling.
+ * 4. Cache size management.
+ * 5. Optimize.
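+ *
+ * Illustrative example of the trimming behaviour: if one session at checkpoint 300 has
+ * fetched and cached the chunk for (bucket1, start 100), a later session at checkpoint 200
+ * can reuse that cached chunk; it is trimmed to the later session's checkpoint before
+ * being yielded.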
  */
 export class BucketDataCache {
   private cache: LRUCache<string, SyncBucketDataChunk>;
@@ -196,25 +216,29 @@ export class BucketDataCache {
     dataBuckets: Map<string, InternalOpId>,
     options?: BucketDataBatchOptions
   ): AsyncIterableIterator<SyncBucketDataChunk> {
-    const request = new ClientSession(checkpoint, dataBuckets, options);
-    console.log('getBucketData', request);
-    this.sessions.add(request);
-
-    while (dataBuckets.size > 0) {
-      console.log('getting next batch', request);
-      const chunk = await this.getNextBatch(request);
-      console.log('got chunk', chunk);
-      if (chunk == null) {
-        break;
-      }
+    const session = new ClientSession(checkpoint, dataBuckets, options);
+    console.log('getBucketData', session);
+    this.sessions.add(session);
+    try {
+      while (dataBuckets.size > 0) {
+        console.log('getting next batch', session);
+        const chunk = await this.getNextBatch(session);
+        console.log('got chunk', chunk);
+        if (chunk == null) {
+          break;
+        }
+        const sanitized = sanitizeChunkForClient(chunk, session.checkpoint);
 
-      if (chunk.chunkData.has_more) {
-        request.bucketState.set(chunk.chunkData.bucket, BigInt(chunk.chunkData.next_after));
-      } else {
-        request.bucketState.delete(chunk.chunkData.bucket);
-      }
-      // TODO: sanitize chunk (remove data past the checkpoint)
-      yield chunk;
-    }
+        if (sanitized.chunkData.has_more) {
+          session.bucketState.set(sanitized.chunkData.bucket, BigInt(sanitized.chunkData.next_after));
+        } else {
+          session.bucketState.delete(sanitized.chunkData.bucket);
+        }
+
+        yield sanitized;
+      }
+    } finally {
+      this.sessions.delete(session);
+    }
   }
 
@@ -328,3 +352,25 @@ class ClientSession {
     });
   }
 }
+
+function sanitizeChunkForClient(chunk: SyncBucketDataChunk, checkpoint: InternalOpId): SyncBucketDataChunk {
+  // Remove data past the checkpoint.
+  if (chunk.chunkData.data.length > 0) {
+    const lastOpId = BigInt(chunk.chunkData.data[chunk.chunkData.data.length - 1].op_id);
+    if (lastOpId > checkpoint) {
+      // The chunk has more data than this client requested, so we need to trim it.
+      // Don't modify the original chunk, but return a new one.
+      return {
+        ...chunk,
+        chunkData: {
+          bucket: chunk.chunkData.bucket,
+          after: chunk.chunkData.after,
+          data: chunk.chunkData.data.filter((entry) => BigInt(entry.op_id) <= checkpoint),
+          has_more: false, // We don't have more data past the checkpoint - the request is complete
+          next_after: checkpoint.toString() // Resume from the checkpoint, not the last entry.
+        }
+      };
+    }
+  }
+  return chunk;
+}

From 9a22306e05a6754e3036df61ccd24606aa063304 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Mon, 4 Aug 2025 14:37:57 +0200
Subject: [PATCH 3/3] Initial error handling.

---
 .../src/storage/BucketDataCache.ts            | 110 +++++++++++++-----
 1 file changed, 78 insertions(+), 32 deletions(-)

diff --git a/packages/service-core/src/storage/BucketDataCache.ts b/packages/service-core/src/storage/BucketDataCache.ts
index 1d44c50c3..709c03908 100644
--- a/packages/service-core/src/storage/BucketDataCache.ts
+++ b/packages/service-core/src/storage/BucketDataCache.ts
@@ -2,6 +2,7 @@ import { first } from 'ix/iterable/first.js';
 import { LRUCache } from 'lru-cache/min';
 import { InternalOpId } from '../util/utils.js';
 import { BucketDataBatchOptions, SyncBucketDataChunk } from './SyncRulesBucketStorage.js';
+import { logger, ServiceAssertionError } from '@powersync/lib-services-framework';
 
 export interface FetchBucketDataArgs {
   bucket: string;
@@ -17,6 +18,15 @@ export interface FetchBucketDataArgs {
   end: InternalOpId;
 }
 
+/**
+ * Fetch a batch of bucket data.
+ *
+ * The source may choose to only return limited chunks of data.
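+ * For example, a request covering ops 100..500 may be answered with a chunk that
+ * ends earlier and has has_more: true; the cache then issues a follow-up request
+ * starting at the chunk's next_after value.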
+ *
+ * If there is no data for a request, the source must return an empty chunk.
+ *
+ * More specifically, the response must never be an empty array.
+ */
 export type FetchData = (batch: FetchBucketDataArgs[]) => Promise<SyncBucketDataChunk[]>;
 
 export interface BucketDataCacheOptions {
@@ -95,6 +105,8 @@ export class BucketDataCache {
   private cache: LRUCache<string, SyncBucketDataChunk>;
   private inProgress = new Set<string>();
   private sessions = new Set<ClientSession>();
+  private activeWorkers = new Set<Promise<boolean>>();
+  private workerLimit = 5;
 
   private fetchData: FetchData;
 
@@ -110,17 +122,17 @@ export class BucketDataCache {
         return value.chunkData.data.length + 10;
       },
 
-      // We use a TTL so that data can eventually be refreshed
-      // after a compact. This only has effect if the bucket has
-      // not been checked in the meantime.
+      // We treat this as a "least-recently-fetched" cache, not a "least-recently-used" cache.
+      updateAgeOnGet: false,
       ttl: TTL_MS,
       ttlResolution: 1_000,
       allowStale: false
     });
   }
 
-  private getRequestBatch(): FetchBucketDataArgs[] {
+  private getRequestBatch(): { requests: FetchBucketDataArgs[]; sessions: Set<ClientSession> } {
     let requests: FetchBucketDataArgs[] = [];
+    let sessions = new Set<ClientSession>();
 
     let firstSession: ClientSession | null = null;
 
@@ -159,43 +171,75 @@ export class BucketDataCache {
       const key = makeCacheKey(request);
       this.inProgress.add(key);
       requests.push(request);
+      sessions.add(next);
     }
 
-    return requests;
+    return { requests, sessions };
  }
 
-  private async triggerWork() {
-    // TODO: limit concurrency
-    await this.workIteration();
+  private triggerWork() {
+    if (this.activeWorkers.size >= this.workerLimit) {
+      return;
+    }
+    const promise = this.workIteration();
+    this.activeWorkers.add(promise);
+    promise
+      .then((hadData) => {
+        this.activeWorkers.delete(promise);
+        if (hadData) {
+          // If we had data, trigger another work iteration.
+          this.triggerWork();
+        }
+      })
+      .catch((e) => {
+        this.activeWorkers.delete(promise);
+        logger.error('BucketDataCache work iteration failed', e);
+        // Don't trigger another iteration here - let the client retry.
+      });
   }
 
   private async workIteration() {
-    const requests = this.getRequestBatch();
+    const { requests, sessions } = this.getRequestBatch();
     console.log('workIteration', requests);
     if (requests.length === 0) {
-      return;
+      return false;
    }
-
-    const results = await this.fetchData(requests);
-    console.log('workIteration results', results);
-    for (const result of results) {
-      const key = makeCacheKey({
-        bucket: result.chunkData.bucket,
-        start: BigInt(result.chunkData.after)
-      });
-      this.cache.set(
-        makeCacheKey({
+    try {
+      const results = await this.fetchData(requests);
+      // Note that there may be more results than requests, since a single request may return multiple chunks,
+      // and the source may "over-fetch" data.
+      // There may also be fewer results than requests, if the first chunk is large.
+      // However, the response must never be empty.
+      if (results.length == 0) {
+        throw new ServiceAssertionError(`Empty response while fetching bucket data`);
+      }
+      console.log('workIteration results', results);
+      for (const result of results) {
+        const key = makeCacheKey({
          bucket: result.chunkData.bucket,
          start: BigInt(result.chunkData.after)
-        }),
-        result
-      );
-      this.inProgress.delete(key);
-    }
+        });
+        this.cache.set(key, result);
+      }
 
-    for (let session of this.sessions) {
-      // Notify all sessions that data is available.
-      session.notify?.();
-    }
+      for (let session of this.sessions) {
+        // Notify _all_ sessions that data is available.
+        // We can optimize this later to only notify affected sessions.
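+        // Sessions whose requests were not satisfied will simply loop in getNextBatch and wait again.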
+        session.notify?.();
+      }
+      return true;
+    } catch (e) {
+      logger.error('BucketDataCache work iteration failed', e);
+      // Notify _these specific sessions_ that an error occurred.
+      for (let session of sessions) {
+        session.notifyError?.(e);
+      }
+      return false;
+    } finally {
+      for (let r of requests) {
+        const key = makeCacheKey(r);
+        this.inProgress.delete(key);
+      }
+    }
   }
 
@@ -310,8 +354,7 @@ export class BucketDataCache {
 
       const promise = session.wait();
       console.log('triggerWork');
-
-      await this.triggerWork();
+      this.triggerWork();
       console.log('waiting for promise');
       await promise;
       console.log('got promise');
@@ -334,7 +377,7 @@ class ClientSession {
   bucketState: Map<string, InternalOpId>;
 
   public notify: (() => void) | undefined;
-
+  public notifyError: ((e: Error) => void) | undefined;
   constructor(
     checkpoint: InternalOpId,
     dataBuckets: Map<string, InternalOpId>,
@@ -345,10 +388,13 @@ class ClientSession {
   }
 
   wait() {
-    return new Promise<void>((resolve) => {
+    return new Promise<void>((resolve, reject) => {
       this.notify = () => {
         resolve();
       };
+      this.notifyError = (e: Error) => {
+        reject(e);
+      };
     });
   }
 }