From bf7122cee026fe625199c6b99e2be4b805ccfe12 Mon Sep 17 00:00:00 2001 From: Harry Chen Date: Sun, 12 Jan 2025 00:25:47 +0800 Subject: [PATCH 01/12] chore: update --- packages/bullmq/CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) delete mode 100644 packages/bullmq/CHANGELOG.md diff --git a/packages/bullmq/CHANGELOG.md b/packages/bullmq/CHANGELOG.md deleted file mode 100644 index fa4d35e687c..00000000000 --- a/packages/bullmq/CHANGELOG.md +++ /dev/null @@ -1 +0,0 @@ -# Change Log \ No newline at end of file From 9590ef25401e28d97576935350a37833be3f6b81 Mon Sep 17 00:00:00 2001 From: Harry Chen Date: Sun, 12 Jan 2025 00:25:53 +0800 Subject: [PATCH 02/12] chore: update --- packages/bullmq/index.d.ts | 16 +- packages/bullmq/package.json | 4 +- packages/bullmq/src/config/config.default.ts | 6 +- packages/bullmq/src/configuration.ts | 26 +- packages/bullmq/src/constants.ts | 2 + packages/bullmq/src/decorator.ts | 28 +- packages/bullmq/src/framework.ts | 235 +++++++++++----- packages/bullmq/src/index.ts | 1 + packages/bullmq/src/interface.ts | 30 +-- .../test/fixtures/base-app/package.json | 3 - .../fixtures/base-app/src/configuration.ts | 26 -- .../fixtures/base-app/src/task/hello.task.ts | 19 -- .../fixtures/base-app/src/task/limit.task.ts | 15 -- .../fixtures/base-app/src/task/queue.task.ts | 16 -- packages/bullmq/test/index.test.ts | 250 +++++++++++++++++- 15 files changed, 470 insertions(+), 207 deletions(-) delete mode 100644 packages/bullmq/test/fixtures/base-app/package.json delete mode 100644 packages/bullmq/test/fixtures/base-app/src/configuration.ts delete mode 100644 packages/bullmq/test/fixtures/base-app/src/task/hello.task.ts delete mode 100644 packages/bullmq/test/fixtures/base-app/src/task/limit.task.ts delete mode 100644 packages/bullmq/test/fixtures/base-app/src/task/queue.task.ts diff --git a/packages/bullmq/index.d.ts b/packages/bullmq/index.d.ts index 80571ab680b..8bb0f73a147 100644 --- a/packages/bullmq/index.d.ts +++ b/packages/bullmq/index.d.ts @@ -1,20 +1,8 @@ -import { ConnectionOptions } from 'bullmq'; +import { BullMQConfig } from './dist/index'; export * from './dist/index'; -export { Job } from 'bullmq'; -import { IQueueOptions, IWorkerOptions } from './dist/index'; declare module '@midwayjs/core/dist/interface' { - // bullmq 新引入了 worker 作为执行任务的实例,一个队列 queue 和 worker 中 connection, prefix 必须一致才能正常执行 - // 所以 config 中 connection, prefix 单独配置 - // eslint-disable-next-line interface MidwayConfig { - bullmq?: { - connection: ConnectionOptions; - prefix?: string; - defaultQueueOptions?: IQueueOptions; - defaultWorkerOptions?: IWorkerOptions; - clearRepeatJobWhenStart?: boolean; - contextLoggerFormat?: (info: any) => string; - }; + bullmq?: BullMQConfig; } } diff --git a/packages/bullmq/package.json b/packages/bullmq/package.json index befb515d74f..be839aa2195 100644 --- a/packages/bullmq/package.json +++ b/packages/bullmq/package.json @@ -1,7 +1,7 @@ { "name": "@midwayjs/bullmq", - "version": "1.0.0", - "description": "midway component for bullmq", + "version": "0.0.1", + "description": "midway component for bullMQ", "main": "dist/index.js", "typings": "index.d.ts", "scripts": { diff --git a/packages/bullmq/src/config/config.default.ts b/packages/bullmq/src/config/config.default.ts index 96607182407..ab019343c23 100644 --- a/packages/bullmq/src/config/config.default.ts +++ b/packages/bullmq/src/config/config.default.ts @@ -1,5 +1,7 @@ -export const bullmq = { - prefix: '{midway-bullmq}', +import { BullMQConfig } from '../interface'; + +export const bullmq: BullMQConfig = { + defaultPrefix: '{midway-bullmq}', defaultQueueOptions: { defaultJobOptions: { removeOnComplete: 3, diff --git a/packages/bullmq/src/configuration.ts b/packages/bullmq/src/configuration.ts index d4b71f8355e..58b4c015ece 100644 --- a/packages/bullmq/src/configuration.ts +++ b/packages/bullmq/src/configuration.ts @@ -6,7 +6,7 @@ import { } from '@midwayjs/core'; import * as DefaultConfig from './config/config.default'; import { BullMQFramework } from './framework'; -import { BULLMQ_QUEUE_KEY } from './constants'; +import { BULLMQ_FLOW_PRODUCER_KEY, BULLMQ_QUEUE_KEY, BULLMQ_WORKER_KEY } from './constants'; @Configuration({ namespace: 'bullmq', @@ -36,6 +36,30 @@ export class BullConfiguration { return this.framework.getQueue(meta.queueName); } ); + + this.decoratorService.registerPropertyHandler( + BULLMQ_WORKER_KEY, + ( + propertyName, + meta: { + queueName: string; + } + ) => { + return this.framework.getWorker(meta.queueName); + } + ); + + this.decoratorService.registerPropertyHandler( + BULLMQ_FLOW_PRODUCER_KEY, + ( + propertyName, + meta: { + producerName: string; + } + ) => { + return this.framework.getFlowProducer(meta.producerName); + } + ); } async onReady() { diff --git a/packages/bullmq/src/constants.ts b/packages/bullmq/src/constants.ts index 91255fc2f7c..ce5daf65979 100644 --- a/packages/bullmq/src/constants.ts +++ b/packages/bullmq/src/constants.ts @@ -1,3 +1,5 @@ // task export const BULLMQ_QUEUE_KEY = 'bullmq:queue'; export const BULLMQ_PROCESSOR_KEY = 'bullmq:processor'; +export const BULLMQ_WORKER_KEY = 'bullmq:worker'; +export const BULLMQ_FLOW_PRODUCER_KEY = 'bullmq:flow-producer'; \ No newline at end of file diff --git a/packages/bullmq/src/decorator.ts b/packages/bullmq/src/decorator.ts index e648cd686d2..359c4fa9796 100644 --- a/packages/bullmq/src/decorator.ts +++ b/packages/bullmq/src/decorator.ts @@ -6,15 +6,19 @@ import { Scope, ScopeEnum, } from '@midwayjs/core'; -import { IQueueOptions, IWorkerOptions } from './interface'; -import { BULLMQ_PROCESSOR_KEY, BULLMQ_QUEUE_KEY } from './constants'; -import { JobsOptions } from 'bullmq'; +import { + BULLMQ_FLOW_PRODUCER_KEY, + BULLMQ_PROCESSOR_KEY, + BULLMQ_QUEUE_KEY, + BULLMQ_WORKER_KEY, +} from './constants'; +import { QueueOptions, WorkerOptions, JobsOptions } from 'bullmq'; export function Processor( queueName: string, jobOptions?: JobsOptions, - workerOptions?: IWorkerOptions, - queueOptions?: IQueueOptions + workerOptions?: WorkerOptions, + queueOptions?: QueueOptions ): ClassDecorator { return function (target: any) { saveModule(BULLMQ_PROCESSOR_KEY, target); @@ -23,8 +27,8 @@ export function Processor( { queueName, jobOptions, - queueOptions, workerOptions, + queueOptions, }, target ); @@ -38,3 +42,15 @@ export function InjectQueue(queueName: string): PropertyDecorator { queueName, }); } + +export function InjectWorker(queueName: string): PropertyDecorator { + return createCustomPropertyDecorator(BULLMQ_WORKER_KEY, { + queueName, + }); +} + +export function InjectFlowProducer(producerName: string): PropertyDecorator { + return createCustomPropertyDecorator(BULLMQ_FLOW_PRODUCER_KEY, { + producerName, + }); +} diff --git a/packages/bullmq/src/framework.ts b/packages/bullmq/src/framework.ts index 3538acb259b..94046baee37 100644 --- a/packages/bullmq/src/framework.ts +++ b/packages/bullmq/src/framework.ts @@ -1,6 +1,5 @@ import { BaseFramework, - extend, IMidwayBootstrapOptions, Framework, getClassMetadata, @@ -9,35 +8,36 @@ import { MidwayInvokeForbiddenError, Logger, ILogger, + MidwayCommonError, } from '@midwayjs/core'; -import { - Application, - Context, - IProcessor, - IQueue, - IQueueManager, - IQueueOptions, - IWorkerOptions, -} from './interface'; +import { Application, Context, IProcessor } from './interface'; import { Job, JobsOptions, Queue, Worker, QueueOptions, + WorkerOptions, ConnectionOptions, + QueueEvents, + QueueEventsProducer, + QueueBaseOptions, + QueueEventsOptions, + FlowProducer, } from 'bullmq'; import { BULLMQ_PROCESSOR_KEY } from './constants'; -export class BullMQQueue extends Queue implements IQueue { - constructor(queueName: string, queueOptions: QueueOptions) { +export class BullMQQueue extends Queue { + private queueEventsList: QueueEvents[] = []; + private queueEventsProducerList: QueueEventsProducer[] = []; + + constructor( + protected queueName: string, + protected queueOptions: QueueOptions + ) { super(queueName, queueOptions); } - getJob(name: string): Promise { - throw new Error('Method not implemented.'); - } - // bullmq 在 queue.add 新增第一个参数 jobName // runJob 与 @midwayjs/bull 保持一致,如果想要使用 jobName 则可以直接调用 queue.add public async runJob(data: any, options?: JobsOptions): Promise { const { repeat, ...OtherOptions } = options ?? {}; @@ -52,23 +52,50 @@ export class BullMQQueue extends Queue implements IQueue { } public getQueueName(): string { - return this.name; + return this.queueName; + } + + public createQueueEvents(options?: QueueEventsOptions) { + const evt = new QueueEvents(this.name, { + connection: this.queueOptions.connection, + prefix: this.queueOptions.prefix, + ...options, + }); + this.queueEventsList.push(evt); + return evt; + } + + public getQueueEventsList() { + return this.queueEventsList; + } + + public createQueueEventsProducer(options?: QueueBaseOptions) { + const producer = new QueueEventsProducer(this.name, { + connection: this.queueOptions.connection, + prefix: this.queueOptions.prefix, + ...options, + }); + this.queueEventsProducerList.push(producer); + return producer; + } + + public getQueueEventsProducerList() { + return this.queueEventsProducerList; } } @Framework() -export class BullMQFramework - extends BaseFramework - implements IQueueManager -{ - private connection: ConnectionOptions; - private prefix?: string; - private defaultQueueConfig: IQueueOptions; - private defaultWorkerConfig: IWorkerOptions; +export class BullMQFramework extends BaseFramework { + private defaultConnection: { + connection?: ConnectionOptions; + prefix?: string; + }; + private defaultQueueConfig: QueueOptions; + private defaultWorkerConfig: WorkerOptions; private clearRepeatJobWhenStart: boolean; private queueMap: Map = new Map(); - /** keep a map of workers for gracefully shutdown */ - private workerMap: Map = new Map(); + private workerMap: Map = new Map(); + private flowProducerMap: Map = new Map(); @Logger('bullMQLogger') protected bullMQLogger: ILogger; @@ -78,8 +105,19 @@ export class BullMQFramework } public loadConfig() { - this.connection = this.configService.getConfiguration('bullmq.connection'); - this.prefix = this.configService.getConfiguration('bullmq.prefix'); + const defaultConnection = this.configService.getConfiguration( + 'bullmq.defaultConnection' + ); + + const defaultPrefix = this.configService.getConfiguration( + 'bullmq.defaultPrefix' + ); + + this.defaultConnection = { + connection: defaultConnection, + prefix: defaultPrefix, + }; + this.defaultQueueConfig = this.configService.getConfiguration( 'bullmq.defaultQueueOptions' ); @@ -105,8 +143,8 @@ export class BullMQFramework const options = getClassMetadata(BULLMQ_PROCESSOR_KEY, mod) as { queueName: string; jobOptions?: JobsOptions; - queueOptions?: IQueueOptions; - workerOptions?: IWorkerOptions; + workerOptions?: WorkerOptions; + queueOptions?: QueueOptions; }; const { repeat, ...otherOptions } = options.jobOptions ?? {}; const queueOptions = options.queueOptions ?? {}; @@ -114,7 +152,9 @@ export class BullMQFramework ...queueOptions, defaultJobOptions: otherOptions, }); - if (!currentQueue) throw Error('ensureQueue failed'); + if (!currentQueue) { + throw new MidwayCommonError(`[midway:bullmq] Queue ${options.queueName} not found`); + } // clear old repeat job when start if (this.clearRepeatJobWhenStart) { const jobs = await currentQueue.getJobSchedulers(); @@ -130,7 +170,8 @@ export class BullMQFramework await this.addProcessor(mod, options.queueName, options.workerOptions); if (repeat) { - await this.runJob(options.queueName, {}, options.jobOptions); + // add repeatable job + await this.getQueue(options.queueName)?.runJob({}, options.jobOptions); } } } @@ -141,18 +182,25 @@ export class BullMQFramework await queue.close(); } for (const worker of this.workerMap.values()) { - await worker.close(); + for (const w of worker) { + await w.close(); + } + } + for (const producer of this.flowProducerMap.values()) { + await producer.close(); } } - public createQueue(name: string, queueOptions: IQueueOptions) { - const queue = new BullMQQueue( - name, - extend(true, {}, this.defaultQueueConfig, queueOptions, { - connection: this.connection, - prefix: this.prefix, - }) - ); + /** + * Create a queue with name and queueOptions + */ + public createQueue(name: string, queueOptions: Partial = {}) { + const mergedOptions = { + ...this.defaultQueueConfig, + ...this.defaultConnection, + ...queueOptions, + }; + const queue = new BullMQQueue(name, mergedOptions); this.queueMap.set(name, queue); queue.on('error', err => { this.bullMQLogger.error(err); @@ -160,11 +208,17 @@ export class BullMQFramework return queue; } + /** + * Get a queue by name + */ public getQueue(name: string) { return this.queueMap.get(name); } - public ensureQueue(name: string, queueOptions: IQueueOptions) { + /** + * Ensure a queue by name and queueOptions + */ + protected ensureQueue(name: string, queueOptions: Partial = {}) { if (!this.queueMap.has(name)) { this.createQueue(name, queueOptions); } @@ -175,27 +229,62 @@ export class BullMQFramework return Array.from(this.queueMap.values()); } - public getWorker(name: string) { - return this.workerMap.get(name); + /** + * Get the first worker by queueName + */ + public getWorker(queueName: string): Worker { + return this.workerMap.get(queueName)?.[0]; + } + + /** + * Get all workers by queueName + */ + public getWorkers(queueName: string): Worker[] { + return this.workerMap.get(queueName); + } + + /** + * Create a worker + */ + public createWorker( + queueName: string, + processor: (job: Job, token?: string) => Promise, + workerOptions: Partial = {} + ): Worker { + const merged = { + ...this.defaultConnection, + ...this.defaultWorkerConfig, + ...workerOptions, + }; + + const worker = new Worker(queueName, processor, merged); + + if (!this.workerMap.has(queueName)) { + this.workerMap.set(queueName, []); + } + this.workerMap.get(queueName).push(worker); + return worker; } + /** + * Add a processor class and init a worker + */ public async addProcessor( processor: new (...args) => IProcessor, queueName: string, - workerOptions?: IWorkerOptions + workerOptions?: WorkerOptions ) { const queue = this.queueMap.get(queueName); if (!queue) throw Error(`queue not found ${queueName}`); - - const worker = new Worker( + return this.createWorker( queueName, - async (job: Job) => { + async (job: Job, token) => { const ctx = this.app.createAnonymousContext({ jobId: job.id, job, + token, from: processor, }); - try { ctx.logger.info(`start process job ${job.id} from ${processor.name}`); @@ -212,7 +301,8 @@ export class BullMQFramework const fn = await this.applyMiddleware(async ctx => { return await Utils.toAsyncFunction(service.execute.bind(service))( job.data, - job + job, + token ); }); const result = await Promise.resolve(await fn(ctx)); @@ -225,33 +315,32 @@ export class BullMQFramework return Promise.reject(err); } }, - extend(true, {}, this.defaultWorkerConfig, workerOptions, { - connection: this.connection, - prefix: this.prefix, - }) + workerOptions ); - - this.workerMap.set(queueName, worker); } - public async runJob( - queueName: string, - jobData: any, - options?: JobsOptions - ): Promise { - const queue = this.queueMap.get(queueName); - if (queue) { - return await queue.runJob(jobData, options); + /** + * Create a flow producer, if producerName is provided, it will be store. + */ + public createFlowProducer( + options?: QueueBaseOptions, + producerName?: string + ): FlowProducer { + const producer = new FlowProducer({ + ...this.defaultConnection, + ...options, + }); + + if (producerName) { + this.flowProducerMap.set(producerName, producer); } + return producer; } - public async getJob( - queueName: string, - jobName: string - ): Promise { - const queue = this.queueMap.get(queueName); - if (queue) { - return queue.getJob(jobName); - } + /** + * Get a flow producer by name + */ + public getFlowProducer(producerName: string): FlowProducer | undefined { + return this.flowProducerMap.get(producerName); } } diff --git a/packages/bullmq/src/index.ts b/packages/bullmq/src/index.ts index b2cb92dbe88..a72a2328883 100644 --- a/packages/bullmq/src/index.ts +++ b/packages/bullmq/src/index.ts @@ -2,3 +2,4 @@ export { BullConfiguration as Configuration } from './configuration'; export { BullMQFramework as Framework, BullMQQueue } from './framework'; export * from './decorator'; export * from './interface'; +export * as BullMQ from 'bullmq'; diff --git a/packages/bullmq/src/interface.ts b/packages/bullmq/src/interface.ts index 536e32b9955..7d143e1a9af 100644 --- a/packages/bullmq/src/interface.ts +++ b/packages/bullmq/src/interface.ts @@ -1,22 +1,8 @@ import { IMidwayApplication, IMidwayContext, NextFunction as BaseNextFunction } from '@midwayjs/core'; -import { WorkerOptions, QueueOptions, Job, Worker } from 'bullmq'; +import { WorkerOptions, QueueOptions, Job, ConnectionOptions } from 'bullmq'; export interface IProcessor { - execute(data: any); -} - -export interface IQueue { - runJob(data: Record, options?: unknown): Promise; - getJob(name: string): Promise; - getQueueName(): string; -} - -export interface IQueueManager, Job> { - runJob(queueName: string, jobData: any, options?: unknown): Promise; - getJob(queueName: string, jobName: string): Promise; - createQueue(queueName: string, queueOptions?: unknown): Queue; - getQueue(queueName: string): Queue | undefined; - getWorker(queueName: string): Worker | undefined; + execute(data: any, job: Job, token?: string): Promise; } export interface Application extends IMidwayApplication { } @@ -28,6 +14,12 @@ export interface Context extends IMidwayContext { from: new (...args) => IProcessor; } -export type IWorkerOptions = Omit -export type IQueueOptions = Omit - +export interface BullMQConfig { + defaultConnection?: ConnectionOptions; + defaultPrefix?: string; + defaultQueueOptions?: Partial; + defaultWorkerOptions?: Partial; + clearRepeatJobWhenStart?: boolean; + contextLoggerApplyLogger?: string; + contextLoggerFormat?: (info: any) => string; +} diff --git a/packages/bullmq/test/fixtures/base-app/package.json b/packages/bullmq/test/fixtures/base-app/package.json deleted file mode 100644 index 621cdc6a417..00000000000 --- a/packages/bullmq/test/fixtures/base-app/package.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "name": "ali-demo" -} diff --git a/packages/bullmq/test/fixtures/base-app/src/configuration.ts b/packages/bullmq/test/fixtures/base-app/src/configuration.ts deleted file mode 100644 index 624198619bf..00000000000 --- a/packages/bullmq/test/fixtures/base-app/src/configuration.ts +++ /dev/null @@ -1,26 +0,0 @@ -import { Configuration } from '@midwayjs/core'; -import * as bullmq from '../../../../src'; - -@Configuration({ - imports: [ - bullmq - ], - importConfigs: [ - { - default: { - bullmq: { - connection: { - host: '127.0.0.1', - port: 6379, - } - }, - }, - }, - ], -}) -export class ContainerConfiguration { - - async onReady() { - - } -} diff --git a/packages/bullmq/test/fixtures/base-app/src/task/hello.task.ts b/packages/bullmq/test/fixtures/base-app/src/task/hello.task.ts deleted file mode 100644 index 941c769ebf2..00000000000 --- a/packages/bullmq/test/fixtures/base-app/src/task/hello.task.ts +++ /dev/null @@ -1,19 +0,0 @@ -import { FORMAT, App, Inject, IMidwayApplication } from '@midwayjs/core'; -import { Processor, IProcessor } from '../../../../../src'; - -@Processor('HelloTask', { - repeat: { - pattern: FORMAT.CRONTAB.EVERY_PER_5_SECOND - } -}) -export class HelloTask implements IProcessor { - @App() - app: IMidwayApplication; - - @Inject() - logger; - - async execute() { - this.app.setAttr(`task`, 'task'); - } -} diff --git a/packages/bullmq/test/fixtures/base-app/src/task/limit.task.ts b/packages/bullmq/test/fixtures/base-app/src/task/limit.task.ts deleted file mode 100644 index 119a5eb9ee7..00000000000 --- a/packages/bullmq/test/fixtures/base-app/src/task/limit.task.ts +++ /dev/null @@ -1,15 +0,0 @@ -import { App, sleep, Inject } from '@midwayjs/core'; -import { Processor, Application } from '../../../../../src'; - -@Processor('concurrency', {}, { limiter: { max: 3, duration: 1000 }, concurrency: 3 }, {}) -export class QueueTask { - @App() - app: Application; - - @Inject() - logger; - - async execute(params) { - await sleep(3 * 1000); - } -} diff --git a/packages/bullmq/test/fixtures/base-app/src/task/queue.task.ts b/packages/bullmq/test/fixtures/base-app/src/task/queue.task.ts deleted file mode 100644 index d56c1270b8d..00000000000 --- a/packages/bullmq/test/fixtures/base-app/src/task/queue.task.ts +++ /dev/null @@ -1,16 +0,0 @@ -import { App, Inject } from '@midwayjs/core'; -import { Processor, Application } from '../../../../../src'; - -@Processor('test') -export class QueueTask { - @App() - app: Application; - - @Inject() - logger; - - async execute(params) { - this.logger.info(`====>QueueTask execute`); - this.app.setAttr(`queueConfig`, JSON.stringify(params)); - } -} diff --git a/packages/bullmq/test/index.test.ts b/packages/bullmq/test/index.test.ts index d8e09ea521f..38f13086569 100644 --- a/packages/bullmq/test/index.test.ts +++ b/packages/bullmq/test/index.test.ts @@ -1,14 +1,65 @@ -import { createApp, close } from '@midwayjs/mock'; -import { join } from 'path'; -import { sleep } from '@midwayjs/core'; +import { createLightApp, close } from '@midwayjs/mock'; +import { App, sleep, Inject, FORMAT } from '@midwayjs/core'; import * as bullmq from '../src'; -import expect from 'expect'; - +import { Processor, Application, IProcessor } from '../src'; +import { JobsOptions, Job } from 'bullmq'; describe(`/test/index.test.ts`, () => { it('test auto repeat processor', async () => { - const app = await createApp(join(__dirname, 'fixtures', 'base-app'), {}, bullmq); + @Processor('HelloTask', { + repeat: { + pattern: FORMAT.CRONTAB.EVERY_PER_5_SECOND + } + }) + class HelloTask implements IProcessor { + @App() + app: Application; + + async execute() { + this.app.setAttr(`task`, 'task'); + } + } + + @Processor('concurrency', {}, { limiter: { max: 3, duration: 1000 }, concurrency: 3 }) + class QueueTask { + async execute(params) { + await sleep(3 * 1000); + } + } + + @Processor('test') + class QueueTask1 { + @App() + app: Application; + + @Inject() + logger; + + async execute(params) { + this.logger.info(`====>QueueTask execute`); + this.app.setAttr(`queueConfig`, JSON.stringify(params)); + } + } + + const app = await createLightApp({ + imports: [ + bullmq + ], + globalConfig: { + bullmq: { + connection: { + host: '127.0.0.1', + port: 6379, + } + }, + }, + preloadModules: [ + HelloTask, + QueueTask, + QueueTask1 + ] + }); await sleep(5 * 1000); let res = app.getAttr(`task`); @@ -30,18 +81,195 @@ describe(`/test/index.test.ts`, () => { expect(await job?.getState()).toEqual('completed'); const concurrencyQueue = bullFramework.getQueue('concurrency'); - await concurrencyQueue.setGlobalConcurrency(2); + await concurrencyQueue?.setGlobalConcurrency(2); for (let index = 0; index < 6; index++) { - concurrencyQueue.runJob(index); + concurrencyQueue?.runJob(index); } await sleep(1 * 1000); - expect((await concurrencyQueue.getJobCounts()).active).toEqual(2); - await concurrencyQueue.setGlobalConcurrency(4); + expect((await concurrencyQueue?.getJobCounts())?.active).toEqual(2); + await concurrencyQueue?.setGlobalConcurrency(4); await sleep(4 * 1000); - expect((await concurrencyQueue.getJobCounts()).active).toEqual(3); + expect((await concurrencyQueue?.getJobCounts())?.active).toEqual(3); await sleep(3 * 1000); await close(app); }); + // 测试重试机制 + it('should handle job retries', async () => { + let retryCount = 0; + + @Processor('retryTask') + class RetryTask implements IProcessor { + @App() + app: Application; + + async execute(params: any): Promise { + retryCount++; + if (retryCount < 3) { + throw new Error('Simulated failure'); + } + this.app.setAttr('retrySuccess', true); + } + } + + const app = await createLightApp({ + imports: [bullmq], + globalConfig: { + bullmq: { + connection: { + host: '127.0.0.1', + port: 6379, + } + }, + }, + preloadModules: [RetryTask] + }); + + const bullFramework = app.getApplicationContext().get(bullmq.Framework); + const retryQueue = bullFramework.getQueue('retryTask'); + + const job = await retryQueue?.runJob({}, { + attempts: 3, + backoff: { + type: 'exponential', + delay: 100 + } + }); + + await sleep(2000); + expect(retryCount).toBe(3); + expect(app.getAttr('retrySuccess')).toBe(true); + expect(await job?.getState()).toBe('completed'); + + await close(app); + }); + + // 测试任务优先级 + it('should handle job priorities', async () => { + const executionOrder: number[] = []; + + @Processor('priorityTask') + class PriorityTask implements IProcessor { + + async execute(params: { priority: number }): Promise { + executionOrder.push(params.priority); + } + } + + const app = await createLightApp({ + imports: [bullmq], + globalConfig: { + bullmq: { + connection: { + host: '127.0.0.1', + port: 6379, + } + }, + }, + preloadModules: [PriorityTask] + }); + + const bullFramework = app.getApplicationContext().get(bullmq.Framework); + const priorityQueue = bullFramework.getQueue('priorityTask'); + + // 添加不同优先级的任务 + await priorityQueue?.runJob({ priority: 3 }, { priority: 1 }); // 低优先级 + await priorityQueue?.runJob({ priority: 1 }, { priority: 3 }); // 高优先级 + await priorityQueue?.runJob({ priority: 2 }, { priority: 2 }); // 中优先级 + + await sleep(2000); + + // 验证执行顺序是否按照优先级排序 + expect(executionOrder).toEqual([1, 2, 3]); + + await close(app); + }); + + // 测试任务进度 + it('should handle job progress', async () => { + let progressValue = 0; + + @Processor('progressTask') + class ProgressTask implements IProcessor { + @App() + app: Application; + + async execute(params: any): Promise { + const job = await this.app.getApplicationContext().get('currentJob') as Job; + for (let i = 0; i <= 100; i += 20) { + await job.updateProgress(i); + await sleep(100); + } + this.app.setAttr('finalProgress', await job.progress); + } + } + + const app = await createLightApp({ + imports: [bullmq], + globalConfig: { + bullmq: { + connection: { + host: '127.0.0.1', + port: 6379, + } + }, + }, + preloadModules: [ProgressTask] + }); + + const bullFramework = app.getApplicationContext().get(bullmq.Framework); + const progressQueue = bullFramework.getQueue('progressTask'); + + const job = await progressQueue?.runJob({}); + + if (job) { + const currentProgress = await job.progress; + progressValue = typeof currentProgress === 'number' ? currentProgress : 0; + } + + await sleep(1000); + expect(progressValue).toBeGreaterThanOrEqual(0); + await sleep(1000); + expect(app.getAttr('finalProgress')).toBe(100); + + await close(app); + }); + + // 测试任务超时 + it('should handle job timeouts', async () => { + @Processor('timeoutTask') + class TimeoutTask implements IProcessor { + async execute(): Promise { + await sleep(2000); // 任务执行时间超过超时设置 + } + } + + const app = await createLightApp({ + imports: [bullmq], + globalConfig: { + bullmq: { + connection: { + host: '127.0.0.1', + port: 6379, + } + }, + }, + preloadModules: [TimeoutTask] + }); + + const bullFramework = app.getApplicationContext().get(bullmq.Framework); + const timeoutQueue = bullFramework.getQueue('timeoutTask'); + + const jobOptions: JobsOptions = { + attempts: 1, + }; + const job = await timeoutQueue?.runJob({}, jobOptions); + + await sleep(3000); + expect(await job?.getState()).toBe('failed'); + + await close(app); + }); + }); From b343fd23e97c8812b0f42bbb1284ac2367ce6a66 Mon Sep 17 00:00:00 2001 From: harperKKK <48441544+harperKKK@users.noreply.github.com> Date: Sun, 12 Jan 2025 00:28:04 +0800 Subject: [PATCH 03/12] feat: support bull board using bullmq (#4259) Co-authored-by: gqc --- packages/bull-board/index.d.ts | 1 + packages/bull-board/package.json | 2 + .../bull-board/src/bullmq.board.middleware.ts | 173 ++++++++++++++++++ packages/bull-board/src/configuration.ts | 13 +- packages/bull-board/src/index.ts | 1 + packages/bull-board/src/interface.ts | 1 + .../fixtures/base-app-bullmq/package.json | 3 + .../base-app-bullmq/src/configuration.ts | 33 ++++ .../base-app-bullmq/src/task/queue.task.ts | 16 ++ packages/bull-board/test/index.test.ts | 22 +++ 10 files changed, 263 insertions(+), 2 deletions(-) create mode 100644 packages/bull-board/src/bullmq.board.middleware.ts create mode 100644 packages/bull-board/test/fixtures/base-app-bullmq/package.json create mode 100644 packages/bull-board/test/fixtures/base-app-bullmq/src/configuration.ts create mode 100644 packages/bull-board/test/fixtures/base-app-bullmq/src/task/queue.task.ts diff --git a/packages/bull-board/index.d.ts b/packages/bull-board/index.d.ts index 69b909d25f2..f5e041ee056 100644 --- a/packages/bull-board/index.d.ts +++ b/packages/bull-board/index.d.ts @@ -1,4 +1,5 @@ import '@midwayjs/bull'; +import '@midwayjs/bullmq'; import { BullBoardOption } from './dist/index'; export * from './dist/index'; diff --git a/packages/bull-board/package.json b/packages/bull-board/package.json index d00c1a04348..2f3b8e296c5 100644 --- a/packages/bull-board/package.json +++ b/packages/bull-board/package.json @@ -13,6 +13,7 @@ "midway", "IoC", "bull", + "bullmq", "bull-ui", "plugin" ], @@ -33,6 +34,7 @@ "@bull-board/api": "5.23.0", "@bull-board/ui": "5.23.0", "@midwayjs/bull": "^3.19.3", + "@midwayjs/bullmq": "^1.0.0", "ejs": "3.1.10" }, "engines": { diff --git a/packages/bull-board/src/bullmq.board.middleware.ts b/packages/bull-board/src/bullmq.board.middleware.ts new file mode 100644 index 00000000000..a07687521fa --- /dev/null +++ b/packages/bull-board/src/bullmq.board.middleware.ts @@ -0,0 +1,173 @@ +import { + IMiddleware, + IMidwayApplication, + IMidwayContext, + NextFunction, + Config, + Init, + Inject, + Provide, + Scope, + ScopeEnum, + MidwayFrameworkType, +} from '@midwayjs/core'; +import { extname } from 'path'; +import * as bullmq from '@midwayjs/bullmq'; +import { createBullBoard } from '@bull-board/api'; +import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'; +import { MidwayAdapter } from './adapter'; +import { BullBoardOption } from './interface'; +import { BullBoardManager } from './board.manager'; + +const MIME_MAP = { + '.html': 'text/html', + '.htm': 'text/html', + '.js': 'application/javascript', + '.css': 'text/css', + '.json': 'application/json', + '.png': 'image/png', + '.jpg': 'image/jpeg', + '.jpeg': 'image/jpeg', + '.gif': 'image/gif', + '.svg': 'image/svg+xml', + '.ico': 'image/x-icon', + '.ttf': 'application/x-font-ttf', + '.woff': 'application/font-woff', + '.woff2': 'application/font-woff2', + '.eot': 'application/vnd.ms-fontobject', + '.otf': 'application/x-font-opentype', +}; + +@Provide() +@Scope(ScopeEnum.Singleton) +export class BullMQBoardMiddleware + implements IMiddleware +{ + @Inject() + protected framework: bullmq.Framework; + + @Config('bullBoard') + protected bullBoardConfig: BullBoardOption; + + @Inject() + protected bullBoardManager: BullBoardManager; + + private basePath: string; + private serverAdapter: MidwayAdapter; + + @Init() + protected async init() { + const queueList = this.framework.getQueueList(); + const wrapQueues = queueList.map(queue => new BullMQAdapter(queue)); + this.basePath = this.bullBoardConfig.basePath; + + this.serverAdapter = new MidwayAdapter(); + const bullBoard = createBullBoard({ + queues: wrapQueues, + serverAdapter: this.serverAdapter, + options: { + uiConfig: this.bullBoardConfig.uiConfig, + }, + }); + this.serverAdapter.setBasePath(this.basePath); + this.bullBoardManager.setBullBoard(bullBoard); + } + + resolve(app: IMidwayApplication) { + if (app.getFrameworkType() === MidwayFrameworkType.WEB_EXPRESS) { + return async (req: any, res: any, next: NextFunction) => { + const pathname = req.path; + if (pathname.indexOf(this.basePath) === -1) { + return next(); + } + const routePath: string = pathname.replace(this.basePath, '') || '/'; + + let content; + if (routePath.startsWith(this.serverAdapter.getStaticRoutes())) { + content = await this.serverAdapter.renderStatic(routePath); + } else if ( + this.serverAdapter.getViewRoutes().indexOf(routePath) !== -1 + ) { + const entryRoute = this.serverAdapter.getEntryRoute(); + const { name, params } = entryRoute.handler({ + basePath: this.basePath, + uiConfig: this.bullBoardConfig.uiConfig, + }); + content = await this.serverAdapter.renderView(name, params); + } else { + const matchRoute = this.serverAdapter.matchApiRoutes( + req.method, + routePath + ); + if (matchRoute) { + content = await this.serverAdapter.runAPI(matchRoute, req.query); + } + } + + const ext = extname(pathname); + if (MIME_MAP[ext]) { + res.type(MIME_MAP[ext]); + } else { + if (typeof content === 'object') { + res.type('application/json'); + } else { + res.type('text/html'); + } + } + + res.send(content); + }; + } else { + return async (ctx: IMidwayContext, next: NextFunction) => { + const pathname = (ctx as any).path; + if (pathname.indexOf(this.basePath) === -1) { + return next(); + } + + const routePath: string = pathname.replace(this.basePath, '') || '/'; + + let content; + if (routePath.startsWith(this.serverAdapter.getStaticRoutes())) { + content = await this.serverAdapter.renderStatic(routePath); + } else if ( + this.serverAdapter.getViewRoutes().indexOf(routePath) !== -1 + ) { + const entryRoute = this.serverAdapter.getEntryRoute(); + const { name, params } = entryRoute.handler({ + basePath: this.basePath, + uiConfig: this.bullBoardConfig.uiConfig, + }); + content = await this.serverAdapter.renderView(name, params); + } else { + const matchRoute = this.serverAdapter.matchApiRoutes( + (ctx as any).method, + routePath + ); + if (matchRoute) { + content = await this.serverAdapter.runAPI( + matchRoute, + (ctx as any).query + ); + } + } + + const ext = extname(pathname); + if (MIME_MAP[ext]) { + (ctx as any).type = MIME_MAP[ext]; + } else { + if (typeof content === 'object') { + (ctx as any).type = 'application/json'; + } else { + (ctx as any).type = 'text/html'; + } + } + + (ctx as any).body = content; + }; + } + } + + static getName() { + return 'bull-board-ui'; + } +} diff --git a/packages/bull-board/src/configuration.ts b/packages/bull-board/src/configuration.ts index b3d79b009a1..87d959c21cb 100644 --- a/packages/bull-board/src/configuration.ts +++ b/packages/bull-board/src/configuration.ts @@ -1,4 +1,5 @@ import * as bull from '@midwayjs/bull'; +import * as bullmq from '@midwayjs/bullmq'; import { Configuration, Inject, @@ -6,14 +7,16 @@ import { MidwayConfigService, } from '@midwayjs/core'; import { BoardMiddleware } from './board.middleware'; +import { BullMQBoardMiddleware } from './bullmq.board.middleware'; @Configuration({ namespace: 'bull-board', - imports: [bull], + imports: [bull, bullmq], importConfigs: [ { default: { bullBoard: { + package: 'bull', basePath: '/ui', uiConfig: {}, adapterOptions: { @@ -32,6 +35,8 @@ export class BullBoardConfiguration { configService: MidwayConfigService; async onReady() { + const queuePackage = + this.configService.getConfiguration('bullBoard.package'); const apps = this.applicationManager.getApplications([ 'express', 'egg', @@ -39,7 +44,11 @@ export class BullBoardConfiguration { ]); if (apps.length) { apps.forEach(app => { - app.useMiddleware(BoardMiddleware); + if (queuePackage === 'bull') { + app.useMiddleware(BoardMiddleware); + } else if (queuePackage === 'bullmq') { + app.useMiddleware(BullMQBoardMiddleware); + } }); } } diff --git a/packages/bull-board/src/index.ts b/packages/bull-board/src/index.ts index 84a2477f0c6..78ec0542de6 100644 --- a/packages/bull-board/src/index.ts +++ b/packages/bull-board/src/index.ts @@ -1,5 +1,6 @@ export { BullBoardConfiguration as Configuration } from './configuration'; export * from './board.middleware'; +export * from './bullmq.board.middleware'; export * from './board.manager'; export * from './interface'; export * as BullBoard from '@bull-board/api'; diff --git a/packages/bull-board/src/interface.ts b/packages/bull-board/src/interface.ts index 65bd653461c..cfb5b5bcf21 100644 --- a/packages/bull-board/src/interface.ts +++ b/packages/bull-board/src/interface.ts @@ -1,6 +1,7 @@ import type { QueueAdapterOptions, UIConfig } from "@bull-board/api/dist/typings/app"; export interface BullBoardOption { + package?: 'bull' | 'bullmq'; basePath?: string; uiConfig?: UIConfig; adapterOptions?: QueueAdapterOptions; diff --git a/packages/bull-board/test/fixtures/base-app-bullmq/package.json b/packages/bull-board/test/fixtures/base-app-bullmq/package.json new file mode 100644 index 00000000000..621cdc6a417 --- /dev/null +++ b/packages/bull-board/test/fixtures/base-app-bullmq/package.json @@ -0,0 +1,3 @@ +{ + "name": "ali-demo" +} diff --git a/packages/bull-board/test/fixtures/base-app-bullmq/src/configuration.ts b/packages/bull-board/test/fixtures/base-app-bullmq/src/configuration.ts new file mode 100644 index 00000000000..27d804089cf --- /dev/null +++ b/packages/bull-board/test/fixtures/base-app-bullmq/src/configuration.ts @@ -0,0 +1,33 @@ +import { Configuration } from '@midwayjs/core'; +import * as bullBoard from '../../../../src'; +import * as bullmq from '@midwayjs/bullmq'; +import * as koa from '@midwayjs/koa'; + +@Configuration({ + imports: [ + koa, + bullmq, + bullBoard, + ], + importConfigs: [ + { + default: { + keys: 123, + bullmq: { + connection: { + host: '127.0.0.1', + port: 6379, + } + }, + bullBoard: { + package: 'bullmq' + }, + }, + }, + ], +}) +export class AutoConfiguration { + + async onReady(){ + } +} diff --git a/packages/bull-board/test/fixtures/base-app-bullmq/src/task/queue.task.ts b/packages/bull-board/test/fixtures/base-app-bullmq/src/task/queue.task.ts new file mode 100644 index 00000000000..0c0afa640cb --- /dev/null +++ b/packages/bull-board/test/fixtures/base-app-bullmq/src/task/queue.task.ts @@ -0,0 +1,16 @@ +import { App, Inject } from '@midwayjs/core'; +import { Processor, Application } from '@midwayjs/bullmq'; + +@Processor('test') +export class QueueTask { + @App() + app: Application; + + @Inject() + logger; + + async execute(params) { + this.logger.info(`====>QueueTask execute`); + this.app.setAttr(`queueConfig`, JSON.stringify(params)); + } +} diff --git a/packages/bull-board/test/index.test.ts b/packages/bull-board/test/index.test.ts index 478eb9cdf99..1113d2f334f 100644 --- a/packages/bull-board/test/index.test.ts +++ b/packages/bull-board/test/index.test.ts @@ -1,6 +1,7 @@ import { createApp, close, createHttpRequest, createLightApp } from '@midwayjs/mock'; import { join } from 'path'; import * as bullboard from '../src'; +import * as bullmq from '@midwayjs/bullmq'; describe(`/test/index.test.ts`, () => { it('test ui in koa', async () => { @@ -74,4 +75,25 @@ describe(`/test/index.test.ts`, () => { await close(app); }); + + it('test using package bullmq', async () => { + const app = await createApp(join(__dirname, 'fixtures', 'base-app-bullmq')); + + const bullFramework = app.getApplicationContext().get(bullmq.Framework); + const testQueue = bullFramework.getQueue('test'); + await testQueue?.runJob({name: 'stone-jin'}); + // page + let result = await createHttpRequest(app).get('/ui'); + expect(result.status).toBe(200); + expect(result.text).toMatch(/doctype html/); + expect(result.headers['content-type']).toMatch(/text\/html/); + + result = await createHttpRequest(app).get('/ui/api/queues?activeQueue=test&page=1&jobsPerPage=10'); + expect(result.status).toBe(200); + expect(result.body.queues.length).toBe(1); + expect(result.body.queues[0].type).toBe('bullmq'); + expect(result.headers['content-type']).toMatch('application/json'); + + await close(app); + }); }); From 793ab8586d7a9a36d130bb7ee222cb4076139cf8 Mon Sep 17 00:00:00 2001 From: Harry Chen Date: Sun, 12 Jan 2025 00:45:05 +0800 Subject: [PATCH 04/12] refactor: bull-board --- packages/bull-board/package.json | 6 ++-- packages/bull-board/src/board.middleware.ts | 34 ++++++++++++++++--- packages/bull-board/src/configuration.ts | 17 ++++------ packages/bull-board/src/interface.ts | 3 +- .../base-app-bullmq/src/configuration.ts | 5 +-- packages/bullmq/src/configuration.ts | 6 +++- packages/bullmq/src/constants.ts | 2 +- packages/bullmq/src/framework.ts | 9 +++-- 8 files changed, 54 insertions(+), 28 deletions(-) diff --git a/packages/bull-board/package.json b/packages/bull-board/package.json index 2f3b8e296c5..a7c7411ed65 100644 --- a/packages/bull-board/package.json +++ b/packages/bull-board/package.json @@ -28,13 +28,13 @@ "@midwayjs/core": "^3.19.0", "@midwayjs/express": "^3.19.2", "@midwayjs/koa": "^3.19.2", - "@midwayjs/mock": "^3.19.2" + "@midwayjs/mock": "^3.19.2", + "@midwayjs/bull": "^3.19.3", + "@midwayjs/bullmq": "^0.0.1" }, "dependencies": { "@bull-board/api": "5.23.0", "@bull-board/ui": "5.23.0", - "@midwayjs/bull": "^3.19.3", - "@midwayjs/bullmq": "^1.0.0", "ejs": "3.1.10" }, "engines": { diff --git a/packages/bull-board/src/board.middleware.ts b/packages/bull-board/src/board.middleware.ts index ff7f6ea45c7..f9a85b8ed69 100644 --- a/packages/bull-board/src/board.middleware.ts +++ b/packages/bull-board/src/board.middleware.ts @@ -10,14 +10,19 @@ import { Scope, ScopeEnum, MidwayFrameworkType, + ApplicationContext, + IMidwayContainer, + MidwayFrameworkService, } from '@midwayjs/core'; import { extname } from 'path'; -import * as bull from '@midwayjs/bull'; import { createBullBoard } from '@bull-board/api'; import { BullAdapter } from '@bull-board/api/bullAdapter'; +import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'; import { MidwayAdapter } from './adapter'; import { BullBoardOption } from './interface'; import { BullBoardManager } from './board.manager'; +import type { Framework as BullFramework } from '@midwayjs/bull'; +import type { Framework as BullMQFramework } from '@midwayjs/bullmq'; const MIME_MAP = { '.html': 'text/html', @@ -44,7 +49,7 @@ export class BoardMiddleware implements IMiddleware { @Inject() - protected framework: bull.Framework; + protected frameworkService: MidwayFrameworkService; @Config('bullBoard') protected bullBoardConfig: BullBoardOption; @@ -52,13 +57,34 @@ export class BoardMiddleware @Inject() protected bullBoardManager: BullBoardManager; + @ApplicationContext() + protected applicationContext: IMidwayContainer; + private basePath: string; private serverAdapter: MidwayAdapter; @Init() protected async init() { - const queueList = this.framework.getQueueList(); - const wrapQueues = queueList.map(queue => new BullAdapter(queue)); + let framework: BullFramework | BullMQFramework = + this.frameworkService.getFramework('bull') as BullFramework; + if (!framework) { + framework = this.frameworkService.getFramework( + 'bullmq' + ) as BullMQFramework; + } + + if (!framework) { + return; + } + + const queueList = framework.getQueueList(); + const wrapQueues = queueList.map(queue => { + if (this.applicationContext.hasNamespace('bull')) { + return new BullAdapter(queue); + } else if (this.applicationContext.hasNamespace('bullmq')) { + return new BullMQAdapter(queue); + } + }); this.basePath = this.bullBoardConfig.basePath; this.serverAdapter = new MidwayAdapter(); diff --git a/packages/bull-board/src/configuration.ts b/packages/bull-board/src/configuration.ts index 87d959c21cb..9189b7af85a 100644 --- a/packages/bull-board/src/configuration.ts +++ b/packages/bull-board/src/configuration.ts @@ -1,22 +1,18 @@ -import * as bull from '@midwayjs/bull'; -import * as bullmq from '@midwayjs/bullmq'; import { Configuration, + IMidwayContainer, Inject, MidwayApplicationManager, MidwayConfigService, } from '@midwayjs/core'; import { BoardMiddleware } from './board.middleware'; -import { BullMQBoardMiddleware } from './bullmq.board.middleware'; @Configuration({ namespace: 'bull-board', - imports: [bull, bullmq], importConfigs: [ { default: { bullBoard: { - package: 'bull', basePath: '/ui', uiConfig: {}, adapterOptions: { @@ -34,9 +30,7 @@ export class BullBoardConfiguration { @Inject() configService: MidwayConfigService; - async onReady() { - const queuePackage = - this.configService.getConfiguration('bullBoard.package'); + async onReady(container: IMidwayContainer) { const apps = this.applicationManager.getApplications([ 'express', 'egg', @@ -44,10 +38,11 @@ export class BullBoardConfiguration { ]); if (apps.length) { apps.forEach(app => { - if (queuePackage === 'bull') { + if ( + container.hasNamespace('bull') || + container.hasNamespace('bullmq') + ) { app.useMiddleware(BoardMiddleware); - } else if (queuePackage === 'bullmq') { - app.useMiddleware(BullMQBoardMiddleware); } }); } diff --git a/packages/bull-board/src/interface.ts b/packages/bull-board/src/interface.ts index cfb5b5bcf21..af7aaca5a01 100644 --- a/packages/bull-board/src/interface.ts +++ b/packages/bull-board/src/interface.ts @@ -1,8 +1,7 @@ import type { QueueAdapterOptions, UIConfig } from "@bull-board/api/dist/typings/app"; export interface BullBoardOption { - package?: 'bull' | 'bullmq'; basePath?: string; uiConfig?: UIConfig; adapterOptions?: QueueAdapterOptions; -} \ No newline at end of file +} diff --git a/packages/bull-board/test/fixtures/base-app-bullmq/src/configuration.ts b/packages/bull-board/test/fixtures/base-app-bullmq/src/configuration.ts index 27d804089cf..4c4886c301f 100644 --- a/packages/bull-board/test/fixtures/base-app-bullmq/src/configuration.ts +++ b/packages/bull-board/test/fixtures/base-app-bullmq/src/configuration.ts @@ -14,14 +14,11 @@ import * as koa from '@midwayjs/koa'; default: { keys: 123, bullmq: { - connection: { + defaultConnection: { host: '127.0.0.1', port: 6379, } }, - bullBoard: { - package: 'bullmq' - }, }, }, ], diff --git a/packages/bullmq/src/configuration.ts b/packages/bullmq/src/configuration.ts index 58b4c015ece..5704d5d3815 100644 --- a/packages/bullmq/src/configuration.ts +++ b/packages/bullmq/src/configuration.ts @@ -6,7 +6,11 @@ import { } from '@midwayjs/core'; import * as DefaultConfig from './config/config.default'; import { BullMQFramework } from './framework'; -import { BULLMQ_FLOW_PRODUCER_KEY, BULLMQ_QUEUE_KEY, BULLMQ_WORKER_KEY } from './constants'; +import { + BULLMQ_FLOW_PRODUCER_KEY, + BULLMQ_QUEUE_KEY, + BULLMQ_WORKER_KEY, +} from './constants'; @Configuration({ namespace: 'bullmq', diff --git a/packages/bullmq/src/constants.ts b/packages/bullmq/src/constants.ts index ce5daf65979..0305a67baad 100644 --- a/packages/bullmq/src/constants.ts +++ b/packages/bullmq/src/constants.ts @@ -2,4 +2,4 @@ export const BULLMQ_QUEUE_KEY = 'bullmq:queue'; export const BULLMQ_PROCESSOR_KEY = 'bullmq:processor'; export const BULLMQ_WORKER_KEY = 'bullmq:worker'; -export const BULLMQ_FLOW_PRODUCER_KEY = 'bullmq:flow-producer'; \ No newline at end of file +export const BULLMQ_FLOW_PRODUCER_KEY = 'bullmq:flow-producer'; diff --git a/packages/bullmq/src/framework.ts b/packages/bullmq/src/framework.ts index 94046baee37..e5a6888d398 100644 --- a/packages/bullmq/src/framework.ts +++ b/packages/bullmq/src/framework.ts @@ -153,7 +153,9 @@ export class BullMQFramework extends BaseFramework { defaultJobOptions: otherOptions, }); if (!currentQueue) { - throw new MidwayCommonError(`[midway:bullmq] Queue ${options.queueName} not found`); + throw new MidwayCommonError( + `[midway:bullmq] Queue ${options.queueName} not found` + ); } // clear old repeat job when start if (this.clearRepeatJobWhenStart) { @@ -218,7 +220,10 @@ export class BullMQFramework extends BaseFramework { /** * Ensure a queue by name and queueOptions */ - protected ensureQueue(name: string, queueOptions: Partial = {}) { + protected ensureQueue( + name: string, + queueOptions: Partial = {} + ) { if (!this.queueMap.has(name)) { this.createQueue(name, queueOptions); } From a0426bf8075d2a7198e9f23378aecdda8fdbd7a9 Mon Sep 17 00:00:00 2001 From: Harry Chen Date: Sun, 12 Jan 2025 00:51:00 +0800 Subject: [PATCH 05/12] fix: case --- packages/bull-board/index.d.ts | 2 - .../bull-board/src/bullmq.board.middleware.ts | 173 ------------------ packages/bull-board/src/index.ts | 1 - packages/bullmq/src/decorator.ts | 4 +- packages/bullmq/test/index.test.ts | 34 ++-- 5 files changed, 19 insertions(+), 195 deletions(-) delete mode 100644 packages/bull-board/src/bullmq.board.middleware.ts diff --git a/packages/bull-board/index.d.ts b/packages/bull-board/index.d.ts index f5e041ee056..357114f5d71 100644 --- a/packages/bull-board/index.d.ts +++ b/packages/bull-board/index.d.ts @@ -1,5 +1,3 @@ -import '@midwayjs/bull'; -import '@midwayjs/bullmq'; import { BullBoardOption } from './dist/index'; export * from './dist/index'; diff --git a/packages/bull-board/src/bullmq.board.middleware.ts b/packages/bull-board/src/bullmq.board.middleware.ts deleted file mode 100644 index a07687521fa..00000000000 --- a/packages/bull-board/src/bullmq.board.middleware.ts +++ /dev/null @@ -1,173 +0,0 @@ -import { - IMiddleware, - IMidwayApplication, - IMidwayContext, - NextFunction, - Config, - Init, - Inject, - Provide, - Scope, - ScopeEnum, - MidwayFrameworkType, -} from '@midwayjs/core'; -import { extname } from 'path'; -import * as bullmq from '@midwayjs/bullmq'; -import { createBullBoard } from '@bull-board/api'; -import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'; -import { MidwayAdapter } from './adapter'; -import { BullBoardOption } from './interface'; -import { BullBoardManager } from './board.manager'; - -const MIME_MAP = { - '.html': 'text/html', - '.htm': 'text/html', - '.js': 'application/javascript', - '.css': 'text/css', - '.json': 'application/json', - '.png': 'image/png', - '.jpg': 'image/jpeg', - '.jpeg': 'image/jpeg', - '.gif': 'image/gif', - '.svg': 'image/svg+xml', - '.ico': 'image/x-icon', - '.ttf': 'application/x-font-ttf', - '.woff': 'application/font-woff', - '.woff2': 'application/font-woff2', - '.eot': 'application/vnd.ms-fontobject', - '.otf': 'application/x-font-opentype', -}; - -@Provide() -@Scope(ScopeEnum.Singleton) -export class BullMQBoardMiddleware - implements IMiddleware -{ - @Inject() - protected framework: bullmq.Framework; - - @Config('bullBoard') - protected bullBoardConfig: BullBoardOption; - - @Inject() - protected bullBoardManager: BullBoardManager; - - private basePath: string; - private serverAdapter: MidwayAdapter; - - @Init() - protected async init() { - const queueList = this.framework.getQueueList(); - const wrapQueues = queueList.map(queue => new BullMQAdapter(queue)); - this.basePath = this.bullBoardConfig.basePath; - - this.serverAdapter = new MidwayAdapter(); - const bullBoard = createBullBoard({ - queues: wrapQueues, - serverAdapter: this.serverAdapter, - options: { - uiConfig: this.bullBoardConfig.uiConfig, - }, - }); - this.serverAdapter.setBasePath(this.basePath); - this.bullBoardManager.setBullBoard(bullBoard); - } - - resolve(app: IMidwayApplication) { - if (app.getFrameworkType() === MidwayFrameworkType.WEB_EXPRESS) { - return async (req: any, res: any, next: NextFunction) => { - const pathname = req.path; - if (pathname.indexOf(this.basePath) === -1) { - return next(); - } - const routePath: string = pathname.replace(this.basePath, '') || '/'; - - let content; - if (routePath.startsWith(this.serverAdapter.getStaticRoutes())) { - content = await this.serverAdapter.renderStatic(routePath); - } else if ( - this.serverAdapter.getViewRoutes().indexOf(routePath) !== -1 - ) { - const entryRoute = this.serverAdapter.getEntryRoute(); - const { name, params } = entryRoute.handler({ - basePath: this.basePath, - uiConfig: this.bullBoardConfig.uiConfig, - }); - content = await this.serverAdapter.renderView(name, params); - } else { - const matchRoute = this.serverAdapter.matchApiRoutes( - req.method, - routePath - ); - if (matchRoute) { - content = await this.serverAdapter.runAPI(matchRoute, req.query); - } - } - - const ext = extname(pathname); - if (MIME_MAP[ext]) { - res.type(MIME_MAP[ext]); - } else { - if (typeof content === 'object') { - res.type('application/json'); - } else { - res.type('text/html'); - } - } - - res.send(content); - }; - } else { - return async (ctx: IMidwayContext, next: NextFunction) => { - const pathname = (ctx as any).path; - if (pathname.indexOf(this.basePath) === -1) { - return next(); - } - - const routePath: string = pathname.replace(this.basePath, '') || '/'; - - let content; - if (routePath.startsWith(this.serverAdapter.getStaticRoutes())) { - content = await this.serverAdapter.renderStatic(routePath); - } else if ( - this.serverAdapter.getViewRoutes().indexOf(routePath) !== -1 - ) { - const entryRoute = this.serverAdapter.getEntryRoute(); - const { name, params } = entryRoute.handler({ - basePath: this.basePath, - uiConfig: this.bullBoardConfig.uiConfig, - }); - content = await this.serverAdapter.renderView(name, params); - } else { - const matchRoute = this.serverAdapter.matchApiRoutes( - (ctx as any).method, - routePath - ); - if (matchRoute) { - content = await this.serverAdapter.runAPI( - matchRoute, - (ctx as any).query - ); - } - } - - const ext = extname(pathname); - if (MIME_MAP[ext]) { - (ctx as any).type = MIME_MAP[ext]; - } else { - if (typeof content === 'object') { - (ctx as any).type = 'application/json'; - } else { - (ctx as any).type = 'text/html'; - } - } - - (ctx as any).body = content; - }; - } - } - - static getName() { - return 'bull-board-ui'; - } -} diff --git a/packages/bull-board/src/index.ts b/packages/bull-board/src/index.ts index 78ec0542de6..84a2477f0c6 100644 --- a/packages/bull-board/src/index.ts +++ b/packages/bull-board/src/index.ts @@ -1,6 +1,5 @@ export { BullBoardConfiguration as Configuration } from './configuration'; export * from './board.middleware'; -export * from './bullmq.board.middleware'; export * from './board.manager'; export * from './interface'; export * as BullBoard from '@bull-board/api'; diff --git a/packages/bullmq/src/decorator.ts b/packages/bullmq/src/decorator.ts index 359c4fa9796..572e19a7ff7 100644 --- a/packages/bullmq/src/decorator.ts +++ b/packages/bullmq/src/decorator.ts @@ -17,8 +17,8 @@ import { QueueOptions, WorkerOptions, JobsOptions } from 'bullmq'; export function Processor( queueName: string, jobOptions?: JobsOptions, - workerOptions?: WorkerOptions, - queueOptions?: QueueOptions + workerOptions?: Partial, + queueOptions?: Partial ): ClassDecorator { return function (target: any) { saveModule(BULLMQ_PROCESSOR_KEY, target); diff --git a/packages/bullmq/test/index.test.ts b/packages/bullmq/test/index.test.ts index 38f13086569..152fcc2c79c 100644 --- a/packages/bullmq/test/index.test.ts +++ b/packages/bullmq/test/index.test.ts @@ -15,7 +15,7 @@ describe(`/test/index.test.ts`, () => { class HelloTask implements IProcessor { @App() app: Application; - + async execute() { this.app.setAttr(`task`, 'task'); } @@ -48,7 +48,7 @@ describe(`/test/index.test.ts`, () => { ], globalConfig: { bullmq: { - connection: { + defaultConnection: { host: '127.0.0.1', port: 6379, } @@ -116,7 +116,7 @@ describe(`/test/index.test.ts`, () => { imports: [bullmq], globalConfig: { bullmq: { - connection: { + defaultConnection: { host: '127.0.0.1', port: 6379, } @@ -127,7 +127,7 @@ describe(`/test/index.test.ts`, () => { const bullFramework = app.getApplicationContext().get(bullmq.Framework); const retryQueue = bullFramework.getQueue('retryTask'); - + const job = await retryQueue?.runJob({}, { attempts: 3, backoff: { @@ -140,7 +140,7 @@ describe(`/test/index.test.ts`, () => { expect(retryCount).toBe(3); expect(app.getAttr('retrySuccess')).toBe(true); expect(await job?.getState()).toBe('completed'); - + await close(app); }); @@ -160,7 +160,7 @@ describe(`/test/index.test.ts`, () => { imports: [bullmq], globalConfig: { bullmq: { - connection: { + defaultConnection: { host: '127.0.0.1', port: 6379, } @@ -171,17 +171,17 @@ describe(`/test/index.test.ts`, () => { const bullFramework = app.getApplicationContext().get(bullmq.Framework); const priorityQueue = bullFramework.getQueue('priorityTask'); - + // 添加不同优先级的任务 await priorityQueue?.runJob({ priority: 3 }, { priority: 1 }); // 低优先级 await priorityQueue?.runJob({ priority: 1 }, { priority: 3 }); // 高优先级 await priorityQueue?.runJob({ priority: 2 }, { priority: 2 }); // 中优先级 await sleep(2000); - + // 验证执行顺序是否按照优先级排序 expect(executionOrder).toEqual([1, 2, 3]); - + await close(app); }); @@ -193,7 +193,7 @@ describe(`/test/index.test.ts`, () => { class ProgressTask implements IProcessor { @App() app: Application; - + async execute(params: any): Promise { const job = await this.app.getApplicationContext().get('currentJob') as Job; for (let i = 0; i <= 100; i += 20) { @@ -208,7 +208,7 @@ describe(`/test/index.test.ts`, () => { imports: [bullmq], globalConfig: { bullmq: { - connection: { + defaultConnection: { host: '127.0.0.1', port: 6379, } @@ -219,9 +219,9 @@ describe(`/test/index.test.ts`, () => { const bullFramework = app.getApplicationContext().get(bullmq.Framework); const progressQueue = bullFramework.getQueue('progressTask'); - + const job = await progressQueue?.runJob({}); - + if (job) { const currentProgress = await job.progress; progressValue = typeof currentProgress === 'number' ? currentProgress : 0; @@ -231,7 +231,7 @@ describe(`/test/index.test.ts`, () => { expect(progressValue).toBeGreaterThanOrEqual(0); await sleep(1000); expect(app.getAttr('finalProgress')).toBe(100); - + await close(app); }); @@ -248,7 +248,7 @@ describe(`/test/index.test.ts`, () => { imports: [bullmq], globalConfig: { bullmq: { - connection: { + defaultConnection: { host: '127.0.0.1', port: 6379, } @@ -259,7 +259,7 @@ describe(`/test/index.test.ts`, () => { const bullFramework = app.getApplicationContext().get(bullmq.Framework); const timeoutQueue = bullFramework.getQueue('timeoutTask'); - + const jobOptions: JobsOptions = { attempts: 1, }; @@ -267,7 +267,7 @@ describe(`/test/index.test.ts`, () => { await sleep(3000); expect(await job?.getState()).toBe('failed'); - + await close(app); }); From a5ff0990837081d096a5b0f4c69406f4f4aeab24 Mon Sep 17 00:00:00 2001 From: Harry Chen Date: Sun, 12 Jan 2025 23:23:19 +0800 Subject: [PATCH 06/12] test: add more case --- packages/bullmq/src/framework.ts | 4 +- packages/bullmq/test/index.test.ts | 210 +++++++++++++++++++++++++++-- 2 files changed, 202 insertions(+), 12 deletions(-) diff --git a/packages/bullmq/src/framework.ts b/packages/bullmq/src/framework.ts index e5a6888d398..db72cae3366 100644 --- a/packages/bullmq/src/framework.ts +++ b/packages/bullmq/src/framework.ts @@ -328,13 +328,13 @@ export class BullMQFramework extends BaseFramework { * Create a flow producer, if producerName is provided, it will be store. */ public createFlowProducer( - options?: QueueBaseOptions, + options?: Partial, producerName?: string ): FlowProducer { const producer = new FlowProducer({ ...this.defaultConnection, ...options, - }); + } as QueueBaseOptions); if (producerName) { this.flowProducerMap.set(producerName, producer); diff --git a/packages/bullmq/test/index.test.ts b/packages/bullmq/test/index.test.ts index 152fcc2c79c..bb4f2f3aac2 100644 --- a/packages/bullmq/test/index.test.ts +++ b/packages/bullmq/test/index.test.ts @@ -1,9 +1,9 @@ - import { createLightApp, close } from '@midwayjs/mock'; -import { App, sleep, Inject, FORMAT } from '@midwayjs/core'; +import { App, sleep, Inject, FORMAT, MidwayCommonError } from '@midwayjs/core'; import * as bullmq from '../src'; -import { Processor, Application, IProcessor } from '../src'; +import { Processor, Application, IProcessor, Context } from '../src'; import { JobsOptions, Job } from 'bullmq'; +import * as assert from 'node:assert'; describe(`/test/index.test.ts`, () => { it('test auto repeat processor', async () => { @@ -173,9 +173,9 @@ describe(`/test/index.test.ts`, () => { const priorityQueue = bullFramework.getQueue('priorityTask'); // 添加不同优先级的任务 - await priorityQueue?.runJob({ priority: 3 }, { priority: 1 }); // 低优先级 - await priorityQueue?.runJob({ priority: 1 }, { priority: 3 }); // 高优先级 + await priorityQueue?.runJob({ priority: 3 }, { priority: 3 }); // 低优先级 await priorityQueue?.runJob({ priority: 2 }, { priority: 2 }); // 中优先级 + await priorityQueue?.runJob({ priority: 1 }, { priority: 1 }); // 高优先级 await sleep(2000); @@ -194,13 +194,16 @@ describe(`/test/index.test.ts`, () => { @App() app: Application; - async execute(params: any): Promise { - const job = await this.app.getApplicationContext().get('currentJob') as Job; + @Inject() + ctx: Context; + + async execute(params: any, job: Job): Promise { + assert(job === this.ctx.job); for (let i = 0; i <= 100; i += 20) { await job.updateProgress(i); await sleep(100); } - this.app.setAttr('finalProgress', await job.progress); + this.app.setAttr('finalProgress', job.progress); } } @@ -223,7 +226,7 @@ describe(`/test/index.test.ts`, () => { const job = await progressQueue?.runJob({}); if (job) { - const currentProgress = await job.progress; + const currentProgress = job.progress; progressValue = typeof currentProgress === 'number' ? currentProgress : 0; } @@ -237,10 +240,34 @@ describe(`/test/index.test.ts`, () => { // 测试任务超时 it('should handle job timeouts', async () => { + // bullmq 不提供 timeout 配置,自行处理超时情况 @Processor('timeoutTask') class TimeoutTask implements IProcessor { async execute(): Promise { - await sleep(2000); // 任务执行时间超过超时设置 + let controller = new AbortController(); + const timer = setTimeout(() => controller.abort(), 1000); + + try { + await new Promise((resolve, reject) => { + const t = setTimeout(() => { + clearTimeout(timer); + resolve(true); + }, 2000); + + controller.signal.addEventListener('abort', () => { + clearTimeout(t); + reject(new MidwayCommonError('Task execution timed out')) + }); + }); + } catch(err) { + if (err.name == "MidwayCommonError") { + throw err; + } else { + throw new MidwayCommonError('unknown error'); + } + } finally { + clearTimeout(timer); + } } } @@ -271,5 +298,168 @@ describe(`/test/index.test.ts`, () => { await close(app); }); + // 测试动态创建队列和任务 + it('should handle dynamic queue and job creation', async () => { + const app = await createLightApp({ + imports: [bullmq], + globalConfig: { + bullmq: { + defaultConnection: { + host: '127.0.0.1', + port: 6379, + } + }, + } + }); + + const bullFramework = app.getApplicationContext().get(bullmq.Framework); + + // 测试动态创建队列 + const dynamicQueue = bullFramework.createQueue('dynamicQueue', { + defaultJobOptions: { + removeOnComplete: 1, + removeOnFail: 1, + } + }); + + // 测试创建 worker + const results: string[] = []; + const worker = bullFramework.createWorker( + 'dynamicQueue', + async (job) => { + results.push(job.data.message); + return job.data; + }, + { + concurrency: 2 + } + ); + + // 添加任务 + await dynamicQueue.runJob({ message: 'task1' }); + await dynamicQueue.runJob({ message: 'task2' }); + + await sleep(1000); + expect(results).toContain('task1'); + expect(results).toContain('task2'); + + await worker.close(); + await close(app); + }); + + // 测试 Flow Producer + it('should handle flow producer', async () => { + const app = await createLightApp({ + imports: [bullmq], + globalConfig: { + bullmq: { + defaultConnection: { + host: '127.0.0.1', + port: 6379, + } + }, + } + }); + + const bullFramework = app.getApplicationContext().get(bullmq.Framework); + + // 创建队列 + bullFramework.createQueue('flow-queue-1'); + bullFramework.createQueue('flow-queue-2'); + + const results: string[] = []; + + // 创建 workers + bullFramework.createWorker( + 'flow-queue-1', + async (job) => { + results.push(`queue1-${job.data.value}`); + return { value: job.data.value + 1 }; + } + ); + + bullFramework.createWorker( + 'flow-queue-2', + async (job) => { + results.push(`queue2-${job.data.value}`); + return { value: job.data.value + 1 }; + } + ); + + // 创建 flow producer + const flowProducer = bullFramework.createFlowProducer({}, 'test-flow'); + + // 创建任务流 + await flowProducer.add({ + name: 'flow-test', + queueName: 'flow-queue-1', + data: { value: 1 }, + children: [ + { + name: 'child-job', + queueName: 'flow-queue-2', + data: { value: 2 } + } + ] + }); + + await sleep(2000); + expect(results).toContain('queue1-1'); + expect(results).toContain('queue2-2'); + await close(app); + }); + + // 测试队列事件 + it('should handle queue events', async () => { + const app = await createLightApp({ + imports: [bullmq], + globalConfig: { + bullmq: { + defaultConnection: { + host: '127.0.0.1', + port: 6379, + } + }, + } + }); + + const bullFramework = app.getApplicationContext().get(bullmq.Framework); + const eventQueue = bullFramework.createQueue('event-queue'); + + const events: string[] = []; + const queueEvents = eventQueue.createQueueEvents(); + + // 监听队列事件 + queueEvents.on('completed', ({ jobId }) => { + events.push(`completed:${jobId}`); + }); + + queueEvents.on('failed', ({ jobId }) => { + events.push(`failed:${jobId}`); + }); + + // 创建成功和失败的任务 + const worker = bullFramework.createWorker( + 'event-queue', + async (job) => { + if (job.data.shouldFail) { + throw new Error('Task failed'); + } + return job.data; + } + ); + + const job1 = await eventQueue.runJob({ shouldFail: false }); + const job2 = await eventQueue.runJob({ shouldFail: true }); + + await sleep(2000); + + expect(events).toContain(`completed:${job1.id}`); + expect(events).toContain(`failed:${job2.id}`); + + await worker.close(); + await queueEvents.close(); + await close(app); + }); }); From 390338cfa54b1c086023ec1472314b51f66f5d81 Mon Sep 17 00:00:00 2001 From: Harry Chen Date: Sun, 12 Jan 2025 23:41:43 +0800 Subject: [PATCH 07/12] docs: update --- site/docs/extensions/bullmq.md | 438 +++++++++++++++++++++++++++++++++ 1 file changed, 438 insertions(+) create mode 100644 site/docs/extensions/bullmq.md diff --git a/site/docs/extensions/bullmq.md b/site/docs/extensions/bullmq.md new file mode 100644 index 00000000000..fedfb0e6ffc --- /dev/null +++ b/site/docs/extensions/bullmq.md @@ -0,0 +1,438 @@ +# BullMQ 任务队列 + +队列是一种强大的设计模式,可帮助您应对常见的应用程序扩展和性能挑战。队列可以帮助您解决的一些问题: + +- 平滑处理峰值。可以在任意时间启动资源密集型任务,然后将这些任务添加到队列中,而不是同步执行。让任务进程以受控方式从队列中提取任务。也可以轻松添加新的队列消费者以扩展后端任务处理。 +- 分解可能会阻塞 Node.js 事件循环的单一任务。比如用户请求需要像音频转码这样的 CPU 密集型工作,就可以将此任务委托给其他进程,从而释放面向用户的进程以保持响应。 +- 提供跨各种服务的可靠通信渠道。例如,您可以在一个进程或服务中排队任务(作业),并在另一个进程或服务中使用它们。在任何流程或服务的作业生命周期中完成、错误或其他状态更改时,您都可以收到通知(通过监听状态事件)。当队列生产者或消费者失败时,它们的状态被保留,并且当节点重新启动时任务处理可以自动重新启动。 + +Midway 提供了 `@midwayjs/bullmq` 包作为 [BullMQ](https://github.com/taskforcesh/bullmq) 之上的抽象/包装器。BullMQ 是 Bull 的下一代实现,提供了更好的性能和更多的功能。该软件包可以轻松地将 BullMQ 以友好的方式集成到您的应用程序中。 + +BullMQ 使用 Redis 来保存作业数据,在使用 Redis 时,Queue 架构是完全分布式,和平台无关。例如,您可以在一个(或多个)节点(进程)中运行一些 Queue 生产者、消费者,而在其他节点上的运行其他生产者和消费者。 + +:::tip +bullmq 是一个分布式任务管理系统,必须依赖 redis +::: + +相关信息: + +| 描述 | | +| ----------------- | ---- | +| 可用于标准项目 | ✅ | +| 可用于 Serverless | ❌ | +| 可用于一体化 | ✅ | +| 包含独立主框架 | ✅ | +| 包含独立日志 | ✅ | + + +## 安装组件 + +```bash +$ npm i @midwayjs/bullmq@3 --save +``` + +或者在 `package.json` 中增加如下依赖后,重新安装。 + +```json +{ + "dependencies": { + "@midwayjs/bullmq": "^3.0.0", + // ... + }, +} +``` + +## 使用组件 + +将 bullmq 组件配置到代码中。 + +```typescript +import { Configuration } from '@midwayjs/core'; +import * as bullmq from '@midwayjs/bullmq'; + +@Configuration({ + imports: [ + // ... + bullmq + ] +}) +export class MainConfiguration { + //... +} +``` + +## 一些概念 + +BullMQ 将整个队列分为以下几个部分: + +- Queue:队列,管理任务 +- Job:每个任务对象,可以对任务进行启停控制 +- Worker:任务处理器,实际的逻辑执行部分 +- QueueEvents:队列事件,用于监听任务状态变化 +- FlowProducer:任务流生产者,用于创建任务依赖关系 + +## 基础配置 + +bullmq 是一个分布式任务管理器,强依赖于 redis,在 `config.default.ts` 文件中配置。 + +```typescript +// src/config/config.default.ts +export default { + // ... + bullmq: { + defaultConnection: { + host: '127.0.0.1', + port: 6379, + }, + // 可选,队列前缀 + defaultPrefix: '{midway-bullmq}', + }, +} +``` + +有账号密码情况: + +```typescript +// src/config/config.default.ts +export default { + // ... + bullmq: { + defaultConnection: { + port: 6379, + host: '127.0.0.1', + password: 'foobared', + } + }, +} +``` + +所有的队列、任务处理器、队列事件、任务流都会复用该配置。 + +## 编写任务处理器 + +使用 `@Processor` 装饰器装饰一个类,用于快速定义一个任务处理器。 + +`@Processor` 装饰器需要传递一个 Queue(队列)的名字,在框架启动时,如果没有名为 `test` 的队列,则会自动创建。 + +比如,我们在 `src/processor/test.processor.ts` 文件中编写如下代码。 + +```typescript +import { Processor, IProcessor } from '@midwayjs/bullmq'; + +@Processor('test') +export class TestProcessor implements IProcessor { + async execute(data: any) { + // 处理任务逻辑 + console.log('processing job:', data); + } +} +``` + +## 执行任务 + +当定义完 Processor 之后,由于并未指定 Processor 如何执行,我们还需要手动执行它。 + +### 手动执行任务 + +```typescript +import { Configuration, Inject } from '@midwayjs/core'; +import * as bullmq from '@midwayjs/bullmq'; + +@Configuration({ + imports: [ + bullmq + ] +}) +export class MainConfiguration { + + @Inject() + bullmqFramework: bullmq.Framework; + + async onServerReady() { + // 获取 Processor 相关的队列 + const testQueue = this.bullmqFramework.getQueue('test'); + // 立即执行这个任务 + await testQueue?.runJob(); + } +} +``` + +### 增加执行参数 + +我们也可以在执行时,附加一些参数。 + +```typescript +@Processor('test') +export class TestProcessor implements IProcessor { + async execute(params) { + // params.name => 'harry' + } +} + +// invoke +const testQueue = this.bullmqFramework.getQueue('test'); +await testQueue?.runJob({ + name: 'harry' +}); +``` + +### 任务状态和管理 + +执行 `runJob` 后,我们可以获取到一个 `Job` 对象。 + +```typescript +const testQueue = this.bullmqFramework.getQueue('test'); +const job = await testQueue?.runJob(); + +// 更新进度 +await job.updateProgress(60); +// 获取进度 +const progress = await job.progress; +// => 60 + +// 获取任务状态 +const state = await job.getState(); +// state => 'delayed' 延迟状态 +// state => 'completed' 完成状态 +// state => 'failed' 失败状态 +``` + +### 延迟执行 + +执行任务时,也有一些额外的选项。 + +比如,延迟 1s 执行。 + +```typescript +const testQueue = this.bullmqFramework.getQueue('test'); +await testQueue?.runJob({}, { delay: 1000 }); +``` + +### 任务重试 + +BullMQ 支持任务失败重试机制。 + +```typescript +const testQueue = this.bullmqFramework.getQueue('test'); +await testQueue?.runJob({}, { + attempts: 3, // 最多重试 3 次 + backoff: { // 重试策略 + type: 'exponential', // 指数退避 + delay: 1000 // 初始延迟 1 秒 + } +}); +``` + +### 任务优先级 + +可以为任务设置优先级,优先级高的任务会优先执行。 + +```typescript +const testQueue = this.bullmqFramework.getQueue('test'); +// priority 值越大优先级越高 +await testQueue?.runJob({ priority: 1 }, { priority: 3 }); // 高优先级 +await testQueue?.runJob({ priority: 2 }, { priority: 2 }); // 中优先级 +await testQueue?.runJob({ priority: 3 }, { priority: 1 }); // 低优先级 +``` + +### 中间件和错误处理 + +BullMQ 组件包含可以独立启动的 Framework,有着自己的 App 对象和 Context 结构。 + +我们可以对 bullmq 的 App 配置独立的中间件和错误过滤器。 + +```typescript +@Configuration({ + imports: [ + bullmq + ] +}) +export class MainConfiguration { + + @App('bullmq') + bullmqApp: bullmq.Application; + + async onReady() { + this.bullmqApp.useMiddleware(/*中间件*/); + this.bullmqApp.useFilter(/*过滤器*/); + } +} +``` + +### 上下文 + +任务处理器执行是在请求作用域中,其有着特殊的 Context 对象结构。 + +```typescript +export interface Context extends IMidwayContext { + jobId: string; + job: Job; + token?: string; + from: new (...args) => IProcessor; +} +``` + +我们可以直接从 ctx 中访问当前的 Job 对象。 + +```typescript +import { Processor, IProcessor, Context } from '@midwayjs/bullmq'; + +@Processor('test') +export class TestProcessor implements IProcessor { + @Inject() + ctx: Context; + + async execute(data: any) { + // ctx.jobId => 当前任务ID + // ctx.job => 当前任务对象 + } +} +``` + +## 重复执行的任务 + +除了手动执行的方式,我们也可以通过 `@Processor` 装饰器的参数,快速配置任务的重复执行。 + +```typescript +import { Processor, IProcessor } from '@midwayjs/bullmq'; +import { FORMAT } from '@midwayjs/core'; + +@Processor('test', { + repeat: { + pattern: FORMAT.CRONTAB.EVERY_PER_5_SECOND + } +}) +export class TestProcessor implements IProcessor { + async execute() { + // 每 5 秒执行一次 + } +} +``` + +## 高级功能 + +### 任务流(Flow Producer) + +BullMQ 支持创建任务依赖关系,形成任务流。 + +```typescript +const flowProducer = bullmqFramework.createFlowProducer({}, 'test-flow'); + +// 创建任务流 +await flowProducer.add({ + name: 'flow-test', + queueName: 'flow-queue-1', + data: { value: 1 }, + children: [ + { + name: 'child-job', + queueName: 'flow-queue-2', + data: { value: 2 } + } + ] +}); +``` + +### 队列事件 + +BullMQ 提供了丰富的事件系统,可以监听任务的各种状态变化。 + +```typescript +const eventQueue = bullmqFramework.createQueue('event-queue'); +const queueEvents = eventQueue.createQueueEvents(); + +// 监听任务完成事件 +queueEvents.on('completed', ({ jobId }) => { + console.log(`Job ${jobId} completed!`); +}); + +// 监听任务失败事件 +queueEvents.on('failed', ({ jobId, failedReason }) => { + console.log(`Job ${jobId} failed: ${failedReason}`); +}); +``` + +### 清理任务历史记录 + +当开启 Redis 后,默认情况下,bullmq 会记录所有的成功和失败的任务 key,这可能会导致 redis 的 key 暴涨,我们可以配置成功或者失败后清理的选项。 + +```typescript +// src/config/config.default.ts +export default { + bullmq: { + defaultQueueOptions: { + defaultJobOptions: { + removeOnComplete: 3, // 成功后只保留最近 3 条记录 + removeOnFail: 10, // 失败后只保留最近 10 条记录 + } + } + } +} +``` + +### Redis 集群 + +bullmq 可以指定 connection 实例,你可以将自己创建的 Redis 实例配置到 `defaultConnection` 中,这样就可以接入 Redis 集群。 + +```typescript +// src/config/config.default.ts +import Redis from 'ioredis'; + +const clusterOptions = { + enableReadyCheck: false, + retryDelayOnClusterDown: 300, + retryDelayOnFailover: 1000, + retryDelayOnTryAgain: 3000, + slotsRefreshTimeout: 10000, + maxRetriesPerRequest: null +} + +const redisClientInstance = new Redis.Cluster([ + { + port: 7000, + host: '127.0.0.1' + }, + { + port: 7002, + host: '127.0.0.1' + }, +], clusterOptions); + +export default { + bullmq: { + defaultConnection: redisClientInstance, + defaultPrefix: '{midway-bullmq}', + } +} +``` + +## 组件日志 + +组件有着自己的日志,默认会将 `ctx.logger` 记录在 `midway-bullmq.log` 中。 + +我们可以单独配置这个 logger 对象。 + +```typescript +export default { + midwayLogger: { + clients: { + bullMQLogger: { + fileLogName: 'midway-bullmq.log', + }, + }, + }, +} +``` + +这个日志的输出格式,我们也可以单独配置。 + +```typescript +export default { + bullmq: { + contextLoggerFormat: info => { + const { jobId, from } = info.ctx; + return `${info.timestamp} ${info.LEVEL} ${info.pid} [${jobId} ${from.name}] ${info.message}`; + }, + } +} +``` From ec5ac48d2bf7ff21c6cc231f726cc690e7aa016b Mon Sep 17 00:00:00 2001 From: Harry Chen Date: Sun, 12 Jan 2025 23:51:01 +0800 Subject: [PATCH 08/12] docs: update --- site/docs/extensions/bullmq.md | 8 +- site/docs/sidebars.json | 2 +- .../current/extensions/bullmq.md | 442 ++++++++++++++++++ 3 files changed, 449 insertions(+), 3 deletions(-) create mode 100644 site/i18n/en/docusaurus-plugin-content-docs/current/extensions/bullmq.md diff --git a/site/docs/extensions/bullmq.md b/site/docs/extensions/bullmq.md index fedfb0e6ffc..f98d3c6207b 100644 --- a/site/docs/extensions/bullmq.md +++ b/site/docs/extensions/bullmq.md @@ -1,4 +1,4 @@ -# BullMQ 任务队列 +# 任务队列 队列是一种强大的设计模式,可帮助您应对常见的应用程序扩展和性能挑战。队列可以帮助您解决的一些问题: @@ -11,7 +11,11 @@ Midway 提供了 `@midwayjs/bullmq` 包作为 [BullMQ](https://github.com/taskfo BullMQ 使用 Redis 来保存作业数据,在使用 Redis 时,Queue 架构是完全分布式,和平台无关。例如,您可以在一个(或多个)节点(进程)中运行一些 Queue 生产者、消费者,而在其他节点上的运行其他生产者和消费者。 :::tip -bullmq 是一个分布式任务管理系统,必须依赖 redis +BullMQ 是一个分布式任务管理系统,必须依赖 redis +::: + +:::caution +由于 BullMQ 是 Bull 的升级版,从 v3.20 开始,将由 BullMQ 替代 Bull 组件,如需使用 Bull 组件,请参考 [Bull 文档](./bull) ::: 相关信息: diff --git a/site/docs/sidebars.json b/site/docs/sidebars.json index 388d406a299..991aad70a7c 100644 --- a/site/docs/sidebars.json +++ b/site/docs/sidebars.json @@ -214,7 +214,7 @@ "extensions/info", "extensions/validate", "extensions/swagger", - "extensions/bull", + "extensions/bullmq", "extensions/cron", "extensions/jwt" ] diff --git a/site/i18n/en/docusaurus-plugin-content-docs/current/extensions/bullmq.md b/site/i18n/en/docusaurus-plugin-content-docs/current/extensions/bullmq.md new file mode 100644 index 00000000000..2b7dacffe6a --- /dev/null +++ b/site/i18n/en/docusaurus-plugin-content-docs/current/extensions/bullmq.md @@ -0,0 +1,442 @@ +# Task Queue + +Queue is a powerful design pattern that can help you handle common application scaling and performance challenges. Here are some problems that queues can help you solve: + +- Smooth out processing peaks. You can start resource-intensive tasks at any time, add them to a queue instead of executing them synchronously. Let task processes pull tasks from the queue in a controlled manner. You can also easily add new queue consumers to scale backend task processing. +- Break down single tasks that might block the Node.js event loop. For example, if a user request requires CPU-intensive work like audio transcoding, you can delegate this task to other processes, freeing up user-facing processes to remain responsive. +- Provide reliable communication channels across various services. For example, you can queue tasks (jobs) in one process or service and consume them in another. You can receive notifications (by listening to status events) when jobs complete, fail, or undergo other status changes in any process or service's job lifecycle. When queue producers or consumers fail, their state is preserved, and task processing can automatically restart when nodes restart. + +Midway provides the @midwayjs/bullmq package as an abstraction/wrapper on top of [BullMQ](https://github.com/taskforcesh/bullmq). BullMQ is the next-generation implementation of Bull, offering better performance and more features. This package makes it easy to integrate BullMQ into your application in a friendly way. + +BullMQ uses Redis to store job data. When using Redis, the Queue architecture is completely distributed and platform-independent. For example, you can run some Queue producers and consumers in one (or more) nodes (processes) while running other producers and consumers on other nodes. + +:::tip +bullmq is a distributed task management system that requires redis +::: + +:::caution +Since BullMQ is a successor to Bull, starting with v3.20, it will replace the Bull component. If you need to use the Bull component, please refer to the [Bull documentation](./bull). +::: + +Related Information: + +| Description | | +| ----------------------- | ---- | +| Available for standard projects | ✅ | +| Available for Serverless | ❌ | +| Available for Integration | ✅ | +| Contains independent main framework | ✅ | +| Contains independent logs | ✅ | + + +## Installation + +```bash +$ npm i @midwayjs/bullmq@3 --save +``` + +Or add the following dependency to your `package.json` and reinstall. + +```json +{ + "dependencies": { + "@midwayjs/bullmq": "^3.0.0", + // ... + }, +} +``` + +## Using the Component + +Configure the bullmq component in your code. + +```typescript +import { Configuration } from '@midwayjs/core'; +import * as bullmq from '@midwayjs/bullmq'; + +@Configuration({ + imports: [ + // ... + bullmq + ] +}) +export class MainConfiguration { + //... +} +``` + +## Core Concepts + +BullMQ divides the entire queue system into the following parts: + +- Queue: Manages tasks +- Job: Each task object that can be controlled (start/stop) +- Worker: Task processor that executes the actual logic +- QueueEvents: Queue events for monitoring task status changes +- FlowProducer: Task flow producer for creating task dependencies + +## Basic Configuration + +bullmq is a distributed task manager that heavily depends on redis. Configure it in the `config.default.ts` file. + +```typescript +// src/config/config.default.ts +export default { + // ... + bullmq: { + defaultConnection: { + host: '127.0.0.1', + port: 6379, + }, + // Optional, queue prefix + defaultPrefix: '{midway-bullmq}', + }, +} +``` + +With username and password: + +```typescript +// src/config/config.default.ts +export default { + // ... + bullmq: { + defaultConnection: { + port: 6379, + host: '127.0.0.1', + password: 'foobared', + } + }, +} +``` + +All queues, processors, queue events, and flow producers will reuse this configuration. + +## Writing Task Processors + +Use the `@Processor` decorator to quickly define a task processor. + +The `@Processor` decorator requires a Queue name. If a queue with the specified name doesn't exist when the framework starts, it will be automatically created. + +For example, write the following code in `src/processor/test.processor.ts`: + +```typescript +import { Processor, IProcessor } from '@midwayjs/bullmq'; + +@Processor('test') +export class TestProcessor implements IProcessor { + async execute(data: any) { + // Process task logic + console.log('processing job:', data); + } +} +``` + +## Executing Tasks + +After defining a Processor, since we haven't specified how to execute it, we need to run it manually. + +### Manual Execution + +```typescript +import { Configuration, Inject } from '@midwayjs/core'; +import * as bullmq from '@midwayjs/bullmq'; + +@Configuration({ + imports: [ + bullmq + ] +}) +export class MainConfiguration { + + @Inject() + bullmqFramework: bullmq.Framework; + + async onServerReady() { + // Get the queue associated with the Processor + const testQueue = this.bullmqFramework.getQueue('test'); + // Execute the task immediately + await testQueue?.runJob(); + } +} +``` + +### Adding Execution Parameters + +We can attach additional parameters when executing tasks. + +```typescript +@Processor('test') +export class TestProcessor implements IProcessor { + async execute(params) { + // params.name => 'harry' + } +} + +// invoke +const testQueue = this.bullmqFramework.getQueue('test'); +await testQueue?.runJob({ + name: 'harry' +}); +``` + +### Task Status and Management + +After executing `runJob`, we get a `Job` object. + +```typescript +const testQueue = this.bullmqFramework.getQueue('test'); +const job = await testQueue?.runJob(); + +// Update progress +await job.updateProgress(60); +// Get progress +const progress = await job.progress; +// => 60 + +// Get task status +const state = await job.getState(); +// state => 'delayed' +// state => 'completed' +// state => 'failed' +``` + +### Delayed Execution + +Tasks can be executed with additional options. + +For example, to delay execution by 1 second: + +```typescript +const testQueue = this.bullmqFramework.getQueue('test'); +await testQueue?.runJob({}, { delay: 1000 }); +``` + +### Task Retry + +BullMQ supports task failure retry mechanism. + +```typescript +const testQueue = this.bullmqFramework.getQueue('test'); +await testQueue?.runJob({}, { + attempts: 3, // Maximum 3 retry attempts + backoff: { // Retry strategy + type: 'exponential', // Exponential backoff + delay: 1000 // Initial delay of 1 second + } +}); +``` + +### Task Priority + +Tasks can be assigned priorities, with higher priority tasks executing first. + +```typescript +const testQueue = this.bullmqFramework.getQueue('test'); +// Higher priority value means higher priority +await testQueue?.runJob({ priority: 1 }, { priority: 3 }); // High priority +await testQueue?.runJob({ priority: 2 }, { priority: 2 }); // Medium priority +await testQueue?.runJob({ priority: 3 }, { priority: 1 }); // Low priority +``` + +### Middleware and Error Handling + +The BullMQ component includes an independently startable Framework with its own App object and Context structure. + +We can configure independent middleware and error filters for the bullmq App. + +```typescript +@Configuration({ + imports: [ + bullmq + ] +}) +export class MainConfiguration { + + @App('bullmq') + bullmqApp: bullmq.Application; + + async onReady() { + this.bullmqApp.useMiddleware(/*middleware*/); + this.bullmqApp.useFilter(/*filter*/); + } +} +``` + +### Context + +Task processors execute in request scope and have a special Context object structure. + +```typescript +export interface Context extends IMidwayContext { + jobId: string; + job: Job; + token?: string; + from: new (...args) => IProcessor; +} +``` + +We can access the current Job object directly from ctx. + +```typescript +import { Processor, IProcessor, Context } from '@midwayjs/bullmq'; + +@Processor('test') +export class TestProcessor implements IProcessor { + @Inject() + ctx: Context; + + async execute(data: any) { + // ctx.jobId => current task ID + // ctx.job => current job object + } +} +``` + +## Repeatable Tasks + +Besides manual execution, we can quickly configure repeatable task execution through the `@Processor` decorator parameters. + +```typescript +import { Processor, IProcessor } from '@midwayjs/bullmq'; +import { FORMAT } from '@midwayjs/core'; + +@Processor('test', { + repeat: { + pattern: FORMAT.CRONTAB.EVERY_PER_5_SECOND + } +}) +export class TestProcessor implements IProcessor { + async execute() { + // Executes every 5 seconds + } +} +``` + +## Advanced Features + +### Task Flow (Flow Producer) + +BullMQ supports creating task dependencies to form task flows. + +```typescript +const flowProducer = bullmqFramework.createFlowProducer({}, 'test-flow'); + +// Create task flow +await flowProducer.add({ + name: 'flow-test', + queueName: 'flow-queue-1', + data: { value: 1 }, + children: [ + { + name: 'child-job', + queueName: 'flow-queue-2', + data: { value: 2 } + } + ] +}); +``` + +### Queue Events + +BullMQ provides a rich event system for monitoring various task status changes. + +```typescript +const eventQueue = bullmqFramework.createQueue('event-queue'); +const queueEvents = eventQueue.createQueueEvents(); + +// Listen for task completion +queueEvents.on('completed', ({ jobId }) => { + console.log(`Job ${jobId} completed!`); +}); + +// Listen for task failure +queueEvents.on('failed', ({ jobId, failedReason }) => { + console.log(`Job ${jobId} failed: ${failedReason}`); +}); +``` + +### Cleaning Task History + +When Redis is enabled, bullmq records all successful and failed task keys by default, which may cause Redis keys to grow rapidly. We can configure cleanup options for successful or failed tasks. + +```typescript +// src/config/config.default.ts +export default { + bullmq: { + defaultQueueOptions: { + defaultJobOptions: { + removeOnComplete: 3, // Keep only the last 3 records after success + removeOnFail: 10, // Keep only the last 10 records after failure + } + } + } +} +``` + +### Redis Cluster + +bullmq allows you to specify a connection instance. You can configure your own Redis instance in `defaultConnection` to connect to a Redis cluster. + +```typescript +// src/config/config.default.ts +import Redis from 'ioredis'; + +const clusterOptions = { + enableReadyCheck: false, + retryDelayOnClusterDown: 300, + retryDelayOnFailover: 1000, + retryDelayOnTryAgain: 3000, + slotsRefreshTimeout: 10000, + maxRetriesPerRequest: null +} + +const redisClientInstance = new Redis.Cluster([ + { + port: 7000, + host: '127.0.0.1' + }, + { + port: 7002, + host: '127.0.0.1' + }, +], clusterOptions); + +export default { + bullmq: { + defaultConnection: redisClientInstance, + defaultPrefix: '{midway-bullmq}', + } +} +``` + +## Component Logging + +The component has its own logs, by default recording `ctx.logger` in `midway-bullmq.log`. + +We can configure this logger object separately. + +```typescript +export default { + midwayLogger: { + clients: { + bullMQLogger: { + fileLogName: 'midway-bullmq.log', + }, + }, + }, +} +``` + +We can also configure the log output format separately. + +```typescript +export default { + bullmq: { + contextLoggerFormat: info => { + const { jobId, from } = info.ctx; + return `${info.timestamp} ${info.LEVEL} ${info.pid} [${jobId} ${from.name}] ${info.message}`; + }, + } +} +``` \ No newline at end of file From 3c00e6ebdaf53ea7e20be62f3518d10e2cdd18b1 Mon Sep 17 00:00:00 2001 From: Harry Chen Date: Sun, 12 Jan 2025 23:59:24 +0800 Subject: [PATCH 09/12] docs: update --- .../base-app-express/src/configuration.ts | 3 +- .../base-app-koa/src/configuration.ts | 3 +- site/docs/extensions/bullmq.md | 90 ++++++++++++++++++ .../current/extensions/bullmq.md | 92 ++++++++++++++++++- 4 files changed, 185 insertions(+), 3 deletions(-) diff --git a/packages/bull-board/test/fixtures/base-app-express/src/configuration.ts b/packages/bull-board/test/fixtures/base-app-express/src/configuration.ts index 2b6577dbbfd..d1bbd705e05 100644 --- a/packages/bull-board/test/fixtures/base-app-express/src/configuration.ts +++ b/packages/bull-board/test/fixtures/base-app-express/src/configuration.ts @@ -2,9 +2,10 @@ import { Configuration } from '@midwayjs/core'; import * as bullBoard from '../../../../src'; import { join } from 'path' import * as express from '@midwayjs/express'; +import * as bull from '@midwayjs/bull'; @Configuration({ - imports: [express, bullBoard], + imports: [express, bull, bullBoard], importConfigs: [join(__dirname, 'config')] }) export class AutoConfiguration { diff --git a/packages/bull-board/test/fixtures/base-app-koa/src/configuration.ts b/packages/bull-board/test/fixtures/base-app-koa/src/configuration.ts index 427480d364e..55f4b739d86 100644 --- a/packages/bull-board/test/fixtures/base-app-koa/src/configuration.ts +++ b/packages/bull-board/test/fixtures/base-app-koa/src/configuration.ts @@ -2,9 +2,10 @@ import { Configuration } from '@midwayjs/core'; import * as bullBoard from '../../../../src'; import { join } from 'path' import * as koa from '@midwayjs/koa'; +import * as bull from '@midwayjs/bull'; @Configuration({ - imports: [koa, bullBoard], + imports: [koa, bull, bullBoard], importConfigs: [join(__dirname, 'config')] }) export class AutoConfiguration { diff --git a/site/docs/extensions/bullmq.md b/site/docs/extensions/bullmq.md index f98d3c6207b..8ab72d0f250 100644 --- a/site/docs/extensions/bullmq.md +++ b/site/docs/extensions/bullmq.md @@ -440,3 +440,93 @@ export default { } } ``` + +## Bull UI + +在分布式场景中,我们可以资利用 Bull UI 来简化管理。 + +和 bull 组件类似,需要独立安装和启用。 + +```bash +$ npm i @midwayjs/bull-board@3 --save +``` + +或者在 `package.json` 中增加如下依赖后,重新安装。 + +```json +{ + "dependencies": { + "@midwayjs/bull-board": "^3.0.0", + // ... + }, +} +``` + +将 bull-board 组件配置到代码中。 + +```typescript +import { Configuration } from '@midwayjs/core'; +import * as bullmq from '@midwayjs/bullmq'; +import * as bullBoard from '@midwayjs/bull-board'; + +@Configuration({ + imports: [ + // ... + bullmq, + bullBoard, + ] +}) +export class MainConfiguration { + //... +} +``` + +默认的访问路径为:`http://127.1:7001/ui`。 + +效果如下: + +![](https://img.alicdn.com/imgextra/i2/O1CN01j4wEFb1UacPxA06gs_!!6000000002534-2-tps-1932-1136.png) + +可以通过配置进行基础路径的修改。 + +```typescript +// src/config/config.prod.ts +export default { + // ... + bullBoard: { + basePath: '/ui', + }, +} +``` + +此外,组件提供了 `BullBoardManager` ,可以添加动态创建的队列。 + +```typescript +import { Configuration, Inject } from '@midwayjs/core'; +import * as bullmq from '@midwayjs/bullmq'; +import * as bullBoard from '@midwayjs/bull-board'; + +@Configuration({ + imports: [ + // ... + bullmq, + bullBoard + ] +}) +export class MainConfiguration { + + @Inject() + bullmqFramework: bullmq.Framework; + + @Inject() + bullBoardManager: bullBoard.BullBoardManager; + + async onReady() { + const testQueue = this.bullmqFramework.createQueue('test', { + // ... + }); + + this.bullBoardManager.addQueue(testQueue); + } +} +``` diff --git a/site/i18n/en/docusaurus-plugin-content-docs/current/extensions/bullmq.md b/site/i18n/en/docusaurus-plugin-content-docs/current/extensions/bullmq.md index 2b7dacffe6a..cc28031b242 100644 --- a/site/i18n/en/docusaurus-plugin-content-docs/current/extensions/bullmq.md +++ b/site/i18n/en/docusaurus-plugin-content-docs/current/extensions/bullmq.md @@ -439,4 +439,94 @@ export default { }, } } -``` \ No newline at end of file +``` + +## Bull UI + +In a distributed scenario, we can leverage the Bull UI to simplify management. + +Similar to the bull component, it needs to be installed and enabled independently. + +```bash +$ npm i @midwayjs/bull-board@3 --save +``` + +Or reinstall it after adding the following dependencies to ``package.json``. + +```json +{ + "dependencies": { + "@midwayjs/bull-board": "^3.0.0", + // ... + }, +} +``` + +Configure the bull-board component into the code. + +```typescript +import { Configuration } from '@midwayjs/core'; +import * as bullmq from '@midwayjs/bullmq'; +import * as bullBoard from '@midwayjs/bull-board'; + +@Configuration({ + imports: [ + // ... + bullmq, + bullBoard, + ] +}) +export class MainConfiguration { + //... +} +``` + +The default access path is: `http://127.1:7001/ui`. + +The effect is as follows. + +![](https://img.alicdn.com/imgextra/i2/O1CN01j4wEFb1UacPxA06gs_!!6000000002534-2-tps-1932-1136.png) + +The base path can be modified by configuration. + +```typescript +// src/config/config.prod.ts +export default { + // ... + bullBoard: { + basePath: '/ui', + }, +} +``` + +In addition, the component provides the `BullBoardManager` class, which can add queues dynamically created. + +```typescript +import { Configuration, Inject } from '@midwayjs/core'; +import * as bullmq from '@midwayjs/bullmq'; +import * as bullBoard from '@midwayjs/bull-board'; + +@Configuration({ + imports: [ + // ... + bullmq, + bullBoard + ] +}) +export class MainConfiguration { + + @Inject() + bullmqFramework: bullmq.Framework; + + @Inject() + bullBoardManager: bullBoard.BullBoardManager; + + async onReady() { + const testQueue = this.bullmqFramework.createQueue('test', { + // ... + }); + + this.bullBoardManager.addQueue(testQueue); + } +} +``` \ No newline at end of file From c2444ea0c53aaf4e20b4f77cde9ed78c7bffb92e Mon Sep 17 00:00:00 2001 From: Harry Chen Date: Mon, 13 Jan 2025 22:53:45 +0800 Subject: [PATCH 10/12] fix: merge config --- packages/bullmq/src/framework.ts | 13 ++++++++----- packages/bullmq/test/index.test.ts | 6 +++--- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/packages/bullmq/src/framework.ts b/packages/bullmq/src/framework.ts index db72cae3366..d707adb5f4d 100644 --- a/packages/bullmq/src/framework.ts +++ b/packages/bullmq/src/framework.ts @@ -9,6 +9,7 @@ import { Logger, ILogger, MidwayCommonError, + extend, } from '@midwayjs/core'; import { Application, Context, IProcessor } from './interface'; import { @@ -197,11 +198,13 @@ export class BullMQFramework extends BaseFramework { * Create a queue with name and queueOptions */ public createQueue(name: string, queueOptions: Partial = {}) { - const mergedOptions = { - ...this.defaultQueueConfig, - ...this.defaultConnection, - ...queueOptions, - }; + const mergedOptions = extend( + true, + {}, + this.defaultQueueConfig, + this.defaultConnection, + queueOptions + ); const queue = new BullMQQueue(name, mergedOptions); this.queueMap.set(name, queue); queue.on('error', err => { diff --git a/packages/bullmq/test/index.test.ts b/packages/bullmq/test/index.test.ts index bb4f2f3aac2..0527457a924 100644 --- a/packages/bullmq/test/index.test.ts +++ b/packages/bullmq/test/index.test.ts @@ -173,9 +173,9 @@ describe(`/test/index.test.ts`, () => { const priorityQueue = bullFramework.getQueue('priorityTask'); // 添加不同优先级的任务 - await priorityQueue?.runJob({ priority: 3 }, { priority: 3 }); // 低优先级 - await priorityQueue?.runJob({ priority: 2 }, { priority: 2 }); // 中优先级 - await priorityQueue?.runJob({ priority: 1 }, { priority: 1 }); // 高优先级 + await priorityQueue?.runJob({ priority: 3 }, { priority: 3, delay: 100 }); // 低优先级 + await priorityQueue?.runJob({ priority: 2 }, { priority: 2, delay: 100 }); // 中优先级 + await priorityQueue?.runJob({ priority: 1 }, { priority: 1, delay: 100 }); // 高优先级 await sleep(2000); From 5a6a15bfb3d8bdb3fc7fc6cf964c175c533c64ff Mon Sep 17 00:00:00 2001 From: Harry Chen Date: Mon, 13 Jan 2025 23:14:23 +0800 Subject: [PATCH 11/12] fix: try to fix test --- packages/bullmq/src/framework.ts | 7 +++++++ packages/bullmq/test/index.test.ts | 11 +++++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/packages/bullmq/src/framework.ts b/packages/bullmq/src/framework.ts index d707adb5f4d..6bff0b48a7d 100644 --- a/packages/bullmq/src/framework.ts +++ b/packages/bullmq/src/framework.ts @@ -83,6 +83,13 @@ export class BullMQQueue extends Queue { public getQueueEventsProducerList() { return this.queueEventsProducerList; } + + public async close() { + // 并发关闭 + await Promise.all(this.queueEventsList.map(evt => evt.close())); + await Promise.all(this.queueEventsProducerList.map(producer => producer.close())); + await super.close(); + } } @Framework() diff --git a/packages/bullmq/test/index.test.ts b/packages/bullmq/test/index.test.ts index 0527457a924..79f22fc34b9 100644 --- a/packages/bullmq/test/index.test.ts +++ b/packages/bullmq/test/index.test.ts @@ -438,7 +438,7 @@ describe(`/test/index.test.ts`, () => { events.push(`failed:${jobId}`); }); - // 创建成功和失败的任务 + // 创建 worker 并等待其准备就绪 const worker = bullFramework.createWorker( 'event-queue', async (job) => { @@ -449,10 +449,17 @@ describe(`/test/index.test.ts`, () => { } ); + // 等待 worker 准备就绪 + await new Promise(resolve => worker.on('ready', () => resolve())); + + // 添加一个小延迟确保事件监听器已设置 + await sleep(1000); + const job1 = await eventQueue.runJob({ shouldFail: false }); const job2 = await eventQueue.runJob({ shouldFail: true }); - await sleep(2000); + // 增加等待时间,确保事件能够被正确捕获 + await sleep(3000); expect(events).toContain(`completed:${job1.id}`); expect(events).toContain(`failed:${job2.id}`); From 6e7d4b0835bde093f538e4b3cb8a0e7a9e4b6df6 Mon Sep 17 00:00:00 2001 From: Harry Chen Date: Tue, 14 Jan 2025 22:30:40 +0800 Subject: [PATCH 12/12] fix: lint --- packages/bullmq/package.json | 4 +-- packages/bullmq/src/framework.ts | 28 +++++++++++-------- site/docs/extensions/bullmq.md | 10 +++++++ .../current/extensions/bullmq.md | 10 +++++++ 4 files changed, 38 insertions(+), 14 deletions(-) diff --git a/packages/bullmq/package.json b/packages/bullmq/package.json index be839aa2195..971cb17f698 100644 --- a/packages/bullmq/package.json +++ b/packages/bullmq/package.json @@ -1,7 +1,7 @@ { "name": "@midwayjs/bullmq", "version": "0.0.1", - "description": "midway component for bullMQ", + "description": "midway component for BullMQ", "main": "dist/index.js", "typings": "index.d.ts", "scripts": { @@ -13,7 +13,7 @@ "midway", "IoC", "task", - "bullmq", + "BullMQ", "plugin" ], "author": "guo qicong", diff --git a/packages/bullmq/src/framework.ts b/packages/bullmq/src/framework.ts index 6bff0b48a7d..f8e54fa9cdb 100644 --- a/packages/bullmq/src/framework.ts +++ b/packages/bullmq/src/framework.ts @@ -87,7 +87,9 @@ export class BullMQQueue extends Queue { public async close() { // 并发关闭 await Promise.all(this.queueEventsList.map(evt => evt.close())); - await Promise.all(this.queueEventsProducerList.map(producer => producer.close())); + await Promise.all( + this.queueEventsProducerList.map(producer => producer.close()) + ); await super.close(); } } @@ -188,17 +190,19 @@ export class BullMQFramework extends BaseFramework { protected async beforeStop() { // loop queueMap and stop all queue - for (const queue of this.queueMap.values()) { - await queue.close(); - } - for (const worker of this.workerMap.values()) { - for (const w of worker) { - await w.close(); - } - } - for (const producer of this.flowProducerMap.values()) { - await producer.close(); - } + await Promise.all( + Array.from(this.queueMap.values()).map(queue => queue.close()) + ); + await Promise.all( + Array.from(this.workerMap.values()).map(worker => + worker.map(w => w.close()) + ) + ); + await Promise.all( + Array.from(this.flowProducerMap.values()).map(producer => + producer.close() + ) + ); } /** diff --git a/site/docs/extensions/bullmq.md b/site/docs/extensions/bullmq.md index 8ab72d0f250..3759cfe83eb 100644 --- a/site/docs/extensions/bullmq.md +++ b/site/docs/extensions/bullmq.md @@ -441,6 +441,16 @@ export default { } ``` +## BullMQ 原始对象 + +组件导出了 BullMQ 的原始对象,可以进行更多的操作。 + +```typescript +import { BullMQ } from '@midwayjs/bullmq'; +``` + +你可以通过 `BullMQ` 对象,获取到 `Queue`、`Worker`、`FlowProducer` 等对象定义。 + ## Bull UI 在分布式场景中,我们可以资利用 Bull UI 来简化管理。 diff --git a/site/i18n/en/docusaurus-plugin-content-docs/current/extensions/bullmq.md b/site/i18n/en/docusaurus-plugin-content-docs/current/extensions/bullmq.md index cc28031b242..a82da781c80 100644 --- a/site/i18n/en/docusaurus-plugin-content-docs/current/extensions/bullmq.md +++ b/site/i18n/en/docusaurus-plugin-content-docs/current/extensions/bullmq.md @@ -441,6 +441,16 @@ export default { } ``` +## BullMQ Original Objects + +The component exports the original BullMQ objects, which can be used for more operations. + +```typescript +import { BullMQ } from '@midwayjs/bullmq'; +``` + +Through the `BullMQ` object, you can access object definitions such as `Queue`, `Worker`, `FlowProducer`, etc. + ## Bull UI In a distributed scenario, we can leverage the Bull UI to simplify management.