123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- local skynet = require "skynet"
- local sc = require "skynet.socketchannel"
- local socket = require "skynet.socket"
- local cluster = require "skynet.cluster.core"
- local config_name = skynet.getenv "cluster"
- local node_address = {}
- local node_session = {}
- local command = {}
- local function read_response(sock)
- local sz = socket.header(sock:read(2))
- local msg = sock:read(sz)
- return cluster.unpackresponse(msg) -- session, ok, data, padding
- end
- local connecting = {}
- local function open_channel(t, key)
- local ct = connecting[key]
- if ct then
- local co = coroutine.running()
- table.insert(ct, co)
- skynet.wait(co)
- return assert(ct.channel)
- end
- ct = {}
- connecting[key] = ct
- local address = node_address[key]
- if address == nil then
- local co = coroutine.running()
- assert(ct.namequery == nil)
- ct.namequery = co
- skynet.error("Wating for cluster node [".. key.."]")
- skynet.wait(co)
- address = node_address[key]
- assert(address ~= nil)
- end
- local succ, err, c
- if address then
- local host, port = string.match(address, "([^:]+):(.*)$")
- c = sc.channel {
- host = host,
- port = tonumber(port),
- response = read_response,
- nodelay = true,
- }
- succ, err = pcall(c.connect, c, true)
- if succ then
- t[key] = c
- ct.channel = c
- end
- else
- err = "cluster node [" .. key .. "] is down."
- end
- connecting[key] = nil
- for _, co in ipairs(ct) do
- skynet.wakeup(co)
- end
- assert(succ, err)
- return c
- end
- local node_channel = setmetatable({}, { __index = open_channel })
- local function loadconfig(tmp)
- if tmp == nil then
- tmp = {}
- if config_name then
- local f = assert(io.open(config_name))
- local source = f:read "*a"
- f:close()
- assert(load(source, "@"..config_name, "t", tmp))()
- end
- end
- for name,address in pairs(tmp) do
- assert(address == false or type(address) == "string")
- if node_address[name] ~= address then
- -- address changed
- if rawget(node_channel, name) then
- node_channel[name] = nil -- reset connection
- end
- node_address[name] = address
- end
- local ct = connecting[name]
- if ct and ct.namequery then
- skynet.error(string.format("Cluster node [%s] resloved : %s", name, address))
- skynet.wakeup(ct.namequery)
- end
- end
- end
- function command.reload(source, config)
- loadconfig(config)
- skynet.ret(skynet.pack(nil))
- end
- function command.listen(source, addr, port)
- local gate = skynet.newservice("gate")
- if port == nil then
- local address = assert(node_address[addr], addr .. " is down")
- addr, port = string.match(address, "([^:]+):(.*)$")
- end
- skynet.call(gate, "lua", "open", { address = addr, port = port })
- skynet.ret(skynet.pack(nil))
- end
- local function send_request(source, node, addr, msg, sz)
- local session = node_session[node] or 1
- -- msg is a local pointer, cluster.packrequest will free it
- local request, new_session, padding = cluster.packrequest(addr, session, msg, sz)
- node_session[node] = new_session
- -- node_channel[node] may yield or throw error
- local c = node_channel[node]
- return c:request(request, session, padding)
- end
- function command.req(...)
- local ok, msg = pcall(send_request, ...)
- if ok then
- if type(msg) == "table" then
- skynet.ret(cluster.concat(msg))
- else
- skynet.ret(msg)
- end
- else
- skynet.error(msg)
- skynet.response()(false)
- end
- end
- function command.push(source, node, addr, msg, sz)
- local session = node_session[node] or 1
- local request, new_session, padding = cluster.packpush(addr, session, msg, sz)
- if padding then -- is multi push
- node_session[node] = new_session
- end
- -- node_channel[node] may yield or throw error
- local c = node_channel[node]
- c:request(request, nil, padding)
- -- notice: push may fail where the channel is disconnected or broken.
- end
- local proxy = {}
- function command.proxy(source, node, name)
- if name == nil then
- node, name = node:match "^([^@.]+)([@.].+)"
- if name == nil then
- error ("Invalid name " .. tostring(node))
- end
- end
- local fullname = node .. "." .. name
- if proxy[fullname] == nil then
- proxy[fullname] = skynet.newservice("clusterproxy", node, name)
- end
- skynet.ret(skynet.pack(proxy[fullname]))
- end
- local cluster_agent = {} -- fd:service
- local register_name = {}
- local function clearnamecache()
- for fd, service in pairs(cluster_agent) do
- if type(service) == "number" then
- skynet.send(service, "lua", "namechange")
- end
- end
- end
- function command.register(source, name, addr)
- assert(register_name[name] == nil)
- addr = addr or source
- local old_name = register_name[addr]
- if old_name then
- register_name[old_name] = nil
- clearnamecache()
- end
- register_name[addr] = name
- register_name[name] = addr
- skynet.ret(nil)
- skynet.error(string.format("Register [%s] :%08x", name, addr))
- end
- function command.queryname(source, name)
- skynet.ret(skynet.pack(register_name[name]))
- end
- function command.socket(source, subcmd, fd, msg)
- if subcmd == "open" then
- skynet.error(string.format("socket accept from %s", msg))
- -- new cluster agent
- cluster_agent[fd] = false
- local agent = skynet.newservice("clusteragent", skynet.self(), source, fd)
- local closed = cluster_agent[fd]
- cluster_agent[fd] = agent
- if closed then
- skynet.send(agent, "lua", "exit")
- cluster_agent[fd] = nil
- end
- else
- if subcmd == "close" or subcmd == "error" then
- -- close cluster agent
- local agent = cluster_agent[fd]
- if type(agent) == "boolean" then
- cluster_agent[fd] = true
- else
- skynet.send(agent, "lua", "exit")
- cluster_agent[fd] = nil
- end
- else
- skynet.error(string.format("socket %s %d %s", subcmd, fd, msg or ""))
- end
- end
- end
- skynet.start(function()
- loadconfig()
- skynet.dispatch("lua", function(session , source, cmd, ...)
- local f = assert(command[cmd])
- f(source, ...)
- end)
- end)
|