-- srvRedis.lua (2.5 KB)
  1. local skynet = require "skynet"
  2. local redis = require "redis"
  3. local lib_logger = require("log.lib_logger")
  -- Arguments the service chunk was started with; both are handed to
  -- init_redis() from the skynet.start callback.
  local redis_cf, connect_num = ...
  -- Logger instance; remains nil until init_redis() creates it.
  local logger
  -- db_pool:   idle redis connections waiting to be reused.
  -- query_num: CMD.query calls counted since the statistics loop last reset it.
  local db_pool, query_num = {}, 0
  -- Per-period query count above which the periodic report is logged as a
  -- warning instead of an info line.
  local WARN_QUERY_NUM = 10000
  --- Obtain a redis connection for one query.
  -- The oldest idle connection in db_pool is preferred. When the pool is
  -- empty a warning is logged and a fresh connection is attempted instead.
  -- @return a connection, or nothing when the fresh connection attempt failed
  local function acquire()
    if #db_pool >= 1 then
      return table.remove(db_pool, 1)
    end

    logger:warn("redis maybe busy!!!")
    local connected, conn = pcall(redis.connect, redis_cf)
    if connected then
      return conn
    end
    -- On failure the second pcall result is the error value, not a connection.
    logger:error("redis connect error %s", conn or "null")
  end
  --- Give a connection back once a query is finished with it.
  -- The connection is appended to db_pool while the pool currently holds 20
  -- or fewer idle connections (so the pool can grow to 21); beyond that the
  -- connection is closed, ignoring any error raised by the disconnect call.
  -- @param db connection previously obtained from acquire()
  local function release(db)
    if #db_pool <= 20 then
      table.insert(db_pool, db)
    else
      pcall(db.disconnect, db)
    end
  end
  --- Run one redis command on a pooled connection.
  -- @param cmd string command name, looked up as a field of the connection
  -- @param ... arguments forwarded to the command function
  -- @return the first result of the command, or nil when the command name is
  --         invalid, no connection could be obtained, or the command raised
  local function query(cmd, ...)
    -- Reject non-string names before a connection is taken. Indexing the
    -- connection with such a key may raise (presumably the redis client
    -- resolves commands through a metatable -- TODO confirm), and an error at
    -- that point would escape with the connection neither released nor closed.
    if type(cmd) ~= "string" then
      logger:error("redis not this cmd %s", tostring(cmd))
      return nil
    end

    local db = acquire()
    if not db then
      return nil
    end

    local f = db[cmd]
    if not f then
      logger:error("redis not this cmd %s", cmd)
      release(db)
      return nil
    end

    local ok, r = pcall(f, db, ...)
    if not ok then
      -- The connection is in an unknown state after a failed command, so it
      -- is closed rather than returned to the pool.
      pcall(db.disconnect, db)
      logger:error("redis query cmd %s error %s", cmd, r or "null")
      return nil
    end

    release(db) -- hand the healthy connection back to the pool
    return r
  end
  -- Handlers reachable through the service's "lua" message dispatcher,
  -- keyed by command name.
  local CMD = {
    -- Count the request for the periodic statistics, then run it.
    query = function(cmd, ...)
      query_num = query_num + 1
      return query(cmd, ...)
    end,

    -- Close every idle pooled connection (errors ignored) and stop the service.
    exit = function()
      for _, conn in ipairs(db_pool) do
        pcall(conn.disconnect, conn)
      end
      skynet.exit()
    end,
  }
  --- Prepare the service: decode the redis configuration, create the logger,
  -- then start a background coroutine that pre-fills the connection pool and
  -- afterwards reports the query count periodically.
  -- @param cf  encoded redis configuration, decoded with cjson_decode
  -- @param num number of connections to open up front; a numeric string is
  --            accepted, anything non-numeric falls back to 10
  local function init_redis(cf, num)
    -- Service arguments may arrive as strings, so normalise before looping.
    num = tonumber(num) or 10
    cf = cjson_decode(cf)
    redis_cf = cf
    logger = lib_logger:new("redis")

    skynet.fork(function()
      for _ = 1, num do
        -- A failed connect must not abort this coroutine, otherwise the
        -- statistics loop below would never start.
        local ok, db = pcall(redis.connect, redis_cf)
        if not ok then
          logger:error("redis connect error %s", db or "null")
          -- Stop pre-filling; acquire() connects on demand when the pool
          -- is empty.
          break
        end
        table.insert(db_pool, db)
      end

      local handle = skynet.self()
      local port = redis_cf.port
      while true do
        if query_num > WARN_QUERY_NUM then
          logger:warn("redis %d port %d query number %d", handle, port, query_num)
        elseif query_num > 0 then
          logger:info("redis %d port %d query %d", handle, port, query_num)
        end
        -- Start a new counting period.
        query_num = 0
        -- skynet.sleep counts in 1/100 s, so this waits five minutes.
        skynet.sleep(5 * 60 * 100)
      end
    end)
  end
  -- Service entry point: initialise redis, then route incoming "lua" messages
  -- to the matching CMD handler.
  skynet.start(function()
    init_redis(redis_cf, connect_num)

    skynet.dispatch("lua", function(session, _, command, ...)
      local f = CMD[command]
      if not f then
        -- Fail with a message that names the bad command instead of the
        -- opaque "attempt to call a nil value".
        error(string.format("srvRedis unknown command %s", tostring(command)))
      end

      -- Session 0 marks a message that expects no reply, so nothing is sent
      -- back in that case.
      if session == 0 then
        return f(...)
      end
      skynet.ret(skynet.pack(f(...)))
    end)
  end)