概述
- 最近公司對老版本的kafka做升級拓劝,我們的集群很小视卢,就三臺機器踱卵。主要用來爬取數(shù)據(jù)實時任務傳輸用的。老版本用的0.8版本的据过,這個版本zookeeper的依賴還是比較大惋砂,每次kafka讀取消費者topic偏移量都是從zk上讀過來,連接消耗比較大绳锅。在kafka0.9版本后西饵,就不依賴zk去記錄offset的位置了,而是統(tǒng)一記錄在broker端榨呆,通過一個內(nèi)置的一個topic :_consumer_offsets (來記錄各個消費組內(nèi)格topic的位置罗标。下面就集群搭建和使用過程中一些坑記錄一下庸队,記錄下學習筆記
集群搭建
- 集群搭建很簡單积蜻,主要步驟網(wǎng)上都有闯割,我這里就記錄下自己主要配置
zookeeper的
dataDir=/home/maijia/zookeeper-data-new --這個是zk日志目錄
clientPort=2182 --配置的zk端口
initLimit=10
syncLimit=5
tickTime=2000
server.1=192.168.xx.xx:4888:5888
server.2=192.168.xx.xx:4888:5888 ---配置的zk選取端口,采用的是內(nèi)網(wǎng)ip竿拆,只要互通都沒有問題
server.3=192.168.xx.xx:4888:5888
- 下面是kafka broker部分配置宙拉,其他都是采用默認
broker.id=4 --broker代號,這個唯一丙笋,不同機器不同即可
port=9093
listeners=PLAINTEXT://xx.xx.xx.xx:9093 --監(jiān)聽當臺機器外網(wǎng)ip地址和kafka的端口谢澈。消費者和生產(chǎn)者都會連接這個地址進行通信
log.dirs=/home/maijia/kafka-logs-new
zookeeper.connect=192.168.xx.xx:2182,192.168.xx.xx:2182,192.168.xx.xx:2182
zookeeper.connection.timeout.ms=20000
delete.topic.enable=true --配置為true就是刪除topic比較方便,命令行可直接刪除無用topic
這里說明一下幾個broker參數(shù)
- 這幾個和listeners只需要使用listeners御板,1和3是過時的锥忿,老版本出現(xiàn)過。第二個主要是把監(jiān)聽發(fā)布到zk上怠肋【戴蓿總之這幾個只需配置listeners這一個就行了。啟動kafka時候需要解壓包里bin下命令文件直接啟動笙各。具體命令行命令下面再說钉答。我之前就是一直用老版本的bin目錄下啟動腳本配上上面的配置,消費者死活不成功杈抢。所以一定要版本一致数尿。
kafka命令行使用
- 直接看這里就行了 http://orchome.com/454 很詳細的教程,重復的我就不記錄了
生產(chǎn)者和消費者爬坑
使用API maven地址
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.0</version>
</dependency>
- 生產(chǎn)者還好惶楼,沒有遇到大問題右蹦,采用java客戶端api最好和集群的kafka的版本一致,這樣防止出現(xiàn)一些莫名其妙的幺蛾子歼捐,生產(chǎn)者注意發(fā)送內(nèi)容的key和value的字符格式解析嫩实,
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
我這邊是使用key采用string解析,value采用字符數(shù)組
消費者使用詳細說下窥岩。新版本的消費者變化比較大
1.首先消費者現(xiàn)在可以支持手動提交offset甲献,并且手動支持倆種方式同步和異步,我記得老版本好像都是只是自動提交颂翼。
2.消費者可以自己指定消費的分區(qū)和位置晃洒。以前老版本想改的,只能通過修改zk上對應節(jié)點值才能做到朦乏。
這倆點在使用中變化比較大
- 下面說下具體的消費者配置球及,常規(guī)的
props.put("enable.auto.commit", "true");
//自動提交間隔
props.put("auto.commit.interval.ms", 1000);
props.put("max.poll.interval.ms",300000);
props.put("max.poll.records",10);
//設置消費者心跳間隔
props.put("heartbeat.interval.ms",3000);
props.put("session.timeout.ms", 10000);
使用過程中重點遇到問題消費者消費一段時間后,停止消費了呻疹,offset位置一直沒發(fā)生變化吃引。調(diào)了很久才知道,consumer.poll(100);消費者每次poll阻塞拉取的時候拉取的任務太多,然而數(shù)據(jù)處理程序太慢镊尺,倆次poll之間時間差超過max.poll.interval.ms這個配置里的值朦佩,broker就認為這個消費者掛了,就會重新把它從組內(nèi)刪除庐氮,并且重新平衡语稠。后來通過設置max.poll.records這個值來設定每次poll拉取最多拉取任務就可以了。poll方法里的參數(shù)是每次拉取的阻塞時間ms弄砍。
下面具體配置說明一下
session.timeout.ms 這個值是會話超時時間仙畦,什么意思了,就是說如果發(fā)送心跳時間超過這個時間音婶,broker就會認為消費者死亡了慨畸,默認值是10000ms,也就是10s(這個值一般默認沒問題)
heartbeat.interval.ms 這個值是心跳時間衣式,表示多長時間想broker報告一次寸士,這個默認值3000ms,這個值官方推薦不要高于session.timeout.ms 的1/3(這個值默認沒問題)
enable.auto.commit 是否啟用自動提交瞳收。
auto.commit.interval.ms 自動提交間隔
max.poll.interval.ms 每倆次poll拉取數(shù)據(jù)時間間隔最大超時時間碉京,超過這個值,broker就會認為你這個消費者掛了螟深,并且重新平衡谐宙,這時候就消費不到信息了,如果你用kafka自帶的命令行工具查看
sh kafka-consumer-groups.sh --bootstrap-server localhost:9093 --group group2 --describe 就會有這樣的顯示
Consumer groupgroup2
is rebalancing
max.poll.records 這個值的意思是每次poll拉取數(shù)據(jù)的最大任務數(shù)界弧,設置為5凡蜻,就是一次poll里拉取5條偏移量數(shù)據(jù)
key.deserializer 序列化解析key值這個根據(jù)消費者配置而來org.apache.kafka.common.serialization.StringDeserializer
value.deserializer 序列化解析value值。org.apache.kafka.common.serialization.StringDeserializer
如果生產(chǎn)者或者消費者采用不同字符解析器垢箕,采取對應配置划栓,例如 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
我這個comsumer使用的是string字符解析格式 還有ByteArraySerializer字符數(shù)組這種字符格式。org.apache.kafka.common.serialization都是在這個包里
- 貼一下代碼
Properties props = new Properties();
//服務器位置
props.put("bootstrap.servers", "xxxxx");
//消費組id
props.put("group.id", "group3");
//是否啟動自動提交
props.put("enable.auto.commit", "false");
//自動提交間隔
props.put("auto.commit.interval.ms", 1000);
props.put("auto.offset.reset", "latest");
props.put("max.poll.interval.ms",300000);
props.put("max.poll.records",10);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 動態(tài)從分區(qū)獲取消息,負載均衡的獲取消息条获,如果想手動指定位置分區(qū)和offset忠荞,則使用consumer.assign();
consumer.subscribe(Arrays.asList("fsc"));
// 手動指定方式從分區(qū)獲取數(shù)據(jù)
// TopicPartition p0=new TopicPartition("fsc",0);
// TopicPartition p1=new TopicPartition("fsc",1);
// TopicPartition p2=new TopicPartition("fsc",2);
// List<TopicPartition> topicPartitionList=new ArrayList<TopicPartition>();
// topicPartitionList.add(p0);
// topicPartitionList.add(p1);
// topicPartitionList.add(p2);
//// 指定分區(qū)和offset方式消費數(shù)據(jù),
// consumer.assign(topicPartitionList);
//// 調(diào)到所有分區(qū)最開始的位置
// consumer.seekToBeginning(topicPartitionList);
//// 調(diào)到分區(qū)最后的位置
// consumer.seekToEnd(topicPartitionList);
//// 指定分區(qū)和offset進行消費
// consumer.seek(p2,40);
// consumer.seek(p1,40);
System.out.println("start consumering");
while (true) {
ConsumerRecords<String,String> records = consumer.poll(100);
for (ConsumerRecord<String,String> record : records) {
// 在這里進行插入數(shù)據(jù)庫操作,數(shù)據(jù)在valus里的json格式
System.out.println("分區(qū):"+record.partition()+" offset:"+record.offset()+"key:"+record.key());
// 同步提交
consumer.commitSync();
}
}
記錄完畢