kafka_03_Kafka消息發(fā)送

Kafka消息發(fā)送

1. 構(gòu)建ProducerRecord對象

該類如下:

public class ProducerRecord<K, V> {

    //The topic the record will be appended to
    private final String topic;
    //The partition to which the record should be sent
    private final Integer partition;
    //the headers that will be included in the record
    private final Headers headers;
    //The key that will be included in the record
    private final K key;
    //The record contents
    private final V value;
    //The timestamp of the record, in milliseconds since epoch. If null, the producer will assign the timestamp using System.currentTimeMillis().
    private final Long timestamp;
    
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
        if (topic == null)
            throw new IllegalArgumentException("Topic cannot be null.");
        if (timestamp != null && timestamp < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
        if (partition != null && partition < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = new RecordHeaders(headers);
    }
    
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
        this(topic, partition, timestamp, key, value, null);
    }
    
    public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
        this(topic, partition, null, key, value, headers);
    }
    
    public ProducerRecord(String topic, Integer partition, K key, V value) {
        this(topic, partition, null, key, value, null);
    }
    
    public ProducerRecord(String topic, K key, V value) {
        this(topic, null, null, key, value, null);
    }
    
    //Create a record with no key
    public ProducerRecord(String topic, V value) {
        this(topic, null, null, null, value, null);
    }
    //省略getters和setters
}

這里有5個構(gòu)造方法咨演,但是最后用的就是其中的一個闸昨。實(shí)際應(yīng)用中,構(gòu)建ProducerRecord對象是非常頻繁的操作薄风。

2. 發(fā)送消息

發(fā)送消息有三種模式饵较,發(fā)后即忘(fire-and-forget), 同步(sync),異步(async)

2.1 fire-and-forget(上一篇博客介紹的就是發(fā)后即忘)

 producer.send(record);

特點(diǎn): 效率高,可靠性差

2.2 同步

代碼如下:

package com.ghq.kafka.server;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * 消息的發(fā)送
 */
public class ProducerSendMessage {

    public static final String brokerList = "192.168.52.135:9092";
    public static final String topic = "topic-demo";

    public static Properties initProperties(){
        Properties prop = new Properties();
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        prop.put(ProducerConfig.CLIENT_ID_CONFIG,"producer.client.id.demo");
        /**
         *
         * 配置重試次數(shù)村刨,這里重試10次告抄,重試10次之后如果消息還是發(fā)送不成功,那么還是會拋出異常
         * 那些類可以重試呢嵌牺?
         * org.apache.kafka.common.errors.RetriableException 及其子類
         *
         */
        prop.put(ProducerConfig.RETRIES_CONFIG,10);
        return prop;
    }

    public static void sync() {

        Properties prop = initProperties();
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "Hello,World");

        //3. 發(fā)送消息
        while (true){
            /**
             * send方法本身就是異步的
             */
            Future<RecordMetadata> future = producer.send(record);

            try {
                /**
                 * get方法是阻塞的
                 * 這里返回 RecordMetadata打洼,包含了發(fā)送消息的元數(shù)據(jù)信息
                 */
                RecordMetadata metadata = future.get();
                System.out.println("topic:"+metadata.topic());
                System.out.println("partition:"+metadata.partition());
                System.out.println("offset:"+metadata.offset());
                System.out.println("hasTimestamp:"+metadata.hasTimestamp());
                System.out.println("-----------------------------------------");
                Thread.sleep(1000);
            } catch (ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }

        }
        //4. 關(guān)閉資源
        //producer.close();

    }
}

特點(diǎn):性能差,可靠性高

注:什么異衬娲猓可以重試募疮?RetriableException

RetriableException.jpg

2.3 異步

public static void async() {
        Properties prop = initProperties();
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "Hello,World");

        Future<RecordMetadata> future = producer.send(record, new Callback() {

            /**
             * metadata 和 exception 互斥
             * 消息發(fā)送成功:metadata != null exception == null
             * 消息發(fā)送失敗:metadata == null exception != null
             */
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {

                if (exception != null) {
                    System.out.println("消息發(fā)送失斊У:"+metadata);
                }else {
                    System.out.println("消息發(fā)送成功:"+metadata);
                }
            }
        });
        
        //這里采用lambda表達(dá)式
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.out.println("消息2發(fā)送失敯⑴ā:"+metadata);
            }else {
                System.out.println("消息2發(fā)送成功:"+metadata);
            }
        });
        producer.close();
    }

輸出結(jié)果如下:

消息1發(fā)送成功:topic-demo-0@22
消息2發(fā)送成功:topic-demo-2@24

特點(diǎn):性能 :同步 < 異步 < 發(fā)后即忘,可靠性:同步 > 異步 > 發(fā)后即忘

結(jié)束蹋绽。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末芭毙,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子卸耘,更是在濱河造成了極大的恐慌退敦,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,657評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蚣抗,死亡現(xiàn)場離奇詭異侈百,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評論 3 394
  • 文/潘曉璐 我一進(jìn)店門钝域,熙熙樓的掌柜王于貴愁眉苦臉地迎上來讽坏,“玉大人,你說我怎么就攤上這事例证÷肺兀” “怎么了?”我有些...
    開封第一講書人閱讀 164,057評論 0 354
  • 文/不壞的土叔 我叫張陵织咧,是天一觀的道長拣宰。 經(jīng)常有香客問我,道長,這世上最難降的妖魔是什么否纬? 我笑而不...
    開封第一講書人閱讀 58,509評論 1 293
  • 正文 為了忘掉前任昔逗,我火速辦了婚禮,結(jié)果婚禮上舍悯,老公的妹妹穿的比我還像新娘。我一直安慰自己,他們只是感情好绿渣,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,562評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著燕耿,像睡著了一般中符。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上誉帅,一...
    開封第一講書人閱讀 51,443評論 1 302
  • 那天淀散,我揣著相機(jī)與錄音,去河邊找鬼蚜锨。 笑死档插,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的亚再。 我是一名探鬼主播郭膛,決...
    沈念sama閱讀 40,251評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼氛悬!你這毒婦竟也來了则剃?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,129評論 0 276
  • 序言:老撾萬榮一對情侶失蹤如捅,失蹤者是張志新(化名)和其女友劉穎棍现,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體伪朽,經(jīng)...
    沈念sama閱讀 45,561評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡轴咱,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,779評論 3 335
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片朴肺。...
    茶點(diǎn)故事閱讀 39,902評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡窖剑,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出戈稿,到底是詐尸還是另有隱情西土,我是刑警寧澤,帶...
    沈念sama閱讀 35,621評論 5 345
  • 正文 年R本政府宣布鞍盗,位于F島的核電站需了,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏般甲。R本人自食惡果不足惜肋乍,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,220評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望敷存。 院中可真熱鬧墓造,春花似錦、人聲如沸锚烦。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,838評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽涮俄。三九已至蛉拙,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間彻亲,已是汗流浹背孕锄。 一陣腳步聲響...
    開封第一講書人閱讀 32,971評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留睹栖,地道東北人硫惕。 一個月前我還...
    沈念sama閱讀 48,025評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像野来,于是被迫代替她去往敵國和親恼除。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,843評論 2 354

推薦閱讀更多精彩內(nèi)容

  • 在協(xié)作越來越重要的今天曼氛,需要高效豁辉,快速的建立關(guān)系和信任,最有效的方法就是樹立個人品牌舀患,以下10項(xiàng)原則也許有所幫助徽级。...
    愛到成傷閱讀 513評論 0 0
  • 首先主布局文件是SwipeRefreshLayout+ListView 加載更多作為腳布局添加到ListView ...
    碼圣閱讀 511評論 2 5
  • 簡單吃頓“農(nóng)家菜”已經(jīng)無法在競爭中難以站穩(wěn)腳。 較為典型的是聊浅,不少地方“農(nóng)家樂”已從最初提供吃農(nóng)家菜餐抢、住農(nóng)家屋等簡...
    6959cef3343c閱讀 496評論 0 0
  • 即是建立多角度的框架 從多個角度去解決一個問題 問自己有哪幾種方法现使,其中哪種方法更加簡單方便
    老菜頭_dca8閱讀 118評論 0 0
  • 新時代文明實(shí)踐碳锈,我為山村小伙伴送溫暖 今天上午,作為孩子班級的家委會主任欺抗,組織實(shí)驗(yàn)小學(xué)2015級8班的學(xué)生售碳、家長一...
    見山聞道閱讀 275評論 0 4