-- queue.lua
  -- Dependencies, plus local aliases of the standard-library values used by
  -- skynet.queue below (locals are resolved faster than global lookups).
  local skynet = require "skynet"
  local coroutine = coroutine
  local xpcall = xpcall
  local traceback = debug.traceback
  local table = table
  --- Create a wrapper that serializes calls coming from different coroutines.
  --
  -- Every call to `skynet.queue()` builds an independent queue with its own
  -- private state. The returned function `cs(f, ...)` runs `f(...)` under
  -- `xpcall`. If another coroutine currently owns the queue, the caller is
  -- appended to a FIFO list and parked with `skynet.wait()`; it continues once
  -- the owner's outermost call has finished and handed ownership over.
  -- Nested calls made by the owning coroutine itself run immediately and are
  -- only counted.
  --
  -- NOTE(review): this relies on `skynet.wait()` suspending the running
  -- coroutine until `skynet.wakeup(co)` is called for it -- confirm against the
  -- skynet API in use.
  --
  -- @treturn function `cs(f, ...)`; it returns every value `f(...)` returns.
  --   If `f` raises, the queue is still released first and the error is then
  --   re-raised, using the `debug.traceback` output as the message.
  function skynet.queue()
    local current_thread      -- coroutine that currently owns the queue, or nil
    local ref = 0             -- nesting depth of calls made by the owner
    local thread_queue = {}   -- FIFO of coroutines waiting for ownership

    -- Runs after `xpcall` has returned: drop one nesting level, pass the
    -- queue to the next waiter when the outermost call ends, then forward
    -- the results of `f` or re-raise its error.
    local function xpcall_ret(ok, ...)
      ref = ref - 1
      if ref == 0 then
        -- Outermost call finished: the oldest waiter (if any) becomes the
        -- owner before it is woken; with no waiter the queue becomes idle.
        current_thread = table.remove(thread_queue, 1)
        if current_thread then
          skynet.wakeup(current_thread)
        end
      end
      -- On failure the first extra value is the message produced by the
      -- traceback handler; the parentheses truncate `...` to that one value.
      assert(ok, (...))
      return ...
    end

    return function(f, ...)
      local thread = coroutine.running()
      if current_thread and current_thread ~= thread then
        -- Owned by another coroutine: line up and wait for our turn.
        table.insert(thread_queue, thread)
        skynet.wait()
        assert(ref == 0) -- current_thread == thread
      end
      current_thread = thread

      ref = ref + 1
      return xpcall_ret(xpcall(f, traceback, ...))
    end
  end
  -- The module's value is the queue constructor itself, so callers can write
  -- `local queue = require "..."` and then `local cs = queue()`.
  return skynet.queue