wsSprotoWatchdog.lua 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. local skynet = require "skynet"
  2. local code = require "code"
  3. local agentPool = require "agentPool"
  4. local protoUtil = require("utils.protoUtil")
  5. local tokenUtil = require("utils.tokenUtil")
  6. local nodes = require("nodes")
  -- Handlers for "lua" commands sent to this service, keyed by command name.
  local CMD = {}
  -- Handlers for "socket" events, keyed by event name.
  local SOCKET = {}
  -- Gate service address and agent pool; both are assigned inside skynet.start.
  local gate, pool
  -- Becomes true once CMD.exit has started shutting the service down.
  local isStoping
  -- Login states stored in session.status for each uid.
  local LOGINING, LOGIN_DONE = 2, 3
  -- fd -> connection record (created in SOCKET.open).
  local mapFd = {}
  -- uid -> session record (created while a connection is being built).
  local mapUid2Session = {}
  --- Tear down a client connection and, if it owns one, the player's session.
  -- Asks the gate to kick the fd and forgets the fd record. The uid's session is
  -- removed (and its agent told to log the player out and unbound from the pool)
  -- only when that session was created for this very fd.
  -- @param fd socket id of the connection to close
  local function l_close_connect(fd)
    if gate then
      skynet.call(gate, "lua", "kick", fd)
    end

    -- Drop the connection record.
    local fdInfo = mapFd[fd]
    mapFd[fd] = nil
    if fdInfo == nil or fdInfo.uid == nil then
      -- The connection was never bound to a player.
      return
    end

    local uid = fdInfo.uid
    local session = mapUid2Session[uid]
    if session == nil or session.fd ~= fd then
      -- The uid's session (if any) belongs to a different connection, e.g. this
      -- fd authenticated as uid but was rejected because another login was in
      -- progress. Closing this fd must not destroy the other connection's session.
      return
    end

    -- Drop the session record.
    mapUid2Session[uid] = nil

    -- Tell the agent the player is gone and release the binding.
    local gateAgent = session.gateAgent
    if gateAgent then
      skynet.call(gateAgent, "lua", "logout", uid, fd)
      pool:unbind_agent(uid, gateAgent)
    end
  end
  --- Socket "open" event: register a new, not yet authenticated connection.
  -- Nothing is recorded while the server is stopping.
  -- @param fd   socket id of the new connection
  -- @param addr peer address reported with the event
  function SOCKET.open(fd, addr)
    if not isStoping then
      mapFd[fd] = { fd = fd, addr = addr, auth = false, uid = nil }
    end
  end
  --- Socket "close" event: log it, then tear the connection down.
  -- @param fd socket id that was closed
  SOCKET.close = function(fd)
    skynet.error("socket close", fd)
    return l_close_connect(fd)
  end
  --- Socket "error" event: log the error, then tear the connection down.
  -- @param fd  socket id that failed
  -- @param msg error description reported with the event
  SOCKET.error = function(fd, msg)
    skynet.error("socket error", fd, msg)
    return l_close_connect(fd)
  end
  --- Socket "warning" event: only logged, the connection is left alone.
  -- @param fd   socket id the warning refers to
  -- @param size amount of data (in K bytes, per the original note) not yet sent on fd
  SOCKET.warning = function(fd, size)
    skynet.error("socket warning", fd, size)
  end
  --- Authenticate a connection from the packet it just sent.
  -- The packet must decode as a "usr_auth_token" request carrying a uid, and
  -- tokenUtil.auth must accept its uid/password/token.
  -- @param msg message buffer, as accepted by skynet.tostring
  -- @param sz  length of msg
  -- @return ok       true when authentication succeeded
  -- @return response fifth value returned by protoUtil:decode_c2s_req, used by
  --                  the caller to encode the reply
  -- @return uid      the authenticated uid (only when ok is true)
  local function l_do_auth(msg, sz)
    local data = skynet.tostring(msg, sz)
    local decoded, _, name, args, response = protoUtil:decode_c2s_req(data)
    if not decoded then
      log.error("sproto parse fail...")
      return false, response
    end
    if name ~= "usr_auth_token" then
      log.error("proto:%s not auth package...", name)
      return false, response
    end
    -- The packet comes from an untrusted client: refuse a request without a
    -- usable uid instead of raising on args.uid or passing a nil uid on, where
    -- it would later be used as a table key.
    if type(args) ~= "table" or args.uid == nil then
      log.error("proto:%s auth package without uid...", name)
      return false, response
    end

    local uid = args.uid
    local authed, errMsg = tokenUtil.auth(uid, args.password, args.token)
    if not authed then
      log.error("client [%s] auth fail, errMsg[%s]", uid, tostring(errMsg))
      return false, response
    end
    return true, response, uid
  end
  --- Send the authentication result back to the client.
  -- Nothing is sent when the reply cannot be encoded.
  -- @param fd       socket id to answer
  -- @param response value handed to protoUtil:encode_c2s_rsp to build the reply
  -- @param errCode  result code placed in the reply's `code` field
  local function l_response_auth(fd, response, errCode)
    local packet = protoUtil:encode_c2s_rsp(response, { code = errCode })
    if packet then
      skynet.send(gate, "lua", "send_to_client", fd, packet)
    end
  end
  --- Create the session for an authenticated connection and log it in on an agent.
  -- Registers a LOGINING session for uid, allocates an agent from the pool, calls
  -- the agent's "login" and, on success, binds the agent and answers code.OK.
  -- Every failure path closes the connection.
  -- @param uid      authenticated player id
  -- @param fd       socket id of the connection
  -- @param response value used to encode the auth reply
  local function l_build_connection(uid, fd, response)
    local session = { fd = fd, status = LOGINING }
    mapUid2Session[uid] = session

    -- Allocate an agent.
    local gateAgent = pool:alloc_agent_srv(uid)
    if gateAgent == nil then
      session.status = LOGIN_DONE
      l_response_auth(fd, response, code.UNKNOWN)
      return l_close_connect(fd)
    end

    -- Log the player in on the agent. This call yields, so socket events for
    -- fd may be handled before it returns.
    local ok, ret = pcall(skynet.call, gateAgent, "lua", "login", uid, fd)
    if not ok or not ret then
      if not ok then
        -- Keep the reason instead of silently dropping the pcall error.
        skynet.error("agent login failed", uid, fd, tostring(ret))
      end
      session.status = LOGIN_DONE
      -- Answer the client like the allocation-failure path does.
      l_response_auth(fd, response, code.UNKNOWN)
      return l_close_connect(fd)
    end
    session.gateAgent = gateAgent
    session.status = LOGIN_DONE

    -- Check that the connection still exists.
    if not mapFd[fd] then
      -- The fd was closed while "login" was in flight. That close ran before
      -- session.gateAgent was set, so it could not notify the agent: undo the
      -- login here, otherwise the agent keeps a player nobody will log out.
      if mapUid2Session[uid] == session then
        mapUid2Session[uid] = nil
      end
      local logoutOk, err = pcall(skynet.call, gateAgent, "lua", "logout", uid, fd)
      if not logoutOk then
        skynet.error("agent logout failed", uid, fd, tostring(err))
      end
      return l_close_connect(fd)
    end

    -- Bind the agent.
    pool:bind_agent(uid, gateAgent)
    return l_response_auth(fd, response, code.OK)
  end
  --- Push an "on_server_code" notification carrying errCode to a client.
  -- Nothing is sent when the notification cannot be encoded.
  -- @param fd      socket id to notify
  -- @param errCode code placed in the notification's `code` field
  local function l_notify_server_code(fd, errCode)
    local packet = protoUtil:encode_s2c_req("on_server_code", { code = errCode })
    if packet then
      skynet.send(gate, "lua", "send_to_client", fd, packet)
    end
  end
  --- Socket "data" event: treat the packet as a login request for this fd.
  -- Authenticates the packet, then either rejects the login (another login for
  -- the uid is in progress, or this fd is already logged in as the uid), replaces
  -- the uid's previous connection, or builds a fresh connection.
  -- @param fd   socket id the data arrived on
  -- @param msg  message buffer
  -- @param size length of msg
  function SOCKET.data(fd, msg, size)
    local fdInfo = mapFd[fd]
    if not fdInfo then
      -- Unknown fd (never registered, or already removed): just close it.
      return l_close_connect(fd)
    end

    local ok, response, uid = l_do_auth(msg, size)
    if not ok then
      l_response_auth(fd, response, code.TOKEN_AUTH_FAIL)
      return l_close_connect(fd)
    end

    local session = mapUid2Session[uid]
    if session and session.status == LOGINING then
      -- Another login for this uid is still in progress. Reject this one
      -- BEFORE binding the uid to the fd: a rejected fd that carried the uid
      -- would tear down the in-flight session when it is closed later.
      return l_notify_server_code(fd, code.LOGINING)
    end

    fdInfo.auth = true
    fdInfo.uid = uid

    if session and session.status == LOGIN_DONE then
      if fd == session.fd then
        -- The same connection logged in again.
        return l_notify_server_code(fd, code.NOT_REPEAT_LOGIN)
      end
      -- The uid is logged in on another connection: mark the login as in
      -- progress, kick the old connection, then connect the new one.
      session.status = LOGINING
      l_notify_server_code(session.fd, code.REPLACE_LOGIN)
      l_close_connect(session.fd)
      return l_build_connection(uid, fd, response)
    elseif session then
      -- A previous session exists with an unexpected status; in theory this
      -- cannot happen. Close what is left of it and start over.
      if session.fd then
        l_close_connect(session.fd)
      end
      log.error("uid:%s 上一次的连接没有完全断开", uid)
      return l_build_connection(uid, fd, response)
    end

    -- First login for this uid.
    return l_build_connection(uid, fd, response)
  end
  --- Command "start": open the gate with this node's listen configuration.
  -- The node is selected by the "nodeName" environment entry and its settings
  -- are read from the `nodes` table.
  -- Raises an error when no configuration exists for the node name.
  function CMD.start()
    local nodeName = skynet.getenv("nodeName")
    -- Fail with a readable message rather than an anonymous nil-index error.
    local node = assert(
      nodes[nodeName],
      "wsSprotoWatchdog: no node config for nodeName " .. tostring(nodeName)
    )

    local conf = {
      address = node.ip,
      port = node.port,
      maxclient = node.maxClient,
      protocol = node.protocol,
      wsPort = node.wsPort,
    }
    -- nodelay is enabled only when the configured value converts to the number 1.
    local nodelay = node.nodelay
    if nodelay and tonumber(nodelay) == 1 then
      conf.nodelay = true
    end

    -- Start listening for connections.
    skynet.call(gate, "lua", "open", conf)
  end
  --- Command "exit": shut the service down.
  -- Marks the service as stopping, closes every known connection and then asks
  -- ".steward" to stop this service.
  function CMD.exit()
    isStoping = true

    -- Snapshot the fds first: l_close_connect removes entries from mapFd.
    local fds = {}
    for fd in pairs(mapFd) do
      fds[#fds + 1] = fd
    end

    for _, fd in ipairs(fds) do
      -- A failure while closing one connection must not abort the shutdown:
      -- the remaining connections still have to be closed and the steward
      -- still has to be notified.
      local ok, err = pcall(l_close_connect, fd)
      if not ok then
        skynet.error("close connection failed on exit", fd, tostring(err))
      end
    end

    skynet.send(".steward", "lua", "stop_service", skynet.self())
  end
  --- Command "close": tear down the connection identified by fd.
  -- @param fd socket id of the connection to close
  CMD.close = function(fd)
    return l_close_connect(fd)
  end
  --- Command "update_config": hot update of configuration.
  -- Currently does nothing.
  CMD.update_config = function()
  end
  --- Command "update_proto": hot update of the protocol.
  -- Sends "update_proto" to every agent service address known to the pool.
  function CMD.update_proto()
    for _, agentAddr in pairs(pool:get_agent_srv_addr_list()) do
      skynet.send(agentAddr, "lua", "update_proto")
    end
  end
  --- Command "update_logic": hot update of logic.
  -- Currently does nothing.
  CMD.update_logic = function()
  end
  --- Command "s2c_broadcast": forward a broadcast to every agent service.
  -- @param protoName name of the protocol message to broadcast
  -- @param msg       message payload, passed through unchanged
  function CMD.s2c_broadcast(protoName, msg)
    for _, agentAddr in pairs(pool:get_agent_srv_addr_list()) do
      skynet.send(agentAddr, "lua", "s2c_broadcast", protoName, msg)
    end
  end
  --- Route one "lua" message to its handler.
  -- "socket" messages go to SOCKET[subcmd] and are not answered; anything else
  -- goes to CMD[cmd] (with subcmd as its first argument) and the handler's
  -- results are sent back with skynet.retpack.
  local function l_dispatch_lua(_session, _source, cmd, subcmd, ...)
    if cmd ~= "socket" then
      local handler = assert(CMD[cmd])
      skynet.retpack(handler(subcmd, ...))
      return
    end
    -- Socket events need no reply.
    local handler = SOCKET[subcmd]
    handler(...)
  end

  -- Service entry point: install the dispatcher, create the agent pool, obtain
  -- the gate service and register this service with ".steward".
  skynet.start(function()
    skynet.dispatch("lua", l_dispatch_lua)
    pool = agentPool.new("srvSprotoAgent", 4)
    gate = skynet.uniqueservice("srvGate")
    skynet.send(".steward", "lua", "start_service", skynet.self(), "wsSprotoWatchdog", nil, 1)
  end)