Skip to content

Commit e4d2bb6

Browse files
oranagraantirez
authored andcommitted
Keep track of meaningful replication offset in replicas too
Now both master and replicas keep track of the last replication offset that contains meaningful data (ignoring the tailing pings), and both trim that tail from the replication backlog, and the offset with which they try to use for psync. the implication is that if someone missed some pings, or even have excessive pings that the promoted replica has, it'll still be able to psync (avoid full sync). the downside (which was already committed) is that replicas running old code may fail to psync, since the promoted replica trims pings form it's backlog. This commit adds a test that reproduces several cases of promotions and demotions with stale and non-stale pings Background: The mearningful offset on the master was added recently to solve a problem were the master is left all alone, injecting PINGs into it's backlog when no one is listening and then gets demoted and tries to replicate from a replica that didn't have any of the PINGs (or at least not the last ones). however, consider this case: master A has two replicas (B and C) replicating directly from it. there's no traffic at all, and also no network issues, just many pings in the tail of the backlog. now B gets promoted, A becomes a replica of B, and C remains a replica of A. when A gets demoted, it trims the pings from its backlog, and successfully replicate from B. however, C is still aware of these PINGs, when it'll disconnect and re-connect to A, it'll ask for something that's not in the backlog anymore (since A trimmed the tail of it's backlog), and be forced to do a full sync (something it didn't have to do before the meaningful offset fix). Besides that, the psync2 test was always failing randomly here and there, it turns out the reason were PINGs. Investigating it shows the following scenario: cycle 1: redis Snapchat#1 is master, and all the rest are direct replicas of Snapchat#1 cycle 2: redis Snapchat#2 is promoted to master, Snapchat#1 is a replica of Snapchat#2 and Snapchat#3 is replica of Snapchat#1 now we see that when Snapchat#1 is demoted it prints: 17339:S 21 Apr 2020 11:16:38.523 * Using the meaningful offset 3929963 instead of 3929977 to exclude the final PINGs (14 bytes difference) 17339:S 21 Apr 2020 11:16:39.391 * Trying a partial resynchronization (request e2b3f8817735fdfe5fa4626766daa938b61419e5:3929964). 17339:S 21 Apr 2020 11:16:39.392 * Successful partial resynchronization with master. and when Snapchat#3 connects to the demoted Snapchat#2, Snapchat#2 says: 17339:S 21 Apr 2020 11:16:40.084 * Partial resynchronization not accepted: Requested offset for secondary ID was 3929978, but I can reply up to 3929964 so the issue here is that the meaningful offset feature saved the day for the demoted master (since it needs to sync from a replica that didn't get the last ping), but it didn't help one of the other replicas which did get the last ping.
1 parent fea9788 commit e4d2bb6

File tree

5 files changed

+212
-92
lines changed

5 files changed

+212
-92
lines changed

src/blocked.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ void processUnblockedClients(void) {
110110
* the code is conceptually more correct this way. */
111111
if (!(c->flags & CLIENT_BLOCKED)) {
112112
if (c->querybuf && sdslen(c->querybuf) > 0) {
113-
processInputBufferAndReplicate(c);
113+
processInputBuffer(c);
114114
}
115115
}
116116
}

src/networking.c

Lines changed: 49 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1671,33 +1671,63 @@ int processMultibulkBuffer(client *c) {
16711671
return C_ERR;
16721672
}
16731673

1674+
/* Perform necessary tasks after a command was executed:
1675+
*
1676+
* 1. The client is reset unless there are reasons to avoid doing it.
1677+
* 2. In the case of master clients, the replication offset is updated.
1678+
* 3. Propagate commands we got from our master to replicas down the line. */
1679+
void commandProcessed(client *c) {
1680+
int cmd_is_ping = c->cmd && c->cmd->proc == pingCommand;
1681+
long long prev_offset = c->reploff;
1682+
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
1683+
/* Update the applied replication offset of our master. */
1684+
c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
1685+
}
1686+
1687+
/* Don't reset the client structure for clients blocked in a
1688+
* module blocking command, so that the reply callback will
1689+
* still be able to access the client argv and argc field.
1690+
* The client will be reset in unblockClientFromModule(). */
1691+
if (!(c->flags & CLIENT_BLOCKED) ||
1692+
c->btype != BLOCKED_MODULE)
1693+
{
1694+
resetClient(c);
1695+
}
1696+
1697+
/* If the client is a master we need to compute the difference
1698+
* between the applied offset before and after processing the buffer,
1699+
* to understand how much of the replication stream was actually
1700+
* applied to the master state: this quantity, and its corresponding
1701+
* part of the replication stream, will be propagated to the
1702+
* sub-replicas and to the replication backlog. */
1703+
if (c->flags & CLIENT_MASTER) {
1704+
long long applied = c->reploff - prev_offset;
1705+
long long prev_master_repl_meaningful_offset = server.master_repl_meaningful_offset;
1706+
if (applied) {
1707+
replicationFeedSlavesFromMasterStream(server.slaves,
1708+
c->pending_querybuf, applied);
1709+
sdsrange(c->pending_querybuf,applied,-1);
1710+
}
1711+
/* The server.master_repl_meaningful_offset variable represents
1712+
* the offset of the replication stream without the pending PINGs. */
1713+
if (cmd_is_ping)
1714+
server.master_repl_meaningful_offset = prev_master_repl_meaningful_offset;
1715+
}
1716+
}
1717+
16741718
/* This function calls processCommand(), but also performs a few sub tasks
1675-
* that are useful in that context:
1719+
* for the client that are useful in that context:
16761720
*
16771721
* 1. It sets the current client to the client 'c'.
1678-
* 2. In the case of master clients, the replication offset is updated.
1679-
* 3. The client is reset unless there are reasons to avoid doing it.
1722+
* 2. calls commandProcessed() if the command was handled.
16801723
*
16811724
* The function returns C_ERR in case the client was freed as a side effect
16821725
* of processing the command, otherwise C_OK is returned. */
16831726
int processCommandAndResetClient(client *c) {
16841727
int deadclient = 0;
16851728
server.current_client = c;
16861729
if (processCommand(c) == C_OK) {
1687-
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
1688-
/* Update the applied replication offset of our master. */
1689-
c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
1690-
}
1691-
1692-
/* Don't reset the client structure for clients blocked in a
1693-
* module blocking command, so that the reply callback will
1694-
* still be able to access the client argv and argc field.
1695-
* The client will be reset in unblockClientFromModule(). */
1696-
if (!(c->flags & CLIENT_BLOCKED) ||
1697-
c->btype != BLOCKED_MODULE)
1698-
{
1699-
resetClient(c);
1700-
}
1730+
commandProcessed(c);
17011731
}
17021732
if (server.current_client == NULL) deadclient = 1;
17031733
server.current_client = NULL;
@@ -1794,31 +1824,6 @@ void processInputBuffer(client *c) {
17941824
}
17951825
}
17961826

1797-
/* This is a wrapper for processInputBuffer that also cares about handling
1798-
* the replication forwarding to the sub-replicas, in case the client 'c'
1799-
* is flagged as master. Usually you want to call this instead of the
1800-
* raw processInputBuffer(). */
1801-
void processInputBufferAndReplicate(client *c) {
1802-
if (!(c->flags & CLIENT_MASTER)) {
1803-
processInputBuffer(c);
1804-
} else {
1805-
/* If the client is a master we need to compute the difference
1806-
* between the applied offset before and after processing the buffer,
1807-
* to understand how much of the replication stream was actually
1808-
* applied to the master state: this quantity, and its corresponding
1809-
* part of the replication stream, will be propagated to the
1810-
* sub-replicas and to the replication backlog. */
1811-
size_t prev_offset = c->reploff;
1812-
processInputBuffer(c);
1813-
size_t applied = c->reploff - prev_offset;
1814-
if (applied) {
1815-
replicationFeedSlavesFromMasterStream(server.slaves,
1816-
c->pending_querybuf, applied);
1817-
sdsrange(c->pending_querybuf,applied,-1);
1818-
}
1819-
}
1820-
}
1821-
18221827
void readQueryFromClient(connection *conn) {
18231828
client *c = connGetPrivateData(conn);
18241829
int nread, readlen;
@@ -1886,7 +1891,7 @@ void readQueryFromClient(connection *conn) {
18861891

18871892
/* There is more data in the client input buffer, continue parsing it
18881893
* in case to check if there is a full command to execute. */
1889-
processInputBufferAndReplicate(c);
1894+
processInputBuffer(c);
18901895
}
18911896

18921897
void getClientsMaxBuffers(unsigned long *longest_output_list,
@@ -3101,7 +3106,7 @@ int handleClientsWithPendingReadsUsingThreads(void) {
31013106
continue;
31023107
}
31033108
}
3104-
processInputBufferAndReplicate(c);
3109+
processInputBuffer(c);
31053110
}
31063111
return processed;
31073112
}

src/replication.c

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
#include <sys/socket.h>
4040
#include <sys/stat.h>
4141

42+
long long adjustMeaningfulReplOffset();
4243
void replicationDiscardCachedMaster(void);
4344
void replicationResurrectCachedMaster(connection *conn);
4445
void replicationSendAck(void);
@@ -2693,6 +2694,9 @@ void replicationCacheMaster(client *c) {
26932694
* pending outputs to the master. */
26942695
sdsclear(server.master->querybuf);
26952696
sdsclear(server.master->pending_querybuf);
2697+
/* Adjust reploff and read_reploff to the last meaningful offset we executed.
2698+
* this is the offset the replica will use for future PSYNC. */
2699+
server.master->reploff = adjustMeaningfulReplOffset();
26962700
server.master->read_reploff = server.master->reploff;
26972701
if (c->flags & CLIENT_MULTI) discardTransaction(c);
26982702
listEmpty(c->reply);
@@ -2717,33 +2721,15 @@ void replicationCacheMaster(client *c) {
27172721
replicationHandleMasterDisconnection();
27182722
}
27192723

2720-
/* This function is called when a master is turend into a slave, in order to
2721-
* create from scratch a cached master for the new client, that will allow
2722-
* to PSYNC with the slave that was promoted as the new master after a
2723-
* failover.
2724-
*
2725-
* Assuming this instance was previously the master instance of the new master,
2726-
* the new master will accept its replication ID, and potentiall also the
2727-
* current offset if no data was lost during the failover. So we use our
2728-
* current replication ID and offset in order to synthesize a cached master. */
2729-
void replicationCacheMasterUsingMyself(void) {
2730-
serverLog(LL_NOTICE,
2731-
"Before turning into a replica, using my own master parameters "
2732-
"to synthesize a cached master: I may be able to synchronize with "
2733-
"the new master with just a partial transfer.");
2734-
2735-
/* This will be used to populate the field server.master->reploff
2736-
* by replicationCreateMasterClient(). We'll later set the created
2737-
* master as server.cached_master, so the replica will use such
2738-
* offset for PSYNC. */
2739-
server.master_initial_offset = server.master_repl_offset;
2740-
2741-
/* However if the "meaningful" offset, that is the offset without
2742-
* the final PINGs in the stream, is different, use this instead:
2743-
* often when the master is no longer reachable, replicas will never
2744-
* receive the PINGs, however the master will end with an incremented
2745-
* offset because of the PINGs and will not be able to incrementally
2746-
* PSYNC with the new master. */
2724+
/* If the "meaningful" offset, that is the offset without the final PINGs
2725+
* in the stream, is different than the last offset, use it instead:
2726+
* often when the master is no longer reachable, replicas will never
2727+
* receive the PINGs, however the master will end with an incremented
2728+
* offset because of the PINGs and will not be able to incrementally
2729+
* PSYNC with the new master.
2730+
* This function trims the replication backlog when needed, and returns
2731+
* the offset to be used for future partial sync. */
2732+
long long adjustMeaningfulReplOffset() {
27472733
if (server.master_repl_offset > server.master_repl_meaningful_offset) {
27482734
long long delta = server.master_repl_offset -
27492735
server.master_repl_meaningful_offset;
@@ -2753,7 +2739,6 @@ void replicationCacheMasterUsingMyself(void) {
27532739
server.master_repl_meaningful_offset,
27542740
server.master_repl_offset,
27552741
delta);
2756-
server.master_initial_offset = server.master_repl_meaningful_offset;
27572742
server.master_repl_offset = server.master_repl_meaningful_offset;
27582743
if (server.repl_backlog_histlen <= delta) {
27592744
server.repl_backlog_histlen = 0;
@@ -2765,6 +2750,29 @@ void replicationCacheMasterUsingMyself(void) {
27652750
server.repl_backlog_size;
27662751
}
27672752
}
2753+
return server.master_repl_offset;
2754+
}
2755+
2756+
/* This function is called when a master is turend into a slave, in order to
2757+
* create from scratch a cached master for the new client, that will allow
2758+
* to PSYNC with the slave that was promoted as the new master after a
2759+
* failover.
2760+
*
2761+
* Assuming this instance was previously the master instance of the new master,
2762+
* the new master will accept its replication ID, and potentiall also the
2763+
* current offset if no data was lost during the failover. So we use our
2764+
* current replication ID and offset in order to synthesize a cached master. */
2765+
void replicationCacheMasterUsingMyself(void) {
2766+
serverLog(LL_NOTICE,
2767+
"Before turning into a replica, using my own master parameters "
2768+
"to synthesize a cached master: I may be able to synchronize with "
2769+
"the new master with just a partial transfer.");
2770+
2771+
/* This will be used to populate the field server.master->reploff
2772+
* by replicationCreateMasterClient(). We'll later set the created
2773+
* master as server.cached_master, so the replica will use such
2774+
* offset for PSYNC. */
2775+
server.master_initial_offset = adjustMeaningfulReplOffset();
27682776

27692777
/* The master client we create can be set to any DBID, because
27702778
* the new master will start its replication stream with SELECT. */

src/server.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1600,7 +1600,6 @@ void setDeferredSetLen(client *c, void *node, long length);
16001600
void setDeferredAttributeLen(client *c, void *node, long length);
16011601
void setDeferredPushLen(client *c, void *node, long length);
16021602
void processInputBuffer(client *c);
1603-
void processInputBufferAndReplicate(client *c);
16041603
void processGopherRequest(client *c);
16051604
void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
16061605
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask);

0 commit comments

Comments
 (0)