multicast.lua 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. local skynet = require "skynet"
  2. local mc = require "skynet.multicast.core"
  3. local multicastd
  4. local multicast = {}
  5. local dispatch = setmetatable({} , {__mode = "kv" })
  6. local chan = {}
  7. local chan_meta = {
  8. __index = chan,
  9. __gc = function(self)
  10. self:unsubscribe()
  11. end,
  12. __tostring = function (self)
  13. return string.format("[Multicast:%x]",self.channel)
  14. end,
  15. }
  16. local function default_conf(conf)
  17. conf = conf or {}
  18. conf.pack = conf.pack or skynet.pack
  19. conf.unpack = conf.unpack or skynet.unpack
  20. return conf
  21. end
  22. function multicast.new(conf)
  23. assert(multicastd, "Init first")
  24. local self = {}
  25. conf = conf or self
  26. self.channel = conf.channel
  27. if self.channel == nil then
  28. self.channel = skynet.call(multicastd, "lua", "NEW")
  29. end
  30. self.__pack = conf.pack or skynet.pack
  31. self.__unpack = conf.unpack or skynet.unpack
  32. self.__dispatch = conf.dispatch
  33. return setmetatable(self, chan_meta)
  34. end
  35. function chan:delete()
  36. local c = assert(self.channel)
  37. skynet.send(multicastd, "lua", "DEL", c)
  38. self.channel = nil
  39. self.__subscribe = nil
  40. end
  41. function chan:publish(...)
  42. local c = assert(self.channel)
  43. skynet.call(multicastd, "lua", "PUB", c, mc.pack(self.__pack(...)))
  44. end
  45. function chan:subscribe()
  46. local c = assert(self.channel)
  47. if self.__subscribe then
  48. -- already subscribe
  49. return
  50. end
  51. skynet.call(multicastd, "lua", "SUB", c)
  52. self.__subscribe = true
  53. dispatch[c] = self
  54. end
  55. function chan:unsubscribe()
  56. if not self.__subscribe then
  57. -- already unsubscribe
  58. return
  59. end
  60. local c = assert(self.channel)
  61. skynet.send(multicastd, "lua", "USUB", c)
  62. self.__subscribe = nil
  63. end
  64. local function dispatch_subscribe(channel, source, pack, msg, sz)
  65. local self = dispatch[channel]
  66. if not self then
  67. mc.close(pack)
  68. error ("Unknown channel " .. channel)
  69. end
  70. if self.__subscribe then
  71. local ok, err = pcall(self.__dispatch, self, source, self.__unpack(msg, sz))
  72. mc.close(pack)
  73. assert(ok, err)
  74. else
  75. -- maybe unsubscribe first, but the message is send out. drop the message unneed
  76. mc.close(pack)
  77. end
  78. end
  79. local function init()
  80. multicastd = skynet.uniqueservice "multicastd"
  81. skynet.register_protocol {
  82. name = "multicast",
  83. id = skynet.PTYPE_MULTICAST,
  84. unpack = mc.unpack,
  85. dispatch = dispatch_subscribe,
  86. }
  87. end
  88. skynet.init(init, "multicast")
  89. return multicast