Skip to content

Commit

Permalink
Streams: propagate lastid in XCLAIM when it has effect
Browse files Browse the repository at this point in the history
  • Loading branch information
soloestoy committed Oct 11, 2018
1 parent 183ef7a commit 08ae522
Showing 1 changed file with 13 additions and 6 deletions.
19 changes: 13 additions & 6 deletions src/t_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -2075,6 +2075,8 @@ void xclaimCommand(client *c) {
/* If we stopped because some IDs cannot be parsed, perhaps they
* are trailing options. */
time_t now = mstime();
streamID last_id = {0,0};
int lastid_updated = 0;
for (; j < c->argc; j++) {
int moreargs = (c->argc-1) - j; /* Number of additional arguments. */
char *opt = c->argv[j]->ptr;
Expand All @@ -2100,18 +2102,18 @@ void xclaimCommand(client *c) {
!= C_OK) return;
} else if (!strcasecmp(opt,"LASTID") && moreargs) {
j++;
streamID id;
if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
/* Technically it could be more correct to update that only after
* checking for syntax errors, but this option is only used by
* the replication command that outputs correct syntax. */
if (streamCompareID(&id,&group->last_id) > 0) group->last_id = id;
if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0) != C_OK) return;
} else {
addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt);
return;
}
}

if (streamCompareID(&last_id,&group->last_id) > 0) {
group->last_id = last_id;
lastid_updated = 1;
}

if (deliverytime != -1) {
/* If a delivery time was passed, either with IDLE or TIME, we
* do some sanity check on it, and set the deliverytime to now
Expand All @@ -2132,6 +2134,7 @@ void xclaimCommand(client *c) {
streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr,1);
void *arraylenptr = addDeferredMultiBulkLength(c);
size_t arraylen = 0;
long long dirty = server.dirty;
for (int j = 5; j <= last_id_arg; j++) {
streamID id;
unsigned char buf[sizeof(streamID)];
Expand Down Expand Up @@ -2198,6 +2201,10 @@ void xclaimCommand(client *c) {
server.dirty++;
}
}
if (server.dirty == dirty && lastid_updated) {
streamPropagateGroupID(c,c->argv[1],group,c->argv[2]);
server.dirty++;
}
setDeferredMultiBulkLength(c,arraylenptr,arraylen);
preventCommandPropagation(c);
}
Expand Down

0 comments on commit 08ae522

Please sign in to comment.