Skip to content

Commit

Permalink
Fix Various Naming Conventions around Subscriber (dragonflydb#87)
Browse files Browse the repository at this point in the history
* Fix `SubscribeMap`

Signed-off-by: Ryan Russell <[email protected]>

* Fix `subscriber_arr`

Signed-off-by: Ryan Russell <[email protected]>

* Fix `CopySubscribers`

Signed-off-by: Ryan Russell <[email protected]>

* Fix `SubscriberMessagesLen`

Signed-off-by: Ryan Russell <[email protected]>

* Fix remaining `subscribe` variants

Signed-off-by: Ryan Russell <[email protected]>

* Add Ryan Russell to Contributors

Signed-off-by: Ryan Russell <[email protected]>
  • Loading branch information
ryanrussell authored Jun 4, 2022
1 parent e806e6c commit 4a86445
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 27 deletions.
2 changes: 2 additions & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@

* **[Philipp Born](https://github.com/tamcore)**
* Helm Chart
* **[Ryan Russell](https://github.com/ryanrussell)**
* Docs & Code Readability
6 changes: 3 additions & 3 deletions src/server/channel_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,21 @@ auto ChannelSlice::FetchSubscribers(string_view channel) -> vector<Subscriber> {
auto it = channels_.find(channel);
if (it != channels_.end()) {
res.reserve(it->second->subscribers.size());
CopySubsribers(it->second->subscribers, string{}, &res);
CopySubscribers(it->second->subscribers, string{}, &res);
}

for (const auto& k_v : patterns_) {
const string& pat = k_v.first;
// 1 - match
if (stringmatchlen(pat.data(), pat.size(), channel.data(), channel.size(), 0) == 1) {
CopySubsribers(k_v.second->subscribers, pat, &res);
CopySubscribers(k_v.second->subscribers, pat, &res);
}
}

return res;
}

void ChannelSlice::CopySubsribers(const SubsribeMap& src, const std::string& pattern,
void ChannelSlice::CopySubscribers(const SubscribeMap& src, const std::string& pattern,
vector<Subscriber>* dest) {
for (const auto& sub : src) {
Subscriber s(sub.first, sub.second.thread_id);
Expand Down
6 changes: 3 additions & 3 deletions src/server/channel_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ class ChannelSlice {
}
};

using SubsribeMap = absl::flat_hash_map<ConnectionContext*, SubscriberInternal>;
using SubscribeMap = absl::flat_hash_map<ConnectionContext*, SubscriberInternal>;

static void CopySubsribers(const SubsribeMap& src, const std::string& pattern,
static void CopySubscribers(const SubscribeMap& src, const std::string& pattern,
std::vector<Subscriber>* dest);

struct Channel {
SubsribeMap subscribers;
SubscribeMap subscribers;
};

absl::flat_hash_map<std::string, std::unique_ptr<Channel>> channels_;
Expand Down
12 changes: 6 additions & 6 deletions src/server/conn_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ void ConnectionContext::ChangeSubscription(bool to_add, bool to_reply, CmdArgLis
this->force_dispatch = true;
}

// Gather all the channels we need to subsribe to / remove.
// Gather all the channels we need to subscribe to / remove.
for (size_t i = 0; i < args.size(); ++i) {
bool res = false;
string_view channel = ArgS(args, i);
Expand Down Expand Up @@ -71,7 +71,7 @@ void ConnectionContext::ChangeSubscription(bool to_add, bool to_reply, CmdArgLis
int32_t tid = util::ProactorBase::GetIndex();
DCHECK_GE(tid, 0);

// Update the subsribers on publisher's side.
// Update the subscribers on publisher's side.
auto cb = [&](EngineShard* shard) {
ChannelSlice& cs = shard->channel_slice();
unsigned start = shard_idx[shard->shard_id()];
Expand Down Expand Up @@ -100,8 +100,8 @@ void ConnectionContext::ChangeSubscription(bool to_add, bool to_reply, CmdArgLis
(*this)->SendBulkString(action[to_add]);
(*this)->SendBulkString(ArgS(args, i)); // channel

// number of subsribed channels for this connection *right after*
// we subsribe.
// number of subscribed channels for this connection *right after*
// we subscribe.
(*this)->SendLong(result[i]);
}
}
Expand All @@ -121,7 +121,7 @@ void ConnectionContext::ChangePSub(bool to_add, bool to_reply, CmdArgList args)
this->force_dispatch = true;
}

// Gather all the patterns we need to subsribe to / remove.
// Gather all the patterns we need to subscribe to / remove.
for (size_t i = 0; i < args.size(); ++i) {
bool res = false;
string_view pattern = ArgS(args, i);
Expand All @@ -147,7 +147,7 @@ void ConnectionContext::ChangePSub(bool to_add, bool to_reply, CmdArgList args)
int32_t tid = util::ProactorBase::GetIndex();
DCHECK_GE(tid, 0);

// Update the subsribers on publisher's side.
// Update the subscribers on publisher's side.
auto cb = [&](EngineShard* shard) {
ChannelSlice& cs = shard->channel_slice();
for (string_view pattern : patterns) {
Expand Down
2 changes: 1 addition & 1 deletion src/server/dragonfly_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ TEST_F(DflyEngineTest, PSubscribe) {
resp = pp_->at(0)->Await([&] { return Run({"publish", "ab", "foo"}); });
EXPECT_THAT(resp, IntArg(1));

ASSERT_EQ(1, SubsriberMessagesLen("IO1"));
ASSERT_EQ(1, SubscriberMessagesLen("IO1"));

facade::Connection::PubMessage msg = GetPublishedMessage("IO1", 0);
EXPECT_EQ("foo", msg.message);
Expand Down
24 changes: 12 additions & 12 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -897,31 +897,31 @@ void Service::Publish(CmdArgList args, ConnectionContext* cntx) {

auto cb = [&] { return EngineShard::tlocal()->channel_slice().FetchSubscribers(channel); };

vector<ChannelSlice::Subscriber> subsriber_arr = shard_set->Await(sid, std::move(cb));
vector<ChannelSlice::Subscriber> subscriber_arr = shard_set->Await(sid, std::move(cb));
atomic_uint32_t published{0};

if (!subsriber_arr.empty()) {
sort(subsriber_arr.begin(), subsriber_arr.end(),
if (!subscriber_arr.empty()) {
sort(subscriber_arr.begin(), subscriber_arr.end(),
[](const auto& left, const auto& right) { return left.thread_id < right.thread_id; });

vector<unsigned> slices(shard_set->pool()->size(), UINT_MAX);
for (size_t i = 0; i < subsriber_arr.size(); ++i) {
if (slices[subsriber_arr[i].thread_id] > i) {
slices[subsriber_arr[i].thread_id] = i;
for (size_t i = 0; i < subscriber_arr.size(); ++i) {
if (slices[subscriber_arr[i].thread_id] > i) {
slices[subscriber_arr[i].thread_id] = i;
}
}

fibers_ext::BlockingCounter bc(subsriber_arr.size());
fibers_ext::BlockingCounter bc(subscriber_arr.size());
auto publish_cb = [&, bc](unsigned idx, util::ProactorBase*) mutable {
unsigned start = slices[idx];

for (unsigned i = start; i < subsriber_arr.size(); ++i) {
const ChannelSlice::Subscriber& subscriber = subsriber_arr[i];
for (unsigned i = start; i < subscriber_arr.size(); ++i) {
const ChannelSlice::Subscriber& subscriber = subscriber_arr[i];
if (subscriber.thread_id != idx)
break;

published.fetch_add(1, memory_order_relaxed);
facade::Connection* conn = subsriber_arr[i].conn_cntx->owner();
facade::Connection* conn = subscriber_arr[i].conn_cntx->owner();
DCHECK(conn);
facade::Connection::PubMessage pmsg;
pmsg.channel = channel;
Expand All @@ -936,10 +936,10 @@ void Service::Publish(CmdArgList args, ConnectionContext* cntx) {
bc.Wait(); // Wait for all the messages to be sent.
}

// If subsriber connections are closing they will wait
// If subscriber connections are closing they will wait
// for the tokens to be reclaimed in OnClose(). This guarantees that subscribers we gathered
// still exist till we finish publishing.
for (auto& s : subsriber_arr) {
for (auto& s : subscriber_arr) {
s.borrow_token.Dec();
}

Expand Down
2 changes: 1 addition & 1 deletion src/server/test_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ string BaseFamilyTest::GetId() const {
return absl::StrCat("IO", id);
}

size_t BaseFamilyTest::SubsriberMessagesLen(string_view conn_id) const {
size_t BaseFamilyTest::SubscriberMessagesLen(string_view conn_id) const {
auto it = connections_.find(conn_id);
if (it == connections_.end())
return 0;
Expand Down
2 changes: 1 addition & 1 deletion src/server/test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class BaseFamilyTest : public ::testing::Test {
void UpdateTime(uint64_t ms);

std::string GetId() const;
size_t SubsriberMessagesLen(std::string_view conn_id) const;
size_t SubscriberMessagesLen(std::string_view conn_id) const;

// Returns message parts as returned by RESP:
// pmessage, pattern, channel, message
Expand Down

0 comments on commit 4a86445

Please sign in to comment.