-- clusterd.lua
  local skynet = require "skynet"
  require "skynet.manager"  -- extends the skynet table with management calls such as skynet.kill
  local sc = require "skynet.socketchannel"
  local socket = require "skynet.socket"
  local cluster = require "skynet.cluster.core"
  -- Path of the cluster configuration file, read from the "cluster" env entry.
  -- May be nil, in which case loadconfig starts from an empty table.
  local config_name = skynet.getenv("cluster")

  -- node name -> "host:port" string, or false for a node reported as down.
  local node_address = {}

  -- node name -> next session number to hand to cluster.packrequest/packpush.
  local node_session = {}

  -- Handlers for the "lua" protocol, keyed by command name.
  local command = {}
  --- Read one response packet from a channel socket.
  -- Reads a 2-byte header, turns it into the body length with socket.header,
  -- reads that many bytes and decodes them with cluster.unpackresponse.
  -- @param sock object exposing a `read(n)` method
  -- @return the values produced by cluster.unpackresponse
  --         (session, ok, data, padding)
  local function read_response(sock)
    local body_len = socket.header(sock:read(2))
    local packet = sock:read(body_len)
    -- session, ok, data, padding
    return cluster.unpackresponse(packet)
  end
  -- node name -> bookkeeping table for a connection attempt in progress.
  -- The array part lists coroutines waiting for the attempt to finish;
  -- `.channel` / `.err` carry the outcome and `.namequery` is the opener
  -- coroutine while it is parked waiting for the node's address.
  local connecting = {}

  --- `__index` handler of node_channel: open and cache the channel to a node.
  -- Only one coroutine per node performs the connection; every other caller
  -- arriving meanwhile waits for it and shares the result.
  -- @param t   the node_channel table (the new channel is stored in t[key])
  -- @param key node name, looked up in node_address
  -- @return the connected channel
  -- Raises an error when the node is marked down (address is false) or when
  -- creating/connecting the channel fails.
  local function open_channel(t, key)
    local ct = connecting[key]
    if ct then
      -- Somebody else is already connecting to this node: queue up.
      local co = coroutine.running()
      table.insert(ct, co)
      skynet.wait(co)
      -- Propagate the opener's real error instead of a bare assertion.
      return assert(ct.channel, ct.err)
    end
    ct = {}
    connecting[key] = ct

    local address = node_address[key]
    if address == nil then
      -- Unknown node: park until loadconfig wakes us with an address.
      local co = coroutine.running()
      ct.namequery = co
      skynet.error("Wating for cluster node [".. key.."]")
      skynet.wait(co)
      address = node_address[key]
    end

    local succ, err, c
    if address then
      -- Everything that can throw (address parsing, channel construction,
      -- the connect itself) runs under pcall so that the cleanup below is
      -- always reached; otherwise connecting[key] would stay set and every
      -- later caller for this node would wait forever.
      succ, err = pcall(function()
        local host, port = string.match(address, "([^:]+):(.*)$")
        c = sc.channel {
          host = host,
          port = tonumber(port),
          response = read_response,
          nodelay = true,
        }
        c:connect(true)
      end)
      if succ then
        t[key] = c
        ct.channel = c
      end
    else
      -- address is false (or still unknown): the node is not reachable.
      err = "cluster node [" .. key .. "] is down."
    end

    -- Publish the outcome, release the slot, then wake the waiters.
    ct.err = err
    connecting[key] = nil
    for _, co in ipairs(ct) do
      skynet.wakeup(co)
    end
    assert(succ, err)
    return c
  end

  -- node name -> channel; a missing entry triggers open_channel.
  local node_channel = setmetatable({}, { __index = open_channel })
  --- Load the node table and apply every address change.
  -- @param tmp optional table mapping node name -> "host:port" string, or
  --        false for a node that is down. When nil, the file named by
  --        config_name (if any) is executed with a fresh table as its
  --        environment and the names it assigns become the node table.
  -- For every entry whose address differs from node_address, the cached
  -- channel (if one exists) is dropped and the new address recorded. A
  -- coroutine parked in open_channel waiting for that node is then woken.
  local function loadconfig(tmp)
    if tmp == nil then
      tmp = {}
      if config_name then
        local f = assert(io.open(config_name))
        local source = f:read "*a"
        f:close()
        assert(load(source, "@"..config_name, "t", tmp))()
      end
    end
    for name,address in pairs(tmp) do
      assert(address == false or type(address) == "string")
      if node_address[name] ~= address then
        -- address changed
        if rawget(node_channel, name) then
          node_channel[name] = nil  -- reset connection
        end
        node_address[name] = address
      end
      local ct = connecting[name]
      local waiting = ct and ct.namequery
      if waiting then
        -- Clear the marker before waking: the opener leaves its name-query
        -- wait only once, so a later reload must not wake the same
        -- coroutine again while it is blocked on something else.
        ct.namequery = nil
        skynet.error(string.format("Cluster node [%s] resloved : %s", name, address))
        skynet.wakeup(waiting)
      end
    end
  end
  --- "reload" command: refresh the node table, then answer the caller.
  -- @param source address of the requesting service (not used here)
  -- @param config optional node table handed to loadconfig as-is; when nil,
  --        loadconfig falls back to the configuration file
  command.reload = function(source, config)
    loadconfig(config)
    -- Reply with a packed nil so the caller's request completes.
    skynet.ret(skynet.pack(nil))
  end
  --- "listen" command: start a gate service listening for cluster traffic.
  -- @param source address of the requesting service (not used here)
  -- @param addr   listen address, or a node name when `port` is nil
  -- @param port   listen port; when nil, `addr` is looked up in node_address
  --               and the resulting "host:port" string supplies both values
  -- Raises "<addr> is down" when the node has no usable address, and an
  -- "Invalid cluster address" error when the address is not "host:port".
  function command.listen(source, addr, port)
    if port == nil then
      -- Resolve and validate the address before launching anything, so a
      -- bad node name does not leave an unused gate service behind.
      local address = node_address[addr]
      if not address then
        -- tostring keeps the message usable even when addr itself is nil.
        error(tostring(addr) .. " is down", 0)
      end
      addr, port = string.match(address, "([^:]+):(.*)$")
      if addr == nil then
        error("Invalid cluster address " .. address, 0)
      end
    end
    local gate = skynet.newservice("gate")
    skynet.call(gate, "lua", "open", { address = addr, port = port })
    skynet.ret(skynet.pack(nil))
  end
  --- Pack a request for a remote node, send it and return the reply.
  -- @param source address of the requesting service (not used here)
  -- @param node   target node name (key of node_session / node_channel)
  -- @param addr   destination passed to cluster.packrequest
  -- @param msg    message pointer; cluster.packrequest frees it
  -- @param sz     message size
  -- @return whatever the channel's `request` call returns
  local function send_request(source, node, addr, msg, sz)
    local current = node_session[node] or 1
    -- msg is a local pointer, cluster.packrequest will free it
    local packed, following, padding = cluster.packrequest(addr, current, msg, sz)
    -- Store the next session before the lookup below, which may fail.
    node_session[node] = following

    -- node_channel[node] may yield or throw error
    local channel = node_channel[node]
    return channel:request(packed, current, padding)
  end
  --- "req" command: forward a request to a remote node and relay the answer.
  -- Arguments are passed unchanged to send_request
  -- (source, node, addr, msg, sz).
  -- On failure the error is logged and the caller is answered with `false`
  -- through skynet.response.
  function command.req(...)
    local ok, result = pcall(send_request, ...)
    if not ok then
      skynet.error(result)
      skynet.response()(false)
      return
    end

    if type(result) == "table" then
      -- A table result is joined with cluster.concat before replying.
      skynet.ret(cluster.concat(result))
    else
      skynet.ret(result)
    end
  end
  --- "push" command: send a one-way message to a remote node.
  -- @param source address of the requesting service (not used here)
  -- @param node   target node name
  -- @param addr   destination passed to cluster.packpush
  -- @param msg    message pointer handed to cluster.packpush
  -- @param sz     message size
  -- No reply is produced for the caller.
  function command.push(source, node, addr, msg, sz)
    local current = node_session[node] or 1
    local packed, following, padding = cluster.packpush(addr, current, msg, sz)
    -- The session is only advanced for a multi-part push.
    if padding then
      node_session[node] = following
    end

    -- node_channel[node] may yield or throw error
    local channel = node_channel[node]
    -- notice: push may fail where the channel is disconnected or broken.
    channel:request(packed, nil, padding)
  end
  -- "node.name" -> address of the clusterproxy service created for it.
  local proxy = {}

  --- "proxy" command: return (creating on first use) the clusterproxy
  -- service for a remote service.
  -- @param source address of the requesting service (not used here)
  -- @param node   node name, or a combined "node@service" / "node.service"
  --               string when `name` is nil
  -- @param name   remote service name; when nil it is split off `node` at
  --               the first "@" or "." (the separator stays part of the name)
  -- Replies with the proxy address. Raises "Invalid name <input>" when the
  -- combined form cannot be split.
  function command.proxy(source, node, name)
    if name == nil then
      -- Keep the caller's input: a failed match yields nil for both
      -- captures, and the error message must still show what was passed.
      local spec = node
      node, name = spec:match "^([^@.]+)([@.].+)"
      if name == nil then
        error ("Invalid name " .. tostring(spec))
      end
    end
    local fullname = node .. "." .. name
    local p = proxy[fullname]
    if p == nil then
      p = skynet.newservice("clusterproxy", node, name)
      -- Double check: another request for the same name may have been
      -- handled while this one was launching its service (the socket
      -- handler in this file likewise re-reads its table after
      -- skynet.newservice). Keep the first proxy and drop the duplicate.
      local existing = proxy[fullname]
      if existing then
        skynet.kill(p)
        p = existing
      else
        proxy[fullname] = p
      end
    end
    skynet.ret(skynet.pack(p))
  end
  -- fd -> clusteragent service address. While the agent is still being
  -- launched the slot holds a boolean instead (see command.socket).
  local cluster_agent = {}
  -- Two-way registry: name -> address and address -> name.
  local register_name = {}

  --- Send "namechange" to every running cluster agent.
  -- Slots that do not hold a service address (a number) are skipped.
  local function clearnamecache()
    for _, agent in pairs(cluster_agent) do
      local is_service = type(agent) == "number"
      if is_service then
        skynet.send(agent, "lua", "namechange")
      end
    end
  end
  --- "register" command: bind a name to a service address.
  -- @param source address of the requesting service; used as the address
  --               when `addr` is not given
  -- @param name   name to register; must not be registered already
  -- @param addr   optional address to bind the name to
  -- If the address already had a name, the old name is removed and the
  -- cluster agents are told that names changed.
  function command.register(source, name, addr)
    assert(register_name[name] == nil)
    if not addr then
      addr = source
    end

    local previous = register_name[addr]
    if previous then
      register_name[previous] = nil
      clearnamecache()
    end

    -- Record both directions of the mapping.
    register_name[name] = addr
    register_name[addr] = name

    skynet.ret(nil)
    local line = string.format("Register [%s] :%08x", name, addr)
    skynet.error(line)
  end
  --- "queryname" command: reply with whatever register_name holds for `name`
  -- (nil when nothing is registered under it).
  -- @param source address of the requesting service (not used here)
  -- @param name   key to look up in register_name
  function command.queryname(source, name)
    local registered = register_name[name]
    skynet.ret(skynet.pack(registered))
  end
  --- "socket" command: react to socket events reported for a connection.
  -- @param source address of the service reporting the event
  -- @param subcmd event name: "open", "close", "error", or anything else
  -- @param fd     socket id the event is about
  -- @param msg    event payload (logged as the peer on "open")
  --
  -- cluster_agent[fd] states: false = agent being launched, true = closed
  -- while launching, number = running agent, nil = nothing tracked.
  function command.socket(source, subcmd, fd, msg)
    if subcmd == "open" then
      skynet.error(string.format("socket accept from %s", msg))
      -- new cluster agent
      cluster_agent[fd] = false
      local agent = skynet.newservice("clusteragent", skynet.self(), source, fd)
      -- A close/error handled during the launch flips the slot to true.
      local closed = cluster_agent[fd]
      cluster_agent[fd] = agent
      if closed then
        skynet.send(agent, "lua", "exit")
        cluster_agent[fd] = nil
      end
    elseif subcmd == "close" or subcmd == "error" then
      -- close cluster agent
      local agent = cluster_agent[fd]
      if type(agent) == "boolean" then
        -- Agent still launching: leave a marker for the "open" branch.
        cluster_agent[fd] = true
      elseif agent then
        skynet.send(agent, "lua", "exit")
        cluster_agent[fd] = nil
      end
      -- agent == nil: nothing is tracked for this fd (for instance a
      -- second event for a socket that was already cleaned up), so there
      -- is no service to notify and skynet.send must not be called.
    else
      skynet.error(string.format("socket %s %d %s", subcmd, fd, msg or ""))
    end
  end
  -- Service entry point: load the node table once, then route every "lua"
  -- message to the handler in `command` named by its first argument.
  skynet.start(function()
    loadconfig()

    -- An unknown command name fails the assert; handlers receive the
    -- sender's address followed by the remaining message arguments.
    local function dispatch(session, source, cmd, ...)
      local handler = assert(command[cmd])
      handler(source, ...)
    end

    skynet.dispatch("lua", dispatch)
  end)