diff --git a/redis.conf b/redis.conf index 2f176f247..db1402795 100644 --- a/redis.conf +++ b/redis.conf @@ -227,6 +227,28 @@ slave-read-only yes # be a good idea. repl-disable-tcp-nodelay no +# Set the replication backlog size. The backlog is a buffer that accumulates +# slave data when slaves are disconnected for some time, so that when a slave +# wants to reconnect again, often a full resync is not needed, but a partial +# resync is enough, just passing the portion of data the slave missed while +# disconnected. +# +# The biggest the replication backlog, the longer the time the slave can be +# disconnected and later be able to perform a partial resynchronization. +# +# The backlog is only allocated once there is at least a slave connected. +# +# repl-backlog-size 1mb + +# After a master has no longer connected slaves for some time, the backlog +# will be freed. The following option configures the amount of seconds that +# need to elapse, starting from the time the last slave disconnected, for +# the backlog buffer to be freed. +# +# A value of 0 means to never release the backlog. +# +# repl-backlog-ttl 3600 + # The slave priority is an integer number published by Redis in the INFO output. # It is used by Redis Sentinel in order to select a slave to promote into a # master if the master is no longer working correctly. diff --git a/src/config.c b/src/config.c index fea3a0349..32b949411 100644 --- a/src/config.c +++ b/src/config.c @@ -238,6 +238,18 @@ void loadServerConfigFromString(char *config) { } else if (!strcasecmp(argv[0],"repl-disable-tcp-nodelay") && argc==2) { if ((server.repl_disable_tcp_nodelay = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; + } else if (!strcasecmp(argv[0],"repl-backlog-size") && argc == 2) { + long long size = strtoll(argv[0],NULL,10); + if (size <= 0) { + err = "repl-backlog-size must be 1 or greater."; + goto loaderr; + } + resizeReplicationBacklog(size); + } else if (!strcasecmp(argv[0],"repl-backlog-ttl") && argc == 2) { + server.repl_backlog_time_limit = atoi(argv[1]); + if (server.repl_backlog_time_limit < 0) { + err = "repl-backlog-ttl can't be negative "; + goto loaderr; } } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) { server.masterauth = zstrdup(argv[1]); @@ -719,6 +731,12 @@ void configSetCommand(redisClient *c) { } else if (!strcasecmp(c->argv[2]->ptr,"repl-timeout")) { if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll <= 0) goto badfmt; server.repl_timeout = ll; + } else if (!strcasecmp(c->argv[2]->ptr,"repl-backlog-size")) { + if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll <= 0) goto badfmt; + resizeReplicationBacklog(ll); + } else if (!strcasecmp(c->argv[2]->ptr,"repl-backlog-ttl")) { + if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt; + server.repl_backlog_time_limit = ll; } else if (!strcasecmp(c->argv[2]->ptr,"watchdog-period")) { if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt; if (ll) @@ -832,6 +850,8 @@ void configGetCommand(redisClient *c) { config_get_numerical_field("databases",server.dbnum); config_get_numerical_field("repl-ping-slave-period",server.repl_ping_slave_period); config_get_numerical_field("repl-timeout",server.repl_timeout); + config_get_numerical_field("repl-backlog-size",server.repl_backlog_size); + config_get_numerical_field("repl-backlog-ttl",server.repl_backlog_time_limit); config_get_numerical_field("maxclients",server.maxclients); config_get_numerical_field("watchdog-period",server.watchdog_period); config_get_numerical_field("slave-priority",server.slave_priority); diff --git a/src/db.c b/src/db.c index 8429aa4b5..2de7e2c4c 100644 --- a/src/db.c +++ b/src/db.c @@ -511,8 +511,7 @@ void propagateExpire(redisDb *db, robj *key) { if (server.aof_state != REDIS_AOF_OFF) feedAppendOnlyFile(server.delCommand,db->id,argv,2); - if (listLength(server.slaves)) - replicationFeedSlaves(server.slaves,db->id,argv,2); + replicationFeedSlaves(server.slaves,db->id,argv,2); decrRefCount(argv[0]); decrRefCount(argv[1]); diff --git a/src/multi.c b/src/multi.c index dfac15c34..ba8baf0a2 100644 --- a/src/multi.c +++ b/src/multi.c @@ -108,8 +108,7 @@ void execCommandReplicateMulti(redisClient *c) { if (server.aof_state != REDIS_AOF_OFF) feedAppendOnlyFile(server.multiCommand,c->db->id,&multistring,1); - if (listLength(server.slaves)) - replicationFeedSlaves(server.slaves,c->db->id,&multistring,1); + replicationFeedSlaves(server.slaves,c->db->id,&multistring,1); decrRefCount(multistring); } diff --git a/src/networking.c b/src/networking.c index 66a2702c5..b90936011 100644 --- a/src/networking.c +++ b/src/networking.c @@ -87,6 +87,7 @@ redisClient *createClient(int fd) { c->ctime = c->lastinteraction = server.unixtime; c->authenticated = 0; c->replstate = REDIS_REPL_NONE; + c->reploff = 0; c->slave_listening_port = 0; c->reply = listCreate(); c->reply_bytes = 0; @@ -595,12 +596,42 @@ void disconnectSlaves(void) { } } +/* This function is called when the slave lose the connection with the + * master into an unexpected way. */ +void replicationHandleMasterDisconnection(void) { + server.master = NULL; + server.repl_state = REDIS_REPL_CONNECT; + server.repl_down_since = server.unixtime; + /* We lost connection with our master, force our slaves to resync + * with us as well to load the new data set. + * + * If server.masterhost is NULL the user called SLAVEOF NO ONE so + * slave resync is not needed. */ + if (server.masterhost != NULL) disconnectSlaves(); +} + void freeClient(redisClient *c) { listNode *ln; /* If this is marked as current client unset it */ if (server.current_client == c) server.current_client = NULL; + /* If it is our master that's beging disconnected we should make sure + * to cache the state to try a partial resynchronization later. + * + * Note that before doing this we make sure that the client is not in + * some unexpected state, by checking its flags. */ + if (server.master && + (c->flags & REDIS_MASTER) && + !(c->flags & (REDIS_CLOSE_AFTER_REPLY| + REDIS_CLOSE_ASAP| + REDIS_BLOCKED| + REDIS_UNBLOCKED))) + { + replicationCacheMaster(c); + return; + } + /* Note that if the client we are freeing is blocked into a blocking * call, we have to set querybuf to NULL *before* to call * unblockClientWaitingData() to avoid processInputBuffer() will get @@ -620,16 +651,21 @@ void freeClient(redisClient *c) { pubsubUnsubscribeAllPatterns(c,0); dictRelease(c->pubsub_channels); listRelease(c->pubsub_patterns); - /* Obvious cleanup */ - aeDeleteFileEvent(server.el,c->fd,AE_READABLE); - aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); + /* Close socket, unregister events, and remove list of replies and + * accumulated arguments. */ + if (c->fd != -1) { + aeDeleteFileEvent(server.el,c->fd,AE_READABLE); + aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); + close(c->fd); + } listRelease(c->reply); freeClientArgv(c); - close(c->fd); /* Remove from the list of clients */ - ln = listSearchKey(server.clients,c); - redisAssert(ln != NULL); - listDelNode(server.clients,ln); + if (c->fd != -1) { + ln = listSearchKey(server.clients,c); + redisAssert(ln != NULL); + listDelNode(server.clients,ln); + } /* When client was just unblocked because of a blocking operation, * remove it from the list with unblocked clients. */ if (c->flags & REDIS_UNBLOCKED) { @@ -647,20 +683,15 @@ void freeClient(redisClient *c) { ln = listSearchKey(l,c); redisAssert(ln != NULL); listDelNode(l,ln); + /* We need to remember the time when we started to have zero + * attached slaves, as after some time we'll free the replication + * backlog. */ + if (c->flags & REDIS_SLAVE && listLength(server.slaves) == 0) + server.repl_no_slaves_since = server.unixtime; } /* Case 2: we lost the connection with the master. */ - if (c->flags & REDIS_MASTER) { - server.master = NULL; - server.repl_state = REDIS_REPL_CONNECT; - server.repl_down_since = server.unixtime; - /* We lost connection with our master, force our slaves to resync - * with us as well to load the new data set. - * - * If server.masterhost is NULL the user called SLAVEOF NO ONE so - * slave resync is not needed. */ - if (server.masterhost != NULL) disconnectSlaves(); - } + if (c->flags & REDIS_MASTER) replicationHandleMasterDisconnection(); /* If this client was scheduled for async freeing we need to remove it * from the queue. */ @@ -1059,6 +1090,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { if (nread) { sdsIncrLen(c->querybuf,nread); c->lastinteraction = server.unixtime; + if (c->flags & REDIS_MASTER) c->reploff += nread; } else { server.current_client = NULL; return; diff --git a/src/redis.c b/src/redis.c index 863c54e25..7300cd89f 100644 --- a/src/redis.c +++ b/src/redis.c @@ -220,6 +220,7 @@ struct redisCommand redisCommandTable[] = { {"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0}, {"discard",discardCommand,1,"rs",0,NULL,0,0,0,0,0}, {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0}, + {"psync",syncCommand,3,"ars",0,NULL,0,0,0,0,0}, {"replconf",replconfCommand,-1,"ars",0,NULL,0,0,0,0,0}, {"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0}, {"flushall",flushallCommand,1,"w",0,NULL,0,0,0,0,0}, @@ -1202,6 +1203,8 @@ void initServerConfig() { server.masterhost = NULL; server.masterport = 6379; server.master = NULL; + server.cached_master = NULL; + server.repl_master_initial_offset = -1; server.repl_state = REDIS_REPL_NONE; server.repl_syncio_timeout = REDIS_REPL_SYNCIO_TIMEOUT; server.repl_serve_stale_data = 1; @@ -1209,6 +1212,16 @@ void initServerConfig() { server.repl_down_since = time(NULL); server.repl_disable_tcp_nodelay = 0; server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY; + server.master_repl_offset = 0; + + /* Replication partial resync backlog */ + server.repl_backlog = NULL; + server.repl_backlog_size = REDIS_DEFAULT_REPL_BACKLOG_SIZE; + server.repl_backlog_histlen = 0; + server.repl_backlog_idx = 0; + server.repl_backlog_off = 0; + server.repl_backlog_time_limit = REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT; + server.repl_no_slaves_since = time(NULL); /* Client output buffer limits */ server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].hard_limit_bytes = 0; @@ -1522,7 +1535,7 @@ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, { if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF) feedAppendOnlyFile(cmd,dbid,argv,argc); - if (flags & REDIS_PROPAGATE_REPL && listLength(server.slaves)) + if (flags & REDIS_PROPAGATE_REPL) replicationFeedSlaves(server.slaves,dbid,argv,argc); } @@ -2151,13 +2164,15 @@ sds genRedisInfoString(char *section) { "master_link_status:%s\r\n" "master_last_io_seconds_ago:%d\r\n" "master_sync_in_progress:%d\r\n" + "slave_repl_offset:%lld\r\n" ,server.masterhost, server.masterport, (server.repl_state == REDIS_REPL_CONNECTED) ? "up" : "down", server.master ? ((int)(server.unixtime-server.master->lastinteraction)) : -1, - server.repl_state == REDIS_REPL_TRANSFER + server.repl_state == REDIS_REPL_TRANSFER, + server.master ? server.master->reploff : -1 ); if (server.repl_state == REDIS_REPL_TRANSFER) { @@ -2215,6 +2230,17 @@ sds genRedisInfoString(char *section) { slaveid++; } } + info = sdscatprintf(info, + "master_repl_offset:%lld\r\n" + "repl_backlog_active:%d\r\n" + "repl_backlog_size:%lld\r\n" + "repl_backlog_first_byte_offset:%lld\r\n" + "repl_backlog_histlen:%lld\r\n", + server.master_repl_offset, + server.repl_backlog != NULL, + server.repl_backlog_size, + server.repl_backlog_off, + server.repl_backlog_histlen); } /* CPU */ diff --git a/src/redis.h b/src/redis.h index 8cb682fb0..90b95c5cf 100644 --- a/src/redis.h +++ b/src/redis.h @@ -93,6 +93,9 @@ #define REDIS_REPL_PING_SLAVE_PERIOD 10 #define REDIS_RUN_ID_SIZE 40 #define REDIS_OPS_SEC_SAMPLES 16 +#define REDIS_DEFAULT_REPL_BACKLOG_SIZE (1024*1024) /* 1mb */ +#define REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT (60*60) /* 1 hour */ +#define REDIS_REPL_BACKLOG_MIN_SIZE (1024*16) /* 16k */ /* Protocol and I/O related defines */ #define REDIS_MAX_QUERYBUF_LEN (1024*1024*1024) /* 1GB max query buffer. */ @@ -100,6 +103,7 @@ #define REDIS_REPLY_CHUNK_BYTES (16*1024) /* 16k output buffer */ #define REDIS_INLINE_MAX_SIZE (1024*64) /* Max size of inline reads */ #define REDIS_MBULK_BIG_ARG (1024*32) +#define REDIS_LONGSTR_SIZE 21 /* Bytes needed for long -> str */ /* Hash table parameters */ #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */ @@ -407,7 +411,8 @@ typedef struct redisClient { long bulklen; /* length of bulk argument in multi bulk request */ list *reply; unsigned long reply_bytes; /* Tot bytes of objects in reply list */ - int sentlen; + int sentlen; /* Amount of bytes already sent in the current + buffer or object being sent. */ time_t ctime; /* Client creation time */ time_t lastinteraction; /* time of the last interaction, used for timeout */ time_t obuf_soft_limit_reached_time; @@ -417,6 +422,8 @@ typedef struct redisClient { int repldbfd; /* replication DB file descriptor */ long repldboff; /* replication DB file offset */ off_t repldbsize; /* replication DB file size */ + long long reploff; /* replication offset if this is our master */ + char replrunid[REDIS_RUN_ID_SIZE+1]; /* master run id if this is a master */ int slave_listening_port; /* As configured with: SLAVECONF listening-port */ multiState mstate; /* MULTI/EXEC state */ blockingState bpop; /* blocking state */ @@ -662,7 +669,6 @@ struct redisServer { list *clients; /* List of active clients */ list *clients_to_close; /* Clients to close asynchronously */ list *slaves, *monitors; /* List of slaves and MONITORs */ - int slaveseldb; /* Last SELECTed DB in replication output */ redisClient *current_client; /* Current client, only used on crash report */ char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ dict *migrate_cached_sockets;/* MIGRATE cached sockets */ @@ -745,13 +751,27 @@ struct redisServer { int syslog_enabled; /* Is syslog enabled? */ char *syslog_ident; /* Syslog ident */ int syslog_facility; /* Syslog facility */ - /* Slave specific fields */ + /* Replication (master) */ + int slaveseldb; /* Last SELECTed DB in replication output */ + long long master_repl_offset; /* Global replication offset */ + int repl_ping_slave_period; /* Master pings the slave every N seconds */ + char *repl_backlog; /* Replication backlog for partial syncs */ + long long repl_backlog_size; /* Backlog circular buffer size */ + long long repl_backlog_histlen; /* Backlog actual data length */ + long long repl_backlog_idx; /* Backlog circular buffer current offset */ + long long repl_backlog_off; /* Replication offset of first byte in the + backlog buffer. */ + time_t repl_backlog_time_limit; /* Time without slaves after the backlog + gets released. */ + time_t repl_no_slaves_since; /* We have no slaves since that time. + Only valid if server.slaves len is 0. */ + /* Replication (slave) */ char *masterauth; /* AUTH with this password with master */ char *masterhost; /* Hostname of master */ int masterport; /* Port of master */ - int repl_ping_slave_period; /* Master pings the slave every N seconds */ int repl_timeout; /* Timeout after N seconds of master idle */ redisClient *master; /* Client that is master for this slave */ + redisClient *cached_master; /* Cached master to be reused for PSYNC. */ int repl_syncio_timeout; /* Timeout for synchronous I/O calls */ int repl_state; /* Replication status if the instance is a slave */ off_t repl_transfer_size; /* Size of RDB to read from master during sync. */ @@ -766,6 +786,8 @@ struct redisServer { time_t repl_down_since; /* Unix time at which link with master went down */ int repl_disable_tcp_nodelay; /* Disable TCP_NODELAY after SYNC? */ int slave_priority; /* Reported in INFO and used by Sentinel. */ + char repl_master_runid[REDIS_RUN_ID_SIZE+1]; /* Master run id for PSYNC. */ + long long repl_master_initial_offset; /* Master PSYNC offset. */ /* Limits */ unsigned int maxclients; /* Max number of simultaneous clients */ unsigned long long maxmemory; /* Max number of memory bytes to use */ @@ -930,6 +952,7 @@ void exitFromChild(int retcode); redisClient *createClient(int fd); void closeTimedoutClients(void); void freeClient(redisClient *c); +void freeClientAsync(redisClient *c); void resetClient(redisClient *c); void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask); void addReply(redisClient *c, robj *obj); @@ -1053,6 +1076,9 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc); void updateSlavesWaitingBgsave(int bgsaveerr); void replicationCron(void); +void replicationHandleMasterDisconnection(void); +void replicationCacheMaster(redisClient *c); +void resizeReplicationBacklog(long long newsize); /* Generic persistence functions */ void startLoading(FILE *fp); diff --git a/src/replication.c b/src/replication.c index 872cf4735..0458dc0ee 100644 --- a/src/replication.c +++ b/src/replication.c @@ -37,13 +37,234 @@ #include #include +void replicationDiscardCachedMaster(void); +void replicationResurrectCachedMaster(int newfd); + /* ---------------------------------- MASTER -------------------------------- */ +void createReplicationBacklog(void) { + redisAssert(server.repl_backlog == NULL); + server.repl_backlog = zmalloc(server.repl_backlog_size); + server.repl_backlog_histlen = 0; + server.repl_backlog_idx = 0; + /* When a new backlog buffer is created, we increment the replication + * offset by one to make sure we'll not be able to PSYNC with any + * previous slave. This is needed because we avoid incrementing the + * master_repl_offset if no backlog exists nor slaves are attached. */ + server.master_repl_offset++; + + /* We don't have any data inside our buffer, but virtually the first + * byte we have is the next byte that will be generated for the + * replication stream. */ + server.repl_backlog_off = server.master_repl_offset+1; +} + +/* This function is called when the user modifies the replication backlog + * size at runtime. It is up to the function to both update the + * server.repl_backlog_size and to resize the buffer and setup it so that + * it contains the same data as the previous one (possibly less data, but + * the most recent bytes, or the same data and more free space in case the + * buffer is enlarged). */ +void resizeReplicationBacklog(long long newsize) { + if (newsize < REDIS_REPL_BACKLOG_MIN_SIZE) + newsize = REDIS_REPL_BACKLOG_MIN_SIZE; + if (server.repl_backlog_size == newsize) return; + + server.repl_backlog_size = newsize; + if (server.repl_backlog != NULL) { + /* What we actually do is to flush the old buffer and realloc a new + * empty one. It will refill with new data incrementally. + * The reason is that copying a few gigabytes adds latency and even + * worse often we need to alloc additional space before freeing the + * old buffer. */ + zfree(server.repl_backlog); + server.repl_backlog = zmalloc(server.repl_backlog_size); + server.repl_backlog_histlen = 0; + server.repl_backlog_idx = 0; + /* Next byte we have is... the next since the buffer is emtpy. */ + server.repl_backlog_off = server.master_repl_offset+1; + } +} + +void freeReplicationBacklog(void) { + redisAssert(server.repl_backlog != NULL); + zfree(server.repl_backlog); + server.repl_backlog = NULL; +} + +/* Add data to the replication backlog. + * This function also increments the global replication offset stored at + * server.master_repl_offset, because there is no case where we want to feed + * the backlog without incrementing the buffer. */ +void feedReplicationBacklog(void *ptr, size_t len) { + unsigned char *p = ptr; + + server.master_repl_offset += len; + + /* This is a circular buffer, so write as much data we can at every + * iteration and rewind the "idx" index if we reach the limit. */ + while(len) { + size_t thislen = server.repl_backlog_size - server.repl_backlog_idx; + if (thislen > len) thislen = len; + memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen); + server.repl_backlog_idx += thislen; + if (server.repl_backlog_idx == server.repl_backlog_size) + server.repl_backlog_idx = 0; + len -= thislen; + p += thislen; + server.repl_backlog_histlen += thislen; + } + if (server.repl_backlog_histlen > server.repl_backlog_size) + server.repl_backlog_histlen = server.repl_backlog_size; + /* Set the offset of the first byte we have in the backlog. */ + server.repl_backlog_off = server.master_repl_offset - + server.repl_backlog_histlen + 1; +} + +/* Wrapper for feedReplicationBacklog() that takes Redis string objects + * as input. */ +void feedReplicationBacklogWithObject(robj *o) { + char llstr[REDIS_LONGSTR_SIZE]; + void *p; + size_t len; + + if (o->encoding == REDIS_ENCODING_INT) { + len = ll2string(llstr,sizeof(llstr),(long)o->ptr); + p = llstr; + } else { + len = sdslen(o->ptr); + p = o->ptr; + } + feedReplicationBacklog(p,len); +} + +#define FEEDSLAVE_BUF_SIZE (1024*64) void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { listNode *ln; listIter li; - int j; + int j, i, len; + char buf[FEEDSLAVE_BUF_SIZE], *b = buf; + char llstr[REDIS_LONGSTR_SIZE]; + int buf_left = FEEDSLAVE_BUF_SIZE; + robj *o; + + /* If there aren't slaves, and there is no backlog buffer to populate, + * we can return ASAP. */ + if (server.repl_backlog == NULL && listLength(slaves) == 0) return; + + /* We can't have slaves attached and no backlog. */ + redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL)); + + /* What we do here is to try to write as much data as possible in a static + * buffer "buf" that is used to create an object that is later sent to all + * the slaves. This way we do the decoding only one time for most commands + * not containing big payloads. */ + + /* Create the SELECT command into the static buffer if needed. */ + if (server.slaveseldb != dictid) { + char *selectcmd; + size_t sclen; + + if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) { + selectcmd = shared.select[dictid]->ptr; + sclen = sdslen(selectcmd); + memcpy(b,selectcmd,sclen); + b += sclen; + buf_left -= sclen; + } else { + int dictid_len; + + dictid_len = ll2string(llstr,sizeof(llstr),dictid); + sclen = snprintf(b,buf_left,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", + dictid_len, llstr); + b += sclen; + buf_left -= sclen; + } + } + server.slaveseldb = dictid; + + /* Add the multi bulk reply size to the static buffer, that is, the number + * of arguments of the command to send to every slave. */ + b[0] = '*'; + len = ll2string(b+1,REDIS_LONGSTR_SIZE,argc); + b += len+1; + buf_left -= len; + b[0] = '\r'; + b[1] = '\n'; + b += 2; + buf_left -= 2; + + /* Try to use the static buffer for as much arguments is possible. */ + for (j = 0; j < argc; j++) { + int objlen; + char *objptr; + + if (argv[j]->encoding != REDIS_ENCODING_RAW && + argv[j]->encoding != REDIS_ENCODING_INT) { + redisPanic("Unexpected encoding"); + } + if (argv[j]->encoding == REDIS_ENCODING_RAW) { + objlen = sdslen(argv[j]->ptr); + objptr = argv[j]->ptr; + } else { + objlen = ll2string(llstr,REDIS_LONGSTR_SIZE,(long)argv[j]->ptr); + objptr = llstr; + } + /* We need enough space for bulk reply encoding, newlines, and + * the data itself. */ + if (buf_left < objlen+REDIS_LONGSTR_SIZE+32) break; + + /* Write $...CRLF */ + b[0] = '$'; + len = ll2string(b+1,REDIS_LONGSTR_SIZE,objlen); + b += len+1; + buf_left -= len; + b[0] = '\r'; + b[1] = '\n'; + b += 2; + buf_left -= 2; + + /* And data plus CRLF */ + memcpy(b,objptr,objlen); + b += objlen; + buf_left -= objlen; + b[0] = '\r'; + b[1] = '\n'; + b += 2; + buf_left -= 2; + } + + /* Create an object with the static buffer content. */ + redisAssert(buf_left < FEEDSLAVE_BUF_SIZE); + o = createStringObject(buf,b-buf); + + /* If we have a backlog, populate it with data and increment + * the global replication offset. */ + if (server.repl_backlog) { + feedReplicationBacklogWithObject(o); + for (i = j; i < argc; i++) { + char aux[REDIS_LONGSTR_SIZE+3]; + long objlen = stringObjectLen(argv[i]); + + /* We need to feed the buffer with the object as a bulk reply + * not just as a plain string, so create the $..CRLF payload len + * ad add the final CRLF */ + aux[0] = '$'; + len = ll2string(aux+1,objlen,sizeof(aux)-1); + aux[len+1] = '\r'; + aux[len+2] = '\n'; + feedReplicationBacklog(aux,len+3); + feedReplicationBacklogWithObject(argv[j]); + feedReplicationBacklogWithObject(shared.crlf); + } + } + /* Write data to slaves. Here we do two things: + * 1) We write the "o" object that was created using the accumulated + * static buffer. + * 2) We write any additional argument of the command to replicate that + * was not written inside the static buffer for lack of space. + */ listRewind(slaves,&li); while((ln = listNext(&li))) { redisClient *slave = ln->value; @@ -54,29 +275,16 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { /* Feed slaves that are waiting for the initial SYNC (so these commands * are queued in the output buffer until the initial SYNC completes), * or are already in sync with the master. */ - if (server.slaveseldb != dictid) { - robj *selectcmd; - - if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) { - selectcmd = shared.select[dictid]; - incrRefCount(selectcmd); - } else { - char dictid_str[64]; - int dictid_len; - - dictid_len = ll2string(dictid_str,sizeof(dictid_str),dictid); - selectcmd = createObject(REDIS_STRING, - sdscatprintf(sdsempty(), - "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", - dictid_len, dictid_str)); - } - addReply(slave,selectcmd); - decrRefCount(selectcmd); - } - addReplyMultiBulkLen(slave,argc); - for (j = 0; j < argc; j++) addReplyBulk(slave,argv[j]); + + /* First, trasmit the object created from the static buffer. */ + addReply(slave,o); + + /* Finally any additional argument that was not stored inside the + * static buffer if any (from j to argc). */ + for (i = j; i < argc; i++) + addReplyBulk(slave,argv[i]); } - server.slaveseldb = dictid; + decrRefCount(o); } void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) { @@ -120,6 +328,120 @@ void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj ** decrRefCount(cmdobj); } +/* Feed the slave 'c' with the replication backlog starting from the + * specified 'offset' up to the end of the backlog. */ +long long addReplyReplicationBacklog(redisClient *c, long long offset) { + long long j, skip, len; + +// printf("SLAVE REQUEST %lld\n", offset); + + if (server.repl_backlog_histlen == 0) { +// printf("NO HISTORY\n"); + return 0; + } + +// printf("FIRST BYTE WE HAVE %lld\n", server.repl_backlog_off); +// printf("HISTLEN %lld\n", server.repl_backlog_histlen); +// printf("IDX %lld\n", server.repl_backlog_idx); + + /* Compute the amount of bytes we need to discard. */ + skip = offset - server.repl_backlog_off; +// printf("SKIP %lld\n", skip); + + /* Point j to the oldest byte, that is actaully our + * server.repl_backlog_off byte. */ + j = (server.repl_backlog_idx + + (server.repl_backlog_size-server.repl_backlog_histlen)) % + server.repl_backlog_size; +// printf("J %lld\n", j); + + /* Discard the amount of data to seek to the specified 'offset'. */ + j = (j + skip) % server.repl_backlog_size; + + /* Feed slave with data. Since it is a circular buffer we have to + * split the reply in two parts if we are cross-boundary. */ + len = server.repl_backlog_histlen - skip; +// printf("LEN %lld\n", len); + while(len) { + long long thislen = + ((server.repl_backlog_size - j) < len) ? + (server.repl_backlog_size - j) : len; + +// printf("WRITE %lld\n", thislen); + addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen)); + len -= thislen; + j = 0; + } + return server.repl_backlog_histlen - skip; +} + +/* This function handles the PSYNC command from the point of view of a + * master receiving a request for partial resynchronization. + * + * On success return REDIS_OK, otherwise REDIS_ERR is returned and we proceed + * with the usual full resync. */ +int masterTryPartialResynchronization(redisClient *c) { + long long psync_offset, psync_len; + char *master_runid = c->argv[1]->ptr; + + /* Is the runid of this master the same advertised by the wannabe slave + * via PSYNC? If runid changed this master is a different instance and + * there is no way to continue. */ + if (strcasecmp(master_runid, server.runid)) { + /* Run id "?" is used by slaves that want to force a full resync. */ + if (master_runid[0] != '?') { + redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: " + "Runid mismatch (Client asked for '%s', I'm '%s')", + master_runid, server.runid); + } else { + redisLog(REDIS_NOTICE,"Full resync requested by slave."); + } + goto need_full_resync; + } + + /* We still have the data our slave is asking for? */ + if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) != + REDIS_OK) goto need_full_resync; + if (!server.repl_backlog || + psync_offset < server.repl_backlog_off || + psync_offset >= (server.repl_backlog_off + server.repl_backlog_size)) + { + redisLog(REDIS_NOTICE, + "Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset); + goto need_full_resync; + } + + /* If we reached this point, we are able to perform a partial resync: + * 1) Set client state to make it a slave. + * 2) Inform the client we can continue with +CONTINUE + * 3) Send the backlog data (from the offset to the end) to the slave. */ + c->flags |= REDIS_SLAVE; + c->replstate = REDIS_REPL_ONLINE; + listAddNodeTail(server.slaves,c); + addReplySds(c,sdsnew("+CONTINUE\r\n")); + psync_len = addReplyReplicationBacklog(c,psync_offset); + redisLog(REDIS_NOTICE, + "Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset); + /* Note that we don't need to set the selected DB at server.slaveseldb + * to -1 to force the master to emit SELECT, since the slave already + * has this state from the previous connection with the master. */ + + return REDIS_OK; /* The caller can return, no full resync needed. */ + +need_full_resync: + /* We need a full resync for some reason... notify the client. */ + psync_offset = server.master_repl_offset; + /* Add 1 to psync_offset if it the replication backlog does not exists + * as when it will be created later we'll increment the offset by one. */ + if (server.repl_backlog == NULL) psync_offset++; + addReplySds(c, + sdscatprintf(sdsempty(),"+FULLRESYNC %s %lld\r\n", + server.runid, + psync_offset)); + return REDIS_ERR; +} + +/* SYNC ad PSYNC command implemenation. */ void syncCommand(redisClient *c) { /* ignore SYNC if already slave or in monitor mode */ if (c->flags & REDIS_SLAVE) return; @@ -136,11 +458,24 @@ void syncCommand(redisClient *c) { * buffer registering the differences between the BGSAVE and the current * dataset, so that we can copy to other slaves if needed. */ if (listLength(c->reply) != 0) { - addReplyError(c,"SYNC is invalid with pending input"); + addReplyError(c,"SYNC and PSYNC are invalid with pending input"); return; } - redisLog(REDIS_NOTICE,"Slave ask for synchronization"); + redisLog(REDIS_NOTICE,"Slave asks for synchronization"); + + /* Try a partial resynchronization if this is a PSYNC command. + * If it fails, we continue with usual full resynchronization, however + * when this happens masterTryPartialResynchronization() already + * replied with: + * + * +FULLRESYNC + * + * So the slave knows the new runid and offset to try a PSYNC later + * if the connection with the master is lost. */ + if (!strcasecmp(c->argv[0]->ptr,"psync") && + masterTryPartialResynchronization(c) == REDIS_OK) return; + /* Here we need to check if there is a background saving operation * in progress, or if it is required to start one */ if (server.rdb_child_pid != -1) { @@ -185,6 +520,8 @@ void syncCommand(redisClient *c) { c->flags |= REDIS_SLAVE; server.slaveseldb = -1; /* Force to re-emit the SELECT command. */ listAddNodeTail(server.slaves,c); + if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) + createReplicationBacklog(); return; } @@ -452,6 +789,9 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { server.master->flags |= REDIS_MASTER; server.master->authenticated = 1; server.repl_state = REDIS_REPL_CONNECTED; + server.master->reploff = server.repl_master_initial_offset; + memcpy(server.master->replrunid, server.repl_master_runid, + sizeof(server.repl_master_runid)); redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success"); /* Restart the AOF subsystem now that we finished the sync. This * will trigger an AOF rewrite, and when done will start appending @@ -481,8 +821,8 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { /* Send a synchronous command to the master. Used to send AUTH and * REPLCONF commands before starting the replication with SYNC. * - * On success NULL is returned. - * On error an sds string describing the error is returned. + * The command returns an sds string representing the result of the + * operation. On error the first byte is a "-". */ char *sendSynchronousCommand(int fd, ...) { va_list ap; @@ -504,7 +844,7 @@ char *sendSynchronousCommand(int fd, ...) { /* Transfer command to the server. */ if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000) == -1) { sdsfree(cmd); - return sdscatprintf(sdsempty(),"Writing to master: %s", + return sdscatprintf(sdsempty(),"-Writing to master: %s", strerror(errno)); } sdsfree(cmd); @@ -512,22 +852,123 @@ char *sendSynchronousCommand(int fd, ...) { /* Read the reply from the server. */ if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000) == -1) { - return sdscatprintf(sdsempty(),"Reading from master: %s", + return sdscatprintf(sdsempty(),"-Reading from master: %s", strerror(errno)); } + return sdsnew(buf); +} + +/* Try a partial resynchronization with the master if we are about to reconnect. + * If there is no cached master structure, at least try to issue a + * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC + * command in order to obtain the master run id and the master replication + * global offset. + * + * This function is designed to be called from syncWithMaster(), so the + * following assumptions are made: + * + * 1) We pass the function an already connected socket "fd". + * 2) This function does not close the file descriptor "fd". However in case + * of successful partial resynchronization, the function will reuse + * 'fd' as file descriptor of the server.master client structure. + * + * The function returns: + * + * PSYNC_CONTINUE: If the PSYNC command succeded and we can continue. + * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed. + * In this case the master run_id and global replication + * offset is saved. + * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and + * the caller should fall back to SYNC. + */ + +#define PSYNC_CONTINUE 0 +#define PSYNC_FULLRESYNC 1 +#define PSYNC_NOT_SUPPORTED 2 +int slaveTryPartialResynchronization(int fd) { + char *psync_runid; + char psync_offset[32]; + sds reply; + + /* Initially set repl_master_initial_offset to -1 to mark the current + * master run_id and offset as not valid. Later if we'll be able to do + * a FULL resync using the PSYNC command we'll set the offset at the + * right value, so that this information will be propagated to the + * client structure representing the master into server.master. */ + server.repl_master_initial_offset = -1; + + if (server.cached_master) { + psync_runid = server.cached_master->replrunid; + snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1); + redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset); + } else { + redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)"); + psync_runid = "?"; + memcpy(psync_offset,"-1",3); + } + + /* Issue the PSYNC command */ + reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL); + + if (!strncmp(reply,"+FULLRESYNC",11)) { + char *runid, *offset; + + /* FULL RESYNC, parse the reply in order to extract the run id + * and the replication offset. */ + runid = strchr(reply,' '); + if (runid) { + runid++; + offset = strchr(runid,' '); + if (offset) offset++; + } + if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) { + redisLog(REDIS_WARNING, + "Master replied with wrong +FULLRESYNC syntax."); + } else { + memcpy(server.repl_master_runid, runid, offset-runid-1); + server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0'; + server.repl_master_initial_offset = strtoll(offset,NULL,10); + redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld", + server.repl_master_runid, + server.repl_master_initial_offset); + } + /* We are going to full resync, discard the cached master structure. */ + replicationDiscardCachedMaster(); + sdsfree(reply); + return PSYNC_FULLRESYNC; + } - /* Check for errors from the server. */ - if (buf[0] != '+') { - return sdscatprintf(sdsempty(),"Error from master: %s", buf); + if (!strncmp(reply,"+CONTINUE",9)) { + /* Partial resync was accepted, set the replication state accordingly */ + redisLog(REDIS_NOTICE, + "Successful partial resynchronization with master."); + sdsfree(reply); + replicationResurrectCachedMaster(fd); + return PSYNC_CONTINUE; } - return NULL; /* No errors. */ + /* If we reach this point we receied either an error since the master does + * not understand PSYNC, or an unexpected reply from the master. + * Reply with PSYNC_NOT_SUPPORTED in both cases. */ + + if (strncmp(reply,"-ERR",4)) { + /* If it's not an error, log the unexpected event. */ + redisLog(REDIS_WARNING, + "Unexpected reply to PSYNC from master: %s", reply); + } else { + redisLog(REDIS_NOTICE, + "Master does not support PSYNC or is in " + "error state (reply: %s)", reply); + } + sdsfree(reply); + replicationDiscardCachedMaster(); + return PSYNC_NOT_SUPPORTED; } void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { char tmpfile[256], *err; int dfd, maxtries = 5; - int sockerr = 0; + int sockerr = 0, psync_result; socklen_t errlen = sizeof(sockerr); REDIS_NOTUSED(el); REDIS_NOTUSED(privdata); @@ -600,11 +1041,12 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { /* AUTH with the master if required. */ if(server.masterauth) { err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL); - if (err) { + if (err[0] == '-') { redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err); sdsfree(err); goto error; } + sdsfree(err); } /* Set the slave port, so that Master's INFO command can list the @@ -616,17 +1058,33 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { sdsfree(port); /* Ignore the error if any, not all the Redis versions support * REPLCONF listening-port. */ - if (err) { - redisLog(REDIS_NOTICE,"(non critical): Master does not understand REPLCONF listening-port: %s", err); - sdsfree(err); + if (err[0] == '-') { + redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err); } + sdsfree(err); } - /* Issue the SYNC command */ - if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) { - redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s", - strerror(errno)); - goto error; + /* Try a partial resynchonization. If we don't have a cached master + * slaveTryPartialResynchronization() will at least try to use PSYNC + * to start a full resynchronization so that we get the master run id + * and the global offset, to try a partial resync at the next + * reconnection attempt. */ + psync_result = slaveTryPartialResynchronization(fd); + if (psync_result == PSYNC_CONTINUE) { + redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization."); + return; + } + + /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC + * and the server.repl_master_runid and repl_master_initial_offset are + * already populated. */ + if (psync_result == PSYNC_NOT_SUPPORTED) { + redisLog(REDIS_NOTICE,"Retrying with SYNC..."); + if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) { + redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s", + strerror(errno)); + goto error; + } } /* Prepare a suitable temp file for bulk transfer */ @@ -733,6 +1191,7 @@ void slaveofCommand(redisClient *c) { sdsfree(server.masterhost); server.masterhost = NULL; if (server.master) freeClient(server.master); + replicationDiscardCachedMaster(); cancelReplicationHandshake(); server.repl_state = REDIS_REPL_NONE; redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)"); @@ -757,6 +1216,7 @@ void slaveofCommand(redisClient *c) { server.masterport = port; if (server.master) freeClient(server.master); disconnectSlaves(); /* Force our slaves to resync with us as well. */ + replicationDiscardCachedMaster(); /* Don't try a PSYNC. */ cancelReplicationHandshake(); server.repl_state = REDIS_REPL_CONNECT; redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)", @@ -765,6 +1225,92 @@ void slaveofCommand(redisClient *c) { addReply(c,shared.ok); } +/* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */ + +/* In order to implement partial synchronization we need to be able to cache + * our master's client structure after a transient disconnection. + * It is cached into server.cached_master and flushed away using the following + * functions. */ + +/* This function is called by freeClient() in order to cache the master + * client structure instead of destryoing it. freeClient() will return + * ASAP after this function returns, so every action needed to avoid problems + * with a client that is really "suspended" has to be done by this function. + * + * The other functions that will deal with the cached master are: + * + * replicationDiscardCachedMaster() that will make sure to kill the client + * as for some reason we don't want to use it in the future. + * + * replicationResurrectCachedMaster() that is used after a successful PSYNC + * handshake in order to reactivate the cached master. + */ +void replicationCacheMaster(redisClient *c) { + listNode *ln; + + redisAssert(server.master != NULL && server.cached_master == NULL); + redisLog(REDIS_NOTICE,"Caching the disconnected master state."); + + /* Remove from the list of clients, we don't want this client to be + * listed by CLIENT LIST or processed in any way by batch operations. */ + ln = listSearchKey(server.clients,c); + redisAssert(ln != NULL); + listDelNode(server.clients,ln); + + /* Save the master. Server.master will be set to null later by + * replicationHandleMasterDisconnection(). */ + server.cached_master = server.master; + + /* Remove the event handlers and close the socket. We'll later reuse + * the socket of the new connection with the master during PSYNC. */ + aeDeleteFileEvent(server.el,c->fd,AE_READABLE); + aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); + close(c->fd); + + /* Set fd to -1 so that we can safely call freeClient(c) later. */ + c->fd = -1; + + /* Caching the master happens instead of the actual freeClient() call, + * so make sure to adjust the replication state. This function will + * also set server.master to NULL. */ + replicationHandleMasterDisconnection(); +} + +/* Free a cached master, called when there are no longer the conditions for + * a partial resync on reconnection. */ +void replicationDiscardCachedMaster(void) { + if (server.cached_master == NULL) return; + + redisLog(REDIS_NOTICE,"Discarding previously cached master state."); + server.cached_master->flags &= ~REDIS_MASTER; + freeClient(server.cached_master); + server.cached_master = NULL; +} + +/* Turn the cached master into the current master, using the file descriptor + * passed as argument as the socket for the new master. + * + * This funciton is called when successfully setup a partial resynchronization + * so the stream of data that we'll receive will start from were this + * master left. */ +void replicationResurrectCachedMaster(int newfd) { + server.master = server.cached_master; + server.cached_master = NULL; + server.master->fd = newfd; + server.master->flags &= ~(REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP); + server.master->authenticated = 1; + server.master->lastinteraction = server.unixtime; + server.repl_state = REDIS_REPL_CONNECTED; + + /* Re-add to the list of clients. */ + listAddNodeTail(server.clients,server.master); + if (aeCreateFileEvent(server.el, newfd, AE_READABLE, + readQueryFromClient, server.master)) { + redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno)); + freeClientAsync(server.master); /* Close ASAP. */ + } +} + /* --------------------------- REPLICATION CRON ---------------------------- */ void replicationCron(void) { @@ -816,8 +1362,8 @@ void replicationCron(void) { replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1); decrRefCount(ping_argv[0]); - /* Second, send a newline to all the slaves in pre-synchronization stage, - * that is, slaves waiting for the master to create the RDB file. + /* Second, send a newline to all the slaves in pre-synchronization + * stage, that is, slaves waiting for the master to create the RDB file. * The newline will be ignored by the slave but will refresh the * last-io timer preventing a timeout. */ listRewind(server.slaves,&li); @@ -832,4 +1378,19 @@ void replicationCron(void) { } } } + + /* If we have no attached slaves and there is a replication backlog + * using memory, free it after some (configured) time. */ + if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit && + server.repl_backlog) + { + time_t idle = server.unixtime - server.repl_no_slaves_since; + + if (idle > server.repl_backlog_time_limit) { + freeReplicationBacklog(); + redisLog(REDIS_NOTICE, + "Replication backlog freed after %d seconds " + "without connected slaves.", server.repl_backlog_time_limit); + } + } }