-
Notifications
You must be signed in to change notification settings - Fork 350
/
Copy pathGMessageManager.h
367 lines (323 loc) · 12.1 KB
/
GMessageManager.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: GMessageManager.h
@Time: 2022/10/30 20:48
@Desc: 信息管理类
***************************/
#ifndef CGRAPH_GMESSAGEMANAGER_H
#define CGRAPH_GMESSAGEMANAGER_H
#include <string>
#include <set>
#include <unordered_map>
#include <memory>
#include "GMessage.h"
#include "GMessageDefine.h"
CGRAPH_NAMESPACE_BEGIN
/**
 * Message manager.
 *
 * Keeps two independent families of message queues, both addressed by a topic string:
 *  - send/recv: a single queue per topic (createTopic / removeTopic / sendTopicValue / recvTopicValue);
 *  - pub/sub:   one queue per subscriber of a topic (bindTopic / pubTopicValue / subTopicValue / dropTopic).
 * The user-visible topic is prefixed with internal::SEND_RECV_PREFIX or internal::PUB_SUB_PREFIX
 * before being used as a map key, so that the two families never share a key.
 *
 * Functions that create or destroy queues take the matching mutex.
 * NOTE(review): the send/recv/pub/sub lookups read the maps without taking a mutex, so they must not
 * run concurrently with topic creation or removal — confirm this is the intended contract.
 *
 * @tparam T base type of the values carried by the queues, must derive from GMessageParam
 */
template<typename T = GMessageParam,
        c_enable_if_t<std::is_base_of<GMessageParam, T>::value, int> = 0>
class GMessageManager : public GMessageObject,
                        public GraphManager<GMessage<T> > {
public:
    /**
     * Create a send/recv topic.
     * If the topic already exists, succeeds only when the existing queue has exactly
     * the same message type and the same capacity; otherwise an error status is returned.
     * @tparam TImpl concrete value type of the queue, must derive from T
     * @param topic user-visible topic name
     * @param size capacity handed to the new queue
     * @return ok status, or "create topic [...] duplicate" on a conflicting re-creation
     */
    template<typename TImpl,
            c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
    CStatus createTopic(const std::string& topic, CUInt size) {
        CGRAPH_FUNCTION_BEGIN
        // extra mapping layer, used to tell SendRecv topics apart from PubSub topics
        const auto innerTopic = internal::SEND_RECV_PREFIX + topic;

        CGRAPH_LOCK_GUARD lk(send_recv_mutex_);
        auto result = send_recv_message_map_.find(innerTopic);
        if (result != send_recv_message_map_.end()) {
            auto curTopic = result->second;
            // compare the type_info objects themselves: comparing name() would only compare
            // two const char* addresses, which is not a reliable type-equality test
            const bool isSame = (typeid(*curTopic) == typeid(GMessage<TImpl>))
                                && (curTopic->getCapacity() == size);
            status = isSame ? CStatus() : CStatus("create topic [" + topic + "] duplicate");
        } else {
            // first time this topic is seen: allocate its queue and register it
            auto message = CAllocator::safeMallocTemplateCObject<GMessage<TImpl>>(size);
            send_recv_message_map_.emplace(innerTopic, GMessagePtr<T>(message));
        }
        CGRAPH_FUNCTION_END
    }

    /**
     * Remove a send/recv topic and release its queue.
     * @param topic user-visible topic name
     * @return ok status, or "no find [...] topic" when the topic does not exist
     */
    CStatus removeTopic(const std::string& topic) {
        CGRAPH_FUNCTION_BEGIN
        const auto innerTopic = internal::SEND_RECV_PREFIX + topic;

        CGRAPH_LOCK_GUARD lk(send_recv_mutex_);
        auto result = send_recv_message_map_.find(innerTopic);
        if (result == send_recv_message_map_.end()) {
            CGRAPH_RETURN_ERROR_STATUS("no find [" + topic + "] topic");
        }

        CGRAPH_DELETE_PTR(result->second);
        send_recv_message_map_.erase(result);
        CGRAPH_FUNCTION_END
    }

    /**
     * Receive a value from a send/recv topic.
     * @tparam TImpl concrete value type of the queue, must derive from T
     * @param topic user-visible topic name
     * @param value output, filled by the queue's recv()
     * @param timeout forwarded to the queue's recv(); defaults to CGRAPH_MAX_BLOCK_TTL
     * @return status of recv(), or "no find [...] topic" when the topic does not exist
     */
    template<typename TImpl,
            c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
    CStatus recvTopicValue(const std::string& topic,
                           TImpl& value,
                           CMSec timeout = CGRAPH_MAX_BLOCK_TTL) {
        CGRAPH_FUNCTION_BEGIN
        const auto innerTopic = internal::SEND_RECV_PREFIX + topic;
        auto result = send_recv_message_map_.find(innerTopic);
        if (result == send_recv_message_map_.end()) {
            CGRAPH_RETURN_ERROR_STATUS("no find [" + topic + "] topic");
        }

        // the map stores every queue as GMessage<T>*; view it as the concrete queue type again
        auto message = (GMessagePtr<TImpl>)(result->second);
        CGRAPH_ASSERT_NOT_NULL(message);

        status = message->recv(value, timeout);
        CGRAPH_FUNCTION_END
    }

    /**
     * Receive a value from a send/recv topic. Overload for values held in a std::unique_ptr.
     * @tparam TImpl concrete value type of the queue, must derive from T
     * @param topic user-visible topic name
     * @param value output, filled by the queue's recv()
     * @param timeout forwarded to the queue's recv(); defaults to CGRAPH_MAX_BLOCK_TTL
     * @return status of recv(), or "no find [...] topic" when the topic does not exist
     * @notice The logic is the same as the overload above; it calls the underlying RingBuffer
     *         interface that has the same name but different parameters. The two interfaces
     *         have not been unified yet; suggestions are welcome.
     */
    template<typename TImpl,
            c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
    CStatus recvTopicValue(const std::string& topic,
                           std::unique_ptr<TImpl>& value,
                           CMSec timeout = CGRAPH_MAX_BLOCK_TTL) {
        CGRAPH_FUNCTION_BEGIN
        const auto innerTopic = internal::SEND_RECV_PREFIX + topic;
        auto result = send_recv_message_map_.find(innerTopic);
        if (result == send_recv_message_map_.end()) {
            CGRAPH_RETURN_ERROR_STATUS("no find [" + topic + "] topic");
        }

        auto message = (GMessagePtr<TImpl>)(result->second);
        CGRAPH_ASSERT_NOT_NULL(message);

        status = message->recv(value, timeout);
        CGRAPH_FUNCTION_END
    }

    /**
     * Send a value to a send/recv topic.
     * @tparam TImpl concrete value type, must derive from T
     * @param topic user-visible topic name
     * @param value value forwarded to the queue's send()
     * @param strategy push strategy forwarded to the queue's send()
     * @return ok status, or "no find [...] topic" when the topic does not exist
     */
    template<typename TImpl,
            c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
    CStatus sendTopicValue(const std::string& topic,
                           const TImpl& value,
                           GMessagePushStrategy strategy) {
        CGRAPH_FUNCTION_BEGIN
        const auto innerTopic = internal::SEND_RECV_PREFIX + topic;
        auto result = send_recv_message_map_.find(innerTopic);
        if (result == send_recv_message_map_.end()) {
            CGRAPH_RETURN_ERROR_STATUS("no find [" + topic + "] topic");
        }

        auto message = static_cast<GMessagePtr<T> >(result->second);
        CGRAPH_ASSERT_NOT_NULL(message);

        message->send(value, strategy);
        CGRAPH_FUNCTION_END
    }

    /**
     * Send a value to a send/recv topic. Overload for values held in a std::unique_ptr.
     * @tparam TImpl concrete value type, must derive from T
     * @param topic user-visible topic name
     * @param value smart pointer forwarded to the queue's send()
     * @param strategy push strategy forwarded to the queue's send()
     * @return ok status, or "no find [...] topic" when the topic does not exist
     */
    template<typename TImpl,
            c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
    CStatus sendTopicValue(const std::string& topic,
                           std::unique_ptr<TImpl>& value,
                           GMessagePushStrategy strategy) {
        CGRAPH_FUNCTION_BEGIN
        const auto innerTopic = internal::SEND_RECV_PREFIX + topic;
        auto result = send_recv_message_map_.find(innerTopic);
        if (result == send_recv_message_map_.end()) {
            CGRAPH_RETURN_ERROR_STATUS("no find [" + topic + "] topic");
        }

        auto message = static_cast<GMessagePtr<T> >(result->second);
        CGRAPH_ASSERT_NOT_NULL(message);

        message->send(value, strategy);
        CGRAPH_FUNCTION_END
    }

    /**
     * Subscribe to a pub/sub topic: allocate a dedicated queue for this subscriber
     * and return the connection id that identifies it.
     * @tparam TImpl concrete value type of the queue, must derive from T
     * @param topic user-visible topic name
     * @param size capacity handed to the new queue
     * @return connection id to be passed to subTopicValue()
     */
    template<typename TImpl,
            c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
    CIndex bindTopic(const std::string& topic, CUInt size) {
        const auto innerTopic = internal::PUB_SUB_PREFIX + topic;

        CGRAPH_LOCK_GUARD lk(pub_sub_mutex_);
        auto message = (GMessagePtr<T>)(CAllocator::safeMallocTemplateCObject<GMessage<TImpl>>(size));
        const CIndex connId = (++cur_conn_id_);

        // operator[] creates the (empty) subscriber set on the first bind of this topic
        pub_sub_message_map_[innerTopic].insert(message);
        conn_message_map_[connId] = message;
        return connId;
    }

    /**
     * Publish a value to every subscriber queue currently bound to the topic.
     * @tparam TImpl concrete value type, must derive from T
     * @param topic user-visible topic name
     * @param value value forwarded to each queue's send()
     * @param strategy push strategy forwarded to each queue's send()
     * @return ok status, or "no find [...] topic" when nobody has bound the topic
     */
    template<typename TImpl,
            c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
    CStatus pubTopicValue(const std::string& topic,
                          const TImpl& value,
                          GMessagePushStrategy strategy) {
        CGRAPH_FUNCTION_BEGIN
        const auto innerTopic = internal::PUB_SUB_PREFIX + topic;
        auto result = pub_sub_message_map_.find(innerTopic);
        if (result == pub_sub_message_map_.end()) {
            CGRAPH_RETURN_ERROR_STATUS("no find [" + topic + "] topic");
        }

        for (auto msg : result->second) {
            msg->send(value, strategy);    // deliver the message to every subscriber in turn
        }
        CGRAPH_FUNCTION_END
    }

    /**
     * Receive a value from the subscriber queue identified by a connection id.
     * @tparam TImpl concrete value type of the queue, must derive from T
     * @param connId connection id returned by bindTopic()
     * @param value output, filled by the queue's recv()
     * @param timeout forwarded to the queue's recv(); defaults to CGRAPH_MAX_BLOCK_TTL
     * @return status of recv(), or "no find [...] connect" when the id is unknown
     */
    template<typename TImpl,
            c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
    CStatus subTopicValue(CIndex connId, TImpl& value, CMSec timeout = CGRAPH_MAX_BLOCK_TTL) {
        CGRAPH_FUNCTION_BEGIN
        auto result = conn_message_map_.find(connId);
        if (result == conn_message_map_.end()) {
            CGRAPH_RETURN_ERROR_STATUS("no find [" + std::to_string(connId) + "] connect");
        }

        auto message = (GMessagePtr<TImpl>)(result->second);
        status = message->recv(value, timeout);
        CGRAPH_FUNCTION_END
    }

    /**
     * Receive a value from the subscriber queue identified by a connection id.
     * Overload for values held in a std::unique_ptr.
     * @tparam TImpl concrete value type of the queue, must derive from T
     * @param connId connection id returned by bindTopic()
     * @param value output, filled by the queue's recv()
     * @param timeout forwarded to the queue's recv(); defaults to CGRAPH_MAX_BLOCK_TTL
     * @return status of recv(), or "no find [...] connect" when the id is unknown
     */
    template<typename TImpl,
            c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
    CStatus subTopicValue(CIndex connId, std::unique_ptr<TImpl>& value, CMSec timeout = CGRAPH_MAX_BLOCK_TTL) {
        CGRAPH_FUNCTION_BEGIN
        auto result = conn_message_map_.find(connId);
        if (result == conn_message_map_.end()) {
            CGRAPH_RETURN_ERROR_STATUS("no find [" + std::to_string(connId) + "] connect");
        }

        auto message = (GMessagePtr<TImpl>)(result->second);
        status = message->recv(value, timeout);
        CGRAPH_FUNCTION_END
    }

    /**
     * Drop a pub/sub topic: release every subscriber queue bound to it and
     * invalidate the connection ids that referred to those queues.
     * @param topic user-visible topic name
     * @return ok status, or "no find [...] topic" when the topic does not exist
     */
    CStatus dropTopic(const std::string& topic) {
        CGRAPH_FUNCTION_BEGIN
        const auto innerTopic = internal::PUB_SUB_PREFIX + topic;

        CGRAPH_LOCK_GUARD lk(pub_sub_mutex_);
        auto result = pub_sub_message_map_.find(innerTopic);
        if (result == pub_sub_message_map_.end()) {
            CGRAPH_RETURN_ERROR_STATUS("no find [" + topic + "] topic");
        }

        auto& messageSet = result->second;

        // forget the connection ids of the queues about to be freed, otherwise
        // a later subTopicValue() on such an id would use a dangling pointer
        for (auto iter = conn_message_map_.begin(); iter != conn_message_map_.end(); ) {
            if (messageSet.count(iter->second) > 0) {
                iter = conn_message_map_.erase(iter);
            } else {
                ++iter;
            }
        }

        for (auto msg : messageSet) {
            CGRAPH_DELETE_PTR(msg);
        }
        pub_sub_message_map_.erase(result);
        CGRAPH_FUNCTION_END
    }

    /**
     * Release every queue of both families and reset the connection id counter.
     * @return ok status
     */
    CStatus clear() final {
        CGRAPH_FUNCTION_BEGIN
        {
            CGRAPH_LOCK_GUARD lk(send_recv_mutex_);
            for (auto& cur : send_recv_message_map_) {
                CGRAPH_DELETE_PTR(cur.second);
            }
            send_recv_message_map_.clear();
        }

        {
            CGRAPH_LOCK_GUARD lk(pub_sub_mutex_);
            for (auto& cur : pub_sub_message_map_) {
                for (auto msg : cur.second) {
                    CGRAPH_DELETE_PTR(msg);
                }
            }
            pub_sub_message_map_.clear();
            conn_message_map_.clear();    // these pointers were owned by the sets freed just above
            cur_conn_id_ = 0;
        }
        CGRAPH_FUNCTION_END
    }

protected:
    CStatus init() final {
        /**
         * Can only be called by the USingleton class, during automatic construction.
         * Defined here to avoid ambiguity caused by diamond inheritance.
         */
        CGRAPH_EMPTY_FUNCTION
    }

    CStatus destroy() final {
        CGRAPH_EMPTY_FUNCTION
    }

    ~GMessageManager() override {
        clear();    // release all messages
    }

private:
    std::unordered_map<std::string, GMessagePtr<T>> send_recv_message_map_;            // send/recv: topic -> message queue
    std::unordered_map<std::string, std::set<GMessagePtr<T>>> pub_sub_message_map_;    // pub/sub: topic -> queues of all subscribers
    std::unordered_map<CIndex, GMessagePtr<T>> conn_message_map_;                      // pub/sub: connection id -> subscriber queue (non-owning)
    CIndex cur_conn_id_ = 0;                                                           // last connection id handed out
    std::mutex pub_sub_mutex_;                                                         // taken by bindTopic / dropTopic / clear
    std::mutex send_recv_mutex_;                                                       // taken by createTopic / removeTopic / clear

    template<typename U, USingletonType, CBool> friend class USingleton;
};
CGRAPH_NAMESPACE_END
#endif //CGRAPH_GMESSAGEMANAGER_H