kafka learning note

2020/3/6

Kafka Producer API

  1. producer.send (new ProducerRecord<byte[],byte[]>(topIc, partition, key1, value1), callback);
    (1) ProducerRecord - asynchronously send message to the topic
    (2) callback - the callback sent back to the user when the server confirm the record
  2. public void send(KeyedMessage<k,v> message)
    -sends the data to a single topic, partitioned by key using either sync or async producer.
  3. public void send(List<KeyedMessage<k,v>>messages)
    -sends data to multiple topics.
  4. SETTING PRODICERCONFIG
properties prop = new Properties();
prop.put(producer.type,”async”) asynchronization -> asynchronous
ProducerConfig config = new ProducerConfig(prop);
  1. public void flush()
    -confirm that all the sending function has finished
  2. public Map metrics()
    -partition for get partition metadata of specifical topic
  3. producer API
  • clinet.id
  • producer.type
    -async/sync
  • acks ???

acks配置控制生產(chǎn)者請求下的標(biāo)準(zhǔn)是完全的。

  • bootstrapping broker list
  • linger.ms
  • retry
    -setting linger.ms higher than specifical value to reduce requests
  • key.serializer
  • value.serializer
  • batch.size
  • buffer.memory
  1. ProducerRecord
  • public ProducerRecord ( string topic, int partition, k key, v value)
  • public ProducerRecord ( string topic, k key, v value)
    -no partition
  • public ProducerRecord ( string topic, v value)
    -either partiton and key

SimpleProducer application

import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer"
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }      
      //Assign topicName to string variable
      String topicName = args[0].toString();      
      // create instance for properties to access producer configs   
      Properties props = new Properties();      
      //Assign localhost id
      props.put("bootstrap.servers", “l(fā)ocalhost:9092"); 
      //your ip:host based on config/servers.properties      
      
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++) {
         producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)));
      }
      System.out.println(“Message sent successfully");
      producer.close();
   }
}
  • compile
    • javac "$KAFKA_HOME/libs/*" SimpleProducer.java
  • run
    • java "$KAFKA_HOME/libs/*" SimpleProducer

kafka consumer API

  1. public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
  2. consumer api
  • public java.util.Set< TopicPar- tition> assignment()
    -get partition set currently divided by user
  • public string subscription()
    -get dynamic signature in specific topics
  • public void subscribe(java.util.List< java.lang.String> topics茧跋,ConsumerRe-balanceListener listener)
    -get dynamic signature in specific topics.
  • public void unsubscribe()
    -cancel subscription
  • public void subscribe(java.util.List< java.lang.String> topics)
    -if topics is null, share same function with unsubscribe
  • public void subscribe(java.util.regex.Pattern pattern,ConsumerRebalanceLis-tener listener)
  • public void assign(java.util.List< TopicPartion> partitions)
  • poll()
    -get data from partition
  • public void commitSync()
    -return the offset
  • public void seek(TopicPartition partition,long offset)
    -comsumer will adopt current offset in next poll
  • public void resume
  • public void wakeup

PublicRecord API

  • public ConsumerRecord(string topic,int partition, long offset,K key, V value)
    -get record from kafka group
  • public ConsumerRecords(java.util.Map<TopicPartition,java.util.List<Consumer-Record>K,V>>> records)
    -as the container of record
    • public int count() - count of all topics
    • public Set partitions() - return partition with data set
    • public Iterator()
    • public List records()

ConsumerClient API

  • broker list
  • group.id
  • enable.auto.commit
  • auto.commit.interval.ms
  • session.timeout.ms

SimpleConsumer

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer <String, String>(props);
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      //print the topic name
      System.out.println("Subscribed to topic " &plus; topicName);
      int i = 0;
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
      }
   }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末盆驹,一起剝皮案震驚了整個(gè)濱河市褥实,隨后出現(xiàn)的幾起案子妈倔,更是在濱河造成了極大的恐慌,老刑警劉巖片排,帶你破解...
    沈念sama閱讀 206,378評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異速侈,居然都是意外死亡率寡,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評論 2 382
  • 文/潘曉璐 我一進(jìn)店門倚搬,熙熙樓的掌柜王于貴愁眉苦臉地迎上來冶共,“玉大人,你說我怎么就攤上這事每界⊥苯” “怎么了?”我有些...
    開封第一講書人閱讀 152,702評論 0 342
  • 文/不壞的土叔 我叫張陵眨层,是天一觀的道長庙楚。 經(jīng)常有香客問我,道長谐岁,這世上最難降的妖魔是什么醋奠? 我笑而不...
    開封第一講書人閱讀 55,259評論 1 279
  • 正文 為了忘掉前任榛臼,我火速辦了婚禮,結(jié)果婚禮上窜司,老公的妹妹穿的比我還像新娘沛善。我一直安慰自己,他們只是感情好塞祈,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,263評論 5 371
  • 文/花漫 我一把揭開白布金刁。 她就那樣靜靜地躺著,像睡著了一般议薪。 火紅的嫁衣襯著肌膚如雪尤蛮。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,036評論 1 285
  • 那天斯议,我揣著相機(jī)與錄音产捞,去河邊找鬼。 笑死哼御,一個(gè)胖子當(dāng)著我的面吹牛坯临,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播恋昼,決...
    沈念sama閱讀 38,349評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼看靠,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了液肌?” 一聲冷哼從身側(cè)響起挟炬,我...
    開封第一講書人閱讀 36,979評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎嗦哆,沒想到半個(gè)月后谤祖,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,469評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡吝秕,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,938評論 2 323
  • 正文 我和宋清朗相戀三年泊脐,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片烁峭。...
    茶點(diǎn)故事閱讀 38,059評論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡容客,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出约郁,到底是詐尸還是另有隱情缩挑,我是刑警寧澤,帶...
    沈念sama閱讀 33,703評論 4 323
  • 正文 年R本政府宣布鬓梅,位于F島的核電站供置,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏绽快。R本人自食惡果不足惜芥丧,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,257評論 3 307
  • 文/蒙蒙 一紧阔、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧续担,春花似錦擅耽、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至询兴,卻和暖如春乃沙,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背诗舰。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工警儒, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人始衅。 一個(gè)月前我還...
    沈念sama閱讀 45,501評論 2 354
  • 正文 我出身青樓冷蚂,卻偏偏與公主長得像缭保,于是被迫代替她去往敵國和親汛闸。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,792評論 2 345

推薦閱讀更多精彩內(nèi)容