一般解決思路
redis集群有16384個(gè)slot传藏,例如有3個(gè)節(jié)點(diǎn),那么每個(gè)節(jié)點(diǎn)可能分配的slot為Node A是0-5500,Node B是5501-11000邦危,Node C是11001-16383洋侨。pipeline是要基于某個(gè)節(jié)點(diǎn)的,所以如果要用pipeline查詢某些key的值倦蚪,那么就需要通過JedisClusterCRC16.getSlot(key)
計(jì)算key的slot值希坚,通過上面每個(gè)節(jié)點(diǎn)的slot分布,就知道了哪些key應(yīng)該在哪些節(jié)點(diǎn)上陵且。再獲取這個(gè)節(jié)點(diǎn)的JedisPool就可以使用pipeline進(jìn)行讀寫了裁僧。實(shí)現(xiàn)上面的過程可以有很多種方式,本文將介紹一種也許是代碼量最少的一種解決方法慕购。本文基于redis 3.2.9(如何安裝redis集群請(qǐng)參考http://www.reibang.com/p/64d05c4e0ae2)以及
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
解決方案
上一節(jié)提到的過程聊疲,其實(shí)在JedisClusterInfoCache
對(duì)象中都已經(jīng)幫助開發(fā)人員實(shí)現(xiàn)了,但是這個(gè)對(duì)象在JedisClusterConnectionHandler
中為protected
并沒有對(duì)外開放沪悲,而且通過JedisCluster
的API也無(wú)法拿到JedisClusterConnectionHandler
對(duì)象获洲。所以通過下面兩個(gè)類將這些對(duì)象暴露出來,這樣使用getJedisPoolFromSlot
就可以知道每個(gè)key對(duì)應(yīng)的JedisPool了殿如。
class JedisClusterPlus extends JedisCluster {
public JedisClusterPlus(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout, final GenericObjectPoolConfig poolConfig) {
super(jedisClusterNode);
super.connectionHandler = new JedisSlotAdvancedConnectionHandler(jedisClusterNode, poolConfig,
connectionTimeout, soTimeout);
}
public JedisSlotAdvancedConnectionHandler getConnectionHandler() {
return (JedisSlotAdvancedConnectionHandler)this.connectionHandler;
}
}
public class JedisSlotAdvancedConnectionHandler extends JedisSlotBasedConnectionHandler{
public JedisSlotAdvancedConnectionHandler(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout) {
super(nodes, poolConfig, connectionTimeout, soTimeout);
}
public JedisPool getJedisPoolFromSlot(int slot) {
JedisPool connectionPool = cache.getSlotPool(slot);
if (connectionPool != null) {
// It can't guaranteed to get valid connection because of node
// assignment
return connectionPool;
} else {
renewSlotCache(); //It's abnormal situation for cluster mode, that we have just nothing for slot, try to rediscover state
connectionPool = cache.getSlotPool(slot);
if (connectionPool != null) {
return connectionPool;
} else {
throw new JedisNoReachableClusterNodeException("No reachable node in cluster for slot " + slot);
}
}
}
}
Demo
public class Tester {
public static void main(String[] args) {
Set<HostAndPort> jedisClusterNode = new HashSet<>();
HostAndPort hostAndPort1 = new HostAndPort("hostA",7000);
HostAndPort hostAndPort2 = new HostAndPort("hostB",7001);
HostAndPort hostAndPort3 = new HostAndPort("hostC",7002);
jedisClusterNode.add(hostAndPort1);
jedisClusterNode.add(hostAndPort2);
jedisClusterNode.add(hostAndPort3);
JedisClusterPlus jedisClusterPlus = new JedisClusterPlus(jedisClusterNode, 2000, 2000, new JedisPoolConfig());
JedisSlotAdvancedConnectionHandler jedisSlotAdvancedConnectionHandler = jedisClusterPlus.getConnectionHandler();
String[] testKeys = {"foo","bar","xyz"};
Map<JedisPool, List<String>> poolKeys = new HashMap<>();
for (String key : testKeys) {
int slot = JedisClusterCRC16.getSlot(key);
JedisPool jedisPool = jedisSlotAdvancedConnectionHandler.getJedisPoolFromSlot(slot);
if (poolKeys.keySet().contains(jedisPool)){
List<String> keys = poolKeys.get(jedisPool);
keys.add(key);
}else {
List<String> keys = new ArrayList<>();
keys.add(key);
poolKeys.put(jedisPool, keys);
}
}
for (JedisPool jedisPool : poolKeys.keySet()) {
Jedis jedis = jedisPool.getResource();
Pipeline pipeline = jedis.pipelined();
List<String> keys = poolKeys.get(jedisPool);
keys.forEach(key ->pipeline.get(key));
List result = pipeline.syncAndReturnAll();
System.out.println(result);
jedis.close();
}
}
}