Apache Kafka教程 之 Apache Kafka - 生產(chǎn)者示例
原文地址: http://blogxinxiucan.sh1.newtouch.com/2017/07/13/Apache-Kafka-生產(chǎn)者示例/
Apache Kafka - 生產(chǎn)者示例
讓我們創(chuàng)建一個使用Java客戶端發(fā)布和使用消息的應(yīng)用程序。Kafka生產(chǎn)者客戶端由以下API組成淫茵。
KafkaProducer API
讓我們了解本節(jié)中最重要的一套Kafka生產(chǎn)者API爪瓜。KafkaProducer API的核心部分是KafkaProducer類。
- KafkaProducer類提供了一個選項(xiàng)匙瘪,可以使用以下方法在其構(gòu)造函數(shù)中連接Kafka代理铆铆。
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1) , callback);
- ProducerRecord - 生產(chǎn)者管理一個等待發(fā)送的記錄緩沖區(qū)。
- 回調(diào) - 當(dāng)服務(wù)器確認(rèn)記錄時執(zhí)行的用戶提供的回調(diào)(null表示無回調(diào))辆苔。
- KafkaProducer類提供了一種刷新方法算灸,以確保所有先前發(fā)送的消息已經(jīng)實(shí)際完成。flush方法的語法如下 public void flush()
- KafkaProducer類提供了partitionFor方法驻啤,它有助于獲取給定主題的分區(qū)元數(shù)據(jù)。這可以用于自定義分區(qū)荐吵。此方法的簽名如下
public Map metrics()
它返回生產(chǎn)者維護(hù)的內(nèi)部指標(biāo)圖骑冗。 - public void close() KafkaProducer類提供了緊密的方法塊,直到所有先前發(fā)送的請求完成為止先煎。
生產(chǎn)者API
Producer API的中心部分是Producer類贼涩。生產(chǎn)者類提供了通過以下方法在其構(gòu)造函數(shù)中連接Kafka代理的選項(xiàng)。
生產(chǎn)者類
生產(chǎn)者類提供發(fā)送方法薯蝎,使用以下簽名將消息發(fā)送到單個或多個主題遥倦。
public void send(KeyedMessaget<k,v> message)
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);
有兩種類型的生產(chǎn)者 - 同步和異步。
同樣的API配置也適用于同步生成器。它們之間的區(qū)別是同步生成器直接發(fā)送消息袒哥,但是在后臺發(fā)送消息缩筛。當(dāng)您想要更高的吞吐量時,Async生產(chǎn)者是首選堡称。在以前的版本瞎抛,如0.8,一個異步生成器沒有回調(diào)send()來注冊錯誤處理程序却紧。這僅在當(dāng)前版本的0.9中可用桐臊。
public void close()
生產(chǎn)者類提供了關(guān)閉與所有kafka兄弟的生產(chǎn)者池連接的緊密方法。
配置設(shè)置
Producer API的主要配置設(shè)置列在下表中晓殊,以便更好地了解 -
S.No | 配置設(shè)置和說明 |
---|---|
1 client.id | 識別生產(chǎn)者應(yīng)用程序 |
2 producer.type | 同步或異步 |
3 Acks | acks配置控制生產(chǎn)者請求下的條件被完全匹配断凶。 |
4 retries | 如果生產(chǎn)者請求失敗,則會自動重試具體值巫俺。 |
5 bootstrap.servers | 經(jīng)紀(jì)人的引導(dǎo)列表认烁。 |
6 linger.ms | 如果要減少請求數(shù),可以將linger.ms設(shè)置為大于某值的值识藤。 |
7 key.serializer | 串行器接口的關(guān)鍵砚著。 |
8 value.serializer | 串行器接口的值。 |
9 batch.size | 緩沖區(qū)大小痴昧。 |
10 buffer.memory | 控制生產(chǎn)者可用于緩沖的總內(nèi)存量稽穆。 |
ProducerRecord API
ProducerRecord是發(fā)送給Kafka cluster.ProducerRecord
類構(gòu)造函數(shù)的一個鍵/值對,用于使用以下簽名創(chuàng)建具有分區(qū)赶撰,鍵和值對的記錄舌镶。
public ProducerRecord (string topic, int partition, k key, v value)
Topic - 將附加到記錄的用戶定義的主題名稱。
Partition - 分區(qū)計數(shù)
Key - 將包括在記錄中的關(guān)鍵豪娜。
-
Value - 記錄內(nèi)容
public ProducerRecord (string topic, k key, v value)
ProducerRecord類構(gòu)造函數(shù)用于創(chuàng)建具有鍵惰匙,值對和無分區(qū)的記錄源武。
Topic- 創(chuàng)建主題以分配記錄。
Key - 鍵記錄。
-
Value - 記錄內(nèi)容吠卷。
public ProducerRecord (string topic, v value)
ProducerRecord類創(chuàng)建一個沒有分區(qū)和鍵的記錄。
- Topic- 創(chuàng)建主題虱痕。
- Value - 記錄內(nèi)容役拴。
ProducerRecord類方法列在下表中:
S.No | 類方法和描述 |
---|---|
1 | public string topic() 主題將附加到記錄。 |
2 | public K key()將包含在記錄中的關(guān)鍵字挎狸。如果沒有這樣的鍵扣汪,null將在這里重新轉(zhuǎn)換。 |
3 | public V value()記錄內(nèi)容锨匆。 |
4 | partition() 記錄的分區(qū)數(shù) |
SimpleProducer應(yīng)用程序
在創(chuàng)建應(yīng)用程序之前崭别,首先啟動ZooKeeper和Kafka代理,然后使用create topic命令在Kafka代理中創(chuàng)建自己的主題。之后茅主,創(chuàng)建一個名為SimplepleProducer.java
的java類舞痰,并鍵入以下代碼。
//import util.properties packages
import java.util.Properties;
//import simple producer packages
import org.apache.kafka.clients.producer.Producer;
//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;
//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;
//Create java class named “SimpleProducer”
public class SimpleProducer {
public static void main(String[] args) throws Exception{
// Check arguments length value
if(args.length == 0){
System.out.println("Enter topic name”);
return;
}
//Assign topicName to string variable
String topicName = args[0].toString();
// create instance for properties to access producer configs
Properties props = new Properties();
//Assign localhost id
props.put("bootstrap.servers", “l(fā)ocalhost:9092");
//Set acknowledgements for producer requests.
props.put("acks", “all");
//If the request fails, the producer can automatically retry,
props.put("retries", 0);
//Specify buffer size in config
props.put("batch.size", 16384);
//Reduce the no of requests less than 0
props.put("linger.ms", 1);
//The buffer.memory controls the total amount of memory available to the producer for buffering.
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
Producer<String, String> producer = new KafkaProducer
<String, String>(props);
for(int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>(topicName,
Integer.toString(i), Integer.toString(i)));
System.out.println(“Message sent successfully”);
producer.close();
}
}
編譯 - 可以使用以下命令編譯應(yīng)用程序暗膜。
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java
執(zhí)行 - 可以使用以下命令執(zhí)行應(yīng)用程序匀奏。
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>
產(chǎn)量
Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10
簡單的消費(fèi)者例子
到目前為止,我們已經(jīng)創(chuàng)建了一個生產(chǎn)者來發(fā)送消息到Kafka集群⊙眩現(xiàn)在讓我們創(chuàng)建一個消費(fèi)者來消費(fèi)kafka群集的消息娃善。KafkaConsumer API用于消費(fèi)來自Kafka群集的消息。KafkaConsumer類構(gòu)造函數(shù)定義如下瑞佩。
public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
configs - 返回消費(fèi)者配置的映射聚磺。
KafkaConsumer
類具有下表中列出的以下重要方法。
S.No | 方法和說明 |
---|---|
1 | public java.util.Set <TopicPar-tition> assignment()獲取當(dāng)前由con-sumer分配的分區(qū)集炬丸。 |
2 | public string subscription()訂閱給定的主題列表以獲得動態(tài)簽名的分區(qū)瘫寝。 |
3 | public void sub-scribe(java.util.List <java.lang.String> topics,ConsumerRe-balanceListener listener)訂閱給定的主題列表以獲得動態(tài)簽名的分區(qū)稠炬。 |
4 | public void unsubscribe()從給定的分區(qū)列表中取消訂閱主題焕阿。 |
5 | public void sub-scribe(java.util.List <java.lang.String> topics)訂閱給定的主題列表以獲得動態(tài)簽名的分區(qū)。如果給定的主題列表為空首启,則它將被視為與unsubscribe()相同暮屡。 |
6 | public void sub-scribe(java.util.regex.Pattern pattern,ConsumerRebalanceLis-tener listener)參數(shù)模式是指以正則表達(dá)式格式的訂閱模式毅桃,并且listener參數(shù)從訂閱模式獲取通知褒纲。 |
7 | public void as-sign(java.util.List <TopicParti-tion>分區(qū))手動分配給客戶的分區(qū)列表。 |
8 | poll()獲取使用其中一個訂閱/分配API指定的主題或分區(qū)的數(shù)據(jù)钥飞。如果在輪詢數(shù)據(jù)之前沒有訂閱主題莺掠,這將返回錯誤。 |
9 | public void commitSync()針對所有主題和分區(qū)的劃分列表读宙,對最后一次poll()返回的提交偏移量彻秆。相同的操作將應(yīng)用于commitAsyn()。 |
10 | public void seek(TopicPartition partition结闸,long offset)獲取消費(fèi)者將在下一個poll()方法上使用的當(dāng)前偏移值掖棉。 |
11 | public void resume()恢復(fù)已暫停的分區(qū)。 |
12 | public void wakeup()喚醒消費(fèi)者膀估。 |
ConsumerRecord API
ConsumerRecord API
用于從Kafka集群接收記錄。該API由一個主題名稱耻讽,分區(qū)號察纯,從其接收的記錄和指向Kafka分區(qū)中的記錄的偏移量組成。ConsumerRecord
類用于創(chuàng)建具有特定主題名稱,分區(qū)計數(shù)和<key饼记,value>對的消費(fèi)者記錄香伴。它具有以下簽名。
public ConsumerRecord(string topic,int partition, long offset,K key, V value)
- Topic - 從kafka群集收到的消費(fèi)者記錄的主題名稱具则。
- Partition - 主題分區(qū)即纲。
- Key - 記錄的鍵,如果沒有鍵存在null將被返回博肋。
- Value - 記錄內(nèi)容低斋。
ConsumerRecords API
ConsumerRecords API充當(dāng)ConsumerRecord的容器。該API用于為特定主題保留每個分區(qū)的ConsumerRecord列表匪凡。其構(gòu)造函數(shù)定義如下膊畴。
public ConsumerRecords(java.util.Map<TopicPartition,java.util.List <Consumer-Record>K,V>>> records)
- TopicPartition - 返回特定主題的分區(qū)映射。
- 記錄 - ConsumerRecord的返回列表病游。
ConsumerRecords類定義了以下方法唇跨。
S.No | 方法和說明 |
---|---|
1 | public int count() 所有主題的記錄數(shù)。 |
2 | public set partitions() 該記錄集中的數(shù)據(jù)集(如果沒有數(shù)據(jù)被返回衬衬,則該集合為空)买猖。 |
3 | public Iterator iterator() 迭代器使您能夠遍歷集合,獲取或重新移動元素滋尉。 |
4 | 公開列表記錄() 獲取給定分區(qū)的記錄列表玉控。 |
配置設(shè)置
Consumer客戶端API主配置設(shè)置的配置設(shè)置如下所示:
S.No | 設(shè)置和說明 |
---|---|
1 | bootstrap.servers 經(jīng)紀(jì)人列表。 |
2 | group.id 將一個消費(fèi)者分配給一個組兼砖。 |
3 | enable.auto.commit 如果值為true奸远,則啟用自動提交偏移量,否則不提交讽挟。 |
4 | auto.commit.interval.ms 更新消耗的偏移量返回給ZooKeeper的頻率懒叛。 |
5 | session.timeout.ms 表示Kafka將在放棄并繼續(xù)使用消息之前等待ZooKeeper響應(yīng)請求(讀取或?qū)懭耄┒嗌俸撩搿?/td> |
SimpleConsumer應(yīng)用程序
生產(chǎn)者應(yīng)用步驟在此保持不變。首先耽梅,啟動您的ZooKeeper和Kafka經(jīng)紀(jì)人薛窥。然后使用名為SimpleCon-sumer.java
的java類創(chuàng)建一個SimpleConsumer
應(yīng)用程序,并鍵入以下代碼眼姐。
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class SimpleConsumer {
public static void main(String[] args) throws Exception {
if(args.length == 0){
System.out.println("Enter topic name");
return;
}
//Kafka consumer configuration settings
String topicName = args[0].toString();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer
<String, String>(props);
//Kafka Consumer subscribes list of topics here.
consumer.subscribe(Arrays.asList(topicName))
//print the topic name
System.out.println("Subscribed to topic " + topicName);
int i = 0;
while (true) {
ConsumerRecords<String, String> records = con-sumer.poll(100);
for (ConsumerRecord<String, String> record : records)
// print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
編譯 - 可以使用以下命令編譯應(yīng)用程序诅迷。
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java
執(zhí)行 -可以使用以下命令執(zhí)行應(yīng)用程序
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>
輸入 - 打開生產(chǎn)者CLI并向主題發(fā)送一些消息。您可以將smple輸入作為“您好消費(fèi)者”众旗。
輸出 - 以下是輸出罢杉。
Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer