123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262 |
- local skynet = require "skynet"
- local code = require "code"
- local agentPool = require "agentPool"
- local protobufUtil = require("utils.protobufUtil")
- local tokenUtil = require("utils.tokenUtil")
- local nodes = require("nodes")
-- Handler tables: CMD serves ordinary "lua" commands, SOCKET serves the
-- sub-commands of "socket" messages (see the dispatcher in skynet.start).
local CMD, SOCKET = {}, {}

-- Gate service address and gate-agent pool; both are assigned in skynet.start.
local gate, pool

-- Set to true by CMD.exit once the server is shutting down.
local isStoping

-- Login state of a uid's session.
local LOGINING, LOGIN_DONE = 2, 3

-- fd  -> {fd, addr, auth, uid} record created by SOCKET.open.
local mapFd = {}
-- uid -> {fd, status, gateAgent} session created by l_build_connection.
local mapUid2Session = {}
--- Close a client connection and release what is bound to it.
-- Kicks the fd at the gate and forgets its record in mapFd. When the fd is the
-- one recorded in the uid's session, the session is removed, the gate agent is
-- told to log the player out and the agent is unbound from the pool.
-- @param fd socket id of the connection to close
local function l_close_connect(fd)
    if gate then
        skynet.call(gate, "lua", "kick", fd)
    end

    -- Drop the connection record.
    local fdInfo = mapFd[fd]
    mapFd[fd] = nil
    if fdInfo == nil or fdInfo.uid == nil then
        -- The connection was never bound to a player.
        return
    end

    local uid = fdInfo.uid
    local session = mapUid2Session[uid]
    if session == nil or session.fd ~= fd then
        -- The uid's session is owned by another connection (for example this
        -- fd authenticated while that one was still logging in and was
        -- rejected). Closing this fd must not tear that session down.
        return
    end
    mapUid2Session[uid] = nil

    -- Tell the agent the player is gone.
    local gateAgent = session.gateAgent
    if gateAgent then
        -- A failing agent must not keep the pool binding alive.
        local ok, err = pcall(skynet.call, gateAgent, "lua", "logout", uid, fd)
        if not ok then
            skynet.error("logout fail", uid, fd, err)
        end
        pool:unbind_agent(uid, gateAgent)
    end
end
--- Socket event: a new fd was opened.
-- Stores an unauthenticated record for the fd in mapFd. While the server is
-- stopping (isStoping set) the fd is not registered.
-- @param fd socket id
-- @param addr address passed along with the event
function SOCKET.open(fd, addr)
    if not isStoping then
        mapFd[fd] = {fd = fd, addr = addr, auth = false, uid = nil}
    end
end
--- Socket event: the fd was closed.
-- Logs the event, then releases the connection through l_close_connect.
-- @param fd socket id
function SOCKET.close(fd)
    skynet.error("socket close", fd)
    l_close_connect(fd)
end
--- Socket event: an error was reported for the fd.
-- Logs the error message, then releases the connection through
-- l_close_connect.
-- @param fd socket id
-- @param msg error description passed along with the event
function SOCKET.error(fd, msg)
    skynet.error("socket error", fd, msg)
    l_close_connect(fd)
end
--- Socket event: send-buffer warning for the fd.
-- Only logs; the connection is left open.
-- @param fd socket id
-- @param size amount of data (in K bytes) that has not been sent out on fd yet
function SOCKET.warning(fd, size)
    skynet.error("socket warning", fd, size)
end
--- Authenticate a connection from its incoming packet.
-- The packet is decoded with protobufUtil.parse_protobuf_data and must be a
-- "usr_auth_token" request whose uid/password/token pass tokenUtil.auth.
-- @param msg raw message pointer
-- @param sz message size
-- @return ok true when the token was accepted, false otherwise
-- @return response third value produced by parse_protobuf_data (nil when
--   decoding failed)
-- @return uid authenticated uid (nil on failure)
-- @return session fourth value produced by parse_protobuf_data; returned on
--   failure too (when decoding succeeded) so the caller can answer the request
local function l_do_auth(msg, sz)
    local data = skynet.tostring(msg, sz)
    local parsed, protoname, args, response, session = pcall(protobufUtil.parse_protobuf_data, data)
    if not parsed then
        -- On failure pcall returns the error message in the second slot.
        log.error("protobuf parse fail, err[%s]", tostring(protoname))
        return false, nil
    end
    if protoname ~= "usr_auth_token" then
        log.error("proto:%s not auth package...", protoname)
        return false, response, nil, session
    end
    if type(args) ~= "table" then
        -- Without a body there are no credentials to check; indexing it
        -- would raise instead of rejecting the connection.
        log.error("proto:%s has no body...", protoname)
        return false, response, nil, session
    end

    local uid = args.uid
    local authed, errMsg = tokenUtil.auth(uid, args.password, args.token)
    if not authed then
        log.error("client [%s] auth fail, errMsg[%s]", uid, tostring(errMsg))
        return false, response, nil, session
    end
    return true, response, uid, session
end
--- Send the authentication result to a client.
-- Packs a "user_rsp_auth_token" response carrying the result code and hands it
-- to the gate for delivery. Nothing is sent when packing yields no data.
-- @param fd socket id of the client
-- @param errCode result code placed in the response body
-- @param session session value passed to protobufUtil.pack_response_msg
local function l_response_auth(fd, errCode, session)
    local packet = protobufUtil.pack_response_msg("user_rsp_auth_token", {code = errCode}, session)
    if packet then
        skynet.send(gate, "lua", "send_to_client", fd, packet)
    end
end
--- Create the session for an authenticated connection and log it into a gate
-- agent.
-- Registers a session for the uid, allocates an agent from the pool and calls
-- its "login". On success the agent is bound to the uid and code.OK is sent to
-- the client; on any failure the client is answered with code.UNKNOWN (while
-- its fd is still registered) and the connection is closed.
-- @param uid authenticated player id
-- @param fd socket id of the connection
-- @param msgSession session value echoed in the auth response
local function l_build_connection(uid, fd, msgSession)
    local session = {fd = fd, status = LOGINING}
    mapUid2Session[uid] = session

    -- Allocate an agent.
    local gateAgent = pool:alloc_agent_srv(uid)
    if gateAgent == nil then
        session.status = LOGIN_DONE
        l_response_auth(fd, code.UNKNOWN, msgSession)
        return l_close_connect(fd)
    end

    -- Blocking call: other messages (including a close of this very fd) can
    -- be handled before it returns.
    local ok, ret = pcall(skynet.call, gateAgent, "lua", "login", uid, fd)
    session.status = LOGIN_DONE
    if not ok or not ret then
        if not ok then
            skynet.error("agent login fail", uid, fd, ret)
        end
        -- Answer like the allocation failure does, unless the fd is gone.
        if mapFd[fd] then
            l_response_auth(fd, code.UNKNOWN, msgSession)
        end
        return l_close_connect(fd)
    end
    session.gateAgent = gateAgent

    -- Is the connection still there?
    if not mapFd[fd] then
        -- The fd was closed while "login" was in flight. That close ran before
        -- gateAgent was recorded on the session, so it could not log the agent
        -- out, and l_close_connect finds no fd record now; undo the login here.
        -- NOTE(review): assumes the agent's "logout" is keyed by (uid, fd) and
        -- does not disturb a newer login of the same uid; confirm.
        if mapUid2Session[uid] == session then
            mapUid2Session[uid] = nil
        end
        local loggedOut, err = pcall(skynet.call, gateAgent, "lua", "logout", uid, fd)
        if not loggedOut then
            skynet.error("logout fail", uid, fd, err)
        end
        return l_close_connect(fd)
    end

    -- Bind the agent and report success.
    pool:bind_agent(uid, gateAgent)
    return l_response_auth(fd, code.OK, msgSession)
end
--- Push a server status code to a client.
-- Packs an "on_server_code" push message and hands it to the gate for
-- delivery. Nothing is sent when packing yields no data.
-- @param fd socket id of the client
-- @param errCode code placed in the message body
local function l_notify_server_code(fd, errCode)
    local packet = protobufUtil.pack_push_msg("on_server_code", {code = errCode})
    if packet then
        skynet.send(gate, "lua", "send_to_client", fd, packet)
    end
end
--- Socket event: data arrived on an fd handled by this service.
-- The packet is treated as an auth request. After a successful auth the uid's
-- existing session (if any) decides what happens:
--   * a login for the uid is still in progress -> reject with code.LOGINING;
--   * the uid is already logged in on this fd  -> code.NOT_REPEAT_LOGIN;
--   * the uid is logged in on another fd       -> notify and close that fd,
--     then log in on this one;
--   * no session                               -> log in on this fd.
-- @param fd socket id
-- @param msg raw message pointer
-- @param size message size
function SOCKET.data(fd, msg, size)
    local fdInfo = mapFd[fd]
    if not fdInfo then
        return l_close_connect(fd)
    end

    local ok, _, uid, msgSession = l_do_auth(msg, size)
    if not ok then
        l_response_auth(fd, code.TOKEN_AUTH_FAIL, msgSession)
        -- Close the connection.
        return l_close_connect(fd)
    end

    local session = mapUid2Session[uid]
    if session and session.status == LOGINING then
        -- Another login of this uid is in flight. Reject before binding the
        -- uid to this fd, so closing this fd later cannot affect the session
        -- that is being built for the other connection.
        return l_notify_server_code(fd, code.LOGINING)
    end

    fdInfo.auth = true
    fdInfo.uid = uid

    if session and session.status == LOGIN_DONE then
        if fd == session.fd then
            -- This connection is already logged in.
            return l_notify_server_code(fd, code.NOT_REPEAT_LOGIN)
        end
        -- Same uid on a new connection: mark the uid as logging in, then
        -- notify and kick the old connection.
        session.status = LOGINING
        l_notify_server_code(session.fd, code.REPLACE_LOGIN)
        l_close_connect(session.fd)
    elseif session then
        -- Session with a status other than the two known ones.
        if session.fd then
            l_close_connect(session.fd)
        end
        log.error("uid:%s 上一次的连接没有完全断开", uid)
    end

    -- l_close_connect blocks on the gate; this fd may have been closed
    -- meanwhile, in which case there is nothing left to log in.
    if mapFd[fd] ~= fdInfo then
        return
    end

    -- l_build_connection's third parameter is the message session that
    -- l_response_auth echoes back, so pass msgSession (the value the failure
    -- path above already uses) rather than the parser's `response` value.
    return l_build_connection(uid, fd, msgSession)
end
--- Command: start listening for client connections.
-- Builds the gate configuration from this node's entry in the `nodes` table
-- (looked up by the "nodeName" environment value) and asks the gate to open.
function CMD.start()
    local node = nodes[skynet.getenv("nodeName")]
    local conf = {
        address = node.ip,
        port = node.port,
        maxclient = node.maxClient,
        protocol = node.protocol,
        wsPort = node.wsPort,
    }

    -- nodelay is switched on only when the setting is numerically 1.
    local nodelay = node.nodelay
    if nodelay and tonumber(nodelay) == 1 then
        conf.nodelay = true
    end

    -- Start listening.
    skynet.call(gate, "lua", "open", conf)
end
--- Command: shut the service down.
-- Marks the server as stopping, closes every registered connection and then
-- asks ".steward" to stop this service.
function CMD.exit()
    isStoping = true

    -- Snapshot the fds first: l_close_connect removes entries from mapFd.
    local fds = {}
    for fd in pairs(mapFd) do
        fds[#fds + 1] = fd
    end

    for _, fd in ipairs(fds) do
        -- One failing close (e.g. an unreachable gate or agent) must not leave
        -- the remaining connections open or skip the steward notification.
        local ok, err = pcall(l_close_connect, fd)
        if not ok then
            skynet.error("close connect fail", fd, err)
        end
    end

    skynet.send(".steward", "lua", "stop_service", skynet.self())
end
--- Command: close one client connection.
-- Delegates to l_close_connect.
-- @param fd socket id of the connection to close
function CMD.close(fd)
    l_close_connect(fd)
end
--- Command: hot update of configuration.
-- Intentionally empty in this service.
function CMD.update_config()
end
--- Command: hot update of protocol definitions.
-- Forwards "update_proto" to every agent service address known to the pool.
function CMD.update_proto()
    local agentAddrs = pool:get_agent_srv_addr_list()
    for _, agentAddr in pairs(agentAddrs) do
        skynet.send(agentAddr, "lua", "update_proto")
    end
end
--- Command: hot update of logic.
-- Intentionally empty in this service.
function CMD.update_logic()
end
-- Service entry point: install the "lua" dispatcher, create the agent pool,
-- obtain the gate service and register this service with ".steward".
skynet.start(function()
    skynet.dispatch("lua", function(session, source, cmd, subcmd, ...)
        if cmd == "socket" then
            -- Socket events are notifications; no reply is sent.
            local handler = SOCKET[subcmd]
            if handler then
                handler(...)
            else
                -- An unknown event is logged instead of calling a nil value.
                skynet.error("unknown socket event", subcmd)
            end
        else
            -- Name the offending command so the caller's error is actionable.
            local handler = assert(CMD[cmd], "unknown command: " .. tostring(cmd))
            skynet.retpack(handler(subcmd, ...))
        end
    end)

    pool = agentPool.new("srvProtobufAgent", 4)
    gate = skynet.uniqueservice("srvGate")
    skynet.send(".steward", "lua", "start_service", skynet.self(), "wsProtobufWatchdog", nil, 1)
end)
|