123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- local skynet = require "skynet"
- local mysql = require "mysql"
- local lib_logger = require("log.lib_logger")
-- Service start-up arguments: the (still encoded) connection configuration
-- and the number of connections to open up front.
local confConn, connCount = ...

-- Logger handle; assigned during initialisation.
local logger

-- Idle connections waiting to be reused.
local connPool = {}

-- Running counters, in order: current connection count, number of queries,
-- number of connections created, number of connections closed.
local connectCount, queryTimes, createConnectTimes, closeConnectTimes = 0, 0, 0, 0
--- Close one connection and update the bookkeeping counters.
-- The current-connection count is decremented before the close is attempted,
-- so it drops even when the close itself raises.
-- @param db connection object exposing a `disconnect` method
local function disconnect(db)
    connectCount = connectCount - 1

    local closed, err = pcall(db.disconnect, db)
    if closed then
        closeConnectTimes = closeConnectTimes + 1
        logger:debug("disconnect success")
        return
    end

    logger:error("disconnect error %s", err)
end
--- Obtain a connection, reusing an idle one when the pool has any.
-- When the pool is empty a new connection is opened on demand and its
-- character set is initialised. Both steps are protected: a failure is logged
-- and reported as nil instead of propagating to the caller, and a connection
-- whose initialisation fails is closed rather than leaked.
-- @return a connection object, or nil when none could be opened/initialised
local function acquire()
    if #connPool > 0 then
        -- Oldest idle connection first.
        return table.remove(connPool, 1)
    end

    logger:warn("mysql maybe busy connectCount %d !!!", connectCount)
    local ok, db = pcall(mysql.connect, confConn)
    createConnectTimes = createConnectTimes + 1
    if not ok or not db then
        logger:error("mysql connect error %s", db or "null")
        return
    end

    -- Count the connection as soon as it exists so that disconnect(), which
    -- decrements the counter, keeps the bookkeeping balanced on failure.
    connectCount = connectCount + 1
    local charsetOk, err = pcall(db.query, db, "set names utf8")
    if not charsetOk then
        logger:error("mysql set names error %s", err or "null")
        disconnect(db)
        return
    end
    return db
end
--- Hand a connection back after use.
-- The pool keeps at most 10 idle connections; once it is full the returned
-- connection is closed instead of being stored. (The previous `> 10` check
-- let the pool grow to 11 entries.)
-- @param db connection object being returned
local function release(db)
    local MAX_IDLE = 10
    if #connPool >= MAX_IDLE then
        disconnect(db)
        return
    end
    connPool[#connPool + 1] = db
end
--- Run a statement once on a pooled connection, without retrying.
-- The character set is re-applied first because the connection may have been
-- dropped since it was last used. That step is now protected as well: if it
-- raises, the connection is closed and the failure is reported, instead of
-- the error escaping and the connection being neither released nor closed.
-- @param ... arguments forwarded to the connection's `query` (the first one
--            is used as the SQL text in the error log)
-- @return the value returned by `query`, or false when no connection was
--         available or the call raised
local function try_query(...)
    local db = acquire()
    if not db then
        return false
    end
    local sql = ...

    local ok, rs = pcall(db.query, db, "set names utf8")
    if ok then
        ok, rs = pcall(db.query, db, ...)
    end
    if not ok then
        disconnect(db)
        logger:error("try query sql %s error %s", sql, rs or "null")
        return false
    end

    release(db)
    return rs
end
--- Run a statement on a pooled connection, retrying once on failure.
-- The character set is re-applied first because the connection may have been
-- dropped since it was last used. Both that step and the statement itself are
-- protected; if either raises, the connection is closed, the error is logged
-- and the statement is attempted one more time through try_query.
-- @param ... arguments forwarded to the connection's `query` (the first one
--            is used as the SQL text in the error log)
-- @return the value returned by `query`, or false when no connection was
--         available or the retry failed too
local function query(...)
    local db = acquire()
    if not db then
        return false
    end
    local sql = ...

    local ok, rs = pcall(db.query, db, "set names utf8")
    if ok then
        ok, rs = pcall(db.query, db, ...)
    end
    if not ok then
        disconnect(db)
        logger:error("query sql %s error %s", sql, rs or "null")
        return try_query(...)
    end

    release(db)
    return rs
end
-- Command table: handlers are looked up by command name.
local CMD = {}

--- Execute a statement and count it in the query statistics.
-- @param ... arguments forwarded unchanged to `query`
-- @return whatever `query` returns
function CMD.query(...)
    queryTimes = queryTimes + 1
    return query(...)
end

--- Close every idle connection and empty the pool.
-- Each connection is removed from the pool before it is closed, so closed
-- handles can no longer be handed out by a later acquire.
function CMD.exit()
    while #connPool > 0 do
        disconnect(table.remove(connPool, 1))
    end
end
--- Initialise the service: decode the configuration, create the logger,
-- pre-open the initial connections and start the periodic statistics log.
-- The connection counter now starts at zero and grows only as connections
-- are actually established; a failed warm-up connection is logged and skipped
-- instead of aborting the remaining ones.
-- @param cf  encoded connection configuration, decoded with cjson_decode
-- @param num number of connections to pre-open (defaults to 5)
local function init_mysql(cf, num)
    -- Accept a numeric string as well as a number.
    num = tonumber(num) or 5
    -- NOTE(review): cjson_decode is not defined in this file; presumably a
    -- project-wide global — confirm.
    cf = cjson_decode(cf)
    confConn = cf
    connectCount = 0
    logger = lib_logger:new("mysql")

    -- Warm the pool in the background.
    skynet.fork(function()
        for _ = 1, num do
            local ok, db = pcall(mysql.connect, confConn)
            if ok and db then
                local charsetOk, err = pcall(db.query, db, "set names utf8")
                if charsetOk then
                    connectCount = connectCount + 1
                    connPool[#connPool + 1] = db
                else
                    logger:error("mysql set names error %s", err or "null")
                    -- Not counted yet, so close it directly.
                    pcall(db.disconnect, db)
                end
            else
                logger:error("mysql connect error %s", db or "null")
            end
        end
    end)

    -- Log the counters every five minutes (skynet.sleep takes 1/100 s units).
    -- Only the query counter is reset after each report.
    skynet.fork(function()
        while true do
            skynet.sleep(5 * 60 * 100)
            logger:debug(
                "mysql 5 mins query[%s] create[%s] close[%s]",
                tostring(queryTimes),
                tostring(createConnectTimes),
                tostring(closeConnectTimes)
            )
            queryTimes = 0
        end
    end)
end
-- Service entry point: initialise the pool, then route incoming "lua"
-- messages to the matching CMD handler.
skynet.start(function()
    init_mysql(confConn, connCount)

    skynet.dispatch("lua", function(session, _, command, ...)
        local f = CMD[command]
        if not f then
            -- Fail with a message that names the bad command instead of the
            -- opaque "attempt to call a nil value".
            error(string.format("mysql service: unknown command %s", tostring(command)))
        end

        -- Session 0: run the handler without sending a reply.
        if session == 0 then
            return f(...)
        end
        skynet.ret(skynet.pack(f(...)))
    end)
end)
|