skynet.lua 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686
  1. -- read https://github.com/cloudwu/skynet/wiki/FAQ for the module "skynet.core"
  2. local c = require "skynet.core"
  3. local tostring = tostring
  4. local tonumber = tonumber
  5. local coroutine = coroutine
  6. local assert = assert
  7. local pairs = pairs
  8. local pcall = pcall
  9. local table = table
  10. local profile = require "skynet.profile"
  11. local coroutine_resume = profile.resume
  12. local coroutine_yield = profile.yield
  13. local proto = {}
  14. local skynet = {
  15. -- read skynet.h
  16. PTYPE_TEXT = 0,
  17. PTYPE_RESPONSE = 1,
  18. PTYPE_MULTICAST = 2,
  19. PTYPE_CLIENT = 3,
  20. PTYPE_SYSTEM = 4,
  21. PTYPE_HARBOR = 5,
  22. PTYPE_SOCKET = 6,
  23. PTYPE_ERROR = 7,
  24. PTYPE_QUEUE = 8, -- used in deprecated mqueue, use skynet.queue instead
  25. PTYPE_DEBUG = 9,
  26. PTYPE_LUA = 10,
  27. PTYPE_SNAX = 11,
  28. }
  29. -- code cache
  30. skynet.cache = require "skynet.codecache"
  31. function skynet.register_protocol(class)
  32. local name = class.name
  33. local id = class.id
  34. assert(proto[name] == nil and proto[id] == nil)
  35. assert(type(name) == "string" and type(id) == "number" and id >=0 and id <=255)
  36. proto[name] = class
  37. proto[id] = class
  38. end
  39. local session_id_coroutine = {}
  40. local session_coroutine_id = {}
  41. local session_coroutine_address = {}
  42. local session_response = {}
  43. local unresponse = {}
  44. local wakeup_queue = {}
  45. local sleep_session = {}
  46. local watching_service = {}
  47. local watching_session = {}
  48. local dead_service = {}
  49. local error_queue = {}
  50. local fork_queue = {}
  51. -- suspend is function
  52. local suspend
  53. local function string_to_handle(str)
  54. return tonumber("0x" .. string.sub(str , 2))
  55. end
  56. ----- monitor exit
  57. local function dispatch_error_queue()
  58. local session = table.remove(error_queue,1)
  59. if session then
  60. local co = session_id_coroutine[session]
  61. session_id_coroutine[session] = nil
  62. return suspend(co, coroutine_resume(co, false))
  63. end
  64. end
  65. local function _error_dispatch(error_session, error_source)
  66. if error_session == 0 then
  67. -- service is down
  68. -- Don't remove from watching_service , because user may call dead service
  69. if watching_service[error_source] then
  70. dead_service[error_source] = true
  71. end
  72. for session, srv in pairs(watching_session) do
  73. if srv == error_source then
  74. table.insert(error_queue, session)
  75. end
  76. end
  77. else
  78. -- capture an error for error_session
  79. if watching_session[error_session] then
  80. table.insert(error_queue, error_session)
  81. end
  82. end
  83. end
  84. -- coroutine reuse
  85. local coroutine_pool = setmetatable({}, { __mode = "kv" })
  86. local function co_create(f)
  87. local co = table.remove(coroutine_pool)
  88. if co == nil then
  89. co = coroutine.create(function(...)
  90. f(...)
  91. while true do
  92. f = nil
  93. coroutine_pool[#coroutine_pool+1] = co
  94. f = coroutine_yield "EXIT"
  95. f(coroutine_yield())
  96. end
  97. end)
  98. else
  99. coroutine_resume(co, f)
  100. end
  101. return co
  102. end
  103. local function dispatch_wakeup()
  104. local co = table.remove(wakeup_queue,1)
  105. if co then
  106. local session = sleep_session[co]
  107. if session then
  108. session_id_coroutine[session] = "BREAK"
  109. return suspend(co, coroutine_resume(co, false, "BREAK"))
  110. end
  111. end
  112. end
  113. local function release_watching(address)
  114. local ref = watching_service[address]
  115. if ref then
  116. ref = ref - 1
  117. if ref > 0 then
  118. watching_service[address] = ref
  119. else
  120. watching_service[address] = nil
  121. end
  122. end
  123. end
  124. -- suspend is local function
  125. function suspend(co, result, command, param, size)
  126. if not result then
  127. local session = session_coroutine_id[co]
  128. if session then -- coroutine may fork by others (session is nil)
  129. local addr = session_coroutine_address[co]
  130. if session ~= 0 then
  131. -- only call response error
  132. c.send(addr, skynet.PTYPE_ERROR, session, "")
  133. end
  134. session_coroutine_id[co] = nil
  135. session_coroutine_address[co] = nil
  136. end
  137. error(debug.traceback(co,tostring(command)))
  138. end
  139. if command == "CALL" then
  140. session_id_coroutine[param] = co
  141. elseif command == "SLEEP" then
  142. session_id_coroutine[param] = co
  143. sleep_session[co] = param
  144. elseif command == "RETURN" then
  145. local co_session = session_coroutine_id[co]
  146. if co_session == 0 then
  147. if size ~= nil then
  148. c.trash(param, size)
  149. end
  150. return suspend(co, coroutine_resume(co, false)) -- send don't need ret
  151. end
  152. local co_address = session_coroutine_address[co]
  153. if param == nil or session_response[co] then
  154. error(debug.traceback(co))
  155. end
  156. session_response[co] = true
  157. local ret
  158. if not dead_service[co_address] then
  159. ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size) ~= nil
  160. if not ret then
  161. -- If the package is too large, returns nil. so we should report error back
  162. c.send(co_address, skynet.PTYPE_ERROR, co_session, "")
  163. end
  164. elseif size ~= nil then
  165. c.trash(param, size)
  166. ret = false
  167. end
  168. return suspend(co, coroutine_resume(co, ret))
  169. elseif command == "RESPONSE" then
  170. local co_session = session_coroutine_id[co]
  171. local co_address = session_coroutine_address[co]
  172. if session_response[co] then
  173. error(debug.traceback(co))
  174. end
  175. local f = param
  176. local function response(ok, ...)
  177. if ok == "TEST" then
  178. if dead_service[co_address] then
  179. release_watching(co_address)
  180. unresponse[response] = nil
  181. f = false
  182. return false
  183. else
  184. return true
  185. end
  186. end
  187. if not f then
  188. if f == false then
  189. f = nil
  190. return false
  191. end
  192. error "Can't response more than once"
  193. end
  194. local ret
  195. -- do not response when session == 0 (send)
  196. if co_session ~= 0 and not dead_service[co_address] then
  197. if ok then
  198. ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, f(...)) ~= nil
  199. if not ret then
  200. -- If the package is too large, returns false. so we should report error back
  201. c.send(co_address, skynet.PTYPE_ERROR, co_session, "")
  202. end
  203. else
  204. ret = c.send(co_address, skynet.PTYPE_ERROR, co_session, "") ~= nil
  205. end
  206. else
  207. ret = false
  208. end
  209. release_watching(co_address)
  210. unresponse[response] = nil
  211. f = nil
  212. return ret
  213. end
  214. watching_service[co_address] = watching_service[co_address] + 1
  215. session_response[co] = true
  216. unresponse[response] = true
  217. return suspend(co, coroutine_resume(co, response))
  218. elseif command == "EXIT" then
  219. -- coroutine exit
  220. local address = session_coroutine_address[co]
  221. if address then
  222. release_watching(address)
  223. session_coroutine_id[co] = nil
  224. session_coroutine_address[co] = nil
  225. session_response[co] = nil
  226. end
  227. elseif command == "QUIT" then
  228. -- service exit
  229. return
  230. elseif command == "USER" then
  231. -- See skynet.coutine for detail
  232. error("Call skynet.coroutine.yield out of skynet.coroutine.resume\n" .. debug.traceback(co))
  233. elseif command == nil then
  234. -- debug trace
  235. return
  236. else
  237. error("Unknown command : " .. command .. "\n" .. debug.traceback(co))
  238. end
  239. dispatch_wakeup()
  240. dispatch_error_queue()
  241. end
  242. function skynet.timeout(ti, func)
  243. local session = c.intcommand("TIMEOUT",ti)
  244. assert(session)
  245. local co = co_create(func)
  246. assert(session_id_coroutine[session] == nil)
  247. session_id_coroutine[session] = co
  248. end
  249. function skynet.sleep(ti)
  250. local session = c.intcommand("TIMEOUT",ti)
  251. assert(session)
  252. local succ, ret = coroutine_yield("SLEEP", session)
  253. sleep_session[coroutine.running()] = nil
  254. if succ then
  255. return
  256. end
  257. if ret == "BREAK" then
  258. return "BREAK"
  259. else
  260. error(ret)
  261. end
  262. end
  263. function skynet.yield()
  264. return skynet.sleep(0)
  265. end
  266. function skynet.wait(co)
  267. local session = c.genid()
  268. local ret, msg = coroutine_yield("SLEEP", session)
  269. co = co or coroutine.running()
  270. sleep_session[co] = nil
  271. session_id_coroutine[session] = nil
  272. end
  273. function skynet.self()
  274. return c.addresscommand "REG"
  275. end
  276. function skynet.localname(name)
  277. return c.addresscommand("QUERY", name)
  278. end
  279. skynet.now = c.now
  280. local starttime
  281. function skynet.starttime()
  282. if not starttime then
  283. starttime = c.intcommand("STARTTIME")
  284. end
  285. return starttime
  286. end
  287. function skynet.time()
  288. return skynet.now()/100 + (starttime or skynet.starttime())
  289. end
  290. function skynet.exit()
  291. fork_queue = {} -- no fork coroutine can be execute after skynet.exit
  292. skynet.send(".launcher","lua","REMOVE",skynet.self(), false)
  293. -- report the sources that call me
  294. for co, session in pairs(session_coroutine_id) do
  295. local address = session_coroutine_address[co]
  296. if session~=0 and address then
  297. c.send(address, skynet.PTYPE_ERROR, session, "")
  298. end
  299. end
  300. for resp in pairs(unresponse) do
  301. resp(false)
  302. end
  303. -- report the sources I call but haven't return
  304. local tmp = {}
  305. for session, address in pairs(watching_session) do
  306. tmp[address] = true
  307. end
  308. for address in pairs(tmp) do
  309. c.send(address, skynet.PTYPE_ERROR, 0, "")
  310. end
  311. c.command("EXIT")
  312. -- quit service
  313. coroutine_yield "QUIT"
  314. end
  315. function skynet.getenv(key)
  316. return (c.command("GETENV",key))
  317. end
  318. function skynet.setenv(key, value)
  319. assert(c.command("GETENV",key) == nil, "Can't setenv exist key : " .. key)
  320. c.command("SETENV",key .. " " ..value)
  321. end
  322. function skynet.send(addr, typename, ...)
  323. local p = proto[typename]
  324. return c.send(addr, p.id, 0 , p.pack(...))
  325. end
  326. function skynet.rawsend(addr, typename, msg, sz)
  327. local p = proto[typename]
  328. return c.send(addr, p.id, 0 , msg, sz)
  329. end
  330. skynet.genid = assert(c.genid)
  331. skynet.redirect = function(dest,source,typename,...)
  332. return c.redirect(dest, source, proto[typename].id, ...)
  333. end
  334. skynet.pack = assert(c.pack)
  335. skynet.packstring = assert(c.packstring)
  336. skynet.unpack = assert(c.unpack)
  337. skynet.tostring = assert(c.tostring)
  338. skynet.trash = assert(c.trash)
  339. local function yield_call(service, session)
  340. watching_session[session] = service
  341. local succ, msg, sz = coroutine_yield("CALL", session)
  342. watching_session[session] = nil
  343. if not succ then
  344. error "call failed"
  345. end
  346. return msg,sz
  347. end
  348. function skynet.call(addr, typename, ...)
  349. local p = proto[typename]
  350. local session = c.send(addr, p.id , nil , p.pack(...))
  351. if session == nil then
  352. error("call to invalid address " .. skynet.address(addr))
  353. end
  354. return p.unpack(yield_call(addr, session))
  355. end
  356. function skynet.rawcall(addr, typename, msg, sz)
  357. local p = proto[typename]
  358. local session = assert(c.send(addr, p.id , nil , msg, sz), "call to invalid address")
  359. return yield_call(addr, session)
  360. end
  361. function skynet.ret(msg, sz)
  362. msg = msg or ""
  363. return coroutine_yield("RETURN", msg, sz)
  364. end
  365. function skynet.response(pack)
  366. pack = pack or skynet.pack
  367. return coroutine_yield("RESPONSE", pack)
  368. end
  369. function skynet.retpack(...)
  370. return skynet.ret(skynet.pack(...))
  371. end
  372. function skynet.wakeup(co)
  373. if sleep_session[co] then
  374. table.insert(wakeup_queue, co)
  375. return true
  376. end
  377. end
  378. function skynet.dispatch(typename, func)
  379. local p = proto[typename]
  380. if func then
  381. local ret = p.dispatch
  382. p.dispatch = func
  383. return ret
  384. else
  385. return p and p.dispatch
  386. end
  387. end
  388. local function unknown_request(session, address, msg, sz, prototype)
  389. skynet.error(string.format("Unknown request (%s): %s", prototype, c.tostring(msg,sz)))
  390. error(string.format("Unknown session : %d from %x", session, address))
  391. end
  392. function skynet.dispatch_unknown_request(unknown)
  393. local prev = unknown_request
  394. unknown_request = unknown
  395. return prev
  396. end
  397. local function unknown_response(session, address, msg, sz)
  398. skynet.error(string.format("Response message : %s" , c.tostring(msg,sz)))
  399. error(string.format("Unknown session : %d from %x", session, address))
  400. end
  401. function skynet.dispatch_unknown_response(unknown)
  402. local prev = unknown_response
  403. unknown_response = unknown
  404. return prev
  405. end
  406. function skynet.fork(func,...)
  407. local args = table.pack(...)
  408. local co = co_create(function()
  409. func(table.unpack(args,1,args.n))
  410. end)
  411. table.insert(fork_queue, co)
  412. return co
  413. end
  414. local function raw_dispatch_message(prototype, msg, sz, session, source)
  415. -- skynet.PTYPE_RESPONSE = 1, read skynet.h
  416. if prototype == 1 then
  417. local co = session_id_coroutine[session]
  418. if co == "BREAK" then
  419. session_id_coroutine[session] = nil
  420. elseif co == nil then
  421. unknown_response(session, source, msg, sz)
  422. else
  423. session_id_coroutine[session] = nil
  424. suspend(co, coroutine_resume(co, true, msg, sz))
  425. end
  426. else
  427. local p = proto[prototype]
  428. if p == nil then
  429. if session ~= 0 then
  430. c.send(source, skynet.PTYPE_ERROR, session, "")
  431. else
  432. unknown_request(session, source, msg, sz, prototype)
  433. end
  434. return
  435. end
  436. local f = p.dispatch
  437. if f then
  438. local ref = watching_service[source]
  439. if ref then
  440. watching_service[source] = ref + 1
  441. else
  442. watching_service[source] = 1
  443. end
  444. local co = co_create(f)
  445. session_coroutine_id[co] = session
  446. session_coroutine_address[co] = source
  447. suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
  448. elseif session ~= 0 then
  449. c.send(source, skynet.PTYPE_ERROR, session, "")
  450. else
  451. unknown_request(session, source, msg, sz, proto[prototype].name)
  452. end
  453. end
  454. end
  455. function skynet.dispatch_message(...)
  456. local succ, err = pcall(raw_dispatch_message,...)
  457. while true do
  458. local key,co = next(fork_queue)
  459. if co == nil then
  460. break
  461. end
  462. fork_queue[key] = nil
  463. local fork_succ, fork_err = pcall(suspend,co,coroutine_resume(co))
  464. if not fork_succ then
  465. if succ then
  466. succ = false
  467. err = tostring(fork_err)
  468. else
  469. err = tostring(err) .. "\n" .. tostring(fork_err)
  470. end
  471. end
  472. end
  473. assert(succ, tostring(err))
  474. end
  475. function skynet.newservice(name, ...)
  476. return skynet.call(".launcher", "lua" , "LAUNCH", "snlua", name, ...)
  477. end
  478. function skynet.uniqueservice(global, ...)
  479. if global == true then
  480. return assert(skynet.call(".service", "lua", "GLAUNCH", ...))
  481. else
  482. return assert(skynet.call(".service", "lua", "LAUNCH", global, ...))
  483. end
  484. end
  485. function skynet.queryservice(global, ...)
  486. if global == true then
  487. return assert(skynet.call(".service", "lua", "GQUERY", ...))
  488. else
  489. return assert(skynet.call(".service", "lua", "QUERY", global, ...))
  490. end
  491. end
  492. function skynet.address(addr)
  493. if type(addr) == "number" then
  494. return string.format(":%08x",addr)
  495. else
  496. return tostring(addr)
  497. end
  498. end
  499. function skynet.harbor(addr)
  500. return c.harbor(addr)
  501. end
  502. skynet.error = c.error
  503. ----- register protocol
  504. do
  505. local REG = skynet.register_protocol
  506. REG {
  507. name = "lua",
  508. id = skynet.PTYPE_LUA,
  509. pack = skynet.pack,
  510. unpack = skynet.unpack,
  511. }
  512. REG {
  513. name = "response",
  514. id = skynet.PTYPE_RESPONSE,
  515. }
  516. REG {
  517. name = "error",
  518. id = skynet.PTYPE_ERROR,
  519. unpack = function(...) return ... end,
  520. dispatch = _error_dispatch,
  521. }
  522. end
  523. local init_func = {}
  524. function skynet.init(f, name)
  525. assert(type(f) == "function")
  526. if init_func == nil then
  527. f()
  528. else
  529. table.insert(init_func, f)
  530. if name then
  531. assert(type(name) == "string")
  532. assert(init_func[name] == nil)
  533. init_func[name] = f
  534. end
  535. end
  536. end
  537. local function init_all()
  538. local funcs = init_func
  539. init_func = nil
  540. if funcs then
  541. for _,f in ipairs(funcs) do
  542. f()
  543. end
  544. end
  545. end
  546. local function ret(f, ...)
  547. f()
  548. return ...
  549. end
  550. local function init_template(start, ...)
  551. init_all()
  552. init_func = {}
  553. return ret(init_all, start(...))
  554. end
  555. function skynet.pcall(start, ...)
  556. return xpcall(init_template, debug.traceback, start, ...)
  557. end
  558. function skynet.init_service(start)
  559. local ok, err = skynet.pcall(start)
  560. if not ok then
  561. skynet.error("init service failed: " .. tostring(err))
  562. skynet.send(".launcher","lua", "ERROR")
  563. skynet.exit()
  564. else
  565. skynet.send(".launcher","lua", "LAUNCHOK")
  566. end
  567. end
  568. function skynet.start(start_func)
  569. c.callback(skynet.dispatch_message)
  570. skynet.timeout(0, function()
  571. skynet.init_service(start_func)
  572. end)
  573. end
  574. function skynet.endless()
  575. return (c.intcommand("STAT", "endless") == 1)
  576. end
  577. function skynet.mqlen()
  578. return c.intcommand("STAT", "mqlen")
  579. end
  580. function skynet.stat(what)
  581. return c.intcommand("STAT", what)
  582. end
  583. function skynet.task(ret)
  584. local t = 0
  585. for session,co in pairs(session_id_coroutine) do
  586. if ret then
  587. ret[session] = debug.traceback(co)
  588. end
  589. t = t + 1
  590. end
  591. return t
  592. end
  593. function skynet.term(service)
  594. return _error_dispatch(0, service)
  595. end
  596. function skynet.memlimit(bytes)
  597. debug.getregistry().memlimit = bytes
  598. skynet.memlimit = nil -- set only once
  599. end
  600. -- Inject internal debug framework
  601. local debug = require "skynet.debug"
  602. debug.init(skynet, {
  603. dispatch = skynet.dispatch_message,
  604. suspend = suspend,
  605. })
  606. return skynet