Skip to content

Commit

Permalink
code copy
Browse files Browse the repository at this point in the history
c0e4bb462cd437a0064f5991fe8000011f222472
skynet support session in message
  • Loading branch information
chinabin committed Apr 15, 2018
1 parent fb6a711 commit 7ea33b7
Show file tree
Hide file tree
Showing 14 changed files with 104 additions and 58 deletions.
4 changes: 2 additions & 2 deletions src_code/agent.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ local skynet = require "skynet"
-- 输出传入 agent.lua 文件的参数
print("agent",...)

skynet.callback(function(addr, msg)
print("[agent]",addr,msg)
skynet.callback(function(session, addr, msg)
print("[agent]",session, addr,msg)
end)
4 changes: 2 additions & 2 deletions src_code/console.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ skynet.callback(function()
else
print("Lauch:",handle)
end
skynet.command("TIMEOUT","0:0")
skynet.command("TIMEOUT", 0, "0")
end)

skynet.command("TIMEOUT","0:0") --定时器消息,循环读取输入的关键
skynet.command("TIMEOUT", 0, "0") --定时器消息,循环读取输入的关键
14 changes: 7 additions & 7 deletions src_code/gate/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ _report(struct skynet_context * ctx, const char * data, ...) {
tmp[n] = '\0';
}

skynet_send(ctx, WATCHDOG, strdup(tmp), n);
skynet_send(ctx, WATCHDOG, 0, strdup(tmp), n);
}

// 给 watchdog 或者 agent 发送 data 命令
Expand All @@ -127,7 +127,7 @@ _forward(struct skynet_context * ctx,struct gate *g, int uid, void * data, size_
char * tmp = malloc(len + 32);
int n = snprintf(tmp,len+32,"%d data ",uid);
memcpy(tmp+n,data,len);
skynet_send(ctx, agent->agent ? agent->agent : WATCHDOG, tmp, len + n);
skynet_send(ctx, agent->agent ? agent->agent : WATCHDOG, 0, tmp, len + n);
}

static int
Expand Down Expand Up @@ -169,7 +169,7 @@ _remove_id(struct gate *g, int uid) {
}

static void
_cb(struct skynet_context * ctx, void * ud, const char * uid, const void * msg, size_t sz) {
_cb(struct skynet_context * ctx, void * ud, int session, const char * uid, const void * msg, size_t sz) {
struct gate *g = ud;
if (msg) {
_ctrl(ctx, g , msg , (int)sz);
Expand All @@ -178,7 +178,7 @@ _cb(struct skynet_context * ctx, void * ud, const char * uid, const void * msg,
struct mread_pool * m = g->pool;
int connection_id = mread_poll(m,100); // timeout : 100ms
if (connection_id < 0) {
skynet_command(ctx, "TIMEOUT","1:0");
skynet_command(ctx, "TIMEOUT", 0, "1");
} else {
int id = g->map[connection_id].uid;
if (id == 0) {
Expand Down Expand Up @@ -209,7 +209,7 @@ _cb(struct skynet_context * ctx, void * ud, const char * uid, const void * msg,
_forward(ctx, g, id, data, *plen); // 将接收到的数据发往 watchdog 或者 agent
mread_yield(m);
_break:
skynet_command(ctx, "TIMEOUT","0:0");
skynet_command(ctx, "TIMEOUT",0,"0");
}
}

Expand Down Expand Up @@ -249,8 +249,8 @@ gate_init(struct gate *g , struct skynet_context * ctx, char * parm) {
}

skynet_callback(ctx,g,_cb);
skynet_command(ctx,"REG","gate");
skynet_command(ctx,"TIMEOUT","0:0");
skynet_command(ctx,"REG",0,"gate");
skynet_command(ctx,"TIMEOUT",0, "0");

return 0;
}
4 changes: 2 additions & 2 deletions src_code/globallog.lua
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
local skynet = require "skynet"

skynet.callback(function(from , message)
print("[GLOBALLOG]",from,message)
skynet.callback(function(session, from , message)
print("[GLOBALLOG]",session, from,message)
end)

skynet.command("REG","LOG")
49 changes: 34 additions & 15 deletions src_code/lua-skynet.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,19 @@

// lua 服务的回调函数入口,在 _callback 中设置,在 _dispatch_message 中被调用。最终调用 skynet.callback 中设定的回调函数
static void
_cb(struct skynet_context * context, void * ud, const char * addr, const void * msg, size_t sz_session) {
_cb(struct skynet_context * context, void * ud, int session, const char * addr, const void * msg, size_t sz) {
lua_State *L = ud;
//从注册表中获取 _cb 为 key 的值,放入栈中
lua_rawgetp(L, LUA_REGISTRYINDEX, _cb);
int r;
if (msg == NULL) {
lua_pushinteger(L, (int)sz_session);
lua_pushinteger(L, session);
r = lua_pcall(L, 1, 0 , 0);
} else {
lua_pushinteger(L, session);
lua_pushstring(L, addr);
lua_pushlstring(L, msg, sz_session);
r = lua_pcall(L, 2, 0 , 0);
lua_pushlstring(L, msg, sz);
r = lua_pcall(L, 3, 0 , 0);
}
if (r == LUA_OK)
return;
Expand Down Expand Up @@ -53,13 +54,24 @@ static int
_command(lua_State *L) {
struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
const char * cmd = luaL_checkstring(L,1);
const char * parm = NULL;
int session = 0;
const char * result;
if (lua_gettop(L) == 2) {
const char * parm = luaL_checkstring(L,2);
result = skynet_command(context, cmd, parm);
} else {
result = skynet_command(context, cmd, NULL);
int top = lua_gettop(L);
if (top == 2) {
if (lua_type(L,2) == LUA_TNUMBER) {
session = lua_tointeger(L,2);
} else {
parm = luaL_checkstring(L,2);
}
} else if (top == 3) {
session = lua_tointeger(L,2);
parm = luaL_checkstring(L,3);
} else if (top != 1) {
luaL_error(L, "skynet.command support only 3 parms (%d)",top);
}

result = skynet_command(context, cmd, session, parm);
if (result) {
lua_pushstring(L, result); // skynet.command 调用的返回结果
return 1;
Expand All @@ -72,21 +84,28 @@ _command(lua_State *L) {
static int
_send(lua_State *L) {
struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
int session = -1;
int index = 0;
const char * dest = luaL_checkstring(L,1);
int type = lua_type(L,2);

if (lua_type(L,2) == LUA_TNUMBER) {
session = lua_tointeger(L,2);
++index;
}
int type = lua_type(L,index+2);
if (type == LUA_TSTRING) {
size_t len = 0;
void * msg = (void *)lua_tolstring(L,2,&len);
void * msg = (void *)lua_tolstring(L,index+2,&len);
void * message = malloc(len);
memcpy(message, msg, len);
skynet_send(context, dest, message, len);
skynet_send(context, dest, session , message, len);
} else {
void * msg = lua_touserdata(L,2);
void * msg = lua_touserdata(L,index+2);
if (msg == NULL) {
return luaL_error(L, "skynet.send need userdata or string (%s)", lua_typename(L,type));
}
int size = luaL_checkinteger(L,3);
skynet_send(context, dest, msg, size);
int size = luaL_checkinteger(L,index+3);
skynet_send(context, dest, session, msg, size);
}
return 0;
}
Expand Down
10 changes: 5 additions & 5 deletions src_code/skynet.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@ struct skynet_context;

// 从 context->handle 中给 logger 发消息
void skynet_error(struct skynet_context * context, const char *msg, ...);
const char * skynet_command(struct skynet_context * context, const char * cmd , const char * parm);
const char * skynet_command(struct skynet_context * context, const char * cmd , int session, const char * parm);
/*
服务 context->handle 给服务 addr 发消息
addr: 如果以':'开头则后面跟的是 handle ,如果以'.'开头则后面跟的是 handle name
*/
void skynet_send(struct skynet_context * context, const char * addr , void * msg, size_t sz_session);
void skynet_send(struct skynet_context * context, const char * addr , int session, void * msg, size_t sz);

/*
context 是服务指针
ud 是 skynet_callback 设置的第二个参数
uid 是源服务地址
addr 是源服务地址
msg 是消息数据
sz_session 是数据大小或者一个约定号
sz 是数据大小或者一个约定号
*/
typedef void (*skynet_cb)(struct skynet_context * context, void *ud, const char * uid , const void * msg, size_t sz_session);
typedef void (*skynet_cb)(struct skynet_context * context, void *ud, int session, const char * addr , const void * msg, size_t sz);
// 设置 ctx 的 回调函数接口以及传入回调函数的第二个参数
void skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb);

Expand Down
1 change: 1 addition & 0 deletions src_code/skynet_error.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ skynet_error(struct skynet_context * context, const char *msg, ...) {
} else {
smsg.source = skynet_context_handle(context);
}
smsg.session = 0;
smsg.data = strdup(tmp);
smsg.sz = len;
skynet_context_push(logger, &smsg);
Expand Down
11 changes: 9 additions & 2 deletions src_code/skynet_harbor.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ struct hashmap {
struct remote_header {
uint32_t source;
uint32_t destination;
uint32_t session;
};

struct remote {
Expand Down Expand Up @@ -67,6 +68,7 @@ static inline void
buffer_to_remote_header(uint8_t *buffer, struct remote_header *header) {
header->source = buffer[0] | buffer[1] << 8 | buffer[2] << 16 | buffer[3] << 24;
header->destination = buffer[4] | buffer[5] << 8 | buffer[6] << 16 | buffer[7] << 24;
header->session = buffer[8] | buffer[9] << 8 | buffer[10] << 16 | buffer[11] << 24;
}

static inline void
Expand All @@ -79,6 +81,10 @@ remote_header_to_buffer(struct remote_header *header, uint8_t *buffer) {
buffer[5] = (header->destination >>8) & 0xff;
buffer[6] = (header->destination >>16) & 0xff;
buffer[7] = (header->destination >>24) & 0xff;
buffer[8] = (header->session) & 0xff;
buffer[9] = (header->session >>8) & 0xff;
buffer[10] = (header->session >>16) & 0xff;
buffer[11] = (header->session >>24) & 0xff;
}

static uint32_t
Expand Down Expand Up @@ -277,6 +283,7 @@ _register_name(const char *name, uint32_t addr) {
message.destination = addr;
message.message = msg;
skynet_remotemq_push(Z->queue, &message);
send_notice();
}
} else { // 此服务属于本地 skynet 节点,则将之前堆积的消息放入本地消息队列
while (!skynet_mq_pop(queue, &msg)) {
Expand Down Expand Up @@ -475,7 +482,7 @@ remote_socket_send(void * socket, struct skynet_remote_message *msg) {
rh.source = msg->message.source;
rh.destination = msg->destination;
zmq_msg_t part;
zmq_msg_init_size(&part,8);
zmq_msg_init_size(&part,sizeof(struct remote_header));
uint8_t * buffer = zmq_msg_data(&part);
remote_header_to_buffer(&rh,buffer);
zmq_send(socket, &part, ZMQ_SNDMORE);
Expand All @@ -498,7 +505,7 @@ _remote_recv() {
int rc = zmq_recv(Z->zmq_local,&header,0);
_report_zmq_error(rc);
size_t s = zmq_msg_size(&header);
if (s!=8) {
if (s!=sizeof(struct remote_header)) {
// s should be 0
if (s>0) {
char tmp[s+1];
Expand Down
4 changes: 2 additions & 2 deletions src_code/skynet_logger.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ logger_release(struct logger * inst) {

// uid: 源 handle
static void
_logger(struct skynet_context * context, void *ud, const char * uid, const void * msg, size_t sz) {
_logger(struct skynet_context * context, void *ud, int session, const char * uid, const void * msg, size_t sz) {
struct logger * inst = ud;
fprintf(inst->handle, "[%s] ",uid);
fwrite(msg, sz , 1, inst->handle); // size_t fwrite(const void* buffer, size_t size, size_t count, FILE* stream);
Expand All @@ -47,7 +47,7 @@ logger_init(struct logger * inst, struct skynet_context *ctx, const char * parm)
}
if (inst->handle) {
skynet_callback(ctx, inst, _logger);
skynet_command(ctx, "REG", ".logger");
skynet_command(ctx, "REG", 0, ".logger");
return 0;
}
return 1;
Expand Down
1 change: 1 addition & 0 deletions src_code/skynet_mq.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

struct skynet_message {
uint32_t source; // 源的 handle
int session;
void * data;
size_t sz;
};
Expand Down
34 changes: 26 additions & 8 deletions src_code/skynet_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ struct skynet_context {
char result[32]; // 不同的命令会设置不同的返回值
void * cb_ud; // 回调函数的第二个参数
skynet_cb cb; // 回调函数指针,定义在 skynet.h : typedef void (*skynet_cb)(struct skynet_context * context, void *ud, const char * uid , const void * msg, size_t sz_session);
int session_id;
int in_global_queue; // 当前服务的消息队列是否已经在全局消息队列的标识
struct message_queue *queue;
};
Expand Down Expand Up @@ -86,6 +87,7 @@ skynet_context_new(const char * name, const char *parm) {
ctx->cb = NULL;
ctx->cb_ud = NULL;
ctx->in_global_queue = 0;
ctx->session_id = 0;
char * uid = ctx->handle_name;
uid[0] = ':';
_id_to_hex(uid+1, ctx->handle); // 应该放在后面一句 ctx->handle 赋值语句后面,后面的版本修复了
Expand All @@ -104,6 +106,17 @@ skynet_context_new(const char * name, const char *parm) {
}
}

static int
_new_session(struct skynet_context *ctx) {
int session = ++ctx->session_id;
if (session < 0) {
ctx->session_id = 1;
return 1;
}

return session;
}

// 增加 ctx 的引用计数
void
skynet_context_grab(struct skynet_context *ctx) {
Expand Down Expand Up @@ -136,17 +149,17 @@ static void
_dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
// source 等于 SKYNET_SYSTEM_TIMER 表示源于系统,见 skynet_timeout
if (msg->source == SKYNET_SYSTEM_TIMER) {
ctx->cb(ctx, ctx->cb_ud, NULL, msg->data, msg->sz);
ctx->cb(ctx, ctx->cb_ud, msg->session, NULL, msg->data, msg->sz);
} else {
char tmp[10];
tmp[0] = ':';
_id_to_hex(tmp+1, msg->source);
if (skynet_harbor_message_isremote(msg->source)) {
void * data = skynet_harbor_message_open(msg);
ctx->cb(ctx, ctx->cb_ud, tmp, data, msg->sz);
ctx->cb(ctx, ctx->cb_ud, msg->session, tmp, data, msg->sz);
skynet_harbor_message_close(msg);
} else {
ctx->cb(ctx, ctx->cb_ud, tmp, msg->data, msg->sz);
ctx->cb(ctx, ctx->cb_ud, msg->session, tmp, msg->data, msg->sz);
}

free(msg->data);
Expand Down Expand Up @@ -214,15 +227,12 @@ skynet_context_message_dispatch(void) {
}

const char *
skynet_command(struct skynet_context * context, const char * cmd , const char * parm) {
skynet_command(struct skynet_context * context, const char * cmd , int session, const char * parm) {
if (strcmp(cmd,"TIMEOUT") == 0) { // 添加一个定时器消息,自己给自己发消息
//time:session
char * session_ptr = NULL;
//strtol会将parm按照10指定的基数转换然后返回。遇到的第一个非法值会将地址赋值给第二个参数
int ti = strtol(parm, &session_ptr, 10);
char sep = session_ptr[0];
assert(sep == ':');
int session = strtol(session_ptr+1, NULL, 10);
skynet_timeout(context->handle, ti, session);
return NULL;
}
Expand Down Expand Up @@ -275,7 +285,10 @@ skynet_command(struct skynet_context * context, const char * cmd , const char *
addr: 如果以':'开头则后面跟的是 handle ,如果以'.'开头则后面跟的是 handle name
*/
void
skynet_send(struct skynet_context * context, const char * addr , void * msg, size_t sz) {
skynet_send(struct skynet_context * context, const char * addr , int session, void * msg, size_t sz) {
if (session < 0) {
session = _new_session(context);
}
uint32_t des = 0;
if (addr[0] == ':') {
des = strtol(addr+1, NULL, 16);
Expand All @@ -289,6 +302,7 @@ skynet_send(struct skynet_context * context, const char * addr , void * msg, siz
} else {
struct skynet_message smsg;
smsg.source = context->handle;
smsg.session = session;
smsg.data = msg;
smsg.sz = sz;
skynet_harbor_send(addr, 0, &smsg);
Expand All @@ -298,6 +312,7 @@ skynet_send(struct skynet_context * context, const char * addr , void * msg, siz
assert(des > 0);
struct skynet_message smsg;
smsg.source = context->handle;
smsg.session = session;
smsg.data = msg;
smsg.sz = sz;

Expand Down Expand Up @@ -338,6 +353,9 @@ skynet_context_push(uint32_t handle, struct skynet_message *message) {
if (ctx == NULL) {
return -1;
}
if (message->session < 0) {
message->session = _new_session(ctx);
}
skynet_mq_push(ctx->queue, message);
if (__sync_lock_test_and_set(&ctx->in_global_queue,1) == 0) { // 将 ctx->in_global_queue 设为 1 并返回 ctx->in_global_queue 操作之前的值。
skynet_globalmq_push(ctx->queue);
Expand Down
Loading

0 comments on commit 7ea33b7

Please sign in to comment.