kafka 0.10.1一些使用經(jīng)驗

概述

  • 最近公司對老版本的kafka做升級拓劝,我們的集群很小视卢,就三臺機器踱卵。主要用來爬取數(shù)據(jù)實時任務傳輸用的。老版本用的0.8版本的据过,這個版本zookeeper的依賴還是比較大惋砂,每次kafka讀取消費者topic偏移量都是從zk上讀過來,連接消耗比較大绳锅。在kafka0.9版本后西饵,就不依賴zk去記錄offset的位置了,而是統(tǒng)一記錄在broker端榨呆,通過一個內(nèi)置的一個topic :_consumer_offsets (來記錄各個消費組內(nèi)格topic的位置罗标。下面就集群搭建和使用過程中一些坑記錄一下庸队,記錄下學習筆記

集群搭建

  • 集群搭建很簡單积蜻,主要步驟網(wǎng)上都有闯割,我這里就記錄下自己主要配置
    zookeeper的

dataDir=/home/maijia/zookeeper-data-new --這個是zk日志目錄
clientPort=2182 --配置的zk端口
initLimit=10
syncLimit=5
tickTime=2000
server.1=192.168.xx.xx:4888:5888
server.2=192.168.xx.xx:4888:5888 ---配置的zk選取端口,采用的是內(nèi)網(wǎng)ip竿拆,只要互通都沒有問題
server.3=192.168.xx.xx:4888:5888

  • 下面是kafka broker部分配置宙拉,其他都是采用默認

broker.id=4 --broker代號,這個唯一丙笋,不同機器不同即可
port=9093
listeners=PLAINTEXT://xx.xx.xx.xx:9093 --監(jiān)聽當臺機器外網(wǎng)ip地址和kafka的端口谢澈。消費者和生產(chǎn)者都會連接這個地址進行通信
log.dirs=/home/maijia/kafka-logs-new
zookeeper.connect=192.168.xx.xx:2182,192.168.xx.xx:2182,192.168.xx.xx:2182
zookeeper.connection.timeout.ms=20000
delete.topic.enable=true --配置為true就是刪除topic比較方便,命令行可直接刪除無用topic

這里說明一下幾個broker參數(shù)


image.png
  • 這幾個和listeners只需要使用listeners御板,1和3是過時的锥忿,老版本出現(xiàn)過。第二個主要是把監(jiān)聽發(fā)布到zk上怠肋【戴蓿總之這幾個只需配置listeners這一個就行了。啟動kafka時候需要解壓包里bin下命令文件直接啟動笙各。具體命令行命令下面再說钉答。我之前就是一直用老版本的bin目錄下啟動腳本配上上面的配置,消費者死活不成功杈抢。所以一定要版本一致数尿。

kafka命令行使用

生產(chǎn)者和消費者爬坑

使用API maven地址

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.0</version>
        </dependency>
  • 生產(chǎn)者還好惶楼,沒有遇到大問題右蹦,采用java客戶端api最好和集群的kafka的版本一致,這樣防止出現(xiàn)一些莫名其妙的幺蛾子歼捐,生產(chǎn)者注意發(fā)送內(nèi)容的key和value的字符格式解析嫩实,

key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
我這邊是使用key采用string解析,value采用字符數(shù)組

消費者使用詳細說下窥岩。新版本的消費者變化比較大
1.首先消費者現(xiàn)在可以支持手動提交offset甲献,并且手動支持倆種方式同步和異步,我記得老版本好像都是只是自動提交颂翼。
2.消費者可以自己指定消費的分區(qū)和位置晃洒。以前老版本想改的,只能通過修改zk上對應節(jié)點值才能做到朦乏。
這倆點在使用中變化比較大

  • 下面說下具體的消費者配置球及,常規(guī)的
        props.put("enable.auto.commit", "true");
        //自動提交間隔
        props.put("auto.commit.interval.ms", 1000);
        props.put("max.poll.interval.ms",300000);
        props.put("max.poll.records",10);  
        //設置消費者心跳間隔
        props.put("heartbeat.interval.ms",3000);
        props.put("session.timeout.ms", 10000); 

使用過程中重點遇到問題消費者消費一段時間后,停止消費了呻疹,offset位置一直沒發(fā)生變化吃引。調(diào)了很久才知道,consumer.poll(100);消費者每次poll阻塞拉取的時候拉取的任務太多,然而數(shù)據(jù)處理程序太慢镊尺,倆次poll之間時間差超過max.poll.interval.ms這個配置里的值朦佩,broker就認為這個消費者掛了,就會重新把它從組內(nèi)刪除庐氮,并且重新平衡语稠。后來通過設置max.poll.records這個值來設定每次poll拉取最多拉取任務就可以了。poll方法里的參數(shù)是每次拉取的阻塞時間ms弄砍。

下面具體配置說明一下

session.timeout.ms 這個值是會話超時時間仙畦,什么意思了,就是說如果發(fā)送心跳時間超過這個時間音婶,broker就會認為消費者死亡了慨畸,默認值是10000ms,也就是10s(這個值一般默認沒問題)
heartbeat.interval.ms 這個值是心跳時間衣式,表示多長時間想broker報告一次寸士,這個默認值3000ms,這個值官方推薦不要高于session.timeout.ms 的1/3(這個值默認沒問題)
enable.auto.commit 是否啟用自動提交瞳收。
auto.commit.interval.ms 自動提交間隔
max.poll.interval.ms 每倆次poll拉取數(shù)據(jù)時間間隔最大超時時間碉京,超過這個值,broker就會認為你這個消費者掛了螟深,并且重新平衡谐宙,這時候就消費不到信息了,如果你用kafka自帶的命令行工具查看
sh kafka-consumer-groups.sh --bootstrap-server localhost:9093 --group group2 --describe 就會有這樣的顯示
Consumer group group2 is rebalancing
max.poll.records 這個值的意思是每次poll拉取數(shù)據(jù)的最大任務數(shù)界弧,設置為5凡蜻,就是一次poll里拉取5條偏移量數(shù)據(jù)
key.deserializer 序列化解析key值這個根據(jù)消費者配置而來org.apache.kafka.common.serialization.StringDeserializer
value.deserializer 序列化解析value值。org.apache.kafka.common.serialization.StringDeserializer
如果生產(chǎn)者或者消費者采用不同字符解析器垢箕,采取對應配置划栓,例如 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
我這個comsumer使用的是string字符解析格式 還有ByteArraySerializer字符數(shù)組這種字符格式。org.apache.kafka.common.serialization都是在這個包里

  • 貼一下代碼

        Properties props = new Properties();
        //服務器位置
        props.put("bootstrap.servers", "xxxxx");
        //消費組id
        props.put("group.id", "group3");
        //是否啟動自動提交
        props.put("enable.auto.commit", "false");
        //自動提交間隔
        props.put("auto.commit.interval.ms", 1000);
        props.put("auto.offset.reset", "latest");
        props.put("max.poll.interval.ms",300000);
        props.put("max.poll.records",10);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//        動態(tài)從分區(qū)獲取消息,負載均衡的獲取消息条获,如果想手動指定位置分區(qū)和offset忠荞,則使用consumer.assign();
         consumer.subscribe(Arrays.asList("fsc"));

//        手動指定方式從分區(qū)獲取數(shù)據(jù)
//        TopicPartition p0=new TopicPartition("fsc",0);
//        TopicPartition p1=new TopicPartition("fsc",1);
//        TopicPartition p2=new TopicPartition("fsc",2);
//        List<TopicPartition> topicPartitionList=new ArrayList<TopicPartition>();
//        topicPartitionList.add(p0);
//        topicPartitionList.add(p1);
//        topicPartitionList.add(p2);
////        指定分區(qū)和offset方式消費數(shù)據(jù),
//        consumer.assign(topicPartitionList);
////         調(diào)到所有分區(qū)最開始的位置
//        consumer.seekToBeginning(topicPartitionList);
////        調(diào)到分區(qū)最后的位置
//        consumer.seekToEnd(topicPartitionList);
////        指定分區(qū)和offset進行消費
//        consumer.seek(p2,40);
//        consumer.seek(p1,40);
        System.out.println("start consumering");
        while (true) {
            ConsumerRecords<String,String> records = consumer.poll(100);
            for (ConsumerRecord<String,String> record : records) {
//              在這里進行插入數(shù)據(jù)庫操作,數(shù)據(jù)在valus里的json格式
                System.out.println("分區(qū):"+record.partition()+" offset:"+record.offset()+"key:"+record.key());
//              同步提交     
                consumer.commitSync();
            }
        }

記錄完畢

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末帅掘,一起剝皮案震驚了整個濱河市委煤,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌修档,老刑警劉巖碧绞,帶你破解...
    沈念sama閱讀 216,470評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異吱窝,居然都是意外死亡讥邻,警方通過查閱死者的電腦和手機迫靖,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,393評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來兴使,“玉大人系宜,你說我怎么就攤上這事■昊蹋” “怎么了蜈首?”我有些...
    開封第一講書人閱讀 162,577評論 0 353
  • 文/不壞的土叔 我叫張陵实抡,是天一觀的道長欠母。 經(jīng)常有香客問我,道長吆寨,這世上最難降的妖魔是什么赏淌? 我笑而不...
    開封第一講書人閱讀 58,176評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮啄清,結果婚禮上六水,老公的妹妹穿的比我還像新娘。我一直安慰自己辣卒,他們只是感情好掷贾,可當我...
    茶點故事閱讀 67,189評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著荣茫,像睡著了一般想帅。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上啡莉,一...
    開封第一講書人閱讀 51,155評論 1 299
  • 那天港准,我揣著相機與錄音,去河邊找鬼咧欣。 笑死浅缸,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的魄咕。 我是一名探鬼主播衩椒,決...
    沈念sama閱讀 40,041評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼哮兰!你這毒婦竟也來了毛萌?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 38,903評論 0 274
  • 序言:老撾萬榮一對情侶失蹤奠蹬,失蹤者是張志新(化名)和其女友劉穎朝聋,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體囤躁,經(jīng)...
    沈念sama閱讀 45,319評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡冀痕,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,539評論 2 332
  • 正文 我和宋清朗相戀三年荔睹,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片言蛇。...
    茶點故事閱讀 39,703評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡僻他,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出腊尚,到底是詐尸還是另有隱情吨拗,我是刑警寧澤,帶...
    沈念sama閱讀 35,417評論 5 343
  • 正文 年R本政府宣布婿斥,位于F島的核電站劝篷,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏民宿。R本人自食惡果不足惜娇妓,卻給世界環(huán)境...
    茶點故事閱讀 41,013評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望活鹰。 院中可真熱鬧哈恰,春花似錦、人聲如沸志群。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,664評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽锌云。三九已至荠医,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間宾抓,已是汗流浹背子漩。 一陣腳步聲響...
    開封第一講書人閱讀 32,818評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留石洗,地道東北人幢泼。 一個月前我還...
    沈念sama閱讀 47,711評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像讲衫,于是被迫代替她去往敵國和親缕棵。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,601評論 2 353

推薦閱讀更多精彩內(nèi)容

  • 姓名:周小蓬 16019110037 轉載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,721評論 13 425
  • 發(fā)行說明 - Kafka - 版本1.0.0 以下是Kafka 1.0.0發(fā)行版中解決的JIRA問題的摘要涉兽。有關該...
    全能程序猿閱讀 2,859評論 2 7
  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理招驴,服務發(fā)現(xiàn),斷路器枷畏,智...
    卡卡羅2017閱讀 134,651評論 18 139
  • kafka的定義:是一個分布式消息系統(tǒng)别厘,由LinkedIn使用Scala編寫,用作LinkedIn的活動流(Act...
    時待吾閱讀 5,317評論 1 15
  • 記憶 如落葉浮萍 鏡花水月 存在時多絢麗 再回首就有多苦痛 記憶 如利刀重錘 一雕一刻 深入骨髓 對你而言 我不過...
    一笑誤終生閱讀 429評論 2 3