Skip to content

Commit

Permalink
Replication: better way to send a preamble before RDB payload.
Browse files Browse the repository at this point in the history
During the replication full resynchronization process, the RDB file is
transfered from the master to the slave. However there is a short
preamble to send, that is currently just the bulk payload length of the
file in the usual Redis form $..length..<CR><LF>.

This preamble used to be sent with a direct write call, assuming that
there was alway room in the socket output buffer to hold the few bytes
needed, however this does not scale in case we'll need to send more
stuff, and is not very robust code in general.

This commit introduces a more general mechanism to send a preamble up to
2GB in size (the max length of an sds string) in a non blocking way.
  • Loading branch information
antirez committed Aug 12, 2013
1 parent db862e8 commit 89ffba9
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 15 deletions.
6 changes: 4 additions & 2 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -701,8 +701,10 @@ void freeClient(redisClient *c) {
/* Master/slave cleanup.
* Case 1: we lost the connection with a slave. */
if (c->flags & REDIS_SLAVE) {
if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
close(c->repldbfd);
if (c->replstate == REDIS_REPL_SEND_BULK) {
if (c->repldbfd != -1) close(c->repldbfd);
if (c->replpreamble) sdsfree(c->replpreamble);
}
list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
ln = listSearchKey(l,c);
redisAssert(ln != NULL);
Expand Down
1 change: 1 addition & 0 deletions src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ typedef struct redisClient {
int repldbfd; /* replication DB file descriptor */
long repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */
sds replpreamble; /* replication DB preamble. */
long long reploff; /* replication offset if this is our master */
long long repl_ack_off; /* replication ack offset, if this is a slave */
long long repl_ack_time;/* replication ack time, if this is a slave */
Expand Down
34 changes: 21 additions & 13 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -628,23 +628,28 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
char buf[REDIS_IOBUF_LEN];
ssize_t nwritten, buflen;

if (slave->repldboff == 0) {
/* Write the bulk write count before to transfer the DB. In theory here
* we don't know how much room there is in the output buffer of the
* socket, but in practice SO_SNDLOWAT (the minimum count for output
* operations) will never be smaller than the few bytes we need. */
sds bulkcount;

bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
slave->repldbsize);
if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
{
sdsfree(bulkcount);
/* Before sending the RDB file, we send the preamble as configured by the
* replication process. Currently the preamble is just the bulk count of
* the file in the form "$<length>\r\n". */
if (slave->replpreamble) {
nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));
if (nwritten == -1) {
redisLog(REDIS_VERBOSE,"Write error sending RDB preamble to slave: %s",
strerror(errno));
freeClient(slave);
return;
}
sdsfree(bulkcount);
sdsrange(slave->replpreamble,nwritten,-1);
if (sdslen(slave->replpreamble) == 0) {
sdsfree(slave->replpreamble);
slave->replpreamble = NULL;
/* fall through sending data. */
} else {
return;
}
}

/* If the preamble was already transfered, send the RDB bulk data. */
lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
if (buflen <= 0) {
Expand Down Expand Up @@ -711,6 +716,9 @@ void updateSlavesWaitingBgsave(int bgsaveerr) {
slave->repldboff = 0;
slave->repldbsize = buf.st_size;
slave->replstate = REDIS_REPL_SEND_BULK;
slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
(unsigned long long) slave->repldbsize);

aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
freeClient(slave);
Expand Down

0 comments on commit 89ffba9

Please sign in to comment.