Skip to content

Commit

Permalink
Threaded IO: implement handleClientsWithPendingWritesUsingThreads().
Browse files Browse the repository at this point in the history
This is just an experiment for now, there are a couple of race
conditions, mostly harmless for the performance gain experiment that
this commit represents so far.

The general idea here is to take Redis single threaded and instead
fan-out on expensive kernel calls: write(2) in this case, but the same
concept could be easily implemented for read(2) and protocol parsing.

However just threading writes, as in this commit, is enough to evaluate
whether the approach is sound.
  • Loading branch information
antirez committed May 6, 2019
1 parent 0a6090b commit f468e65
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 9 deletions.
156 changes: 151 additions & 5 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -1065,9 +1065,17 @@ void freeClient(client *c) {
* a context where calling freeClient() is not possible, because the client
* should be valid for the continuation of the flow of the program. */
void freeClientAsync(client *c) {
    /* We need to handle concurrent access to the server.clients_to_close list
     * only in the freeClientAsync() function, since it's the only function that
     * may access the list while Redis uses I/O threads. All the other accesses
     * are in the context of the main thread while the other threads are
     * idle. */
    static pthread_mutex_t async_free_queue_mutex = PTHREAD_MUTEX_INITIALIZER;

    /* Test and set CLIENT_CLOSE_ASAP under the same lock that protects the
     * list: with the unsynchronized check-then-set, two I/O threads calling
     * freeClientAsync() on the same client could both pass the flag test and
     * enqueue the client twice, causing a double free from
     * freeClientsInAsyncFreeQueue() later. */
    pthread_mutex_lock(&async_free_queue_mutex);
    if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) {
        pthread_mutex_unlock(&async_free_queue_mutex);
        return;
    }
    c->flags |= CLIENT_CLOSE_ASAP;
    listAddNodeTail(server.clients_to_close,c);
    pthread_mutex_unlock(&async_free_queue_mutex);
}

void freeClientsInAsyncFreeQueue(void) {
Expand All @@ -1091,7 +1099,12 @@ client *lookupClientByID(uint64_t id) {
}

/* Write data in output buffers to client. Return C_OK if the client
* is still valid after the call, C_ERR if it was freed. */
* is still valid after the call, C_ERR if it was freed because of some
* error.
*
* This function is called by threads, but always with handler_installed
* set to 0. So when handler_installed is set to 0 the function must be
* thread safe. */
int writeToClient(int fd, client *c, int handler_installed) {
ssize_t nwritten = 0, totwritten = 0;
size_t objlen;
Expand Down Expand Up @@ -1153,14 +1166,15 @@ int writeToClient(int fd, client *c, int handler_installed) {
zmalloc_used_memory() < server.maxmemory) &&
!(c->flags & CLIENT_SLAVE)) break;
}
/* FIXME: use an atomic variable for this. */
server.stat_net_output_bytes += totwritten;
if (nwritten == -1) {
if (errno == EAGAIN) {
nwritten = 0;
} else {
serverLog(LL_VERBOSE,
"Error writing to client: %s", strerror(errno));
freeClient(c);
// serverLog(LL_VERBOSE,
// "Error writing to client: %s", strerror(errno));
freeClientAsync(c);
return C_ERR;
}
}
Expand All @@ -1173,11 +1187,15 @@ int writeToClient(int fd, client *c, int handler_installed) {
}
if (!clientHasPendingReplies(c)) {
c->sentlen = 0;
/* Note that writeToClient() is called in a threaded way, but
 * aeDeleteFileEvent() is not thread safe: however writeToClient()
 * is always called with handler_installed set to 0 from threads
 * so we are fine. */
if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);

/* Close connection after entire reply has been sent. */
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
freeClient(c);
freeClientAsync(c);
return C_ERR;
}
}
Expand Down Expand Up @@ -2452,3 +2470,131 @@ int processEventsWhileBlocked(void) {
}
return count;
}

/* =============================================================================
 * Threaded I/O
 * =========================================================================== */

/* Hard upper bound on the number of I/O threads that can be spawned. */
#define SERVER_MAX_IO_THREADS 32

/* Thread handles; only the first server.io_threads_num slots are used. */
pthread_t io_threads[SERVER_MAX_IO_THREADS];
/* Protect io_threads_done: workers signal the condition after draining
 * their list, the main thread waits until the count reaches the number
 * of I/O threads. */
pthread_mutex_t io_threads_done_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t io_threads_done_cond = PTHREAD_COND_INITIALIZER;
/* Protect io_threads_idle: workers announce they are parked and ready,
 * the main thread waits until all of them are. */
pthread_mutex_t io_threads_idle_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t io_threads_idle_cond = PTHREAD_COND_INITIALIZER;
/* Broadcast by the main thread to release all parked workers; waited on
 * together with io_threads_idle_mutex in IOThreadMain(). */
pthread_cond_t io_threads_start_cond = PTHREAD_COND_INITIALIZER;
int io_threads_done = 0; /* Number of threads that completed the work. */
int io_threads_idle = 0; /* Number of threads in idle state ready to go. */
/* Per-thread queue of clients with pending writes: filled by the main
 * thread while the workers are parked, drained by the owning worker. */
list *io_threads_list[SERVER_MAX_IO_THREADS];

/* Entry point of every I/O thread: park until the main thread broadcasts
 * io_threads_start_cond, write out the pending replies of each client in
 * this thread's private list, report completion, then park again.
 * Loops forever and never returns. */
void *IOThreadMain(void *myid) {
    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients. */
    long id = (unsigned long)myid;

    while(1) {
        /* ... Wait for start ... */
        pthread_mutex_lock(&io_threads_idle_mutex);
        io_threads_idle++;
        /* Tell the main thread that one more worker is parked and ready. */
        pthread_cond_signal(&io_threads_idle_cond);
        printf("[%ld] Waiting start...\n", id); /* Debug output (experiment). */
        /* NOTE(review): this wait has no predicate loop around it, so a
         * spurious wakeup would make the thread run over a possibly empty
         * or half-populated list — one of the known races of this
         * experimental commit. */
        pthread_cond_wait(&io_threads_start_cond,&io_threads_idle_mutex);
        printf("[%ld] Started\n", id);
        pthread_mutex_unlock(&io_threads_idle_mutex);
        printf("%d to handle\n", (int)listLength(io_threads_list[id]));

        /* ... Process ... */
        /* Drain this thread's private list. handler_installed is passed
         * as 0: the thread-safe mode of writeToClient(). */
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            writeToClient(c->fd,c,0);
        }
        listEmpty(io_threads_list[id]);

        /* Report success. */
        pthread_mutex_lock(&io_threads_done_mutex);
        io_threads_done++;
        pthread_cond_signal(&io_threads_done_cond);
        pthread_mutex_unlock(&io_threads_done_mutex);
        printf("[%ld] Done\n", id); /* Debug output (experiment). */
    }
}

/* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) {
pthread_t tid;

server.io_threads_num = 4;
for (int i = 0; i < server.io_threads_num; i++) {
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
io_threads_list[i] = listCreate();
}
}

/* Called by the main thread (from beforeSleep()) to write the replies
 * accumulated in the clients output buffers, fanning the actual write(2)
 * calls out to the I/O threads. Returns the number of clients processed,
 * 0 if there was nothing to do.
 *
 * Sequence: wait until every worker is parked -> partition the pending
 * clients round-robin across the per-thread lists -> broadcast the start
 * condition -> wait until every worker reports done -> install the write
 * handler for clients that still have pending data. */
int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    printf("%d TOTAL\n", processed); /* Debug output (experiment). */

    /* Wait for all threads to be ready. Populating the per-thread lists
     * is only safe while every worker is parked. */
    pthread_mutex_lock(&io_threads_idle_mutex);
    while(io_threads_idle < server.io_threads_num) {
        pthread_cond_wait(&io_threads_idle_cond,&io_threads_idle_mutex);
    }
    printf("All threads are idle: %d\n", io_threads_idle);
    io_threads_idle = 0; /* Reset the count for the next round. */
    pthread_mutex_unlock(&io_threads_idle_mutex);

    /* Distribute the clients across N different lists (round-robin). */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Start all threads. */
    /* NOTE(review): the broadcast is issued while holding
     * io_threads_done_mutex even though the waiters block on
     * io_threads_idle_mutex; no wakeup is lost because we already
     * observed io_threads_idle == io_threads_num above, i.e. every
     * worker is inside pthread_cond_wait() at this point. */
    printf("Send start condition\n");
    pthread_mutex_lock(&io_threads_done_mutex);
    io_threads_done = 0;
    pthread_cond_broadcast(&io_threads_start_cond);
    pthread_mutex_unlock(&io_threads_done_mutex);

    /* Wait for all threads to end their work. */
    pthread_mutex_lock(&io_threads_done_mutex);
    while(io_threads_done < server.io_threads_num) {
        pthread_cond_wait(&io_threads_done_cond,&io_threads_done_mutex);
    }
    pthread_mutex_unlock(&io_threads_done_mutex);
    printf("All threads finshed\n"); /* Debug output (typo in string: "finshed"). */

    /* Run the list of clients again to install the write handler where
     * needed. */
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        /* Install the write handler if there are pending writes in some
         * of the clients. */
        if (clientHasPendingReplies(c) &&
            aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
                sendReplyToClient, c) == AE_ERR)
        {
            /* Can't install the handler: schedule the client to be
             * closed at a safe time by the main thread. */
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);
    return processed;
}
11 changes: 7 additions & 4 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1981,9 +1981,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
flushAppendOnlyFile(0);
}

/* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue();

/* Clear the paused clients flag if needed. */
clientsArePaused(); /* Don't check return value, just use the side effect.*/

Expand Down Expand Up @@ -2075,7 +2072,12 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
flushAppendOnlyFile(0);

/* Handle writes with pending output buffers. */
handleClientsWithPendingWrites();
/* XXX: Put a condition based on number of waiting clients: if we
* have less than a given number of clients, use non threaded code. */
handleClientsWithPendingWritesUsingThreads();

/* Close clients that need to be closed asynchronously. */
freeClientsInAsyncFreeQueue();

/* Before we are going to sleep, let the threads access the dataset by
* releasing the GIL. Redis main thread will not touch anything at this
Expand Down Expand Up @@ -2861,6 +2863,7 @@ void initServer(void) {
slowlogInit();
latencyMonitorInit();
bioInit();
initThreadedIO();
server.initial_memory_usage = zmalloc_used_memory();
}

Expand Down
4 changes: 4 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1062,6 +1062,8 @@ struct redisServer {
int protected_mode; /* Don't accept external connections. */
int gopher_enabled; /* If true the server will reply to gopher
queries. Will still serve RESP2 queries. */
int io_threads_num; /* Number of IO threads to use. */

/* RDB / AOF loading information */
int loading; /* We are loading data from disk if true */
off_t loading_total_bytes;
Expand Down Expand Up @@ -1576,12 +1578,14 @@ void pauseClients(mstime_t duration);
int clientsArePaused(void);
int processEventsWhileBlocked(void);
int handleClientsWithPendingWrites(void);
int handleClientsWithPendingWritesUsingThreads(void);
int clientHasPendingReplies(client *c);
void unlinkClient(client *c);
int writeToClient(int fd, client *c, int handler_installed);
void linkClient(client *c);
void protectClient(client *c);
void unprotectClient(client *c);
void initThreadedIO(void);

#ifdef __GNUC__
void addReplyErrorFormat(client *c, const char *fmt, ...)
Expand Down

0 comments on commit f468e65

Please sign in to comment.