multicastd.lua 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. local skynet = require "skynet"
  2. local mc = require "skynet.multicast.core"
  3. local datacenter = require "skynet.datacenter"
  4. local harbor_id = skynet.harbor(skynet.self())
  5. local command = {}
  6. local channel = {}
  7. local channel_n = {}
  8. local channel_remote = {}
  9. local channel_id = harbor_id
  10. local NORET = {}
  11. local function get_address(t, id)
  12. local v = assert(datacenter.get("multicast", id))
  13. t[id] = v
  14. return v
  15. end
  16. local node_address = setmetatable({}, { __index = get_address })
  17. -- new LOCAL channel , The low 8bit is the same with harbor_id
  18. function command.NEW()
  19. while channel[channel_id] do
  20. channel_id = mc.nextid(channel_id)
  21. end
  22. channel[channel_id] = {}
  23. channel_n[channel_id] = 0
  24. local ret = channel_id
  25. channel_id = mc.nextid(channel_id)
  26. return ret
  27. end
  28. -- MUST call by the owner node of channel, delete a remote channel
  29. function command.DELR(source, c)
  30. channel[c] = nil
  31. channel_n[c] = nil
  32. return NORET
  33. end
  34. -- delete a channel, if the channel is remote, forward the command to the owner node
  35. -- otherwise, delete the channel, and call all the remote node, DELR
  36. function command.DEL(source, c)
  37. local node = c % 256
  38. if node ~= harbor_id then
  39. skynet.send(node_address[node], "lua", "DEL", c)
  40. return NORET
  41. end
  42. local remote = channel_remote[c]
  43. channel[c] = nil
  44. channel_n[c] = nil
  45. channel_remote[c] = nil
  46. if remote then
  47. for node in pairs(remote) do
  48. skynet.send(node_address[node], "lua", "DELR", c)
  49. end
  50. end
  51. return NORET
  52. end
  53. -- forward multicast message to a node (channel id use the session field)
  54. local function remote_publish(node, channel, source, ...)
  55. skynet.redirect(node_address[node], source, "multicast", channel, ...)
  56. end
  57. -- publish a message, for local node, use the message pointer (call mc.bind to add the reference)
  58. -- for remote node, call remote_publish. (call mc.unpack and skynet.tostring to convert message pointer to string)
  59. local function publish(c , source, pack, size)
  60. local remote = channel_remote[c]
  61. if remote then
  62. -- remote publish should unpack the pack, because we should not publish the pointer out.
  63. local _, msg, sz = mc.unpack(pack, size)
  64. local msg = skynet.tostring(msg,sz)
  65. for node in pairs(remote) do
  66. remote_publish(node, c, source, msg)
  67. end
  68. end
  69. local group = channel[c]
  70. if group == nil or next(group) == nil then
  71. -- dead channel, delete the pack. mc.bind returns the pointer in pack and free the pack (struct mc_package **)
  72. local pack = mc.bind(pack, 1)
  73. mc.close(pack)
  74. return
  75. end
  76. local msg = skynet.tostring(pack, size) -- copy (pack,size) to a string
  77. mc.bind(pack, channel_n[c]) -- mc.bind will free the pack(struct mc_package **)
  78. for k in pairs(group) do
  79. -- the msg is a pointer to the real message, publish pointer in local is ok.
  80. skynet.redirect(k, source, "multicast", c , msg)
  81. end
  82. end
  83. skynet.register_protocol {
  84. name = "multicast",
  85. id = skynet.PTYPE_MULTICAST,
  86. unpack = function(msg, sz)
  87. return mc.packremote(msg, sz)
  88. end,
  89. dispatch = publish,
  90. }
  91. -- publish a message, if the caller is remote, forward the message to the owner node (by remote_publish)
  92. -- If the caller is local, call publish
  93. function command.PUB(source, c, pack, size)
  94. assert(skynet.harbor(source) == harbor_id)
  95. local node = c % 256
  96. if node ~= harbor_id then
  97. -- remote publish
  98. remote_publish(node, c, source, mc.remote(pack))
  99. else
  100. publish(c, source, pack,size)
  101. end
  102. end
  103. -- the node (source) subscribe a channel
  104. -- MUST call by channel owner node (assert source is not local and channel is create by self)
  105. -- If channel is not exist, return true
  106. -- Else set channel_remote[channel] true
  107. function command.SUBR(source, c)
  108. local node = skynet.harbor(source)
  109. if not channel[c] then
  110. -- channel none exist
  111. return true
  112. end
  113. assert(node ~= harbor_id and c % 256 == harbor_id)
  114. local group = channel_remote[c]
  115. if group == nil then
  116. group = {}
  117. channel_remote[c] = group
  118. end
  119. group[node] = true
  120. end
  121. -- the service (source) subscribe a channel
  122. -- If the channel is remote, node subscribe it by send a SUBR to the owner .
  123. function command.SUB(source, c)
  124. local node = c % 256
  125. if node ~= harbor_id then
  126. -- remote group
  127. if channel[c] == nil then
  128. if skynet.call(node_address[node], "lua", "SUBR", c) then
  129. return
  130. end
  131. if channel[c] == nil then
  132. -- double check, because skynet.call whould yield, other SUB may occur.
  133. channel[c] = {}
  134. channel_n[c] = 0
  135. end
  136. end
  137. end
  138. local group = channel[c]
  139. if group and not group[source] then
  140. channel_n[c] = channel_n[c] + 1
  141. group[source] = true
  142. end
  143. end
  144. -- MUST call by a node, unsubscribe a channel
  145. function command.USUBR(source, c)
  146. local node = skynet.harbor(source)
  147. assert(node ~= harbor_id)
  148. local group = assert(channel_remote[c])
  149. group[node] = nil
  150. return NORET
  151. end
  152. -- Unsubscribe a channel, if the subscriber is empty and the channel is remote, send USUBR to the channel owner
  153. function command.USUB(source, c)
  154. local group = assert(channel[c])
  155. if group[source] then
  156. group[source] = nil
  157. channel_n[c] = channel_n[c] - 1
  158. if channel_n[c] == 0 then
  159. local node = c % 256
  160. if node ~= harbor_id then
  161. -- remote group
  162. channel[c] = nil
  163. channel_n[c] = nil
  164. skynet.send(node_address[node], "lua", "USUBR", c)
  165. end
  166. end
  167. end
  168. return NORET
  169. end
  170. skynet.start(function()
  171. skynet.dispatch("lua", function(_,source, cmd, ...)
  172. local f = assert(command[cmd])
  173. local result = f(source, ...)
  174. if result ~= NORET then
  175. skynet.ret(skynet.pack(result))
  176. end
  177. end)
  178. local self = skynet.self()
  179. local id = skynet.harbor(self)
  180. assert(datacenter.set("multicast", id, self) == nil)
  181. end)