MQTT---HiveMQ源碼詳解(十七)Cluster-Consistent Hashing Ring & Node Lifecycle

Consistent Hashing Ring

基本上只要做Cluster,都會使用到一致性Hash環(huán),具體作用此處就不細講诅病,我們只了解HiveMQ怎么用它,怎么實現(xiàn)它,這樣實現(xiàn)能夠帶來什么好處贤笆。

  • HiveMQ沒有Master/Slave,它只由JGroup View(詳情請查閱JGroup)第一個node作為Coordinator蝇棉,這樣就可以達到一個node也可以做集群(雖然這樣的集群沒有什么卵用)。

  • HiveMQ采用兩個一致性Hash環(huán),來解決腦裂問題芥永,以及腦裂后merge的問題篡殷。

  • 每個node 500個虛擬節(jié)點,來增加node變化帶來的動蕩問題埋涧。

  • Primary環(huán):排除joining的node,即只添加RUNNING狀態(tài)的node板辽。

  • Minority環(huán):包含joining的node,即添加JOINING、RUNNING棘催、MERGING狀態(tài)的node劲弦。

  • 它的hash算法由net.openhft.hashing.LongHashFunction.xx_r39()提供

ConsistentHashingRing源碼

相對來說比較簡單,我就不一行一行寫注釋了醇坝,網(wǎng)上針對一致性hash環(huán)實現(xiàn)各種版本到處都是邑跪,詳細講解也到處都是。


@Singleton
public class ConsistentHashingRing {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsistentHashingRing.class);
    private final String name;
    public static final int NODE_BUCKET_COUNT = 500;
    private final LongHashFunction hashFunction;
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
    @VisibleForTesting
    final NavigableMap<Long, String> buckets;
    @VisibleForTesting
    final ConcurrentHashMap<String, String> bucketNodes = new ConcurrentHashMap<>();
    final Set<String> nodes = Sets.newConcurrentHashSet();

    public ConsistentHashingRing(String name, LongHashFunction hashFunction) {
        this.name = name;
        this.buckets = new ConcurrentSkipListMap();
        this.hashFunction = hashFunction;
    }

    public void add(@NotNull String node) {
        Preconditions.checkNotNull(node, "Name must not be null");
        LOGGER.trace("Add node {} to the {}.", node, this.name);
        Lock lock = this.readWriteLock.writeLock();
        lock.lock();
        try {
            for (int bucketIndex = 0; bucketIndex < NODE_BUCKET_COUNT; bucketIndex++) {
                long bucketHash = this.hashFunction.hashChars(node + bucketIndex);
                if (this.buckets.containsKey(bucketHash)) {
                    if (this.buckets.get(bucketHash).compareTo(node + 1) > 0) {
                        this.buckets.put(bucketHash, node + bucketIndex);
                        this.nodes.add(node);
                        this.bucketNodes.put(node + bucketIndex, node);
                    }
                } else {
                    this.buckets.put(bucketHash, node + bucketIndex);
                    this.nodes.add(node);
                    this.bucketNodes.put(node + bucketIndex, node);
                }
            }
        } finally {
            lock.unlock();
        }
    }

    public void remove(@NotNull String node) {
        Preconditions.checkNotNull(node, "Name must not be null");
        LOGGER.trace("Remove node {} from the {}.", node, this.name);
        Lock lock = this.readWriteLock.writeLock();
        lock.lock();
        try {
            for (int bucketIndex = 0; bucketIndex < NODE_BUCKET_COUNT; bucketIndex++) {
                long bucketHash = this.hashFunction.hashChars(node + bucketIndex);
                this.buckets.remove(bucketHash);
                this.bucketNodes.remove(node + bucketIndex);
            }
            this.nodes.remove(node);
        } finally {
            lock.unlock();
        }
    }

    public Set<String> getReplicaNodes(@NotNull String key, int replicateCount) {
        Preconditions.checkNotNull(key, "key must not be null");
        int nodeCount = this.nodes.size();
        if (replicateCount > nodeCount - 1) {
            LOGGER.trace("There are not enough buckets in the consistent hash ring for {} replicas.", replicateCount);
            replicateCount = nodeCount - 1;
        }
        String bucket = getBucket(key);
        long bucketHash = this.hashFunction.hashChars(bucket);
        Lock lock = this.readWriteLock.readLock();
        lock.lock();
        Set<String> buckets = new HashSet<>();
        try {
            for (Map.Entry<Long, String> entry = this.buckets.higherEntry(bucketHash);
                 buckets.size() < replicateCount;
                 entry = this.buckets.higherEntry(entry.getKey())) {
                if (entry == null) {
                    entry = this.buckets.firstEntry();
                }
                if (!this.bucketNodes.get(entry.getValue()).equals(this.bucketNodes.get(bucket))) {
                    buckets.add(this.bucketNodes.get(entry.getValue()));
                }
            }
            return buckets;
        } finally {
            lock.unlock();
        }
    }

    public Set<String> getNodes() {
        ImmutableSet.Builder<String> builder = ImmutableSet.builder();
        Lock lock = this.readWriteLock.readLock();
        lock.lock();
        try {
            return builder.addAll(this.nodes).build();
        } finally {
            lock.unlock();
        }
    }

    public String getBucket(@NotNull String key) {
        Preconditions.checkNotNull(key, "key must not be null");
        if (this.buckets.isEmpty()) {
            throw new IllegalStateException("Consistent hash ring is empty.");
        }
        long keyHash = this.hashFunction.hashChars(key);
        Lock lock = this.readWriteLock.readLock();
        lock.lock();
        try {
            Map.Entry<Long, String> entry = this.buckets.ceilingEntry(keyHash);
            if (entry != null) {
                return entry.getValue();
            }
            return this.buckets.ceilingEntry(Long.MIN_VALUE).getValue();
        } finally {
            lock.unlock();
        }
    }

    public String getNode(@NotNull String key) {
        Preconditions.checkNotNull(key, "key must not be null");
        if (this.buckets.isEmpty()) {
            throw new IllegalStateException("Consistent hash ring is empty.");
        }
        long keyHash = this.hashFunction.hashChars(key);
        Lock lock = this.readWriteLock.readLock();
        lock.lock();
        try {
            Map.Entry<Long, String> entry = this.buckets.ceilingEntry(keyHash);
            if (entry != null) {
                return this.bucketNodes.get(entry.getValue());
            }
            return this.bucketNodes.get(this.buckets.ceilingEntry(Long.MIN_VALUE).getValue());
        } finally {
            lock.unlock();
        }
    }
}

Node Lifecycle

其實了解了上面HiveMQ Cluster的基礎之后呼猪,再來看node的生命周期画畅,就是一件簡單的事情了。

廢話少說郑叠,我們直接上狀態(tài)變化圖夜赵。

這里寫圖片描述

各種狀態(tài)簡介

UNKNOWN

當JGroup通知新的node連接,但在本地不存在,則該node狀態(tài)標記為UNKNOWN

NOT_JOINED

當node連接上JGroup后,若它不是唯一的node,則它將自己主動標記為NOT_JOINED

JOINING

當node將自己的狀態(tài)更新至Cluster完成后,它將自己主動標記為JOINING

MERGE_MINORITY

當腦裂后與Coordinator在同組的其他node都將被標記為MERGE_MINORITY;或者加入Primary Group失敗后它將自己主動標記為MERGE_MINORITY

MERGING

MERGE_MINORITY會一直去嘗試主動將自己標記為MERGING

RUNNING

當MERGING成功后,node將會進行Replicate操作,當Replicate操作完成,就主動將自己標記為RUNNING

SHUTTING_DOWN/SHUTDOWN_FINISHED/DEAD

這三種狀態(tài)在源碼中未被使用,但HiveMQ還這樣定義,或許是保留吧乡革,反正博主未搞懂,不過不重要摊腋,不懂就算了沸版,_


MQTT交流群:221405150

RocketMQ交流群:10648794

NewSQL交流群:153575008


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末兴蒸,一起剝皮案震驚了整個濱河市视粮,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌橙凳,老刑警劉巖蕾殴,帶你破解...
    沈念sama閱讀 222,865評論 6 518
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異岛啸,居然都是意外死亡钓觉,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,296評論 3 399
  • 文/潘曉璐 我一進店門坚踩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來荡灾,“玉大人,你說我怎么就攤上這事∨希” “怎么了础锐?”我有些...
    開封第一講書人閱讀 169,631評論 0 364
  • 文/不壞的土叔 我叫張陵,是天一觀的道長荧缘。 經(jīng)常有香客問我皆警,道長,這世上最難降的妖魔是什么截粗? 我笑而不...
    開封第一講書人閱讀 60,199評論 1 300
  • 正文 為了忘掉前任信姓,我火速辦了婚禮,結(jié)果婚禮上桐愉,老公的妹妹穿的比我還像新娘财破。我一直安慰自己,他們只是感情好从诲,可當我...
    茶點故事閱讀 69,196評論 6 398
  • 文/花漫 我一把揭開白布左痢。 她就那樣靜靜地躺著,像睡著了一般系洛。 火紅的嫁衣襯著肌膚如雪俊性。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,793評論 1 314
  • 那天描扯,我揣著相機與錄音定页,去河邊找鬼。 笑死绽诚,一個胖子當著我的面吹牛典徊,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播恩够,決...
    沈念sama閱讀 41,221評論 3 423
  • 文/蒼蘭香墨 我猛地睜開眼卒落,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了蜂桶?” 一聲冷哼從身側(cè)響起儡毕,我...
    開封第一講書人閱讀 40,174評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎扑媚,沒想到半個月后腰湾,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,699評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡疆股,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,770評論 3 343
  • 正文 我和宋清朗相戀三年费坊,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片押桃。...
    茶點故事閱讀 40,918評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡葵萎,死狀恐怖导犹,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情羡忘,我是刑警寧澤谎痢,帶...
    沈念sama閱讀 36,573評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站卷雕,受9級特大地震影響节猿,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜漫雕,卻給世界環(huán)境...
    茶點故事閱讀 42,255評論 3 336
  • 文/蒙蒙 一滨嘱、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧浸间,春花似錦太雨、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,749評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至兜看,卻和暖如春锥咸,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背细移。 一陣腳步聲響...
    開封第一講書人閱讀 33,862評論 1 274
  • 我被黑心中介騙來泰國打工搏予, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人弧轧。 一個月前我還...
    沈念sama閱讀 49,364評論 3 379
  • 正文 我出身青樓雪侥,卻偏偏與公主長得像,于是被迫代替她去往敵國和親精绎。 傳聞我的和親對象是個殘疾皇子校镐,可洞房花燭夜當晚...
    茶點故事閱讀 45,926評論 2 361

推薦閱讀更多精彩內(nèi)容