Skip to content

Commit

Permalink
Some style tightening.
Browse files Browse the repository at this point in the history
More worker tests.
  • Loading branch information
ceejbot committed May 5, 2013
1 parent 772d470 commit 54cfb50
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 76 deletions.
3 changes: 2 additions & 1 deletion .jshintrc
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,6 @@
//"iLoveDouglas"
],
"quotmark": true,
"unused": "true"
"unused": "true",
"indent": 4
}
56 changes: 29 additions & 27 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ FiveBeansClient.prototype.connect = function()
data.copy(tmp, self.buffer.length);
self.buffer = tmp;
}
return self.tryHandlingResponse();

self.tryHandlingResponse();
});

self.stream.on('connect', function()
Expand Down Expand Up @@ -200,14 +201,14 @@ ResponseHandler.prototype.parseBody = function(how)

switch(how)
{
case 'yaml':
this.args.push(yaml.load(body.toString()));
break;

case 'passthrough':
default:
this.args.push(body);
break;
case 'yaml':
this.args.push(yaml.load(body.toString()));
break;

case 'passthrough':
default:
this.args.push(body);
break;
}
}
};
Expand Down Expand Up @@ -264,32 +265,33 @@ function makeBeanstalkCommand(command, expectedResponse, sendsData)
FiveBeansClient.prototype.use = makeBeanstalkCommand('use', 'USING');
FiveBeansClient.prototype.put = makeBeanstalkCommand('put', 'INSERTED', true);

FiveBeansClient.prototype.watch = makeBeanstalkCommand('watch', 'WATCHING');
FiveBeansClient.prototype.ignore = makeBeanstalkCommand('ignore', 'WATCHING');
FiveBeansClient.prototype.reserve = makeBeanstalkCommand('reserve', 'RESERVED');
FiveBeansClient.prototype.watch = makeBeanstalkCommand('watch', 'WATCHING');
FiveBeansClient.prototype.ignore = makeBeanstalkCommand('ignore', 'WATCHING');
FiveBeansClient.prototype.reserve = makeBeanstalkCommand('reserve', 'RESERVED');
FiveBeansClient.prototype.reserve_with_timeout = makeBeanstalkCommand('reserve-with-timeout', 'RESERVED');
FiveBeansClient.prototype.destroy = makeBeanstalkCommand('delete', 'DELETED');
FiveBeansClient.prototype.release = makeBeanstalkCommand('release', 'RELEASED');
FiveBeansClient.prototype.bury = makeBeanstalkCommand('bury', 'BURIED');
FiveBeansClient.prototype.touch = makeBeanstalkCommand('touch', 'TOUCHED');
FiveBeansClient.prototype.kick = makeBeanstalkCommand('kick', 'KICKED');

FiveBeansClient.prototype.peek = makeBeanstalkCommand('peek', 'FOUND');
FiveBeansClient.prototype.peek_ready = makeBeanstalkCommand('peek-ready', 'FOUND');
FiveBeansClient.prototype.destroy = makeBeanstalkCommand('delete', 'DELETED');
FiveBeansClient.prototype.release = makeBeanstalkCommand('release', 'RELEASED');
FiveBeansClient.prototype.bury = makeBeanstalkCommand('bury', 'BURIED');
FiveBeansClient.prototype.touch = makeBeanstalkCommand('touch', 'TOUCHED');
FiveBeansClient.prototype.kick = makeBeanstalkCommand('kick', 'KICKED');

FiveBeansClient.prototype.peek = makeBeanstalkCommand('peek', 'FOUND');
FiveBeansClient.prototype.peek_ready = makeBeanstalkCommand('peek-ready', 'FOUND');
FiveBeansClient.prototype.peek_delayed = makeBeanstalkCommand('peek-delayed', 'FOUND');
FiveBeansClient.prototype.peek_buried = makeBeanstalkCommand('peek-buried', 'FOUND');
FiveBeansClient.prototype.peek_buried = makeBeanstalkCommand('peek-buried', 'FOUND');

FiveBeansClient.prototype.list_tube_used = makeBeanstalkCommand('list-tube-used', 'USING');
FiveBeansClient.prototype.pause_tube = makeBeanstalkCommand('pause-tube', 'PAUSED');
FiveBeansClient.prototype.pause_tube = makeBeanstalkCommand('pause-tube', 'PAUSED');

// the server returns yaml files in response to these commands
FiveBeansClient.prototype.list_tubes = makeBeanstalkCommand('list-tubes', 'OK');
FiveBeansClient.prototype.list_tubes = makeBeanstalkCommand('list-tubes', 'OK');
FiveBeansClient.prototype.list_tubes_watched = makeBeanstalkCommand('list-tubes-watched', 'OK');
FiveBeansClient.prototype.stats_job = makeBeanstalkCommand('stats-job', 'OK');
FiveBeansClient.prototype.stats_tube = makeBeanstalkCommand('stats-tube', 'OK');
FiveBeansClient.prototype.stats = makeBeanstalkCommand('stats', 'OK');
FiveBeansClient.prototype.stats_job = makeBeanstalkCommand('stats-job', 'OK');
FiveBeansClient.prototype.stats_tube = makeBeanstalkCommand('stats-tube', 'OK');
FiveBeansClient.prototype.stats = makeBeanstalkCommand('stats', 'OK');

// TODO quit -- closes connection, no response
// closes the connection, no response
FiveBeansClient.prototype.quit = makeBeanstalkCommand('quit', '');

// end beanstalkd commands

Expand Down
62 changes: 30 additions & 32 deletions lib/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ FiveBeansWorker.prototype.emitInfoLog = function(message, data)
message: message,
data: data
});
// console.log(message, data);
//console.log(message, data);
};

FiveBeansWorker.prototype.emitErrorLog = function(message, data)
Expand All @@ -41,28 +41,26 @@ FiveBeansWorker.prototype.emitErrorLog = function(message, data)
message: message,
data: data
});
// console.error(message, data);
//console.error(message, data);
};

FiveBeansWorker.prototype.start = function(tubes)
{
var self = this;

self.on('next', function()
{
return self.doNext();
});
this.on('next', this.doNext.bind(this));

function finishedStarting()
{
self.emit('started');
self.emit('next');
}

self.client = new beanstalk(this.host, this.port);
self.client.on('connect', function()
this.client = new beanstalk(this.host, this.port);

this.client.on('connect', function()
{
self.emitInfoLog('connected to beanstalkd at '+self.host+':'+self.port);
self.emitInfoLog('connected to beanstalkd at '+ self.host + ':'+self.port);
self.watch(tubes, function()
{
if (tubes && tubes.length && self.ignoreDefault)
Expand All @@ -79,19 +77,19 @@ FiveBeansWorker.prototype.start = function(tubes)
});
});

self.client.on('error', function(err)
this.client.on('error', function(err)
{
self.emitErrorLog('beanstalkd connection error', err);
self.emit('error', err);
});

self.client.on('close', function()
this.client.on('close', function()
{
self.emitInfoLog('beanstalkd connection closed');
self.emit('close');
});

self.client.connect();
this.client.connect();
};

FiveBeansWorker.prototype.watch = function(tubes, callback)
Expand Down Expand Up @@ -131,7 +129,6 @@ FiveBeansWorker.prototype.ignore = function(tubes, callback)
FiveBeansWorker.prototype.stop = function()
{
this.emitInfoLog('stopping...');
this.client.stop();
this.stopped = true;
};

Expand All @@ -141,7 +138,7 @@ FiveBeansWorker.prototype.doNext = function()
if (self.stopped)
{
self.client.end();
self.emitInfoLog('stopped.');
self.emitInfoLog('stopped');
self.emit('stopped');
return;
}
Expand Down Expand Up @@ -191,7 +188,6 @@ FiveBeansWorker.prototype.runJob = function(jobID, job)
}
};

// synchronous
FiveBeansWorker.prototype.lookupHandler = function(type)
{
return this.handlers[type];
Expand All @@ -201,33 +197,35 @@ FiveBeansWorker.prototype.callHandler = function(handler, jobID, jobdata)
{
var self = this;
var start = new Date().getTime();
this.currentJob = jobID;
this.currentHandler = handler;

try
{
var canceled = false;
handler.work(jobdata, function(action, delay)
{
var elapsed = new Date().getTime() - start;
switch (action)
{
case 'success':
self.emitInfoLog('ran job '+ jobID + ' in '+elapsed+' ms'+': '+JSON.stringify(jobdata));
self.deleteAndMoveOn(jobID);
break;
case 'success':
self.emitInfoLog('ran job '+ jobID + ' in '+elapsed+' ms'+': '+JSON.stringify(jobdata));
self.deleteAndMoveOn(jobID);
break;

case 'release':
self.emitInfoLog('released job '+ jobID + ' after '+elapsed+' ms');
self.releaseAndMoveOn(jobID, delay);
break;
case 'release':
self.emitInfoLog('released job '+ jobID + ' after '+elapsed+' ms');
self.releaseAndMoveOn(jobID, delay);
break;

case 'bury':
self.emitInfoLog('buried job ' + jobID);
self.buryAndMoveOn(jobID);
break;
case 'bury':
self.emitInfoLog('buried job ' + jobID);
self.buryAndMoveOn(jobID);
break;

default:
self.emitErrorLog('job ' + jobID + ' failed; reason: '+action+': '+JSON.stringify(jobdata));
self.buryAndMoveOn(jobID);
break;
default:
self.emitErrorLog('job ' + jobID + ' failed; reason: '+action+': '+JSON.stringify(jobdata));
self.buryAndMoveOn(jobID);
break;
}
});
}
Expand Down
72 changes: 56 additions & 16 deletions test/test-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@

var
should = require('chai').should(),
events = require('events'),
fivebeans = require('../index'),
fs = require('fs')
;

var host = '127.0.0.1';
var port = 11300;
var tube = 'testtube';

//-------------------------------------------------------------
// some job handlers for testing

function StringReverser()
{
Expand All @@ -18,6 +17,7 @@ function StringReverser()

StringReverser.prototype.work = function(payload, callback)
{
this.reverseString(payload);
callback('success');
};

Expand All @@ -28,14 +28,19 @@ StringReverser.prototype.reverseString = function(input)
return letters.join('');
};


var joblist =
[
{ type: 'reverse', payload: 'madam, I\'m Adam' },
{ type: 'reverse', payload: 'satan oscillate my metallic sonatas' },
{ type: 'reverse', payload: 'able was I ere I saw Elba' }
{ type: 'reverse', payload: 'able was I ere I saw Elba' },
];

//-------------------------------------------------------------

var host = '127.0.0.1';
var port = 11300;
var tube = 'testtube';

var testopts =
{
id: 'testworker',
Expand All @@ -44,17 +49,27 @@ var testopts =
ignoreDefault: true,
handlers:
{
reverse: StringReverser
reverse: new StringReverser(),
}
};

//-------------------------------------------------------------

describe('FiveBeansWorker', function()
{
var producer, worker, testjobid;

before(function()
before(function(done)
{
producer = new fivebeans.client(host, port);
producer.on('connect', function()
{
producer.use(tube, function(err, resp)
{
done();
});
});
producer.connect();
});

describe('constructor', function()
Expand Down Expand Up @@ -82,11 +97,25 @@ describe('FiveBeansWorker', function()
});
});

describe('start()', function()
describe('starting & stopping', function()
{
var w;

it('emits the error event on failure', function(done)
{
w = new fivebeans.worker({id: 'fail', port: 5000});
w.on('error', function(err)
{
err.should.be.ok;
err.errno.should.equal('ECONNREFUSED');
done();
});
w.start();
});

it('emits the started event on success', function(done)
{
var w = new fivebeans.worker(testopts);
w = new fivebeans.worker(testopts);
w.on('started', function()
{
done();
Expand All @@ -97,22 +126,33 @@ describe('FiveBeansWorker', function()
w.start();
});

it('emits the error event on failure', function(done)
it('stops and cleans up when stopped', function(done)
{
var w = new fivebeans.worker({id: 'fail', port: 5000});
w.on('error', function(err)
this.timeout(5000);
w.on('stopped', function()
{
err.should.be.ok;
err.errno.should.equal('ECONNREFUSED');
w.stopped.should.equal(true);
done();
});
w.start();

w.stop();
});

});

describe('watch', function()
{
it('watches tubes on start', function(done)
{
worker = new fivebeans.worker(testopts);
worker.on('started', function()
{
// check to see if it's watching testtube
done();
});

worker.start([tube]);
});
});

describe('log events', function()
Expand Down

0 comments on commit 54cfb50

Please sign in to comment.