lib_queue.lua 958 B

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. --
  2. -- 用户操作锁
  3. --
  4. local skynet = require "skynet"
  5. local coroutine = coroutine
  6. local xpcall = xpcall
  7. local traceback = debug.traceback
  8. local table = table
  --- Module table.
  -- `queue.queue` is the registry mapping a uid to its lock record
  -- (`ref`, `thread_queue` and, while held, `current_thread`), as created by
  -- queue.critical_section.
  --
  -- The registry holds its values weakly: a lock record stays alive as long as
  -- some closure returned by queue.critical_section (or a coroutine currently
  -- executing inside one) still references it, and becomes collectable once
  -- nothing does. Without this, one record per distinct uid would be retained
  -- for the whole lifetime of the service, because nothing in this module ever
  -- removes an entry.
  local queue = {
    queue = setmetatable({}, {__mode = "v"}),
  }
  --- Return the critical-section runner associated with `uid`.
  --
  -- All runners obtained for the same `uid` share one lock record stored in
  -- `queue.queue[uid]`; the record is created on first use.
  --
  -- The returned function has the shape `runner(f, ...)`: it calls `f(...)`
  -- under xpcall and returns whatever `f` returns. If `f` raises, the lock is
  -- released first and the error is then re-raised, carrying the traceback
  -- produced by the xpcall message handler.
  --
  -- The coroutine that currently owns the lock may call the runner again
  -- (nested); `ref` counts the nesting depth and the lock is handed over only
  -- when it drops back to zero. Other coroutines are appended to
  -- `thread_queue` and resumed in arrival order.
  --
  -- @param uid key identifying the lock (used as a table key in `queue.queue`)
  -- @return function `runner(f, ...)` as described above
  function queue.critical_section(uid)
    local registry = queue.queue
    local lock = registry[uid]
    if lock == nil then
      lock = {ref = 0, thread_queue = {}}
      registry[uid] = lock
    end

    -- Receives the results of xpcall: releases one nesting level, hands the
    -- lock to the next waiter when the outermost level ends, then either
    -- re-raises the captured error or passes the results through.
    local function release(ok, ...)
      local depth = lock.ref - 1
      lock.ref = depth
      if depth == 0 then
        -- Oldest waiter becomes the owner; nil clears ownership when nobody waits.
        local successor = table.remove(lock.thread_queue, 1)
        lock.current_thread = successor
        if successor then
          -- presumably resumes the coroutine parked in skynet.wait() below
          skynet.wakeup(successor)
        end
      end
      -- Parentheses truncate to the first value: the error message on failure.
      assert(ok, (...))
      return ...
    end

    return function(f, ...)
      local me = coroutine.running()
      local owner = lock.current_thread
      if owner and owner ~= me then
        -- Held by another coroutine: enqueue ourselves and park until woken.
        local waiters = lock.thread_queue
        waiters[#waiters + 1] = me
        skynet.wait()
        assert(lock.ref == 0) -- current_thread == thread
      end
      lock.current_thread = me
      lock.ref = lock.ref + 1
      return release(xpcall(f, traceback, ...))
    end
  end
  -- Module result: the table holding critical_section and the per-uid `queue` registry.
  41. return queue