四.KafkaAPI實戰(zhàn)

4.1 環(huán)境準(zhǔn)備

1)在eclipse中創(chuàng)建一個java工程

2)在工程的根目錄創(chuàng)建一個lib文件夾

3)解壓kafka安裝包缺脉,將安裝包libs目錄下的jar包拷貝到工程的lib目錄下,并build path秕衙。

4)啟動zk和kafka集群贴唇,在kafka集群中打開一個消費者

[itstar@bigdata11 kafka]$ bin/kafka-console-consumer.sh --zookeeper

bigdata11:2181 --topic first

4.2 Kafka生產(chǎn)者Java API

4.2.1 創(chuàng)建生產(chǎn)者(過時的API)

package com.itstar.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

public class OldProducer {

@SuppressWarnings("deprecation")

public static void main(String[] args) {

Properties properties = new Properties();

properties.put("metadata.broker.list", "bigdata11:9092");

properties.put("request.required.acks", "1");

properties.put("serializer.class", "kafka.serializer.StringEncoder");

Producer<Integer, String> producer = new Producer<Integer,String>(new

ProducerConfig(properties));

KeyedMessage<Integer, String> message = new KeyedMessage<Integer,

String>("first", "hello world");

producer.send(message );

}

}

4.2.2 創(chuàng)建生產(chǎn)者(新API)

package com.itstar.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.ProducerRecord;

public class NewProducer {

public static void main(String[] args) {

Properties props = new Properties();

// Kafka服務(wù)端的主機名和端口號

props.put("bootstrap.servers", "bigdata12:9092");

// 等待所有副本節(jié)點的應(yīng)答

props.put("acks", "all");

// 消息發(fā)送最大嘗試次數(shù)

props.put("retries", 0);

// 一批消息處理大小

props.put("batch.size", 16384);

// 請求延時

props.put("linger.ms", 1);

// 發(fā)送緩存區(qū)內(nèi)存大小

props.put("buffer.memory", 33554432);

// key序列化

props.put("key.serializer",

"org.apache.kafka.common.serialization.StringSerializer");

// value序列化

props.put("value.serializer",

"org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

for (int i = 0; i < 50; i++) {

producer.send(new ProducerRecord<String, String>("first",

Integer.toString(i), "hello world-" + i));

}

producer.close();

}

}

4.2.3 創(chuàng)建生產(chǎn)者帶回調(diào)函數(shù)(新API)

package com.itstar.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.clients.producer.RecordMetadata;

public class CallBackProducer {

public static void main(String[] args) {

Properties props = new Properties();

// Kafka服務(wù)端的主機名和端口號

props.put("bootstrap.servers", "bigdata12:9092");

// 等待所有副本節(jié)點的應(yīng)答

props.put("acks", "all");

// 消息發(fā)送最大嘗試次數(shù)

props.put("retries", 0);

// 一批消息處理大小

props.put("batch.size", 16384);

// 增加服務(wù)端請求延時

props.put("linger.ms", 1);

// 發(fā)送緩存區(qū)內(nèi)存大小

props.put("buffer.memory", 33554432);

// key序列化

props.put("key.serializer",

"org.apache.kafka.common.serialization.StringSerializer");

// value序列化

props.put("value.serializer",

"org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>

(props);

for (int i = 0; i < 50; i++) {

kafkaProducer.send(new ProducerRecord<String, String>("first",

"hello" + i), new Callback() {

@Override

public void onCompletion(RecordMetadata metadata, Exception

exception) {

if (metadata != null) {

System.out.println(metadata.partition() + "---" +

metadata.offset());

}

}

});

}

kafkaProducer.close();

}

}

4.2.4 自定義分區(qū)生產(chǎn)者

0)需求:將所有數(shù)據(jù)存儲到topic的第0號分區(qū)上

1)定義一個類實現(xiàn)Partitioner接口,重寫里面的方法(過時API)

package com.itstar.kafka;

import java.util.Map;

import kafka.producer.Partitioner;

public class CustomPartitioner implements Partitioner {

public CustomPartitioner() {

super();

}

@Override

public int partition(Object key, int numPartitions) {

// 控制分區(qū)

return 0;

}

}

2)自定義分區(qū)(新API)

package com.itstar.kafka;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;

import org.apache.kafka.common.Cluster;

public class CustomPartitioner implements Partitioner {

@Override

public void configure(Map<String, ?> configs) {

}

@Override

public int partition(String topic, Object key, byte[] keyBytes, Object

value, byte[] valueBytes, Cluster cluster) {

// 控制分區(qū)

return 0;

}

@Override

public void close() {

}

}

3)在代碼中調(diào)用

package com.itstar.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionerProducer {

public static void main(String[] args) {

Properties props = new Properties();

// Kafka服務(wù)端的主機名和端口號

props.put("bootstrap.servers", "bigdata12:9092");

// 等待所有副本節(jié)點的應(yīng)答

props.put("acks", "all");

// 消息發(fā)送最大嘗試次數(shù)

props.put("retries", 0);

// 一批消息處理大小

props.put("batch.size", 16384);

// 增加服務(wù)端請求延時

props.put("linger.ms", 1);

// 發(fā)送緩存區(qū)內(nèi)存大小

props.put("buffer.memory", 33554432);

// key序列化

props.put("key.serializer",

"org.apache.kafka.common.serialization.StringSerializer");

// value序列化

props.put("value.serializer",

"org.apache.kafka.common.serialization.StringSerializer");

// 自定義分區(qū)

props.put("partitioner.class", "com.itstar.kafka.CustomPartitioner");

Producer<String, String> producer = new KafkaProducer<>(props);

producer.send(new ProducerRecord<String, String>("first", "1",

"itstar"));

producer.close();

}

}

4)測試

(1)在bigdata11上監(jiān)控/opt/module/kafka/logs/目錄下fifirst主題3個分區(qū)的log日志動態(tài)變化情況

[itstar@bigdata11 first-0]$ tail -f 00000000000000000000.log

[itstar@bigdata11 first-1]$ tail -f 00000000000000000000.log

[itstar@bigdata11 first-2]$ tail -f 00000000000000000000.log

(2)發(fā)現(xiàn)數(shù)據(jù)都存儲到指定的分區(qū)了。

4.3 Kafka消費者Java API

0)在控制臺創(chuàng)建發(fā)送者

[itstar@bigdata13 kafka]$ bin/kafka-console-producer.sh --broker-list

bigdata11:9092 --topic first

>hello world

1)創(chuàng)建消費者(過時API)

package com.itstar.kafka.consume;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

public class CustomConsumer {

@SuppressWarnings("deprecation")

public static void main(String[] args) {

Properties properties = new Properties();

properties.put("zookeeper.connect", "bigdata11:2181");

properties.put("group.id", "g1");

properties.put("zookeeper.session.timeout.ms", "500");

properties.put("zookeeper.sync.time.ms", "250");

properties.put("auto.commit.interval.ms", "1000");

// 創(chuàng)建消費者連接器

ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new

ConsumerConfig(properties));

HashMap<String, Integer> topicCount = new HashMap<>();

topicCount.put("first", 1);

Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =

consumer.createMessageStreams(topicCount);

KafkaStream<byte[], byte[]> stream = consumerMap.get("first").get(0);

ConsumerIterator<byte[], byte[]> it = stream.iterator();

while (it.hasNext()) {

System.out.println(new String(it.next().message()));

}

}

}

2)官方提供案例(自動維護(hù)消費情況)(新API)

package com.itstar.kafka.consume;

import java.util.Arrays;

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CustomNewConsumer {

public static void main(String[] args) {

Properties props = new Properties();

// 定義kakfa 服務(wù)的地址瞻坝,不需要將所有broker指定上

props.put("bootstrap.servers", "bigdata11:9092");

// 制定consumer group

props.put("group.id", "test");

// 是否自動確認(rèn)offset

props.put("enable.auto.commit", "true");

// 自動確認(rèn)offset的時間間隔

props.put("auto.commit.interval.ms", "1000");

// key的序列化類

props.put("key.deserializer",

"org.apache.kafka.common.serialization.StringDeserializer");

// value的序列化類

props.put("value.deserializer",

"org.apache.kafka.common.serialization.StringDeserializer");

// 定義consumer

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 消費者訂閱的topic, 可同時訂閱多個

consumer.subscribe(Arrays.asList("first", "second","third"));

while (true) {

// 讀取數(shù)據(jù),讀取超時時間為100ms

ConsumerRecords<String, String> records = consumer.poll(100);

for (ConsumerRecord<String, String> record : records)

System.out.printf("offset = %d, key = %s, value = %s%n",

record.offset(), record.key(), record.value());

}

}

}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末杏瞻,一起剝皮案震驚了整個濱河市所刀,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌捞挥,老刑警劉巖浮创,帶你破解...
    沈念sama閱讀 212,454評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異砌函,居然都是意外死亡斩披,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評論 3 385
  • 文/潘曉璐 我一進(jìn)店門胸嘴,熙熙樓的掌柜王于貴愁眉苦臉地迎上來雏掠,“玉大人,你說我怎么就攤上這事劣像。” “怎么了摧玫?”我有些...
    開封第一講書人閱讀 157,921評論 0 348
  • 文/不壞的土叔 我叫張陵耳奕,是天一觀的道長。 經(jīng)常有香客問我诬像,道長屋群,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,648評論 1 284
  • 正文 為了忘掉前任坏挠,我火速辦了婚禮芍躏,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘降狠。我一直安慰自己对竣,他們只是感情好庇楞,可當(dāng)我...
    茶點故事閱讀 65,770評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著否纬,像睡著了一般吕晌。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上临燃,一...
    開封第一講書人閱讀 49,950評論 1 291
  • 那天睛驳,我揣著相機與錄音,去河邊找鬼膜廊。 笑死乏沸,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的爪瓜。 我是一名探鬼主播蹬跃,決...
    沈念sama閱讀 39,090評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼钥勋!你這毒婦竟也來了炬转?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,817評論 0 268
  • 序言:老撾萬榮一對情侶失蹤算灸,失蹤者是張志新(化名)和其女友劉穎扼劈,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體菲驴,經(jīng)...
    沈念sama閱讀 44,275評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡荐吵,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,592評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了赊瞬。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片先煎。...
    茶點故事閱讀 38,724評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖巧涧,靈堂內(nèi)的尸體忽然破棺而出薯蝎,到底是詐尸還是另有隱情,我是刑警寧澤谤绳,帶...
    沈念sama閱讀 34,409評論 4 333
  • 正文 年R本政府宣布占锯,位于F島的核電站,受9級特大地震影響缩筛,放射性物質(zhì)發(fā)生泄漏消略。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 40,052評論 3 316
  • 文/蒙蒙 一瞎抛、第九天 我趴在偏房一處隱蔽的房頂上張望艺演。 院中可真熱鬧,春花似錦、人聲如沸胎撤。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,815評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽哩照。三九已至挺物,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間飘弧,已是汗流浹背识藤。 一陣腳步聲響...
    開封第一講書人閱讀 32,043評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留次伶,地道東北人痴昧。 一個月前我還...
    沈念sama閱讀 46,503評論 2 361
  • 正文 我出身青樓,卻偏偏與公主長得像冠王,于是被迫代替她去往敵國和親赶撰。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,627評論 2 350

推薦閱讀更多精彩內(nèi)容