sharedatad.lua 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. local skynet = require "skynet"
  2. local sharedata = require "skynet.sharedata.corelib"
  3. local table = table
  4. local cache = require "skynet.codecache"
  5. cache.mode "OFF" -- turn off codecache, because CMD.new may load data file
  6. local NORET = {}
  7. local pool = {}
  8. local pool_count = {}
  9. local objmap = {}
  10. local collect_tick = 600
  11. local function newobj(name, tbl)
  12. assert(pool[name] == nil)
  13. local cobj = sharedata.host.new(tbl)
  14. sharedata.host.incref(cobj)
  15. local v = { value = tbl , obj = cobj, watch = {} }
  16. objmap[cobj] = v
  17. pool[name] = v
  18. pool_count[name] = { n = 0, threshold = 16 }
  19. end
  20. local function collect10sec()
  21. if collect_tick > 10 then
  22. collect_tick = 10
  23. end
  24. end
  25. local function collectobj()
  26. while true do
  27. skynet.sleep(100) -- sleep 1s
  28. if collect_tick <= 0 then
  29. collect_tick = 600 -- reset tick count to 600 sec
  30. collectgarbage()
  31. for obj, v in pairs(objmap) do
  32. if v == true then
  33. if sharedata.host.getref(obj) <= 0 then
  34. objmap[obj] = nil
  35. sharedata.host.delete(obj)
  36. end
  37. end
  38. end
  39. else
  40. collect_tick = collect_tick - 1
  41. end
  42. end
  43. end
  44. local CMD = {}
  45. local env_mt = { __index = _ENV }
  46. function CMD.new(name, t, ...)
  47. local dt = type(t)
  48. local value
  49. if dt == "table" then
  50. value = t
  51. elseif dt == "string" then
  52. value = setmetatable({}, env_mt)
  53. local f
  54. if t:sub(1,1) == "@" then
  55. f = assert(loadfile(t:sub(2),"bt",value))
  56. else
  57. f = assert(load(t, "=" .. name, "bt", value))
  58. end
  59. local _, ret = assert(skynet.pcall(f, ...))
  60. setmetatable(value, nil)
  61. if type(ret) == "table" then
  62. value = ret
  63. end
  64. elseif dt == "nil" then
  65. value = {}
  66. else
  67. error ("Unknown data type " .. dt)
  68. end
  69. newobj(name, value)
  70. end
  71. function CMD.delete(name)
  72. local v = assert(pool[name])
  73. pool[name] = nil
  74. pool_count[name] = nil
  75. assert(objmap[v.obj])
  76. objmap[v.obj] = true
  77. sharedata.host.decref(v.obj)
  78. for _,response in pairs(v.watch) do
  79. response(true)
  80. end
  81. end
  82. function CMD.query(name)
  83. local v = assert(pool[name])
  84. local obj = v.obj
  85. sharedata.host.incref(obj)
  86. return v.obj
  87. end
  88. function CMD.confirm(cobj)
  89. if objmap[cobj] then
  90. sharedata.host.decref(cobj)
  91. end
  92. return NORET
  93. end
  94. function CMD.update(name, t, ...)
  95. local v = pool[name]
  96. local watch, oldcobj
  97. if v then
  98. watch = v.watch
  99. oldcobj = v.obj
  100. objmap[oldcobj] = true
  101. sharedata.host.decref(oldcobj)
  102. pool[name] = nil
  103. pool_count[name] = nil
  104. end
  105. CMD.new(name, t, ...)
  106. local newobj = pool[name].obj
  107. if watch then
  108. sharedata.host.markdirty(oldcobj)
  109. for _,response in pairs(watch) do
  110. response(true, newobj)
  111. end
  112. end
  113. collect10sec() -- collect in 10 sec
  114. end
  115. local function check_watch(queue)
  116. local n = 0
  117. for k,response in pairs(queue) do
  118. if not response "TEST" then
  119. queue[k] = nil
  120. n = n + 1
  121. end
  122. end
  123. return n
  124. end
  125. function CMD.monitor(name, obj)
  126. local v = assert(pool[name])
  127. if obj ~= v.obj then
  128. return v.obj
  129. end
  130. local n = pool_count[name].n + 1
  131. if n > pool_count[name].threshold then
  132. n = n - check_watch(v.watch)
  133. pool_count[name].threshold = n * 2
  134. end
  135. pool_count[name].n = n
  136. table.insert(v.watch, skynet.response())
  137. return NORET
  138. end
  139. skynet.start(function()
  140. skynet.fork(collectobj)
  141. skynet.dispatch("lua", function (session, source ,cmd, ...)
  142. local f = assert(CMD[cmd])
  143. local r = f(...)
  144. if r ~= NORET then
  145. skynet.ret(skynet.pack(r))
  146. end
  147. end)
  148. end)