JedisCluster 操作管道(基于jedis 2.9+的版本)

前言

現(xiàn)在很多的博客論壇族购,很多都是以前寫的代碼。殊不知陵珍,這代碼不是一層不變的寝杖。特別是涉及到源碼的改變。這就導(dǎo)致很多網(wǎng)上的文章幾乎都是 copy 來?copy 去的互纯。這里也只是建議大家的有看源碼的習(xí)慣瑟幕。不然,照抄網(wǎng)上的博客有時候真的不能解決問題留潦。還得動動腦子只盹。本人也是踩坑過來的 。好了兔院÷拱裕回到重點,

為什么??JedisCluster 不支持直接操作管道(Pipeline)?? (如果面試這么問秆乳。你怎么回答?百思不得其姐(解)? 歡迎留言^_^)

首先我們看下?JedisCluster? 源碼钻哩。

public class JedisCluster extends BinaryJedisCluster implements JedisCommands,

? ? MultiKeyJedisClusterCommands, JedisClusterScriptingCommands {

/*****************一堆方法*****************/

? }

看到這個 JedisCluster 類 繼承 BinaryJedisCluster屹堰。 好了我們在下一步看? ?BinaryJedisCluster 類里面到底是什么?

public class BinaryJedisCluster implements BasicCommands, BinaryJedisClusterCommands,

? ? MultiKeyBinaryJedisClusterCommands, JedisClusterBinaryScriptingCommands, Closeable {

? public static final short HASHSLOTS = 16384;

? protected static final int DEFAULT_TIMEOUT = 2000;

? protected static final int DEFAULT_MAX_REDIRECTIONS = 5;

? protected int maxAttempts;

? protected JedisClusterConnectionHandler connectionHandler;

/****************************一堆方法*****************************/

}

可以看到在?BinaryJedisCluster? 繼承一些接口街氢。所以我我們先看下這個類下除了構(gòu)造方法還剩下什么東東扯键?

埃!I核唷荣刑!?JedisClusterConnectionHandler connectionHandler; 這個類里面會不會有我們想要的東西呢馅笙?進(jìn)去看下

package redis.clients.jedis;

import java.io.Closeable;

import java.util.Map;

import java.util.Set;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.exceptions.JedisConnectionException;

public abstract class JedisClusterConnectionHandler implements Closeable {

????protected final JedisClusterInfoCache cache;

? public JedisClusterConnectionHandler(Set<HostAndPort> nodes,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) {

? ? this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password);

? ? initializeSlotsCache(nodes, poolConfig, password);

? }

? abstract Jedis getConnection();

? abstract Jedis getConnectionFromSlot(int slot);

? public Jedis getConnectionFromNode(HostAndPort node) {

? ? return cache.setupNodeIfNotExist(node).getResource();

? }


? public Map<String, JedisPool> getNodes() {

? ? return cache.getNodes();

? }

? private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) {

? ? for (HostAndPort hostAndPort : startNodes) {

? ? ? Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());

? ? ? if (password != null) {

? ? ? ? jedis.auth(password);

? ? ? }

? ? ? try {

? ? ? ? cache.discoverClusterNodesAndSlots(jedis);

? ? ? ? break;

? ? ? } catch (JedisConnectionException e) {

? ? ? ? // try next nodes

? ? ? } finally {

? ? ? ? if (jedis != null) {

? ? ? ? ? jedis.close();

? ? ? ? }

? ? ? }

? ? }

? }

? public void renewSlotCache() {

? ? cache.renewClusterSlots(null);

? }

? public void renewSlotCache(Jedis jedis) {

? ? cache.renewClusterSlots(jedis);

? }

? @Override

? public void close() {

? ? cache.reset();

? }

}


這是個抽象類。里面有2個抽象方法厉亏。在2.9以前版本?

??abstract Jedis getConnection();

? abstract Jedis getConnectionFromSlot(int slot);

這2方法可有所實現(xiàn)董习。(沒去看2.9以前的版本源碼)

網(wǎng)上很多以前博客的都是使用??getConnectionFromSlot(int slot); 來獲取某個 jedis? 來操作??pipeline。

所以在此我們還能怎么辦呢?

這會兒爱只。我們看到

JedisClusterInfoCache cache;? ? ? ?這里面會不會有有我們想要的皿淋?

public class JedisClusterInfoCache {

? private final Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();

? private final Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();

? private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

? private final Lock r = rwl.readLock();

? private final Lock w = rwl.writeLock();

? private volatile boolean rediscovering;

? private final GenericObjectPoolConfig poolConfig;

? private int connectionTimeout;

? private int soTimeout;

? private String password;

? private static final int MASTER_NODE_INDEX = 2;

? public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig, int timeout) {

? ? this(poolConfig, timeout, timeout, null);

? }

? public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig,

? ? ? final int connectionTimeout, final int soTimeout, final String password) {

? ? this.poolConfig = poolConfig;

? ? this.connectionTimeout = connectionTimeout;

? ? this.soTimeout = soTimeout;

? ? this.password = password;

? }

/******方法******以下方法是我重點標(biāo)注的*************/

public JedisPool getNode(String nodeKey) {

? ? r.lock();

? ? try {

? ? ? return nodes.get(nodeKey);

? ? } finally {

? ? ? r.unlock();

? ? }

? }

? public JedisPool getSlotPool(int slot) {

? ? r.lock();

? ? try {

? ? ? return slots.get(slot);

? ? } finally {

? ? ? r.unlock();

? ? }

? }

? public Map<String, JedisPool> getNodes() {

? ? r.lock();

? ? try {

? ? ? return new HashMap<String, JedisPool>(nodes);

? ? } finally {

? ? ? r.unlock();

? ? }

? }

? public List<JedisPool> getShuffledNodesPool() {

? ? r.lock();

? ? try {

? ? ? List<JedisPool> pools = new ArrayList<JedisPool>(nodes.values());

? ? ? Collections.shuffle(pools);

? ? ? return pools;

? ? } finally {

? ? ? r.unlock();

? ? }

? }

埃~看到了可以獲取到某個jedis??

其實這個?JedisClusterInfoCache 類 是你在初始化jedisCluster時 將所有的節(jié)點放入緩存。

因此恬试,這個類的方法能給我們返回相關(guān)的jedis實例

我們要這么做呢窝趣?

接下來是我的代碼。通過java的反射機制直接獲取训柴。

public static void main(String[] args) throws NoSuchFieldException {

? ? ? ? JedisPoolConfig config = new JedisPoolConfig();

? ? ? ? Set<HostAndPort> nodeList = new HashSet<>();

? ? ? ? nodeList.add(new HostAndPort("192.168.41.65", 6379));

? ? ? ? nodeList.add(new HostAndPort("192.168.41.70", 6379));

? ? ? ? nodeList.add(new HostAndPort("192.168.41.42", 6379));

? ? ? ? nodeList.add(new HostAndPort("192.168.41.20", 6380));

? ? ? ? nodeList.add(new HostAndPort("192.168.41.30", 6380));

? ? ? ? nodeList.add(new HostAndPort("192.168.41.40", 6380));

? ? ? ? JedisCluster jedisCluster = new JedisCluster(nodeList, 3000, config);

? ? ? ? jedisCluster.set("James", "Bond");

? ? ? ? //通過 java.lang.reflect.Field 反射

? ? ? ? Jedis jedis = getJedisFieldBySlot(jedisCluster, 0, "James");


? ? ? ? //通過spring 工具類? ReflectionUtils 反射

? ? ? ? Jedis j = getJedisBySlot(jedisCluster, 0, "James");

// 接下來就是pipeline操作了

if(jedis != null) {

Pipeline pipeline = jedis.pipelined();

pipeline.syncAndReturnAll();

// jedis會自動將資源歸還到連接池

jedis.close();

}else {

System.err.println("找不到 jedis");

}

? ? }

/**

* 集裙中根據(jù) key對應(yīng)的slot 獲取槽位 或 key 返回對應(yīng)的某個Jedis 實例

* @param jedisCluster

* @param slot

* @param key

* @return Jedis

*/

public static Jedis getJedisFieldBySlot(JedisCluster jedisCluster,int slot,String key) {

try {

if(key !=null) {

// 獲取key對應(yīng)的slot 獲取槽號(0~16383)

slot = JedisClusterCRC16.getSlot(key);

}

Field field = BinaryJedisCluster.class.getDeclaredField("connectionHandler");

field.setAccessible(true);

JedisClusterConnectionHandler connectionHandler =? (JedisClusterConnectionHandler) field.get(jedisCluster);

Field jedisclusterinfocache =? JedisClusterConnectionHandler.class.getDeclaredField("cache");

jedisclusterinfocache.setAccessible(true);

JedisClusterInfoCache cache = (JedisClusterInfoCache) jedisclusterinfocache.get(connectionHandler);

JedisPool pool = cache.getSlotPool(slot);

Jedis jedis = pool.getResource();

return jedis;

} catch (Exception e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

return null;

}

/**

* 集裙中根據(jù) key對應(yīng)的slot 獲取槽位 或 key 返回對應(yīng)的某個Jedis 實例

* @param jedisCluster

* @param slot

* @param key

* @return Jedis

*/

public static Jedis getJedisBySlot(JedisCluster jedisCluster,int slot,String key) {

try {

if(key !=null) {

slot = JedisClusterCRC16.getSlot(key);

}

//org.springframework.util.ReflectionUtils 工具類? BinaryJedisCluster 下的? JedisClusterConnectionHandler

Field field = ReflectionUtils.findField(BinaryJedisCluster.class, null, JedisClusterConnectionHandler.class);

field.setAccessible(true);

JedisClusterConnectionHandler connectionHandler =? (JedisClusterConnectionHandler) field.get(jedisCluster);

Field jedisclusterinfocache = ReflectionUtils.findField(JedisClusterConnectionHandler.class, null, JedisClusterInfoCache.class);

jedisclusterinfocache.setAccessible(true);

JedisClusterInfoCache cache = (JedisClusterInfoCache) jedisclusterinfocache.get(connectionHandler);

JedisPool pool = cache.getSlotPool(slot);

Jedis jedis = pool.getResource();

return jedis;

} catch (Exception e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

return null;

}

但是大家以為以上的代碼就很完美了么哑舒?

確實。在去找緩存里的jedis時幻馁,可能某個節(jié)點掛了洗鸵,然后剛好程序拿到這個實例,

這時候這里就會出現(xiàn)錯誤宣赔。因此我們 應(yīng)該在原來的基礎(chǔ)上预麸,去刷新一遍集群。

代碼由你們來給吧儒将。我不會寫了吏祸。哈哈哈·


最后 如果針對? JedisClusterInfoCache 源碼分析的 請看? ?https://www.cnblogs.com/zhengzuozhanglina/p/11383035.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市钩蚊,隨后出現(xiàn)的幾起案子贡翘,更是在濱河造成了極大的恐慌,老刑警劉巖砰逻,帶你破解...
    沈念sama閱讀 206,602評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件鸣驱,死亡現(xiàn)場離奇詭異,居然都是意外死亡蝠咆,警方通過查閱死者的電腦和手機踊东,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,442評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來刚操,“玉大人闸翅,你說我怎么就攤上這事【账” “怎么了坚冀?”我有些...
    開封第一講書人閱讀 152,878評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長鉴逞。 經(jīng)常有香客問我记某,道長司训,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,306評論 1 279
  • 正文 為了忘掉前任液南,我火速辦了婚禮壳猜,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘贺拣。我一直安慰自己蓖谢,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 64,330評論 5 373
  • 文/花漫 我一把揭開白布譬涡。 她就那樣靜靜地躺著闪幽,像睡著了一般。 火紅的嫁衣襯著肌膚如雪涡匀。 梳的紋絲不亂的頭發(fā)上盯腌,一...
    開封第一講書人閱讀 49,071評論 1 285
  • 那天,我揣著相機與錄音陨瘩,去河邊找鬼腕够。 笑死,一個胖子當(dāng)著我的面吹牛舌劳,可吹牛的內(nèi)容都是我干的帚湘。 我是一名探鬼主播,決...
    沈念sama閱讀 38,382評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼甚淡,長吁一口氣:“原來是場噩夢啊……” “哼大诸!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起贯卦,我...
    開封第一講書人閱讀 37,006評論 0 259
  • 序言:老撾萬榮一對情侶失蹤资柔,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后撵割,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體贿堰,經(jīng)...
    沈念sama閱讀 43,512評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,965評論 2 325
  • 正文 我和宋清朗相戀三年啡彬,在試婚紗的時候發(fā)現(xiàn)自己被綠了羹与。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,094評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡庶灿,死狀恐怖注簿,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情跳仿,我是刑警寧澤,帶...
    沈念sama閱讀 33,732評論 4 323
  • 正文 年R本政府宣布捐晶,位于F島的核電站菲语,受9級特大地震影響妄辩,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜山上,卻給世界環(huán)境...
    茶點故事閱讀 39,283評論 3 307
  • 文/蒙蒙 一眼耀、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧佩憾,春花似錦哮伟、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,286評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至抡驼,卻和暖如春鬼廓,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背致盟。 一陣腳步聲響...
    開封第一講書人閱讀 31,512評論 1 262
  • 我被黑心中介騙來泰國打工碎税, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人馏锡。 一個月前我還...
    沈念sama閱讀 45,536評論 2 354
  • 正文 我出身青樓雷蹂,卻偏偏與公主長得像,于是被迫代替她去往敵國和親杯道。 傳聞我的和親對象是個殘疾皇子匪煌,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,828評論 2 345

推薦閱讀更多精彩內(nèi)容