forked from redis/redis
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathblocked.c
195 lines (180 loc) · 8.04 KB
/
blocked.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
/* blocked.c - generic support for blocking operations like BLPOP & WAIT.
*
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* ---------------------------------------------------------------------------
*
* API:
*
* getTimeoutFromObjectOrReply() is just an utility function to parse a
* timeout argument since blocking operations usually require a timeout.
*
* blockClient() set the CLIENT_BLOCKED flag in the client, and set the
* specified block type 'btype' filed to one of BLOCKED_* macros.
*
* unblockClient() unblocks the client doing the following:
* 1) It calls the btype-specific function to cleanup the state.
* 2) It unblocks the client by unsetting the CLIENT_BLOCKED flag.
* 3) It puts the client into a list of just unblocked clients that are
* processed ASAP in the beforeSleep() event loop callback, so that
* if there is some query buffer to process, we do it. This is also
* required because otherwise there is no 'readable' event fired, we
* already read the pending commands. We also set the CLIENT_UNBLOCKED
* flag to remember the client is in the unblocked_clients list.
*
* processUnblockedClients() is called inside the beforeSleep() function
* to process the query buffer from unblocked clients and remove the clients
* from the blocked_clients queue.
*
* replyToBlockedClientTimedOut() is called by the cron function when
* a client blocked reaches the specified timeout (if the timeout is set
* to 0, no timeout is processed).
* It usually just needs to send a reply to the client.
*
* When implementing a new type of blocking opeation, the implementation
* should modify unblockClient() and replyToBlockedClientTimedOut() in order
* to handle the btype-specific behavior of this two functions.
* If the blocking operation waits for certain keys to change state, the
* clusterRedirectBlockedClientIfNeeded() function should also be updated.
*/
#include "server.h"
/* Get a timeout value from an object and store it into 'timeout'.
* The final timeout is always stored as milliseconds as a time where the
* timeout will expire, however the parsing is performed according to
* the 'unit' that can be seconds or milliseconds.
*
* Note that if the timeout is zero (usually from the point of view of
* commands API this means no timeout) the value stored into 'timeout'
* is zero. */
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) {
long long tval;
if (getLongLongFromObjectOrReply(c,object,&tval,
"timeout is not an integer or out of range") != C_OK)
return C_ERR;
if (tval < 0) {
addReplyError(c,"timeout is negative");
return C_ERR;
}
if (tval > 0) {
if (unit == UNIT_SECONDS) tval *= 1000;
tval += mstime();
}
*timeout = tval;
return C_OK;
}
/* Block a client for the specific operation type. Once the CLIENT_BLOCKED
* flag is set client query buffer is not longer processed, but accumulated,
* and will be processed when the client is unblocked. */
void blockClient(client *c, int btype) {
c->flags |= CLIENT_BLOCKED;
c->btype = btype;
server.bpop_blocked_clients++;
}
/* This function is called in the beforeSleep() function of the event loop
* in order to process the pending input buffer of clients that were
* unblocked after a blocking operation. */
void processUnblockedClients(void) {
listNode *ln;
client *c;
while (listLength(server.unblocked_clients)) {
ln = listFirst(server.unblocked_clients);
serverAssert(ln != NULL);
c = ln->value;
listDelNode(server.unblocked_clients,ln);
c->flags &= ~CLIENT_UNBLOCKED;
/* Process remaining data in the input buffer, unless the client
* is blocked again. Actually processInputBuffer() checks that the
* client is not blocked before to proceed, but things may change and
* the code is conceptually more correct this way. */
if (!(c->flags & CLIENT_BLOCKED)) {
if (c->querybuf && sdslen(c->querybuf) > 0) {
processInputBuffer(c);
}
}
}
}
/* Unblock a client calling the right function depending on the kind
* of operation the client is blocking for. */
void unblockClient(client *c) {
if (c->btype == BLOCKED_LIST) {
unblockClientWaitingData(c);
} else if (c->btype == BLOCKED_WAIT) {
unblockClientWaitingReplicas(c);
} else if (c->btype == BLOCKED_MODULE) {
unblockClientFromModule(c);
} else {
serverPanic("Unknown btype in unblockClient().");
}
/* Clear the flags, and put the client in the unblocked list so that
* we'll process new commands in its query buffer ASAP. */
c->flags &= ~CLIENT_BLOCKED;
c->btype = BLOCKED_NONE;
server.bpop_blocked_clients--;
/* The client may already be into the unblocked list because of a previous
* blocking operation, don't add back it into the list multiple times. */
if (!(c->flags & CLIENT_UNBLOCKED)) {
c->flags |= CLIENT_UNBLOCKED;
listAddNodeTail(server.unblocked_clients,c);
}
}
/* This function gets called when a blocked client timed out in order to
* send it a reply of some kind. After this function is called,
* unblockClient() will be called with the same client as argument. */
void replyToBlockedClientTimedOut(client *c) {
if (c->btype == BLOCKED_LIST) {
addReply(c,shared.nullmultibulk);
} else if (c->btype == BLOCKED_WAIT) {
addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
} else if (c->btype == BLOCKED_MODULE) {
moduleBlockedClientTimedOut(c);
} else {
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
}
}
/* Mass-unblock clients because something changed in the instance that makes
* blocking no longer safe. For example clients blocked in list operations
* in an instance which turns from master to slave is unsafe, so this function
* is called when a master turns into a slave.
*
* The semantics is to send an -UNBLOCKED error to the client, disconnecting
* it at the same time. */
void disconnectAllBlockedClients(void) {
listNode *ln;
listIter li;
listRewind(server.clients,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (c->flags & CLIENT_BLOCKED) {
addReplySds(c,sdsnew(
"-UNBLOCKED force unblock from blocking operation, "
"instance state changed (master -> slave?)\r\n"));
unblockClient(c);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
}
}
}