From a34f03f2a80330f841e81a92155d7705de6e262d Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 26 Jul 2025 09:39:32 +0100 Subject: [PATCH 1/5] Improve dynamic flush scheduler for otel data The changes introduce a more flexible and adaptive dynamic flush scheduler to address production issues where the system wasn't flushing data fast enough, causing memory growth and crashes. This issue arises from the existing scheduler handling only a single flush at a time, limiting concurrency and failing to cope with the influx of logs. - Added configuration options for setting minimum and maximum concurrency levels, maximum batch size, and memory pressure threshold. These parameters ensure that flush operations adjust dynamically based on workload and pressure. - Implemented `pLimit` to facilitate concurrent flush operations, with adjustments made according to batch queue length and memory pressure. - Metrics reporting improvements were added to monitor the dynamic behavior of the flush scheduler, aiding in identifying performance issues and optimizing the operation accordingly. --- apps/webapp/app/env.server.ts | 4 + .../app/v3/dynamicFlushScheduler.server.ts | 230 ++++++++++++++++-- apps/webapp/app/v3/eventRepository.server.ts | 57 +++++ 3 files changed, 272 insertions(+), 19 deletions(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index b72857e04f..0a56ab5fc4 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -252,6 +252,10 @@ const EnvironmentSchema = z.object({ EVENTS_BATCH_SIZE: z.coerce.number().int().default(100), EVENTS_BATCH_INTERVAL: z.coerce.number().int().default(1000), EVENTS_DEFAULT_LOG_RETENTION: z.coerce.number().int().default(7), + EVENTS_MIN_CONCURRENCY: z.coerce.number().int().default(1), + EVENTS_MAX_CONCURRENCY: z.coerce.number().int().default(10), + EVENTS_MAX_BATCH_SIZE: z.coerce.number().int().default(500), + EVENTS_MEMORY_PRESSURE_THRESHOLD: z.coerce.number().int().default(2000), SHARED_QUEUE_CONSUMER_POOL_SIZE: z.coerce.number().int().default(10), SHARED_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(100), SHARED_QUEUE_CONSUMER_NEXT_TICK_INTERVAL_MS: z.coerce.number().int().default(100), diff --git a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts index f87c2a143b..80f4dafc12 100644 --- a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts +++ b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts @@ -1,38 +1,84 @@ import { nanoid } from "nanoid"; +import pLimit from "p-limit"; +import { logger } from "~/services/logger.server"; export type DynamicFlushSchedulerConfig = { batchSize: number; flushInterval: number; callback: (flushId: string, batch: T[]) => Promise; + // New configuration options + minConcurrency?: number; + maxConcurrency?: number; + maxBatchSize?: number; + memoryPressureThreshold?: number; // Number of items that triggers increased concurrency }; export class DynamicFlushScheduler { - private batchQueue: T[][]; // Adjust the type according to your data structure - private currentBatch: T[]; // Adjust the type according to your data structure + private batchQueue: T[][]; + private currentBatch: T[]; private readonly BATCH_SIZE: number; private readonly FLUSH_INTERVAL: number; private flushTimer: NodeJS.Timeout | null; private readonly callback: (flushId: string, batch: T[]) => Promise; + + // New properties for dynamic scaling + private readonly minConcurrency: number; + private readonly maxConcurrency: number; + private readonly maxBatchSize: number; + private readonly memoryPressureThreshold: number; + private limiter: ReturnType; + private currentBatchSize: number; + private totalQueuedItems: number = 0; + private consecutiveFlushFailures: number = 0; + private lastFlushTime: number = Date.now(); + private metrics = { + flushedBatches: 0, + failedBatches: 0, + totalItemsFlushed: 0, + }; constructor(config: DynamicFlushSchedulerConfig) { this.batchQueue = []; this.currentBatch = []; this.BATCH_SIZE = config.batchSize; + this.currentBatchSize = config.batchSize; this.FLUSH_INTERVAL = config.flushInterval; this.callback = config.callback; this.flushTimer = null; + + // Initialize dynamic scaling parameters + this.minConcurrency = config.minConcurrency ?? 1; + this.maxConcurrency = config.maxConcurrency ?? 10; + this.maxBatchSize = config.maxBatchSize ?? config.batchSize * 5; + this.memoryPressureThreshold = config.memoryPressureThreshold ?? config.batchSize * 20; + + // Start with minimum concurrency + this.limiter = pLimit(this.minConcurrency); + this.startFlushTimer(); + this.startMetricsReporter(); } addToBatch(items: T[]): void { this.currentBatch.push(...items); + this.totalQueuedItems += items.length; - if (this.currentBatch.length >= this.BATCH_SIZE) { - this.batchQueue.push(this.currentBatch); - this.currentBatch = []; - this.flushNextBatch(); - this.resetFlushTimer(); + // Check if we need to create a batch + if (this.currentBatch.length >= this.currentBatchSize) { + this.createBatch(); } + + // Adjust concurrency based on queue pressure + this.adjustConcurrency(); + } + + private createBatch(): void { + if (this.currentBatch.length === 0) return; + + this.batchQueue.push(this.currentBatch); + this.currentBatch = []; + this.flushBatches(); + this.resetFlushTimer(); } private startFlushTimer(): void { @@ -48,23 +94,169 @@ export class DynamicFlushScheduler { private checkAndFlush(): void { if (this.currentBatch.length > 0) { - this.batchQueue.push(this.currentBatch); - this.currentBatch = []; + this.createBatch(); } - this.flushNextBatch(); + this.flushBatches(); } - private async flushNextBatch(): Promise { - if (this.batchQueue.length === 0) return; + private async flushBatches(): Promise { + const batchesToFlush: T[][] = []; + + // Dequeue all available batches up to current concurrency limit + while (this.batchQueue.length > 0 && batchesToFlush.length < this.limiter.concurrency) { + const batch = this.batchQueue.shift(); + if (batch) { + batchesToFlush.push(batch); + } + } + + if (batchesToFlush.length === 0) return; - const batchToFlush = this.batchQueue.shift(); - try { - await this.callback(nanoid(), batchToFlush!); + // Schedule all batches for concurrent processing + const flushPromises = batchesToFlush.map((batch) => + this.limiter(async () => { + const flushId = nanoid(); + const itemCount = batch.length; + + try { + const startTime = Date.now(); + await this.callback(flushId, batch); + + const duration = Date.now() - startTime; + this.totalQueuedItems -= itemCount; + this.consecutiveFlushFailures = 0; + this.lastFlushTime = Date.now(); + this.metrics.flushedBatches++; + this.metrics.totalItemsFlushed += itemCount; + + logger.debug("Batch flushed successfully", { + flushId, + itemCount, + duration, + remainingQueueDepth: this.totalQueuedItems, + activeConcurrency: this.limiter.activeCount, + pendingConcurrency: this.limiter.pendingCount, + }); + } catch (error) { + this.consecutiveFlushFailures++; + this.metrics.failedBatches++; + + logger.error("Error flushing batch", { + flushId, + itemCount, + error, + consecutiveFailures: this.consecutiveFlushFailures, + }); + + // Re-queue the batch at the front if it fails + this.batchQueue.unshift(batch); + this.totalQueuedItems += itemCount; + + // Back off on failures + if (this.consecutiveFlushFailures > 3) { + this.adjustConcurrency(true); + } + } + }) + ); + + // Don't await here - let them run concurrently + Promise.allSettled(flushPromises).then(() => { + // After flush completes, check if we need to flush more if (this.batchQueue.length > 0) { - this.flushNextBatch(); + this.flushBatches(); } - } catch (error) { - console.error("Error inserting batch:", error); + }); + } + + private adjustConcurrency(backOff: boolean = false): void { + const currentConcurrency = this.limiter.concurrency; + let newConcurrency = currentConcurrency; + + if (backOff) { + // Reduce concurrency on failures + newConcurrency = Math.max(this.minConcurrency, Math.floor(currentConcurrency * 0.75)); + } else { + // Calculate pressure metrics + const queuePressure = this.totalQueuedItems / this.memoryPressureThreshold; + const timeSinceLastFlush = Date.now() - this.lastFlushTime; + + if (queuePressure > 0.8 || timeSinceLastFlush > this.FLUSH_INTERVAL * 2) { + // High pressure - increase concurrency + newConcurrency = Math.min(this.maxConcurrency, currentConcurrency + 2); + } else if (queuePressure < 0.2 && currentConcurrency > this.minConcurrency) { + // Low pressure - decrease concurrency + newConcurrency = Math.max(this.minConcurrency, currentConcurrency - 1); + } + } + + // Adjust batch size based on pressure + if (this.totalQueuedItems > this.memoryPressureThreshold) { + this.currentBatchSize = Math.min( + this.maxBatchSize, + Math.floor(this.BATCH_SIZE * (1 + queuePressure)) + ); + } else { + this.currentBatchSize = this.BATCH_SIZE; + } + + // Update concurrency if changed + if (newConcurrency !== currentConcurrency) { + this.limiter = pLimit(newConcurrency); + + logger.info("Adjusted flush concurrency", { + previousConcurrency: currentConcurrency, + newConcurrency, + queuePressure, + totalQueuedItems: this.totalQueuedItems, + currentBatchSize: this.currentBatchSize, + }); + } + } + + private startMetricsReporter(): void { + // Report metrics every 30 seconds + setInterval(() => { + logger.info("DynamicFlushScheduler metrics", { + totalQueuedItems: this.totalQueuedItems, + batchQueueLength: this.batchQueue.length, + currentBatchLength: this.currentBatch.length, + currentConcurrency: this.limiter.concurrency, + activeConcurrent: this.limiter.activeCount, + pendingConcurrent: this.limiter.pendingCount, + currentBatchSize: this.currentBatchSize, + metrics: this.metrics, + }); + }, 30000); + } + + // Method to get current status + getStatus() { + return { + queuedItems: this.totalQueuedItems, + batchQueueLength: this.batchQueue.length, + currentBatchSize: this.currentBatch.length, + concurrency: this.limiter.concurrency, + activeFlushes: this.limiter.activeCount, + pendingFlushes: this.limiter.pendingCount, + metrics: { ...this.metrics }, + }; + } + + // Graceful shutdown + async shutdown(): Promise { + if (this.flushTimer) { + clearInterval(this.flushTimer); + } + + // Flush any remaining items + if (this.currentBatch.length > 0) { + this.createBatch(); + } + + // Wait for all pending flushes to complete + while (this.batchQueue.length > 0 || this.limiter.activeCount > 0) { + await new Promise((resolve) => setTimeout(resolve, 100)); } } -} +} \ No newline at end of file diff --git a/apps/webapp/app/v3/eventRepository.server.ts b/apps/webapp/app/v3/eventRepository.server.ts index cfca700209..787a6ce693 100644 --- a/apps/webapp/app/v3/eventRepository.server.ts +++ b/apps/webapp/app/v3/eventRepository.server.ts @@ -107,6 +107,10 @@ export type EventRepoConfig = { retentionInDays: number; partitioningEnabled: boolean; tracer?: Tracer; + minConcurrency?: number; + maxConcurrency?: number; + maxBatchSize?: number; + memoryPressureThreshold?: number; }; export type QueryOptions = Prisma.TaskEventWhereInput; @@ -199,6 +203,10 @@ export class EventRepository { return this._subscriberCount; } + get flushSchedulerStatus() { + return this._flushScheduler.getStatus(); + } + constructor( db: PrismaClient = prisma, readReplica: PrismaReplicaClient = $replica, @@ -208,6 +216,10 @@ export class EventRepository { batchSize: _config.batchSize, flushInterval: _config.batchInterval, callback: this.#flushBatch.bind(this), + minConcurrency: _config.minConcurrency, + maxConcurrency: _config.maxConcurrency, + maxBatchSize: _config.maxBatchSize, + memoryPressureThreshold: _config.memoryPressureThreshold, }); this._redisPublishClient = createRedisClient("trigger:eventRepoPublisher", this._config.redis); @@ -1324,6 +1336,10 @@ function initializeEventRepo() { batchInterval: env.EVENTS_BATCH_INTERVAL, retentionInDays: env.EVENTS_DEFAULT_LOG_RETENTION, partitioningEnabled: env.TASK_EVENT_PARTITIONING_ENABLED === "1", + minConcurrency: env.EVENTS_MIN_CONCURRENCY, + maxConcurrency: env.EVENTS_MAX_CONCURRENCY, + maxBatchSize: env.EVENTS_MAX_BATCH_SIZE, + memoryPressureThreshold: env.EVENTS_MEMORY_PRESSURE_THRESHOLD, redis: { port: env.PUBSUB_REDIS_PORT, host: env.PUBSUB_REDIS_HOST, @@ -1343,6 +1359,47 @@ function initializeEventRepo() { registers: [metricsRegister], }); + // Add metrics for flush scheduler + new Gauge({ + name: "event_flush_scheduler_queued_items", + help: "Total number of items queued in the flush scheduler", + collect() { + const status = repo.flushSchedulerStatus; + this.set(status.queuedItems); + }, + registers: [metricsRegister], + }); + + new Gauge({ + name: "event_flush_scheduler_batch_queue_length", + help: "Number of batches waiting to be flushed", + collect() { + const status = repo.flushSchedulerStatus; + this.set(status.batchQueueLength); + }, + registers: [metricsRegister], + }); + + new Gauge({ + name: "event_flush_scheduler_concurrency", + help: "Current concurrency level of the flush scheduler", + collect() { + const status = repo.flushSchedulerStatus; + this.set(status.concurrency); + }, + registers: [metricsRegister], + }); + + new Gauge({ + name: "event_flush_scheduler_active_flushes", + help: "Number of active flush operations", + collect() { + const status = repo.flushSchedulerStatus; + this.set(status.activeFlushes); + }, + registers: [metricsRegister], + }); + return repo; } From 7c09b6e2f799de452d7f4ff23bf738e1680de82a Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 26 Jul 2025 09:46:12 +0100 Subject: [PATCH 2/5] Implement load shedding for TaskEvent records This change introduces load shedding mechanisms to manage TaskEvent records, particularly those of kind LOG, when the system experiences high volumes and is unable to flush to the database in a timely manner. The addition aims to prevent overwhelming the system and ensure critical tasks are prioritized. - Added configuration options for `loadSheddingThreshold` and `loadSheddingEnabled` in multiple modules to activate load shedding. - Introduced `isDroppableEvent` function to allow specific events to be dropped when load shedding is enabled. - Ensured metrics are updated to reflect dropped events and load shedding status, providing visibility into system performance during high load conditions. - Updated loggers to inform about load shedding state changes, ensuring timely awareness of load management activities. --- apps/webapp/app/env.server.ts | 2 + .../app/v3/dynamicFlushScheduler.server.ts | 108 +++++++++++++++++- apps/webapp/app/v3/eventRepository.server.ts | 30 +++++ 3 files changed, 136 insertions(+), 4 deletions(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 0a56ab5fc4..1a879d3e44 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -256,6 +256,8 @@ const EnvironmentSchema = z.object({ EVENTS_MAX_CONCURRENCY: z.coerce.number().int().default(10), EVENTS_MAX_BATCH_SIZE: z.coerce.number().int().default(500), EVENTS_MEMORY_PRESSURE_THRESHOLD: z.coerce.number().int().default(2000), + EVENTS_LOAD_SHEDDING_THRESHOLD: z.coerce.number().int().default(5000), + EVENTS_LOAD_SHEDDING_ENABLED: z.coerce.boolean().default(true), SHARED_QUEUE_CONSUMER_POOL_SIZE: z.coerce.number().int().default(10), SHARED_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(100), SHARED_QUEUE_CONSUMER_NEXT_TICK_INTERVAL_MS: z.coerce.number().int().default(100), diff --git a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts index 80f4dafc12..fec926186b 100644 --- a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts +++ b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts @@ -1,6 +1,7 @@ import { nanoid } from "nanoid"; import pLimit from "p-limit"; import { logger } from "~/services/logger.server"; +import { TaskEventKind } from "@trigger.dev/database"; export type DynamicFlushSchedulerConfig = { batchSize: number; @@ -11,6 +12,9 @@ export type DynamicFlushSchedulerConfig = { maxConcurrency?: number; maxBatchSize?: number; memoryPressureThreshold?: number; // Number of items that triggers increased concurrency + loadSheddingThreshold?: number; // Number of items that triggers load shedding + loadSheddingEnabled?: boolean; + isDroppableEvent?: (item: T) => boolean; // Function to determine if an event can be dropped }; export class DynamicFlushScheduler { @@ -35,8 +39,16 @@ export class DynamicFlushScheduler { flushedBatches: 0, failedBatches: 0, totalItemsFlushed: 0, + droppedEvents: 0, + droppedEventsByKind: new Map(), }; + // New properties for load shedding + private readonly loadSheddingThreshold: number; + private readonly loadSheddingEnabled: boolean; + private readonly isDroppableEvent?: (item: T) => boolean; + private isLoadShedding: boolean = false; + constructor(config: DynamicFlushSchedulerConfig) { this.batchQueue = []; this.currentBatch = []; @@ -52,6 +64,11 @@ export class DynamicFlushScheduler { this.maxBatchSize = config.maxBatchSize ?? config.batchSize * 5; this.memoryPressureThreshold = config.memoryPressureThreshold ?? config.batchSize * 20; + // Initialize load shedding parameters + this.loadSheddingThreshold = config.loadSheddingThreshold ?? config.batchSize * 50; + this.loadSheddingEnabled = config.loadSheddingEnabled ?? true; + this.isDroppableEvent = config.isDroppableEvent; + // Start with minimum concurrency this.limiter = pLimit(this.minConcurrency); @@ -60,8 +77,45 @@ export class DynamicFlushScheduler { } addToBatch(items: T[]): void { - this.currentBatch.push(...items); - this.totalQueuedItems += items.length; + let itemsToAdd = items; + + // Apply load shedding if enabled and we're over the threshold + if (this.loadSheddingEnabled && this.totalQueuedItems >= this.loadSheddingThreshold) { + const { kept, dropped } = this.applyLoadShedding(items); + itemsToAdd = kept; + + if (dropped.length > 0) { + this.metrics.droppedEvents += dropped.length; + + // Track dropped events by kind if possible + dropped.forEach((item) => { + const kind = this.getEventKind(item); + if (kind) { + const currentCount = this.metrics.droppedEventsByKind.get(kind) || 0; + this.metrics.droppedEventsByKind.set(kind, currentCount + 1); + } + }); + + if (!this.isLoadShedding) { + this.isLoadShedding = true; + logger.warn("Load shedding activated", { + totalQueuedItems: this.totalQueuedItems, + threshold: this.loadSheddingThreshold, + droppedCount: dropped.length, + }); + } + } + } else if (this.isLoadShedding && this.totalQueuedItems < this.loadSheddingThreshold * 0.8) { + this.isLoadShedding = false; + logger.info("Load shedding deactivated", { + totalQueuedItems: this.totalQueuedItems, + threshold: this.loadSheddingThreshold, + totalDropped: this.metrics.droppedEvents, + }); + } + + this.currentBatch.push(...itemsToAdd); + this.totalQueuedItems += itemsToAdd.length; // Check if we need to create a batch if (this.currentBatch.length >= this.currentBatchSize) { @@ -217,6 +271,11 @@ export class DynamicFlushScheduler { private startMetricsReporter(): void { // Report metrics every 30 seconds setInterval(() => { + const droppedByKind: Record = {}; + this.metrics.droppedEventsByKind.forEach((count, kind) => { + droppedByKind[kind] = count; + }); + logger.info("DynamicFlushScheduler metrics", { totalQueuedItems: this.totalQueuedItems, batchQueueLength: this.batchQueue.length, @@ -225,13 +284,50 @@ export class DynamicFlushScheduler { activeConcurrent: this.limiter.activeCount, pendingConcurrent: this.limiter.pendingCount, currentBatchSize: this.currentBatchSize, - metrics: this.metrics, + isLoadShedding: this.isLoadShedding, + metrics: { + ...this.metrics, + droppedEventsByKind, + }, }); }, 30000); } + private applyLoadShedding(items: T[]): { kept: T[]; dropped: T[] } { + if (!this.isDroppableEvent) { + // If no function provided to determine droppable events, keep all + return { kept: items, dropped: [] }; + } + + const kept: T[] = []; + const dropped: T[] = []; + + for (const item of items) { + if (this.isDroppableEvent(item)) { + dropped.push(item); + } else { + kept.push(item); + } + } + + return { kept, dropped }; + } + + private getEventKind(item: T): string | undefined { + // Try to extract the kind from the event if it has one + if (item && typeof item === 'object' && 'kind' in item) { + return String(item.kind); + } + return undefined; + } + // Method to get current status getStatus() { + const droppedByKind: Record = {}; + this.metrics.droppedEventsByKind.forEach((count, kind) => { + droppedByKind[kind] = count; + }); + return { queuedItems: this.totalQueuedItems, batchQueueLength: this.batchQueue.length, @@ -239,7 +335,11 @@ export class DynamicFlushScheduler { concurrency: this.limiter.concurrency, activeFlushes: this.limiter.activeCount, pendingFlushes: this.limiter.pendingCount, - metrics: { ...this.metrics }, + isLoadShedding: this.isLoadShedding, + metrics: { + ...this.metrics, + droppedEventsByKind: droppedByKind, + }, }; } diff --git a/apps/webapp/app/v3/eventRepository.server.ts b/apps/webapp/app/v3/eventRepository.server.ts index 787a6ce693..567fdd9b7c 100644 --- a/apps/webapp/app/v3/eventRepository.server.ts +++ b/apps/webapp/app/v3/eventRepository.server.ts @@ -111,6 +111,8 @@ export type EventRepoConfig = { maxConcurrency?: number; maxBatchSize?: number; memoryPressureThreshold?: number; + loadSheddingThreshold?: number; + loadSheddingEnabled?: boolean; }; export type QueryOptions = Prisma.TaskEventWhereInput; @@ -220,6 +222,12 @@ export class EventRepository { maxConcurrency: _config.maxConcurrency, maxBatchSize: _config.maxBatchSize, memoryPressureThreshold: _config.memoryPressureThreshold, + loadSheddingThreshold: _config.loadSheddingThreshold, + loadSheddingEnabled: _config.loadSheddingEnabled, + isDroppableEvent: (event: CreatableEvent) => { + // Only drop LOG events during load shedding + return event.kind === TaskEventKind.LOG; + }, }); this._redisPublishClient = createRedisClient("trigger:eventRepoPublisher", this._config.redis); @@ -1340,6 +1348,8 @@ function initializeEventRepo() { maxConcurrency: env.EVENTS_MAX_CONCURRENCY, maxBatchSize: env.EVENTS_MAX_BATCH_SIZE, memoryPressureThreshold: env.EVENTS_MEMORY_PRESSURE_THRESHOLD, + loadSheddingThreshold: env.EVENTS_LOAD_SHEDDING_THRESHOLD, + loadSheddingEnabled: env.EVENTS_LOAD_SHEDDING_ENABLED, redis: { port: env.PUBSUB_REDIS_PORT, host: env.PUBSUB_REDIS_HOST, @@ -1400,6 +1410,26 @@ function initializeEventRepo() { registers: [metricsRegister], }); + new Gauge({ + name: "event_flush_scheduler_dropped_events", + help: "Total number of events dropped due to load shedding", + collect() { + const status = repo.flushSchedulerStatus; + this.set(status.metrics.droppedEvents); + }, + registers: [metricsRegister], + }); + + new Gauge({ + name: "event_flush_scheduler_is_load_shedding", + help: "Whether load shedding is currently active (1 = active, 0 = inactive)", + collect() { + const status = repo.flushSchedulerStatus; + this.set(status.isLoadShedding ? 1 : 0); + }, + registers: [metricsRegister], + }); + return repo; } From 6ee64ed1b3593719dbceb6c33d942428a28b5299 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 26 Jul 2025 09:51:27 +0100 Subject: [PATCH 3/5] Fix undefined 'queuePressure' variable in DynamicFlushScheduler The 'queuePressure' variable was being used without being defined in the DynamicFlushScheduler class, causing potential runtime errors. This commit adds the missing definition and ensures that the variable is correctly calculated based on the 'totalQueuedItems' and 'memoryPressureThreshold'. - Addressed code inconsistencies and improved formatting. - Defined 'queuePressure' in the 'adjustConcurrency' method to prevent potential undefined errors. - Enhanced readability by maintaining consistent spacing and format across the file, contributing to the stability and maintainability of the code. - Adjusted batch size logic based on the newly defined 'queuePressure' variable. --- .../app/v3/dynamicFlushScheduler.server.ts | 80 +++++++++---------- 1 file changed, 40 insertions(+), 40 deletions(-) diff --git a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts index fec926186b..8f229e098a 100644 --- a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts +++ b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts @@ -18,13 +18,13 @@ export type DynamicFlushSchedulerConfig = { }; export class DynamicFlushScheduler { - private batchQueue: T[][]; - private currentBatch: T[]; + private batchQueue: T[][]; + private currentBatch: T[]; private readonly BATCH_SIZE: number; private readonly FLUSH_INTERVAL: number; private flushTimer: NodeJS.Timeout | null; private readonly callback: (flushId: string, batch: T[]) => Promise; - + // New properties for dynamic scaling private readonly minConcurrency: number; private readonly maxConcurrency: number; @@ -57,36 +57,36 @@ export class DynamicFlushScheduler { this.FLUSH_INTERVAL = config.flushInterval; this.callback = config.callback; this.flushTimer = null; - + // Initialize dynamic scaling parameters this.minConcurrency = config.minConcurrency ?? 1; this.maxConcurrency = config.maxConcurrency ?? 10; this.maxBatchSize = config.maxBatchSize ?? config.batchSize * 5; this.memoryPressureThreshold = config.memoryPressureThreshold ?? config.batchSize * 20; - + // Initialize load shedding parameters this.loadSheddingThreshold = config.loadSheddingThreshold ?? config.batchSize * 50; this.loadSheddingEnabled = config.loadSheddingEnabled ?? true; this.isDroppableEvent = config.isDroppableEvent; - + // Start with minimum concurrency this.limiter = pLimit(this.minConcurrency); - + this.startFlushTimer(); this.startMetricsReporter(); } addToBatch(items: T[]): void { let itemsToAdd = items; - + // Apply load shedding if enabled and we're over the threshold if (this.loadSheddingEnabled && this.totalQueuedItems >= this.loadSheddingThreshold) { const { kept, dropped } = this.applyLoadShedding(items); itemsToAdd = kept; - + if (dropped.length > 0) { this.metrics.droppedEvents += dropped.length; - + // Track dropped events by kind if possible dropped.forEach((item) => { const kind = this.getEventKind(item); @@ -95,7 +95,7 @@ export class DynamicFlushScheduler { this.metrics.droppedEventsByKind.set(kind, currentCount + 1); } }); - + if (!this.isLoadShedding) { this.isLoadShedding = true; logger.warn("Load shedding activated", { @@ -113,7 +113,7 @@ export class DynamicFlushScheduler { totalDropped: this.metrics.droppedEvents, }); } - + this.currentBatch.push(...itemsToAdd); this.totalQueuedItems += itemsToAdd.length; @@ -121,14 +121,14 @@ export class DynamicFlushScheduler { if (this.currentBatch.length >= this.currentBatchSize) { this.createBatch(); } - + // Adjust concurrency based on queue pressure this.adjustConcurrency(); } private createBatch(): void { if (this.currentBatch.length === 0) return; - + this.batchQueue.push(this.currentBatch); this.currentBatch = []; this.flushBatches(); @@ -155,7 +155,7 @@ export class DynamicFlushScheduler { private async flushBatches(): Promise { const batchesToFlush: T[][] = []; - + // Dequeue all available batches up to current concurrency limit while (this.batchQueue.length > 0 && batchesToFlush.length < this.limiter.concurrency) { const batch = this.batchQueue.shift(); @@ -163,7 +163,7 @@ export class DynamicFlushScheduler { batchesToFlush.push(batch); } } - + if (batchesToFlush.length === 0) return; // Schedule all batches for concurrent processing @@ -171,18 +171,18 @@ export class DynamicFlushScheduler { this.limiter(async () => { const flushId = nanoid(); const itemCount = batch.length; - + try { const startTime = Date.now(); await this.callback(flushId, batch); - + const duration = Date.now() - startTime; this.totalQueuedItems -= itemCount; this.consecutiveFlushFailures = 0; this.lastFlushTime = Date.now(); this.metrics.flushedBatches++; this.metrics.totalItemsFlushed += itemCount; - + logger.debug("Batch flushed successfully", { flushId, itemCount, @@ -194,18 +194,18 @@ export class DynamicFlushScheduler { } catch (error) { this.consecutiveFlushFailures++; this.metrics.failedBatches++; - + logger.error("Error flushing batch", { flushId, itemCount, error, consecutiveFailures: this.consecutiveFlushFailures, }); - + // Re-queue the batch at the front if it fails this.batchQueue.unshift(batch); this.totalQueuedItems += itemCount; - + // Back off on failures if (this.consecutiveFlushFailures > 3) { this.adjustConcurrency(true); @@ -213,7 +213,7 @@ export class DynamicFlushScheduler { } }) ); - + // Don't await here - let them run concurrently Promise.allSettled(flushPromises).then(() => { // After flush completes, check if we need to flush more @@ -226,7 +226,7 @@ export class DynamicFlushScheduler { private adjustConcurrency(backOff: boolean = false): void { const currentConcurrency = this.limiter.concurrency; let newConcurrency = currentConcurrency; - + if (backOff) { // Reduce concurrency on failures newConcurrency = Math.max(this.minConcurrency, Math.floor(currentConcurrency * 0.75)); @@ -234,7 +234,7 @@ export class DynamicFlushScheduler { // Calculate pressure metrics const queuePressure = this.totalQueuedItems / this.memoryPressureThreshold; const timeSinceLastFlush = Date.now() - this.lastFlushTime; - + if (queuePressure > 0.8 || timeSinceLastFlush > this.FLUSH_INTERVAL * 2) { // High pressure - increase concurrency newConcurrency = Math.min(this.maxConcurrency, currentConcurrency + 2); @@ -243,7 +243,7 @@ export class DynamicFlushScheduler { newConcurrency = Math.max(this.minConcurrency, currentConcurrency - 1); } } - + // Adjust batch size based on pressure if (this.totalQueuedItems > this.memoryPressureThreshold) { this.currentBatchSize = Math.min( @@ -253,11 +253,11 @@ export class DynamicFlushScheduler { } else { this.currentBatchSize = this.BATCH_SIZE; } - + // Update concurrency if changed if (newConcurrency !== currentConcurrency) { this.limiter = pLimit(newConcurrency); - + logger.info("Adjusted flush concurrency", { previousConcurrency: currentConcurrency, newConcurrency, @@ -267,7 +267,7 @@ export class DynamicFlushScheduler { }); } } - + private startMetricsReporter(): void { // Report metrics every 30 seconds setInterval(() => { @@ -275,7 +275,7 @@ export class DynamicFlushScheduler { this.metrics.droppedEventsByKind.forEach((count, kind) => { droppedByKind[kind] = count; }); - + logger.info("DynamicFlushScheduler metrics", { totalQueuedItems: this.totalQueuedItems, batchQueueLength: this.batchQueue.length, @@ -287,7 +287,7 @@ export class DynamicFlushScheduler { isLoadShedding: this.isLoadShedding, metrics: { ...this.metrics, - droppedEventsByKind, + droppedByKind, }, }); }, 30000); @@ -298,10 +298,10 @@ export class DynamicFlushScheduler { // If no function provided to determine droppable events, keep all return { kept: items, dropped: [] }; } - + const kept: T[] = []; const dropped: T[] = []; - + for (const item of items) { if (this.isDroppableEvent(item)) { dropped.push(item); @@ -309,13 +309,13 @@ export class DynamicFlushScheduler { kept.push(item); } } - + return { kept, dropped }; } - + private getEventKind(item: T): string | undefined { // Try to extract the kind from the event if it has one - if (item && typeof item === 'object' && 'kind' in item) { + if (item && typeof item === "object" && "kind" in item) { return String(item.kind); } return undefined; @@ -327,7 +327,7 @@ export class DynamicFlushScheduler { this.metrics.droppedEventsByKind.forEach((count, kind) => { droppedByKind[kind] = count; }); - + return { queuedItems: this.totalQueuedItems, batchQueueLength: this.batchQueue.length, @@ -336,7 +336,7 @@ export class DynamicFlushScheduler { activeFlushes: this.limiter.activeCount, pendingFlushes: this.limiter.pendingCount, isLoadShedding: this.isLoadShedding, - metrics: { + metrics: { ...this.metrics, droppedEventsByKind: droppedByKind, }, @@ -348,12 +348,12 @@ export class DynamicFlushScheduler { if (this.flushTimer) { clearInterval(this.flushTimer); } - + // Flush any remaining items if (this.currentBatch.length > 0) { this.createBatch(); } - + // Wait for all pending flushes to complete while (this.batchQueue.length > 0 || this.limiter.activeCount > 0) { await new Promise((resolve) => setTimeout(resolve, 100)); From bad04db501d1622a0c3675ce0c0e6565b97b3aca Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 26 Jul 2025 09:52:33 +0100 Subject: [PATCH 4/5] Refactor concurrency adjustment logic in scheduler The concurrency adjustment logic in the dynamic flush scheduler has been refactored to improve clarity and maintainability. This change moves the calculation of pressure metrics outside of the conditional blocks to ensure they are always determined prior to decision-making. - The queue pressure and time since last flush calculations were moved up in the code to be independent of the 'backOff' condition. - This refactor sets up the groundwork for more reliable concurrency scaling and better performance monitoring capabilities. The overall logic of adjusting concurrency based on system pressure metrics remains unchanged. This adjustment addresses ongoing issues with the scheduler that were not resolved by previous changes. --- apps/webapp/app/v3/dynamicFlushScheduler.server.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts index 8f229e098a..e3571d5302 100644 --- a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts +++ b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts @@ -226,15 +226,15 @@ export class DynamicFlushScheduler { private adjustConcurrency(backOff: boolean = false): void { const currentConcurrency = this.limiter.concurrency; let newConcurrency = currentConcurrency; + + // Calculate pressure metrics - moved outside the if/else block + const queuePressure = this.totalQueuedItems / this.memoryPressureThreshold; + const timeSinceLastFlush = Date.now() - this.lastFlushTime; if (backOff) { // Reduce concurrency on failures newConcurrency = Math.max(this.minConcurrency, Math.floor(currentConcurrency * 0.75)); } else { - // Calculate pressure metrics - const queuePressure = this.totalQueuedItems / this.memoryPressureThreshold; - const timeSinceLastFlush = Date.now() - this.lastFlushTime; - if (queuePressure > 0.8 || timeSinceLastFlush > this.FLUSH_INTERVAL * 2) { // High pressure - increase concurrency newConcurrency = Math.min(this.maxConcurrency, currentConcurrency + 2); From b6d35cbbd57df1a51ebe204012b3332d36c0deb3 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 26 Jul 2025 10:23:03 +0100 Subject: [PATCH 5/5] Some tweaks --- apps/webapp/app/env.server.ts | 6 +- .../app/v3/dynamicFlushScheduler.server.ts | 37 +++++++---- apps/webapp/app/v3/eventRepository.server.ts | 2 +- references/hello-world/src/trigger/example.ts | 65 +++++++++++++++++++ 4 files changed, 93 insertions(+), 17 deletions(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 1a879d3e44..eba67df32d 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -255,9 +255,9 @@ const EnvironmentSchema = z.object({ EVENTS_MIN_CONCURRENCY: z.coerce.number().int().default(1), EVENTS_MAX_CONCURRENCY: z.coerce.number().int().default(10), EVENTS_MAX_BATCH_SIZE: z.coerce.number().int().default(500), - EVENTS_MEMORY_PRESSURE_THRESHOLD: z.coerce.number().int().default(2000), - EVENTS_LOAD_SHEDDING_THRESHOLD: z.coerce.number().int().default(5000), - EVENTS_LOAD_SHEDDING_ENABLED: z.coerce.boolean().default(true), + EVENTS_MEMORY_PRESSURE_THRESHOLD: z.coerce.number().int().default(5000), + EVENTS_LOAD_SHEDDING_THRESHOLD: z.coerce.number().int().default(100000), + EVENTS_LOAD_SHEDDING_ENABLED: z.string().default("1"), SHARED_QUEUE_CONSUMER_POOL_SIZE: z.coerce.number().int().default(10), SHARED_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(100), SHARED_QUEUE_CONSUMER_NEXT_TICK_INTERVAL_MS: z.coerce.number().int().default(100), diff --git a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts index e3571d5302..88e6a10248 100644 --- a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts +++ b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts @@ -1,7 +1,6 @@ +import { Logger } from "@trigger.dev/core/logger"; import { nanoid } from "nanoid"; import pLimit from "p-limit"; -import { logger } from "~/services/logger.server"; -import { TaskEventKind } from "@trigger.dev/database"; export type DynamicFlushSchedulerConfig = { batchSize: number; @@ -49,6 +48,8 @@ export class DynamicFlushScheduler { private readonly isDroppableEvent?: (item: T) => boolean; private isLoadShedding: boolean = false; + private readonly logger: Logger = new Logger("EventRepo.DynamicFlushScheduler", "debug"); + constructor(config: DynamicFlushSchedulerConfig) { this.batchQueue = []; this.currentBatch = []; @@ -98,16 +99,17 @@ export class DynamicFlushScheduler { if (!this.isLoadShedding) { this.isLoadShedding = true; - logger.warn("Load shedding activated", { - totalQueuedItems: this.totalQueuedItems, - threshold: this.loadSheddingThreshold, - droppedCount: dropped.length, - }); } + + this.logger.warn("Load shedding", { + totalQueuedItems: this.totalQueuedItems, + threshold: this.loadSheddingThreshold, + droppedCount: dropped.length, + }); } } else if (this.isLoadShedding && this.totalQueuedItems < this.loadSheddingThreshold * 0.8) { this.isLoadShedding = false; - logger.info("Load shedding deactivated", { + this.logger.info("Load shedding deactivated", { totalQueuedItems: this.totalQueuedItems, threshold: this.loadSheddingThreshold, totalDropped: this.metrics.droppedEvents, @@ -183,7 +185,7 @@ export class DynamicFlushScheduler { this.metrics.flushedBatches++; this.metrics.totalItemsFlushed += itemCount; - logger.debug("Batch flushed successfully", { + this.logger.debug("Batch flushed successfully", { flushId, itemCount, duration, @@ -195,7 +197,7 @@ export class DynamicFlushScheduler { this.consecutiveFlushFailures++; this.metrics.failedBatches++; - logger.error("Error flushing batch", { + this.logger.error("Error flushing batch", { flushId, itemCount, error, @@ -223,13 +225,21 @@ export class DynamicFlushScheduler { }); } + private lastConcurrencyAdjustment: number = Date.now(); + private adjustConcurrency(backOff: boolean = false): void { const currentConcurrency = this.limiter.concurrency; let newConcurrency = currentConcurrency; - + // Calculate pressure metrics - moved outside the if/else block const queuePressure = this.totalQueuedItems / this.memoryPressureThreshold; const timeSinceLastFlush = Date.now() - this.lastFlushTime; + const timeSinceLastAdjustment = Date.now() - this.lastConcurrencyAdjustment; + + // Don't adjust too frequently (except for backoff) + if (!backOff && timeSinceLastAdjustment < 1000) { + return; + } if (backOff) { // Reduce concurrency on failures @@ -258,12 +268,13 @@ export class DynamicFlushScheduler { if (newConcurrency !== currentConcurrency) { this.limiter = pLimit(newConcurrency); - logger.info("Adjusted flush concurrency", { + this.logger.info("Adjusted flush concurrency", { previousConcurrency: currentConcurrency, newConcurrency, queuePressure, totalQueuedItems: this.totalQueuedItems, currentBatchSize: this.currentBatchSize, + memoryPressureThreshold: this.memoryPressureThreshold, }); } } @@ -276,7 +287,7 @@ export class DynamicFlushScheduler { droppedByKind[kind] = count; }); - logger.info("DynamicFlushScheduler metrics", { + this.logger.info("DynamicFlushScheduler metrics", { totalQueuedItems: this.totalQueuedItems, batchQueueLength: this.batchQueue.length, currentBatchLength: this.currentBatch.length, diff --git a/apps/webapp/app/v3/eventRepository.server.ts b/apps/webapp/app/v3/eventRepository.server.ts index 567fdd9b7c..1fb88e969f 100644 --- a/apps/webapp/app/v3/eventRepository.server.ts +++ b/apps/webapp/app/v3/eventRepository.server.ts @@ -1349,7 +1349,7 @@ function initializeEventRepo() { maxBatchSize: env.EVENTS_MAX_BATCH_SIZE, memoryPressureThreshold: env.EVENTS_MEMORY_PRESSURE_THRESHOLD, loadSheddingThreshold: env.EVENTS_LOAD_SHEDDING_THRESHOLD, - loadSheddingEnabled: env.EVENTS_LOAD_SHEDDING_ENABLED, + loadSheddingEnabled: env.EVENTS_LOAD_SHEDDING_ENABLED === "1", redis: { port: env.PUBSUB_REDIS_PORT, host: env.PUBSUB_REDIS_HOST, diff --git a/references/hello-world/src/trigger/example.ts b/references/hello-world/src/trigger/example.ts index 6d1622df28..8759953d28 100644 --- a/references/hello-world/src/trigger/example.ts +++ b/references/hello-world/src/trigger/example.ts @@ -373,3 +373,68 @@ export const largeAttributesTask = task({ }); }, }); + +export const lotsOfLogsParentTask = task({ + id: "lots-of-logs-parent", + run: async (payload: { count: number }, { ctx }) => { + logger.info("Hello, world from the lots of logs parent task", { count: payload.count }); + await lotsOfLogsTask.batchTriggerAndWait( + Array.from({ length: 20 }, (_, i) => ({ + payload: { count: payload.count }, + })) + ); + }, +}); + +export const lotsOfLogsTask = task({ + id: "lots-of-logs", + run: async (payload: { count: number }, { ctx }) => { + logger.info("Hello, world from the lots of logs task", { count: payload.count }); + + for (let i = 0; i < payload.count; i++) { + logger.info("Hello, world from the lots of logs task", { count: i }); + } + + await setTimeout(1000); + + for (let i = 0; i < payload.count; i++) { + logger.info("Hello, world from the lots of logs task", { count: i }); + } + + await setTimeout(1000); + + for (let i = 0; i < payload.count; i++) { + logger.info("Hello, world from the lots of logs task", { count: i }); + } + + await setTimeout(1000); + + for (let i = 0; i < payload.count; i++) { + logger.info("Hello, world from the lots of logs task", { count: i }); + } + + await setTimeout(1000); + + for (let i = 0; i < payload.count; i++) { + logger.info("Hello, world from the lots of logs task", { count: i }); + } + + await setTimeout(1000); + + for (let i = 0; i < payload.count; i++) { + logger.info("Hello, world from the lots of logs task", { count: i }); + } + + await setTimeout(1000); + + for (let i = 0; i < payload.count; i++) { + logger.info("Hello, world from the lots of logs task", { count: i }); + } + + await setTimeout(1000); + + for (let i = 0; i < payload.count; i++) { + logger.info("Hello, world from the lots of logs task", { count: i }); + } + }, +});