mqueue.lua 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. -- This is a deprecated module, use skynet.queue instead.
  2. local skynet = require "skynet"
  3. local c = require "skynet.core"
  -- Public API table; it is filled in below and returned at the end of the file.
  local mqueue = {}
  -- init_once: set to true by mqueue.register so a second registration is rejected.
  -- thread_id: the dispatch coroutine while it is parked waiting for messages,
  --            nil otherwise.
  local init_once, thread_id
  -- Pending "queue" protocol messages, consumed first-in first-out.
  local message_queue = {}
  -- Register the "queue" protocol. Incoming messages are not handled
  -- immediately: they are appended to message_queue and the dispatch
  -- coroutine (if it is parked) is woken up to process them in order.
  skynet.register_protocol {
    name = "queue",
    -- please read skynet.h for magic number 8
    id = 8,
    pack = skynet.pack,
    unpack = skynet.unpack,
    dispatch = function(session, from, ...)
      -- table.pack records the real argument count in msg.n, so nil
      -- arguments (including trailing ones) survive until do_func replays
      -- them; a plain {...} would leave the length unspecified.
      local msg = table.pack(...)
      msg.session = session
      msg.addr = from
      message_queue[#message_queue + 1] = msg
      if thread_id then
        skynet.wakeup(thread_id)
        thread_id = nil
      end
    end
  }

  --- Call the user handler with the arguments stored in a queued message.
  -- @param f   handler function
  -- @param msg queued message; its array part holds the arguments and
  --            msg.n (when present) holds their exact count
  -- @return the results of pcall: true plus the handler's return values,
  --         or false plus the error value
  local function do_func(f, msg)
    -- Fall back to #msg for message tables that carry no explicit count.
    return pcall(f, table.unpack(msg, 1, msg.n or #msg))
  end
  --- Worker loop: handles queued messages one at a time, oldest first.
  -- When the queue is empty the running coroutine is recorded in thread_id
  -- and parked with skynet.wait(); the "queue" protocol dispatcher wakes it.
  -- For a message with session 0 the handler must return nothing truthy and
  -- must not raise; otherwise a replacement worker is forked and this
  -- coroutine raises an error describing the sender. For any other session
  -- the pcall results of the handler are packed and sent back as a response.
  -- @param f handler function invoked with each message's arguments
  local function message_dispatch(f)
    while true do
      if #message_queue == 0 then
        thread_id = coroutine.running()
        skynet.wait()
      else
        local msg = table.remove(message_queue, 1)
        local session = msg.session
        if session == 0 then
          -- Keep the handler result in its own local: the message table is
          -- still needed below for msg.addr. Reusing the name `msg` here
          -- made the error paths index the return value / error string.
          local ok, ret = do_func(f, msg)
          if ok then
            if ret then
              -- Fork the replacement worker first; error() ends this coroutine.
              skynet.fork(message_dispatch, f)
              error(string.format("[:%x] send a message to [:%x] return something", msg.addr, skynet.self()))
            end
          else
            skynet.fork(message_dispatch, f)
            error(string.format("[:%x] send a message to [:%x] throw an error : %s", msg.addr, skynet.self(), ret))
          end
        else
          local data, size = skynet.pack(do_func(f, msg))
          -- 1 means response
          c.send(msg.addr, 1, session, data, size)
        end
      end
    end
  end
  --- Install the message handler and start the dispatch coroutine.
  -- May be called only once per service; a second call raises an error.
  -- @param f handler function invoked with the arguments of each queued message
  function mqueue.register(f)
    -- Give the failure a readable message instead of the generic
    -- "assertion failed!".
    assert(init_once == nil, "mqueue.register can only be called once")
    init_once = true
    skynet.fork(message_dispatch, f)
  end
  --- Turn pcall-style results back into a normal return or a raised error.
  -- @param succ success flag
  -- @param ...  the values to return when succ is truthy, otherwise the
  --             arguments handed to error()
  -- @return every value after succ when succ is truthy
  local function catch(succ, ...)
    if not succ then
      error(...)
    end
    return ...
  end
  --- Send a "queue" protocol request to addr and wait for the reply.
  -- The first value of the reply is treated as a success flag by catch():
  -- the remaining values are returned when it is truthy, otherwise they are
  -- raised as an error in the caller.
  -- @param addr target service address
  -- @param ...  arguments forwarded to the remote handler
  function mqueue.call(addr, ...)
    local protocol = "queue"
    return catch(skynet.call(addr, protocol, ...))
  end
  --- Send a "queue" protocol message to addr via skynet.send.
  -- @param addr target service address
  -- @param ...  arguments forwarded to the remote handler
  -- @return whatever skynet.send returns
  function mqueue.send(addr, ...)
    local protocol = "queue"
    return skynet.send(addr, protocol, ...)
  end
  --- Report how many messages are queued and not yet handled.
  -- @return length of message_queue
  function mqueue.size()
    local pending = #message_queue
    return pending
  end
  73. return mqueue