123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384 |
- -- a simple redis-cluster client
- -- rewrite from https://github.com/antirez/redis-rb-cluster
- local skynet = require "skynet"
- local redis = require "skynet.db.redis"
- local crc16 = require "skynet.db.redis.crc16"
- local RedisClusterHashSlots = 16384
- local RedisClusterRequestTTL = 16
- local _M = {}
- local rediscluster = {}
- rediscluster.__index = rediscluster
- _M.rediscluster = rediscluster
- function _M.new(startup_nodes,opt)
- if #startup_nodes == 0 then
- startup_nodes = {startup_nodes,}
- end
- opt = opt or {}
- local self = {
- startup_nodes = startup_nodes,
- max_connections = opt.max_connections or 16,
- connections = setmetatable({},{__mode = "kv"}),
- opt = opt,
- refresh_table_asap = false,
- }
- setmetatable(self,rediscluster)
- self:initialize_slots_cache()
- return self
- end
- local function nodename(node)
- return string.format("%s:%d",node.host,node.port)
- end
- function rediscluster:get_redis_link(node)
- local conf = {
- host = node.host,
- port = node.port,
- auth = self.opt.auth,
- db = self.opt.db or 0,
- }
- return redis.connect(conf)
- end
-
- -- Given a node (that is just a Ruby hash) give it a name just
- -- concatenating the host and port. We use the node name as a key
- -- to cache connections to that node.
- function rediscluster:set_node_name(node)
- if not node.name then
- node.name = nodename(node)
- end
- if not node.slaves then
- local oldnode = self.name_node[node.name]
- if oldnode then
- node.slaves = oldnode.slaves
- end
- end
- self.name_node[node.name] = node
- end
- -- Contact the startup nodes and try to fetch the hash slots -> instances
- -- map in order to initialize the @slots hash.
- function rediscluster:initialize_slots_cache()
- self.slots = {}
- self.nodes = {}
- self.name_node = {}
- for _,startup_node in ipairs(self.startup_nodes) do
- local ok = pcall(function ()
- local name = nodename(startup_node)
- local conn = self.connections[name] or self:get_redis_link(startup_node)
- local list = conn:cluster("slots")
- for _,result in ipairs(list) do
- local ip,port = table.unpack(result[3])
- assert(ip)
- port = assert(tonumber(port))
- local master_node = {
- host = ip,
- port = port,
- slaves = {},
- }
- self:set_node_name(master_node)
- for i=4,#result do
- local ip,port = table.unpack(result[i])
- assert(ip)
- port = assert(tonumber(port))
- local slave_node = {
- host = ip,
- port = port,
- }
- self:set_node_name(slave_node)
- table.insert(master_node.slaves,slave_node)
- end
- for slot=tonumber(result[1]),tonumber(result[2]) do
- table.insert(self.nodes,master_node)
- self.slots[slot] = master_node
- end
- end
- self.refresh_table_asap = false
- if not self.connections[name] then
- self.connections[name] = conn
- end
- end)
- -- Exit the loop as long as the first node replies
- if ok then
- break
- end
- end
- end
- -- Flush the cache, mostly useful for debugging when we want to force
- -- redirection.
- function rediscluster:flush_slots_cache()
- self.slots = {}
- end
- -- Return the hash slot from the key.
- function rediscluster:keyslot(key)
- -- Only hash what is inside {...} if there is such a pattern in the key.
- -- Note that the specification requires the content that is between
- -- the first { and the first } after the first {. If we found {} without
- -- nothing in the middle, the whole key is hashed as usually.
- local startpos = string.find(key,"{",1,true)
- if startpos then
- local endpos = string.find(key,"}",startpos+1,true)
- if endpos and endpos ~= startpos + 1 then
- key = string.sub(key,startpos+1,endpos-1)
- end
- end
- return crc16(key) % RedisClusterHashSlots
- end
- -- Return the first key in the command arguments.
- --
- -- Currently we just return argv[1], that is, the first argument
- -- after the command name.
- --
- -- This is indeed the key for most commands, and when it is not true
- -- the cluster redirection will point us to the right node anyway.
- --
- -- For commands we want to explicitly bad as they don't make sense
- -- in the context of cluster, nil is returned.
- function rediscluster:get_key_from_command(argv)
- local cmd,key = table.unpack(argv)
- cmd = string.lower(cmd)
- if cmd == "info" or
- cmd == "multi" or
- cmd == "exec" or
- cmd == "slaveof" or
- cmd == "config" or
- cmd == "shutdown" then
- return nil
- end
- -- Unknown commands, and all the commands having the key
- -- as first argument are handled here:
- -- set, get, ...
- return key
- end
- -- If the current number of connections is already the maximum number
- -- allowed, close a random connection. This should be called every time
- -- we cache a new connection in the @connections hash.
- function rediscluster:close_existing_connection()
- local length = 0
- for name,conn in pairs(self.connections) do
- length = length + 1
- end
- if length >= self.max_connections then
- pcall(function ()
- local name,conn = next(self.connections)
- self.connections[name] = nil
- conn:disconnect()
- end)
- end
- end
- function rediscluster:close_all_connection()
- local connections = self.connections
- self.connections = setmetatable({},{__mode = "kv"})
- for name,conn in pairs(connections) do
- pcall(conn.disconnect,conn)
- end
- end
- function rediscluster:get_connection(node)
- node.port = assert(tonumber(node.port))
- local name = node.name or nodename(node)
- local conn = self.connections[name]
- if not conn then
- conn = self:get_redis_link(node)
- self.connections[name] = conn
- end
- return self.connections[name]
- end
- -- Return a link to a random node, or raise an error if no node can be
- -- contacted. This function is only called when we can't reach the node
- -- associated with a given hash slot, or when we don't know the right
- -- mapping.
- -- The function will try to get a successful reply to the PING command,
- -- otherwise the next node is tried.
- function rediscluster:get_random_connection()
- -- shuffle
- local shuffle_idx = {}
- local startpos = 1
- local endpos = #self.nodes
- for i=startpos,endpos do
- shuffle_idx[i] = i
- end
- for i=startpos,endpos do
- local idx = math.random(i,endpos)
- local tmp = shuffle_idx[i]
- shuffle_idx[i] = shuffle_idx[idx]
- shuffle_idx[idx] = tmp
- end
- for i,idx in ipairs(shuffle_idx) do
- local ok,conn = pcall(function ()
- local node = self.nodes[idx]
- local conn = self.connections[node.name]
- if not conn then
- -- Connect the node if it is not connected
- conn = self:get_redis_link(node)
- if conn:ping() == "PONG" then
- self:close_existing_connection()
- self.connections[node.name] = conn
- return conn
- else
- -- If the connection is not good close it ASAP in order
- -- to avoid waiting for the GC finalizer. File
- -- descriptors are a rare resource.
- conn:disconnect()
- end
- else
- -- The node was already connected, test the connection.
- if conn:ping() == "PONG" then
- return conn
- end
- end
- end)
- if ok and conn then
- return conn
- end
- end
- error("Can't reach a single startup node.")
- end
- -- Given a slot return the link (Redis instance) to the mapped node.
- -- Make sure to create a connection with the node if we don't have
- -- one.
- function rediscluster:get_connection_by_slot(slot)
- local node = self.slots[slot]
- -- If we don't know what the mapping is, return a random node.
- if not node then
- return self:get_random_connection()
- end
- if not self.connections[node.name] then
- local ok = pcall(function ()
- self:close_existing_connection()
- self.connections[node.name] = self:get_redis_link(node)
- end)
- if not ok then
- if self.opt.read_slave and node.slaves and #node.slaves > 0 then
- local slave_node = node.slaves[math.random(1,#node.slaves)]
- local ok2,conn = pcall(self.get_connection,self,slave_node)
- if ok2 then
- conn:readonly() -- allow this connection read-slave
- return conn
- end
- end
- -- This will probably never happen with recent redis-rb
- -- versions because the connection is enstablished in a lazy
- -- way only when a command is called. However it is wise to
- -- handle an instance creation error of some kind.
- return self:get_random_connection()
- end
- end
- return self.connections[node.name]
- end
- -- Dispatch commands.
- function rediscluster:call(...)
- local argv = table.pack(...)
- if self.refresh_table_asap then
- self:initialize_slots_cache()
- end
- local ttl = RedisClusterRequestTTL -- Max number of redirections
- local err
- local asking = false
- local try_random_node = false
- while ttl > 0 do
- ttl = ttl - 1
- local key = self:get_key_from_command(argv)
- if not key then
- error("No way to dispatch this command to Redis Cluster: " .. tostring(argv[1]))
- end
- local conn
- local slot = self:keyslot(key)
- if asking then
- conn = self:get_connection(asking)
- elseif try_random_node then
- conn = self:get_random_connection()
- try_random_node = false
- else
- conn = self:get_connection_by_slot(slot)
- end
- local result = {pcall(function ()
- -- TODO: use pipelining to send asking and save a rtt.
- if asking then
- conn:asking()
- end
- asking = false
- local cmd = argv[1]
- local func = conn[cmd]
- return func(conn,table.unpack(argv,2))
- end)}
- local ok = result[1]
- if not ok then
- err = table.unpack(result,2)
- err = tostring(err)
- if err == "[Error: socket]" then
- -- may be nerver come here?
- try_random_node = true
- if ttl < RedisClusterRequestTTL/2 then
- skynet.sleep(10)
- end
- else
- -- err: ./lualib/skynet/socketchannel.lua:371: xxx
- err = string.match(err,".+:%d+:%s(.*)$") or err
- local errlist = {}
- for e in string.gmatch(err,"([^%s]+)%s?") do
- table.insert(errlist,e)
- end
- if (errlist[1] ~= "MOVED" and errlist[1] ~= "ASK") then
- error(err)
- else
- if errlist[1] == "ASK" then
- asking = true
- else
- -- Serve replied with MOVED. It's better for us to
- -- ask for CLUSTER SLOTS the next time.
- self.refresh_table_asap = true
- end
- local newslot = tonumber(errlist[2])
- local node_ip,node_port = string.match(errlist[3],"^([^:]+):([^:]+)$")
- node_port = assert(tonumber(node_port))
- local node = {
- host = node_ip,
- port = node_port,
- }
- if not asking then
- self:set_node_name(node)
- self.slots[newslot] = node
- else
- asking = node
- end
- end
- end
- else
- return table.unpack(result,2)
- end
- end
- error(string.format("Too many Cluster redirections?,maybe node is disconnected (last error: %q)",err))
- end
- -- Currently we handle all the commands using method_missing for
- -- simplicity. For a Cluster client actually it will be better to have
- -- every single command as a method with the right arity and possibly
- -- additional checks (example: RPOPLPUSH with same src/dst key, SORT
- -- without GET or BY, and so forth).
- setmetatable(rediscluster,{
- __index = function (t,cmd)
- t[cmd] = function (self,...)
- return self:call(cmd,...)
- end
- return t[cmd]
- end,
- })
- return _M
|