diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts
index 65a7059e23..b72857e04f 100644
--- a/apps/webapp/app/env.server.ts
+++ b/apps/webapp/app/env.server.ts
@@ -763,6 +763,45 @@ const EnvironmentSchema = z.object({
     .default(process.env.REDIS_TLS_DISABLED ?? "false"),
   BATCH_TRIGGER_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

+  ADMIN_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
+  ADMIN_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
+  ADMIN_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
+  ADMIN_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
+  ADMIN_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
+  ADMIN_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(20),
+  ADMIN_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
+  ADMIN_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
+
+  ADMIN_WORKER_REDIS_HOST: z
+    .string()
+    .optional()
+    .transform((v) => v ?? process.env.REDIS_HOST),
+  ADMIN_WORKER_REDIS_READER_HOST: z
+    .string()
+    .optional()
+    .transform((v) => v ?? process.env.REDIS_READER_HOST),
+  ADMIN_WORKER_REDIS_READER_PORT: z.coerce
+    .number()
+    .optional()
+    .transform(
+      (v) =>
+        v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
+    ),
+  ADMIN_WORKER_REDIS_PORT: z.coerce
+    .number()
+    .optional()
+    .transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
+  ADMIN_WORKER_REDIS_USERNAME: z
+    .string()
+    .optional()
+    .transform((v) => v ?? process.env.REDIS_USERNAME),
+  ADMIN_WORKER_REDIS_PASSWORD: z
+    .string()
+    .optional()
+    .transform((v) => v ?? process.env.REDIS_PASSWORD),
+  ADMIN_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
+  ADMIN_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
+
   ALERTS_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
   ALERTS_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
   ALERTS_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
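For context, a sketch of how the fallback behaviour above is expected to resolve (not part of the patch; the values are illustrative and `EnvironmentSchema` is the schema being extended in this hunk):

```ts
// Only the shared REDIS_* settings are present; the ADMIN_WORKER_* ones are left unset.
process.env.REDIS_HOST = "redis.internal";
process.env.REDIS_PORT = "6379";

const adminRedis = EnvironmentSchema.pick({
  ADMIN_WORKER_REDIS_HOST: true,
  ADMIN_WORKER_REDIS_PORT: true,
  ADMIN_WORKER_ENABLED: true,
}).parse(process.env);

adminRedis.ADMIN_WORKER_REDIS_HOST; // "redis.internal" — via the .optional().transform() fallback
adminRedis.ADMIN_WORKER_REDIS_PORT; // 6379 — parsed out of REDIS_PORT at parse time
adminRedis.ADMIN_WORKER_ENABLED;    // "true", unless WORKER_ENABLED was set when the schema was defined
```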
"true"), ALERTS_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2), ALERTS_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10), diff --git a/apps/webapp/app/routes/admin.api.v1.feature-flags.ts b/apps/webapp/app/routes/admin.api.v1.feature-flags.ts index cd7958c5b8..d0e1dd6b27 100644 --- a/apps/webapp/app/routes/admin.api.v1.feature-flags.ts +++ b/apps/webapp/app/routes/admin.api.v1.feature-flags.ts @@ -1,17 +1,7 @@ import { ActionFunctionArgs, json } from "@remix-run/server-runtime"; import { prisma } from "~/db.server"; import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server"; -import { getRunsReplicationGlobal } from "~/services/runsReplicationGlobal.server"; -import { runsReplicationInstance } from "~/services/runsReplicationInstance.server"; -import { - makeSetFlags, - setFlags, - FeatureFlagCatalogSchema, - validateAllFeatureFlags, - validatePartialFeatureFlags, - makeSetMultipleFlags, -} from "~/v3/featureFlags.server"; -import { z } from "zod"; +import { makeSetMultipleFlags, validatePartialFeatureFlags } from "~/v3/featureFlags.server"; export async function action({ request }: ActionFunctionArgs) { // Next authenticate the request diff --git a/apps/webapp/app/routes/admin.api.v1.runs-replication.$batchId.backfill.ts b/apps/webapp/app/routes/admin.api.v1.runs-replication.$batchId.backfill.ts new file mode 100644 index 0000000000..105fcaa408 --- /dev/null +++ b/apps/webapp/app/routes/admin.api.v1.runs-replication.$batchId.backfill.ts @@ -0,0 +1,68 @@ +import { type ActionFunctionArgs, json } from "@remix-run/server-runtime"; +import { z } from "zod"; +import { prisma } from "~/db.server"; +import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server"; +import { adminWorker } from "~/v3/services/adminWorker.server"; + +const Body = z.object({ + from: z.coerce.date(), + to: z.coerce.date(), + batchSize: z.number().optional(), + delayIntervalMs: z.number().optional(), +}); + +const Params = z.object({ + batchId: z.string(), +}); + +const DEFAULT_BATCH_SIZE = 500; +const DEFAULT_DELAY_INTERVAL_MS = 1000; + +export async function action({ request, params }: ActionFunctionArgs) { + // Next authenticate the request + const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request); + + if (!authenticationResult) { + return json({ error: "Invalid or Missing API key" }, { status: 401 }); + } + + const user = await prisma.user.findUnique({ + where: { + id: authenticationResult.userId, + }, + }); + + if (!user) { + return json({ error: "Invalid or Missing API key" }, { status: 401 }); + } + + if (!user.admin) { + return json({ error: "You must be an admin to perform this action" }, { status: 403 }); + } + + const { batchId } = Params.parse(params); + + try { + const body = await request.json(); + + const { from, to, batchSize, delayIntervalMs } = Body.parse(body); + + await adminWorker.enqueue({ + job: "admin.backfillRunsToReplication", + payload: { + from, + to, + batchSize: batchSize ?? DEFAULT_BATCH_SIZE, + delayIntervalMs: delayIntervalMs ?? DEFAULT_DELAY_INTERVAL_MS, + }, + id: batchId, + }); + + return json({ + success: true, + id: batchId, + }); + } catch (error) { + return json({ error: error instanceof Error ? 
diff --git a/apps/webapp/app/routes/admin.api.v1.runs-replication.$batchId.cancel.ts b/apps/webapp/app/routes/admin.api.v1.runs-replication.$batchId.cancel.ts
new file mode 100644
index 0000000000..8dfcf9fb85
--- /dev/null
+++ b/apps/webapp/app/routes/admin.api.v1.runs-replication.$batchId.cancel.ts
@@ -0,0 +1,45 @@
+import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
+import { z } from "zod";
+import { prisma } from "~/db.server";
+import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
+import { adminWorker } from "~/v3/services/adminWorker.server";
+
+const Params = z.object({
+  batchId: z.string(),
+});
+
+export async function action({ request, params }: ActionFunctionArgs) {
+  // Next authenticate the request
+  const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);
+
+  if (!authenticationResult) {
+    return json({ error: "Invalid or Missing API key" }, { status: 401 });
+  }
+
+  const user = await prisma.user.findUnique({
+    where: {
+      id: authenticationResult.userId,
+    },
+  });
+
+  if (!user) {
+    return json({ error: "Invalid or Missing API key" }, { status: 401 });
+  }
+
+  if (!user.admin) {
+    return json({ error: "You must be an admin to perform this action" }, { status: 403 });
+  }
+
+  const { batchId } = Params.parse(params);
+
+  try {
+    await adminWorker.cancel(batchId);
+
+    return json({
+      success: true,
+      id: batchId,
+    });
+  } catch (error) {
+    return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
+  }
+}
diff --git a/apps/webapp/app/services/runsBackfiller.server.ts b/apps/webapp/app/services/runsBackfiller.server.ts
new file mode 100644
index 0000000000..a566b44bb3
--- /dev/null
+++ b/apps/webapp/app/services/runsBackfiller.server.ts
@@ -0,0 +1,92 @@
+import { Tracer } from "@opentelemetry/api";
+import type { PrismaClientOrTransaction } from "@trigger.dev/database";
+import { RunsReplicationService } from "~/services/runsReplicationService.server";
+import { startSpan } from "~/v3/tracing.server";
+import { FINAL_RUN_STATUSES } from "../v3/taskStatus";
+import { Logger } from "@trigger.dev/core/logger";
+
+export class RunsBackfillerService {
+  private readonly prisma: PrismaClientOrTransaction;
+  private readonly runsReplicationInstance: RunsReplicationService;
+  private readonly tracer: Tracer;
+  private readonly logger: Logger;
+
+  constructor(opts: {
+    prisma: PrismaClientOrTransaction;
+    runsReplicationInstance: RunsReplicationService;
+    tracer: Tracer;
+    logLevel?: "log" | "error" | "warn" | "info" | "debug";
+  }) {
+    this.prisma = opts.prisma;
+    this.runsReplicationInstance = opts.runsReplicationInstance;
+    this.tracer = opts.tracer;
+    this.logger = new Logger("RunsBackfillerService", opts.logLevel ?? "debug");
+  }
+
+  public async call({
+    from,
+    to,
+    cursor,
+    batchSize,
+  }: {
+    from: Date;
+    to: Date;
+    cursor?: string;
+    batchSize?: number;
+  }): Promise<string | undefined> {
+    return await startSpan(this.tracer, "RunsBackfillerService.call()", async (span) => {
+      span.setAttribute("from", from.toISOString());
+      span.setAttribute("to", to.toISOString());
+      span.setAttribute("cursor", cursor ?? "");
+      span.setAttribute("batchSize", batchSize ?? 0);
+
+      const runs = await this.prisma.taskRun.findMany({
+        where: {
+          createdAt: {
+            gte: from,
+            lte: to,
+          },
+          status: {
+            in: FINAL_RUN_STATUSES,
+          },
+          ...(cursor ? { id: { gt: cursor } } : {}),
+        },
+        orderBy: {
+          id: "asc",
+        },
+        take: batchSize,
+      });
+
+      if (runs.length === 0) {
+        this.logger.info("No runs to backfill", { from, to, cursor });
+
+        return;
+      }
+
+      this.logger.info("Backfilling runs", {
+        from,
+        to,
+        cursor,
+        batchSize,
+        runCount: runs.length,
+        firstCreatedAt: runs[0].createdAt,
+        lastCreatedAt: runs[runs.length - 1].createdAt,
+      });
+
+      await this.runsReplicationInstance.backfill(runs);
+
+      const lastRun = runs[runs.length - 1];
+
+      this.logger.info("Backfilled runs", {
+        from,
+        to,
+        cursor,
+        batchSize,
+        lastRunId: lastRun.id,
+      });
+
+      // Return the last run ID to continue from
+      return lastRun.id;
+    });
+  }
+}
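For context, a minimal sketch of driving `RunsBackfillerService` to completion (not part of the patch). Each call returns the id of the last run it replicated, or `undefined` once the window is exhausted, and that id is fed back in as the cursor — the admin worker job below does the same thing, but re-enqueues itself between batches instead of looping:

```ts
// Sketch only — `service`, `from` and `to` are assumed to be set up as in the class above.
let cursor: string | undefined = undefined;

do {
  cursor = await service.call({ from, to, cursor, batchSize: 500 });
} while (cursor !== undefined);
```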
diff --git a/apps/webapp/app/v3/services/adminWorker.server.ts b/apps/webapp/app/v3/services/adminWorker.server.ts
new file mode 100644
index 0000000000..97c94b954f
--- /dev/null
+++ b/apps/webapp/app/v3/services/adminWorker.server.ts
@@ -0,0 +1,102 @@
+import { Logger } from "@trigger.dev/core/logger";
+import { Worker as RedisWorker } from "@trigger.dev/redis-worker";
+import { z } from "zod";
+import { env } from "~/env.server";
+import { logger } from "~/services/logger.server";
+import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";
+import { singleton } from "~/utils/singleton";
+import { tracer } from "../tracer.server";
+import { $replica } from "~/db.server";
+import { RunsBackfillerService } from "../../services/runsBackfiller.server";
+
+function initializeWorker() {
+  const redisOptions = {
+    keyPrefix: "admin:worker:",
+    host: env.ADMIN_WORKER_REDIS_HOST,
+    port: env.ADMIN_WORKER_REDIS_PORT,
+    username: env.ADMIN_WORKER_REDIS_USERNAME,
+    password: env.ADMIN_WORKER_REDIS_PASSWORD,
+    enableAutoPipelining: true,
+    ...(env.ADMIN_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
+  };
+
+  logger.debug(`👨‍🏭 Initializing admin worker at host ${env.ADMIN_WORKER_REDIS_HOST}`);
+
+  const worker = new RedisWorker({
+    name: "admin-worker",
+    redisOptions,
+    catalog: {
+      "admin.backfillRunsToReplication": {
+        schema: z.object({
+          from: z.coerce.date(),
+          to: z.coerce.date(),
+          cursor: z.string().optional(),
+          batchSize: z.coerce.number().int().default(500),
+          delayIntervalMs: z.coerce.number().int().default(1000),
+        }),
+        visibilityTimeoutMs: 60_000 * 15, // 15 minutes
+        retry: {
+          maxAttempts: 5,
+        },
+      },
+    },
+    concurrency: {
+      workers: env.ADMIN_WORKER_CONCURRENCY_WORKERS,
+      tasksPerWorker: env.ADMIN_WORKER_CONCURRENCY_TASKS_PER_WORKER,
+      limit: env.ADMIN_WORKER_CONCURRENCY_LIMIT,
+    },
+    pollIntervalMs: env.ADMIN_WORKER_POLL_INTERVAL,
+    immediatePollIntervalMs: env.ADMIN_WORKER_IMMEDIATE_POLL_INTERVAL,
+    shutdownTimeoutMs: env.ADMIN_WORKER_SHUTDOWN_TIMEOUT_MS,
+    logger: new Logger("AdminWorker", env.ADMIN_WORKER_LOG_LEVEL),
+    jobs: {
+      "admin.backfillRunsToReplication": async ({ payload, id }) => {
+        if (!runsReplicationInstance) {
+          logger.error("Runs replication instance not found");
+          return;
+        }
+
+        const service = new RunsBackfillerService({
+          prisma: $replica,
+          runsReplicationInstance: runsReplicationInstance,
+          tracer: tracer,
+        });
+
+        const cursor = await service.call({
+          from: payload.from,
+          to: payload.to,
+          cursor: payload.cursor,
+          batchSize: payload.batchSize,
+        });
+
+        if (cursor) {
+          await worker.enqueue({
+            job: "admin.backfillRunsToReplication",
+            payload: {
+              from: payload.from,
+              to: payload.to,
+              cursor,
+              batchSize: payload.batchSize,
+              delayIntervalMs: payload.delayIntervalMs,
+            },
+            id,
+            availableAt: new Date(Date.now() + payload.delayIntervalMs),
+            cancellationKey: id,
+          });
+        }
+      },
+    },
+  });
+
+  if (env.ADMIN_WORKER_ENABLED === "true") {
+    logger.debug(
+      `👨‍🏭 Starting admin worker at host ${env.ADMIN_WORKER_REDIS_HOST}, pollInterval = ${env.ADMIN_WORKER_POLL_INTERVAL}, immediatePollInterval = ${env.ADMIN_WORKER_IMMEDIATE_POLL_INTERVAL}, workers = ${env.ADMIN_WORKER_CONCURRENCY_WORKERS}, tasksPerWorker = ${env.ADMIN_WORKER_CONCURRENCY_TASKS_PER_WORKER}, concurrencyLimit = ${env.ADMIN_WORKER_CONCURRENCY_LIMIT}`
+    );
+
+    worker.start();
+  }
+
+  return worker;
+}
+
+export const adminWorker = singleton("adminWorker", initializeWorker);
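For context, how the `cancellationKey: id` passed to the self re-enqueue above is expected to interact with the cancel route (a sketch, not part of the patch; the batch id is illustrative):

```ts
const batchId = "backfill-2025-01-15";

// The cancel route calls this; it marks `cancellationKey:<batchId>` in Redis for 24 hours.
await adminWorker.cancel(batchId);

// When the batch that is currently running finishes, its follow-up enqueue — which passes
// `cancellationKey: id` — is silently dropped, so no further batches are scheduled.
```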
diff --git a/apps/webapp/test/runsBackfiller.test.ts b/apps/webapp/test/runsBackfiller.test.ts
new file mode 100644
index 0000000000..7051fb976f
--- /dev/null
+++ b/apps/webapp/test/runsBackfiller.test.ts
@@ -0,0 +1,183 @@
+import { vi } from "vitest";
+
+// Mock the db prisma client
+vi.mock("~/db.server", () => ({
+  prisma: {},
+  $replica: {},
+}));
+
+import { ClickHouse } from "@internal/clickhouse";
+import { containerTest } from "@internal/testcontainers";
+import { z } from "zod";
+import { RunsBackfillerService } from "~/services/runsBackfiller.server";
+import { RunsReplicationService } from "~/services/runsReplicationService.server";
+import { createInMemoryTracing } from "./utils/tracing";
+
+vi.setConfig({ testTimeout: 60_000 });
+
+describe("RunsBackfillerService", () => {
+  containerTest(
+    "should backfill completed runs to clickhouse",
+    async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {
+      const clickhouse = new ClickHouse({
+        url: clickhouseContainer.getConnectionUrl(),
+        name: "runs-replication",
+        compression: {
+          request: true,
+        },
+      });
+
+      const { tracer, exporter } = createInMemoryTracing();
+
+      const runsReplicationService = new RunsReplicationService({
+        clickhouse,
+        pgConnectionUrl: postgresContainer.getConnectionUri(),
+        serviceName: "runs-replication",
+        slotName: "task_runs_to_clickhouse_v1",
+        publicationName: "task_runs_to_clickhouse_v1_publication",
+        redisOptions,
+        maxFlushConcurrency: 1,
+        flushIntervalMs: 100,
+        flushBatchSize: 1,
+        leaderLockTimeoutMs: 5000,
+        leaderLockExtendIntervalMs: 1000,
+        ackIntervalSeconds: 5,
+        tracer,
+      });
+
+      const organization = await prisma.organization.create({
+        data: {
+          title: "test",
+          slug: "test",
+        },
+      });
+
+      const project = await prisma.project.create({
+        data: {
+          name: "test",
+          slug: "test",
+          organizationId: organization.id,
+          externalRef: "test",
+        },
+      });
+
+      const runtimeEnvironment = await prisma.runtimeEnvironment.create({
+        data: {
+          slug: "test",
+          type: "DEVELOPMENT",
+          projectId: project.id,
+          organizationId: organization.id,
+          apiKey: "test",
+          pkApiKey: "test",
+          shortcode: "test",
+        },
+      });
+
+      // Insert 11 completed runs into the database
+      for (let i = 0; i < 11; i++) {
+        await prisma.taskRun.create({
+          data: {
+            friendlyId: `run_1234_${i}`,
+            taskIdentifier: "my-task",
+            payload: JSON.stringify({ foo: "bar" }),
+            traceId: "1234",
+            spanId: "1234",
+            queue: "test",
+            runtimeEnvironmentId: runtimeEnvironment.id,
+            projectId: project.id,
+            organizationId: organization.id,
+            environmentType: "DEVELOPMENT",
+            engine: "V2",
+            status: "COMPLETED_SUCCESSFULLY",
+          },
+        });
+      }
+
+      // Insert a second run that's not completed
+      await prisma.taskRun.create({
+        data: {
+          friendlyId: "run_1235",
+          taskIdentifier: "my-task",
+          payload: JSON.stringify({ foo: "bar" }),
+          traceId: "1234",
+          spanId: "1234",
+          queue: "test",
+          runtimeEnvironmentId: runtimeEnvironment.id,
+          projectId: project.id,
+          organizationId: organization.id,
+          environmentType: "DEVELOPMENT",
+          engine: "V2",
+          status: "PENDING",
+        },
+      });
+
+      // Insert a third run that was created before the "from" date
+      await prisma.taskRun.create({
+        data: {
+          friendlyId: "run_1236",
+          taskIdentifier: "my-task",
+          payload: JSON.stringify({ foo: "bar" }),
+          traceId: "1234",
+          spanId: "1234",
+          queue: "test",
+          runtimeEnvironmentId: runtimeEnvironment.id,
+          projectId: project.id,
+          organizationId: organization.id,
+          environmentType: "DEVELOPMENT",
+          engine: "V2",
+          status: "COMPLETED_SUCCESSFULLY",
+          createdAt: new Date(Date.now() - 60000), // 60 seconds ago
+        },
+      });
+
+      const service = new RunsBackfillerService({
+        prisma,
+        runsReplicationInstance: runsReplicationService,
+        tracer,
+      });
+
+      const from = new Date(Date.now() - 10000);
+      const to = new Date(Date.now() + 1000);
+
+      const backfillResult = await service.call({
+        from,
+        to,
+        batchSize: 10,
+      });
+
+      expect(backfillResult).toBeDefined();
+
+      // Okay now use the cursor to backfill again for the next batch
+      const backfillResult2 = await service.call({
+        from,
+        to,
+        batchSize: 10,
+        cursor: backfillResult,
+      });
+
+      expect(backfillResult2).toBeDefined();
+
+      // Now use the cursor to backfill again for the next batch, but this time it will return undefined because there are no more runs to backfill
+      const backfillResult3 = await service.call({
+        from,
+        to,
+        batchSize: 10,
+        cursor: backfillResult2,
+      });
+
+      expect(backfillResult3).toBeUndefined();
+
+      // Check that the row was replicated to clickhouse
+      const queryRuns = clickhouse.reader.query({
+        name: "runs-replication",
+        query: "SELECT * FROM trigger_dev.task_runs_v2",
+        schema: z.any(),
+      });
+
+      const [queryError, result] = await queryRuns({});
+
+      expect(queryError).toBeNull();
+      expect(result?.length).toBe(11);
+    }
+  );
+});
diff --git a/packages/redis-worker/src/queue.ts b/packages/redis-worker/src/queue.ts
index b09a9979bd..e4c593a148 100644
--- a/packages/redis-worker/src/queue.ts
+++ b/packages/redis-worker/src/queue.ts
@@ -83,6 +83,10 @@ export class SimpleQueue {
     this.schema = schema;
   }

+  async cancel(cancellationKey: string): Promise<void> {
+    await this.redis.set(`cancellationKey:${cancellationKey}`, "1", "EX", 60 * 60 * 24); // 1 day
+  }
+
   async enqueue({
     id,
     job,
@@ -90,6 +94,7 @@
     attempt,
     availableAt,
     visibilityTimeoutMs,
+    cancellationKey,
   }: {
     id?: string;
     job: MessageCatalogKey;
@@ -97,6 +102,7 @@
     attempt?: number;
     availableAt?: Date;
     visibilityTimeoutMs: number;
+    cancellationKey?: string;
   }): Promise<void> {
     try {
       const score = availableAt ? availableAt.getTime() : Date.now();
@@ -109,13 +115,16 @@
         deduplicationKey,
       });

-      const result = await this.redis.enqueueItem(
-        `queue`,
-        `items`,
-        id ?? nanoid(),
-        score,
-        serializedItem
-      );
+      const result = cancellationKey
+        ? await this.redis.enqueueItemWithCancellationKey(
+            `queue`,
+            `items`,
+            `cancellationKey:${cancellationKey}`,
+            id ?? nanoid(),
+            score,
+            serializedItem
+          )
+        : await this.redis.enqueueItem(`queue`, `items`, id ?? nanoid(), score, serializedItem);

       if (result !== 1) {
         throw new Error("Enqueue operation failed");
@@ -409,6 +418,29 @@
       `,
     });

+    this.redis.defineCommand("enqueueItemWithCancellationKey", {
+      numberOfKeys: 3,
+      lua: `
+        local queue = KEYS[1]
+        local items = KEYS[2]
+        local cancellationKey = KEYS[3]
+
+        local id = ARGV[1]
+        local score = ARGV[2]
+        local serializedItem = ARGV[3]
+
+        -- if the cancellation key exists, return 1
+        if redis.call('EXISTS', cancellationKey) == 1 then
+          return 1
+        end
+
+        redis.call('ZADD', queue, score, id)
+        redis.call('HSET', items, id, serializedItem)
+
+        return 1
+      `,
+    });
+
     this.redis.defineCommand("dequeueItems", {
       numberOfKeys: 2,
       lua: `
@@ -599,6 +631,18 @@ declare module "@internal/redis" {
       callback?: Callback<number>
     ): Result<number, Context>;

+    enqueueItemWithCancellationKey(
+      //keys
+      queue: string,
+      items: string,
+      cancellationKey: string,
+      //args
+      id: string,
+      score: number,
+      serializedItem: string,
+      callback?: Callback<number>
+    ): Result<number, Context>;
+
     dequeueItems(
       //keys
       queue: string,
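For context, the queue-level behaviour the Lua script above provides (a sketch, not part of the patch): `cancel()` stores the key with a 24-hour TTL, and `enqueueItemWithCancellationKey` returns 1 whether or not it writes the item, so a cancelled enqueue resolves normally — the item simply never lands in the queue. Assuming a `SimpleQueue` instance with a `testJob` entry in its catalog:

```ts
await queue.cancel("my-key");

await queue.enqueue({
  job: "testJob",
  item: { value: "ignored" },
  visibilityTimeoutMs: 5000,
  cancellationKey: "my-key", // matches the cancelled key, so nothing is written
}); // resolves without throwing

await queue.size({ includeFuture: true }); // 0
```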
diff --git a/packages/redis-worker/src/worker.test.ts b/packages/redis-worker/src/worker.test.ts
index 8b604be8ae..e4b6fd3e85 100644
--- a/packages/redis-worker/src/worker.test.ts
+++ b/packages/redis-worker/src/worker.test.ts
@@ -548,4 +548,83 @@ describe("Worker", () => {
       await worker.stop();
     }
   );
+
+  redisTest(
+    "Should allow cancelling a job before it's enqueued, but only if the enqueue.cancellationKey is provided",
+    { timeout: 30_000 },
+    async ({ redisContainer }) => {
+      const processedPayloads: string[] = [];
+
+      const worker = new Worker({
+        name: "test-worker",
+        redisOptions: {
+          host: redisContainer.getHost(),
+          port: redisContainer.getPort(),
+          password: redisContainer.getPassword(),
+        },
+        catalog: {
+          testJob: {
+            schema: z.object({ value: z.string() }),
+            visibilityTimeoutMs: 5000,
+            retry: { maxAttempts: 3 },
+          },
+        },
+        jobs: {
+          testJob: async ({ payload }) => {
+            processedPayloads.push(payload.value);
+          },
+        },
+        concurrency: {
+          workers: 1,
+          tasksPerWorker: 1,
+        },
+        pollIntervalMs: 10,
+        logger: new Logger("test", "debug"), // Use debug to see all logs
+      }).start();
+
+      // Enqueue a job to run immediately
+      await worker.enqueue({
+        id: "immediate-job",
+        job: "testJob",
+        payload: { value: "test" },
+        cancellationKey: "test-cancellation-key",
+      });
+
+      // Verify it's in the present queue
+      const initialSize = await worker.queue.size();
+      const initialSizeWithFuture = await worker.queue.size({ includeFuture: true });
+      expect(initialSize).toBe(1);
+      expect(initialSizeWithFuture).toBe(1);
+
+      // Wait for job to be processed
+      await new Promise((resolve) => setTimeout(resolve, 1000));
+
+      // Verify job was processed
+      expect(processedPayloads).toContain("test");
+
+      // Verify queue is completely empty
+      const finalSize = await worker.queue.size();
+      const finalSizeWithFuture = await worker.queue.size({ includeFuture: true });
+      expect(finalSize).toBe(0);
+      expect(finalSizeWithFuture).toBe(0);
+
+      // Now cancel a key
+      await worker.cancel("test-cancellation-key-2");
+
+      await worker.enqueue({
+        id: "immediate-job",
+        job: "testJob",
+        payload: { value: "test" },
+        cancellationKey: "test-cancellation-key-2",
+      });
+
+      // Verify it's not in the queue (since it's been cancelled)
+      const finalSize2 = await worker.queue.size();
+      expect(finalSize2).toBe(0);
+      const finalSize2WithFuture = await worker.queue.size({ includeFuture: true });
+      expect(finalSize2WithFuture).toBe(0);
+
+      await worker.stop();
+    }
+  );
 });
diff --git a/packages/redis-worker/src/worker.ts b/packages/redis-worker/src/worker.ts
index 58db6bef56..a5e77d3a35 100644
--- a/packages/redis-worker/src/worker.ts
+++ b/packages/redis-worker/src/worker.ts
@@ -264,12 +264,14 @@ class Worker {
     payload,
     visibilityTimeoutMs,
     availableAt,
+    cancellationKey,
   }: {
     id?: string;
     job: K;
     payload: z.infer;
     visibilityTimeoutMs?: number;
     availableAt?: Date;
+    cancellationKey?: string;
   }) {
     return startSpan(
       this.tracer,
@@ -291,6 +293,7 @@ class Worker {
           item: payload,
           visibilityTimeoutMs: timeout,
           availableAt,
+          cancellationKey,
         }),
         {
           job_type: String(job),
@@ -391,6 +394,17 @@ class Worker {
     );
   }

+  /**
+   * Cancels a job before it's enqueued.
+   * @param cancellationKey - The cancellation key to cancel.
+   * @returns A promise that resolves when the job is cancelled.
+   *
+   * Any jobs enqueued with the same cancellation key will not be enqueued.
+   */
+  cancel(cancellationKey: string) {
+    return startSpan(this.tracer, "cancel", () => this.queue.cancel(cancellationKey));
+  }
+
   ack(id: string) {
     return startSpan(
       this.tracer,
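For context, the expected call pattern at the `Worker` level, mirroring the new test above (a sketch, not part of the patch; the worker and key names are illustrative). Cancellation only affects enqueues that pass a matching `cancellationKey` after `cancel()` has been called — already-enqueued jobs are untouched, and the key expires after 24 hours:

```ts
await worker.cancel("report-123");

await worker.enqueue({
  job: "testJob",
  payload: { value: "hello" },
  cancellationKey: "report-123", // dropped: the key was cancelled first
});

await worker.enqueue({
  job: "testJob",
  payload: { value: "hello" },
  // no cancellationKey — enqueued and processed as normal
});
```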