kafka順序消息踩坑記錄

Kafka順序消息消息

1.消息發(fā)送的api

    public ListenableFuture<SendResult<K, V>> send(String topic, V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, data);
        return this.doSend(producerRecord);
    }

    public ListenableFuture<SendResult<K, V>> send(String topic, K key, V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, key, data);
        return this.doSend(producerRecord);
    }

    public ListenableFuture<SendResult<K, V>> send(String topic, int partition, V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, partition, (Object)null, data);
        return this.doSend(producerRecord);
    }

    public ListenableFuture<SendResult<K, V>> send(String topic, int partition, K key, V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, partition, key, data);
        return this.doSend(producerRecord);
    }

2.KafkaProducer.class 封裝獲取partition方法,優(yōu)先使用傳入的partition

    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        //優(yōu)先返回傳入的partition
        return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }

3.DefaultPartitioner.class 封裝獲取partition方法搅幅,優(yōu)先使用key的murmur2算法的hash值對partitionCount取模条辟,其次使用本地原子類計數(shù)器自增值對partitionCount取模

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        //根據(jù)topic獲取partitionCount
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = this.nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return ((PartitionInfo)availablePartitions.get(part)).partition();
            } else {
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            //指定了key披诗,則根據(jù)key的hash來取模選取partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger((new Random()).nextInt());
            AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }

        return counter.getAndIncrement();
    }
  • Utils.class

|

<pre> public static int toPositive(int number) {
return number & 2147483647;
}

public static int murmur2(byte[] data) {
    int length = data.length;
    int seed = -1756908916;
    int m = 1540483477;
    int r = true;
    int h = seed ^ length;
    int length4 = length / 4;

    for(int i = 0; i < length4; ++i) {
        int i4 = i * 4;
        int k = (data[i4 + 0] & 255) + ((data[i4 + 1] & 255) << 8) + ((data[i4 + 2] & 255) << 16) + ((data[i4 + 3] & 255) << 24);
        k *= 1540483477;
        k ^= k >>> 24;
        k *= 1540483477;
        h *= 1540483477;
        h ^= k;
    }

    switch(length % 4) {
    case 3:
        h ^= (data[(length & -4) + 2] & 255) << 16;
    case 2:
        h ^= (data[(length & -4) + 1] & 255) << 8;
    case 1:
        h ^= data[length & -4] & 255;
        h *= 1540483477;
    default:
        h ^= h >>> 13;
        h *= 1540483477;
        h ^= h >>> 15;
        return h;
    }
}</pre>

|

踩坑記錄

public ListenableFuture<SendResult<K, V>> send(String topic, int partition, V data) 

使用上述方法時拨匆,partition傳參不能為Integer類型籍救,生成serializedKey時出會出現(xiàn)類型轉(zhuǎn)換異常,導(dǎo)致消息發(fā)送失敗

 private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            // first make sure the metadata for the topic is available
            ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            byte[] serializedKey;
            //此處keySerializer會做類型轉(zhuǎn)換污茵,傳入Integer會報錯
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer");
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer");
            }

            int partition = partition(record, serializedKey, serializedValue, cluster);
            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
            ensureValidRecordSize(serializedSize);
            tp = new TopicPartition(record.topic(), partition);
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // producer callback will make sure to call both 'callback' and interceptor callback
            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末樱报,一起剝皮案震驚了整個濱河市葬项,隨后出現(xiàn)的幾起案子腔长,更是在濱河造成了極大的恐慌畏鼓,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,376評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異哩盲,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)镇饮,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,126評論 2 385
  • 文/潘曉璐 我一進(jìn)店門广鳍,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人蝶溶,你說我怎么就攤上這事嗜历。” “怎么了抖所?”我有些...
    開封第一講書人閱讀 156,966評論 0 347
  • 文/不壞的土叔 我叫張陵梨州,是天一觀的道長。 經(jīng)常有香客問我田轧,道長暴匠,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,432評論 1 283
  • 正文 為了忘掉前任傻粘,我火速辦了婚禮每窖,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘弦悉。我一直安慰自己窒典,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,519評論 6 385
  • 文/花漫 我一把揭開白布稽莉。 她就那樣靜靜地躺著崇败,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上后室,一...
    開封第一講書人閱讀 49,792評論 1 290
  • 那天缩膝,我揣著相機(jī)與錄音,去河邊找鬼岸霹。 笑死疾层,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的贡避。 我是一名探鬼主播痛黎,決...
    沈念sama閱讀 38,933評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼刮吧!你這毒婦竟也來了湖饱?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,701評論 0 266
  • 序言:老撾萬榮一對情侶失蹤杀捻,失蹤者是張志新(化名)和其女友劉穎井厌,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體致讥,經(jīng)...
    沈念sama閱讀 44,143評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡仅仆,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,488評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了垢袱。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片墓拜。...
    茶點故事閱讀 38,626評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖请契,靈堂內(nèi)的尸體忽然破棺而出咳榜,到底是詐尸還是另有隱情,我是刑警寧澤爽锥,帶...
    沈念sama閱讀 34,292評論 4 329
  • 正文 年R本政府宣布涌韩,位于F島的核電站,受9級特大地震影響救恨,放射性物質(zhì)發(fā)生泄漏贸辈。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,896評論 3 313
  • 文/蒙蒙 一肠槽、第九天 我趴在偏房一處隱蔽的房頂上張望擎淤。 院中可真熱鬧,春花似錦秸仙、人聲如沸嘴拢。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,742評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽席吴。三九已至赌结,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間孝冒,已是汗流浹背柬姚。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留庄涡,地道東北人量承。 一個月前我還...
    沈念sama閱讀 46,324評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像穴店,于是被迫代替她去往敵國和親撕捍。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,494評論 2 348

推薦閱讀更多精彩內(nèi)容