redis.lua 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. local skynet = require "skynet"
  2. local socket = require "skynet.socket"
  3. local socketchannel = require "skynet.socketchannel"
  4. local table = table
  5. local string = string
  6. local assert = assert
  7. local redis = {}
  8. local command = {}
  9. local meta = {
  10. __index = command,
  11. -- DO NOT close channel in __gc
  12. }
  13. ---------- redis response
  14. local redcmd = {}
  15. redcmd[36] = function(fd, data) -- '$'
  16. local bytes = tonumber(data)
  17. if bytes < 0 then
  18. return true,nil
  19. end
  20. local firstline = fd:read(bytes+2)
  21. return true,string.sub(firstline,1,-3)
  22. end
  23. redcmd[43] = function(fd, data) -- '+'
  24. return true,data
  25. end
  26. redcmd[45] = function(fd, data) -- '-'
  27. return false,data
  28. end
  29. redcmd[58] = function(fd, data) -- ':'
  30. -- todo: return string later
  31. return true, tonumber(data)
  32. end
  33. local function read_response(fd)
  34. local result = fd:readline "\r\n"
  35. local firstchar = string.byte(result)
  36. local data = string.sub(result,2)
  37. return redcmd[firstchar](fd,data)
  38. end
  39. redcmd[42] = function(fd, data) -- '*'
  40. local n = tonumber(data)
  41. if n < 0 then
  42. return true, nil
  43. end
  44. local bulk = {}
  45. local noerr = true
  46. for i = 1,n do
  47. local ok, v = read_response(fd)
  48. if not ok then
  49. noerr = false
  50. end
  51. bulk[i] = v
  52. end
  53. return noerr, bulk
  54. end
  55. -------------------
  56. function command:disconnect()
  57. self[1]:close()
  58. setmetatable(self, nil)
  59. end
  60. -- msg could be any type of value
  61. local function make_cache(f)
  62. return setmetatable({}, {
  63. __mode = "kv",
  64. __index = f,
  65. })
  66. end
  67. local header_cache = make_cache(function(t,k)
  68. local s = "\r\n$" .. k .. "\r\n"
  69. t[k] = s
  70. return s
  71. end)
  72. local command_cache = make_cache(function(t,cmd)
  73. local s = "\r\n$"..#cmd.."\r\n"..cmd:upper()
  74. t[cmd] = s
  75. return s
  76. end)
  77. local count_cache = make_cache(function(t,k)
  78. local s = "*" .. k
  79. t[k] = s
  80. return s
  81. end)
  82. local function compose_message(cmd, msg)
  83. local t = type(msg)
  84. local lines = {}
  85. if t == "table" then
  86. lines[1] = count_cache[#msg+1]
  87. lines[2] = command_cache[cmd]
  88. local idx = 3
  89. for _,v in ipairs(msg) do
  90. v= tostring(v)
  91. lines[idx] = header_cache[#v]
  92. lines[idx+1] = v
  93. idx = idx + 2
  94. end
  95. lines[idx] = "\r\n"
  96. else
  97. msg = tostring(msg)
  98. lines[1] = "*2"
  99. lines[2] = command_cache[cmd]
  100. lines[3] = header_cache[#msg]
  101. lines[4] = msg
  102. lines[5] = "\r\n"
  103. end
  104. return lines
  105. end
  106. local function redis_login(auth, db)
  107. if auth == nil and db == nil then
  108. return
  109. end
  110. return function(so)
  111. if auth then
  112. so:request(compose_message("AUTH", auth), read_response)
  113. end
  114. if db then
  115. so:request(compose_message("SELECT", db), read_response)
  116. end
  117. end
  118. end
  119. function redis.connect(db_conf)
  120. local channel = socketchannel.channel {
  121. host = db_conf.host,
  122. port = db_conf.port or 6379,
  123. auth = redis_login(db_conf.auth, db_conf.db),
  124. nodelay = true,
  125. }
  126. -- try connect first only once
  127. channel:connect(true)
  128. return setmetatable( { channel }, meta )
  129. end
  130. setmetatable(command, { __index = function(t,k)
  131. local cmd = string.upper(k)
  132. local f = function (self, v, ...)
  133. if type(v) == "table" then
  134. return self[1]:request(compose_message(cmd, v), read_response)
  135. else
  136. return self[1]:request(compose_message(cmd, {v, ...}), read_response)
  137. end
  138. end
  139. t[k] = f
  140. return f
  141. end})
  142. local function read_boolean(so)
  143. local ok, result = read_response(so)
  144. return ok, result ~= 0
  145. end
  146. function command:exists(key)
  147. local fd = self[1]
  148. return fd:request(compose_message ("EXISTS", key), read_boolean)
  149. end
  150. function command:sismember(key, value)
  151. local fd = self[1]
  152. return fd:request(compose_message ("SISMEMBER", {key, value}), read_boolean)
  153. end
  154. local function compose_table(lines, msg)
  155. local tinsert = table.insert
  156. tinsert(lines, count_cache[#msg])
  157. for _,v in ipairs(msg) do
  158. v = tostring(v)
  159. tinsert(lines,header_cache[#v])
  160. tinsert(lines,v)
  161. end
  162. tinsert(lines, "\r\n")
  163. return lines
  164. end
  165. function command:pipeline(ops,resp)
  166. assert(ops and #ops > 0, "pipeline is null")
  167. local fd = self[1]
  168. local cmds = {}
  169. for _, cmd in ipairs(ops) do
  170. compose_table(cmds, cmd)
  171. end
  172. if resp then
  173. return fd:request(cmds, function (fd)
  174. for i=1, #ops do
  175. local ok, out = read_response(fd)
  176. table.insert(resp, {ok = ok, out = out})
  177. end
  178. return true, resp
  179. end)
  180. else
  181. return fd:request(cmds, function (fd)
  182. local ok, out
  183. for i=1, #ops do
  184. ok, out = read_response(fd)
  185. end
  186. -- return last response
  187. return ok,out
  188. end)
  189. end
  190. end
  191. --- watch mode
  192. local watch = {}
  193. local watchmeta = {
  194. __index = watch,
  195. __gc = function(self)
  196. self.__sock:close()
  197. end,
  198. }
  199. local function watch_login(obj, auth)
  200. return function(so)
  201. if auth then
  202. so:request(compose_message("AUTH", auth), read_response)
  203. end
  204. for k in pairs(obj.__psubscribe) do
  205. so:request(compose_message ("PSUBSCRIBE", k))
  206. end
  207. for k in pairs(obj.__subscribe) do
  208. so:request(compose_message("SUBSCRIBE", k))
  209. end
  210. end
  211. end
  212. function redis.watch(db_conf)
  213. local obj = {
  214. __subscribe = {},
  215. __psubscribe = {},
  216. }
  217. local channel = socketchannel.channel {
  218. host = db_conf.host,
  219. port = db_conf.port or 6379,
  220. auth = watch_login(obj, db_conf.auth),
  221. nodelay = true,
  222. }
  223. obj.__sock = channel
  224. -- try connect first only once
  225. channel:connect(true)
  226. return setmetatable( obj, watchmeta )
  227. end
  228. function watch:disconnect()
  229. self.__sock:close()
  230. setmetatable(self, nil)
  231. end
  232. local function watch_func( name )
  233. local NAME = string.upper(name)
  234. watch[name] = function(self, ...)
  235. local so = self.__sock
  236. for i = 1, select("#", ...) do
  237. local v = select(i, ...)
  238. so:request(compose_message(NAME, v))
  239. end
  240. end
  241. end
  242. watch_func "subscribe"
  243. watch_func "psubscribe"
  244. watch_func "unsubscribe"
  245. watch_func "punsubscribe"
  246. function watch:message()
  247. local so = self.__sock
  248. while true do
  249. local ret = so:response(read_response)
  250. local type , channel, data , data2 = ret[1], ret[2], ret[3], ret[4]
  251. if type == "message" then
  252. return data, channel
  253. elseif type == "pmessage" then
  254. return data2, data, channel
  255. elseif type == "subscribe" then
  256. self.__subscribe[channel] = true
  257. elseif type == "psubscribe" then
  258. self.__psubscribe[channel] = true
  259. elseif type == "unsubscribe" then
  260. self.__subscribe[channel] = nil
  261. elseif type == "punsubscribe" then
  262. self.__psubscribe[channel] = nil
  263. end
  264. end
  265. end
  266. return redis