Skip to content

Commit

Permalink
Stream fixes from production testing
Browse files Browse the repository at this point in the history
  • Loading branch information
smfreegard committed Nov 9, 2012
1 parent 6147ad7 commit cc17f78
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 7 deletions.
4 changes: 3 additions & 1 deletion connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,9 @@ Connection.prototype.tran_uuid = function () {
}

Connection.prototype.reset_transaction = function() {
this.transaction.messageStream.destroy();
if (this.transaction) {
this.transaction.messageStream.destroy();
}
delete this.transaction;
};

Expand Down
12 changes: 9 additions & 3 deletions messagestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ var STATE_BODY = 2;
function MessageStream (config, id, headers) {
if (!id) throw new Error('id required');
Stream.call(this);
this.uuid = id;
this.write_ce = null;
this.read_ce = null;
this.bytes_read = 0;
Expand Down Expand Up @@ -226,7 +227,7 @@ MessageStream.prototype._read = function () {
if (this._queue.length > 0) {
// TODO: implement start/end offsets
for (var i=0; i<this._queue.length; i++) {
this.process_buf(this._queue[i]);
this.process_buf(this._queue[i].slice(0));
}
this._read_finish();
}
Expand Down Expand Up @@ -265,7 +266,10 @@ MessageStream.prototype.process_buf = function (buf) {
buf = buf.slice(line.length);
// Don't output headers if they where sent already
if (this.headers_done && !this.headers_found_eoh) {
if (line.length === 2 && line[0] === 0x0d && line[1] === 0x0a) {
// Allow \r\n or \n here...
if ((line.length === 2 && line[0] === 0x0d && line[1] === 0x0a) ||
(line.length === 1 && line[0] === 0x0a))
{
this.headers_found_eoh = true;
}
continue;
Expand Down Expand Up @@ -320,7 +324,9 @@ MessageStream.prototype.pipe = function (destination, options) {
this.clamd_style = ((options && options.clamd_style) ? true : false);
this.buffer_size = ((options && options.buffer_size) ? options.buffer_size : 1024 * 64);
this.start = ((options && parseInt(options.start)) ? parseInt(options.start) : 0);
// Reset
// Reset
this.readable = true;
this.paused = false;
this.headers_done = false;
this.headers_found_eoh = false;
this.data_buf = new Buffer(this.buffer_size);
Expand Down
9 changes: 6 additions & 3 deletions plugins/spamassassin.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ exports.hook_data_post = function (next, connection) {
if (connection.relaying) {
headers.push('X-Haraka-Relay: true');
}
socket.write(headers.join("\r\n"), function () {
connection.transaction.messageStream.pipe(socket);
});

socket.write(headers.join("\r\n"));
connection.transaction.messageStream.pipe(socket);
});

var spamd_response = {};
Expand Down Expand Up @@ -107,6 +107,9 @@ exports.hook_data_post = function (next, connection) {
});

socket.on('end', function () {
// Abort if the connection or transaction are gone
if (!connection || (connection && !connection.transaction)) return next();

// Now we do stuff with the results...

plugin.fixup_old_headers(config.main.old_headers_action, connection.transaction);
Expand Down

0 comments on commit cc17f78

Please sign in to comment.