Skip to content

Commit

Permalink
read code about 2 level message queue
Browse files Browse the repository at this point in the history
  • Loading branch information
chinabin committed Apr 15, 2018
1 parent 2bff073 commit fb6a711
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 22 deletions.
38 changes: 27 additions & 11 deletions src_code/skynet_mq.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,30 @@

#define DEFAULT_QUEUE_SIZE 64;

// 二级消息队列,和服务挂钩
struct message_queue {
uint32_t handle;
uint32_t handle; // 服务 id
int cap; // 消息队列能容纳的消息数
int head; // 指向当前可以取出消息的位置,由取出操作 skynet_mq_leave 来管理
int tail; // 指向当前可以插入的队列中的位置,由插入操作 skynet_mq_enter 来管理
int head; // 指向当前可以取出消息的位置,由取出操作 skynet_mq_pop 来管理
int tail; // 指向当前可以插入的队列中的位置,由插入操作 skynet_mq_push 来管理
int lock; // 自旋锁,确保添加消息和取出消息不出错
struct skynet_message *queue;
};

// 全局消息队列
struct global_queue {
int cap;
int head;
int tail;
int head; // 指向当前可以取出消息的位置,由取出操作 skynet_globalmq_pop 来管理
int tail; // 指向当前可以插入的队列中的位置,由插入操作 skynet_globalmq_push 管理
int lock;
struct message_queue ** queue;
struct message_queue ** queue; // 消息队列数组,里面存储的是各个服务相关的消息队列
};

// 远程消息队列
struct message_remote_queue {
int cap;
int head;
int tail;
int head; // 指向当前可以取出消息的位置,由取出操作 skynet_remotemq_pop 来管理
int tail; // 指向当前可以插入的队列中的位置,由插入操作 skynet_remotemq_push 管理
int lock;
struct skynet_remote_message *queue;
};
Expand All @@ -42,6 +45,7 @@ _lock_global_queue() {
#define LOCK(q) while (__sync_lock_test_and_set(&(q)->lock,1)) {}
#define UNLOCK(q) __sync_lock_release(&(q)->lock);

// 插入消息队列
void
skynet_globalmq_push(struct message_queue * queue) {
struct global_queue *q= Q;
Expand Down Expand Up @@ -69,7 +73,7 @@ skynet_globalmq_push(struct message_queue * queue) {
UNLOCK(q)
}


// 弹出一个消息队列
struct message_queue *
skynet_globalmq_pop() {
struct global_queue *q = Q;
Expand All @@ -88,6 +92,7 @@ skynet_globalmq_pop() {
return ret;
}

// 创建一个和服务挂钩的消息队列
struct message_queue *
skynet_mq_create(uint32_t handle) {
struct message_queue *q = malloc(sizeof(*q));
Expand All @@ -108,11 +113,13 @@ skynet_mq_release(struct message_queue *q) {
free(q);
}

// 返回一个消息队列的 handle
uint32_t
skynet_mq_handle(struct message_queue *q) {
return q->handle;
}

// 从指定的消息队列中弹出消息,返回 -1 表示没消息,返回 0 表示获取成功
int
skynet_mq_pop(struct message_queue *q, struct skynet_message *message) {
int ret = -1;
Expand Down Expand Up @@ -160,6 +167,13 @@ skynet_mq_push(struct message_queue *q, struct skynet_message *message) {
UNLOCK(q)
}

/*
创建全局消息队列 Q
cap: 使得消息队列的长度为 X , X 的值是大于 cap 的第一个 2 的次方,
同时 X 也是二级消息队列的个数
例如传入 5 ,则 X 等于 8
例如传入 8 ,则 X 等于 16
*/
void
skynet_mq_init(int n) {
struct global_queue *q = malloc(sizeof(*q));
Expand All @@ -174,8 +188,8 @@ skynet_mq_init(int n) {
Q=q;
}

// 创建远程消息队列
// remote message queue

struct message_remote_queue *
skynet_remotemq_create(void) {
struct message_remote_queue *q = malloc(sizeof(*q));
Expand All @@ -188,13 +202,14 @@ skynet_remotemq_create(void) {
return q;
}


// 释放远程消息队列
void
skynet_remotemq_release(struct message_remote_queue *q) {
free(q->queue);
free(q);
}

// 从指定的远程消息队列中弹出消息
int
skynet_remotemq_pop(struct message_remote_queue *q, struct skynet_remote_message *message) {
int ret = -1;
Expand All @@ -213,6 +228,7 @@ skynet_remotemq_pop(struct message_remote_queue *q, struct skynet_remote_message
return ret;
}

// 往指定的远程消息队列中加消息
void
skynet_remotemq_push(struct message_remote_queue *q, struct skynet_remote_message *message) {
assert(message->destination != 0);
Expand Down
20 changes: 17 additions & 3 deletions src_code/skynet_mq.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,21 @@ struct skynet_message {

struct message_queue;

// 插入消息队列
void skynet_globalmq_push(struct message_queue *);
// 弹出一个消息队列
struct message_queue * skynet_globalmq_pop(void);

// 创建一个和服务挂钩的消息队列
struct message_queue * skynet_mq_create(uint32_t handle);
// 释放消息队列
void skynet_mq_release(struct message_queue *);
// 从指定消息队列头中取出一个消息,返回此消息的目的地,返回 -1 表示阻塞( 也就是没消息 )
// 返回一个消息队列的 handle
uint32_t skynet_mq_handle(struct message_queue *);

// 0 for success
// 从指定的消息队列中弹出消息,返回 -1 表示没消息,返回 0 表示获取成功
int skynet_mq_pop(struct message_queue *q, struct skynet_message *message);
// 将新消息添加到指定队列尾,队列如果满了则扩大一倍
void skynet_mq_push(struct message_queue *q, struct skynet_message *message);

struct skynet_remote_message {
Expand All @@ -32,14 +36,24 @@ struct skynet_remote_message {

struct message_remote_queue;

// 创建远程消息队列
struct message_remote_queue * skynet_remotemq_create(void);
// 释放远程消息队列
void skynet_remotemq_release(struct message_remote_queue *);

// 从指定的远程消息队列中弹出消息
int skynet_remotemq_pop(struct message_remote_queue *q, struct skynet_remote_message *message);
// 往指定的远程消息队列中加消息
void skynet_remotemq_push(struct message_remote_queue *q, struct skynet_remote_message *message);


// skynet_mq_create 的语法糖,创建消息队列 Q
/*
创建全局消息队列 Q
cap: 使得消息队列的长度为 X , X 的值是大于 cap 的第一个 2 的次方,
同时 X 也是二级消息队列的个数
例如传入 5 ,则 X 等于 8
例如传入 8 ,则 X 等于 16
*/
void skynet_mq_init(int cap);

#endif
36 changes: 29 additions & 7 deletions src_code/skynet_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,25 @@
#define BLACKHOLE "blackhole"
#define DEFAULT_MESSAGE_QUEUE 16

/*
节点:运行可执行服务端程序文件 "skynet" ,即启动了一个“节点”。每个节点可以启动多个服务( service )。
模块( module ):模块是一个动态库文件(.so),若要创建一个能被 skynet 调用的模块,
假设模块名为xxx,必须实现4个函数xxx_create、xxx_init、xxx_release。
服务( service ): module 被 skynet 加载运行,就成为一个 service 。
每个 service 有唯一的 handle ID 。同节点的 service 之间可以通过全局
消息队列通讯。
harbor: harbor 是一种特殊的 service ,它代理不同节点之间的通讯。
用一个 32 位(4字节)的变量来唯一标识一个服务:高 8 位表示该 service 所
属的 harbor ID ,低 24 位表示 handle ID 。如果消息队列中的某个消息所指
定处理者的 ID 的高 8 位(即 harbor ID )与本节点 harbor ID 不同,表示该
消息将经过 harbor 转发到另外一个节点的服务来处理。
service 的 context :每个 service 对应一个 context , context 保存了本 service 运行
时的各种状态和变量。
*/
struct skynet_context {
void * instance; // 实例指针,通过调用创建实例函数返回
struct skynet_module * mod; // 实例对应的模块
Expand All @@ -24,7 +43,7 @@ struct skynet_context {
char result[32]; // 不同的命令会设置不同的返回值
void * cb_ud; // 回调函数的第二个参数
skynet_cb cb; // 回调函数指针,定义在 skynet.h : typedef void (*skynet_cb)(struct skynet_context * context, void *ud, const char * uid , const void * msg, size_t sz_session);
int in_global_queue;
int in_global_queue; // 当前服务的消息队列是否已经在全局消息队列的标识
struct message_queue *queue;
};

Expand Down Expand Up @@ -72,7 +91,7 @@ skynet_context_new(const char * name, const char *parm) {
_id_to_hex(uid+1, ctx->handle); // 应该放在后面一句 ctx->handle 赋值语句后面,后面的版本修复了

ctx->handle = skynet_handle_register(ctx); // 服务注册,获得服务编号
ctx->queue = skynet_mq_create(ctx->handle);
ctx->queue = skynet_mq_create(ctx->handle); // 创建和服务挂钩的消息队列
// init function maybe use ctx->handle, so it must init at last

int r = skynet_module_instance_init(mod, inst, ctx, parm);
Expand Down Expand Up @@ -134,6 +153,7 @@ _dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
}
}

// 丢弃消息队列
static void
_drop_queue(struct message_queue *q) {
// todo: send message back to message source
Expand All @@ -154,11 +174,11 @@ _drop_queue(struct message_queue *q) {
*/
int
skynet_context_message_dispatch(void) {
struct message_queue * q = skynet_globalmq_pop();
struct message_queue * q = skynet_globalmq_pop(); // 从全局消息队列中取出一个消息队列
if (q==NULL)
return 1;

uint32_t handle = skynet_mq_handle(q);
uint32_t handle = skynet_mq_handle(q); // 获得消息队列挂钩的服务 id

struct skynet_context * ctx = skynet_handle_grab(handle);
if (ctx == NULL) { // 服务已销毁则丢弃消息到黑洞
Expand All @@ -171,7 +191,7 @@ skynet_context_message_dispatch(void) {
struct skynet_message msg;
if (skynet_mq_pop(q,&msg)) {
// empty queue
__sync_lock_release(&ctx->in_global_queue);
__sync_lock_release(&ctx->in_global_queue); // 将 ctx->in_global_queue 置 0
skynet_context_release(ctx);
return 0;
}
Expand All @@ -188,7 +208,7 @@ skynet_context_message_dispatch(void) {

skynet_context_release(ctx);

skynet_globalmq_push(q);
skynet_globalmq_push(q); // 塞回全局消息队列,这样的 push 和 pop 保证了公平性

return 0;
}
Expand Down Expand Up @@ -310,14 +330,16 @@ skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) {
context->cb_ud = ud;
}

// 将消息放入服务对应的消息队列,并将消息队列加入全局消息队列
// 返回 0 表示成功,返回 -1 表示 handle 对应的服务不存在
int
skynet_context_push(uint32_t handle, struct skynet_message *message) {
struct skynet_context * ctx = skynet_handle_grab(handle);
if (ctx == NULL) {
return -1;
}
skynet_mq_push(ctx->queue, message);
if (__sync_lock_test_and_set(&ctx->in_global_queue,1) == 0) {
if (__sync_lock_test_and_set(&ctx->in_global_queue,1) == 0) { // 将 ctx->in_global_queue 设为 1 并返回 ctx->in_global_queue 操作之前的值。
skynet_globalmq_push(ctx->queue);
}
skynet_context_release(ctx);
Expand Down
2 changes: 1 addition & 1 deletion src_code/skynet_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ struct skynet_context * skynet_context_release(struct skynet_context *);
uint32_t skynet_context_handle(struct skynet_context *);
// 设置 ctx 的 handle
void skynet_context_init(struct skynet_context *, uint32_t handle);
// 未实现
// 将消息放入服务对应的消息队列,并将消息队列加入全局消息队列
int skynet_context_push(uint32_t handle, struct skynet_message *message);
/*
从全局消息队列中取出消息分发,返回 1 表示阻塞,当前无消息
Expand Down
1 change: 1 addition & 0 deletions src_code/skynet_timer.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ timer_execute(struct timer *T)
current=link_clear(&T->near[idx]);

do {
// 定时到期,将消息推到消息队列
struct timer_event * event = (struct timer_event *)(current+1);
struct skynet_message message;
message.source = SKYNET_SYSTEM_TIMER;
Expand Down

0 comments on commit fb6a711

Please sign in to comment.