From c7c90e82f1429fbe6333a4b409519cd4ba9e5799 Mon Sep 17 00:00:00 2001 From: actboy168 Date: Sun, 12 May 2024 17:42:16 +0800 Subject: [PATCH 01/11] ltask init --- .gitmodules | 3 ++ 3rd/bee.lua | 2 +- 3rd/ltask | 1 + main.lua | 5 +++ make.lua | 44 +++++++++++++++++++++++++++ make/modules.cpp | 5 ++- script/ltask/.gitignore | 4 +++ script/ltask/init.lua | 54 +++++++++++++++++++++++++++++++++ script/ltask/service/logger.lua | 34 +++++++++++++++++++++ script/ltask/service/main.lua | 2 ++ 10 files changed, 152 insertions(+), 2 deletions(-) create mode 160000 3rd/ltask create mode 100644 script/ltask/.gitignore create mode 100644 script/ltask/init.lua create mode 100644 script/ltask/service/logger.lua create mode 100644 script/ltask/service/main.lua diff --git a/.gitmodules b/.gitmodules index 32d827327..26603632e 100644 --- a/.gitmodules +++ b/.gitmodules @@ -61,3 +61,6 @@ [submodule "meta/3rd/luv"] path = meta/3rd/luv url = https://github.com/LuaCATS/luv.git +[submodule "3rd/ltask"] + path = 3rd/ltask + url = https://github.com/cloudwu/ltask diff --git a/3rd/bee.lua b/3rd/bee.lua index b4fda4e08..cdccd083d 160000 --- a/3rd/bee.lua +++ b/3rd/bee.lua @@ -1 +1 @@ -Subproject commit b4fda4e0865d4369c845015ab2ad45573d6ba245 +Subproject commit cdccd083deb4deaa1c26d7fb4fb5ccde955103aa diff --git a/3rd/ltask b/3rd/ltask new file mode 160000 index 000000000..79a3f1bd0 --- /dev/null +++ b/3rd/ltask @@ -0,0 +1 @@ +Subproject commit 79a3f1bd03ed3bd59a246daaa3398819efee08dd diff --git a/main.lua b/main.lua index 5dfbab362..78627ab9a 100644 --- a/main.lua +++ b/main.lua @@ -1,3 +1,8 @@ +if not package.loaded["ltask"] then + require "ltask.init" + return +end + local fs = require 'bee.filesystem' local util = require 'utility' local version = require 'version' diff --git a/make.lua b/make.lua index a0dcb4513..83749a0e4 100644 --- a/make.lua +++ b/make.lua @@ -21,6 +21,49 @@ require "make.detect_platform" lm:import "3rd/bee.lua/make.lua" lm:import "make/code_format.lua" +lm:copy 'ltask-script' { + inputs = { + "3rd/ltask/lualib/bootstrap.lua", + "3rd/ltask/lualib/service.lua", + "3rd/ltask/service/timer.lua", + "3rd/ltask/service/root.lua", + }, + outputs = { + "script/ltask/bootstrap.lua", + "script/ltask/service.lua", + "script/ltask/service/timer.lua", + "script/ltask/service/root.lua", + }, +} + +lm:source_set 'ltask' { + deps = "ltask-script", + rootdir = '3rd', + c = "c11", + includes = "bee.lua/3rd/lua", + sources = { + "ltask/src/*.c", + "!ltask/src/lua-seri.c", + }, + defines = { + --"DEBUGLOG", + lm.mode=="debug" and "DEBUGTHREADNAME", + }, + windows = { + defines = { + "_WIN32_WINNT=0x0601" + }, + }, + msvc = { + flags = { + "/experimental:c11atomics" + }, + }, + gcc = { + defines = "_XOPEN_SOURCE=600", + } +} + lm:source_set 'lpeglabel' { rootdir = '3rd', includes = "bee.lua/3rd/lua", @@ -36,6 +79,7 @@ lm:executable "lua-language-server" { "source_lua", "source_bootstrap", "lpeglabel", + "ltask", includeCodeFormat and "code_format" or nil, }, includes = { diff --git a/make/modules.cpp b/make/modules.cpp index 1dd4a3704..69cbb4f3a 100644 --- a/make/modules.cpp +++ b/make/modules.cpp @@ -1,7 +1,10 @@ #include extern "C" int luaopen_lpeglabel (lua_State *L); -static ::bee::lua::callfunc _init(::bee::lua::register_module, "lpeglabel", luaopen_lpeglabel); +static ::bee::lua::callfunc _init_lpeg(::bee::lua::register_module, "lpeglabel", luaopen_lpeglabel); + +extern "C" int luaopen_ltask_bootstrap (lua_State *L); +static ::bee::lua::callfunc _init_ltask(::bee::lua::register_module, "ltask.bootstrap", luaopen_ltask_bootstrap); #ifdef CODE_FORMAT extern "C" int luaopen_code_format(lua_State *L); diff --git a/script/ltask/.gitignore b/script/ltask/.gitignore new file mode 100644 index 000000000..97599c6cf --- /dev/null +++ b/script/ltask/.gitignore @@ -0,0 +1,4 @@ +/bootstrap.lua +/service.lua +/service/timer.lua +/service/root.lua diff --git a/script/ltask/init.lua b/script/ltask/init.lua new file mode 100644 index 000000000..e16e7f3d4 --- /dev/null +++ b/script/ltask/init.lua @@ -0,0 +1,54 @@ +local boot = require "ltask.bootstrap" + +local function searchpath(name) + return assert(package.searchpath(name, "script/ltask/?.lua")) +end + +local function readall(path) + local f = assert(io.open(path)) + return f:read "a" +end + +local servicepath = searchpath "service" + +local root_config = { + bootstrap = { + { + name = "timer", + unique = true, + }, + { + name = "logger", + unique = true, + }, + { + name = "main", + args = { arg }, + }, + }, + service_source = readall(servicepath), + service_chunkname = "@" .. servicepath, + initfunc = ([=[ +local name = ... +package.path = [[${lua_path}]] +package.cpath = [[${lua_cpath}]] +local filename, err = package.searchpath(name, "${service_path}") +if not filename then + return nil, err +end +return loadfile(filename) +]=]):gsub("%$%{([^}]*)%}", { + lua_path = package.path, + lua_cpath = package.cpath, + service_path = "script/ltask/service/?.lua", + }), +} + +boot.init_socket() +local bootstrap = dofile(searchpath "bootstrap") +local ctx = bootstrap.start { + core = {}, + root = root_config, + root_initfunc = root_config.initfunc, +} +bootstrap.wait(ctx) diff --git a/script/ltask/service/logger.lua b/script/ltask/service/logger.lua new file mode 100644 index 000000000..ffae2dbc8 --- /dev/null +++ b/script/ltask/service/logger.lua @@ -0,0 +1,34 @@ +local ltask = require "ltask" + +local S = {} + +local function writelog() + local flush + while true do + local ti, _, msg, sz = ltask.poplog() + if ti == nil then + if flush then + io.flush() + end + break + end + local tsec = ti // 100 + local msec = ti % 100 + local level, message = ltask.unpack_remove(msg, sz) + io.write(string.format("[%s.%02d][%-5s]%s\n", os.date("%Y-%m-%d %H:%M:%S", tsec), msec, level:upper(), message)) + flush = true + end +end + +ltask.fork(function() + while true do + writelog() + ltask.sleep(100) + end +end) + +function S.quit() + writelog() +end + +return S diff --git a/script/ltask/service/main.lua b/script/ltask/service/main.lua new file mode 100644 index 000000000..65db86f19 --- /dev/null +++ b/script/ltask/service/main.lua @@ -0,0 +1,2 @@ +arg = ... +dofile "main.lua" From 2fb44d5e05f91faa912818c9fd14b98337258380 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=80=E8=90=8C=E5=B0=8F=E6=B1=90?= Date: Mon, 13 May 2024 11:40:33 +0800 Subject: [PATCH 02/11] =?UTF-8?q?=E6=95=B4=E7=90=86=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=EF=BC=8C=E6=9A=82=E6=97=B6=E5=B1=8F=E8=94=BD=E6=8E=89logger?= =?UTF-8?q?=EF=BC=8C=E6=99=9A=E7=82=B9=E5=86=8D=E6=8E=A5=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.lua | 2 +- script/ltask/init.lua | 4 ---- script/ltask/service/logger.lua | 2 +- 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/main.lua b/main.lua index 78627ab9a..128fbc5dc 100644 --- a/main.lua +++ b/main.lua @@ -1,5 +1,5 @@ if not package.loaded["ltask"] then - require "ltask.init" + require "ltask" return end diff --git a/script/ltask/init.lua b/script/ltask/init.lua index e16e7f3d4..c15c2275f 100644 --- a/script/ltask/init.lua +++ b/script/ltask/init.lua @@ -17,10 +17,6 @@ local root_config = { name = "timer", unique = true, }, - { - name = "logger", - unique = true, - }, { name = "main", args = { arg }, diff --git a/script/ltask/service/logger.lua b/script/ltask/service/logger.lua index ffae2dbc8..2b058df76 100644 --- a/script/ltask/service/logger.lua +++ b/script/ltask/service/logger.lua @@ -15,7 +15,7 @@ local function writelog() local tsec = ti // 100 local msec = ti % 100 local level, message = ltask.unpack_remove(msg, sz) - io.write(string.format("[%s.%02d][%-5s]%s\n", os.date("%Y-%m-%d %H:%M:%S", tsec), msec, level:upper(), message)) + --io.write(string.format("[%s.%02d][%-5s]%s\n", os.date("%Y-%m-%d %H:%M:%S", tsec), msec, level:upper(), message)) flush = true end end From 7e2a4979b12cd9e788aab493e11a0c1310e2d770 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=80=E8=90=8C=E5=B0=8F=E6=B1=90?= Date: Mon, 13 May 2024 17:26:42 +0800 Subject: [PATCH 03/11] =?UTF-8?q?=E6=9A=82=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .luarc.json | 3 +- main.lua | 9 +- script/ltask/.gitignore | 4 - script/ltask/service/main.lua | 2 - script/{ltask => ltask2}/init.lua | 24 +- script/ltask2/lualib/bootstrap.lua | 42 + script/ltask2/lualib/logger.lua | 37 + script/ltask2/lualib/service.lua | 890 ++++++++++++++++++++ script/{ltask => ltask2}/service/logger.lua | 0 script/ltask2/service/main.lua | 10 + 10 files changed, 1000 insertions(+), 21 deletions(-) delete mode 100644 script/ltask/.gitignore delete mode 100644 script/ltask/service/main.lua rename script/{ltask => ltask2}/init.lua (59%) create mode 100644 script/ltask2/lualib/bootstrap.lua create mode 100644 script/ltask2/lualib/logger.lua create mode 100644 script/ltask2/lualib/service.lua rename script/{ltask => ltask2}/service/logger.lua (100%) create mode 100644 script/ltask2/service/main.lua diff --git a/.luarc.json b/.luarc.json index fc02379f9..b9bb65344 100644 --- a/.luarc.json +++ b/.luarc.json @@ -40,7 +40,8 @@ "/libs/", "/3rd", "/.vscode", - "/meta" + "/meta", + "script/ltask/lualib" ], "checkThirdParty": false }, diff --git a/main.lua b/main.lua index 128fbc5dc..61ad0cab1 100644 --- a/main.lua +++ b/main.lua @@ -1,8 +1,3 @@ -if not package.loaded["ltask"] then - require "ltask" - return -end - local fs = require 'bee.filesystem' local util = require 'utility' local version = require 'version' @@ -82,6 +77,4 @@ xpcall(dofile, log.debug, (ROOT / 'debugger.lua'):string()) require 'cli' -local _, service = xpcall(require, log.error, 'service') - -service.start() +xpcall(require, log.error, 'ltask2') diff --git a/script/ltask/.gitignore b/script/ltask/.gitignore deleted file mode 100644 index 97599c6cf..000000000 --- a/script/ltask/.gitignore +++ /dev/null @@ -1,4 +0,0 @@ -/bootstrap.lua -/service.lua -/service/timer.lua -/service/root.lua diff --git a/script/ltask/service/main.lua b/script/ltask/service/main.lua deleted file mode 100644 index 65db86f19..000000000 --- a/script/ltask/service/main.lua +++ /dev/null @@ -1,2 +0,0 @@ -arg = ... -dofile "main.lua" diff --git a/script/ltask/init.lua b/script/ltask2/init.lua similarity index 59% rename from script/ltask/init.lua rename to script/ltask2/init.lua index c15c2275f..8a4db9160 100644 --- a/script/ltask/init.lua +++ b/script/ltask2/init.lua @@ -1,7 +1,7 @@ local boot = require "ltask.bootstrap" local function searchpath(name) - return assert(package.searchpath(name, "script/ltask/?.lua")) + return assert(package.searchpath(name, (ROOT / 'script/ltask2/lualib/?.lua'):string())) end local function readall(path) @@ -17,9 +17,20 @@ local root_config = { name = "timer", unique = true, }, + { + name = "logger", + unique = true, + args = { + LOGPATH = LOGPATH, + } + }, { name = "main", - args = { arg }, + args = { + ROOT = ROOT:string(), + LOGPATH = LOGPATH, + METAPATH = METAPATH, + }, }, }, service_source = readall(servicepath), @@ -28,23 +39,24 @@ local root_config = { local name = ... package.path = [[${lua_path}]] package.cpath = [[${lua_cpath}]] -local filename, err = package.searchpath(name, "${service_path}") +local filename, err = package.searchpath(name, package.path) if not filename then - return nil, err + return nil, err end return loadfile(filename) ]=]):gsub("%$%{([^}]*)%}", { lua_path = package.path, lua_cpath = package.cpath, - service_path = "script/ltask/service/?.lua", + service_path = (ROOT / "script/ltask2/service/?.lua"):string(), }), } boot.init_socket() -local bootstrap = dofile(searchpath "bootstrap") +local bootstrap = require 'ltask2.lualib.bootstrap' local ctx = bootstrap.start { core = {}, root = root_config, root_initfunc = root_config.initfunc, } + bootstrap.wait(ctx) diff --git a/script/ltask2/lualib/bootstrap.lua b/script/ltask2/lualib/bootstrap.lua new file mode 100644 index 000000000..12a984ca0 --- /dev/null +++ b/script/ltask2/lualib/bootstrap.lua @@ -0,0 +1,42 @@ +local boot = require "ltask.bootstrap" + +local SERVICE_ROOT = 1 +local MESSSAGE_SYSTEM = 0 + +local function bootstrap_root(initfunc, config) + local sid = assert(boot.new_service("root", config.service_source, config.service_chunkname, SERVICE_ROOT)) + assert(sid == SERVICE_ROOT) + boot.init_root(SERVICE_ROOT) + -- send init message to root service + local init_msg, sz = boot.pack("init", { + initfunc = initfunc, + name = "root", + args = {config} + }) + -- self bootstrap + boot.post_message { + from = SERVICE_ROOT, + to = SERVICE_ROOT, + session = 1, -- 1 for root init + type = MESSSAGE_SYSTEM, + message = init_msg, + size = sz, + } +end + +local function start(config) + boot.init(config.core) + boot.init_timer() + bootstrap_root(config.root_initfunc, config.root) + return boot.run(config.mainthread) +end + +local function wait(ctx) + boot.wait(ctx) + boot.deinit() +end + +return { + start = start, + wait = wait, +} diff --git a/script/ltask2/lualib/logger.lua b/script/ltask2/lualib/logger.lua new file mode 100644 index 000000000..33771c1cf --- /dev/null +++ b/script/ltask2/lualib/logger.lua @@ -0,0 +1,37 @@ +local ltask = require "ltask" +local logpath = (...).LOGPATH + +local S = {} + +local logfile = io.open(logpath .. '/ltask.log', 'w+b') +if logfile then + logfile:setvbuf("no") +end + +local function writelog() + while true do + local ti, _, msg, sz = ltask.poplog() + if ti == nil then + break + end + local tsec = ti // 100 + local msec = ti % 100 + local level, message = ltask.unpack_remove(msg, sz) + if logfile then + logfile:write(string.format("[%s.%02d][%-5s]%s\n", os.date("%Y-%m-%d %H:%M:%S", tsec), msec, level:upper(), message)) + end + end +end + +ltask.fork(function() + while true do + writelog() + ltask.sleep(100) + end +end) + +function S.quit() + writelog() +end + +return S diff --git a/script/ltask2/lualib/service.lua b/script/ltask2/lualib/service.lua new file mode 100644 index 000000000..aca61878b --- /dev/null +++ b/script/ltask2/lualib/service.lua @@ -0,0 +1,890 @@ +local SERVICE_ROOT = 1 + +local MESSAGE_SYSTEM = 0 +local MESSAGE_REQUEST = 1 +local MESSAGE_RESPONSE = 2 +local MESSAGE_ERROR = 3 +local MESSAGE_SIGNAL = 4 +local MESSAGE_IDLE = 5 + +local RECEIPT_DONE = 1 +local RECEIPT_ERROR = 2 +local RECEIPT_BLOCK = 3 + +local SESSION_SEND_MESSAGE = 0 + +local ltask = require "ltask" + +local CURRENT_SERVICE = ltask.self() +local CURRENT_SERVICE_LABEL = ltask.label() + +ltask.log = {} +for _, level in ipairs {"info","error"} do + ltask.log[level] = function (...) + local t = table.pack(...) + local str = {} + for i = 1, t.n do + str[#str+1] = tostring(t[i]) + end + local message = string.format("( %s ) %s", CURRENT_SERVICE_LABEL, table.concat(str, "\t")) + ltask.pushlog(ltask.pack(level, message)) + end +end + +ltask.log.info ( "startup " .. CURRENT_SERVICE ) + +local yield_service = coroutine.yield +local yield_session = coroutine.yield +local function continue_session() + coroutine.yield(true) +end + +local running_thread + +local session_coroutine_suspend_lookup = {} +local session_coroutine_response = {} +local session_coroutine_address = {} +local session_id = 2 -- 1 is reserved for root + +local session_waiting = {} +local wakeup_queue = {} + +----- error handling ------ + +local error_mt = {} +function error_mt:__tostring() + return table.concat(self, "\n") +end + +local traceback, create_traceback; do + local selfsource = debug.getinfo(1, "S").source + local function getshortsrc(source) + local maxlen = 60 + local type = source:byte(1) + if type == 61 --[['=']] then + if #source <= maxlen then + return source:sub(2) + else + return source:sub(2, maxlen) + end + elseif type == 64 --[['@']] then + if #source <= maxlen then + return source:sub(2) + else + return '...' .. source:sub(#source - maxlen + 5) + end + else + local nl = source:find '\n' + local maxlen = maxlen - 15 + if #source < maxlen and nl == nil then + return ('[string "%s"]'):format(source) + else + local n = #source + if nl ~= nil then + n = nl - 1 + end + if n > maxlen then + n = maxlen + end + return ('[string "%s..."]'):format(source:sub(1, n)) + end + end + end + local function findfield(t, f, level) + if level == 0 or type(t) ~= 'table' then + return + end + for key, value in pairs(t) do + if type(key) == 'string' and not (level == 2 and key == '_G') then + if value == f then + return key + end + local res = findfield(value, f, level - 1) + if res then + return key .. '.' .. res + end + end + end + end + local function pushglobalfuncname(f) + return findfield(_G, f, 2) + end + local function pushfuncname(info) + local funcname = pushglobalfuncname(info.func) + if funcname then + return ("function '%s'"):format(funcname) + elseif info.namewhat ~= '' then + return ("%s '%s'"):format(info.namewhat, info.name) + elseif info.what == 'main' then + return 'main chunk' + elseif info.what ~= 'C' then + return ('function <%s:%d>'):format(getshortsrc(info.source), info.linedefined) + else + return '?' + end + end + function create_traceback(co, level) + local s = {} + local depth = level or 0 + while true do + local info = co and debug.getinfo(co, depth, "Slntf") + or debug.getinfo(depth, "Slntf") + if not info then + s[#s] = nil + break + end + if #s > 0 and selfsource == info.source then + goto continue + end + s[#s + 1] = ('\t%s:'):format(getshortsrc(info.source)) + if info.currentline > 0 then + s[#s + 1] = ('%d:'):format(info.currentline) + end + s[#s + 1] = " in " + s[#s + 1] = pushfuncname(info) + if info.istailcall then + s[#s + 1] = '\n\t(...tail calls...)' + end + s[#s + 1] = "\n" + ::continue:: + depth = depth + 1 + end + return table.concat(s) + end + local function replacewhere(co, message, level) + local f, l = message:find ':[-%d]+: ' + if f and l then + local where_path = message:sub(1, f - 1) + local where_line = tonumber(message:sub(f + 1, l - 2)) + local where_src = "@"..where_path + message = message:sub(l + 1) + local depth = level or 0 + while true do + local info = debug.getinfo(co, depth, "Sl") + if not info then + break + end + if info.what ~= 'C' and info.source == where_src and where_line == info.currentline then + return message, depth + end + depth = depth + 1 + end + end + return message, level + end + function traceback(errobj, where) + if type(where) == "string" then + if type(errobj) ~= "table" then + local message = tostring(errobj) + local level = 0 + errobj = { + message, + "stack traceback:", + ("\t( service:%d )"):format(CURRENT_SERVICE), + where, + level = level, + } + end + errobj[#errobj+1] = ("\t( service:%d )"):format(CURRENT_SERVICE) + errobj[#errobj+1] = where + setmetatable(errobj, error_mt) + return errobj + end + local co, level + if type(where) == "thread" then + co = where + else + co = running_thread + level = where + end + if type(errobj) ~= "table" then + local message + message, level = replacewhere(co, tostring(errobj), level) + errobj = { + message, + "stack traceback:", + level = level, + } + end + errobj[#errobj+1] = ("\t( service:%d )"):format(CURRENT_SERVICE) + errobj[#errobj+1] = create_traceback(co, level or errobj.level) + setmetatable(errobj, error_mt) + return errobj + end +end + +local function rethrow_error(level, errobj) + if type(errobj) ~= "table" then + error(errobj, level + 1) + else + errobj.level = level + 1 + setmetatable(errobj, error_mt) + error(errobj) + end +end + +function ltask.post_message(addr, session, type, msg, sz) + ltask.send_message(addr, session, type, msg, sz) + continue_session() + return ltask.message_receipt() +end + +local function send_blocked_message(addr, session, type, ...) + local msg, sz = ltask.pack("send_retry", addr, session, type, ...) + while true do + local receipt_type = ltask.post_message(SERVICE_ROOT, SESSION_SEND_MESSAGE, MESSAGE_REQUEST, msg, sz) + if receipt_type == RECEIPT_DONE then + break + elseif receipt_type == RECEIPT_BLOCK then + ltask.sleep(1) + else + -- error (root quit?) + ltask.remove(msg, sz) + break + end + end +end + +local function post_request_message(addr, session, type, msg, sz) + local receipt_type, receipt_msg, receipt_sz = ltask.post_message(addr, session, type, msg, sz) + if receipt_type == RECEIPT_DONE then + return + end + if receipt_type == RECEIPT_ERROR then + ltask.remove(receipt_msg, receipt_sz) + if session ~= SESSION_SEND_MESSAGE then + error(string.format("{service:%d} is dead", addr)) + end + else + --RECEIPT_BLOCK + -- todo: send again + ltask.remove(receipt_msg, receipt_sz) + error(string.format("{service:%d} is busy", addr)) + end +end + +local function post_response_message(addr, session, type, msg, sz) + local receipt_type, receipt_msg, receipt_sz = ltask.post_message(addr, session, type, msg, sz) + if receipt_type == RECEIPT_DONE then + return + end + if receipt_type == RECEIPT_ERROR then + ltask.remove(receipt_msg, receipt_sz) + else + --RECEIPT_BLOCK + ltask.fork(function () + send_blocked_message(addr, session, type, ltask.unpack_remove(receipt_msg, receipt_sz)) + end) + end +end + +function ltask.rasie_error(addr, session, message) + if session == SESSION_SEND_MESSAGE then + return + end + local errobj = traceback(message, 4) + post_response_message(addr, session, MESSAGE_ERROR, ltask.pack(errobj)) +end + +local function resume_session(co, ...) + running_thread = co + local ok, errobj = coroutine.resume(co, ...) + running_thread = nil + if ok then + return errobj + else + local from = session_coroutine_address[co] + local session = session_coroutine_response[co] + + -- term session + session_coroutine_address[co] = nil + session_coroutine_response[co] = nil + + errobj = traceback(errobj, co) + if from == nil or from == 0 or session == SESSION_SEND_MESSAGE then + ltask.log.error(tostring(errobj)) + else + post_response_message(from, session, MESSAGE_ERROR, ltask.pack(errobj)) + end + coroutine.close(co) + end +end + +local function wakeup_session(co, ...) + local cont = resume_session(co, ...) + while cont do + yield_service() + cont = resume_session(co) + end +end + +local coroutine_pool = setmetatable({}, { __mode = "kv" }) + +local function new_thread(f) + local co = table.remove(coroutine_pool) + if co == nil then + co = coroutine.create(function(...) + f(...) + while true do + f = nil + coroutine_pool[#coroutine_pool+1] = co + f = coroutine.yield() + f(coroutine.yield()) + end + end) + else + coroutine.resume(co, f) + end + return co +end + +local function new_session(f, from, session) + local co = new_thread(f) + session_coroutine_address[co] = from + session_coroutine_response[co] = session + return co +end + +local SESSION = {} + +local function send_response(...) + local session = session_coroutine_response[running_thread] + + if session ~= SESSION_SEND_MESSAGE then + local from = session_coroutine_address[running_thread] + post_response_message(from, session, MESSAGE_RESPONSE, ltask.pack(...)) + end + + -- End session + session_coroutine_address[running_thread] = nil + session_coroutine_response[running_thread] = nil +end + +------------- ltask lua api + +function ltask.suspend(session, co) + session_coroutine_suspend_lookup[session] = co +end + +function ltask.call(address, ...) + post_request_message(address, session_id, MESSAGE_REQUEST, ltask.pack(...)) + session_coroutine_suspend_lookup[session_id] = running_thread + session_id = session_id + 1 + local type, session, msg, sz = yield_session() + if type == MESSAGE_RESPONSE then + return ltask.unpack_remove(msg, sz) + else + -- type == MESSAGE_ERROR + rethrow_error(2, ltask.unpack_remove(msg, sz)) + end +end + +do -- async object + local async = {} ; async.__index = async + + local function still_session(obj, session) + local s = obj._sessions + s[session] = nil + return next(s) + end + + function ltask.async() + local obj + local function wait_func(type, session, msg, sz) + -- ignore type + ltask.unpack_remove(msg, sz) + while still_session(obj, session) do + type, session, msg, sz = yield_session() + ltask.unpack_remove(msg, sz) + end + + if obj._wakeup then + ltask.wakeup(obj._wakeup) + end + return wait_func(yield_session()) + end + + obj = { + _sessions = {}, + _wait = new_thread(wait_func), + } + return setmetatable(obj, async) + end + + function async:request(address, ...) + post_request_message(address, session_id, MESSAGE_REQUEST, ltask.pack(...)) + session_coroutine_suspend_lookup[session_id] = self._wait + self._sessions[session_id] = true + session_id = session_id + 1 + end + + function async:wait() + if next(self._sessions) then + if not self._wakeup then + self._wakeup = self + ltask.wait(self) + end + end + self._wakeup = nil + end +end + +function ltask.send(address, ...) + post_request_message(address, SESSION_SEND_MESSAGE, MESSAGE_REQUEST, ltask.pack(...)) +end + +function ltask.syscall(address, ...) + post_request_message(address, session_id, MESSAGE_SYSTEM, ltask.pack(...)) + session_coroutine_suspend_lookup[session_id] = running_thread + session_id = session_id + 1 + local type, session, msg, sz = yield_session() + if type == MESSAGE_RESPONSE then + return ltask.unpack_remove(msg, sz) + else + -- type == MESSAGE_ERROR + rethrow_error(2, ltask.unpack_remove(msg, sz)) + end +end + +function ltask.sleep(ti) + session_coroutine_suspend_lookup[session_id] = running_thread + if ti == 0 then + if RECEIPT_DONE ~= ltask.post_message(CURRENT_SERVICE, session_id, MESSAGE_RESPONSE) then + ltask.timer_add(session_id, 0) + end + else + ltask.timer_add(session_id, ti) + end + session_id = session_id + 1 + yield_session() +end + +function ltask.thread_info(thread) + local v = {} + v[".name"] = debug.getinfo(thread, 1, "n") + local index = 1 + while true do + local name, value = debug.getlocal(thread, 1, index) + if name then + v[name] = value + else + break + end + index = index + 1 + end + return v +end + +function ltask.timeout(ti, func) + local co = new_thread(func) + session_coroutine_suspend_lookup[session_id] = co + if ti == 0 then + if RECEIPT_DONE ~= ltask.post_message(CURRENT_SERVICE, session_id, MESSAGE_RESPONSE) then + ltask.timer_add(session_id, 0) + end + else + ltask.timer_add(session_id, ti) + end + session_id = session_id + 1 +end + +local function wait_interrupt(errobj) + rethrow_error(3, errobj) +end + +local function wait_response(type, ...) + if type == MESSAGE_RESPONSE then + return ... + else -- type == MESSAGE_ERROR + wait_interrupt(...) + end +end + +function ltask.wait(token) + token = token or running_thread + assert(session_waiting[token] == nil) + session_waiting[token] = running_thread + session_id = session_id + 1 + return wait_response(yield_session()) +end + +function ltask.multi_wait(token) + token = token or running_thread + local thr = session_waiting[token] + if thr then + thr[#thr+1] = running_thread + else + session_waiting[token] = { running_thread } + end + session_id = session_id + 1 + return wait_response(yield_session()) +end + +function ltask.wakeup(token, ...) + local co = session_waiting[token] + if co then + wakeup_queue[#wakeup_queue+1] = {co, MESSAGE_RESPONSE, ...} + session_waiting[token] = nil + return true + end +end + +function ltask.multi_wakeup(token, ...) + local co = session_waiting[token] + if co then + local n = #wakeup_queue + for i = 1, #co do + wakeup_queue[n+i] = {co[i], MESSAGE_RESPONSE, ...} + end + session_waiting[token] = nil + return true + end +end + +function ltask.interrupt(token, errobj) + local co = session_waiting[token] + if co then + errobj = traceback(errobj, 4) + wakeup_queue[#wakeup_queue+1] = {co, MESSAGE_ERROR, errobj} + session_waiting[token] = nil + return true + end +end + +function ltask.multi_interrupt(token, errobj) + local co = session_waiting[token] + if co then + errobj = traceback(errobj, 4) + local n = #wakeup_queue + for i = 1, #co do + wakeup_queue[n+i] = {co[i], MESSAGE_ERROR, errobj} + end + session_waiting[token] = nil + return true + end +end + +function ltask.fork(func, ...) + local co = new_thread(func) + wakeup_queue[#wakeup_queue+1] = {co, ...} +end + +function ltask.current_session() + local from = session_coroutine_address[running_thread] + local session = session_coroutine_response[running_thread] + return { from = from, session = session } +end + +function ltask.no_response() + session_coroutine_response[running_thread] = nil +end + +function ltask.spawn(name, ...) + return ltask.call(SERVICE_ROOT, "spawn", name, ...) +end + +function ltask.queryservice(name) + return ltask.call(SERVICE_ROOT, "queryservice", name) +end + +function ltask.uniqueservice(name, ...) + return ltask.call(SERVICE_ROOT, "uniqueservice", name, ...) +end + +function ltask.spawn_service(name, ...) + return ltask.call(SERVICE_ROOT, "spawn_service", name, ...) +end + +function ltask.parallel(task) + local n = #task + if n == 0 then + return function () end + end + local ret_head = 0 + local ret_tail = 0 + local ret = {} + local token + local function rethrow(res) + rethrow_error(2, res.error) + end + local function resp(t, ok, ...) + local res = {} + if ok then + res = table.pack(...) + else + res.error = ... + res.rethrow = rethrow + end + ret_tail = ret_tail + 1 + ret[ret_tail] = { t, res } + if token then + ltask.wakeup(token) + token = nil + end + end + local idx = 1 + local supervisor_running = false + local run_task -- function + local function next_task() + local i = idx + idx = idx + 1 + local t = task[i] + if t then + run_task(t) + end + end + local function run_supervisor() + supervisor_running = false -- only one supervisor + next_task() + end + local function error_handler(errobj) + return traceback(errobj, 4) + end + function run_task(t) + if not supervisor_running then + supervisor_running = true + ltask.fork(run_supervisor) + end + resp(t, xpcall(t[1], error_handler, table.unpack(t, 2))) + next_task() + end + ltask.fork(next_task) + return function() + if ret_tail == n and ret_head == ret_tail then + return + end + while ret_head == ret_tail do + token = {} + ltask.wait(token) + end + ret_head = ret_head + 1 + local t = ret[ret_head] + ret[ret_head] = nil + return t[1], t[2] + end +end + +------------- + +local quit + +function ltask.quit() + ltask.fork(function () + for co, addr in pairs(session_coroutine_address) do + local session = session_coroutine_response[co] + ltask.rasie_error(addr, session, "Service has been quit.") + end + quit = true + end) +end + +local service = nil +local sys_service = {} + +function ltask.dispatch(handler) + if handler then + service = service or {} + -- merge handler into service + for k,v in pairs(handler) do + if type(v) == "function" then + assert(service[k] == nil) + service[k] = v + end + end + end + return service +end + +local function register_handler(msg_type, f) + SESSION[msg_type] = function (type) + local from = session_coroutine_address[running_thread] + local session = session_coroutine_response[running_thread] + f(from, session) + end +end + +function ltask.signal_handler(f) -- root only + register_handler(MESSAGE_SIGNAL, f) +end + +function ltask.idle_handler(f) + register_handler(MESSAGE_IDLE, f) +end + +local yieldable_require; do + local require = _G.require + local loaded = package.loaded + local loading = {} + local function findloader(name) + local msg = '' + local searchers = package.searchers + assert(type(searchers) == "table", "'package.searchers' must be a table") + for _, searcher in ipairs(searchers) do + local f, extra = searcher(name) + if type(f) == 'function' then + return f, extra + elseif type(f) == 'string' then + msg = msg .. "\n\t" .. f + end + end + error(("module '%s' not found:%s"):format(name, msg), 3) + end + local function finish_loading(loading_queue) + local waiting = #loading_queue + if waiting > 0 then + for i = 1, waiting do + ltask.wakeup(loading_queue[i]) + end + end + loading[loading_queue.name] = nil + end + local toclosed_loading = {__close = finish_loading} + local function start_loading(name, co) + local loading_queue = loading[name] + if loading_queue then + if loading_queue.co == co then + error("circular dependency", 2) + end + loading_queue[#loading_queue+1] = co + ltask.wait(co) + return + end + loading_queue = setmetatable({co = co, name = name}, toclosed_loading) + loading[name] = loading_queue + return loading_queue + end + function yieldable_require(name) + local m = loaded[name] + if m ~= nil then + return m + end + local co, main = coroutine.running() + if main then + return require(name) + end + local queue = start_loading(name, co) + if not queue then + local r = loaded[name] + if r == nil then + error(("require %q failed"):format(name), 2) + end + return r + end + local initfunc, extra = findloader(name) + local r = initfunc(name, extra) + if r == nil then + r = true + end + loaded[name] = r + return r + end +end + +local function sys_service_init(t) + -- The first system message + _G.require = yieldable_require + local initfunc = assert(load(t.initfunc)) + local func = assert(initfunc(t.name)) + local handler = func(table.unpack(t.args)) + ltask.dispatch(handler) + if service == nil then + ltask.quit() + end +end + +local function error_handler(errobj) + return traceback(errobj, 4) +end + +function sys_service.init(t) + local ok, errobj = xpcall(sys_service_init, error_handler, t) + if not ok then + ltask.quit() + rethrow_error(1, errobj) + end +end + +function sys_service.quit() + if service and service.quit then + return service.quit() + else + ltask.quit() + end +end + +function sys_service.memory() + return collectgarbage "count" * 1024 +end + +function sys_service.traceback() + local tlog = {} + local n = 1 + for session, co in pairs(session_coroutine_suspend_lookup) do + tlog[n] = "Session : " .. tostring(session) ; n = n + 1 + tlog[n] = debug.traceback(co) ; n = n + 1 + end + return table.concat(tlog, "\n") +end + +local function system(command, ...) + local s = sys_service[command] + if not s then + error("Unknown system message : " .. command) + return + end + send_response(s(...)) +end + +SESSION[MESSAGE_SYSTEM] = function (type, msg, sz) + system(ltask.unpack_remove(msg, sz)) +end + +local function request(command, ...) + local s = service[command] + if not s then + error("Unknown request message : " .. command) + return + end + send_response(s(...)) +end + +SESSION[MESSAGE_REQUEST] = function (type, msg, sz) + request(ltask.unpack_remove(msg, sz)) +end + +local function schedule_message() + local from, session, type, msg, sz = ltask.recv_message() + local f = SESSION[type] + if f then + -- new session for this message + local co = new_session(f, from, session) + wakeup_session(co, type, msg, sz) + else + local co = session_coroutine_suspend_lookup[session] + if co == nil then + print("Unknown response session : ", session, "from", from, "type", type, ltask.unpack_remove(msg, sz)) + else + session_coroutine_suspend_lookup[session] = nil + wakeup_session(co, type, session, msg, sz) + end + end + while #wakeup_queue > 0 do + local s = table.remove(wakeup_queue, 1) + wakeup_session(table.unpack(s)) + end +end + +print = ltask.log.info + +local function mainloop() + while true do + schedule_message() + if quit then + ltask.log.info "quit." + return + end + yield_service() + end +end + +mainloop() diff --git a/script/ltask/service/logger.lua b/script/ltask2/service/logger.lua similarity index 100% rename from script/ltask/service/logger.lua rename to script/ltask2/service/logger.lua diff --git a/script/ltask2/service/main.lua b/script/ltask2/service/main.lua new file mode 100644 index 000000000..00e42ea91 --- /dev/null +++ b/script/ltask2/service/main.lua @@ -0,0 +1,10 @@ +local fs = require 'bee.filesystem' +local args = ... + +ROOT = fs.path(args.ROOT) +LOGPATH = args.LOGPATH +METAPATH = args.METAPATH + +local _, service = xpcall(require, log.error, 'service') + +service.start() From 509dd5cef074e36773ace2b1542cf4d8971ed523 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=80=E8=90=8C=E5=B0=8F=E6=B1=90?= Date: Mon, 13 May 2024 18:06:33 +0800 Subject: [PATCH 04/11] =?UTF-8?q?=E6=9A=82=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .luarc.json | 2 +- script/ltask2/init.lua | 6 +-- script/ltask2/lualib/timer.lua | 70 ++++++++++++++++++++++++++++ script/ltask2/service/logger.lua | 34 -------------- script/{ltask2/service => }/main.lua | 1 - 5 files changed, 74 insertions(+), 39 deletions(-) create mode 100644 script/ltask2/lualib/timer.lua delete mode 100644 script/ltask2/service/logger.lua rename script/{ltask2/service => }/main.lua (99%) diff --git a/.luarc.json b/.luarc.json index b9bb65344..608ee9305 100644 --- a/.luarc.json +++ b/.luarc.json @@ -41,7 +41,7 @@ "/3rd", "/.vscode", "/meta", - "script/ltask/lualib" + "/script/ltask2/lualib" ], "checkThirdParty": false }, diff --git a/script/ltask2/init.lua b/script/ltask2/init.lua index 8a4db9160..1ede9cded 100644 --- a/script/ltask2/init.lua +++ b/script/ltask2/init.lua @@ -14,11 +14,11 @@ local servicepath = searchpath "service" local root_config = { bootstrap = { { - name = "timer", + name = "ltask2.lualib.timer", unique = true, }, { - name = "logger", + name = "ltask2.lualib.logger", unique = true, args = { LOGPATH = LOGPATH, @@ -47,7 +47,6 @@ return loadfile(filename) ]=]):gsub("%$%{([^}]*)%}", { lua_path = package.path, lua_cpath = package.cpath, - service_path = (ROOT / "script/ltask2/service/?.lua"):string(), }), } @@ -60,3 +59,4 @@ local ctx = bootstrap.start { } bootstrap.wait(ctx) +print('bootstrap.wait(ctx) 结束!') diff --git a/script/ltask2/lualib/timer.lua b/script/ltask2/lualib/timer.lua new file mode 100644 index 000000000..c2ff83a20 --- /dev/null +++ b/script/ltask2/lualib/timer.lua @@ -0,0 +1,70 @@ +local ltask = require "ltask" + +local MESSAGE_RESPONSE = 2 +local RECEIPT_BLOCK = 3 + +local messages = {} +local timer = {} + +function timer.quit() + ltask.quit() +end + +local blocked + +local mcount = 0 + +local function send_all_messages() + for i = 1, #messages do + local v = messages[i] + local session = v >> 32 + local addr = v & 0xffffffff + local blocked_queue = blocked and blocked[addr] + if blocked_queue then + blocked_queue[#blocked_queue+1] = session + else + mcount = mcount + 1 + if ltask.post_message(addr, session, MESSAGE_RESPONSE) == RECEIPT_BLOCK then + blocked = blocked or {} + blocked[addr] = { session } + end + end + end +end + +local function send_blocked_queue(addr, queue) + local n = #queue + for i = 1, n do + if ltask.post_message(addr, queue[i], MESSAGE_RESPONSE) == RECEIPT_BLOCK then + table.move(queue, i, n, 1) + return true + end + end +end + +local function send_blocked() + local b + for addr, queue in pairs(blocked) do + if send_blocked_queue(addr, queue) then + b = true + else + blocked[addr] = nil + end + end + if not b then + blocked = nil + end +end + +ltask.eventinit() -- enable idle handler + +ltask.idle_handler(function () + ltask.timer_update(messages) + send_all_messages() + ltask.timer_sleep(10) + if blocked then + send_blocked() + end +end) + +return timer diff --git a/script/ltask2/service/logger.lua b/script/ltask2/service/logger.lua deleted file mode 100644 index 2b058df76..000000000 --- a/script/ltask2/service/logger.lua +++ /dev/null @@ -1,34 +0,0 @@ -local ltask = require "ltask" - -local S = {} - -local function writelog() - local flush - while true do - local ti, _, msg, sz = ltask.poplog() - if ti == nil then - if flush then - io.flush() - end - break - end - local tsec = ti // 100 - local msec = ti % 100 - local level, message = ltask.unpack_remove(msg, sz) - --io.write(string.format("[%s.%02d][%-5s]%s\n", os.date("%Y-%m-%d %H:%M:%S", tsec), msec, level:upper(), message)) - flush = true - end -end - -ltask.fork(function() - while true do - writelog() - ltask.sleep(100) - end -end) - -function S.quit() - writelog() -end - -return S diff --git a/script/ltask2/service/main.lua b/script/main.lua similarity index 99% rename from script/ltask2/service/main.lua rename to script/main.lua index 00e42ea91..3f2894444 100644 --- a/script/ltask2/service/main.lua +++ b/script/main.lua @@ -6,5 +6,4 @@ LOGPATH = args.LOGPATH METAPATH = args.METAPATH local _, service = xpcall(require, log.error, 'service') - service.start() From b1e58f2c99a133c2a32fab5df29997c4d9ff84ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=80=E8=90=8C=E5=B0=8F=E6=B1=90?= Date: Mon, 13 May 2024 18:21:25 +0800 Subject: [PATCH 05/11] =?UTF-8?q?=E6=9A=82=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- script/ltask2/init.lua | 19 +- script/ltask2/{lualib => service}/logger.lua | 0 script/{ => ltask2/service}/main.lua | 0 script/ltask2/service/root.lua | 280 +++++++++++++++++++ script/ltask2/{lualib => service}/timer.lua | 0 5 files changed, 292 insertions(+), 7 deletions(-) rename script/ltask2/{lualib => service}/logger.lua (100%) rename script/{ => ltask2/service}/main.lua (100%) create mode 100644 script/ltask2/service/root.lua rename script/ltask2/{lualib => service}/timer.lua (100%) diff --git a/script/ltask2/init.lua b/script/ltask2/init.lua index 1ede9cded..10d751780 100644 --- a/script/ltask2/init.lua +++ b/script/ltask2/init.lua @@ -14,22 +14,26 @@ local servicepath = searchpath "service" local root_config = { bootstrap = { { - name = "ltask2.lualib.timer", + name = "timer", unique = true, }, { - name = "ltask2.lualib.logger", + name = "logger", unique = true, args = { - LOGPATH = LOGPATH, + { + LOGPATH = LOGPATH, + } } }, { name = "main", args = { - ROOT = ROOT:string(), - LOGPATH = LOGPATH, - METAPATH = METAPATH, + { + ROOT = ROOT:string(), + LOGPATH = LOGPATH, + METAPATH = METAPATH, + } }, }, }, @@ -39,7 +43,7 @@ local root_config = { local name = ... package.path = [[${lua_path}]] package.cpath = [[${lua_cpath}]] -local filename, err = package.searchpath(name, package.path) +local filename, err = package.searchpath(name, [[${service_path}]]) if not filename then return nil, err end @@ -47,6 +51,7 @@ return loadfile(filename) ]=]):gsub("%$%{([^}]*)%}", { lua_path = package.path, lua_cpath = package.cpath, + service_path = (ROOT / 'script/ltask2/service/?.lua'):string() }), } diff --git a/script/ltask2/lualib/logger.lua b/script/ltask2/service/logger.lua similarity index 100% rename from script/ltask2/lualib/logger.lua rename to script/ltask2/service/logger.lua diff --git a/script/main.lua b/script/ltask2/service/main.lua similarity index 100% rename from script/main.lua rename to script/ltask2/service/main.lua diff --git a/script/ltask2/service/root.lua b/script/ltask2/service/root.lua new file mode 100644 index 000000000..c7bf41664 --- /dev/null +++ b/script/ltask2/service/root.lua @@ -0,0 +1,280 @@ +local ltask = require "ltask" +local root = require "ltask.root" + +local config = ... + +local SERVICE_SYSTEM = 0 + +local MESSAGE_ERROR = 3 + +local MESSAGE_SCHEDULE_NEW = 0 +local MESSAGE_SCHEDULE_DEL = 1 + +local RECEIPT_ERROR = 2 +local RECEIPT_BLOCK = 3 +local RECEIPT_RESPONCE = 4 + +local S = {} + +local anonymous_services = {} +local named_services = {} + +local root_quit = ltask.quit +ltask.quit = function() end + +local function writelog() + while true do + local ti, _, msg, sz = ltask.poplog() + if ti == nil then + break + end + local tsec = ti // 100 + local msec = ti % 100 + local level, message = ltask.unpack_remove(msg, sz) + io.write(string.format("[%s.%02d][%-5s]%s\n", os.date("%Y-%m-%d %H:%M:%S", tsec), msec, level:upper(), message)) + end +end + +do + -- root init response to itself + local function init_receipt(type, session, msg, sz) + if type == MESSAGE_ERROR then + local errobj = ltask.unpack_remove(msg, sz) + ltask.log.error("Root fatal:", table.concat(errobj, "\n")) + writelog() + root_quit() + end + end + + -- The session of root init message must be 1 + ltask.suspend(1, coroutine.create(init_receipt)) +end + +local retry_queue + +local function send_retry_queue(addr, queue) + local n = #queue + for i = 1, n do + local type = ltask.post_message(addr, table.unpack(queue[i])) + if type == RECEIPT_BLOCK then + table.move(queue, i, n, 1) + return true + elseif type == RECEIPT_ERROR then + for j = i, n do + local msg, sz = queue[j][3], queue[j][4] + ltask.remove(msg, sz) + end + return + end + end +end + +local function send_all_retry() + while true do + local removed = {} + for addr, queue in pairs(retry_queue) do + if not send_retry_queue(addr, queue) then + removed[addr] = true + end + end + for addr in pairs(removed) do + retry_queue[addr] = nil + end + if next(retry_queue) == nil then + break + end + ltask.sleep(1) + end + retry_queue = nil +end + +function S.send_retry(addr, session, type, ...) + local message = { session, type, ltask.pack(...) } + if retry_queue then + local q = retry_queue[addr] + if q then + q[#q+1] = message + else + retry_queue[addr] = { message } + end + else + retry_queue = { + [addr] = { message }, + } + ltask.fork(send_all_retry) + end +end + +local function register_service(address, name) + if named_services[name] then + error(("Name `%s` already exists."):format(name)) + end + anonymous_services[address] = nil + named_services[#named_services+1] = name + named_services[name] = address + ltask.multi_wakeup("unique."..name, address) +end + +local function spawn(t) + local type, address = ltask.post_message(SERVICE_SYSTEM, 0, MESSAGE_SCHEDULE_NEW) + if type ~= RECEIPT_RESPONCE then + -- RECEIPT_ERROR + error("send MESSAGE_SCHEDULE_NEW failed.") + end + anonymous_services[address] = true + assert(root.init_service(address, t.name, config.service_source, config.service_chunkname, t.worker_id)) + ltask.syscall(address, "init", { + initfunc = t.initfunc or config.initfunc, + name = t.name, + args = t.args or {}, + }) + return address +end + +local unique = {} + +local function spawn_unique(t) + local address = named_services[t.name] + if address then + return address + end + local key = "unique."..t.name + if not unique[t.name] then + unique[t.name] = true + ltask.fork(function () + local ok, addr = pcall(spawn, t) + if not ok then + local err = addr + ltask.multi_interrupt(key, err) + unique[t.name] = nil + return + end + register_service(addr, t.name) + unique[t.name] = nil + end) + end + return ltask.multi_wait(key) +end + +function S.tracelog(timeout) + local tlog = {} + local tasks = {} + local n = 1 + for addr in pairs(anonymous_services) do + tasks[n] = { ltask.syscall, addr, "traceback" , addr = addr } ; n = n + 1 + tlog[addr] = {} + end + for _, name in ipairs(named_services) do + local addr = named_services[name] + tasks[n] = { ltask.syscall, addr, "traceback" , addr = addr, name = name } ; n = n + 1 + tlog[addr] = { name = name } + end + if timeout then + tasks[n] = { ltask.sleep, timeout } + end + + for req, resp in ltask.parallel(tasks) do + if not req.addr then + -- timeout + break + end + if not resp.error then + tlog[req.addr].traceback = resp[1] + else + tlog[req.addr].error = resp.error + end + end + + return tlog +end + +function S.spawn(name, ...) + return spawn { + name = name, + args = {...}, + } +end + +function S.queryservice(name) + local address = named_services[name] + if address then + return address + end + return ltask.multi_wait("unique."..name) +end + +function S.uniqueservice(name, ...) + return spawn_unique { + name = name, + args = {...}, + } +end + +function S.spawn_service(t) + if t.unique then + return spawn_unique(t) + else + return spawn(t) + end +end + +local function del_service(address) + if anonymous_services[address] then + anonymous_services[address] = nil + else + for _, name in ipairs(named_services) do + if named_services[name] == address then + break + end + end + end + local msg = root.close_service(address) + ltask.post_message(SERVICE_SYSTEM, address, MESSAGE_SCHEDULE_DEL) + if msg then + local err = "Service " .. address .. " has been quit." + for i=1, #msg, 2 do + local addr = msg[i] + local session = msg[i+1] + ltask.rasie_error(addr, session, err) + end + end +end + +function S.quit_ltask() + ltask.signal_handler(del_service) + for i = #named_services, 1, -1 do + local name = named_services[i] + local address = named_services[name] + local ok, err = pcall(ltask.syscall, address, "quit") + if not ok then + print(string.format("named service %s(%d) quit error: %s.", name, address, err)) + end + end + writelog() + root_quit() +end + +local function quit() + if next(anonymous_services) ~= nil then + return + end + ltask.send(ltask.self(), "quit_ltask") +end + +local function signal_handler(from) + del_service(from) + quit() +end + +ltask.signal_handler(signal_handler) + +local function bootstrap() + for _, t in ipairs(config.bootstrap) do + S.spawn_service(t) + end +end + +ltask.dispatch(S) + +bootstrap() +quit() diff --git a/script/ltask2/lualib/timer.lua b/script/ltask2/service/timer.lua similarity index 100% rename from script/ltask2/lualib/timer.lua rename to script/ltask2/service/timer.lua From d2028b194b853ef0196fbc013e7bdfedc2fa76e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=80=E8=90=8C=E5=B0=8F=E6=B1=90?= Date: Mon, 13 May 2024 18:25:41 +0800 Subject: [PATCH 06/11] =?UTF-8?q?=E7=9C=8B=E6=9D=A5=E6=B2=A1=E6=B3=95?= =?UTF-8?q?=E4=B8=8D=E6=94=B9=E8=BF=99=E4=B8=AA=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .luarc.json | 3 +-- script/ltask2/lualib/service.lua | 5 +++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.luarc.json b/.luarc.json index 608ee9305..fc02379f9 100644 --- a/.luarc.json +++ b/.luarc.json @@ -40,8 +40,7 @@ "/libs/", "/3rd", "/.vscode", - "/meta", - "/script/ltask2/lualib" + "/meta" ], "checkThirdParty": false }, diff --git a/script/ltask2/lualib/service.lua b/script/ltask2/lualib/service.lua index aca61878b..c88dbdec2 100644 --- a/script/ltask2/lualib/service.lua +++ b/script/ltask2/lualib/service.lua @@ -1,3 +1,4 @@ +---@diagnostic disable: await-in-sync local SERVICE_ROOT = 1 local MESSAGE_SYSTEM = 0 @@ -797,7 +798,7 @@ local function error_handler(errobj) end function sys_service.init(t) - local ok, errobj = xpcall(sys_service_init, error_handler, t) + local ok, errobj = xpcall(sys_service_init, debug.traceback, t) if not ok then ltask.quit() rethrow_error(1, errobj) @@ -840,7 +841,7 @@ SESSION[MESSAGE_SYSTEM] = function (type, msg, sz) end local function request(command, ...) - local s = service[command] + local s = service and service[command] if not s then error("Unknown request message : " .. command) return From 4dcff3e1bf7cbfddfdf20d128986faf9aeb77516 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=80=E8=90=8C=E5=B0=8F=E6=B1=90?= Date: Mon, 13 May 2024 18:28:38 +0800 Subject: [PATCH 07/11] =?UTF-8?q?=E5=8F=AF=E4=BB=A5=E5=8A=A0=E8=BD=BDmain?= =?UTF-8?q?=E4=BA=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- script/ltask2/service/main.lua | 1 + 1 file changed, 1 insertion(+) diff --git a/script/ltask2/service/main.lua b/script/ltask2/service/main.lua index 3f2894444..5ca9e70a0 100644 --- a/script/ltask2/service/main.lua +++ b/script/ltask2/service/main.lua @@ -5,5 +5,6 @@ ROOT = fs.path(args.ROOT) LOGPATH = args.LOGPATH METAPATH = args.METAPATH +local log = require 'brave.log' local _, service = xpcall(require, log.error, 'service') service.start() From e1ab93aac729c469eadc0f6fee7630d32f947fed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=80=E8=90=8C=E5=B0=8F=E6=B1=90?= Date: Mon, 13 May 2024 18:35:25 +0800 Subject: [PATCH 08/11] =?UTF-8?q?=E5=8B=89=E5=BC=BA=E8=B7=91=E9=80=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.lua | 4 +--- script/ltask2/init.lua | 1 + script/ltask2/service/main.lua | 11 ++++++++++- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/main.lua b/main.lua index 61ad0cab1..3e62e342d 100644 --- a/main.lua +++ b/main.lua @@ -60,7 +60,7 @@ collectgarbage('generational', 10, 50) ---@diagnostic disable-next-line: lowercase-global log = require 'log' -log.init(ROOT, fs.path(LOGPATH) / 'service.log') +log.init(ROOT, fs.path(LOGPATH) / 'startup.log') if LOGLEVEL then log.level = tostring(LOGLEVEL):lower() end @@ -71,8 +71,6 @@ log.info('LOGPATH:', LOGPATH) log.info('METAPATH:', METAPATH) log.info('VERSION:', version.getVersion()) -require 'tracy' - xpcall(dofile, log.debug, (ROOT / 'debugger.lua'):string()) require 'cli' diff --git a/script/ltask2/init.lua b/script/ltask2/init.lua index 10d751780..bf06096ff 100644 --- a/script/ltask2/init.lua +++ b/script/ltask2/init.lua @@ -33,6 +33,7 @@ local root_config = { ROOT = ROOT:string(), LOGPATH = LOGPATH, METAPATH = METAPATH, + LOGLEVEL = LOGLEVEL, } }, }, diff --git a/script/ltask2/service/main.lua b/script/ltask2/service/main.lua index 5ca9e70a0..626cb1405 100644 --- a/script/ltask2/service/main.lua +++ b/script/ltask2/service/main.lua @@ -4,7 +4,16 @@ local args = ... ROOT = fs.path(args.ROOT) LOGPATH = args.LOGPATH METAPATH = args.METAPATH +LOGLEVEL = args.LOGLEVEL + +require 'tracy' + +---@diagnostic disable-next-line: lowercase-global +log = require 'log' +log.init(ROOT, fs.path(LOGPATH) / 'service.log') +if LOGLEVEL then + log.level = tostring(LOGLEVEL):lower() +end -local log = require 'brave.log' local _, service = xpcall(require, log.error, 'service') service.start() From fe512bae4622936b02445cab713b1a779cb89b83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=80=E8=90=8C=E5=B0=8F=E6=B1=90?= Date: Mon, 13 May 2024 18:37:40 +0800 Subject: [PATCH 09/11] =?UTF-8?q?=E5=8A=A0=E7=82=B9=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- script/ltask2/service/main.lua | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/script/ltask2/service/main.lua b/script/ltask2/service/main.lua index 626cb1405..02067b69c 100644 --- a/script/ltask2/service/main.lua +++ b/script/ltask2/service/main.lua @@ -15,5 +15,13 @@ if LOGLEVEL then log.level = tostring(LOGLEVEL):lower() end +local version = require 'version' + +log.info('Lua Lsp master startup, root: ', ROOT) +log.info('ROOT:', ROOT:string()) +log.info('LOGPATH:', LOGPATH) +log.info('METAPATH:', METAPATH) +log.info('VERSION:', version.getVersion()) + local _, service = xpcall(require, log.error, 'service') service.start() From 880d55106d49d3ff381c97ede66b9d6d783587ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=80=E8=90=8C=E5=B0=8F=E6=B1=90?= Date: Tue, 14 May 2024 15:31:04 +0800 Subject: [PATCH 10/11] =?UTF-8?q?=E4=BC=A0=E9=80=92=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.lua | 8 +++++--- script/config/env.lua | 5 +++++ script/ltask2/init.lua | 3 ++- script/ltask2/service/main.lua | 14 +++++++++----- 4 files changed, 21 insertions(+), 9 deletions(-) diff --git a/main.lua b/main.lua index 3e62e342d..08f71d8d5 100644 --- a/main.lua +++ b/main.lua @@ -2,7 +2,7 @@ local fs = require 'bee.filesystem' local util = require 'utility' local version = require 'version' -require 'config.env' +ARG = require 'config.env' local function getValue(value) if value == 'true' or value == nil then @@ -38,7 +38,9 @@ local function loadArgs() end end if key then - _G[key:upper():gsub('-', '_')] = getValue(value) + local lkey = key:lower():gsub('-', '_') + _G[lkey] = getValue(value) + ARG[lkey] = getValue(value) end end end @@ -60,7 +62,7 @@ collectgarbage('generational', 10, 50) ---@diagnostic disable-next-line: lowercase-global log = require 'log' -log.init(ROOT, fs.path(LOGPATH) / 'startup.log') +log.init(ROOT, fs.path(LOGPATH) / 'cli.log') if LOGLEVEL then log.level = tostring(LOGLEVEL):lower() end diff --git a/script/config/env.lua b/script/config/env.lua index ef5b31f28..9f64535ec 100644 --- a/script/config/env.lua +++ b/script/config/env.lua @@ -55,6 +55,8 @@ local vars = { }, } +local env = {} + for _, var in ipairs(vars) do local value = os.getenv(var.name) if value then @@ -63,5 +65,8 @@ for _, var in ipairs(vars) do end _G[var.key] = value + env[var.key] = value end end + +return env diff --git a/script/ltask2/init.lua b/script/ltask2/init.lua index bf06096ff..4bb361fdd 100644 --- a/script/ltask2/init.lua +++ b/script/ltask2/init.lua @@ -34,7 +34,8 @@ local root_config = { LOGPATH = LOGPATH, METAPATH = METAPATH, LOGLEVEL = LOGLEVEL, - } + }, + ARG, }, }, }, diff --git a/script/ltask2/service/main.lua b/script/ltask2/service/main.lua index 02067b69c..4cc342e3c 100644 --- a/script/ltask2/service/main.lua +++ b/script/ltask2/service/main.lua @@ -1,10 +1,14 @@ local fs = require 'bee.filesystem' -local args = ... +local ENV, ARG = ... -ROOT = fs.path(args.ROOT) -LOGPATH = args.LOGPATH -METAPATH = args.METAPATH -LOGLEVEL = args.LOGLEVEL +for k, v in pairs(ARG) do + _G[k] = v +end + +ROOT = fs.path(ENV.ROOT) +LOGPATH = ENV.LOGPATH +METAPATH = ENV.METAPATH +LOGLEVEL = ENV.LOGLEVEL require 'tracy' From 46bff10d4b7fded8fc4a7dbce3bd6e62671d7847 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=80=E8=90=8C=E5=B0=8F=E6=B1=90?= Date: Tue, 14 May 2024 16:59:55 +0800 Subject: [PATCH 11/11] =?UTF-8?q?=E6=9A=82=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- script/brave/log.lua | 12 ++++++++-- script/ltask2/init.lua | 4 ++++ script/ltask2/lualib/service.lua | 1 + script/ltask2/service/log.lua | 16 +++++++++++++ script/ltask2/service/main.lua | 2 +- script/ltask2/service/root.lua | 1 + script/meta/ltask.lua | 41 ++++++++++++++++++++++++++++++++ 7 files changed, 74 insertions(+), 3 deletions(-) create mode 100644 script/ltask2/service/log.lua create mode 100644 script/meta/ltask.lua diff --git a/script/brave/log.lua b/script/brave/log.lua index d7c92b262..7735c0683 100644 --- a/script/brave/log.lua +++ b/script/brave/log.lua @@ -1,5 +1,5 @@ -local brave = require 'brave' local time = require 'bee.time' +local ltask = require 'ltask' local tablePack = table.pack local tostring = tostring @@ -8,6 +8,8 @@ local debugTraceBack = debug.traceback local debugGetInfo = debug.getinfo local monotonic = time.monotonic +local logAddr = ltask.queryservice('log') + _ENV = nil local function pushLog(level, ...) @@ -20,7 +22,7 @@ local function pushLog(level, ...) str = str .. '\n' .. debugTraceBack(nil, 3) end local info = debugGetInfo(3, 'Sl') - brave.push('log', { + ltask.call(logAddr, 'log', { level = level, msg = str, src = info.source, @@ -52,4 +54,10 @@ function m.error(...) pushLog('error', ...) end +---@param root fs.path +---@param path fs.path +function m.init(root, path) + ltask.call(logAddr, 'init', root:string(), path:string()) +end + return m diff --git a/script/ltask2/init.lua b/script/ltask2/init.lua index 4bb361fdd..a92211e67 100644 --- a/script/ltask2/init.lua +++ b/script/ltask2/init.lua @@ -26,6 +26,10 @@ local root_config = { } } }, + { + name = 'log', + unique = true, + }, { name = "main", args = { diff --git a/script/ltask2/lualib/service.lua b/script/ltask2/lualib/service.lua index c88dbdec2..10aa8e895 100644 --- a/script/ltask2/lualib/service.lua +++ b/script/ltask2/lualib/service.lua @@ -14,6 +14,7 @@ local RECEIPT_BLOCK = 3 local SESSION_SEND_MESSAGE = 0 +---@class ltask local ltask = require "ltask" local CURRENT_SERVICE = ltask.self() diff --git a/script/ltask2/service/log.lua b/script/ltask2/service/log.lua new file mode 100644 index 000000000..d6c3be0de --- /dev/null +++ b/script/ltask2/service/log.lua @@ -0,0 +1,16 @@ +local fs = require 'bee.filesystem' +local log = require 'log' + +local S = {} + +---@param root string +---@param path string +function S.init(root, path) + log.init(fs.path(root), fs.path(path)) +end + +function S.log(params) + log.raw(-1, params.level, params.msg, params.src, params.line, params.clock) +end + +return S diff --git a/script/ltask2/service/main.lua b/script/ltask2/service/main.lua index 4cc342e3c..63a43ce5c 100644 --- a/script/ltask2/service/main.lua +++ b/script/ltask2/service/main.lua @@ -13,7 +13,7 @@ LOGLEVEL = ENV.LOGLEVEL require 'tracy' ---@diagnostic disable-next-line: lowercase-global -log = require 'log' +log = require 'brave.log' log.init(ROOT, fs.path(LOGPATH) / 'service.log') if LOGLEVEL then log.level = tostring(LOGLEVEL):lower() diff --git a/script/ltask2/service/root.lua b/script/ltask2/service/root.lua index c7bf41664..5b9102632 100644 --- a/script/ltask2/service/root.lua +++ b/script/ltask2/service/root.lua @@ -20,6 +20,7 @@ local anonymous_services = {} local named_services = {} local root_quit = ltask.quit +---@diagnostic disable-next-line: duplicate-set-field ltask.quit = function() end local function writelog() diff --git a/script/meta/ltask.lua b/script/meta/ltask.lua new file mode 100644 index 000000000..03871053c --- /dev/null +++ b/script/meta/ltask.lua @@ -0,0 +1,41 @@ +---@meta ltask + +---@class ltask +local M = {} + +---@return string +function M.self() end + +---@return string +function M.label() end + +function M.pushlog(...) end + +function M.pack(...) end + +function M.send_message(...) end + +---@return ... +function M.message_receipt() end + +function M.remove(...) end + +---@return ... +function M.unpack_remove(...) end + +function M.timer_add(...) end + +---@return ... +function M.recv_message() end + +---@return ... +function M.poplog() end + +function M.eventinit() end + +function M.timer_update(message) end + +---@param ms integer +function M.timer_sleep(ms) end + +return M