Skip to content

Commit

Permalink
code copy
Browse files Browse the repository at this point in the history
8bcc7dce016dfeb04f7475dc7eb3a9cba8951935
bugfix: use negative session id for request message
  • Loading branch information
tanbin committed Apr 24, 2018
1 parent cefd906 commit 31d244a
Show file tree
Hide file tree
Showing 10 changed files with 33 additions and 49 deletions.
2 changes: 1 addition & 1 deletion src_code/agent.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ skynet.dispatch(function(msg,session)
if session == 0 then
print("client command",msg)
local result = skynet.call("SIMPLEDB",msg)
skynet.send(client,0,result)
skynet.send(client, result)
else
print("server command",msg)
if msg == "CLOSE" then
Expand Down
8 changes: 4 additions & 4 deletions src_code/gate/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ _cb(struct skynet_context * ctx, void * ud, int session, const char * uid, const
struct mread_pool * m = g->pool;
int connection_id = mread_poll(m,100); // timeout : 100ms
if (connection_id < 0) {
skynet_command(ctx, "TIMEOUT", 0, "1");
skynet_command(ctx, "TIMEOUT", "1");
} else {
int id = g->map[connection_id].uid;
if (id == 0) {
Expand Down Expand Up @@ -215,7 +215,7 @@ _cb(struct skynet_context * ctx, void * ud, int session, const char * uid, const
_forward(ctx, g, id, data, *plen); // 将接收到的数据发往 watchdog 或者 agent
mread_yield(m);
_break:
skynet_command(ctx, "TIMEOUT",0,"0");
skynet_command(ctx, "TIMEOUT", "0");
}
}

Expand Down Expand Up @@ -255,8 +255,8 @@ gate_init(struct gate *g , struct skynet_context * ctx, char * parm) {
}

skynet_callback(ctx,g,_cb);
skynet_command(ctx,"REG",0,"gate");
skynet_command(ctx,"TIMEOUT",0, "0");
skynet_command(ctx,"REG", "gate");
skynet_command(ctx,"TIMEOUT", "0");

return 0;
}
2 changes: 1 addition & 1 deletion src_code/globallog.lua
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
local skynet = require "skynet"

skynet.dispatch(function(message, session , from)
print("[GLOBALLOG]",session, from,message)
print("[GLOBALLOG]", from,message)
end)

skynet.register "LOG"
21 changes: 5 additions & 16 deletions src_code/lua-skynet.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,13 @@ static int
_command(lua_State *L) {
struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
const char * cmd = luaL_checkstring(L,1);
const char * parm = NULL;
int session = 0;
const char * result;
int top = lua_gettop(L);
if (top == 2) {
if (lua_type(L,2) == LUA_TNUMBER) {
session = lua_tointeger(L,2);
} else {
parm = luaL_checkstring(L,2);
}
} else if (top == 3) {
session = lua_tointeger(L,2);
parm = luaL_checkstring(L,3);
} else if (top != 1) {
luaL_error(L, "skynet.command support only 3 parms (%d)",top);
const char * parm = NULL;
if (lua_gettop(L) == 2) {
parm = luaL_checkstring(L,2);
}

result = skynet_command(context, cmd, session, parm);
result = skynet_command(context, cmd, parm);
if (result) {
lua_pushstring(L, result); // skynet.command 调用的返回结果
return 1;
Expand All @@ -97,7 +86,7 @@ _command(lua_State *L) {
static int
_send(lua_State *L) {
struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
int session = -1;
int session = 0;
int index = 0;
const char * dest = luaL_checkstring(L,1);

Expand Down
2 changes: 1 addition & 1 deletion src_code/skynet.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ struct skynet_context;

// 从 context->handle 中给 logger 发消息
void skynet_error(struct skynet_context * context, const char *msg, ...);
const char * skynet_command(struct skynet_context * context, const char * cmd , int session, const char * parm);
const char * skynet_command(struct skynet_context * context, const char * cmd , const char * parm);
/*
服务 context->handle 给服务 addr 发消息
addr: 如果以':'开头则后面跟的是 handle ,如果以'.'开头则后面跟的是 handle name
Expand Down
10 changes: 6 additions & 4 deletions src_code/skynet.lua
Original file line number Diff line number Diff line change
Expand Up @@ -110,23 +110,25 @@ end

function skynet.dispatch(f)
c.callback(function(session, address , message)
local co = session_id_coroutine[session]
if co == nil then
if session <= 0 then
session = - session
co = coroutine.create(f)
session_coroutine_id[co] = session
session_coroutine_address[co] = address
suspend(co, coroutine.resume(co, message, session, address))
else
local co = session_id_coroutine[session]
assert(co, session)
session_id_coroutine[session] = nil
suspend(co, coroutine.resume(co, message))
end
end)
end

function skynet.start(f)
c.command("TIMEOUT",0,"0")
local session = c.command("TIMEOUT","0")
local co = coroutine.create(f)
session_id_coroutine[0] = co
session_id_coroutine[tonumber(session)] = co
end

return skynet
2 changes: 1 addition & 1 deletion src_code/skynet_logger.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ logger_init(struct logger * inst, struct skynet_context *ctx, const char * parm)
}
if (inst->handle) {
skynet_callback(ctx, inst, _logger);
skynet_command(ctx, "REG", 0, ".logger");
skynet_command(ctx, "REG", ".logger");
return 0;
}
return 1;
Expand Down
12 changes: 7 additions & 5 deletions src_code/skynet_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ skynet_context_message_dispatch(void) {
session : 约定的 session
*/
const char *
skynet_command(struct skynet_context * context, const char * cmd , int session, const char * parm) {
skynet_command(struct skynet_context * context, const char * cmd , const char * parm) {
if (strcmp(cmd,"TIMEOUT") == 0) { // 添加一个定时器消息,自己给自己发消息
//time:session
// 上面的 time:session 是作者旧的注释,添加 session 之后就变成错误的注释了
Expand All @@ -252,9 +252,10 @@ skynet_command(struct skynet_context * context, const char * cmd , int session,
char * session_ptr = NULL;
//strtol会将parm按照10指定的基数转换然后返回。遇到的第一个非法值会将地址赋值给第二个参数
int ti = strtol(parm, &session_ptr, 10);
session = skynet_timeout(context->handle, ti, session);
int session = skynet_context_newsession(context);
if (session < 0)
return NULL;
skynet_timeout(context->handle, ti, session);
sprintf(context->result, "%d", session);
return context->result;
}
Expand Down Expand Up @@ -323,8 +324,10 @@ skynet_command(struct skynet_context * context, const char * cmd , int session,
*/
int
skynet_send(struct skynet_context * context, const char * addr , int session, void * msg, size_t sz) {
int session_id = session;
if (session < 0) {
session = skynet_context_newsession(context);
session_id = - session;
}
uint32_t des = 0;
if (addr[0] == ':') {
Expand All @@ -339,7 +342,7 @@ skynet_send(struct skynet_context * context, const char * addr , int session, vo
} else {
struct skynet_message smsg;
smsg.source = context->handle;
smsg.session = session;
smsg.session = session_id;
smsg.data = msg;
smsg.sz = sz;
skynet_harbor_send(addr, 0, &smsg);
Expand All @@ -349,7 +352,7 @@ skynet_send(struct skynet_context * context, const char * addr , int session, vo
assert(des > 0);
struct skynet_message smsg;
smsg.source = context->handle;
smsg.session = session;
smsg.session = session_id;
smsg.data = msg;
smsg.sz = sz;

Expand Down Expand Up @@ -391,7 +394,6 @@ skynet_context_push(uint32_t handle, struct skynet_message *message) {
if (ctx == NULL) {
return -1;
}
assert(message->session >= 0);
skynet_mq_push(ctx->queue, message);
if (__sync_lock_test_and_set(&ctx->in_global_queue,1) == 0) { // 将 ctx->in_global_queue 设为 1 并返回 ctx->in_global_queue 操作之前的值。
skynet_globalmq_push(ctx->queue);
Expand Down
9 changes: 0 additions & 9 deletions src_code/skynet_timer.c
Original file line number Diff line number Diff line change
Expand Up @@ -227,15 +227,6 @@ timer_create_timer()
*/
int
skynet_timeout(uint32_t handle, int time, int session) {
if (session < 0) {
struct skynet_context * ctx = skynet_handle_grab(handle);
if (ctx == NULL) {
return -1;
}
session = skynet_context_newsession(ctx);
skynet_context_release(ctx);
}

// time 为0属于特例,不进入 timer 队列,而是直接进入消息队列
if (time == 0) {
struct skynet_message message;
Expand Down
14 changes: 7 additions & 7 deletions src_code/watchdog.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,29 @@ local agent_all = {}
function command:open(parm)
local fd,addr = string.match(parm,"(%d+) ([^%s]+)")
fd = tonumber(fd)
skynet.send("LOG", 0, string.format("%d %d %s",self,fd,addr))
skynet.send("LOG", string.format("%d %d %s",self,fd,addr))
local client = skynet.launch("client",fd)
print("client",client)
skynet.send("LOG", "client " .. client)
-- 这里启动一个 snlua 服务,返回服务地址。并且因为 open 命令处理的是用户连接,所以经常在网上看到的说每个用户接入会启动一个 agent 。
local agent = skynet.launch("snlua","agent.lua",client)
if agent then
agent_all[self] = agent
skynet.send("gate",0, "forward ".. self .. " " .. agent)
skynet.send("gate", "forward ".. self .. " " .. agent)
end
end

function command:close()
skynet.send("LOG",0, string.format("close %d",self))
skynet.send(agent_all[self],1,"CLOSE")
skynet.send("LOG", string.format("close %d",self))
skynet.send(agent_all[self], -1,"CLOSE")
agent_all[self] = nil
end

function command:data(data)
local agent = agent_all[self]
if agent then
skynet.send(agent,0,data)
skynet.send(agent, data)
else
skynet.send("LOG",0,string.format("data %d size=%d",self,#data))
skynet.send("LOG", string.format("data %d size=%d",self,#data))
end
end

Expand Down

0 comments on commit 31d244a

Please sign in to comment.