Skip to content

Commit

Permalink
[feat] add mutex in create topic, no mutex for send/recv/pub/sub
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunelFeng committed Oct 27, 2024
1 parent 3d0f45f commit 4c19d58
Showing 1 changed file with 22 additions and 11 deletions.
33 changes: 22 additions & 11 deletions src/GraphCtrl/GraphMessage/GMessageManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class GMessageManager : public GMessageObject,
CGRAPH_FUNCTION_BEGIN

auto innerTopic = internal::SEND_RECV_PREFIX + topic; // 中间做一层映射,用来区分是 PubSub的,还是SendRecv的
CGRAPH_LOCK_GUARD lk(send_recv_mutex_);
auto result = send_recv_message_map_.find(innerTopic);
if (result != send_recv_message_map_.end()) {
// 如果类型和size完全匹配的话,则直接返回创建成功。否则返回错误
Expand All @@ -60,6 +61,7 @@ class GMessageManager : public GMessageObject,
CStatus removeTopic(const std::string& topic) {
CGRAPH_FUNCTION_BEGIN
auto innerTopic = internal::SEND_RECV_PREFIX + topic;
CGRAPH_LOCK_GUARD lk(send_recv_mutex_);
auto result = send_recv_message_map_.find(innerTopic);
if (result == send_recv_message_map_.end()) {
CGRAPH_RETURN_ERROR_STATUS("no find [" + topic + "] topic");
Expand Down Expand Up @@ -192,9 +194,9 @@ class GMessageManager : public GMessageObject,
c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
CIndex bindTopic(const std::string& topic, CUInt size) {
auto innerTopic = internal::PUB_SUB_PREFIX + topic;
auto message = UAllocator::safeMallocTemplateCObject<GMessage<TImpl>>(size);

CGRAPH_LOCK_GUARD lock(bind_mutex_);
CGRAPH_LOCK_GUARD lk(pub_sub_mutex_);
auto message = UAllocator::safeMallocTemplateCObject<GMessage<TImpl>>(size);
CIndex connId = (++cur_conn_id_);
auto result = pub_sub_message_map_.find(innerTopic);
if (result != pub_sub_message_map_.end()) {
Expand Down Expand Up @@ -288,6 +290,7 @@ class GMessageManager : public GMessageObject,
CStatus dropTopic(const std::string& topic) {
CGRAPH_FUNCTION_BEGIN
auto innerTopic = internal::PUB_SUB_PREFIX + topic;
CGRAPH_LOCK_GUARD lk(pub_sub_mutex_);
auto result = pub_sub_message_map_.find(innerTopic);
if (result == pub_sub_message_map_.end()) {
CGRAPH_RETURN_ERROR_STATUS("no find [" + topic + "] topic");
Expand All @@ -307,18 +310,25 @@ class GMessageManager : public GMessageObject,
*/
CStatus clear() final {
CGRAPH_FUNCTION_BEGIN
for (auto& cur : send_recv_message_map_) {
CGRAPH_DELETE_PTR(cur.second)
{
CGRAPH_LOCK_GUARD lk(send_recv_mutex_);
for (auto& cur : send_recv_message_map_) {
CGRAPH_DELETE_PTR(cur.second)
}
send_recv_message_map_.clear();
}

for (auto& cur : pub_sub_message_map_) {
for (auto iter : cur.second) {
CGRAPH_DELETE_PTR(iter);
{
CGRAPH_LOCK_GUARD lk(pub_sub_mutex_);
for (auto& cur : pub_sub_message_map_) {
for (auto iter : cur.second) {
CGRAPH_DELETE_PTR(iter);
}
}
pub_sub_message_map_.clear();
cur_conn_id_ = 0;
}
send_recv_message_map_.clear();
pub_sub_message_map_.clear();
cur_conn_id_ = 0;

CGRAPH_FUNCTION_END
}

Expand Down Expand Up @@ -346,7 +356,8 @@ class GMessageManager : public GMessageObject,
std::unordered_map<CIndex, GMessagePtr<T>> conn_message_map_; // 用于根据 index反推message信息
CIndex cur_conn_id_ = 0; // 记录当前的conn信息

std::mutex bind_mutex_;
std::mutex pub_sub_mutex_;
std::mutex send_recv_mutex_;

template<typename U, USingletonType, CBool> friend class USingleton;
};
Expand Down

0 comments on commit 4c19d58

Please sign in to comment.