Apache Kafka -5 生產(chǎn)者示例

Apache Kafka教程 之 Apache Kafka - 生產(chǎn)者示例

http://blogxinxiucan.sh1.newtouch.com/

原文地址: http://blogxinxiucan.sh1.newtouch.com/2017/07/13/Apache-Kafka-生產(chǎn)者示例/

Apache Kafka - 生產(chǎn)者示例

讓我們創(chuàng)建一個使用Java客戶端發(fā)布和使用消息的應(yīng)用程序。Kafka生產(chǎn)者客戶端由以下API組成淫茵。

KafkaProducer API

讓我們了解本節(jié)中最重要的一套Kafka生產(chǎn)者API爪瓜。KafkaProducer API的核心部分是KafkaProducer類。

  • KafkaProducer類提供了一個選項(xiàng)匙瘪,可以使用以下方法在其構(gòu)造函數(shù)中連接Kafka代理铆铆。

producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1) , callback);

  • ProducerRecord - 生產(chǎn)者管理一個等待發(fā)送的記錄緩沖區(qū)。
  • 回調(diào) - 當(dāng)服務(wù)器確認(rèn)記錄時執(zhí)行的用戶提供的回調(diào)(null表示無回調(diào))辆苔。
  • KafkaProducer類提供了一種刷新方法算灸,以確保所有先前發(fā)送的消息已經(jīng)實(shí)際完成。flush方法的語法如下 public void flush()
  • KafkaProducer類提供了partitionFor方法驻啤,它有助于獲取給定主題的分區(qū)元數(shù)據(jù)。這可以用于自定義分區(qū)荐吵。此方法的簽名如下
    public Map metrics()
    它返回生產(chǎn)者維護(hù)的內(nèi)部指標(biāo)圖骑冗。
  • public void close() KafkaProducer類提供了緊密的方法塊,直到所有先前發(fā)送的請求完成為止先煎。

生產(chǎn)者API

Producer API的中心部分是Producer類贼涩。生產(chǎn)者類提供了通過以下方法在其構(gòu)造函數(shù)中連接Kafka代理的選項(xiàng)。

生產(chǎn)者類
生產(chǎn)者類提供發(fā)送方法薯蝎,使用以下簽名將消息發(fā)送到單個或多個主題遥倦。

public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);

有兩種類型的生產(chǎn)者 - 同步異步

同樣的API配置也適用于同步生成器。它們之間的區(qū)別是同步生成器直接發(fā)送消息袒哥,但是在后臺發(fā)送消息缩筛。當(dāng)您想要更高的吞吐量時,Async生產(chǎn)者是首選堡称。在以前的版本瞎抛,如0.8,一個異步生成器沒有回調(diào)send()來注冊錯誤處理程序却紧。這僅在當(dāng)前版本的0.9中可用桐臊。

public void close()
生產(chǎn)者類提供了關(guān)閉與所有kafka兄弟的生產(chǎn)者池連接的緊密方法。

配置設(shè)置

Producer API的主要配置設(shè)置列在下表中晓殊,以便更好地了解 -

S.No 配置設(shè)置和說明
1 client.id 識別生產(chǎn)者應(yīng)用程序
2 producer.type 同步或異步
3 Acks acks配置控制生產(chǎn)者請求下的條件被完全匹配断凶。
4 retries 如果生產(chǎn)者請求失敗,則會自動重試具體值巫俺。
5 bootstrap.servers 經(jīng)紀(jì)人的引導(dǎo)列表认烁。
6 linger.ms 如果要減少請求數(shù),可以將linger.ms設(shè)置為大于某值的值识藤。
7 key.serializer 串行器接口的關(guān)鍵砚著。
8 value.serializer 串行器接口的值。
9 batch.size 緩沖區(qū)大小痴昧。
10 buffer.memory 控制生產(chǎn)者可用于緩沖的總內(nèi)存量稽穆。

ProducerRecord API

ProducerRecord是發(fā)送給Kafka cluster.ProducerRecord類構(gòu)造函數(shù)的一個鍵/值對,用于使用以下簽名創(chuàng)建具有分區(qū)赶撰,鍵和值對的記錄舌镶。

public ProducerRecord (string topic, int partition, k key, v value)
  • Topic - 將附加到記錄的用戶定義的主題名稱。

  • Partition - 分區(qū)計數(shù)

  • Key - 將包括在記錄中的關(guān)鍵豪娜。

  • Value - 記錄內(nèi)容

    public ProducerRecord (string topic, k key, v value)

ProducerRecord類構(gòu)造函數(shù)用于創(chuàng)建具有鍵惰匙,值對和無分區(qū)的記錄源武。

  • Topic- 創(chuàng)建主題以分配記錄。

  • Key - 鍵記錄。

  • Value - 記錄內(nèi)容吠卷。

    public ProducerRecord (string topic, v value)

ProducerRecord類創(chuàng)建一個沒有分區(qū)和鍵的記錄。

  • Topic- 創(chuàng)建主題虱痕。
  • Value - 記錄內(nèi)容役拴。

ProducerRecord類方法列在下表中:

S.No 類方法和描述
1 public string topic() 主題將附加到記錄。
2 public K key()將包含在記錄中的關(guān)鍵字挎狸。如果沒有這樣的鍵扣汪,null將在這里重新轉(zhuǎn)換。
3 public V value()記錄內(nèi)容锨匆。
4 partition() 記錄的分區(qū)數(shù)

SimpleProducer應(yīng)用程序

在創(chuàng)建應(yīng)用程序之前崭别,首先啟動ZooKeeper和Kafka代理,然后使用create topic命令在Kafka代理中創(chuàng)建自己的主題。之后茅主,創(chuàng)建一個名為SimplepleProducer.java的java類舞痰,并鍵入以下代碼。


//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer”
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name”);
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", “l(fā)ocalhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}

編譯 - 可以使用以下命令編譯應(yīng)用程序暗膜。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

執(zhí)行 - 可以使用以下命令執(zhí)行應(yīng)用程序匀奏。

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>

產(chǎn)量

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

簡單的消費(fèi)者例子

到目前為止,我們已經(jīng)創(chuàng)建了一個生產(chǎn)者來發(fā)送消息到Kafka集群⊙眩現(xiàn)在讓我們創(chuàng)建一個消費(fèi)者來消費(fèi)kafka群集的消息娃善。KafkaConsumer API用于消費(fèi)來自Kafka群集的消息。KafkaConsumer類構(gòu)造函數(shù)定義如下瑞佩。

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs - 返回消費(fèi)者配置的映射聚磺。

KafkaConsumer類具有下表中列出的以下重要方法。

S.No 方法和說明
1 public java.util.Set <TopicPar-tition> assignment()獲取當(dāng)前由con-sumer分配的分區(qū)集炬丸。
2 public string subscription()訂閱給定的主題列表以獲得動態(tài)簽名的分區(qū)瘫寝。
3 public void sub-scribe(java.util.List <java.lang.String> topics,ConsumerRe-balanceListener listener)訂閱給定的主題列表以獲得動態(tài)簽名的分區(qū)稠炬。
4 public void unsubscribe()從給定的分區(qū)列表中取消訂閱主題焕阿。
5 public void sub-scribe(java.util.List <java.lang.String> topics)訂閱給定的主題列表以獲得動態(tài)簽名的分區(qū)。如果給定的主題列表為空首启,則它將被視為與unsubscribe()相同暮屡。
6 public void sub-scribe(java.util.regex.Pattern pattern,ConsumerRebalanceLis-tener listener)參數(shù)模式是指以正則表達(dá)式格式的訂閱模式毅桃,并且listener參數(shù)從訂閱模式獲取通知褒纲。
7 public void as-sign(java.util.List <TopicParti-tion>分區(qū))手動分配給客戶的分區(qū)列表。
8 poll()獲取使用其中一個訂閱/分配API指定的主題或分區(qū)的數(shù)據(jù)钥飞。如果在輪詢數(shù)據(jù)之前沒有訂閱主題莺掠,這將返回錯誤。
9 public void commitSync()針對所有主題和分區(qū)的劃分列表读宙,對最后一次poll()返回的提交偏移量彻秆。相同的操作將應(yīng)用于commitAsyn()。
10 public void seek(TopicPartition partition结闸,long offset)獲取消費(fèi)者將在下一個poll()方法上使用的當(dāng)前偏移值掖棉。
11 public void resume()恢復(fù)已暫停的分區(qū)。
12 public void wakeup()喚醒消費(fèi)者膀估。

ConsumerRecord API

ConsumerRecord API用于從Kafka集群接收記錄。該API由一個主題名稱耻讽,分區(qū)號察纯,從其接收的記錄和指向Kafka分區(qū)中的記錄的偏移量組成。ConsumerRecord類用于創(chuàng)建具有特定主題名稱,分區(qū)計數(shù)和<key饼记,value>對的消費(fèi)者記錄香伴。它具有以下簽名。

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  • Topic - 從kafka群集收到的消費(fèi)者記錄的主題名稱具则。
  • Partition - 主題分區(qū)即纲。
  • Key - 記錄的鍵,如果沒有鍵存在null將被返回博肋。
  • Value - 記錄內(nèi)容低斋。

ConsumerRecords API
ConsumerRecords API充當(dāng)ConsumerRecord的容器。該API用于為特定主題保留每個分區(qū)的ConsumerRecord列表匪凡。其構(gòu)造函數(shù)定義如下膊畴。

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List <Consumer-Record>K,V>>> records)

  • TopicPartition - 返回特定主題的分區(qū)映射。
  • 記錄 - ConsumerRecord的返回列表病游。

ConsumerRecords類定義了以下方法唇跨。

S.No 方法和說明
1 public int count() 所有主題的記錄數(shù)。
2 public set partitions() 該記錄集中的數(shù)據(jù)集(如果沒有數(shù)據(jù)被返回衬衬,則該集合為空)买猖。
3 public Iterator iterator() 迭代器使您能夠遍歷集合,獲取或重新移動元素滋尉。
4 公開列表記錄() 獲取給定分區(qū)的記錄列表玉控。

配置設(shè)置

Consumer客戶端API主配置設(shè)置的配置設(shè)置如下所示:

S.No 設(shè)置和說明
1 bootstrap.servers 經(jīng)紀(jì)人列表。
2 group.id 將一個消費(fèi)者分配給一個組兼砖。
3 enable.auto.commit 如果值為true奸远,則啟用自動提交偏移量,否則不提交讽挟。
4 auto.commit.interval.ms 更新消耗的偏移量返回給ZooKeeper的頻率懒叛。
5 session.timeout.ms 表示Kafka將在放棄并繼續(xù)使用消息之前等待ZooKeeper響應(yīng)請求(讀取或?qū)懭耄┒嗌俸撩搿?/td>

SimpleConsumer應(yīng)用程序
生產(chǎn)者應(yīng)用步驟在此保持不變。首先耽梅,啟動您的ZooKeeper和Kafka經(jīng)紀(jì)人薛窥。然后使用名為SimpleCon-sumer.java的java類創(chuàng)建一個SimpleConsumer應(yīng)用程序,并鍵入以下代碼眼姐。

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      
      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      int i = 0;
      
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", 
            record.offset(), record.key(), record.value());
      }
   }
}

編譯 - 可以使用以下命令編譯應(yīng)用程序诅迷。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

執(zhí)行 -可以使用以下命令執(zhí)行應(yīng)用程序

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

輸入 - 打開生產(chǎn)者CLI并向主題發(fā)送一些消息。您可以將smple輸入作為“您好消費(fèi)者”众旗。

輸出 - 以下是輸出罢杉。

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市贡歧,隨后出現(xiàn)的幾起案子滩租,更是在濱河造成了極大的恐慌赋秀,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,386評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件律想,死亡現(xiàn)場離奇詭異猎莲,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)技即,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,142評論 3 394
  • 文/潘曉璐 我一進(jìn)店門著洼,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人而叼,你說我怎么就攤上這事身笤。” “怎么了澈歉?”我有些...
    開封第一講書人閱讀 164,704評論 0 353
  • 文/不壞的土叔 我叫張陵展鸡,是天一觀的道長。 經(jīng)常有香客問我埃难,道長莹弊,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,702評論 1 294
  • 正文 為了忘掉前任涡尘,我火速辦了婚禮忍弛,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘考抄。我一直安慰自己细疚,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,716評論 6 392
  • 文/花漫 我一把揭開白布川梅。 她就那樣靜靜地躺著疯兼,像睡著了一般。 火紅的嫁衣襯著肌膚如雪贫途。 梳的紋絲不亂的頭發(fā)上吧彪,一...
    開封第一講書人閱讀 51,573評論 1 305
  • 那天,我揣著相機(jī)與錄音丢早,去河邊找鬼姨裸。 笑死,一個胖子當(dāng)著我的面吹牛怨酝,可吹牛的內(nèi)容都是我干的傀缩。 我是一名探鬼主播,決...
    沈念sama閱讀 40,314評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼农猬,長吁一口氣:“原來是場噩夢啊……” “哼赡艰!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起斤葱,我...
    開封第一講書人閱讀 39,230評論 0 276
  • 序言:老撾萬榮一對情侶失蹤瞄摊,失蹤者是張志新(化名)和其女友劉穎勋又,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體换帜,經(jīng)...
    沈念sama閱讀 45,680評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,873評論 3 336
  • 正文 我和宋清朗相戀三年鹤啡,在試婚紗的時候發(fā)現(xiàn)自己被綠了惯驼。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,991評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡递瑰,死狀恐怖祟牲,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情抖部,我是刑警寧澤说贝,帶...
    沈念sama閱讀 35,706評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站慎颗,受9級特大地震影響乡恕,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜俯萎,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,329評論 3 330
  • 文/蒙蒙 一傲宜、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧夫啊,春花似錦函卒、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,910評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至熊榛,卻和暖如春锚国,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背来候。 一陣腳步聲響...
    開封第一講書人閱讀 33,038評論 1 270
  • 我被黑心中介騙來泰國打工跷叉, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人营搅。 一個月前我還...
    沈念sama閱讀 48,158評論 3 370
  • 正文 我出身青樓云挟,卻偏偏與公主長得像,于是被迫代替她去往敵國和親转质。 傳聞我的和親對象是個殘疾皇子园欣,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,941評論 2 355

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)休蟹,斷路器沸枯,智...
    卡卡羅2017閱讀 134,657評論 18 139
  • 發(fā)行說明 - Kafka - 版本1.0.0 以下是Kafka 1.0.0發(fā)行版中解決的JIRA問題的摘要日矫。有關(guān)該...
    全能程序猿閱讀 2,859評論 2 7
  • Kafka官網(wǎng):http://kafka.apache.org/入門1.1 介紹Kafka? 是一個分布式流處理系...
    it_zzy閱讀 3,894評論 3 53
  • kafka的定義:是一個分布式消息系統(tǒng),由LinkedIn使用Scala編寫绑榴,用作LinkedIn的活動流(Act...
    時待吾閱讀 5,320評論 1 15
  • Kafka入門經(jīng)典教程-Kafka-about云開發(fā) http://www.aboutyun.com/threa...
    葡萄喃喃囈語閱讀 10,831評論 4 54