前言
現(xiàn)在很多的博客論壇族购,很多都是以前寫的代碼。殊不知陵珍,這代碼不是一層不變的寝杖。特別是涉及到源碼的改變。這就導(dǎo)致很多網(wǎng)上的文章幾乎都是 copy 來?copy 去的互纯。這里也只是建議大家的有看源碼的習(xí)慣瑟幕。不然,照抄網(wǎng)上的博客有時候真的不能解決問題留潦。還得動動腦子只盹。本人也是踩坑過來的 。好了兔院÷拱裕回到重點,
為什么??JedisCluster 不支持直接操作管道(Pipeline)?? (如果面試這么問秆乳。你怎么回答?百思不得其姐(解)? 歡迎留言^_^)
首先我們看下?JedisCluster? 源碼钻哩。
public class JedisCluster extends BinaryJedisCluster implements JedisCommands,
? ? MultiKeyJedisClusterCommands, JedisClusterScriptingCommands {
/*****************一堆方法*****************/
? }
看到這個 JedisCluster 類 繼承 BinaryJedisCluster屹堰。 好了我們在下一步看? ?BinaryJedisCluster 類里面到底是什么?
public class BinaryJedisCluster implements BasicCommands, BinaryJedisClusterCommands,
? ? MultiKeyBinaryJedisClusterCommands, JedisClusterBinaryScriptingCommands, Closeable {
? public static final short HASHSLOTS = 16384;
? protected static final int DEFAULT_TIMEOUT = 2000;
? protected static final int DEFAULT_MAX_REDIRECTIONS = 5;
? protected int maxAttempts;
? protected JedisClusterConnectionHandler connectionHandler;
/****************************一堆方法*****************************/
}
可以看到在?BinaryJedisCluster? 繼承一些接口街氢。所以我我們先看下這個類下除了構(gòu)造方法還剩下什么東東扯键?
埃!I核唷荣刑!?JedisClusterConnectionHandler connectionHandler; 這個類里面會不會有我們想要的東西呢馅笙?進(jìn)去看下
package redis.clients.jedis;
import java.io.Closeable;
import java.util.Map;
import java.util.Set;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.exceptions.JedisConnectionException;
public abstract class JedisClusterConnectionHandler implements Closeable {
????protected final JedisClusterInfoCache cache;
? public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) {
? ? this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password);
? ? initializeSlotsCache(nodes, poolConfig, password);
? }
? abstract Jedis getConnection();
? abstract Jedis getConnectionFromSlot(int slot);
? public Jedis getConnectionFromNode(HostAndPort node) {
? ? return cache.setupNodeIfNotExist(node).getResource();
? }
? public Map<String, JedisPool> getNodes() {
? ? return cache.getNodes();
? }
? private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) {
? ? for (HostAndPort hostAndPort : startNodes) {
? ? ? Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());
? ? ? if (password != null) {
? ? ? ? jedis.auth(password);
? ? ? }
? ? ? try {
? ? ? ? cache.discoverClusterNodesAndSlots(jedis);
? ? ? ? break;
? ? ? } catch (JedisConnectionException e) {
? ? ? ? // try next nodes
? ? ? } finally {
? ? ? ? if (jedis != null) {
? ? ? ? ? jedis.close();
? ? ? ? }
? ? ? }
? ? }
? }
? public void renewSlotCache() {
? ? cache.renewClusterSlots(null);
? }
? public void renewSlotCache(Jedis jedis) {
? ? cache.renewClusterSlots(jedis);
? }
? @Override
? public void close() {
? ? cache.reset();
? }
}
這是個抽象類。里面有2個抽象方法厉亏。在2.9以前版本?
??abstract Jedis getConnection();
? abstract Jedis getConnectionFromSlot(int slot);
這2方法可有所實現(xiàn)董习。(沒去看2.9以前的版本源碼)
網(wǎng)上很多以前博客的都是使用??getConnectionFromSlot(int slot); 來獲取某個 jedis? 來操作??pipeline。
所以在此我們還能怎么辦呢?
這會兒爱只。我們看到
JedisClusterInfoCache cache;? ? ? ?這里面會不會有有我們想要的皿淋?
public class JedisClusterInfoCache {
? private final Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
? private final Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
? private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
? private final Lock r = rwl.readLock();
? private final Lock w = rwl.writeLock();
? private volatile boolean rediscovering;
? private final GenericObjectPoolConfig poolConfig;
? private int connectionTimeout;
? private int soTimeout;
? private String password;
? private static final int MASTER_NODE_INDEX = 2;
? public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig, int timeout) {
? ? this(poolConfig, timeout, timeout, null);
? }
? public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig,
? ? ? final int connectionTimeout, final int soTimeout, final String password) {
? ? this.poolConfig = poolConfig;
? ? this.connectionTimeout = connectionTimeout;
? ? this.soTimeout = soTimeout;
? ? this.password = password;
? }
/******方法******以下方法是我重點標(biāo)注的*************/
public JedisPool getNode(String nodeKey) {
? ? r.lock();
? ? try {
? ? ? return nodes.get(nodeKey);
? ? } finally {
? ? ? r.unlock();
? ? }
? }
? public JedisPool getSlotPool(int slot) {
? ? r.lock();
? ? try {
? ? ? return slots.get(slot);
? ? } finally {
? ? ? r.unlock();
? ? }
? }
? public Map<String, JedisPool> getNodes() {
? ? r.lock();
? ? try {
? ? ? return new HashMap<String, JedisPool>(nodes);
? ? } finally {
? ? ? r.unlock();
? ? }
? }
? public List<JedisPool> getShuffledNodesPool() {
? ? r.lock();
? ? try {
? ? ? List<JedisPool> pools = new ArrayList<JedisPool>(nodes.values());
? ? ? Collections.shuffle(pools);
? ? ? return pools;
? ? } finally {
? ? ? r.unlock();
? ? }
? }
埃~看到了可以獲取到某個jedis??
其實這個?JedisClusterInfoCache 類 是你在初始化jedisCluster時 將所有的節(jié)點放入緩存。
因此恬试,這個類的方法能給我們返回相關(guān)的jedis實例
我們要這么做呢窝趣?
接下來是我的代碼。通過java的反射機制直接獲取训柴。
public static void main(String[] args) throws NoSuchFieldException {
? ? ? ? JedisPoolConfig config = new JedisPoolConfig();
? ? ? ? Set<HostAndPort> nodeList = new HashSet<>();
? ? ? ? nodeList.add(new HostAndPort("192.168.41.65", 6379));
? ? ? ? nodeList.add(new HostAndPort("192.168.41.70", 6379));
? ? ? ? nodeList.add(new HostAndPort("192.168.41.42", 6379));
? ? ? ? nodeList.add(new HostAndPort("192.168.41.20", 6380));
? ? ? ? nodeList.add(new HostAndPort("192.168.41.30", 6380));
? ? ? ? nodeList.add(new HostAndPort("192.168.41.40", 6380));
? ? ? ? JedisCluster jedisCluster = new JedisCluster(nodeList, 3000, config);
? ? ? ? jedisCluster.set("James", "Bond");
? ? ? ? //通過 java.lang.reflect.Field 反射
? ? ? ? Jedis jedis = getJedisFieldBySlot(jedisCluster, 0, "James");
? ? ? ? //通過spring 工具類? ReflectionUtils 反射
? ? ? ? Jedis j = getJedisBySlot(jedisCluster, 0, "James");
// 接下來就是pipeline操作了
if(jedis != null) {
Pipeline pipeline = jedis.pipelined();
pipeline.syncAndReturnAll();
// jedis會自動將資源歸還到連接池
jedis.close();
}else {
System.err.println("找不到 jedis");
}
? ? }
/**
* 集裙中根據(jù) key對應(yīng)的slot 獲取槽位 或 key 返回對應(yīng)的某個Jedis 實例
* @param jedisCluster
* @param slot
* @param key
* @return Jedis
*/
public static Jedis getJedisFieldBySlot(JedisCluster jedisCluster,int slot,String key) {
try {
if(key !=null) {
// 獲取key對應(yīng)的slot 獲取槽號(0~16383)
slot = JedisClusterCRC16.getSlot(key);
}
Field field = BinaryJedisCluster.class.getDeclaredField("connectionHandler");
field.setAccessible(true);
JedisClusterConnectionHandler connectionHandler =? (JedisClusterConnectionHandler) field.get(jedisCluster);
Field jedisclusterinfocache =? JedisClusterConnectionHandler.class.getDeclaredField("cache");
jedisclusterinfocache.setAccessible(true);
JedisClusterInfoCache cache = (JedisClusterInfoCache) jedisclusterinfocache.get(connectionHandler);
JedisPool pool = cache.getSlotPool(slot);
Jedis jedis = pool.getResource();
return jedis;
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
}
/**
* 集裙中根據(jù) key對應(yīng)的slot 獲取槽位 或 key 返回對應(yīng)的某個Jedis 實例
* @param jedisCluster
* @param slot
* @param key
* @return Jedis
*/
public static Jedis getJedisBySlot(JedisCluster jedisCluster,int slot,String key) {
try {
if(key !=null) {
slot = JedisClusterCRC16.getSlot(key);
}
//org.springframework.util.ReflectionUtils 工具類? BinaryJedisCluster 下的? JedisClusterConnectionHandler
Field field = ReflectionUtils.findField(BinaryJedisCluster.class, null, JedisClusterConnectionHandler.class);
field.setAccessible(true);
JedisClusterConnectionHandler connectionHandler =? (JedisClusterConnectionHandler) field.get(jedisCluster);
Field jedisclusterinfocache = ReflectionUtils.findField(JedisClusterConnectionHandler.class, null, JedisClusterInfoCache.class);
jedisclusterinfocache.setAccessible(true);
JedisClusterInfoCache cache = (JedisClusterInfoCache) jedisclusterinfocache.get(connectionHandler);
JedisPool pool = cache.getSlotPool(slot);
Jedis jedis = pool.getResource();
return jedis;
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
}
但是大家以為以上的代碼就很完美了么哑舒?
確實。在去找緩存里的jedis時幻馁,可能某個節(jié)點掛了洗鸵,然后剛好程序拿到這個實例,
這時候這里就會出現(xiàn)錯誤宣赔。因此我們 應(yīng)該在原來的基礎(chǔ)上预麸,去刷新一遍集群。
代碼由你們來給吧儒将。我不會寫了吏祸。哈哈哈·
最后 如果針對? JedisClusterInfoCache 源碼分析的 請看? ?https://www.cnblogs.com/zhengzuozhanglina/p/11383035.html