-- builder.lua (3.8 KB)
  1. local skynet = require "skynet"
  2. local dump = require "skynet.datasheet.dump"
  3. local core = require "skynet.datasheet.core"
  4. local service = require "skynet.service"
  5. local builder = {}
  6. local cache = {}
  7. local dataset = {}
  8. local address
  -- Counter shared by every call to unique_string; incremented before use.
  local unique_id = 0

  --- Tag a string with a fresh sequence number.
  -- The module-level counter is bumped first and its new value is appended in
  -- decimal, so calling this twice with the same input gives two different
  -- strings.
  -- @param str string to tag
  -- @return `str` with the counter value appended
  local function unique_string(str)
    local serial = unique_id + 1
    unique_id = serial
    return str .. tostring(serial)
  end
  --- Watch `pointer` in the background and drop it from `cache` once collected.
  -- Forks a coroutine that issues a blocking "collect" request for `pointer` to
  -- the datasheet service at `address`. When that call returns, the first entry
  -- of `cache` whose value is `pointer` is removed.
  -- NOTE(review): removing the entry presumably lets the backing string be
  -- garbage-collected; confirm against core.stringpointer's lifetime rules.
  -- @param pointer value previously obtained from core.stringpointer
  local function monitor(pointer)
    local function wait_then_evict()
      skynet.call(address, "lua", "collect", pointer)
      for key, cached in pairs(cache) do
        if cached == pointer then
          cache[key] = nil
          break
        end
      end
    end

    skynet.fork(wait_then_evict)
  end
  --- Normalise a datasheet value to its dumped form.
  -- Strings are returned untouched; any other value is passed to dump.dump and
  -- whatever that returns is handed back.
  -- @param v a string, or a value accepted by dump.dump
  local function dumpsheet(v)
    if type(v) ~= "string" then
      return dump.dump(v)
    end
    return v
  end
  --- Publish a brand-new datasheet under `name`.
  -- Fails (assert) if this builder already registered `name`. The value is
  -- dumped, tagged with a unique suffix, turned into a pointer with
  -- core.stringpointer and announced to the datasheet service with a blocking
  -- "update" call. Only after that call returns are the string and its pointer
  -- recorded locally and the pointer put under collection monitoring.
  -- @param name datasheet name
  -- @param v a dumped string, or a value accepted by dump.dump
  function builder.new(name, v)
    assert(dataset[name] == nil)
    local payload = unique_string(dumpsheet(v))
    local handle = core.stringpointer(payload)
    skynet.call(address, "lua", "update", name, handle)
    -- `cache` keeps payload -> pointer; `dataset` keeps name -> payload.
    cache[payload] = handle
    dataset[name] = payload
    monitor(handle)
  end
  --- Publish a new version of an existing datasheet.
  -- Fails (assert) if `name` was never created through this builder, or if the
  -- previous version's pointer is no longer in `cache`. The new value is dumped
  -- and combined with the previous version string through dump.diff; the result
  -- is tagged with a unique suffix, converted to a pointer and sent to the
  -- datasheet service with a blocking "update" call. Afterwards the previous
  -- pointer is released with a non-blocking "release" message and the new
  -- pointer is put under collection monitoring.
  -- @param name datasheet name
  -- @param v a dumped string, or a value accepted by dump.dump
  function builder.update(name, v)
    local previous = assert(dataset[name])
    -- Bound to a local on purpose: only the first result of dumpsheet is used.
    local current = dumpsheet(v)
    local patch = unique_string(dump.diff(previous, current))
    local handle = core.stringpointer(patch)
    skynet.call(address, "lua", "update", name, handle)
    cache[patch] = handle
    local stale = assert(cache[previous])
    skynet.send(address, "lua", "release", stale)
    dataset[name] = patch
    monitor(handle)
  end
  --- Dump a value without publishing it.
  -- Thin public wrapper that forwards `v` to dump.dump and returns its results
  -- unchanged. Unlike dumpsheet, a string argument is NOT passed through as is.
  -- @param v value accepted by dump.dump
  function builder.compile(v)
    return dump.dump(v)
  end
  --- Body of the "datasheet" service that is passed to service.new.
  -- It tracks, per datasheet name, the current handle and the customers waiting
  -- for a change, and per handle a reference count.
  -- The function requires skynet itself and uses its own `dataset` table; it
  -- references nothing from the enclosing chunk except globals.
  -- NOTE(review): presumably required because service.new runs this function
  -- inside another service; confirm before introducing any upvalue here.
  local function datasheet_service()
    local skynet = require "skynet"

    local datasheet = {}
    local handles = {}  -- handle:{ ref:count , name:name , collect:resp }
    local dataset = {}  -- name:{ handle:handle, monitor:{monitors queue} }

    -- Drop one reference to `handle` on behalf of `source`.
    -- When the count reaches zero and the builder is waiting in "collect", the
    -- parked response is fired and the handle is forgotten. Any monitor parked
    -- by `source` for the handle's datasheet is discarded without a reply.
    -- Raises "Invalid data handle" for an unknown handle, like monitor/collect.
    local function releasehandle(source, handle)
      local h = assert(handles[handle], "Invalid data handle")
      h.ref = h.ref - 1
      if h.ref == 0 and h.collect then
        h.collect(true)
        h.collect = nil
        handles[handle] = nil
      end
      local t = dataset[h.name]
      t.monitor[source] = nil
    end

    -- from builder, create or update handle
    function datasheet.update(source, name, handle)
      local t = dataset[name]
      if not t then
        -- new datasheet: the single reference belongs to the builder
        t = { handle = handle, monitor = {} }
        dataset[name] = t
        handles[handle] = { ref = 1, name = name }
      else
        -- report update to customers; the new handle starts with the same
        -- reference count the previous handle currently has
        handles[handle] = { ref = handles[t.handle].ref, name = name }
        t.handle = handle
        for k, v in pairs(t.monitor) do
          v(true, handle)
          t.monitor[k] = nil
        end
      end
      skynet.ret()
    end

    -- from customers: take a reference on the current handle of `name` and
    -- reply with it
    function datasheet.query(source, name)
      local t = assert(dataset[name], "create data first")
      local handle = t.handle
      local h = handles[handle]
      h.ref = h.ref + 1
      skynet.ret(skynet.pack(handle))
    end

    -- from customers, monitor handle change
    function datasheet.monitor(source, handle)
      local h = assert(handles[handle], "Invalid data handle")
      local t = dataset[h.name]
      if t.handle ~= handle then  -- already changed: answer immediately
        skynet.ret(skynet.pack(t.handle))
      else
        -- park the response; datasheet.update fires it on the next change
        assert(not t.monitor[source])
        t.monitor[source] = skynet.response()
      end
    end

    -- from customers, release handle , ref count - 1
    function datasheet.release(source, handle)
      -- send message, don't ret
      releasehandle(source, handle)
    end

    -- from builder, monitor handle release
    function datasheet.collect(source, handle)
      local h = assert(handles[handle], "Invalid data handle")
      if h.ref == 0 then
        -- nobody holds it any more: forget it and answer right away
        handles[handle] = nil
        skynet.ret()
      else
        -- park the response; releasehandle fires it when ref drops to 0
        assert(h.collect == nil, "Only one collect allows")
        h.collect = skynet.response()
      end
    end

    skynet.dispatch("lua", function(_, source, cmd, ...)
      local handler = datasheet[cmd]
      if not handler then
        -- fail with a readable message instead of "attempt to call a nil value"
        error(string.format("Unknown datasheet command %s", tostring(cmd)))
      end
      handler(source, ...)
    end)

    -- Debug info: for each datasheet name its current handle and that handle's
    -- reference count; for every other live handle, just its reference count.
    skynet.info_func(function()
      local info = {}
      local tmp = {}
      for k, v in pairs(handles) do
        tmp[k] = v
      end
      for k, v in pairs(dataset) do
        local h = handles[v.handle]
        tmp[v.handle] = nil
        info[k] = {
          handle = v.handle,
          monitors = h.ref,
        }
      end
      for k, v in pairs(tmp) do
        info[k] = v.ref
      end
      return info
    end)
  end
  -- Init hook registered with skynet.init: stores in `address` whatever
  -- service.new("datasheet", datasheet_service) returns. The builder functions
  -- use `address` as the destination of their requests.
  -- NOTE(review): service.new presumably starts or locates the service; see
  -- skynet.service for the exact semantics.
  local function launch_datasheet()
    address = service.new("datasheet", datasheet_service)
  end

  skynet.init(launch_datasheet)

  return builder