使用KafkaTemplate發(fā)送消息
定義:
KafkaTemplate包裝了一個(gè)生產(chǎn)者蒋困,并提供方便的方法用于發(fā)送數(shù)據(jù)給Kafka topic麸俘。
使用KafkaTemplate方法前需要配置一個(gè)Producer的工廠類,并將它作為Kafkatemplate的構(gòu)造器參數(shù)傳入構(gòu)造實(shí)例狞山。
@Bean
public ProducerFactory<Integer, String> producerFactory() {
????return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
????Map<String, Object> props = new HashMap<>();
????props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
????props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
????props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
????// See https://kafka.apache.org/documentation/#producerconfigs for more properties
????return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
????return new KafkaTemplate<Integer, String>(producerFactory());
}
可用API以及說明:
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {T doInKafka(Producer<K, V> producer);}
說明:
1码泞、?sendDefault方法使用前需要配置一個(gè)默認(rèn)的 topic給template吱涉;
2逛薇、?方法中入?yún)⒑衪imestamp時(shí)間戳的捺疼,會將給時(shí)間戳存儲在記錄中;
3永罚、?metrics和partitionsFor方法對應(yīng)Producer類的同名方法啤呼。execute方法提供了直接訪問Producer的能力;
4呢袱、 當(dāng)使用一個(gè)方法入?yún)⒑蠱essage<?>參數(shù)是官扣,topic、partition還有其他關(guān)鍵信息都可以在這個(gè)message header中獲得羞福;
? KafkaHeaders.TOPIC
? KafkaHeaders.PARTITION_ID
? KafkaHeaders.MESSAGE_KEY
? KafkaHeaders.TIMESTAMP
5惕蹄、?如果想要獲取異步的回調(diào)函數(shù)信息,其中帶有成功以及失敗的信息的話坯临,可以配置KafkaTemplate以及ProducerListener;
public interface ProducerListener<K, V> {
void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata);
void onError(String topic, Integer partition, K key, V value, Exception exception);
boolean isInterestedInSuccess();
}
6恋昼、?默認(rèn)情況下看靠,Template配置了 LoggingProducerListenner,該類只記錄錯(cuò)誤信息液肌,發(fā)送成功信息不做記錄挟炬;
7、?onSuccess只當(dāng)isInterestedInsuccess返回True時(shí)被調(diào)用嗦哆;
8谤祖、 出于方便性考慮,當(dāng)你只想實(shí)現(xiàn)其中的一種方法是老速,抽象類ProducerListenerAdapter可以實(shí)現(xiàn)該需求粥喜。對于?isInterestedInSuccess它會返回false值;
9橘券、 發(fā)送數(shù)據(jù)后返回ListenableFuture<SendResult>额湘,可以注冊一個(gè)回調(diào)函數(shù)去異步接收發(fā)送結(jié)果數(shù)據(jù)信息卿吐;
ListenableFuture<SendResult<Integer, String>> future = template.send("foo");
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
????@Override
????public void onSuccess(SendResult<Integer, String> result) {
????????//成功代碼處理邏輯
????}
????@Override
????public void onFailure(Throwable ex) {
????????????//失敗代碼處理邏輯? ??
????}
});
SendResult有兩個(gè)屬性,分別為 ProducerRecord以及RecordMetaData锋华。
如果想要同步獲取發(fā)送結(jié)果數(shù)據(jù)嗡官,可以通過future的get方法獲取毯焕;
template中存在一個(gè)autoFlash構(gòu)造函數(shù)衍腥,該參數(shù)設(shè)為true可以導(dǎo)致每次發(fā)送都會調(diào)用flush()方法,可能降低程序性能纳猫。
代碼實(shí)例:
異步方法:
public void sendToKafka(final MyOutputData data) {
????final ProducerRecord<String, String> record = createRecord(data);
????ListenableFuture<SendResult<Integer, String>> future = template.send(record);
????future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
????????@Override
????????public void onSuccess(SendResult<Integer, String> result) {
????????????handleSuccess(data);
????????}
????????@Override
????????public void onFailure(Throwable ex) {
????????????handleFailure(data, record, ex);
????????}
????});
}
同步方法:
public void sendToKafka(final MyOutputData data) {
????final ProducerRecord<String, String> record = createRecord(data);
????try {
????????template.send(record).get(10, TimeUnit.SECONDS);
????handleSuccess(data);
????}
????catch (ExecutionException e) {
????????handleFailure(data, record, e.getCause());
????}
????catch (TimeoutException | InterruptedException e) {
????????handleFailure(data, record, e);
????}
}