forked from redis/node-redis
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmulti.js
187 lines (174 loc) · 6.36 KB
/
multi.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
'use strict';
var Queue = require('denque');
var utils = require('./utils');
var Command = require('./command');
function Multi (client, args) {
this._client = client;
this.queue = new Queue();
var command, tmp_args;
if (args) { // Either undefined or an array. Fail hard if it's not an array
for (var i = 0; i < args.length; i++) {
command = args[i][0];
tmp_args = args[i].slice(1);
if (Array.isArray(command)) {
this[command[0]].apply(this, command.slice(1).concat(tmp_args));
} else {
this[command].apply(this, tmp_args);
}
}
}
}
function pipeline_transaction_command (self, command_obj, index) {
// Queueing is done first, then the commands are executed
var tmp = command_obj.callback;
command_obj.callback = function (err, reply) {
// Ignore the multi command. This is applied by node_redis and the user does not benefit by it
if (err && index !== -1) {
if (tmp) {
tmp(err);
}
err.position = index;
self.errors.push(err);
}
// Keep track of who wants buffer responses:
// By the time the callback is called the command_obj got the buffer_args attribute attached
self.wants_buffers[index] = command_obj.buffer_args;
command_obj.callback = tmp;
};
self._client.internal_send_command(command_obj);
}
Multi.prototype.exec_atomic = Multi.prototype.EXEC_ATOMIC = Multi.prototype.execAtomic = function exec_atomic (callback) {
if (this.queue.length < 2) {
return this.exec_batch(callback);
}
return this.exec(callback);
};
function multi_callback (self, err, replies) {
var i = 0, command_obj;
if (err) {
err.errors = self.errors;
if (self.callback) {
self.callback(err);
// Exclude connection errors so that those errors won't be emitted twice
} else if (err.code !== 'CONNECTION_BROKEN') {
self._client.emit('error', err);
}
return;
}
if (replies) {
while (command_obj = self.queue.shift()) {
if (replies[i] instanceof Error) {
var match = replies[i].message.match(utils.err_code);
// LUA script could return user errors that don't behave like all other errors!
if (match) {
replies[i].code = match[1];
}
replies[i].command = command_obj.command.toUpperCase();
if (typeof command_obj.callback === 'function') {
command_obj.callback(replies[i]);
}
} else {
// If we asked for strings, even in detect_buffers mode, then return strings:
replies[i] = self._client.handle_reply(replies[i], command_obj.command, self.wants_buffers[i]);
if (typeof command_obj.callback === 'function') {
command_obj.callback(null, replies[i]);
}
}
i++;
}
}
if (self.callback) {
self.callback(null, replies);
}
}
Multi.prototype.exec_transaction = function exec_transaction (callback) {
if (this.monitoring || this._client.monitoring) {
var err = new RangeError(
'Using transaction with a client that is in monitor mode does not work due to faulty return values of Redis.'
);
err.command = 'EXEC';
err.code = 'EXECABORT';
return utils.reply_in_order(this._client, callback, err);
}
var self = this;
var len = self.queue.length;
self.errors = [];
self.callback = callback;
self._client.cork();
self.wants_buffers = new Array(len);
pipeline_transaction_command(self, new Command('multi', []), -1);
// Drain queue, callback will catch 'QUEUED' or error
for (var index = 0; index < len; index++) {
// The commands may not be shifted off, since they are needed in the result handler
pipeline_transaction_command(self, self.queue.get(index), index);
}
self._client.internal_send_command(new Command('exec', [], function (err, replies) {
multi_callback(self, err, replies);
}));
self._client.uncork();
return !self._client.should_buffer;
};
function batch_callback (self, cb, i) {
return function batch_callback (err, res) {
if (err) {
self.results[i] = err;
// Add the position to the error
self.results[i].position = i;
} else {
self.results[i] = res;
}
cb(err, res);
};
}
Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = function exec_batch (callback) {
var self = this;
var len = self.queue.length;
var index = 0;
var command_obj;
if (len === 0) {
utils.reply_in_order(self._client, callback, null, []);
return !self._client.should_buffer;
}
self._client.cork();
if (!callback) {
while (command_obj = self.queue.shift()) {
self._client.internal_send_command(command_obj);
}
self._client.uncork();
return !self._client.should_buffer;
}
var callback_without_own_cb = function (err, res) {
if (err) {
self.results.push(err);
// Add the position to the error
var i = self.results.length - 1;
self.results[i].position = i;
} else {
self.results.push(res);
}
// Do not emit an error here. Otherwise each error would result in one emit.
// The errors will be returned in the result anyway
};
var last_callback = function (cb) {
return function (err, res) {
cb(err, res);
callback(null, self.results);
};
};
self.results = [];
while (command_obj = self.queue.shift()) {
if (typeof command_obj.callback === 'function') {
command_obj.callback = batch_callback(self, command_obj.callback, index);
} else {
command_obj.callback = callback_without_own_cb;
}
if (typeof callback === 'function' && index === len - 1) {
command_obj.callback = last_callback(command_obj.callback);
}
this._client.internal_send_command(command_obj);
index++;
}
self._client.uncork();
return !self._client.should_buffer;
};
module.exports = Multi;