-- clusteragent.lua (2.5 KB)

  1. local skynet = require "skynet"
  2. local sc = require "skynet.socketchannel"
  3. local socket = require "skynet.socket"
  4. local cluster = require "skynet.cluster.core"
  -- The chunk's three varargs are converted with tonumber before use.
  local clusterd, gate, fd = ...
  clusterd, gate, fd = tonumber(clusterd), tonumber(gate), tonumber(fd)

  -- Parts of multi-part requests that are still being received, keyed by session.
  local large_request = {}

  -- Resolve a missing key of the name cache by asking clusterd ("queryname").
  -- A truthy answer is stored in the cache so later lookups of the same key
  -- do not reach this function again; a falsy answer is returned but not cached.
  local function resolve_name(cache, key)
    -- key must be '@xxxx': the first character is stripped before the query.
    local resolved = skynet.call(clusterd, "lua", "queryname", key:sub(2))
    if resolved then
      cache[key] = resolved
    end
    return resolved
  end

  -- Name -> address cache; misses fall through to resolve_name via __index.
  local register_name = setmetatable({}, { __index = resolve_name })
  --- Handle one request delivered through the "client" protocol.
  -- The arguments after the two ignored ones are the values produced by the
  -- protocol's unpack function (cluster.unpackrequest).
  -- @param addr     target of the request: 0 means "query a registered name",
  --                 a value accepted by cluster.isname is resolved through
  --                 register_name, anything else is used as the address as-is
  -- @param session  session id echoed back in the response; also the key under
  --                 which parts of a multi-part request are collected
  -- @param msg      request payload
  -- @param sz       payload size
  -- @param padding  truthy when more parts of the same request will follow
  -- @param is_push  truthy when the caller expects no response
  local function dispatch_request(_, _, addr, session, msg, sz, padding, is_push)
    if padding then
      -- Not the last part: remember it (together with the target and push
      -- flag seen on the first part) and wait for the rest.
      local pending = large_request[session] or { addr = addr, is_push = is_push }
      large_request[session] = pending
      cluster.append(pending, msg, sz)
      return
    end

    local pending = large_request[session]
    if pending then
      -- Final part of a multi-part request: join all parts and restore the
      -- target / push flag recorded when the first part arrived.
      large_request[session] = nil
      cluster.append(pending, msg, sz)
      msg, sz = cluster.concat(pending)
      addr = pending.addr
      is_push = pending.is_push
    end
    if not msg then
      socket.write(fd, cluster.packresponse(session, false, "Invalid large req"))
      return
    end

    local ok
    if addr == 0 then
      -- Name query: the payload carries the name to look up.
      local name = skynet.unpack(msg, sz)
      skynet.trash(msg, sz)
      local target = register_name["@" .. name]
      if target then
        ok = true
        -- Serialize into a Lua string instead of using skynet.pack: the
        -- buffer returned by skynet.pack would have to be released by hand
        -- after packresponse, and nothing here did so.
        msg, sz = skynet.packstring(target), nil
      else
        ok = false
        msg = "name not found"
      end
    else
      if cluster.isname(addr) then
        addr = register_name[addr]
      end
      if not addr then
        ok = false
        msg = "Invalid name"
      elseif is_push then
        skynet.rawsend(addr, "lua", msg, sz)
        return -- no response
      else
        -- pcall so that a failing call is reported to the peer as an error
        -- response instead of raising inside this dispatcher.
        ok, msg, sz = pcall(skynet.rawcall, addr, "lua", msg, sz)
      end
    end

    if not ok then
      socket.write(fd, cluster.packresponse(session, false, msg))
      return
    end

    local response = cluster.packresponse(session, true, msg, sz)
    if type(response) == "table" then
      -- packresponse returned a list of packets: write them in order.
      for _, part in ipairs(response) do
        socket.lwrite(fd, part)
      end
    else
      socket.write(fd, response)
    end
  end
  -- Service entry point: register the "client" protocol, ask the gate to
  -- forward fd's traffic here, and install the "lua" command handler.
  skynet.start(function()
    -- Messages on the "client" protocol are decoded by cluster.unpackrequest
    -- and then handled by dispatch_request.
    skynet.register_protocol {
      name = "client",
      id = skynet.PTYPE_CLIENT,
      unpack = cluster.unpackrequest,
      dispatch = dispatch_request,
    }
    -- fd can be written to, but this service does not read it: data packages
    -- are forwarded from the gate through the client protocol.
    skynet.call(gate, "lua", "forward", fd)

    -- Commands handled:
    --   "exit"       close fd and terminate this service
    --   "namechange" forget every cached name -> address mapping
    -- Any other command is logged as invalid.
    skynet.dispatch("lua", function(_, source, cmd, ...)
      if cmd == "exit" then
        socket.close(fd)
        skynet.exit()
      elseif cmd == "namechange" then
        -- Start over with an empty cache but keep the metatable that performs
        -- the lazy lookup. Assigning a bare {} would drop it, and every later
        -- register_name[...] access would simply yield nil.
        register_name = setmetatable({}, getmetatable(register_name))
      else
        skynet.error(string.format("Invalid command %s from %s", cmd, skynet.address(source)))
      end
    end)
  end)