Skip to content

Commit

Permalink
Accept multiple clients per iteration.
Browse files Browse the repository at this point in the history
When the listening sockets readable event is fired, we have the chance
to accept multiple clients instead of accepting a single one. This makes
Redis more responsive when there is a mass-connect event (for example
after the server startup), and in workloads where a connect-disconnect
pattern is used often, so that multiple clients are waiting to be
accepted continuously.

As a side effect, this commit makes the LOADING, BUSY, and similar
errors much faster to deliver to the client, making Redis more
responsive when there is to return errors to inform the clients that the
server is blocked in an not interruptible operation.
  • Loading branch information
antirez committed Apr 24, 2014
1 parent cac4bae commit 3a3458e
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 17 deletions.
38 changes: 23 additions & 15 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@ void copyClientOutputBuffer(redisClient *dst, redisClient *src) {
dst->reply_bytes = src->reply_bytes;
}

#define MAX_ACCEPTS_PER_CALL 1000
static void acceptCommonHandler(int fd, int flags) {
redisClient *c;
if ((c = createClient(fd)) == NULL) {
Expand Down Expand Up @@ -580,37 +581,44 @@ static void acceptCommonHandler(int fd, int flags) {
}

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd;
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[REDIS_IP_STR_LEN];
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);

cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
return;
while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
redisLog(REDIS_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
acceptCommonHandler(cfd,0);
}
redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
acceptCommonHandler(cfd,0);
}

void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cfd;
int cfd, max = MAX_ACCEPTS_PER_CALL;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);

cfd = anetUnixAccept(server.neterr, fd);
if (cfd == ANET_ERR) {
redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
return;
while(max--) {
cfd = anetUnixAccept(server.neterr, fd);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
redisLog(REDIS_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
redisLog(REDIS_VERBOSE,"Accepted connection to %s", server.unixsocket);
acceptCommonHandler(cfd,REDIS_UNIX_SOCKET);
}
redisLog(REDIS_VERBOSE,"Accepted connection to %s", server.unixsocket);
acceptCommonHandler(cfd,REDIS_UNIX_SOCKET);
}


static void freeClientArgv(redisClient *c) {
int j;
for (j = 0; j < c->argc; j++)
Expand Down
12 changes: 10 additions & 2 deletions src/redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -1615,10 +1615,16 @@ int listenToPort(int port, int *fds, int *count) {
* server.bindaddr_count == 0. */
fds[*count] = anetTcp6Server(server.neterr,port,NULL,
server.tcp_backlog);
if (fds[*count] != ANET_ERR) (*count)++;
if (fds[*count] != ANET_ERR) {
anetNonBlock(NULL,fds[*count]);
(*count)++;
}
fds[*count] = anetTcpServer(server.neterr,port,NULL,
server.tcp_backlog);
if (fds[*count] != ANET_ERR) (*count)++;
if (fds[*count] != ANET_ERR) {
anetNonBlock(NULL,fds[*count]);
(*count)++;
}
/* Exit the loop if we were able to bind * on IPv4 or IPv6,
* otherwise fds[*count] will be ANET_ERR and we'll print an
* error and return to the caller with an error. */
Expand All @@ -1639,6 +1645,7 @@ int listenToPort(int port, int *fds, int *count) {
port, server.neterr);
return REDIS_ERR;
}
anetNonBlock(NULL,fds[*count]);
(*count)++;
}
return REDIS_OK;
Expand Down Expand Up @@ -1708,6 +1715,7 @@ void initServer() {
redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr);
exit(1);
}
anetNonBlock(NULL,server.sofd);
}

/* Abort if there are no listening sockets at all. */
Expand Down

0 comments on commit 3a3458e

Please sign in to comment.