multiple handling of scheduled jobs in 'cluster' mode #24

Open
patriceperez opened this issue Jan 3, 2016 · 5 comments

@patriceperez

I have set up kue to run with the Node.js cluster module, which forks a kue worker process for every available CPU core.

When the scheduler inserts the 'every' keys into Redis, everything seems to be in order: only a single key set is created for each 'every' run.

However, when the time comes to trigger job processing, all child processes (workers) start handling the processing logic, so a single "scheduler" entry results in multiple instances of the job being triggered.

This does not seem to happen when I trigger a new job programmatically in kue, nor when I use the kue API to do so.
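
One plausible reading of the symptom, assuming that kue-scheduler creates each 'every' run when a Redis key-expiry notification arrives and that every process owning a kue-scheduler queue is subscribed to that notification: a single expiry is broadcast to all workers, and each of them creates its own job. The toy model below uses Node's EventEmitter in place of Redis pub/sub; the key name 'every:getSocialEntities' and the worker count are hypothetical.

```javascript
// Toy model of the reported symptom; NOT kue-scheduler's code.
// Assumption: each worker that owns a kue-scheduler queue listens for the same
// "schedule key expired" notification, so one expiry reaches every worker.
const { EventEmitter } = require('events');

// Stands in for the Redis pub/sub channel that broadcasts key expirations.
const expiryChannel = new EventEmitter();

// Jobs created by all simulated workers.
const createdJobs = [];

function startWorker(pid) {
  // Every worker reacts to every broadcast and creates its own job instance.
  expiryChannel.on('expired', (scheduleKey) => {
    createdJobs.push({ scheduleKey, createdBy: pid });
  });
}

// Hypothetical 8-core machine: one listener per forked worker.
const numWorkers = 8;
for (let pid = 1; pid <= numWorkers; pid++) {
  startWorker(pid);
}

// A single schedule entry expires once (hypothetical key name)...
expiryChannel.emit('expired', 'every:getSocialEntities');

// ...but the broadcast fans out, so eight jobs are created instead of one.
console.log(createdJobs.length); // 8
```

If this model matches what happens, the number of duplicates should equal the number of processes holding a kue-scheduler queue.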

Please advise.

Main Bootstrap Code

    var cluster = require('cluster');
    var numCPUs = require('os').cpus().length;

    if (cluster.isMaster) {
        require('./init.js');

        for (var i = 0; i < numCPUs; i++) {
            cluster.fork();
        }
        cluster.on('online', function (worker) {
            console.log('Worker ' + worker.process.pid + ' is online');
        });

        cluster.on('exit', function (worker, code, signal) {
            console.log('Worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal);
            console.log('Starting a new worker');
            cluster.fork();
        });

        require('./scheduler.js');
    } else {
        require("./job_types");
    }

init.js

    /// Module Dependency
    var cors = require('cors');
    var kue = require('kue-scheduler');
    var express = require('express');
    var bodyParser = require('body-parser');
    var config = require('./configs/config');
    var redis = require("redis");

    var client = redis.createClient();
    var jobs = kue.createQueue();

    require('./routes');

    // Clearing redis for clean startup
    console.log('Clearing old Redis data...');
    client.flushall();

    /// Webserver
    var corsOptions = {origin: '*'};

    var app = express();
    app.use(cors(corsOptions));
    app.options('*', cors(corsOptions));
    app.use(bodyParser.json());
    app.use(kue.app);
    app.listen(config.env.port, function () {
        var host = config.env.host;
        var port = config.env.port;

        console.log('[' + process.pid + '] Monitoring kue listening at http://%s:%s', host, port);
    });

    // Handling safe shutdown
    process.once('SIGTERM', function (sig) {
        kue.shutdown(5000, function (err) {
            console.log('[' + process.pid + '] Kue shutdown: ', err || '');
            process.exit(0);
        });
    });

    process.on('uncaughtException', function (err) {
        console.log('[' + process.pid + '] ' + err);
        console.log('[' + process.pid + '] ' + err.stack);
    });

scheduler.js

    var scheduler = require('kue-scheduler');
    var q = scheduler.createQueue();

    // Set specific job scheduling here
    q.every('1 minutes', q.createJob('getSocialEntities').attempts(3).priority('normal'));

    // General scheduler event handling
    // Log scheduler events (useful when debugging)
    q.on('already scheduled', function (job) {
        console.log('[' + process.pid + '] job is already scheduled: ' + job.type + ' (' + job.id + ')');
    });

    q.on('schedule success', function (job) {
        console.log('[' + process.pid + '] job scheduled: ' + job.type + ' (' + job.id + ')');
    });

    q.on('schedule error', function (error) {
        console.error('[' + process.pid + '] failed scheduling job');
        console.error(error);
    });

job_types contains the kue jobs.process('job type', ...) methods that handle all the job logic itself.

@lykmapipo
Owner

@patriceperez

Sorry for that.

Currently, kue-scheduler 'every' schedules cannot run in cluster mode or across multiple processes. See
I am working hard to add that functionality and would appreciate a PR or a discussion.
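
A minimal sketch of one way 'every' schedules could be made safe in cluster mode (an illustration, not kue-scheduler's actual fix): all workers still receive the expiry, but only the first one to acquire an atomic per-run lock creates the job. Against real Redis this would be SET lockKey value NX PX ttl, which replies OK only to the first caller and lets the lock expire afterwards. Here a Map stands in for Redis, and setIfAbsent, the lock key format, and runId (for example, the run's scheduled timestamp) are hypothetical.

```javascript
// Sketch: deduplicate a broadcast expiry with a "set if absent" lock.
// The Map stands in for Redis; with a real client the lock would be
// SET <lockKey> <pid> NX PX <ttl>. All names here are hypothetical.
const { EventEmitter } = require('events');

const expiryChannel = new EventEmitter();
const fakeRedis = new Map();
const createdJobs = [];

// Emulates SET key value NX: returns true only if the key did not exist yet.
function setIfAbsent(key, value) {
  if (fakeRedis.has(key)) return false;
  fakeRedis.set(key, value);
  return true;
}

function startWorker(pid) {
  expiryChannel.on('expired', (scheduleKey, runId) => {
    // One lock per schedule entry AND per run, so the next run is not blocked.
    const lockKey = 'lock:' + scheduleKey + ':' + runId;
    if (setIfAbsent(lockKey, pid)) {
      createdJobs.push({ scheduleKey, runId, createdBy: pid });
    }
  });
}

for (let pid = 1; pid <= 8; pid++) {
  startWorker(pid);
}

// Three consecutive runs of the same 'every' schedule.
for (let runId = 1; runId <= 3; runId++) {
  expiryChannel.emit('expired', 'every:getSocialEntities', runId);
}

console.log(createdJobs.length); // 3: one job per run instead of eight
```

Keying the lock by run rather than by schedule alone matters: a lock keyed only by schedule would have to expire before the next run, which makes correctness depend on the TTL.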

@patriceperez
Author

Sure thing, I would love to help. I am not very well versed in the inner workings of kue, but it seems that if you can simulate the trigger injection that the kue JSON API does, that should fix the problem.
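
For reference, here is a sketch of the request the kue JSON API accepts for creating a job: POST /job with a JSON body of type, data, and options, filled in with the job from scheduler.js. The host, port, empty data object, and both helper names are placeholders. buildCreateJobRequest only builds the request; sendCreateJobRequest is defined but not called, so the snippet runs without a server.

```javascript
// Sketch: build (and optionally send) a kue JSON API "create job" request.
// Host/port and the empty data object are placeholders, not values from init.js.
const http = require('http');

function buildCreateJobRequest(host, port, type, data, options) {
  const body = JSON.stringify({ type, data, options });
  return {
    requestOptions: {
      host,
      port,
      method: 'POST',
      path: '/job',
      headers: {
        'Content-Type': 'application/json',
        'Content-Length': Buffer.byteLength(body)
      }
    },
    body
  };
}

// Sends a request built above; not invoked here so the sketch runs offline.
function sendCreateJobRequest(req, callback) {
  const r = http.request(req.requestOptions, (res) => {
    let raw = '';
    res.setEncoding('utf8');
    res.on('data', (chunk) => { raw += chunk; });
    res.on('end', () => callback(null, res.statusCode, raw));
  });
  r.on('error', callback);
  r.end(req.body);
}

const req = buildCreateJobRequest('localhost', 3000, 'getSocialEntities', {},
  { attempts: 3, priority: 'normal' });
console.log(req.requestOptions.method, req.requestOptions.path, req.body);
```

Because a request like this is sent by one caller, it produces one job, which may be why the JSON API path does not show the duplication.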

@patriceperez
Author

@lykmapipo
Well, it seems that a simple Queue.createJob(...).save() inserts a job that is handled only once in cluster mode with 8 workers.

If I had to guess, I would wager that either the _buildJob() or the every() function is the culprit, since I do get a random [Error: Invalid job data] here and there.
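
A toy model (not kue's implementation) of why a plain createJob(...).save() is handled only once. It assumes the worker side hands each saved job to exactly one worker by removing it from a shared queue (point-to-point delivery). Under that assumption duplicates cannot occur, unlike a notification that is delivered to every subscriber. The array and the saveJob/workerTick helpers are hypothetical stand-ins for kue's Redis-backed queue.

```javascript
// Toy model of point-to-point delivery; kue does this in Redis, not in an array.
const pendingJobs = [];
const processedBy = new Map(); // job id -> pid of the worker that processed it

function saveJob(id, type) {
  pendingJobs.push({ id, type });
}

// A worker takes the next pending job, if any; shift() removes it, so no
// other worker can ever receive the same job.
function workerTick(pid) {
  const job = pendingJobs.shift();
  if (!job) return false;
  if (processedBy.has(job.id)) throw new Error('job ' + job.id + ' processed twice');
  processedBy.set(job.id, pid);
  return true;
}

// A single scheduling process saves 5 jobs; 8 workers poll until the queue is empty.
for (let id = 1; id <= 5; id++) {
  saveJob(id, 'getSocialEntities');
}

let busy = true;
while (busy) {
  busy = false;
  for (let pid = 1; pid <= 8; pid++) {
    if (workerTick(pid)) busy = true;
  }
}

console.log(processedBy.size); // 5: each job handled exactly once
```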

@rousselle

Is there an ETA for a release containing this fix?

@lykmapipo
Owner

@rousselle I will plan the release.

Thanks for the ping.
