Skip to content

Commit

Permalink
Implement basic XADD, XRANGE, XLEN commands.
Browse files Browse the repository at this point in the history
Covers most funvtionality for dragonflydb#59.
  • Loading branch information
romange committed Jun 8, 2022
1 parent 819b2f0 commit a9c0fa8
Show file tree
Hide file tree
Showing 16 changed files with 342 additions and 67 deletions.
7 changes: 6 additions & 1 deletion src/core/compact_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ extern "C" {
#include "redis/intset.h"
#include "redis/listpack.h"
#include "redis/object.h"
#include "redis/stream.h"
#include "redis/util.h"
#include "redis/zmalloc.h" // for non-string objects.
#include "redis/zset.h"
Expand Down Expand Up @@ -128,6 +129,10 @@ inline void FreeObjZset(unsigned encoding, void* ptr) {
}
}

inline void FreeObjStream(void* ptr) {
freeStream((stream*)ptr);
}

// Deniel's Lemire function validate_ascii_fast() - under Apache/MIT license.
// See https://github.com/lemire/fastvalidate-utf-8/
// The function returns true (1) if all chars passed in src are
Expand Down Expand Up @@ -282,7 +287,7 @@ void RobjWrapper::Free(pmr::memory_resource* mr) {
LOG(FATAL) << "Unsupported OBJ_MODULE type";
break;
case OBJ_STREAM:
LOG(FATAL) << "Unsupported OBJ_STREAM type";
FreeObjStream(inner_obj_);
break;
default:
LOG(FATAL) << "Unknown object type";
Expand Down
19 changes: 19 additions & 0 deletions src/core/compact_object_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ extern "C" {
#include "redis/intset.h"
#include "redis/object.h"
#include "redis/redis_aux.h"
#include "redis/stream.h"
#include "redis/zmalloc.h"
}

Expand Down Expand Up @@ -240,4 +241,22 @@ TEST_F(CompactObjectTest, FlatSet) {
EXPECT_LT(fs_used + 8 * kTestSize, dict_used);
}

TEST_F(CompactObjectTest, StreamObj) {
robj* stream_obj = createStreamObject();
stream* sm = (stream*)stream_obj->ptr;
robj* item[2];
item[0] = createStringObject("FIELD", 5);
item[1] = createStringObject("VALUE", 5);
ASSERT_EQ(C_OK, streamAppendItem(sm, item, 1, NULL, NULL, 0));

decrRefCount(item[0]);
decrRefCount(item[1]);

cobj_.ImportRObj(stream_obj);

EXPECT_EQ(OBJ_STREAM, cobj_.ObjType());
EXPECT_EQ(OBJ_ENCODING_STREAM, cobj_.Encoding());
EXPECT_FALSE(cobj_.IsInline());
}

} // namespace dfly
1 change: 0 additions & 1 deletion src/redis/rax.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@
* it must be compressed back into a single node.
*
*/

#define RAX_NODE_MAX_SIZE ((1<<29)-1)
typedef struct raxNode {
uint32_t iskey:1; /* Does this node contain a key? */
Expand Down
3 changes: 2 additions & 1 deletion src/redis/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "object.h"
#include "listpack.h"


/* Stream item ID: a 128 bit number composed of a milliseconds time and
* a sequence counter. IDs generated in the same millisecond (or in a past
* millisecond if the clock jumped backward) will use the millisecond time
Expand All @@ -16,7 +17,7 @@ typedef struct streamID {
} streamID;

typedef struct stream {
rax *rax; /* The radix tree holding the stream. */
rax *rax_tree; /* The radix tree holding the stream. */
uint64_t length; /* Current number of elements inside this stream. */
streamID last_id; /* Zero if there are yet no items. */
streamID first_id; /* The first non-tombstone entry, zero if empty. */
Expand Down
32 changes: 16 additions & 16 deletions src/redis/t_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq)
/* Create a new stream data structure. */
stream *streamNew(void) {
stream *s = zmalloc(sizeof(*s));
s->rax = raxNew();
s->rax_tree = raxNew();
s->length = 0;
s->first_id.ms = 0;
s->first_id.seq = 0;
Expand All @@ -90,7 +90,7 @@ stream *streamNew(void) {

/* Free a stream, including the listpacks stored inside the radix tree. */
void freeStream(stream *s) {
raxFreeWithCallback(s->rax,(void(*)(void*))lpFree);
raxFreeWithCallback(s->rax_tree,(void(*)(void*))lpFree);
if (s->cgroups)
raxFreeWithCallback(s->cgroups,(void(*)(void*))streamFreeCG);
zfree(s);
Expand Down Expand Up @@ -183,7 +183,7 @@ robj *streamDup(robj *o) {

raxIterator ri;
uint64_t rax_key[2];
raxStart(&ri, s->rax);
raxStart(&ri, s->rax_tree);
raxSeek(&ri, "^", NULL, 0);
size_t lp_bytes = 0; /* Total bytes in the listpack. */
unsigned char *lp = NULL; /* listpack pointer. */
Expand All @@ -194,7 +194,7 @@ robj *streamDup(robj *o) {
unsigned char *new_lp = zmalloc(lp_bytes);
memcpy(new_lp, lp, lp_bytes);
memcpy(rax_key, ri.key, sizeof(rax_key));
raxInsert(new_s->rax, (unsigned char *)&rax_key, sizeof(rax_key),
raxInsert(new_s->rax_tree, (unsigned char *)&rax_key, sizeof(rax_key),
new_lp, NULL);
}
new_s->length = s->length;
Expand Down Expand Up @@ -482,7 +482,7 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_

/* Add the new entry. */
raxIterator ri;
raxStart(&ri,s->rax);
raxStart(&ri,s->rax_tree);
raxSeek(&ri,"$",NULL,0);

size_t lp_bytes = 0; /* Total bytes in the tail listpack. */
Expand Down Expand Up @@ -550,7 +550,7 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
/* Shrink extra pre-allocated memory */
lp = lpShrinkToFit(lp);
if (ri.data != lp)
raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);
raxInsert(s->rax_tree,ri.key,ri.key_len,lp,NULL);
lp = NULL;
}
}
Expand Down Expand Up @@ -579,7 +579,7 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
}
lp = lpAppendInteger(lp,0); /* Master entry zero terminator. */
raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
raxInsert(s->rax_tree,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
/* The first entry we insert, has obviously the same fields of the
* master entry. */
flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
Expand Down Expand Up @@ -664,7 +664,7 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_

/* Insert back into the tree in order to update the listpack pointer. */
if (ri.data != lp)
raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
raxInsert(s->rax_tree,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
s->length++;
s->entries_added++;
s->last_id = id;
Expand Down Expand Up @@ -731,7 +731,7 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
return 0;

raxIterator ri;
raxStart(&ri,s->rax);
raxStart(&ri,s->rax_tree);
raxSeek(&ri,"^",NULL,0);

int64_t deleted = 0;
Expand Down Expand Up @@ -765,7 +765,7 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {

if (remove_node) {
lpFree(lp);
raxRemove(s->rax,ri.key,ri.key_len,NULL);
raxRemove(s->rax_tree,ri.key,ri.key_len,NULL);
raxSeek(&ri,">=",ri.key,ri.key_len);
s->length -= entries;
deleted += entries;
Expand Down Expand Up @@ -863,7 +863,7 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
}

/* Update the listpack with the new pointer. */
raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);
raxInsert(s->rax_tree,ri.key,ri.key_len,lp,NULL);

break; /* If we are here, there was enough to delete in the current
node, so no need to go to the next node. */
Expand Down Expand Up @@ -1085,7 +1085,7 @@ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamI
}

/* Seek the correct node in the radix tree. */
raxStart(&si->ri,s->rax);
raxStart(&si->ri,s->rax_tree);
if (!rev) {
if (start && (start->ms || start->seq)) {
raxSeek(&si->ri,"<=",(unsigned char*)si->start_key,
Expand Down Expand Up @@ -1302,7 +1302,7 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
/* If this is the last element in the listpack, we can remove the whole
* node. */
lpFree(lp);
raxRemove(si->stream->rax,si->ri.key,si->ri.key_len,NULL);
raxRemove(si->stream->rax_tree,si->ri.key,si->ri.key_len,NULL);
} else {
/* In the base case we alter the counters of valid/deleted entries. */
lp = lpReplaceInteger(lp,&p,aux-1);
Expand All @@ -1312,7 +1312,7 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {

/* Update the listpack with the new pointer. */
if (si->lp != lp)
raxInsert(si->stream->rax,si->ri.key,si->ri.key_len,lp,NULL);
raxInsert(si->stream->rax_tree,si->ri.key,si->ri.key_len,lp,NULL);
}

/* Update the number of entries counter. */
Expand Down Expand Up @@ -3668,9 +3668,9 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) {
addReplyBulkCString(c,"length");
addReplyLongLong(c,s->length);
addReplyBulkCString(c,"radix-tree-keys");
addReplyLongLong(c,raxSize(s->rax));
addReplyLongLong(c,raxSize(s->rax_tree));
addReplyBulkCString(c,"radix-tree-nodes");
addReplyLongLong(c,s->rax->numnodes);
addReplyLongLong(c,s->rax_tree->numnodes);
addReplyBulkCString(c,"last-generated-id");
addReplyStreamID(c,&s->last_id);
addReplyBulkCString(c,"max-deleted-entry-id");
Expand Down
2 changes: 2 additions & 0 deletions src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ const char* RdbTypeName(unsigned type) {
return "zset";
case RDB_TYPE_HASH:
return "hash";
case RDB_TYPE_STREAM_LISTPACKS:
return "stream";
}
return "other";
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/debugcmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ void DebugCmd::Run(CmdArgList args) {

string reply = absl::StrCat("Unknown subcommand or wrong number of arguments for '", subcmd,
"'. Try DEBUG HELP.");
return (*cntx_)->SendError(reply, kSyntaxErr);
return (*cntx_)->SendError(reply, kSyntaxErrType);
}

void DebugCmd::Reload(CmdArgList args) {
Expand Down
1 change: 1 addition & 0 deletions src/server/dragonfly_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ TEST_F(DflyEngineTest, PSubscribe) {
EXPECT_EQ("ab", msg.channel);
EXPECT_EQ("a*", msg.pattern);
}

TEST_F(DflyEngineTest, Unsubscribe) {
auto resp = Run({"unsubscribe", "a"});
EXPECT_THAT(resp.GetVec(), ElementsAre("unsubscribe", "a", IntArg(0)));
Expand Down
2 changes: 1 addition & 1 deletion src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ void GenericFamily::Del(CmdArgList args, ConnectionContext* cntx) {

void GenericFamily::Ping(CmdArgList args, ConnectionContext* cntx) {
if (args.size() > 2) {
return (*cntx)->SendError(facade::WrongNumArgsError("ping"), kSyntaxErr);
return (*cntx)->SendError(facade::WrongNumArgsError("ping"), kSyntaxErrType);
}

// We synchronously block here until the engine sends us the payload and notifies that
Expand Down
2 changes: 1 addition & 1 deletion src/server/hset_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ void HSetFamily::HSet(CmdArgList args, ConnectionContext* cntx) {
string_view cmd = ArgS(args, 0);

if (args.size() % 2 != 0) {
return (*cntx)->SendError(facade::WrongNumArgsError(cmd), kSyntaxErr);
return (*cntx)->SendError(facade::WrongNumArgsError(cmd), kSyntaxErrType);
}

args.remove_prefix(2);
Expand Down
13 changes: 7 additions & 6 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ extern "C" {
#include "server/script_mgr.h"
#include "server/server_state.h"
#include "server/set_family.h"
#include "server/stream_family.h"
#include "server/string_family.h"
#include "server/transaction.h"
#include "server/version.h"
Expand Down Expand Up @@ -61,8 +62,7 @@ namespace fibers = ::boost::fibers;
namespace this_fiber = ::boost::this_fiber;
using absl::GetFlag;
using absl::StrCat;
using facade::MCReplyBuilder;
using facade::RedisReplyBuilder;
using namespace facade;

namespace {

Expand Down Expand Up @@ -293,7 +293,7 @@ bool EvalValidator(CmdArgList args, ConnectionContext* cntx) {
}

if (unsigned(num_keys) > args.size() - 3) {
(*cntx)->SendError("Number of keys can't be greater than number of args", kSyntaxErr);
(*cntx)->SendError("Number of keys can't be greater than number of args", kSyntaxErrType);
return false;
}

Expand Down Expand Up @@ -424,11 +424,11 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)

if ((cid->arity() > 0 && args.size() != size_t(cid->arity())) ||
(cid->arity() < 0 && args.size() < size_t(-cid->arity()))) {
return (*cntx)->SendError(facade::WrongNumArgsError(cmd_str), kSyntaxErr);
return (*cntx)->SendError(facade::WrongNumArgsError(cmd_str), kSyntaxErrType);
}

if (cid->key_arg_step() == 2 && (args.size() % 2) == 0) {
return (*cntx)->SendError(facade::WrongNumArgsError(cmd_str), kSyntaxErr);
return (*cntx)->SendError(facade::WrongNumArgsError(cmd_str), kSyntaxErrType);
}

// Validate more complicated cases with custom validators.
Expand Down Expand Up @@ -994,7 +994,7 @@ void Service::Function(CmdArgList args, ConnectionContext* cntx) {
}

string err = StrCat("Unknown subcommand '", sub_cmd, "'. Try FUNCTION HELP.");
return (*cntx)->SendError(err, kSyntaxErr);
return (*cntx)->SendError(err, kSyntaxErrType);
}

VarzValue::Map Service::GetVarzStats() {
Expand Down Expand Up @@ -1052,6 +1052,7 @@ void Service::RegisterCommands() {
<< CI{"PUNSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -1, 0, 0, 0}.MFUNC(PUnsubscribe)
<< CI{"FUNCTION", CO::NOSCRIPT, 2, 0, 0, 0}.MFUNC(Function);

StreamFamily::Register(&registry_);
StringFamily::Register(&registry_);
GenericFamily::Register(&registry_);
ListFamily::Register(&registry_);
Expand Down
2 changes: 1 addition & 1 deletion src/server/script_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ void ScriptMgr::Run(CmdArgList args, ConnectionContext* cntx) {

string err = absl::StrCat("Unknown subcommand or wrong number of arguments for '", subcmd,
"'. Try SCRIPT HELP.");
cntx->reply_builder()->SendError(err, kSyntaxErr);
cntx->reply_builder()->SendError(err, kSyntaxErrType);
}

bool ScriptMgr::InsertFunction(std::string_view id, std::string_view body) {
Expand Down
Loading

0 comments on commit a9c0fa8

Please sign in to comment.