123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308 |
- local skynet = require "skynet"
- local socket = require "skynet.socket"
- local socketchannel = require "skynet.socketchannel"
-- Keep the standard-library names used in this file as locals.
local table, string, assert = table, string, assert

-- Module table (this is what the file returns).
local redis = {}
-- Method table for connection objects.
local command = {}
-- Metatable for connection objects: missing fields are looked up in `command`.
-- DO NOT close channel in __gc
local meta = { __index = command }
---------- redis response
-- Reply parsers, indexed by the byte value of the type marker that begins a
-- reply line. Each parser is called as parser(fd, data), where `data` is the
-- rest of that line, and returns `ok, value`.
local redcmd = {}

-- '$' (36): bulk reply. `data` holds the payload length; a negative length
-- yields a nil value.
redcmd[string.byte("$")] = function(fd, data)
    local length = tonumber(data)
    if length >= 0 then
        -- Read the payload plus its two trailing bytes, then drop those two.
        local chunk = fd:read(length + 2)
        return true, string.sub(chunk, 1, -3)
    end
    return true, nil
end

-- '+' (43): status reply, passed through unchanged.
redcmd[string.byte("+")] = function(_, text)
    return true, text
end

-- '-' (45): error reply; reported as a failure with the error text as value.
redcmd[string.byte("-")] = function(_, text)
    return false, text
end

-- ':' (58): integer reply, converted with tonumber.
redcmd[string.byte(":")] = function(_, digits)
    -- todo: return string later
    return true, tonumber(digits)
end
-- Read one complete reply from `fd` and decode it.
-- A line is fetched with fd:readline "\r\n"; its first byte selects a parser
-- from `redcmd`, and the rest of the line is handed to that parser with `fd`.
-- Returns whatever the selected parser returns (`ok, value`).
-- Raises an error when the line is empty or starts with a marker that has no
-- parser registered in `redcmd`.
local function read_response(fd)
    local result = fd:readline "\r\n"
    local firstchar = string.byte(result)
    -- string.byte yields nothing for an empty line, so guard the lookup.
    local parser = firstchar and redcmd[firstchar]
    if not parser then
        -- Fail with a readable message instead of "attempt to call a nil value".
        error("Invalid redis response: " .. tostring(result))
    end
    return parser(fd, string.sub(result, 2))
end
-- '*' (42): array reply. `data` holds the element count; a negative count
-- yields a nil value. Each element is read with read_response, so arrays may
-- nest. The first return value is false if any element reported a failure;
-- the element values (including failure texts) are still collected.
redcmd[string.byte("*")] = function(fd, data)
    local count = tonumber(data)
    if count < 0 then
        return true, nil
    end

    local items = {}
    local all_ok = true
    for index = 1, count do
        local ok, value = read_response(fd)
        if not ok then
            all_ok = false
        end
        -- Assign by position: a nil element must not shift later ones.
        items[index] = value
    end
    return all_ok, items
end
- -------------------
-- Close the connection's channel (stored at index 1) and strip the object's
-- metatable, so that the command methods are no longer reachable through it.
function command:disconnect()
    local channel = self[1]
    channel:close()
    setmetatable(self, nil)
end
- -- msg could be any type of value
-- Create a memo table with weak keys and values whose misses are resolved by
-- `f(t, k)`. `f` is expected to store its result in `t`.
local function make_cache(f)
    local mt = { __mode = "kv", __index = f }
    return setmetatable({}, mt)
end

-- length -> "\r\n$<length>\r\n" (separator placed in front of an argument).
local header_cache = make_cache(function(cache, length)
    local header = "\r\n$" .. length .. "\r\n"
    rawset(cache, length, header)
    return header
end)

-- command name -> "\r\n$<#name>\r\n<NAME>" (length header plus upper-cased name).
local command_cache = make_cache(function(cache, cmd)
    local encoded = table.concat({ "\r\n$", #cmd, "\r\n", cmd:upper() })
    rawset(cache, cmd, encoded)
    return encoded
end)

-- count -> "*<count>" (first fragment of a request).
local count_cache = make_cache(function(cache, count)
    local prefix = "*" .. count
    rawset(cache, count, prefix)
    return prefix
end)
-- Encode one command as a list of string fragments which, concatenated in
-- order, form the request: "*<count>", the command name, then each argument
-- preceded by its "$<length>" header, and a final CRLF.
--
-- cmd: command name (a string; upper-cased through command_cache).
-- msg: either a table of arguments or a single value of any type. Every
--      argument is converted with tostring before it is sent.
-- Returns the list of fragments.
--
-- For a table, arguments are taken with ipairs, so the list ends at the first
-- nil. The "*<count>" header is derived from the arguments actually emitted
-- rather than from #msg, because #msg may report a different border when the
-- table has holes and the frame would then announce more items than it holds.
local function compose_message(cmd, msg)
    local lines = {}
    if type(msg) == "table" then
        lines[1] = ""            -- placeholder, replaced once the count is known
        lines[2] = command_cache[cmd]
        local idx = 3
        local argc = 0
        for _, v in ipairs(msg) do
            v = tostring(v)
            lines[idx] = header_cache[#v]
            lines[idx + 1] = v
            idx = idx + 2
            argc = argc + 1
        end
        lines[idx] = "\r\n"
        -- +1 accounts for the command name itself.
        lines[1] = count_cache[argc + 1]
    else
        msg = tostring(msg)
        lines[1] = "*2"
        lines[2] = command_cache[cmd]
        lines[3] = header_cache[#msg]
        lines[4] = msg
        lines[5] = "\r\n"
    end
    return lines
end
-- Build the login hook passed to the channel as its `auth` option.
-- auth: value sent with AUTH, or nil/false to skip it.
-- db:   value sent with SELECT, or nil/false to skip it.
-- Returns nothing when both are nil; otherwise a function taking the channel
-- `so` that issues AUTH and then SELECT as applicable.
local function redis_login(auth, db)
    if auth ~= nil or db ~= nil then
        return function(so)
            if auth then
                local request = compose_message("AUTH", auth)
                so:request(request, read_response)
            end
            if db then
                local request = compose_message("SELECT", db)
                so:request(request, read_response)
            end
        end
    end
end
-- Create a connection object for the server described by `db_conf`.
-- Fields read from db_conf: host, port (6379 when absent), auth, db.
-- Returns a table holding the channel at index 1, with `meta` as metatable.
function redis.connect(db_conf)
    local options = {
        host = db_conf.host,
        port = db_conf.port or 6379,
        auth = redis_login(db_conf.auth, db_conf.db),
        nodelay = true,
    }
    local channel = socketchannel.channel(options)
    -- try connect first only once
    channel:connect(true)
    local conn = { channel }
    return setmetatable(conn, meta)
end
-- Generate command methods on demand: the first lookup of an unknown key `k`
-- in `command` builds a sender for the upper-cased command name, stores it in
-- `command` and returns it. The sender accepts either one table of arguments
-- or the arguments as varargs.
setmetatable(command, {
    __index = function(t, k)
        local cmd = string.upper(k)
        local function f(self, v, ...)
            local args = v
            if type(v) ~= "table" then
                args = { v, ... }
            end
            return self[1]:request(compose_message(cmd, args), read_response)
        end
        t[k] = f
        return f
    end,
})
-- Read one reply and interpret a successful value as a boolean: any value
-- other than 0 is true.
-- Returns `ok, value`. When the reply is a failure, `ok` and the original
-- value (the error text) are returned unchanged; comparing the error text
-- against 0 would replace it with `true` and lose the message.
local function read_boolean(so)
    local ok, result = read_response(so)
    if not ok then
        return ok, result
    end
    return ok, result ~= 0
end
-- Send EXISTS for `key` and return the reply as a boolean (see read_boolean).
function command:exists(key)
    local request = compose_message("EXISTS", key)
    return self[1]:request(request, read_boolean)
end
-- Send SISMEMBER for `key` and `value` and return the reply as a boolean
-- (see read_boolean).
function command:sismember(key, value)
    local request = compose_message("SISMEMBER", { key, value })
    return self[1]:request(request, read_boolean)
end
-- Append the encoding of one command to `lines`.
-- lines: list of string fragments being accumulated (modified in place).
-- msg:   array whose elements are the words of the command; each element is
--        converted with tostring and written as given (no upper-casing).
-- Returns `lines`.
--
-- Elements are taken with ipairs, so the command ends at the first nil. The
-- "*<count>" header is derived from the elements actually appended rather
-- than from #msg, because #msg may report a different border when the table
-- has holes and the frame would then announce more items than it holds.
local function compose_table(lines, msg)
    local head = #lines + 1
    lines[head] = ""             -- placeholder, replaced once the count is known
    local idx = head
    local argc = 0
    for _, v in ipairs(msg) do
        v = tostring(v)
        lines[idx + 1] = header_cache[#v]
        lines[idx + 2] = v
        idx = idx + 2
        argc = argc + 1
    end
    lines[idx + 1] = "\r\n"
    lines[head] = count_cache[argc]
    return lines
end
-- Send several commands in one request and read one reply per command.
-- ops:  non-empty array of commands; each command is an array of words that
--       is encoded with compose_table.
-- resp: optional table. When given, a record {ok = ..., out = ...} is
--       appended to it for every reply and `resp` itself is the result.
--       When omitted, only the last reply's `ok, out` pair is used.
-- Raises "pipeline is null" when `ops` is missing or empty.
function command:pipeline(ops, resp)
    assert(ops and #ops > 0, "pipeline is null")

    local channel = self[1]
    local request = {}
    for _, op in ipairs(ops) do
        compose_table(request, op)
    end

    local reader
    if resp then
        -- Collect every reply into the caller's table.
        reader = function(so)
            for _ = 1, #ops do
                local ok, out = read_response(so)
                table.insert(resp, { ok = ok, out = out })
            end
            return true, resp
        end
    else
        -- Drain every reply, keeping only the final one.
        reader = function(so)
            local ok, out
            for _ = 1, #ops do
                ok, out = read_response(so)
            end
            -- return last response
            return ok, out
        end
    end
    return channel:request(request, reader)
end
--- watch mode
-- Method table for watch objects.
local watch = {}

-- Metatable for watch objects: methods come from `watch`.
local watchmeta = { __index = watch }

-- Finalizer: close the object's channel when the object is collected.
function watchmeta:__gc()
    self.__sock:close()
end
-- Build the login hook for a watch channel.
-- obj:  the watch object; its __psubscribe and __subscribe tables hold the
--       names to re-issue as keys.
-- auth: value sent with AUTH, or nil/false to skip it.
-- Returns a function taking the channel `so`. It sends AUTH (reading its
-- reply) and then PSUBSCRIBE / SUBSCRIBE for every recorded name; those are
-- sent without a response reader.
local function watch_login(obj, auth)
    return function(so)
        if auth then
            so:request(compose_message("AUTH", auth), read_response)
        end
        for pattern in pairs(obj.__psubscribe) do
            local request = compose_message("PSUBSCRIBE", pattern)
            so:request(request)
        end
        for name in pairs(obj.__subscribe) do
            local request = compose_message("SUBSCRIBE", name)
            so:request(request)
        end
    end
end
-- Create a watch object for the server described by `db_conf`.
-- Fields read from db_conf: host, port (6379 when absent), auth.
-- The object records subscribed names in __subscribe / __psubscribe and keeps
-- its channel in __sock; `watchmeta` is installed as its metatable.
function redis.watch(db_conf)
    local obj = { __subscribe = {}, __psubscribe = {} }
    local options = {
        host = db_conf.host,
        port = db_conf.port or 6379,
        -- The hook captures `obj` so it can re-issue the recorded names.
        auth = watch_login(obj, db_conf.auth),
        nodelay = true,
    }
    local channel = socketchannel.channel(options)
    obj.__sock = channel
    -- try connect first only once
    channel:connect(true)
    return setmetatable(obj, watchmeta)
end
-- Close the watch object's channel and strip its metatable (which also
-- removes the __gc finalizer from this object).
function watch:disconnect()
    local channel = self.__sock
    channel:close()
    setmetatable(self, nil)
end
-- Define watch[name] as a method that sends the upper-cased command once for
-- each argument it is given. Requests are sent without a response reader.
local function watch_func(name)
    local NAME = string.upper(name)
    watch[name] = function(self, ...)
        local so = self.__sock
        local argc = select("#", ...)
        local args = { ... }
        for i = 1, argc do
            so:request(compose_message(NAME, args[i]))
        end
    end
end

for _, name in ipairs { "subscribe", "psubscribe", "unsubscribe", "punsubscribe" } do
    watch_func(name)
end
-- Wait for the next published message on this watch object.
-- Replies are read in a loop; element 1 of each reply names its kind:
--   "message"  -> returns reply[3], reply[2]
--   "pmessage" -> returns reply[4], reply[3], reply[2]
--   "subscribe" / "unsubscribe"   -> set / clear reply[2] in self.__subscribe
--   "psubscribe" / "punsubscribe" -> set / clear reply[2] in self.__psubscribe
-- Any other kind is ignored. Only the two message kinds end the loop.
-- NOTE(review): presumably reply[2] is the channel (the pattern for pmessage)
-- and the last element the payload; confirm against the server's reply format.
function watch:message()
    local so = self.__sock
    while true do
        local reply = so:response(read_response)
        local kind, channel = reply[1], reply[2]
        if kind == "message" then
            return reply[3], channel
        elseif kind == "pmessage" then
            return reply[4], reply[3], channel
        elseif kind == "subscribe" or kind == "unsubscribe" then
            -- true records the name, nil removes it.
            self.__subscribe[channel] = (kind == "subscribe") or nil
        elseif kind == "psubscribe" or kind == "punsubscribe" then
            self.__psubscribe[channel] = (kind == "psubscribe") or nil
        end
    end
end
- return redis
|