123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- local skynet = require "skynet"
- local mc = require "skynet.multicast.core"
- local multicastd
- local multicast = {}
- local dispatch = setmetatable({} , {__mode = "kv" })
- local chan = {}
- local chan_meta = {
- __index = chan,
- __gc = function(self)
- self:unsubscribe()
- end,
- __tostring = function (self)
- return string.format("[Multicast:%x]",self.channel)
- end,
- }
- local function default_conf(conf)
- conf = conf or {}
- conf.pack = conf.pack or skynet.pack
- conf.unpack = conf.unpack or skynet.unpack
- return conf
- end
- function multicast.new(conf)
- assert(multicastd, "Init first")
- local self = {}
- conf = conf or self
- self.channel = conf.channel
- if self.channel == nil then
- self.channel = skynet.call(multicastd, "lua", "NEW")
- end
- self.__pack = conf.pack or skynet.pack
- self.__unpack = conf.unpack or skynet.unpack
- self.__dispatch = conf.dispatch
- return setmetatable(self, chan_meta)
- end
- function chan:delete()
- local c = assert(self.channel)
- skynet.send(multicastd, "lua", "DEL", c)
- self.channel = nil
- self.__subscribe = nil
- end
- function chan:publish(...)
- local c = assert(self.channel)
- skynet.call(multicastd, "lua", "PUB", c, mc.pack(self.__pack(...)))
- end
- function chan:subscribe()
- local c = assert(self.channel)
- if self.__subscribe then
- -- already subscribe
- return
- end
- skynet.call(multicastd, "lua", "SUB", c)
- self.__subscribe = true
- dispatch[c] = self
- end
- function chan:unsubscribe()
- if not self.__subscribe then
- -- already unsubscribe
- return
- end
- local c = assert(self.channel)
- skynet.send(multicastd, "lua", "USUB", c)
- self.__subscribe = nil
- end
- local function dispatch_subscribe(channel, source, pack, msg, sz)
- local self = dispatch[channel]
- if not self then
- mc.close(pack)
- error ("Unknown channel " .. channel)
- end
- if self.__subscribe then
- local ok, err = pcall(self.__dispatch, self, source, self.__unpack(msg, sz))
- mc.close(pack)
- assert(ok, err)
- else
- -- maybe unsubscribe first, but the message is send out. drop the message unneed
- mc.close(pack)
- end
- end
- local function init()
- multicastd = skynet.uniqueservice "multicastd"
- skynet.register_protocol {
- name = "multicast",
- id = skynet.PTYPE_MULTICAST,
- unpack = mc.unpack,
- dispatch = dispatch_subscribe,
- }
- end
- skynet.init(init, "multicast")
- return multicast
|