Skip to content

Commit

Permalink
Instrument bullmq
Browse files Browse the repository at this point in the history
  • Loading branch information
zodern committed Dec 11, 2024
1 parent 7823fe0 commit 8c05f01
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 2 deletions.
48 changes: 48 additions & 0 deletions lib/hijack/bullmq.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { checkModuleUsed, tryResolve } from './commonjs-utils';

export function wrapBullMQ () {
Meteor.startup(() => {
if (checkModuleUsed('bullmq')) {
instrumentBullMQ(tryResolve('bullmq'));
}
});
}

function instrumentBullMQ (modulePath) {
let bullMq = Npm.require(modulePath);

let oldAdd = bullMq.Queue.prototype.addJob;
bullMq.Queue.prototype.addJob = function () {
Kadira.models.jobs.trackNewJob(this.name);
return oldAdd.apply(this, arguments);
};

let oldAddBulk = bullMq.Queue.prototype.addJobs;
bullMq.Queue.prototype.addJobs = function (jobs) {
let count = jobs && jobs.length || 0;

Kadira.models.jobs.trackNewJob(this.name, count);

return oldAddBulk.apply(this, arguments);
};

let oldProcessJob = bullMq.Worker.prototype.callProcessJob;
bullMq.Worker.prototype.callProcessJob = function (...args) {
let job = args[0];
let name = this.name;

return Kadira.traceJob({
name,
waitTime: Date.now() - (job.timestamp + (job.delay || 0)),
_attributes: {
jobId: job.id,
jobName: job.name,
jobCreated: new Date(job.timestamp),
jobDelay: job.delay || 0,
queueName: job.queueName,
attemptsMade: job.attemptsMade,
},
data: job.data
}, () => oldProcessJob.apply(this, args));
};
}
2 changes: 2 additions & 0 deletions lib/hijack/instrument.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { hijackDBOps } from './db';
import { wrapRedisOplogObserveDriver } from './redis_oplog';
import { wrapSyncedCron } from './synced-cron.js';
import { wrapAgenda } from './agenda.js';
import { wrapBullMQ } from './bullmq.js';

let instrumented = false;
Kadira._startInstrumenting = function (callback = () => {}) {
Expand All @@ -37,6 +38,7 @@ Kadira._startInstrumenting = function (callback = () => {}) {
wrapRouters();
wrapSyncedCron();
wrapAgenda();
wrapBullMQ();

MeteorX.onReady(function () {
// instrumenting session
Expand Down
10 changes: 8 additions & 2 deletions lib/kadira.js
Original file line number Diff line number Diff line change
Expand Up @@ -509,10 +509,16 @@ Kadira.traceJob = function (details, processor) {

let trace = Kadira.tracer.start(details.name, 'job');

Kadira.tracer.event(trace, 'start', {
let startData = {
waitTime: wait,
data: stringifiedParams
});
};

if (details._attributes) {
Object.assign(startData, details._attributes);
}

Kadira.tracer.event(trace, 'start', startData);

Kadira._setInfo({ trace });
Kadira.models.jobs.trackActiveJobs(details.name, 1);
Expand Down

0 comments on commit 8c05f01

Please sign in to comment.