Skip to content

Commit

Permalink
Fix monitoring mode
Browse files Browse the repository at this point in the history
  • Loading branch information
Ruben Bridgewater committed Mar 25, 2016
1 parent bf568b6 commit 344291a
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 81 deletions.
14 changes: 6 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -545,22 +545,20 @@ If you fire many commands at once this is going to **boost the execution speed b
Redis supports the `MONITOR` command, which lets you see all commands received by the Redis server
across all client connections, including from other client libraries and other computers.

After you send the `MONITOR` command, no other commands are valid on that connection. `node_redis`
will emit a `monitor` event for every new monitor message that comes across. The callback for the
`monitor` event takes a timestamp from the Redis server and an array of command arguments.
A `monitor` event is going to be emitted for every command fired from any client connected to the server including the monitoring client itself.
The callback for the `monitor` event takes a timestamp from the Redis server, an array of command arguments and the raw monitoring string.

Here is a simple example:

```js
var client = require("redis").createClient(),
util = require("util");

var client = require("redis").createClient();
client.monitor(function (err, res) {
console.log("Entering monitoring mode.");
});
client.set('foo', 'bar');

client.on("monitor", function (time, args) {
console.log(time + ": " + util.inspect(args));
client.on("monitor", function (time, args, raw_reply) {
console.log(time + ": " + args); // 1458910076.446514:['set', 'foo', 'bar']
});
```

Expand Down
14 changes: 14 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
Changelog
=========

## v.2.6.0 - XX Mar, 2016

Features

- Monitor now works together with the offline queue
- All commands that were send after a connection loss are now going to be send after reconnecting
- Activating monitor mode does now work together with arbitrary commands including pub sub mode

Bugfixes

- Fixed calling monitor command while other commands are still running
- Fixed monitor and pub sub mode not working together
- Fixed monitor mode not working in combination with the offline queue

## v.2.5.3 - 21 Mar, 2016

Bugfixes
Expand Down
77 changes: 22 additions & 55 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ function RedisClient (options, stream) {
this.pipeline = 0;
this.times_connected = 0;
this.options = options;
this.buffers = options.return_buffers || options.detect_buffers;
// Init parser
this.reply_parser = Parser({
returnReply: function (data) {
Expand All @@ -154,7 +155,7 @@ function RedisClient (options, stream) {
self.stream.destroy();
self.return_error(err);
},
returnBuffers: options.return_buffers || options.detect_buffers,
returnBuffers: this.buffers,
name: options.parser
});
this.create_stream();
Expand Down Expand Up @@ -329,9 +330,7 @@ RedisClient.prototype.on_error = function (err) {
}

err.message = 'Redis connection to ' + this.address + ' failed - ' + err.message;

debug(err.message);

this.connected = false;
this.ready = false;

Expand Down Expand Up @@ -369,12 +368,6 @@ RedisClient.prototype.on_ready = function () {
debug('on_ready called ' + this.address + ' id ' + this.connection_id);
this.ready = true;

if (this.old_state !== null) {
this.monitoring = this.old_state.monitoring;
this.pub_sub_mode = this.old_state.pub_sub_mode;
this.old_state = null;
}

var cork;
if (!this.stream.cork) {
cork = function (len) {
Expand All @@ -393,16 +386,15 @@ RedisClient.prototype.on_ready = function () {
}
this.cork = cork;

// restore modal commands from previous connection
// restore modal commands from previous connection. The order of the commands is important
if (this.selected_db !== undefined) {
// this trick works if and only if the following send_command
// never goes into the offline queue
var pub_sub_mode = this.pub_sub_mode;
this.pub_sub_mode = false;
this.send_command('select', [this.selected_db]);
this.pub_sub_mode = pub_sub_mode;
}
if (this.pub_sub_mode === true) {
if (this.old_state !== null) {
this.monitoring = this.old_state.monitoring;
this.pub_sub_mode = this.old_state.pub_sub_mode;
}
if (this.pub_sub_mode) {
// only emit 'ready' when all subscriptions were made again
var callback_count = 0;
var callback = function () {
Expand All @@ -424,12 +416,10 @@ RedisClient.prototype.on_ready = function () {
});
return;
}

if (this.monitoring) {
this.send_command('monitor', []);
} else {
this.send_offline_queue();
}
this.send_offline_queue();
this.emit('ready');
};

Expand Down Expand Up @@ -525,15 +515,13 @@ RedisClient.prototype.connection_gone = function (why, error) {
this.cork = noop;
this.pipeline = 0;

if (this.old_state === null) {
var state = {
monitoring: this.monitoring,
pub_sub_mode: this.pub_sub_mode
};
this.old_state = state;
this.monitoring = false;
this.pub_sub_mode = false;
}
var state = {
monitoring: this.monitoring,
pub_sub_mode: this.pub_sub_mode
};
this.old_state = state;
this.monitoring = false;
this.pub_sub_mode = false;

// since we are collapsing end and close, users don't expect to be called twice
if (!this.emitted_end) {
Expand Down Expand Up @@ -604,9 +592,7 @@ RedisClient.prototype.connection_gone = function (why, error) {
};

RedisClient.prototype.return_error = function (err) {
var command_obj = this.command_queue.shift(),
queue_len = this.command_queue.length;

var command_obj = this.command_queue.shift();
if (command_obj && command_obj.command && command_obj.command.toUpperCase) {
err.command = command_obj.command.toUpperCase();
}
Expand All @@ -617,8 +603,7 @@ RedisClient.prototype.return_error = function (err) {
err.code = match[1];
}

this.emit_idle(queue_len);

this.emit_idle();
utils.callback_or_emit(this, command_obj && command_obj.callback, err);
};

Expand All @@ -627,8 +612,8 @@ RedisClient.prototype.drain = function () {
this.should_buffer = false;
};

RedisClient.prototype.emit_idle = function (queue_len) {
if (queue_len === 0 && this.pub_sub_mode === false) {
RedisClient.prototype.emit_idle = function () {
if (this.command_queue.length === 0 && this.pub_sub_mode === false) {
this.emit('idle');
}
};
Expand All @@ -640,20 +625,6 @@ function queue_state_error (self, command_obj) {
self.emit('error', err);
}

function monitor (self, reply) {
if (typeof reply !== 'string') {
reply = reply.toString();
}
// If in monitoring mode only two commands are valid ones: AUTH and MONITOR wich reply with OK
var len = reply.indexOf(' ');
var timestamp = reply.slice(0, len);
var argindex = reply.indexOf('"');
var args = reply.slice(argindex + 1, -1).split('" "').map(function (elem) {
return elem.replace(/\\"/g, '"');
});
self.emit('monitor', timestamp, args);
}

function normal_reply (self, reply, command_obj) {
if (typeof command_obj.callback === 'function') {
if ('exec' !== command_obj.command) {
Expand Down Expand Up @@ -716,17 +687,15 @@ RedisClient.prototype.return_reply = function (reply) {

queue_len = this.command_queue.length;

this.emit_idle(queue_len);
this.emit_idle();

if (command_obj && !command_obj.sub_command) {
normal_reply(this, reply, command_obj);
} else if (this.pub_sub_mode || command_obj && command_obj.sub_command) {
return_pub_sub(this, reply, command_obj);
}
/* istanbul ignore else: this is a safety check that we should not be able to trigger */
else if (this.monitoring) {
monitor(this, reply);
} else {
else if (!this.monitoring) {
queue_state_error(this, command_obj);
}
};
Expand Down Expand Up @@ -837,8 +806,6 @@ RedisClient.prototype.send_command = function (command, args, callback) {

if (command === 'subscribe' || command === 'psubscribe' || command === 'unsubscribe' || command === 'punsubscribe') {
this.pub_sub_command(command_obj); // TODO: This has to be moved to the result handler
} else if (command === 'monitor') {
this.monitoring = true;
} else if (command === 'quit') {
this.closing = true;
}
Expand Down
35 changes: 35 additions & 0 deletions lib/individualCommands.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,41 @@ RedisClient.prototype.select = RedisClient.prototype.SELECT = function select (d
});
};

RedisClient.prototype.monitor = RedisClient.prototype.MONITOR = function (callback) {
// Use a individual command, as this is a special case that does not has to be checked for any other command
var self = this;
return this.send_command('monitor', [], function (err, res) {
if (err === null) {
self.reply_parser.returnReply = function (reply) {
// If in monitor mode, all normal commands are still working and we only want to emit the streamlined commands
// As this is not the average use case and monitor is expensive anyway, let's change the code here, to improve
// the average performance of all other commands in case of no monitor mode
if (self.monitoring) {
var replyStr;
if (self.buffers && Buffer.isBuffer(reply)) {
replyStr = reply.toString();
} else {
replyStr = reply;
}
// While reconnecting the redis server does not recognize the client as in monitor mode anymore
// Therefor the monitor command has to finish before it catches further commands
if (typeof replyStr === 'string' && utils.monitor_regex.test(replyStr)) {
var timestamp = replyStr.slice(0, replyStr.indexOf(' '));
var args = replyStr.slice(replyStr.indexOf('"') + 1, -1).split('" "').map(function (elem) {
return elem.replace(/\\"/g, '"');
});
self.emit('monitor', timestamp, args, replyStr);
return;
}
}
self.return_reply(reply);
};
self.monitoring = true;
}
utils.callback_or_emit(self, callback, err, res);
});
};

// Store info in this.server_info after each call
RedisClient.prototype.info = RedisClient.prototype.INFO = function info (section, callback) {
var self = this;
Expand Down
5 changes: 2 additions & 3 deletions lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ function print (err, reply) {
}
}

var redisErrCode = /^([A-Z]+)\s+(.+)$/;

// Deep clone arbitrary objects with arrays. Can't handle cyclic structures (results in a range error)
// Any attribute with a non primitive value besides object and array will be passed by reference (e.g. Buffers, Maps, Functions)
function clone (obj) {
Expand Down Expand Up @@ -102,7 +100,8 @@ module.exports = {
reply_to_strings: replyToStrings,
reply_to_object: replyToObject,
print: print,
err_code: redisErrCode,
err_code: /^([A-Z]+)\s+(.+)$/,
monitor_regex: /^[0-9]{10,11}\.[0-9]+ \[[0-9]{1,3} [0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}:[0-9]{1,5}\].*/,
clone: convenienceClone,
callback_or_emit: callbackOrEmit,
reply_in_order: replyInOrder
Expand Down
Loading

0 comments on commit 344291a

Please sign in to comment.