gateserver.lua 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. local skynet = require "skynet"
  2. local netpack = require "skynet.netpack"
  3. local socketdriver = require "skynet.socketdriver"
  -- Module table returned by this file.
  local gateserver = {}

  -- State shared by every function below.
  local socket     -- listen socket
  local queue      -- message queue (reassigned by the "socket" protocol dispatcher)
  local maxclient  -- max client (set by CMD.open)
  local client_number, nodelay = 0, false

  -- Per-fd connection state:
  --   true  : accepted by MSG.open
  --   false : gateserver.closeclient was called, close not yet reported
  --   nil   : unknown fd, or already cleaned up
  local connection = {}

  -- Command table for the "lua" protocol. Its finalizer passes the pending
  -- queue to netpack.clear when CMD itself is garbage-collected
  -- (presumably to release queued packets -- confirm against netpack).
  local CMD = setmetatable({}, {
    __gc = function()
      netpack.clear(queue)
    end,
  })
  --- Start a tracked client socket via socketdriver.start.
  -- Nothing happens when `connection[fd]` is not truthy, i.e. the fd is
  -- unknown or has already been passed to gateserver.closeclient.
  -- @param fd socket id of the client
  function gateserver.openclient(fd)
    if not connection[fd] then
      return
    end
    socketdriver.start(fd)
  end
  --- Ask the socket driver to close a tracked client socket.
  -- Nothing happens when `connection[fd]` is not truthy (unknown fd or a
  -- close was already requested).
  -- @param fd socket id of the client
  function gateserver.closeclient(fd)
    if not connection[fd] then
      return
    end
    -- Mark the entry false instead of removing it: the entry stays non-nil,
    -- so the later close/error cleanup still finds it and decrements
    -- client_number exactly once.
    connection[fd] = false
    socketdriver.close(fd)
  end
  24. function gateserver.start(handler)
  25. assert(handler.message)
  26. assert(handler.connect)
  --- "open" command: create the listen socket and start it.
  -- Fails an assertion if a listen socket already exists or conf.port is
  -- missing.
  -- @param source address of the requesting service (forwarded to handler.open)
  -- @param conf table with fields: port (required), address (default
  --   "0.0.0.0"), maxclient (default 1024), nodelay (optional)
  -- @return whatever handler.open(source, conf) returns; nothing when the
  --   handler has no `open` callback
  function CMD.open( source, conf )
    assert(not socket)
    local host = conf.address or "0.0.0.0"
    local listen_port = assert(conf.port)
    maxclient = conf.maxclient or 1024
    nodelay = conf.nodelay

    skynet.error(string.format("Listen on %s:%d", host, listen_port))
    socket = socketdriver.listen(host, listen_port)
    socketdriver.start(socket)

    -- handler.open is optional.
    if not handler.open then
      return
    end
    return handler.open(source, conf)
  end
  --- "close" command: close the listen socket.
  -- Fails an assertion when no listen socket is currently open. The
  -- `socket` variable itself is cleared later by MSG.close, once the close
  -- of the listen fd is reported.
  function CMD.close()
    local listen_fd = assert(socket)
    socketdriver.close(listen_fd)
  end
  --- Deliver one received packet to handler.message.
  -- Packets for an fd whose `connection` entry is not truthy (unknown, or
  -- closeclient already called) are logged and dropped.
  -- @param fd socket id the packet came from
  -- @param msg packet buffer as produced by netpack
  -- @param sz size of the packet
  local function dispatch_msg(fd, msg, sz)
    if connection[fd] then
      handler.message(fd, msg, sz)
      return
    end
    local text = netpack.tostring(msg, sz)
    skynet.error(string.format("Drop message from fd (%d) : %s", fd, text))
  end

  -- Socket event handlers, keyed by the event name given to the "socket"
  -- protocol dispatcher.
  local MSG = { data = dispatch_msg }
  --- Drain the netpack queue, dispatching every pending packet.
  -- Handles the "more" socket event.
  local function dispatch_queue()
    local fd, msg, sz = netpack.pop(queue)
    if not fd then
      -- Queue is empty: nothing to do.
      return
    end

    -- Fork another drainer first, so dispatching continues even if
    -- handler.message blocks. If handler.message never blocks, the queue
    -- is already empty when the forked call runs, and it exits at once.
    skynet.fork(dispatch_queue)
    dispatch_msg(fd, msg, sz)

    -- Generic-for with netpack.pop as the iterator: stops on the first
    -- nil fd.
    for next_fd, next_msg, next_sz in netpack.pop, queue do
      dispatch_msg(next_fd, next_msg, next_sz)
    end
  end

  MSG.more = dispatch_queue
  --- "open" socket event: a new client connection was accepted.
  -- Once client_number has reached maxclient the socket is closed straight
  -- away and handler.connect is NOT called.
  -- @param fd socket id of the new connection
  -- @param msg extra data delivered with the event (presumably the peer
  --   address -- confirm against netpack), forwarded to handler.connect
  function MSG.open(fd, msg)
    if client_number >= maxclient then
      -- Over the limit: reject without tracking the fd.
      socketdriver.close(fd)
    else
      if nodelay then
        socketdriver.nodelay(fd)
      end
      connection[fd] = true
      client_number = client_number + 1
      handler.connect(fd, msg)
    end
  end
  --- Forget a client fd and release its slot in client_number.
  -- Untracked fds (entry is nil) are ignored. An entry that is `false`
  -- (closeclient was called) still counts as tracked and is released here.
  -- @param fd socket id of the client
  local function close_fd(fd)
    if connection[fd] == nil then
      return
    end
    connection[fd] = nil
    client_number = client_number - 1
  end
  --- "close" socket event.
  -- For the listen socket, the stored fd is cleared. For any other fd,
  -- the optional handler.disconnect callback runs first and the fd is then
  -- removed from the connection table.
  -- @param fd socket id that was closed
  function MSG.close(fd)
    if fd == socket then
      socket = nil
      return
    end

    if handler.disconnect then
      handler.disconnect(fd)
    end
    close_fd(fd)
  end
  --- "error" socket event.
  -- An error on the listen socket closes it and is logged. An error on any
  -- other fd is passed to the optional handler.error callback, and the fd
  -- is then removed from the connection table.
  -- @param fd socket id the error was reported for
  -- @param msg error description delivered with the event
  function MSG.error(fd, msg)
    if fd == socket then
      socketdriver.close(fd)
      -- Log text previously misspelled "accept" as "accpet", which made
      -- the line unfindable when searching logs for accept errors.
      skynet.error("gateserver close listen socket, accept error:", msg)
    else
      if handler.error then
        handler.error(fd, msg)
      end
      close_fd(fd)
    end
  end
  --- "warning" socket event: forwarded to the optional handler.warning.
  -- @param fd socket id the warning refers to
  -- @param size value delivered with the warning (presumably the pending
  --   send-buffer size -- confirm against the socket driver)
  function MSG.warning(fd, size)
    if not handler.warning then
      return
    end
    handler.warning(fd, size)
  end
  -- Unpack step of the "socket" protocol: feed the raw message into
  -- netpack.filter together with the current queue.
  local function unpack_socket_message(msg, sz)
    return netpack.filter(queue, msg, sz)
  end

  -- Dispatch step: remember the queue handed back by the unpack step, then
  -- route the event (if any) to the matching MSG handler. The first two
  -- arguments are ignored.
  local function on_socket_message(_, _, q, event, ...)
    queue = q
    if not event then
      return
    end
    MSG[event](...)
  end

  skynet.register_protocol({
    name = "socket",
    id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6
    unpack = unpack_socket_message,
    dispatch = on_socket_message,
  })
  -- Service entry point: install the "lua" protocol dispatcher.
  -- Commands found in CMD ("open", "close") are called as f(address, ...).
  -- Any other command goes to the optional handler.command callback as
  -- handler.command(cmd, address, ...). The result is packed and returned
  -- to the caller in both cases.
  skynet.start(function()
    skynet.dispatch("lua", function (_, address, cmd, ...)
      local f = CMD[cmd]
      if f then
        skynet.ret(skynet.pack(f(address, ...)))
        return
      end

      -- handler.command is optional. Without this check an unknown command
      -- failed with an opaque "attempt to call a nil value"; raise an
      -- error that names the command instead.
      local fallback = handler.command
      if not fallback then
        error(string.format("Unknown gateserver command: %s", tostring(cmd)))
      end
      skynet.ret(skynet.pack(fallback(cmd, address, ...)))
    end)
  end)
  134. end
  135. return gateserver