Kafka消息發(fā)送
1. 構(gòu)建ProducerRecord對象
該類如下:
public class ProducerRecord<K, V> {
//The topic the record will be appended to
private final String topic;
//The partition to which the record should be sent
private final Integer partition;
//the headers that will be included in the record
private final Headers headers;
//The key that will be included in the record
private final K key;
//The record contents
private final V value;
//The timestamp of the record, in milliseconds since epoch. If null, the producer will assign the timestamp using System.currentTimeMillis().
private final Long timestamp;
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
if (topic == null)
throw new IllegalArgumentException("Topic cannot be null.");
if (timestamp != null && timestamp < 0)
throw new IllegalArgumentException(
String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
if (partition != null && partition < 0)
throw new IllegalArgumentException(
String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
this.topic = topic;
this.partition = partition;
this.key = key;
this.value = value;
this.timestamp = timestamp;
this.headers = new RecordHeaders(headers);
}
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
this(topic, partition, timestamp, key, value, null);
}
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
this(topic, partition, null, key, value, headers);
}
public ProducerRecord(String topic, Integer partition, K key, V value) {
this(topic, partition, null, key, value, null);
}
public ProducerRecord(String topic, K key, V value) {
this(topic, null, null, key, value, null);
}
//Create a record with no key
public ProducerRecord(String topic, V value) {
this(topic, null, null, null, value, null);
}
//省略getters和setters
}
這里有5個構(gòu)造方法咨演,但是最后用的就是其中的一個闸昨。實(shí)際應(yīng)用中,構(gòu)建ProducerRecord對象是非常頻繁的操作薄风。
2. 發(fā)送消息
發(fā)送消息有三種模式饵较,發(fā)后即忘(fire-and-forget), 同步(sync),異步(async)
2.1 fire-and-forget(上一篇博客介紹的就是發(fā)后即忘)
producer.send(record);
特點(diǎn): 效率高,可靠性差
2.2 同步
代碼如下:
package com.ghq.kafka.server;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* 消息的發(fā)送
*/
public class ProducerSendMessage {
public static final String brokerList = "192.168.52.135:9092";
public static final String topic = "topic-demo";
public static Properties initProperties(){
Properties prop = new Properties();
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
prop.put(ProducerConfig.CLIENT_ID_CONFIG,"producer.client.id.demo");
/**
*
* 配置重試次數(shù)村刨,這里重試10次告抄,重試10次之后如果消息還是發(fā)送不成功,那么還是會拋出異常
* 那些類可以重試呢嵌牺?
* org.apache.kafka.common.errors.RetriableException 及其子類
*
*/
prop.put(ProducerConfig.RETRIES_CONFIG,10);
return prop;
}
public static void sync() {
Properties prop = initProperties();
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "Hello,World");
//3. 發(fā)送消息
while (true){
/**
* send方法本身就是異步的
*/
Future<RecordMetadata> future = producer.send(record);
try {
/**
* get方法是阻塞的
* 這里返回 RecordMetadata打洼,包含了發(fā)送消息的元數(shù)據(jù)信息
*/
RecordMetadata metadata = future.get();
System.out.println("topic:"+metadata.topic());
System.out.println("partition:"+metadata.partition());
System.out.println("offset:"+metadata.offset());
System.out.println("hasTimestamp:"+metadata.hasTimestamp());
System.out.println("-----------------------------------------");
Thread.sleep(1000);
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
//4. 關(guān)閉資源
//producer.close();
}
}
特點(diǎn):性能差,可靠性高
注:什么異衬娲猓可以重試募疮?RetriableException
RetriableException.jpg
2.3 異步
public static void async() {
Properties prop = initProperties();
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "Hello,World");
Future<RecordMetadata> future = producer.send(record, new Callback() {
/**
* metadata 和 exception 互斥
* 消息發(fā)送成功:metadata != null exception == null
* 消息發(fā)送失敗:metadata == null exception != null
*/
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.out.println("消息發(fā)送失斊У:"+metadata);
}else {
System.out.println("消息發(fā)送成功:"+metadata);
}
}
});
//這里采用lambda表達(dá)式
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.out.println("消息2發(fā)送失敯⑴ā:"+metadata);
}else {
System.out.println("消息2發(fā)送成功:"+metadata);
}
});
producer.close();
}
輸出結(jié)果如下:
消息1發(fā)送成功:topic-demo-0@22
消息2發(fā)送成功:topic-demo-2@24
特點(diǎn):性能 :同步 < 異步 < 發(fā)后即忘,可靠性:同步 > 異步 > 發(fā)后即忘
結(jié)束蹋绽。