Skip to content

Commit

Permalink
feat(server): Allow configuration of hashtag extraction (dragonflydb#…
Browse files Browse the repository at this point in the history
…2890)

* feat(server): Allow configuration of hashtag extraction

Before this PR: Hashtags, if enabled, meant the text between `{` and `}`
in the key (if exists and non-empty).

After this PR:

* Hashtags can _still_ be enabled / disabled
* Hashtag open / close char can be specified (and can be the same), like `:` popular with BullMQ
* Hashtag can include `N` closing tags to skip, like `{a}b}c}d` with `2` will return `a}b}c`.

This will allow some existing systems to migrate without code changes in
client code.
  • Loading branch information
chakaz authored Apr 14, 2024
1 parent f6f7cfb commit 256e07f
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 31 deletions.
21 changes: 17 additions & 4 deletions src/server/cluster/cluster_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ extern "C" {
#include <shared_mutex>
#include <string_view>

#include "absl/strings/match.h"
#include "base/flags.h"
#include "base/logging.h"
#include "cluster_config.h"
Expand Down Expand Up @@ -55,14 +56,26 @@ bool ClusterConfig::IsEmulated() {
}

string_view ClusterConfig::KeyTag(string_view key) {
size_t start = key.find('{');
if (start == key.npos) {
auto options = KeyLockArgs::GetLockTagOptions();

if (!absl::StartsWith(key, options.prefix)) {
return key;
}
size_t end = key.find('}', start + 1);
if (end == key.npos || end == start + 1) {

const size_t start = key.find(options.open_locktag);
if (start == key.npos) {
return key;
}

size_t end = start;
for (unsigned i = 0; i <= options.skip_n_end_delimiters; ++i) {
size_t next = end + 1;
end = key.find(options.close_locktag, next);
if (end == key.npos || end == next) {
return key;
}
}

return key.substr(start + 1, end - start - 1);
}

Expand Down
2 changes: 1 addition & 1 deletion src/server/cluster/cluster_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class ClusterConfig {
}

static bool IsShardedByTag() {
return IsEnabledOrEmulated() || KeyLockArgs::IsLockHashTagEnabled();
return IsEnabledOrEmulated() || KeyLockArgs::GetLockTagOptions().enabled;
}

// If the key contains the {...} pattern, return only the part between { and }
Expand Down
63 changes: 53 additions & 10 deletions src/server/cluster/cluster_config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "base/gtest.h"
#include "base/logging.h"
#include "server/test_utils.h"

using namespace std;
using namespace testing;
Expand All @@ -21,26 +22,68 @@ MATCHER_P(NodeMatches, expected, "") {
return arg.id == expected.id && arg.ip == expected.ip && arg.port == expected.port;
}

class ClusterConfigTest : public ::testing::Test {
class ClusterConfigTest : public BaseFamilyTest {
protected:
const string kMyId = "my-id";
};

TEST_F(ClusterConfigTest, KeyTagTest) {
string key = "{user1000}.following";
ASSERT_EQ("user1000", ClusterConfig::KeyTag(key));
SetTestFlag("lock_on_hashtags", "true");

key = " foo{}{bar}";
ASSERT_EQ(key, ClusterConfig::KeyTag(key));
EXPECT_EQ(ClusterConfig::KeyTag("{user1000}.following"), "user1000");

key = "foo{{bar}}zap";
ASSERT_EQ("{bar", ClusterConfig::KeyTag(key));
EXPECT_EQ(ClusterConfig::KeyTag("foo{{bar}}zap"), "{bar");

key = "foo{bar}{zap}";
ASSERT_EQ("bar", ClusterConfig::KeyTag(key));
EXPECT_EQ(ClusterConfig::KeyTag("foo{bar}{zap}"), "bar");

string_view key = " foo{}{bar}";
EXPECT_EQ(key, ClusterConfig::KeyTag(key));

key = "{}foo{bar}{zap}";
ASSERT_EQ(key, ClusterConfig::KeyTag(key));
EXPECT_EQ(key, ClusterConfig::KeyTag(key));

SetTestFlag("locktag_delimiter", ":");
TEST_InvalidateLockTagOptions();

key = "{user1000}.following";
EXPECT_EQ(ClusterConfig::KeyTag(key), key);

EXPECT_EQ(ClusterConfig::KeyTag("bull:queue1:123"), "queue1");
EXPECT_EQ(ClusterConfig::KeyTag("bull:queue:1:123"), "queue");
EXPECT_EQ(ClusterConfig::KeyTag("bull:queue:1:123:456:789:1000"), "queue");

key = "bull::queue:1:123";
EXPECT_EQ(ClusterConfig::KeyTag(key), key);

SetTestFlag("locktag_delimiter", ":");
SetTestFlag("locktag_skip_n_end_delimiters", "0");
SetTestFlag("locktag_prefix", "bull");
TEST_InvalidateLockTagOptions();
EXPECT_EQ(ClusterConfig::KeyTag("bull:queue:123"), "queue");
EXPECT_EQ(ClusterConfig::KeyTag("bull:queue:123:456:789:1000"), "queue");

key = "not-bull:queue1:123";
EXPECT_EQ(ClusterConfig::KeyTag(key), key);

SetTestFlag("locktag_delimiter", ":");
SetTestFlag("locktag_skip_n_end_delimiters", "1");
SetTestFlag("locktag_prefix", "bull");
TEST_InvalidateLockTagOptions();

key = "bull:queue1:123";
EXPECT_EQ(ClusterConfig::KeyTag(key), key);
EXPECT_EQ(ClusterConfig::KeyTag("bull:queue:1:123"), "queue:1");
EXPECT_EQ(ClusterConfig::KeyTag("bull:queue:1:123:456:789:1000"), "queue:1");

key = "bull::queue:1:123";
EXPECT_EQ(ClusterConfig::KeyTag(key), key);

SetTestFlag("locktag_delimiter", "|");
SetTestFlag("locktag_skip_n_end_delimiters", "2");
SetTestFlag("locktag_prefix", "");
TEST_InvalidateLockTagOptions();

EXPECT_EQ(ClusterConfig::KeyTag("|a|b|c|d|e"), "a|b|c");
}

TEST_F(ClusterConfigTest, ConfigSetInvalidEmpty) {
Expand Down
53 changes: 43 additions & 10 deletions src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,25 @@ extern "C" {
#include "server/transaction.h"
#include "strings/human_readable.h"

// We've generalized "hashtags" so that users can specify custom delimiter and closures, see below.
// If I had a time machine, I'd rename this to lock_on_tags.
ABSL_FLAG(bool, lock_on_hashtags, false,
"When true, locks are done in the {hashtag} level instead of key level.");
"When true, locks are done in the {hashtag} level instead of key level. Hashtag "
"extraction can be further configured with locktag_* flags.");

// We would have used `char` instead of `string`, but that's impossible.
ABSL_FLAG(
std::string, locktag_delimiter, "",
"If set, this char is used to extract a lock tag by looking at delimiters, like hash tags. If "
"unset, regular hashtag extraction is done (with {}). Must be used with --lock_on_hashtags");

ABSL_FLAG(unsigned, locktag_skip_n_end_delimiters, 0,
"How many closing tag delimiters should we skip when extracting lock tags. 0 for no "
"skipping. For example, when delimiter is ':' and this flag is 2, the locktag for "
"':a:b:c:d:e' will be 'a:b:c'.");

ABSL_FLAG(std::string, locktag_prefix, "",
"Only keys with this prefix participate in tag extraction.");

namespace dfly {

Expand All @@ -36,26 +53,42 @@ using namespace util;

namespace {
// Thread-local cache with static linkage.
thread_local std::optional<bool> is_enabled_flag_cache;
thread_local std::optional<LockTagOptions> locktag_lock_options;
} // namespace

void TEST_InvalidateLockHashTag() {
is_enabled_flag_cache = nullopt;
void TEST_InvalidateLockTagOptions() {
locktag_lock_options = nullopt; // For test main thread
CHECK(shard_set != nullptr);
shard_set->pool()->Await(
[](ShardId shard, ProactorBase* proactor) { is_enabled_flag_cache = nullopt; });
[](ShardId shard, ProactorBase* proactor) { locktag_lock_options = nullopt; });
}

bool KeyLockArgs::IsLockHashTagEnabled() {
if (!is_enabled_flag_cache.has_value()) {
is_enabled_flag_cache = absl::GetFlag(FLAGS_lock_on_hashtags);
/* static */ LockTagOptions KeyLockArgs::GetLockTagOptions() {
if (!locktag_lock_options.has_value()) {
string delimiter = absl::GetFlag(FLAGS_locktag_delimiter);
if (delimiter.empty()) {
delimiter = "{}";
} else if (delimiter.size() == 1) {
delimiter += delimiter; // Copy delimiter (e.g. "::") so that it's easier to use below
} else {
LOG(ERROR) << "Invalid value for locktag_delimiter - must be a single char";
exit(-1);
}

locktag_lock_options = {
.enabled = absl::GetFlag(FLAGS_lock_on_hashtags),
.open_locktag = delimiter[0],
.close_locktag = delimiter[1],
.skip_n_end_delimiters = absl::GetFlag(FLAGS_locktag_skip_n_end_delimiters),
.prefix = absl::GetFlag(FLAGS_locktag_prefix),
};
}

return *is_enabled_flag_cache;
return *locktag_lock_options;
}

string_view KeyLockArgs::GetLockKey(string_view key) {
if (IsLockHashTagEnabled()) {
if (GetLockTagOptions().enabled) {
return ClusterConfig::KeyTag(key);
}

Expand Down
10 changes: 9 additions & 1 deletion src/server/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,16 @@ class CommandId;
class Transaction;
class EngineShard;

struct LockTagOptions {
bool enabled = false;
char open_locktag = '{';
char close_locktag = '}';
unsigned skip_n_end_delimiters = 0;
std::string prefix;
};

struct KeyLockArgs {
static bool IsLockHashTagEnabled();
static LockTagOptions GetLockTagOptions();

// Before acquiring and releasing keys, one must "normalize" them via GetLockKey().
static std::string_view GetLockKey(std::string_view key);
Expand Down
2 changes: 1 addition & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ void ClusterHtmlPage(const http::QueryArgs& args, HttpContext* send, ClusterFami
: "Disabled");

if (ClusterConfig::IsEnabledOrEmulated()) {
print_kb("Lock on hashtags", KeyLockArgs::IsLockHashTagEnabled());
print_kb("Lock on hashtags", KeyLockArgs::GetLockTagOptions().enabled);
}

if (ClusterConfig::IsEnabled()) {
Expand Down
5 changes: 1 addition & 4 deletions src/server/test_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,9 @@ void BaseFamilyTest::TearDown() {
LOG(INFO) << "Finishing " << test_info->name();
}

// Test hook defined in common.cc.
void TEST_InvalidateLockHashTag();

void BaseFamilyTest::ResetService() {
if (service_ != nullptr) {
TEST_InvalidateLockHashTag();
TEST_InvalidateLockTagOptions();

ShutdownService();
}
Expand Down
3 changes: 3 additions & 0 deletions src/server/test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ using namespace facade;
using util::fb2::Fiber;
using util::fb2::Launch;

// Test hook defined in common.cc.
void TEST_InvalidateLockTagOptions();

class TestConnection : public facade::Connection {
public:
TestConnection(Protocol protocol, io::StringSink* sink);
Expand Down

0 comments on commit 256e07f

Please sign in to comment.