Skip to content

Commit

Permalink
Add probe for server readiness.
Browse files Browse the repository at this point in the history
When a Redis server starts up, it might take a while to load the dataset into memory.
During this time, the server will accept connections, but will return errors for all non-INFO
commands.  Now node_redis will send an INFO command whenever it connects to a server.
If the info command indicates that the server is not ready, the client will keep trying until
the server is ready.  Once it is ready, the client will emit a "ready" event as well as the
"connect" event.  The client will queue up all commands sent before the server is ready, just
like it did before.  When the server is ready, all offline/non-ready commands will be replayed.
This should be backward compatible with previous versions.

To disable this ready check behavior, set `options.no_ready_check` when creating the client.

As a side effect of this change, the key/val params from the info command are available as
`client.server_options`.  Further, the version string is decomposed into individual elements
in `client.server_options.versions`.
  • Loading branch information
mranney committed Feb 17, 2011
1 parent 8cf8c99 commit ee93d1b
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 51 deletions.
19 changes: 19 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,25 @@
Changelog
=========

## v0.5.5 - February 16, 2011

Add probe for server readiness.

When a Redis server starts up, it might take a while to load the dataset into memory.
During this time, the server will accept connections, but will return errors for all non-INFO
commands. Now node_redis will send an INFO command whenever it connects to a server.
If the info command indicates that the server is not ready, the client will keep trying until
the server is ready. Once it is ready, the client will emit a "ready" event as well as the
"connect" event. The client will queue up all commands sent before the server is ready, just
like it did before. When the server is ready, all offline/non-ready commands will be replayed.
This should be backward compatible with previous versions.

To disable this ready check behavior, set `options.no_ready_check` when creating the client.

As a side effect of this change, the key/val params from the info command are available as
`client.server_options`. Further, the version string is decomposed into individual elements
in `client.server_options.versions`.

## v0.5.4 - February 11, 2011

Fix excess memory consumption from Queue backing store.
Expand Down
111 changes: 91 additions & 20 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ function RedisClient(stream, options) {
this.options = options || {};

this.connected = false;
this.ready = false;
this.connections = 0;
this.attempts = 1;
this.command_queue = new Queue(); // holds sent commands to de-pipeline them
Expand All @@ -48,6 +49,7 @@ function RedisClient(stream, options) {
this.retry_backoff = 1.7;
this.subscriptions = false;
this.closing = false;
this.server_info = {};

var parser_module, self = this;

Expand Down Expand Up @@ -76,22 +78,23 @@ function RedisClient(stream, options) {
});

// "reply error" is an error sent back by redis
self.reply_parser.on("reply error", function (reply) {
this.reply_parser.on("reply error", function (reply) {
self.return_error(new Error(reply));
});
self.reply_parser.on("reply", function (reply) {
this.reply_parser.on("reply", function (reply) {
self.return_reply(reply);
});
// "error" is bad. Somehow the parser got confused. It'll try to reset and continue.
self.reply_parser.on("error", function (err) {
this.reply_parser.on("error", function (err) {
self.emit("error", new Error("Redis reply parser error: " + err.stack));
});

this.stream.on("connect", function () {
if (exports.debug_mode) {
console.log("Stream connected");
console.log("Stream connected fd " + self.stream.fd);
}
self.connected = true;
self.ready = false;
self.connections += 1;
self.command_queue = new Queue();
self.emitted_end = false;
Expand All @@ -101,19 +104,14 @@ function RedisClient(stream, options) {
self.stream.setNoDelay();
self.stream.setTimeout(0);

// give connect listeners a chance to run first in case they need to auth
self.emit("connect");

var command_obj;
while (self.offline_queue.length > 0) {
command_obj = self.offline_queue.shift();
if (exports.debug_mode) {
console.log("Sending offline command: " + command_obj.command);
}
self.send_command(command_obj.command, command_obj.args, command_obj.callback);
if (self.options.no_ready_check) {
self.ready = true;
self.send_offline_queue();
} else {
self.ready_check();
}
self.offline_queue = new Queue();
// Even though items were shifted off, Queue backing store still uses memory until next add
});

this.stream.on("data", function (buffer_from_socket) {
Expand Down Expand Up @@ -145,6 +143,7 @@ function RedisClient(stream, options) {
self.command_queue = new Queue();

self.connected = false;
self.ready = false;
self.emit("error", new Error(message));
});

Expand All @@ -165,6 +164,76 @@ function RedisClient(stream, options) {
util.inherits(RedisClient, events.EventEmitter);
exports.RedisClient = RedisClient;

RedisClient.prototype.ready_check = function () {
var self = this;

function send_info_cmd() {
if (exports.debug_mode) {
console.log("checking server ready state...");
}

self.send_anyway = true; // secret flag to send_command to send something even if not "ready"
self.info(function (err, res) {
if (err) {
self.emit("error", "Ready check failed: " + err);
return;
}

var lines = res.split("\r\n"), obj = {}, retry_time;

lines.forEach(function (line) {
var parts = line.split(':');
if (parts[1]) {
obj[parts[0]] = parts[1];
}
});

obj.versions = [];
obj.redis_version.split('.').forEach(function (num) {
obj.versions.push(+num);
});

// expose info key/vals to users
self.server_info = obj;

if (!obj["loading"] || (obj["loading"] && obj["loading"] == 0)) {
if (exports.debug_mode) {
console.log("Redis server ready.");
}
self.ready = true;

self.send_offline_queue();
self.emit("ready");
} else {
retry_time = obj.loading_eta_seconds * 1000;
if (retry_time > 1000) {
retry_time = 1000;
}
if (exports.debug_mode) {
console.log("Redis server still loading, trying again in " + retry_time);
}
setTimeout(send_info_cmd, retry_time);
}
});
self.send_anyway = false;
}

send_info_cmd();
};

RedisClient.prototype.send_offline_queue = function () {
var command_obj;
while (this.offline_queue.length > 0) {
command_obj = this.offline_queue.shift();
if (exports.debug_mode) {
console.log("Sending offline command: " + command_obj.command);
}
this.send_command(command_obj.command, command_obj.args, command_obj.callback);
}
this.offline_queue = new Queue();
// Even though items were shifted off, Queue backing store still uses memory until next add
};

RedisClient.prototype.connection_gone = function (why) {
var self = this;

Expand All @@ -180,6 +249,7 @@ RedisClient.prototype.connection_gone = function (why) {
console.warn("Redis connection is gone from " + why + " event.");
}
self.connected = false;
self.ready = false;
self.subscriptions = false;

// since we are collapsing end and close, users don't expect to be called twice
Expand Down Expand Up @@ -357,7 +427,7 @@ RedisClient.prototype.send_command = function () {
sub_command: false
};

if (! this.connected) {
if (!this.ready && !this.send_anyway) {
if (exports.debug_mode) {
console.log("Queueing " + command + " for next server connection.");
}
Expand Down Expand Up @@ -447,6 +517,7 @@ RedisClient.prototype.send_command = function () {
RedisClient.prototype.end = function () {
this.stream._events = {};
this.connected = false;
this.ready = false;
return this.stream.end();
};

Expand Down Expand Up @@ -616,16 +687,16 @@ RedisClient.prototype.MULTI = function (args) {
exports.createClient = function (port_arg, host_arg, options) {
var port = port_arg || default_port,
host = host_arg || default_host,
red_client, net_client;
redis_client, net_client;

net_client = net.createConnection(port, host);

red_client = new RedisClient(net_client, options);
redis_client = new RedisClient(net_client, options);

red_client.port = port;
red_client.host = host;
redis_client.port = port;
redis_client.host = host;

return red_client;
return redis_client;
};

exports.print = function (err, reply) {
Expand Down
1 change: 0 additions & 1 deletion multi_bench.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ var redis = require("./index"),
tests = [],
test_start,
client_options = {
parser: "javascript",
return_buffers: false
};

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{ "name" : "redis",
"version" : "0.5.4",
"version" : "0.5.5",
"description" : "Redis client library",
"author": "Matt Ranney <[email protected]>",
"contributors": [
Expand Down
37 changes: 8 additions & 29 deletions test.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
/*global require console setTimeout process Buffer */
var redis = require("./index"),
client = redis.createClient(6379, "127.0.0.1", {
parser: "javascript"
}),
client = redis.createClient(),
client2 = redis.createClient(),
client3 = redis.createClient(),
assert = require("assert"),
util = require("./lib/util").util,
test_db_num = 15, // this DB will be flushed and used for testing
tests = {},
connected = false,
ended = false,
server_info;
ended = false;

// Uncomment this to see the wire protocol and other debugging info
redis.debug_mode = false;
// Set this to truthy to see the wire protocol and other debugging info
redis.debug_mode = process.argv[2];

function buffers_to_strings(arr) {
return arr.map(function (val) {
Expand Down Expand Up @@ -231,7 +228,7 @@ tests.MULTI_6 = function () {
tests.WATCH_MULTI = function () {
var name = 'WATCH_MULTI';

if (server_info.versions[0] >= 2 && server_info.versions[1] >= 1) {
if (client.server_info.versions[0] >= 2 && client.server_info.versions[1] >= 1) {
client.watch(name);
client.incr(name);
var multi = client.multi();
Expand Down Expand Up @@ -1057,28 +1054,10 @@ function run_next_test() {

console.log("Using reply parser " + client.reply_parser.name);

client.on("connect", function start_tests() {
// remove listener so we don't restart all tests on reconnect
client.removeListener("connect", start_tests);

// Fetch and stash info results in case anybody needs info on the server we are using.
client.info(function (err, reply) {
var obj = {};
reply.toString().split('\n').forEach(function (line) {
var parts = line.split(':');
if (parts[1]) {
obj[parts[0]] = parts[1];
}
});
obj.versions = [];
obj.redis_version.split('.').forEach(function (num) {
obj.versions.push(+num);
});
server_info = obj;
console.log("Connected to " + client.host + ":" + client.port + ", Redis server version " + obj.redis_version + "\n");
client.once("ready", function start_tests() {
console.log("Connected to " + client.host + ":" + client.port + ", Redis server version " + client.server_info.redis_version + "\n");

run_next_test();
});
run_next_test();

connected = true;
});
Expand Down

0 comments on commit ee93d1b

Please sign in to comment.