Skip to content

Commit

Permalink
Rewrite sending & receiving job bodies to use message buffers.
Browse files Browse the repository at this point in the history
Preparation for fixing issue #2.
  • Loading branch information
ceejbot committed Jul 4, 2012
1 parent fd89f4c commit b7794a8
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 27 deletions.
69 changes: 51 additions & 18 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,29 @@ var FiveBeansClient = function(host, port)
{
this.stream = null;
this.handlers = [];
this.buffer = '';
this.buffer = undefined;

this.host = (host === undefined) ? DEFAULT_HOST : host;
this.port = (port === undefined) ? DEFAULT_PORT : port;
};

FiveBeansClient.prototype.connect = function(callback)
{
var self = this;
var self = this, tmp;

self.stream = net.createConnection(self.port, self.host);

self.stream.on('data', function(data)
{
self.buffer += data;
if (!self.buffer)
self.buffer = data;
else
{
tmp = new Buffer(self.buffer.length + data.length);
self.buffer.copy(tmp, 0);
data.copy(tmp, self.buffer.length);
self.buffer = tmp;
}
return self.tryHandlingResponse();
});
self.stream.on('connect', function()
Expand Down Expand Up @@ -75,15 +83,15 @@ FiveBeansClient.prototype.tryHandlingResponse = function()
var latest = this.handlers[0];
var handler = latest[0];
var callback = latest[1];

if ((handler !== undefined) && (handler !== null))
{
handler.process(this.buffer);
if (handler.complete)
{
// shift it off & reset
this.handlers.shift();
this.buffer = '';
this.buffer = undefined;
if (handler.success)
callback.call.apply(callback, [null, null].concat(handler.args));
else
Expand Down Expand Up @@ -124,17 +132,37 @@ ResponseHandler.prototype.RESPONSES_REQUIRING_BODY =
'OK': 'yaml'
};

function findInBuffer(buffer, bytes)
{
var ptr = 0, idx = 0;
while (ptr < buffer.length)
{
if (buffer[ptr] === bytes[idx])
{
idx++;
if (idx === bytes.length)
return (ptr - bytes.length + 1);
}
else
idx = 0;
ptr++;
}
return -1;
}

var CRLF = new Buffer([0x0d, 0x0a]);

ResponseHandler.prototype.process = function(data)
{
var eol = data.indexOf('\r\n');
if (eol >= 0)
var eol = findInBuffer(data, CRLF);
if (eol > -1)
{
// Header is everything up to the windows line break;
// body is everything after.
this.header = data.substr(0, eol);
this.body = data.substr(eol + 2);
this.header = data.toString('utf8', 0, eol);
this.body = data.slice(eol + 2, data.length);
this.args = this.header.split(' ');

var response = this.args[0];
if (response === this.expectedResponse)
{
Expand Down Expand Up @@ -164,12 +192,12 @@ ResponseHandler.prototype.parseBody = function(how)
return;

var expectedLength = parseInt(this.args[this.args.length - 1], 10);
if (Buffer.byteLength(this.body) === (expectedLength + 2))
if (this.body.length === (expectedLength + 2))
{
this.args.pop();
var body = this.body.substr(0, expectedLength);
var body = this.body.slice(0, expectedLength);
this.complete = true;

switch(how)
{
case 'yaml':
Expand Down Expand Up @@ -198,7 +226,7 @@ function makeBeanstalkCommand(command, expectedResponse, sendsData)
// That's the case handled when args < 2.
return function()
{
var data;
var data, encoding;
var args = argHashToArray(arguments);
var callback = args.pop();
args.unshift(command);
Expand All @@ -207,10 +235,15 @@ function makeBeanstalkCommand(command, expectedResponse, sendsData)
data = args.pop();
args.push(data.length);
}

this.send.apply(this, args);
if (data) this.send(data);

if (data)
{
encoding = (typeof data === 'string') ? 'utf8' : 'binary';
this.stream.write(data, encoding);
this.stream.write('\r\n');
}

this.handlers.push([new ResponseHandler(expectedResponse), callback]);
};
}
Expand Down
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
"keywords" : ["beanstalkd", "worker", "jobs" ],
"dependencies" : {
"js-yaml": "0.3.x",
"optimist": "0.3.x",
"optimist": "0.3.x",
"winston": "0.5.x"
},
},
"devDependencies" : { "mocha": "0.9.x", "chai": "0.1.x" },
"repository" : { "type": "git",
"url" : "git://github.com/ceejbot/fivebeans.git" },
"bugs" : "http://github.com/ceejbot/fivebeans/issues",
"main" : "fivebeans",
"bin" : { "beanworker": "./bin/beanworker" },
"scripts" : { "test": "mocha -R spec test/test.js" }
}
}
42 changes: 36 additions & 6 deletions test/test.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
var should = require('chai').should();

var fivebeans = require('../fivebeans');
var fivebeans = require('../fivebeans'),
fs = require('fs')
;

var host = '127.0.0.1';
var host = '10.0.0.14';
var port = 11300;
var tube = 'testtube';

function readTestImage()
{
return fs.readFileSync('./test/test.png');
}

describe('FiveBeansClient', function()
{
var producer = null;
Expand Down Expand Up @@ -38,7 +45,7 @@ describe('FiveBeansClient', function()
});
});
});
describe('job producer', function()
describe('job producer:', function()
{
it('#use connects to a specific tube', function(done)
{
Expand Down Expand Up @@ -69,7 +76,7 @@ describe('FiveBeansClient', function()
});
});
});
describe('job consumer', function()
describe('job consumer:', function()
{
it('#watch watches a tube', function(done)
{
Expand Down Expand Up @@ -104,6 +111,7 @@ describe('FiveBeansClient', function()
});
it('#peek_ready peeks ahead at jobs', function(done)
{
this.timeout(4000);
producer.peek_ready(function(err, jobid, payload)
{
should.not.exist(err);
Expand Down Expand Up @@ -154,6 +162,28 @@ describe('FiveBeansClient', function()
done();
});
});

/*
it('jobs can contain binary data', function(done)
{
var payload = readTestImage();
producer.put(0, 0, 60, payload, function(err, jobid)
{
should.not.exist(err);
jobid.should.exist;
console.log(jobid);
consumer.reserve(function(err, returnID, returnPayload)
{
should.not.exist(err);
returnID.should.equal(jobid);
returnPayload.length.should.equal(payload.length);
done();
});
});
});
*/

it('#peek_delayed returns data for a delayed job', function(done)
{
producer.peek_delayed(function(err, jobid, payload)
Expand Down Expand Up @@ -229,7 +259,7 @@ describe('FiveBeansClient', function()
});
});
});

describe('server statistics', function()
{
it('#stats returns a hash of server stats', function(done)
Expand Down Expand Up @@ -270,6 +300,6 @@ describe('FiveBeansClient', function()
});
});
});

// untested: consumer.touch(), consumer.pause_tube()
});
Binary file added test/test.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit b7794a8

Please sign in to comment.