Skip to content

Commit

Permalink
feat: allow master disable worker refork (#110)
Browse files Browse the repository at this point in the history
using `worker.disableRefork = true`
  • Loading branch information
fengmk2 authored Sep 25, 2017
1 parent bdce0df commit 9fb55a1
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 16 deletions.
60 changes: 60 additions & 0 deletions fixtures/kill_worker/master.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
'use strict';

var path = require('path');
var util = require('util');
var cfork = require('../../');

// Options handed to cfork for this fixture: the worker script, one slave
// script, the argument list passed along, and the fork settings.
var forkOptions = {
  exec: path.join(__dirname, '../worker.js'),
  slaves: [
    path.join(__dirname, '../slave.js')
  ],
  args: [ 1984 ],
  limit: 4,
  count: 2,
  duration: 60000,
  autoCoverage: true
};

/**
 * Pick the worker property that is reported in the log lines below.
 *
 * @param {Object} worker - cluster worker passed to an event handler
 * @return {String} `exitedAfterDisconnect` when the worker owns that
 *   property, otherwise `suicide`
 */
function exitFlagName(worker) {
  if (worker.hasOwnProperty('exitedAfterDisconnect')) {
    return 'exitedAfterDisconnect';
  }
  return 'suicide';
}

var master = cfork(forkOptions);

// Log every newly forked worker.
master.on('fork', function (worker) {
  var pid = worker.process.pid;
  console.warn('[%s] [worker:%d] new worker start', Date(), pid);
});

// Log the listening port and tell the parent process (the test) about it.
master.on('listening', function (worker, address) {
  var pid = worker.process.pid;
  console.warn('[%s] [worker:%d] listening on %j', Date(), pid, address.port);
  process.send('listening');
});

// Log the disconnect together with the worker's exit flag and state.
master.on('disconnect', function (worker) {
  var flag = exitFlagName(worker);
  console.warn('[%s] [master:%s] worker:%s disconnect, %s: %s, state: %s.',
    Date(), process.pid, worker.process.pid, flag, worker[flag], worker.state);
});

// Build a WorkerDiedError describing the exit and log its stack.
master.on('exit', function (worker, code, signal) {
  // The exit code is read from the child process object, not from `code`.
  var exitCode = worker.process.exitCode;
  var flag = exitFlagName(worker);
  var message = util.format('worker %s died (code: %s, signal: %s, %s: %s, state: %s)',
    worker.process.pid, exitCode, signal, flag, worker[flag], worker.state);
  var err = new Error(message);
  err.name = 'WorkerDiedError';
  console.error('[%s] [master:%s] worker exit: %s', Date(), process.pid, err.stack);
});

// Forward the refork-limit notification to the parent process.
master.on('reachReforkLimit', function () {
  process.send('reach refork limit');
});

// Exit with status 0 the first time SIGTERM is delivered to this process.
function exitOnSigterm() {
  process.exit(0);
}

process.once('SIGTERM', exitOnSigterm);

var cluster = require('cluster');
var http = require('http');
var port = 1986;

/**
 * Request handler: on any request, flag every current cluster worker with
 * `disableRefork = true`, send it SIGTERM, and answer with the number of
 * workers that were signalled.
 *
 * @param {http.IncomingMessage} req - incoming request (not inspected)
 * @param {http.ServerResponse} res - response used to report the count
 */
function killAllWorkers(req, res) {
  var workerIds = Object.keys(cluster.workers);
  workerIds.forEach(function (workerId) {
    var target = cluster.workers[workerId];
    // Set the flag before signalling the worker process.
    target.disableRefork = true;
    target.process.kill('SIGTERM');
  });
  res.end('kill ' + workerIds.length + ' workers');
}

http.createServer(killAllWorkers).listen(port);
41 changes: 26 additions & 15 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
var cluster = require('cluster');
var os = require('os');
var util = require('util');
var utility = require('utility');

var defer = global.setImmediate || process.nextTick;

Expand Down Expand Up @@ -82,50 +83,60 @@ function fork(options) {
disconnectCount++;
var isDead = worker.isDead && worker.isDead();
var propertyName = worker.hasOwnProperty('exitedAfterDisconnect') ? 'exitedAfterDisconnect' : 'suicide';
console.error('[%s] [cfork:master:%s] worker:%s disconnect (%s: %s, state: %s, isDead: %s)',
Date(), process.pid, worker.process.pid, propertyName, worker[propertyName],
worker.state, isDead);
console.error('[%s] [cfork:master:%s] worker:%s disconnect (%s: %s, state: %s, isDead: %s, worker.disableRefork: %s)',
utility.logDate(), process.pid, worker.process.pid, propertyName, worker[propertyName],
worker.state, isDead, worker.disableRefork);
if (isDead) {
// worker has terminated before disconnect
console.error('[%s] [cfork:master:%s] don\'t fork, because worker:%s exit event emit before disconnect',
Date(), process.pid, worker.process.pid);
utility.logDate(), process.pid, worker.process.pid);
return;
}
if (worker.disableRefork) {
// worker was terminated by the master, e.g. the egg-cluster master sets disableRefork to true
console.error('[%s] [cfork:master:%s] don\'t fork, because worker:%s will be kill soon',
utility.logDate(), process.pid, worker.process.pid);
return;
}

disconnects[worker.process.pid] = Date();
disconnects[worker.process.pid] = utility.logDate();
if (allow()) {
newWorker = forkWorker(worker._clusterSettings);
newWorker._clusterSettings = worker._clusterSettings;
console.error('[%s] [cfork:master:%s] new worker:%s fork (state: %s)',
Date(), process.pid, newWorker.process.pid, newWorker.state);
utility.logDate(), process.pid, newWorker.process.pid, newWorker.state);
} else {
console.error('[%s] [cfork:master:%s] don\'t fork new work (refork: %s)',
Date(), process.pid, refork);
utility.logDate(), process.pid, refork);
}
});

cluster.on('exit', function (worker, code, signal) {
var isExpected = !!disconnects[worker.process.pid];
var isDead = worker.isDead && worker.isDead();
var propertyName = worker.hasOwnProperty('exitedAfterDisconnect') ? 'exitedAfterDisconnect' : 'suicide';
console.error('[%s] [cfork:master:%s] worker:%s exit (code: %s, %s: %s, state: %s, isDead: %s, isExpected: %s)',
Date(), process.pid, worker.process.pid, code, propertyName, worker[propertyName],
worker.state, isDead, isExpected);
console.error('[%s] [cfork:master:%s] worker:%s exit (code: %s, %s: %s, state: %s, isDead: %s, isExpected: %s, worker.disableRefork: %s)',
utility.logDate(), process.pid, worker.process.pid, code, propertyName, worker[propertyName],
worker.state, isDead, isExpected, worker.disableRefork);
if (isExpected) {
delete disconnects[worker.process.pid];
// worker disconnect first, exit expected
return;
}
if (worker.disableRefork) {
// worker is killed by master
return;
}

unexpectedCount++;
if (allow()) {
newWorker = forkWorker(worker._clusterSettings);
newWorker._clusterSettings = worker._clusterSettings;
console.error('[%s] [cfork:master:%s] new worker:%s fork (state: %s)',
Date(), process.pid, newWorker.process.pid, newWorker.state);
utility.logDate(), process.pid, newWorker.process.pid, newWorker.state);
} else {
console.error('[%s] [cfork:master:%s] don\'t fork new work (refork: %s)',
Date(), process.pid, refork);
utility.logDate(), process.pid, refork);
}
cluster.emit('unexpectedExit', worker, code, signal);
});
Expand Down Expand Up @@ -195,7 +206,7 @@ function fork(options) {
if (!err) {
return;
}
console.error('[%s] [cfork:master:%s] master uncaughtException: %s', Date(), process.pid, err.stack);
console.error('[%s] [cfork:master:%s] master uncaughtException: %s', utility.logDate(), process.pid, err.stack);
console.error(err);
console.error('(total %d disconnect, %d unexpected exit)', disconnectCount, unexpectedCount);
}
Expand All @@ -212,7 +223,7 @@ function fork(options) {
err.name = 'WorkerDiedUnexpectedError';

console.error('[%s] [cfork:master:%s] (total %d disconnect, %d unexpected exit) %s',
Date(), process.pid, disconnectCount, unexpectedCount, err.stack);
utility.logDate(), process.pid, disconnectCount, unexpectedCount, err.stack);
}

/**
Expand All @@ -221,7 +232,7 @@ function fork(options) {

function onReachReforkLimit() {
console.error('[%s] [cfork:master:%s] worker died too fast (total %d disconnect, %d unexpected exit)',
Date(), process.pid, disconnectCount, unexpectedCount);
utility.logDate(), process.pid, disconnectCount, unexpectedCount);
}

/**
Expand Down
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
"clean": "rm -rf coverage",
"ci": "mocha -t 15000 test/*.test.js"
},
"dependencies": {},
"dependencies": {
"utility": "^1.12.0"
},
"devDependencies": {
"childprocess": "1",
"contributors": "*",
Expand Down
44 changes: 44 additions & 0 deletions test/kill_worker.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
'use strict';

var assert = require('assert');
var urllib = require('urllib');
var childprocess = require('childprocess');
var path = require('path');

// Integration test for the kill_worker fixture: the forked master exposes an
// HTTP endpoint on port 1986 that flags every worker with `disableRefork`
// and sends it SIGTERM.
describe('kill_worker.test.js', function() {
  var child;
  var messages = [];

  // Fork the fixture master and wait until every expected process (workers
  // plus slave) has reported 'listening' over the IPC channel.
  before(function(done) {
    var workerNum = 2;
    var slaveNum = 1;
    var listeningCount = 0;
    child = childprocess.fork(path.join(__dirname, '..', 'fixtures', 'kill_worker', 'master.js'));
    child.on('message', function (m) {
      messages.push(m);
      console.log(m, listeningCount);
      if (m === 'listening') {
        ++listeningCount;
        if (listeningCount === (workerNum + slaveNum)) {
          done();
        }
      }
    });
  });

  // Wait one second, terminate the fixture master, then wait one more second
  // before finishing the suite.
  after(function(done) {
    setTimeout(function() {
      child.kill('SIGTERM');
      setTimeout(done, 1000);
    }, 1000);
  });

  it('should kill all workers', function(done) {
    urllib.request('http://localhost:1986/', function (err, body) {
      // Report a request failure before touching `body`: calling
      // body.toString() on a failed request would throw a TypeError and
      // hide the real error.
      if (err) {
        return done(err);
      }
      var text = body.toString();
      console.log(err, text);
      // strictEqual prints both values when the count is wrong.
      assert.strictEqual(text, 'kill 3 workers');
      done();
    });
  });
});

0 comments on commit 9fb55a1

Please sign in to comment.