cluster.lua 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
-- A simple Redis Cluster client for skynet.
-- Ported from https://github.com/antirez/redis-rb-cluster
  3. local skynet = require "skynet"
  4. local redis = require "skynet.db.redis"
  5. local crc16 = require "skynet.db.redis.crc16"
  6. local RedisClusterHashSlots = 16384
  7. local RedisClusterRequestTTL = 16
  8. local _M = {}
  9. local rediscluster = {}
  10. rediscluster.__index = rediscluster
  11. _M.rediscluster = rediscluster
  12. function _M.new(startup_nodes,opt)
  13. if #startup_nodes == 0 then
  14. startup_nodes = {startup_nodes,}
  15. end
  16. opt = opt or {}
  17. local self = {
  18. startup_nodes = startup_nodes,
  19. max_connections = opt.max_connections or 16,
  20. connections = setmetatable({},{__mode = "kv"}),
  21. opt = opt,
  22. refresh_table_asap = false,
  23. }
  24. setmetatable(self,rediscluster)
  25. self:initialize_slots_cache()
  26. return self
  27. end
  28. local function nodename(node)
  29. return string.format("%s:%d",node.host,node.port)
  30. end
  31. function rediscluster:get_redis_link(node)
  32. local conf = {
  33. host = node.host,
  34. port = node.port,
  35. auth = self.opt.auth,
  36. db = self.opt.db or 0,
  37. }
  38. return redis.connect(conf)
  39. end
  40. -- Given a node (that is just a Ruby hash) give it a name just
  41. -- concatenating the host and port. We use the node name as a key
  42. -- to cache connections to that node.
  43. function rediscluster:set_node_name(node)
  44. if not node.name then
  45. node.name = nodename(node)
  46. end
  47. if not node.slaves then
  48. local oldnode = self.name_node[node.name]
  49. if oldnode then
  50. node.slaves = oldnode.slaves
  51. end
  52. end
  53. self.name_node[node.name] = node
  54. end
  55. -- Contact the startup nodes and try to fetch the hash slots -> instances
  56. -- map in order to initialize the @slots hash.
  57. function rediscluster:initialize_slots_cache()
  58. self.slots = {}
  59. self.nodes = {}
  60. self.name_node = {}
  61. for _,startup_node in ipairs(self.startup_nodes) do
  62. local ok = pcall(function ()
  63. local name = nodename(startup_node)
  64. local conn = self.connections[name] or self:get_redis_link(startup_node)
  65. local list = conn:cluster("slots")
  66. for _,result in ipairs(list) do
  67. local ip,port = table.unpack(result[3])
  68. assert(ip)
  69. port = assert(tonumber(port))
  70. local master_node = {
  71. host = ip,
  72. port = port,
  73. slaves = {},
  74. }
  75. self:set_node_name(master_node)
  76. for i=4,#result do
  77. local ip,port = table.unpack(result[i])
  78. assert(ip)
  79. port = assert(tonumber(port))
  80. local slave_node = {
  81. host = ip,
  82. port = port,
  83. }
  84. self:set_node_name(slave_node)
  85. table.insert(master_node.slaves,slave_node)
  86. end
  87. for slot=tonumber(result[1]),tonumber(result[2]) do
  88. table.insert(self.nodes,master_node)
  89. self.slots[slot] = master_node
  90. end
  91. end
  92. self.refresh_table_asap = false
  93. if not self.connections[name] then
  94. self.connections[name] = conn
  95. end
  96. end)
  97. -- Exit the loop as long as the first node replies
  98. if ok then
  99. break
  100. end
  101. end
  102. end
  103. -- Flush the cache, mostly useful for debugging when we want to force
  104. -- redirection.
  105. function rediscluster:flush_slots_cache()
  106. self.slots = {}
  107. end
  108. -- Return the hash slot from the key.
  109. function rediscluster:keyslot(key)
  110. -- Only hash what is inside {...} if there is such a pattern in the key.
  111. -- Note that the specification requires the content that is between
  112. -- the first { and the first } after the first {. If we found {} without
  113. -- nothing in the middle, the whole key is hashed as usually.
  114. local startpos = string.find(key,"{",1,true)
  115. if startpos then
  116. local endpos = string.find(key,"}",startpos+1,true)
  117. if endpos and endpos ~= startpos + 1 then
  118. key = string.sub(key,startpos+1,endpos-1)
  119. end
  120. end
  121. return crc16(key) % RedisClusterHashSlots
  122. end
  123. -- Return the first key in the command arguments.
  124. --
  125. -- Currently we just return argv[1], that is, the first argument
  126. -- after the command name.
  127. --
  128. -- This is indeed the key for most commands, and when it is not true
  129. -- the cluster redirection will point us to the right node anyway.
  130. --
  131. -- For commands we want to explicitly bad as they don't make sense
  132. -- in the context of cluster, nil is returned.
  133. function rediscluster:get_key_from_command(argv)
  134. local cmd,key = table.unpack(argv)
  135. cmd = string.lower(cmd)
  136. if cmd == "info" or
  137. cmd == "multi" or
  138. cmd == "exec" or
  139. cmd == "slaveof" or
  140. cmd == "config" or
  141. cmd == "shutdown" then
  142. return nil
  143. end
  144. -- Unknown commands, and all the commands having the key
  145. -- as first argument are handled here:
  146. -- set, get, ...
  147. return key
  148. end
  149. -- If the current number of connections is already the maximum number
  150. -- allowed, close a random connection. This should be called every time
  151. -- we cache a new connection in the @connections hash.
  152. function rediscluster:close_existing_connection()
  153. local length = 0
  154. for name,conn in pairs(self.connections) do
  155. length = length + 1
  156. end
  157. if length >= self.max_connections then
  158. pcall(function ()
  159. local name,conn = next(self.connections)
  160. self.connections[name] = nil
  161. conn:disconnect()
  162. end)
  163. end
  164. end
  165. function rediscluster:close_all_connection()
  166. local connections = self.connections
  167. self.connections = setmetatable({},{__mode = "kv"})
  168. for name,conn in pairs(connections) do
  169. pcall(conn.disconnect,conn)
  170. end
  171. end
  172. function rediscluster:get_connection(node)
  173. node.port = assert(tonumber(node.port))
  174. local name = node.name or nodename(node)
  175. local conn = self.connections[name]
  176. if not conn then
  177. conn = self:get_redis_link(node)
  178. self.connections[name] = conn
  179. end
  180. return self.connections[name]
  181. end
  182. -- Return a link to a random node, or raise an error if no node can be
  183. -- contacted. This function is only called when we can't reach the node
  184. -- associated with a given hash slot, or when we don't know the right
  185. -- mapping.
  186. -- The function will try to get a successful reply to the PING command,
  187. -- otherwise the next node is tried.
  188. function rediscluster:get_random_connection()
  189. -- shuffle
  190. local shuffle_idx = {}
  191. local startpos = 1
  192. local endpos = #self.nodes
  193. for i=startpos,endpos do
  194. shuffle_idx[i] = i
  195. end
  196. for i=startpos,endpos do
  197. local idx = math.random(i,endpos)
  198. local tmp = shuffle_idx[i]
  199. shuffle_idx[i] = shuffle_idx[idx]
  200. shuffle_idx[idx] = tmp
  201. end
  202. for i,idx in ipairs(shuffle_idx) do
  203. local ok,conn = pcall(function ()
  204. local node = self.nodes[idx]
  205. local conn = self.connections[node.name]
  206. if not conn then
  207. -- Connect the node if it is not connected
  208. conn = self:get_redis_link(node)
  209. if conn:ping() == "PONG" then
  210. self:close_existing_connection()
  211. self.connections[node.name] = conn
  212. return conn
  213. else
  214. -- If the connection is not good close it ASAP in order
  215. -- to avoid waiting for the GC finalizer. File
  216. -- descriptors are a rare resource.
  217. conn:disconnect()
  218. end
  219. else
  220. -- The node was already connected, test the connection.
  221. if conn:ping() == "PONG" then
  222. return conn
  223. end
  224. end
  225. end)
  226. if ok and conn then
  227. return conn
  228. end
  229. end
  230. error("Can't reach a single startup node.")
  231. end
  232. -- Given a slot return the link (Redis instance) to the mapped node.
  233. -- Make sure to create a connection with the node if we don't have
  234. -- one.
  235. function rediscluster:get_connection_by_slot(slot)
  236. local node = self.slots[slot]
  237. -- If we don't know what the mapping is, return a random node.
  238. if not node then
  239. return self:get_random_connection()
  240. end
  241. if not self.connections[node.name] then
  242. local ok = pcall(function ()
  243. self:close_existing_connection()
  244. self.connections[node.name] = self:get_redis_link(node)
  245. end)
  246. if not ok then
  247. if self.opt.read_slave and node.slaves and #node.slaves > 0 then
  248. local slave_node = node.slaves[math.random(1,#node.slaves)]
  249. local ok2,conn = pcall(self.get_connection,self,slave_node)
  250. if ok2 then
  251. conn:readonly() -- allow this connection read-slave
  252. return conn
  253. end
  254. end
  255. -- This will probably never happen with recent redis-rb
  256. -- versions because the connection is enstablished in a lazy
  257. -- way only when a command is called. However it is wise to
  258. -- handle an instance creation error of some kind.
  259. return self:get_random_connection()
  260. end
  261. end
  262. return self.connections[node.name]
  263. end
  264. -- Dispatch commands.
  265. function rediscluster:call(...)
  266. local argv = table.pack(...)
  267. if self.refresh_table_asap then
  268. self:initialize_slots_cache()
  269. end
  270. local ttl = RedisClusterRequestTTL -- Max number of redirections
  271. local err
  272. local asking = false
  273. local try_random_node = false
  274. while ttl > 0 do
  275. ttl = ttl - 1
  276. local key = self:get_key_from_command(argv)
  277. if not key then
  278. error("No way to dispatch this command to Redis Cluster: " .. tostring(argv[1]))
  279. end
  280. local conn
  281. local slot = self:keyslot(key)
  282. if asking then
  283. conn = self:get_connection(asking)
  284. elseif try_random_node then
  285. conn = self:get_random_connection()
  286. try_random_node = false
  287. else
  288. conn = self:get_connection_by_slot(slot)
  289. end
  290. local result = {pcall(function ()
  291. -- TODO: use pipelining to send asking and save a rtt.
  292. if asking then
  293. conn:asking()
  294. end
  295. asking = false
  296. local cmd = argv[1]
  297. local func = conn[cmd]
  298. return func(conn,table.unpack(argv,2))
  299. end)}
  300. local ok = result[1]
  301. if not ok then
  302. err = table.unpack(result,2)
  303. err = tostring(err)
  304. if err == "[Error: socket]" then
  305. -- may be nerver come here?
  306. try_random_node = true
  307. if ttl < RedisClusterRequestTTL/2 then
  308. skynet.sleep(10)
  309. end
  310. else
  311. -- err: ./lualib/skynet/socketchannel.lua:371: xxx
  312. err = string.match(err,".+:%d+:%s(.*)$") or err
  313. local errlist = {}
  314. for e in string.gmatch(err,"([^%s]+)%s?") do
  315. table.insert(errlist,e)
  316. end
  317. if (errlist[1] ~= "MOVED" and errlist[1] ~= "ASK") then
  318. error(err)
  319. else
  320. if errlist[1] == "ASK" then
  321. asking = true
  322. else
  323. -- Serve replied with MOVED. It's better for us to
  324. -- ask for CLUSTER SLOTS the next time.
  325. self.refresh_table_asap = true
  326. end
  327. local newslot = tonumber(errlist[2])
  328. local node_ip,node_port = string.match(errlist[3],"^([^:]+):([^:]+)$")
  329. node_port = assert(tonumber(node_port))
  330. local node = {
  331. host = node_ip,
  332. port = node_port,
  333. }
  334. if not asking then
  335. self:set_node_name(node)
  336. self.slots[newslot] = node
  337. else
  338. asking = node
  339. end
  340. end
  341. end
  342. else
  343. return table.unpack(result,2)
  344. end
  345. end
  346. error(string.format("Too many Cluster redirections?,maybe node is disconnected (last error: %q)",err))
  347. end
  348. -- Currently we handle all the commands using method_missing for
  349. -- simplicity. For a Cluster client actually it will be better to have
  350. -- every single command as a method with the right arity and possibly
  351. -- additional checks (example: RPOPLPUSH with same src/dst key, SORT
  352. -- without GET or BY, and so forth).
  353. setmetatable(rediscluster,{
  354. __index = function (t,cmd)
  355. t[cmd] = function (self,...)
  356. return self:call(cmd,...)
  357. end
  358. return t[cmd]
  359. end,
  360. })
  361. return _M