Skip to content

Commit

Permalink
Fixed #91: failing to parse jobs with non-ascii characters.
Browse files Browse the repository at this point in the history
Because, well, it was treating all jobs as ascii. Fixed this
as suggested, and added a test that fails without the change.

Cleaned up the worker tests a bunch.
  • Loading branch information
ceejbot committed Aug 28, 2016
1 parent 1e3b444 commit c95e230
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 45 deletions.
37 changes: 37 additions & 0 deletions bin/clear-testtube.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#!/usr/bin/env node

var fivebeans = require('../index');
var client = new fivebeans.client('127.0.0.1', 11300);

function clearAJob()
{
client.peek_buried(function(err, jobid, payload)
{
if (err && err === 'NOT_FOUND')
{
console.log('done');
process.exit(0);
}
else if (jobid)
{
console.log('nuking ' + jobid, payload.toString());
client.destroy(jobid, clearAJob);
}
else
console.log(err);
});
}

client.on('connect', function handleConnect()
{
client.use('testtube', function handleWatch(err, response)
{
if (err) throw(err);
clearAJob();
});
}).on('error', function(err)
{
throw(err);
});

client.connect();
2 changes: 1 addition & 1 deletion lib/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ FiveBeansWorker.prototype.doNext = function()
self.emit('job.reserved', jobID);

var job = null;
try { job = JSON.parse(payload.toString('ascii')); }
try { job = JSON.parse(payload.toString()); }
catch (e) { self.emitWarning({ message: 'parsing job JSON', id: jobID, error: e }); }
if (!job || !_.isObject(job))
self.buryAndMoveOn(jobID);
Expand Down
78 changes: 34 additions & 44 deletions test/test-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ util.inherits(TestHandler, events.EventEmitter);

TestHandler.prototype.work = function(payload, callback)
{
this.emit('result', this.reverseWords(payload));
callback(payload, 0);
this.emit('result', this.reverseWords(payload.words));
callback(payload.trigger || 'success', 0);
};

TestHandler.prototype.reverseWords = function(input)
{
var words = input.split(' ');
words.reverse();
return words.join('');
return words.join(' ');
};

//-------------------------------------------------------------
Expand Down Expand Up @@ -61,7 +61,7 @@ describe('FiveBeansWorker', function()
before(function(done)
{
producer = new fivebeans.client(host, port);
producer.on('connect', function()
producer.once('connect', function()
{
producer.use(tube, function(err, resp)
{
Expand Down Expand Up @@ -128,7 +128,7 @@ describe('FiveBeansWorker', function()
it('emits the started event on success', function(done)
{
w = new fivebeans.worker(testopts);
w.on('started', function()
w.once('started', function()
{
done();
}).on('error', function(err)
Expand Down Expand Up @@ -214,8 +214,6 @@ describe('FiveBeansWorker', function()
{
producer.peek_buried(function(err, buriedID, payload)
{
worker.removeListener('job.buried', handleBuried);

demand(err).not.exist();
buriedID.must.equal(jobid);
producer.destroy(buriedID, function(err)
Expand All @@ -226,7 +224,7 @@ describe('FiveBeansWorker', function()
});
}

worker.on('job.buried', handleBuried);
worker.once('job.buried', handleBuried);

producer.put(0, 0, 60, '{ I am invalid JSON', function(err, jobid)
{
Expand All @@ -241,8 +239,6 @@ describe('FiveBeansWorker', function()
{
producer.peek_buried(function(err, buriedID, payload)
{
worker.removeListener('job.buried', handleBuried);

demand(err).not.exist();
buriedID.must.equal(jobid);
producer.destroy(buriedID, function(err)
Expand All @@ -253,7 +249,7 @@ describe('FiveBeansWorker', function()
});
}

worker.on('job.buried', handleBuried);
worker.once('job.buried', handleBuried);
var job = { type: 'unknown', payload: 'extremely important!'};
producer.put(0, 0, 60, JSON.stringify(job), function(err, jobid)
{
Expand All @@ -264,19 +260,16 @@ describe('FiveBeansWorker', function()

it('passes good jobs to handlers', function(done)
{
var handler = testopts.handlers.reverse;

function verifyResult(item)
{
item.must.exist();
item.must.be.a.string();
item.must.equal('success');
handler.removeListener('result', verifyResult);
item.must.equal('yo success');
done();
}

handler.on('result', verifyResult);
var job = { type: 'reverse', payload: 'success'};
testopts.handlers.reverse.once('result', verifyResult);
var job = { type: 'reverse', payload: {words: 'success yo', trigger: 'success' }};
producer.put(0, 0, 60, JSON.stringify(job), function(err, jobid)
{
demand(err).not.exist();
Expand All @@ -286,15 +279,8 @@ describe('FiveBeansWorker', function()

it('handles jobs that contain arrays (for ruby compatibility)', function(done)
{
function detectDeleted(jobid)
{
worker.removeListener('job.deleted', detectDeleted);
done();
}

worker.on('job.deleted', detectDeleted);

var job = ['stalker', { type: 'reverse', payload: 'success'}];
worker.once('job.deleted', function(result) { done(); });
var job = ['stalker', { type: 'reverse', payload: {words: 'not important', trigger: 'success'}}];
producer.put(0, 0, 60, JSON.stringify(job), function(err, jobid)
{
demand(err).not.exist();
Expand All @@ -308,8 +294,6 @@ describe('FiveBeansWorker', function()
{
producer.peek_buried(function(err, buriedID, payload)
{
worker.removeListener('job.buried', detectBuried);

demand(err).not.exist();
buriedID.must.equal(jobid);
producer.destroy(buriedID, function(err)
Expand All @@ -320,9 +304,24 @@ describe('FiveBeansWorker', function()
});
}

worker.on('job.buried', detectBuried);
worker.once('job.buried', detectBuried);

var job = { type: 'reverse', payload: 'bury'};
var job = { type: 'reverse', payload: { words: 'bury', trigger: 'bury' }};
producer.put(0, 0, 60, JSON.stringify(job), function(err, jobid)
{
demand(err).not.exist();
jobid.must.exist();
});
});

it('successfully handles jobs with non-ascii characters', function(done)
{
testopts.handlers.reverse.once('result', function(result)
{
result.must.equal('brûlée crèmes');
done();
});
var job = { type: 'reverse', payload: { words: 'crèmes brûlée', trigger: 'success' }};
producer.put(0, 0, 60, JSON.stringify(job), function(err, jobid)
{
demand(err).not.exist();
Expand Down Expand Up @@ -358,25 +357,18 @@ describe('FiveBeansWorker', function()
});
}

function handled()
{
worker.removeListener('job.handled', handled);
done();
}

function handleReserved(id)
{
jobid = id;
worker.removeListener('job.reserved', handleReserved);
worker.on('job.handled', handled);
worker.once('job.handled', function() { done(); });

setTimeout(getInfo, 3000);
}

worker.on('job.reserved', handleReserved);
worker.once('job.reserved', handleReserved);
worker.on('warning', console.log);

var job = { type: 'longasync', payload: 'ignored' };
var job = { type: 'longasync', payload: { words: 'ignored', trigger: 'ignored' }};
producer.put(0, 0, 30, JSON.stringify(job), function(err, jobid)
{
demand(err).not.exist();
Expand All @@ -389,7 +381,6 @@ describe('FiveBeansWorker', function()
function detectReleased(jobid)
{
worker.stop();
worker.removeListener('job.released', detectReleased);

producer.peek_ready(function(err, releasedID, payload)
{
Expand All @@ -403,16 +394,15 @@ describe('FiveBeansWorker', function()
});
}

worker.on('job.released', detectReleased);
worker.once('job.released', detectReleased);

var job = { type: 'reverse', payload: 'release' };
var job = { type: 'reverse', payload: { words: 'release', trigger: 'release' }};
producer.put(0, 0, 60, JSON.stringify(job), function(err, jobid)
{
demand(err).not.exist();
jobid.must.exist();
});
});

});

describe('log events', function()
Expand Down

0 comments on commit c95e230

Please sign in to comment.