fix: otel logs better DynamicFlushScheduler #2318
Merged

merged 5 commits into main from scheduler-enhancement-concurrent-flush on Jul 26, 2025

Conversation

ericallam
Member

Description

This pull request enhances the dynamic flush scheduler with the following key changes:

  • Introduces configurable minimum and maximum concurrency levels, maximum batch size, and memory pressure thresholds for adaptive flush operations.
  • Implements concurrent flush handling using pLimit, dynamically adjusting concurrency based on queue length and memory pressure.
  • Adds load shedding for TaskEvent records (especially logs) to prevent system overload during high volumes, with configuration options to enable and control shedding.
  • Fixes an undefined queuePressure variable in the scheduler to prevent runtime errors and improve batch size logic.
  • Refactors concurrency adjustment logic for better clarity, maintainability, and reliable scaling.
  • Improves metrics reporting and logging to monitor scheduler behavior, load shedding, and system performance.

The changes make the dynamic flush scheduler more flexible and adaptive, addressing production issues where data was not flushed fast enough, causing memory growth and crashes. The root cause was that the existing scheduler handled only a single flush at a time, which capped throughput and could not keep up with the influx of logs.

- Added configuration options for setting minimum and maximum concurrency levels, maximum batch size, and memory pressure threshold. These parameters ensure that flush operations adjust dynamically based on workload and pressure.
- Implemented `pLimit` to facilitate concurrent flush operations, with adjustments made according to batch queue length and memory pressure (a sketch follows this list).
- Metrics reporting improvements were added to monitor the dynamic behavior of the flush scheduler, aiding in identifying performance issues and optimizing the operation accordingly.
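
To make the concurrency mechanism concrete, here is a minimal TypeScript sketch of flushing several batches through `p-limit`. The function name, the `FlushCallback` type, and the error handling are illustrative assumptions, not the actual code in `dynamicFlushScheduler.server.ts`:

```ts
import pLimit from "p-limit";
import { randomUUID } from "node:crypto";

// Hypothetical shape: the real scheduler holds queued batches and a flush callback.
type FlushCallback<T> = (flushId: string, batch: T[]) => Promise<void>;

async function flushConcurrently<T>(
  batches: T[][],
  concurrency: number,
  callback: FlushCallback<T>
): Promise<void> {
  const limit = pLimit(concurrency);

  // Each batch goes through the limiter, so at most `concurrency` callbacks
  // run at once; allSettled lets each failure be inspected independently.
  const results = await Promise.allSettled(
    batches.map((batch) => limit(() => callback(randomUUID(), batch)))
  );

  for (const result of results) {
    if (result.status === "rejected") {
      // The real scheduler re-queues failed batches and backs off;
      // here we only surface the error.
      console.error("flush failed", result.reason);
    }
  }
}
```
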
This change introduces load shedding for TaskEvent records, particularly those of kind LOG, when the system experiences high volumes and cannot flush to the database in a timely manner. The goal is to prevent the system from being overwhelmed and to ensure critical events are prioritized.

- Added configuration options for `loadSheddingThreshold` and `loadSheddingEnabled` in multiple modules to activate load shedding.
- Introduced an `isDroppableEvent` callback so that specific events can be dropped when load shedding is active (see the sketch after this list).
- Updated metrics to reflect dropped events and the load shedding status, providing visibility into system behavior under high load.
- Updated loggers to report load shedding state changes, ensuring timely awareness of load management activity.
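
The following is a rough sketch of how such load shedding can partition a batch into kept and dropped items. The `LoadSheddingOptions` shape and the `applyLoadShedding` helper are illustrative; only the `isDroppableEvent` callback and the LOG-only policy come from the description and review:

```ts
type LoadSheddingOptions<T> = {
  loadSheddingEnabled: boolean;
  loadSheddingThreshold: number;
  isDroppableEvent?: (event: T) => boolean;
};

function applyLoadShedding<T>(
  items: T[],
  totalQueuedItems: number,
  opts: LoadSheddingOptions<T>
): { kept: T[]; dropped: T[] } {
  const isDroppable = opts.isDroppableEvent;

  // Only shed when it is enabled, a predicate exists, and the queue is past the threshold.
  if (!opts.loadSheddingEnabled || !isDroppable || totalQueuedItems < opts.loadSheddingThreshold) {
    return { kept: items, dropped: [] };
  }

  const kept: T[] = [];
  const dropped: T[] = [];
  for (const item of items) {
    (isDroppable(item) ? dropped : kept).push(item);
  }
  return { kept, dropped };
}

// Per the review below, only LOG events are treated as droppable (illustrative event shape):
const isDroppableEvent = (event: { kind: string }) => event.kind === "LOG";
```
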
The 'queuePressure' variable was being used without being defined
in the DynamicFlushScheduler class, causing potential runtime
errors. This commit adds the missing definition and ensures that
the variable is correctly calculated based on the 'totalQueuedItems'
and 'memoryPressureThreshold'.

- Addressed code inconsistencies and improved formatting.
- Defined 'queuePressure' in the 'adjustConcurrency' method
  to prevent potential undefined errors.
- Enhanced readability by maintaining consistent spacing and
  format across the file, contributing to the stability and
  maintainability of the code.
- Adjusted the batch size logic based on the newly defined 'queuePressure' variable (a brief sketch follows).
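
As a rough illustration of the fix, the sketch below derives `queuePressure` from `totalQueuedItems` and `memoryPressureThreshold`, then uses it to pick a target concurrency and batch size. The exact formulas and clamping are assumptions; only the inputs and the general idea come from the commit description:

```ts
function adjustConcurrency(params: {
  totalQueuedItems: number;
  memoryPressureThreshold: number;
  minConcurrency: number;
  maxConcurrency: number;
  batchSize: number;
  maxBatchSize: number;
}) {
  // Pressure is the queue depth relative to the memory pressure threshold.
  const queuePressure = params.totalQueuedItems / params.memoryPressureThreshold;

  // More pressure means more concurrent flushes, clamped to the configured bounds.
  const targetConcurrency = Math.min(
    params.maxConcurrency,
    Math.max(params.minConcurrency, Math.ceil(queuePressure * params.maxConcurrency))
  );

  // Under pressure, grow the batch size (up to maxBatchSize) so each flush drains more.
  const targetBatchSize =
    queuePressure > 1
      ? Math.min(params.maxBatchSize, params.batchSize * Math.ceil(queuePressure))
      : params.batchSize;

  return { queuePressure, targetConcurrency, targetBatchSize };
}
```
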
The concurrency adjustment logic in the dynamic flush scheduler has been refactored for clarity and maintainability. The calculation of pressure metrics now happens outside the conditional blocks, so the metrics are always computed before any scaling decision is made.

- The queue pressure and time since last flush calculations were moved up in the code to be independent of the 'backOff' condition.
- This refactor sets up the groundwork for more reliable concurrency scaling and better performance monitoring capabilities. The overall logic of adjusting concurrency based on system pressure metrics remains unchanged.

This adjustment addresses ongoing issues with the scheduler that were not resolved by previous changes.

changeset-bot bot commented Jul 26, 2025

⚠️ No Changeset found

Latest commit: b6d35cb

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.


Contributor

coderabbitai bot commented Jul 26, 2025

Walkthrough

This change introduces new environment variable schema entries related to event processing concurrency, batch size, memory pressure, and load shedding controls. The DynamicFlushScheduler class is extensively enhanced to support dynamic concurrency scaling, load shedding, and detailed metrics reporting. The scheduler now manages concurrency and batch size adaptively, applies load shedding to droppable events, and exposes internal metrics. The EventRepository class and its configuration are updated to support these new parameters and expose scheduler status. Additional Prometheus metrics are registered to monitor scheduler internals, including concurrency, queue size, and load shedding activity.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Complexity: Numerous high-complexity, interrelated changes across several files, introducing new configuration options, adaptive concurrency logic, load shedding, and observability enhancements. Review requires careful attention to concurrency management, scheduler logic, and metric integration.


Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (3)
apps/webapp/app/v3/eventRepository.server.ts (1)

221-231: Consider making droppable event types configurable.

The implementation correctly passes configuration to the scheduler. However, the isDroppableEvent callback is hardcoded to only drop LOG events. Consider making this configurable through environment variables or configuration to allow flexibility in load shedding strategies.
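
For example, the suggestion could look something like the sketch below, assuming a hypothetical `EVENTS_DROPPABLE_KINDS` environment variable (not part of this PR):

```ts
// Hypothetical: EVENTS_DROPPABLE_KINDS does not exist in this PR; this only
// illustrates how the droppable kinds could be made configurable.
const droppableKinds = new Set(
  (env.EVENTS_DROPPABLE_KINDS ?? "LOG").split(",").map((kind) => kind.trim())
);

const isDroppableEvent = (event: { kind: string }) => droppableKinds.has(event.kind);
```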

apps/webapp/app/v3/dynamicFlushScheduler.server.ts (2)

62-75: Add validation for concurrency bounds.

While the initialization logic is sound, consider adding validation to ensure minConcurrency <= maxConcurrency to prevent configuration errors.

 // Initialize dynamic scaling parameters
 this.minConcurrency = config.minConcurrency ?? 1;
 this.maxConcurrency = config.maxConcurrency ?? 10;
+if (this.minConcurrency > this.maxConcurrency) {
+  throw new Error(`minConcurrency (${this.minConcurrency}) cannot be greater than maxConcurrency (${this.maxConcurrency})`);
+}
 this.maxBatchSize = config.maxBatchSize ?? config.batchSize * 5;

283-305: Consider making metrics reporting interval configurable.

The 30-second interval for metrics reporting is hardcoded. For high-volume systems, this could be too frequent, while for debugging it might not be frequent enough.

Consider adding a configuration option for the metrics reporting interval:

+  metricsReportingInterval?: number; // milliseconds
 
 // In constructor:
+const metricsInterval = config.metricsReportingInterval ?? 30000;
 
 // In startMetricsReporter:
-  }, 30000);
+  }, metricsInterval);
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0d136a3 and b6d35cb.

⛔ Files ignored due to path filters (1)
  • references/hello-world/src/trigger/example.ts is excluded by !references/**
📒 Files selected for processing (3)
  • apps/webapp/app/env.server.ts (1 hunks)
  • apps/webapp/app/v3/dynamicFlushScheduler.server.ts (2 hunks)
  • apps/webapp/app/v3/eventRepository.server.ts (5 hunks)
🔇 Additional comments (11)
apps/webapp/app/env.server.ts (1)

255-260: LGTM! Environment variables follow existing patterns.

The new environment variables for event processing configuration are well-structured and consistent with the existing schema (a hypothetical sketch follows the list):

  • Proper use of coerce.number().int() for numeric values
  • Sensible defaults that allow gradual scaling
  • Consistent naming convention with EVENTS_ prefix
  • Boolean flag using string pattern matches other flags in the file
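
A hypothetical sketch of that pattern; the names and defaults here are illustrative, not the actual values added in env.server.ts:

```ts
import { z } from "zod";

// Illustrative schema entries following the zod coercion pattern described above.
const EventSchedulerEnv = z.object({
  EVENTS_MIN_CONCURRENCY: z.coerce.number().int().default(1),
  EVENTS_MAX_CONCURRENCY: z.coerce.number().int().default(10),
  EVENTS_MAX_BATCH_SIZE: z.coerce.number().int().default(500),
  EVENTS_MEMORY_PRESSURE_THRESHOLD: z.coerce.number().int().default(5000),
  EVENTS_LOAD_SHEDDING_THRESHOLD: z.coerce.number().int().default(10000),
  EVENTS_LOAD_SHEDDING_ENABLED: z.string().default("1"),
});
```
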
apps/webapp/app/v3/eventRepository.server.ts (4)

110-116: New configuration parameters properly integrated.

The optional parameters for concurrency control and load shedding are well-typed and follow TypeScript best practices.


208-211: Good addition of scheduler status getter.

The getter provides clean access to internal scheduler metrics for monitoring purposes.


1347-1352: Environment variables correctly integrated.

The new environment variables are properly read and passed to the EventRepository constructor.


1372-1432: Well-structured Prometheus metrics for scheduler monitoring.

The metrics follow the existing pattern and provide comprehensive visibility into the flush scheduler's behavior. The use of the collect() function to query current status is the correct approach for Gauge metrics.
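
As an illustration of that pattern, a Gauge with a `collect()` callback might look like the sketch below. The metric name, import paths, and the `flushSchedulerStatus` accessor are assumptions (the repo re-exports Gauge from its tracing package), not the exact code in this PR:

```ts
import { Gauge } from "prom-client";
import { metricsRegister } from "~/metrics.server";

// Illustrative stand-in for the repository's scheduler status getter.
declare const eventRepository: {
  flushSchedulerStatus: { currentConcurrency: number };
};

export const flushSchedulerConcurrencyGauge = new Gauge({
  name: "flush_scheduler_current_concurrency",
  help: "Current number of concurrent flush operations allowed",
  registers: [metricsRegister],
  collect() {
    // collect() runs on every scrape, so the gauge reports the live value
    // instead of whatever was last set.
    this.set(eventRepository.flushSchedulerStatus.currentConcurrency);
  },
});
```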

apps/webapp/app/v3/dynamicFlushScheduler.server.ts (6)

1-3: LGTM! Good choice of concurrency library.

The addition of p-limit for managing concurrent operations is appropriate for this use case.


9-17: Well-structured configuration options.

The new configuration parameters are properly typed and documented. The optional parameters with sensible defaults provide flexibility while maintaining backward compatibility.


84-118: Excellent load shedding implementation.

The load shedding logic includes:

  • Proper threshold checking
  • Event tracking by kind for observability
  • Hysteresis to prevent oscillation (deactivates at 80% of the threshold; a small sketch follows this list)
  • Clear logging of shedding events
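
A minimal sketch of that hysteresis, assuming a simple boolean shedding state and the 80% recovery factor mentioned above:

```ts
// Shedding turns on at the threshold and only turns off once the queue drains
// below 80% of it, which prevents rapid on/off toggling around the threshold.
function updateLoadSheddingState(
  isShedding: boolean,
  totalQueuedItems: number,
  loadSheddingThreshold: number
): boolean {
  if (!isShedding && totalQueuedItems >= loadSheddingThreshold) {
    return true; // start shedding
  }
  if (isShedding && totalQueuedItems < loadSheddingThreshold * 0.8) {
    return false; // stop shedding only after meaningful recovery
  }
  return isShedding;
}
```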

158-226: Well-designed concurrent batch flushing.

The implementation correctly:

  • Dequeues batches up to the concurrency limit
  • Handles errors by re-queuing failed batches
  • Tracks consecutive failures for backoff
  • Uses Promise.allSettled for proper concurrent execution
  • Recursively checks for more batches after completion

307-334: Clean implementation of load shedding and event kind extraction.

The helper methods are well-implemented:

  • Load shedding respects the droppable event predicate
  • Safe type checking for event kind extraction
  • Proper separation of kept and dropped items

358-373: Robust graceful shutdown implementation.

The shutdown method properly:

  • Clears the flush timer
  • Flushes remaining items in the current batch
  • Waits for all pending operations to complete
  • Uses a polling approach to ensure completion (see the sketch below)
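
A rough sketch of that shutdown sequence, with illustrative field names (`flushTimer`, `currentBatch`, `pendingFlushCount`) rather than the actual class internals:

```ts
type SchedulerLike = {
  flushTimer: NodeJS.Timeout | null;
  currentBatch: unknown[];
  pendingFlushCount: () => number;
  flushNextBatches: () => void;
};

async function shutdown(scheduler: SchedulerLike): Promise<void> {
  // Stop scheduling new timed flushes.
  if (scheduler.flushTimer) {
    clearInterval(scheduler.flushTimer);
    scheduler.flushTimer = null;
  }

  // Hand any remaining items in the current batch to the flush pipeline.
  if (scheduler.currentBatch.length > 0) {
    scheduler.flushNextBatches();
  }

  // Poll until all in-flight flushes have completed.
  while (scheduler.pendingFlushCount() > 0) {
    await new Promise((resolve) => setTimeout(resolve, 100));
  }
}
```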

matt-aitken merged commit a7ff6de into main on Jul 26, 2025
33 checks passed
matt-aitken deleted the scheduler-enhancement-concurrent-flush branch on July 26, 2025 at 10:06