kafka學(xué)習(xí)一

kafka介紹:

Kafka最初由LinkedIn公司開發(fā),使用Scala語(yǔ)言編寫镀赌,之后成為Apache項(xiàng)目的一部分氯哮。Kafka是一個(gè)分布式,可劃分的商佛,多訂閱者喉钢,冗余備份的持久性的日志服務(wù)。它主要用于處理活躍的流式數(shù)據(jù)良姆。在大系統(tǒng)中肠虽,我們經(jīng)常會(huì)碰到這樣的一個(gè)問題,大系統(tǒng)下的各個(gè)子系統(tǒng)需要數(shù)據(jù)高性能玛追、低延遲的不停流轉(zhuǎn)税课。kafka很適合處理這樣的問題!

消息隊(duì)列的分類:
點(diǎn)對(duì)點(diǎn):消息生產(chǎn)者生產(chǎn)消息發(fā)送到queue中痊剖,然后消息消費(fèi)者從queue中取出并且消費(fèi)消息韩玩,消息被消費(fèi)以后,queue中不再有儲(chǔ)存陆馁,所以消息消費(fèi)者不可能消費(fèi)到已經(jīng)被消費(fèi)的消息找颓。,即對(duì)消息而言氮惯,只會(huì)有一個(gè)消費(fèi)者叮雳。
發(fā)布/訂閱:消息生產(chǎn)者將消息發(fā)不到topic中想暗,同時(shí)可以有多個(gè)消息消費(fèi)者消費(fèi)該消息。和點(diǎn)對(duì)點(diǎn)方式不同帘不,發(fā)不到topic的消息會(huì)被所有訂閱者消費(fèi),kafka 就是典型發(fā)布储狭。

kafka的特點(diǎn):
1捣郊、同時(shí)為發(fā)布和訂閱提供高吞吐量刮萌,kafka每秒可以產(chǎn)生約25萬的消息(50MB),每秒能夠處理55萬消息(110MB)涮阔。
2敬特、可進(jìn)行持久化操作,將消息持久化到磁盤减俏。
3、分布式系統(tǒng)怕篷,易于向外擴(kuò)展。所有的producer蒸痹、broker和consumer都會(huì)有多個(gè)匿沛。均為分布式的逃呼。
4、 消息被處理的狀態(tài)是在consumer端維護(hù)推姻,而不是在server端維護(hù)。當(dāng)失敗是能自動(dòng)平衡校翔。
5防症、支持online和offline的場(chǎng)景。

下面介紹一下Kafka的架構(gòu)和組成:
Producer:是能夠發(fā)布消息到topic的任何對(duì)象奈嘿。
Consumer:消息和數(shù)據(jù)的消費(fèi)者,訂閱topics 并處理其發(fā)布的消息叶圃。
Consumer Group:可以并行消費(fèi)Topic中的partition的消息。
Broker:緩存代理德崭,Kafka集群中的一個(gè)kafka節(jié)點(diǎn)就是一個(gè)broker眉厨。
Topic: 特指Kafka 處理的消息源(feeds of messages)的不同分類缨叫。
Partition:topic物理上的分組耻姥,一個(gè) topic 可以分為多個(gè) partition,每個(gè) partition 是一個(gè)有序的隊(duì)列婉商。partition中的每條消息都會(huì)被分配一個(gè)有序的 id(offset)。
Message:消息蘑秽,是通信的基本單位,每個(gè) producer 可以向一個(gè)topic(主題)發(fā)布一些消息缀雳。

kafka結(jié)構(gòu)圖

kafka的安裝和使用

kafka下載和相關(guān)文檔地址:

http://kafka.apache.org/

修改zookeper配置文件:

dataDir=D:\tmp\kafka\zookeeper
clientPort=2181
maxClientCnxns=0

修改server配置文件:

broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=D:\tmp\kafka\kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1

啟動(dòng)服務(wù):
先啟動(dòng)zookeper服務(wù):

./bin/zookeeper-server-start.sh  ./config/zookeeper.properties

再啟動(dòng)server服務(wù):

./bin/kafka-server-start.sh  ./config/server.properties

pom依賴:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.1</version>
        </dependency>

Config.java文件代碼:

package com.lqq.demo1;

public class Config {

    public final static String TOPIC = "TEST-TOPIC";
    public final static String bootstrap_servers = "localhost:9092";

    public final static String group_id = "jd-group";
    public final static String key_serializer="org.apache.kafka.common.serialization.StringSerializer";
    public final static String value_serializer="org.apache.kafka.common.serialization.StringSerializer";
    public final static String key_deserializer="org.apache.kafka.common.serialization.StringDeserializer";
    public final static String value_deserializer="org.apache.kafka.common.serialization.StringDeserializer";
    
}

生產(chǎn)者端:

package com.lqq.demo1;

public class KProducer {
    private final Producer<String, String> producer;

    public KProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", Config.bootstrap_servers);  
        props.put("acks", "all");  
        props.put("retries", 1);  
        props.put("batch.size", 16384);  
        props.put("key.serializer", Config.key_deserializer);    
        props.put("value.serializer", Config.value_deserializer);     
        producer = new KafkaProducer<>(props);
    }

    public void produce() {
        for(int i=0;i<1000;i++){
            String key = String.valueOf(i);
            String data = "hello kafka message " + i;
            ProducerRecord<String, String> record=new ProducerRecord<String, String>(Config.TOPIC, key, data);
            System.out.println("Produce record key: "+key+" value: "+data);
            producer.send(record);
        }
    }
    
    public void close(){
        producer.close();
    }
    
    public static void main(String[] args) {
        KProducer producer=new KProducer();
        producer.produce();
        producer.close();
    }
}

指定broker和序列化類型挤牛,然后向broker發(fā)送消息竞膳。

消費(fèi)者端:

package com.lqq.demo1;

public class KConsumer {
    private final Consumer<String, String> consumer;

    public  KConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", Config.bootstrap_servers);    
        props.put("group.id", Config.group_id);    
        props.put("enable.auto.commit", "true");    
        props.put("client.id", "25424tg2");
        props.put("heartbeat.interval.ms","1000");
        props.put("auto.commit.interval.ms", "1000");    
        props.put("session.timeout.ms", "30000");    
        props.put("key.deserializer", Config.key_deserializer);    
        props.put("value.deserializer", Config.value_deserializer);   
        consumer = new KafkaConsumer<String, String>(props);
    }

    public void consume() {
        consumer.subscribe(Arrays.asList(Config.TOPIC));
        consumer.seekToBeginning(new ArrayList<TopicPartition>());
        while(true){
            ConsumerRecords<String,String> records=consumer.poll(1000);
            for(ConsumerRecord<String,String> record:records){
                System.out.println("Consumer  record  offset="+record.offset()+"  key="+record.key()+" value="+record.value());
            }
        }

    }
    public void close(){
        consumer.close();
    }
    public static void main(String[] args) {
        KConsumer kConsumer=new KConsumer();
        kConsumer.consume();
        kConsumer.close();
    }
}
  
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末锉走,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子梁厉,更是在濱河造成了極大的恐慌词顾,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,198評(píng)論 6 514
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異航棱,居然都是意外死亡饮醇,警方通過查閱死者的電腦和手機(jī)观蓄,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,334評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來亲茅,“玉大人克锣,你說我怎么就攤上這事验残∧唬” “怎么了?”我有些...
    開封第一講書人閱讀 167,643評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)何缓。 經(jīng)常有香客問我传轰,道長(zhǎng),這世上最難降的妖魔是什么纪挎? 我笑而不...
    開封第一講書人閱讀 59,495評(píng)論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮封孙,結(jié)果婚禮上虎忌,老公的妹妹穿的比我還像新娘堪藐。我一直安慰自己庶橱,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,502評(píng)論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著硼端,像睡著了一般。 火紅的嫁衣襯著肌膚如雪句喷。 梳的紋絲不亂的頭發(fā)上唾琼,一...
    開封第一講書人閱讀 52,156評(píng)論 1 308
  • 那天赶舆,我揣著相機(jī)與錄音,去河邊找鬼九串。 笑死,一個(gè)胖子當(dāng)著我的面吹牛躬贡,可吹牛的內(nèi)容都是我干的拂玻。 我是一名探鬼主播魄懂,決...
    沈念sama閱讀 40,743評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了篡腌?” 一聲冷哼從身側(cè)響起嘹悼,我...
    開封第一講書人閱讀 39,659評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤卒密,失蹤者是張志新(化名)和其女友劉穎领炫,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體鼎俘,經(jīng)...
    沈念sama閱讀 46,200評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡哲身,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,282評(píng)論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了贸伐。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片勘天。...
    茶點(diǎn)故事閱讀 40,424評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖捉邢,靈堂內(nèi)的尸體忽然破棺而出伏伐,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 36,107評(píng)論 5 349
  • 正文 年R本政府宣布,位于F島的核電站,受9級(jí)特大地震影響抄谐,放射性物質(zhì)發(fā)生泄漏浦箱。R本人自食惡果不足惜蓬推,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,789評(píng)論 3 333
  • 文/蒙蒙 一纠脾、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦殷绍、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,264評(píng)論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)贝乎。三九已至挽拔,卻和暖如春倘是,著一層夾襖步出監(jiān)牢的瞬間门坷,已是汗流浹背设江。 一陣腳步聲響...
    開封第一講書人閱讀 33,390評(píng)論 1 271
  • 我被黑心中介騙來泰國(guó)打工练俐, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人跺撼。 一個(gè)月前我還...
    沈念sama閱讀 48,798評(píng)論 3 376
  • 正文 我出身青樓戈鲁,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親荆陆。 傳聞我的和親對(duì)象是個(gè)殘疾皇子娄猫,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,435評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容