Spring-Kafka集成-發(fā)送消息

使用KafkaTemplate發(fā)送消息


定義:

KafkaTemplate包裝了一個(gè)生產(chǎn)者蒋困,并提供方便的方法用于發(fā)送數(shù)據(jù)給Kafka topic麸俘。

使用KafkaTemplate方法前需要配置一個(gè)Producer的工廠類,并將它作為Kafkatemplate的構(gòu)造器參數(shù)傳入構(gòu)造實(shí)例狞山。

@Bean

public ProducerFactory<Integer, String> producerFactory() {

????return new DefaultKafkaProducerFactory<>(producerConfigs());

}

@Bean

public Map<String, Object> producerConfigs() {

????Map<String, Object> props = new HashMap<>();

????props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

????props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

????props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

????// See https://kafka.apache.org/documentation/#producerconfigs for more properties

????return props;

}

@Bean

public KafkaTemplate<Integer, String> kafkaTemplate() {

????return new KafkaTemplate<Integer, String>(producerFactory());

}

可用API以及說明:

ListenableFuture<SendResult<K, V>> sendDefault(V data);

ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);

ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, V data);

ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

ListenableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

// Flush the producer.

void flush();

interface ProducerCallback<K, V, T> {T doInKafka(Producer<K, V> producer);}

說明:

1码泞、?sendDefault方法使用前需要配置一個(gè)默認(rèn)的 topic給template吱涉;

2逛薇、?方法中入?yún)⒑衪imestamp時(shí)間戳的捺疼,會將給時(shí)間戳存儲在記錄中;

3永罚、?metrics和partitionsFor方法對應(yīng)Producer類的同名方法啤呼。execute方法提供了直接訪問Producer的能力;

4呢袱、 當(dāng)使用一個(gè)方法入?yún)⒑蠱essage<?>參數(shù)是官扣,topic、partition還有其他關(guān)鍵信息都可以在這個(gè)message header中獲得羞福;

? KafkaHeaders.TOPIC

? KafkaHeaders.PARTITION_ID

? KafkaHeaders.MESSAGE_KEY

? KafkaHeaders.TIMESTAMP

5惕蹄、?如果想要獲取異步的回調(diào)函數(shù)信息,其中帶有成功以及失敗的信息的話坯临,可以配置KafkaTemplate以及ProducerListener;

public interface ProducerListener<K, V> {

void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata);

void onError(String topic, Integer partition, K key, V value, Exception exception);

boolean isInterestedInSuccess();

}

6恋昼、?默認(rèn)情況下看靠,Template配置了 LoggingProducerListenner,該類只記錄錯(cuò)誤信息液肌,發(fā)送成功信息不做記錄挟炬;

7、?onSuccess只當(dāng)isInterestedInsuccess返回True時(shí)被調(diào)用嗦哆;

8谤祖、 出于方便性考慮,當(dāng)你只想實(shí)現(xiàn)其中的一種方法是老速,抽象類ProducerListenerAdapter可以實(shí)現(xiàn)該需求粥喜。對于?isInterestedInSuccess它會返回false值;

9橘券、 發(fā)送數(shù)據(jù)后返回ListenableFuture<SendResult>额湘,可以注冊一個(gè)回調(diào)函數(shù)去異步接收發(fā)送結(jié)果數(shù)據(jù)信息卿吐;

ListenableFuture<SendResult<Integer, String>> future = template.send("foo");

future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {

????@Override

????public void onSuccess(SendResult<Integer, String> result) {

????????//成功代碼處理邏輯

????}

????@Override

????public void onFailure(Throwable ex) {

????????????//失敗代碼處理邏輯? ??

????}

});

SendResult有兩個(gè)屬性,分別為 ProducerRecord以及RecordMetaData锋华。

如果想要同步獲取發(fā)送結(jié)果數(shù)據(jù)嗡官,可以通過future的get方法獲取毯焕;

template中存在一個(gè)autoFlash構(gòu)造函數(shù)衍腥,該參數(shù)設(shè)為true可以導(dǎo)致每次發(fā)送都會調(diào)用flush()方法,可能降低程序性能纳猫。

代碼實(shí)例:

異步方法:

public void sendToKafka(final MyOutputData data) {

????final ProducerRecord<String, String> record = createRecord(data);

????ListenableFuture<SendResult<Integer, String>> future = template.send(record);

????future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {

????????@Override

????????public void onSuccess(SendResult<Integer, String> result) {

????????????handleSuccess(data);

????????}

????????@Override

????????public void onFailure(Throwable ex) {

????????????handleFailure(data, record, ex);

????????}

????});

}

同步方法:

public void sendToKafka(final MyOutputData data) {

????final ProducerRecord<String, String> record = createRecord(data);

????try {

????????template.send(record).get(10, TimeUnit.SECONDS);

????handleSuccess(data);

????}

????catch (ExecutionException e) {

????????handleFailure(data, record, e.getCause());

????}

????catch (TimeoutException | InterruptedException e) {

????????handleFailure(data, record, e);

????}

}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末婆咸,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子续担,更是在濱河造成了極大的恐慌擅耽,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,884評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件物遇,死亡現(xiàn)場離奇詭異乖仇,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)询兴,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評論 3 385
  • 文/潘曉璐 我一進(jìn)店門乃沙,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人诗舰,你說我怎么就攤上這事警儒。” “怎么了眶根?”我有些...
    開封第一講書人閱讀 157,435評論 0 348
  • 文/不壞的土叔 我叫張陵蜀铲,是天一觀的道長。 經(jīng)常有香客問我属百,道長记劝,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,509評論 1 284
  • 正文 為了忘掉前任族扰,我火速辦了婚禮厌丑,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘渔呵。我一直安慰自己怒竿,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,611評論 6 386
  • 文/花漫 我一把揭開白布扩氢。 她就那樣靜靜地躺著耕驰,像睡著了一般。 火紅的嫁衣襯著肌膚如雪录豺。 梳的紋絲不亂的頭發(fā)上耍属,一...
    開封第一講書人閱讀 49,837評論 1 290
  • 那天托嚣,我揣著相機(jī)與錄音,去河邊找鬼厚骗。 笑死示启,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的领舰。 我是一名探鬼主播夫嗓,決...
    沈念sama閱讀 38,987評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼冲秽!你這毒婦竟也來了舍咖?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,730評論 0 267
  • 序言:老撾萬榮一對情侶失蹤锉桑,失蹤者是張志新(化名)和其女友劉穎排霉,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體民轴,經(jīng)...
    沈念sama閱讀 44,194評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡攻柠,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,525評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了后裸。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片瑰钮。...
    茶點(diǎn)故事閱讀 38,664評論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖微驶,靈堂內(nèi)的尸體忽然破棺而出浪谴,到底是詐尸還是另有隱情,我是刑警寧澤因苹,帶...
    沈念sama閱讀 34,334評論 4 330
  • 正文 年R本政府宣布苟耻,位于F島的核電站,受9級特大地震影響扶檐,放射性物質(zhì)發(fā)生泄漏凶杖。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,944評論 3 313
  • 文/蒙蒙 一蘸秘、第九天 我趴在偏房一處隱蔽的房頂上張望官卡。 院中可真熱鬧蝗茁,春花似錦醋虏、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,764評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至饭寺,卻和暖如春阻课,著一層夾襖步出監(jiān)牢的瞬間叫挟,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,997評論 1 266
  • 我被黑心中介騙來泰國打工限煞, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留抹恳,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,389評論 2 360
  • 正文 我出身青樓署驻,卻偏偏與公主長得像奋献,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子旺上,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,554評論 2 349

推薦閱讀更多精彩內(nèi)容