Skip to content

Commit

Permalink
Use element rank instead of iterating in ZCOUNT
Browse files Browse the repository at this point in the history
  • Loading branch information
pietern committed Oct 3, 2011
1 parent 13c7e5e commit 62d774e
Showing 1 changed file with 74 additions and 1 deletion.
75 changes: 74 additions & 1 deletion src/t_zset.c
Original file line number Diff line number Diff line change
Expand Up @@ -1943,7 +1943,80 @@ void zrevrangebyscoreCommand(redisClient *c) {
}

void zcountCommand(redisClient *c) {
genericZrangebyscoreCommand(c,0,1);
robj *key = c->argv[1];
robj *zobj;
zrangespec range;
int count = 0;

/* Parse the range arguments */
if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
addReplyError(c,"min or max is not a double");
return;
}

/* Lookup the sorted set */
if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
checkType(c, zobj, REDIS_ZSET)) return;

if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr, *sptr;
double score;

/* Use the first element in range as the starting point */
eptr = zzlFirstInRange(zl,range);

/* No "first" element */
if (eptr == NULL) {
addReply(c, shared.czero);
return;
}

/* First element is in range */
sptr = ziplistNext(zl,eptr);
score = zzlGetScore(sptr);
redisAssert(zslValueLteMax(score,&range));

/* Iterate over elements in range */
while (eptr) {
score = zzlGetScore(sptr);

/* Abort when the node is no longer in range. */
if (!zslValueLteMax(score,&range)) {
break;
} else {
count++;
zzlNext(zl,&eptr,&sptr);
}
}
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *zn;
unsigned long rank;

/* Find first element in range */
zn = zslFirstInRange(zsl, range);

/* Use rank of first element, if any, to determine preliminary count */
if (zn != NULL) {
rank = zslGetRank(zsl, zn->score, zn->obj);
count = (zsl->length - (rank - 1));

/* Find last element in range */
zn = zslLastInRange(zsl, range);

/* Use rank of last element, if any, to determine the actual count */
if (zn != NULL) {
rank = zslGetRank(zsl, zn->score, zn->obj);
count -= (zsl->length - rank);
}
}
} else {
redisPanic("Unknown sorted set encoding");
}

addReplyLongLong(c, count);
}

void zcardCommand(redisClient *c) {
Expand Down

0 comments on commit 62d774e

Please sign in to comment.