Skip to content

Commit

Permalink
fix: 'xgroup help' should show help message (dragonflydb#1159)
Browse files Browse the repository at this point in the history
Along the way, performs small cleanups in command handling code.
XGROUP HELP is special because it falls out of Dragonfly command taxonomy design,
where a command name determines where its key is located. All other XGROUP subcommands
expect to see XGROUP <subcmd> <key> and this one obviously does not need any key.
I fix it by working around the issue and introduce a dedicated dummy command for this combination.

Fixes dragonflydb#854.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Apr 30, 2023
1 parent de0b733 commit 418f529
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 72 deletions.
28 changes: 20 additions & 8 deletions src/server/command_registry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ bool CommandId::IsTransactional() const {
if (first_key_ > 0 || (opt_mask_ & CO::GLOBAL_TRANS) || (opt_mask_ & CO::NO_KEY_JOURNAL))
return true;

string_view name{name_};
if (name == "EVAL" || name == "EVALSHA" || name == "EXEC")
if (name_ == "EVAL" || name_ == "EVALSHA" || name_ == "EXEC")
return true;

return false;
Expand All @@ -44,30 +43,41 @@ uint32_t CommandId::OptCount(uint32_t mask) {
}

CommandRegistry::CommandRegistry() {
CommandId cd("COMMAND", CO::LOADING | CO::NOSCRIPT, -1, 0, 0, 0);
static const char kCMD[] = "COMMAND";
CommandId cd(kCMD, CO::LOADING | CO::NOSCRIPT, -1, 0, 0, 0);

cd.SetHandler([this](const auto& args, auto* cntx) { return Command(args, cntx); });

const char* nm = cd.name();
cmd_map_.emplace(nm, std::move(cd));
cmd_map_.emplace(kCMD, std::move(cd));
}

void CommandRegistry::Command(CmdArgList args, ConnectionContext* cntx) {
unsigned cmd_cnt = 0;
for (const auto& val : cmd_map_) {
const CommandId& cd = val.second;
if (cd.opt_mask() & CO::HIDDEN)
continue;

++cmd_cnt;
}

if (args.size() > 0) {
ToUpper(&args[0]);
string_view subcmd = ArgS(args, 0);
if (subcmd == "COUNT") {
return (*cntx)->SendLong(cmd_map_.size());
return (*cntx)->SendLong(cmd_cnt);
} else {
return (*cntx)->SendError(kSyntaxErr, kSyntaxErrType);
}
}
size_t len = cmd_map_.size();

(*cntx)->StartArray(len);
(*cntx)->StartArray(cmd_cnt);

for (const auto& val : cmd_map_) {
const CommandId& cd = val.second;
if (cd.opt_mask() & CO::HIDDEN)
continue;

(*cntx)->StartArray(6);
(*cntx)->SendSimpleString(cd.name());
(*cntx)->SendLong(cd.arity());
Expand Down Expand Up @@ -118,6 +128,8 @@ const char* OptName(CO::CommandOpt fl) {
return "noscript";
case BLOCKING:
return "blocking";
case HIDDEN:
return "hidden";
case GLOBAL_TRANS:
return "global-trans";
case VARIADIC_KEYS:
Expand Down
9 changes: 7 additions & 2 deletions src/server/command_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ enum CommandOpt : uint32_t {
ADMIN = 1U << 7, // implies NOSCRIPT,
NOSCRIPT = 1U << 8,
BLOCKING = 1U << 9, // implies REVERSE_MAPPING
HIDDEN = 1U << 10, // does not show in COMMAND command output
GLOBAL_TRANS = 1U << 12,

NO_AUTOJOURNAL = 1U << 15, // Skip automatically logging command to journal inside transaction.
Expand All @@ -43,6 +44,10 @@ constexpr inline bool IsEvalKind(std::string_view name) {
return name.compare(0, 4, "EVAL") == 0;
}

constexpr inline bool IsTransKind(std::string_view name) {
return (name == "EXEC") || (name == "MULTI") || (name == "DISCARD");
}

static_assert(IsEvalKind("EVAL") && IsEvalKind("EVALSHA"));
static_assert(!IsEvalKind(""));

Expand Down Expand Up @@ -78,7 +83,7 @@ class CommandId {
CommandId(const char* name, uint32_t mask, int8_t arity, int8_t first_key, int8_t last_key,
int8_t step);

const char* name() const {
std::string_view name() const {
return name_;
}

Expand Down Expand Up @@ -132,7 +137,7 @@ class CommandId {
static uint32_t OptCount(uint32_t mask);

private:
const char* name_;
std::string_view name_;

uint32_t opt_mask_;
int8_t arity_;
Expand Down
85 changes: 47 additions & 38 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ extern "C" {
#include <absl/cleanup/cleanup.h>
#include <absl/functional/bind_front.h>
#include <absl/strings/ascii.h>
#include <absl/strings/match.h>
#include <absl/strings/str_format.h>
#include <xxhash.h>

Expand Down Expand Up @@ -192,14 +193,14 @@ void SendMonitor(const std::string& msg) {
}
}

void DispatchMonitorIfNeeded(bool admin_cmd, ConnectionContext* connection, CmdArgList args) {
void DispatchMonitorIfNeeded(bool admin_cmd, ConnectionContext* cntx, CmdArgList args) {
// We are not sending any admin command in the monitor, and we do not want to
// do any processing if we don't have any waiting connections with monitor
// enabled on them - see https://redis.io/commands/monitor/
const auto& my_monitors = ServerState::tlocal()->Monitors();
if (!(my_monitors.Empty() || admin_cmd)) {
// We have connections waiting to get the info on the last command, send it to them
auto monitor_msg = MakeMonitorMessage(connection->conn_state, connection->owner(), args);
auto monitor_msg = MakeMonitorMessage(cntx->conn_state, cntx->owner(), args);

VLOG(1) << "sending command '" << monitor_msg << "' to the clients that registered on it";

Expand Down Expand Up @@ -583,12 +584,6 @@ void Service::Shutdown() {
ThisFiber::SleepFor(10ms);
}

static void SetMultiExecErrorFlag(ConnectionContext* cntx) {
if (cntx->conn_state.exec_info.IsActive()) {
cntx->conn_state.exec_info.state = ConnectionState::ExecInfo::EXEC_ERROR;
}
}

// Return OK if all keys are allowed to be accessed: either declared in EVAL or
// transaction is running in global or non-atomic mode.
OpStatus CheckKeysDeclared(const ConnectionState::ScriptInfo& eval_info, const CommandId* cid,
Expand Down Expand Up @@ -619,52 +614,53 @@ OpStatus CheckKeysDeclared(const ConnectionState::ScriptInfo& eval_info, const C
return OpStatus::OK;
}

bool Service::VerifyCommand(const CommandId* cid, CmdArgList args,
facade::ConnectionContext* cntx) {
bool Service::VerifyCommand(const CommandId* cid, CmdArgList args, ConnectionContext* dfly_cntx) {
ServerState& etl = *ServerState::tlocal();

string_view cmd_str = ArgS(args, 0);
ConnectionContext* dfly_cntx = static_cast<ConnectionContext*>(cntx);

bool is_trans_cmd = (cmd_str == "EXEC" || cmd_str == "MULTI" || cmd_str == "DISCARD");
bool under_script = bool(dfly_cntx->conn_state.script_info);

absl::Cleanup multi_error([dfly_cntx] { SetMultiExecErrorFlag(dfly_cntx); });
absl::Cleanup multi_error([exec_info = &dfly_cntx->conn_state.exec_info] {
if (exec_info->IsActive()) {
exec_info->state = ConnectionState::ExecInfo::EXEC_ERROR;
}
});

if (cid == nullptr) {
(*cntx)->SendError(StrCat("unknown command `", cmd_str, "`"), "unknown_cmd");
(*dfly_cntx)->SendError(StrCat("unknown command `", cmd_str, "`"), "unknown_cmd");

lock_guard lk(mu_);
if (unknown_cmds_.size() < 1024)
unknown_cmds_[cmd_str]++;
return false;
}

bool blocked_by_loading = !cntx->journal_emulated && etl.gstate() == GlobalState::LOADING &&
bool is_trans_cmd = CO::IsTransKind(cid->name());
bool under_script = bool(dfly_cntx->conn_state.script_info);
bool blocked_by_loading = !dfly_cntx->journal_emulated && etl.gstate() == GlobalState::LOADING &&
(cid->opt_mask() & CO::LOADING) == 0;
if (blocked_by_loading || etl.gstate() == GlobalState::SHUTTING_DOWN) {
string err = StrCat("Can not execute during ", GlobalStateName(etl.gstate()));
(*cntx)->SendError(err);
(*dfly_cntx)->SendError(err);
return false;
}

string_view cmd_name{cid->name()};

if (cntx->req_auth && !cntx->authenticated) {
if (dfly_cntx->req_auth && !dfly_cntx->authenticated) {
if (cmd_name != "AUTH" && cmd_name != "QUIT" && cmd_name != "HELLO") {
(*cntx)->SendError("-NOAUTH Authentication required.");
(*dfly_cntx)->SendError("-NOAUTH Authentication required.");
return false;
}
}

// only reset and quit are allow if this connection is used for monitoring
if (dfly_cntx->monitor && (cmd_name != "RESET" && cmd_name != "QUIT")) {
(*cntx)->SendError("Replica can't interact with the keyspace");
(*dfly_cntx)->SendError("Replica can't interact with the keyspace");
return false;
}

if (under_script && (cid->opt_mask() & CO::NOSCRIPT)) {
(*cntx)->SendError("This Redis command is not allowed from script");
(*dfly_cntx)->SendError("This Redis command is not allowed from script");
return false;
}

Expand All @@ -673,18 +669,18 @@ bool Service::VerifyCommand(const CommandId* cid, CmdArgList args,
bool under_multi = dfly_cntx->conn_state.exec_info.IsActive() && !is_trans_cmd;

if (!etl.is_master && is_write_cmd && !dfly_cntx->is_replicating) {
(*cntx)->SendError("-READONLY You can't write against a read only replica.");
(*dfly_cntx)->SendError("-READONLY You can't write against a read only replica.");
return false;
}

if ((cid->arity() > 0 && args.size() != size_t(cid->arity())) ||
(cid->arity() < 0 && args.size() < size_t(-cid->arity()))) {
(*cntx)->SendError(facade::WrongNumArgsError(cmd_str), kSyntaxErrType);
(*dfly_cntx)->SendError(facade::WrongNumArgsError(cmd_str), kSyntaxErrType);
return false;
}

if (cid->key_arg_step() == 2 && (args.size() % 2) == 0) {
(*cntx)->SendError(facade::WrongNumArgsError(cmd_str), kSyntaxErrType);
(*dfly_cntx)->SendError(facade::WrongNumArgsError(cmd_str), kSyntaxErrType);
return false;
}

Expand All @@ -695,12 +691,12 @@ bool Service::VerifyCommand(const CommandId* cid, CmdArgList args,

if (under_multi) {
if (cmd_name == "SELECT") {
(*cntx)->SendError("Can not call SELECT within a transaction");
(*dfly_cntx)->SendError("Can not call SELECT within a transaction");
return false;
}

if (cmd_name == "WATCH" || cmd_name == "FLUSHALL" || cmd_name == "FLUSHDB") {
(*cntx)->SendError(absl::StrCat("'", cmd_name, "' inside MULTI is not allowed"));
(*dfly_cntx)->SendError(absl::StrCat("'", cmd_name, "' inside MULTI is not allowed"));
return false;
}
}
Expand All @@ -710,12 +706,12 @@ bool Service::VerifyCommand(const CommandId* cid, CmdArgList args,
dfly_cntx->transaction);

if (status == OpStatus::KEY_NOTFOUND) {
(*cntx)->SendError("script tried accessing undeclared key");
(*dfly_cntx)->SendError("script tried accessing undeclared key");
return false;
}

if (status != OpStatus::OK) {
(*cntx)->SendError(status);
(*dfly_cntx)->SendError(status);
return false;
}
}
Expand All @@ -740,21 +736,20 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
<< " in dbid=" << dfly_cntx->conn_state.db_index;
}

string_view cmd_str = ArgS(args, 0);
bool is_trans_cmd = (cmd_str == "EXEC" || cmd_str == "MULTI" || cmd_str == "DISCARD");
const CommandId* cid = registry_.Find(cmd_str);
const CommandId* cid = FindCmd(args);
ServerState& etl = *ServerState::tlocal();

etl.RecordCmd();

if (!VerifyCommand(cid, args, cntx))
if (!VerifyCommand(cid, args, dfly_cntx))
return;

bool is_trans_cmd = CO::IsTransKind(cid->name());
etl.connection_stats.cmd_count_map[cid->name()]++;

auto args_no_cmd = args.subspan(1);
if (dfly_cntx->conn_state.exec_info.IsActive() && !is_trans_cmd) {
// TODO: protect against aggregating huge transactions.
StoredCmd stored_cmd{cid, args.subspan(1)};
StoredCmd stored_cmd{cid, args_no_cmd};
dfly_cntx->conn_state.exec_info.body.push_back(std::move(stored_cmd));

return (*cntx)->SendSimpleString("QUEUED");
Expand All @@ -771,7 +766,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
if (cid->IsTransactional()) {
dfly_cntx->transaction->MultiSwitchCmd(cid);
OpStatus status =
dfly_cntx->transaction->InitByArgs(dfly_cntx->conn_state.db_index, args.subspan(1));
dfly_cntx->transaction->InitByArgs(dfly_cntx->conn_state.db_index, args_no_cmd);

if (status != OpStatus::OK)
return (*cntx)->SendError(status);
Expand All @@ -783,7 +778,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
dist_trans.reset(new Transaction{cid, etl.thread_index()});

if (!dist_trans->IsMulti()) { // Multi command initialize themself based on their mode.
if (auto st = dist_trans->InitByArgs(dfly_cntx->conn_state.db_index, args.subspan(1));
if (auto st = dist_trans->InitByArgs(dfly_cntx->conn_state.db_index, args_no_cmd);
st != OpStatus::OK)
return (*cntx)->SendError(st);
}
Expand All @@ -807,7 +802,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
}

end_usec = ProactorBase::GetMonotonicTimeNs();
request_latency_usec.IncBy(cmd_str, (end_usec - start_usec) / 1000);
request_latency_usec.IncBy(cid->name(), (end_usec - start_usec) / 1000);

if (!under_script) {
dfly_cntx->transaction = nullptr;
Expand Down Expand Up @@ -1216,6 +1211,20 @@ bool StartMultiEval(DbIndex dbid, CmdArgList keys, ScriptMgr::ScriptParams param
return false;
}

const CommandId* Service::FindCmd(CmdArgList args) const {
const CommandId* res = registry_.Find(ArgS(args, 0));
if (!res)
return nullptr;

// A workaround for XGROUP HELP that does not fit our static taxonomy of commands.
if (args.size() == 2 && res->name() == "XGROUP") {
if (absl::EqualsIgnoreCase(ArgS(args, 1), "HELP")) {
res = registry_.Find("_XGROUP_HELP");
}
}
return res;
}

void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter,
ConnectionContext* cntx) {
DCHECK(!eval_args.sha.empty());
Expand Down
4 changes: 3 additions & 1 deletion src/server/main_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ class Service : public facade::ServiceInterface {
};

// Return false if command is invalid and reply with error.
bool VerifyCommand(const CommandId* cid, CmdArgList args, facade::ConnectionContext* cntx);
bool VerifyCommand(const CommandId* cid, CmdArgList args, ConnectionContext* cntx);

const CommandId* FindCmd(CmdArgList args) const;

void EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter, ConnectionContext* cntx);

Expand Down
2 changes: 1 addition & 1 deletion src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2158,7 +2158,7 @@ void ServerFamily::Register(CommandRegistry* registry) {
<< CI{"ROLE", CO::LOADING | CO::FAST | CO::NOSCRIPT, 1, 0, 0, 0}.HFUNC(Role)
<< CI{"SLOWLOG", CO::ADMIN | CO::FAST, -2, 0, 0, 0}.SetHandler(SlowLog)
<< CI{"SCRIPT", CO::NOSCRIPT | CO::NO_KEY_JOURNAL, -2, 0, 0, 0}.HFUNC(Script)
<< CI{"DFLY", CO::ADMIN | CO::GLOBAL_TRANS, -2, 0, 0, 0}.HFUNC(Dfly);
<< CI{"DFLY", CO::ADMIN | CO::GLOBAL_TRANS | CO::HIDDEN, -2, 0, 0, 0}.HFUNC(Dfly);
}

} // namespace dfly
Loading

0 comments on commit 418f529

Please sign in to comment.