Kafka實踐

kafka基本概念:

  • Kafka是一個分布式消息隊列。Kafka對消息保存時根據(jù)Topic進行歸類僚纷,發(fā)送消息者稱為Producer,消息接受者稱為Consumer拗盒,此外kafka集群有多個kafka實例組成怖竭,每個實例(server)稱為broker。無論是kafka集群陡蝇,還是consumer都依賴于zookeeper集群保存一些meta信息痊臭,來保證系統(tǒng)高可用性。

kafka的優(yōu)點:

  1. batch機制和request機制解決頻繁網(wǎng)絡通信帶來的性能低下問題;
  2. ACK應答機制解決消息一定能夠被消費到,就算傳輸過程中出現(xiàn)故障,只要消息到達了kafka,就會被保存到offset中,方便恢復數(shù)據(jù);
  3. 每個主題topic可以有多個分區(qū);kafka將分區(qū)均勻地分配到整個集群中,提高吞吐量;
  4. 順序讀寫:kafka是個可持久化的日志服務登夫,它將數(shù)據(jù)以數(shù)據(jù)日志的形式進行追加趣兄,最后持久化在磁盤中。利用了磁盤的順序讀寫悼嫉,來提高讀寫效率艇潭。時間復雜度為O(1)。

kafka的缺點:

  1. 部署集群的話,至少需要6臺服務器,3臺zookeeper(kafka的topic和consumer依賴于zookeeper);
  2. 復雜性:Kafka依賴Zookeeper進行元數(shù)據(jù)管理,Topic一般需要人工創(chuàng)建蹋凝,部署和維護比一般MQ成本更高;
  3. 消息亂序鲁纠。Kafka某一個固定的Partition內(nèi)部的消息是保證有序的,如果一個Topic有多個Partition鳍寂,partition之間的消息送達不保證有序改含。
  4. 監(jiān)控不完善,需要安裝插件迄汛;(rabbitmq自帶可視化監(jiān)控web界面,能夠清晰的看到各種參數(shù).)
    kafka和其它消息中間件的優(yōu)缺點見鏈接: https://www.cnblogs.com/mengchunchen/p/9999774.html
    kafka性能基準測試見鏈接:http://www.cnblogs.com/xiaodf/p/6023531.html
    [kafka和其它消息中間件的優(yōu)缺點見鏈接:]: https://www.cnblogs.com/mengchunchen/p/9999774.html
    [kafka性能基準測試見鏈接:]: http://www.cnblogs.com/xiaodf/p/6023531.html

kafka安裝及配置:

kafka安裝及配置見鏈接: https://www.cnblogs.com/RUReady/p/6479464.html
[kafka安裝及配置見鏈接]: https://www.cnblogs.com/RUReady/p/6479464.html


kafka實戰(zhàn)demo:

  1. 導入pom依賴:
<dependency>
       <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
       <version>0.11.0.0</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
  <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.12</artifactId>
       <version>0.11.0.0</version>
   </dependency>

  1. 創(chuàng)建生產(chǎn)者:
    package com.byavs.kafka.produce;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class CustomProducer {
    
    public static void main(String[] args) {
    Properties props = new Properties();
    // Kafka服務端的主機名和端口號
    props.put("bootstrap.servers", "47.98.63.22:9092");
    // 等待所有副本節(jié)點的應答
    props.put("acks", "all");
    // 消息發(fā)送最大嘗試次數(shù)
    props.put("retries", 0);
    // 一批消息處理大小
    props.put("batch.size", 16384);
    // 請求延時
    props.put("linger.ms", 1);
    // 發(fā)送緩存區(qū)內(nèi)存大小
    props.put("buffer.memory", 33554432);
    // key序列化
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // value序列化
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 50; i++) {
    producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i))
    }
    producer.close();
    }
    }
    
    1. 創(chuàng)建消費者:
    package com.byavs.kafka.consume;

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
public class CustomNewConsumer {

        public static void main(String[] args) {
            Properties props = new Properties();
            // 定義kakfa 服務的地址捍壤,不需要將所有broker指定上
            props.put("bootstrap.servers", "47.98.63.22:9092");
            // 制定consumer group
            props.put("group.id", "test");
            // 是否自動確認offset
            props.put("enable.auto.commit", "true");
            // 自動確認offset的時間間隔
            props.put("auto.commit.interval.ms", "1000");
            // key的序列化類
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // value的序列化類
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // 定義consumer
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // 消費者訂閱的topic, 可同時訂閱多個
            consumer.subscribe(Arrays.asList("first", "second","third"));
            while (true) {
                // 讀取數(shù)據(jù),讀取超時時間為100ms
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }

spring也提供了一套對kafka操作的API,更加方便.

  1. 導入pom依賴
<dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
  1. application.yml配置
    spring:
      kafka:
        bootstrap-servers: 47.98.63.22:9092
        consumer:
          group-id: kafka2
          auto-offset-reset: latest
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer

? 主配置類加注解@EnableKafka
? 3. 生產(chǎn)者消費者代碼:

    @Component
    @EnableScheduling
    public class KafkaProducer {
        @Autowired
        private KafkaTemplate kafkaTemplate;
        /**
        * 定時任務
        */
        @Scheduled(cron = "* * * * * ?")
        public void send(){
            String message = UUID.randomUUID().toString();
            // topic1為你在kafka中手動創(chuàng)建的分區(qū)
            ListenableFuture future = kafkaTemplate.send("topic1", message);
            future.addCallback(o -> System.out.println("send-消息發(fā)送成功:" + message), throwable -> System.out.println("消息發(fā)送失敯鞍:" + message));
        }
    }
/**
* kafka消費者測試
*/
@Component
    public class TestConsumer {
        @KafkaListener(topics = "topic1")
        public void listen (ConsumerRecord<?, ?> record) {
            System.out.printf("接受到消息: topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
        }
    }

消息隊列內(nèi)部實現(xiàn)原理:

消息隊列.png

kafka架構:

kafka架構.png

各名詞解釋:
1)Producer :消息生產(chǎn)者鹃觉,就是向kafka broker發(fā)消息的客戶端;
2)Consumer :消息消費者睹逃,向kafka broker取消息的客戶端盗扇;
3)Topic :可以理解為一個隊列;
4) Consumer Group (CG):這是kafka用來實現(xiàn)一個topic消息的廣播(發(fā)給所有的consumer)和單播(發(fā)給任意一個consumer)的手段沉填。一個topic可以有多個CG疗隶。topic的消息會復制(不是真的復制,是概念上的)到所有的CG翼闹,但每個partion只會把消息發(fā)給該CG中的一個consumer斑鼻。如果需要實現(xiàn)廣播,只要每個consumer有一個獨立的CG就可以了猎荠。要實現(xiàn)單播只要所有的consumer在同一個CG坚弱。用CG還可以將consumer進行自由的分組而不需要多次發(fā)送消息到不同的topic;
5)Broker :一臺kafka服務器就是一個broker法牲。一個集群由多個broker組成。一個broker可以容納多個topic琼掠;
6)Partition:為了實現(xiàn)擴展性拒垃,一個非常大的topic可以分布到多個broker(即服務器)上,一個topic可以分為多個partition瓷蛙,每個partition是一個有序的隊列悼瓮。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序?qū)⑾l(fā)給consumer艰猬,不保證一個topic的整體(多個partition間)的順序横堡;
7)Offset:kafka的存儲文件都是按照offset.kafka來命名,用offset做名字的好處是方便查找冠桃。例如你想找位于2049的位置命贴,只要找到2048.kafka的文件即可。當然the first offset就是00000000000.kafka。

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末胸蛛,一起剝皮案震驚了整個濱河市污茵,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌葬项,老刑警劉巖泞当,帶你破解...
    沈念sama閱讀 212,599評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異民珍,居然都是意外死亡襟士,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,629評論 3 385
  • 文/潘曉璐 我一進店門嚷量,熙熙樓的掌柜王于貴愁眉苦臉地迎上來陋桂,“玉大人,你說我怎么就攤上這事津肛≌潞恚” “怎么了?”我有些...
    開封第一講書人閱讀 158,084評論 0 348
  • 文/不壞的土叔 我叫張陵身坐,是天一觀的道長秸脱。 經(jīng)常有香客問我,道長部蛇,這世上最難降的妖魔是什么摊唇? 我笑而不...
    開封第一講書人閱讀 56,708評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮涯鲁,結(jié)果婚禮上巷查,老公的妹妹穿的比我還像新娘。我一直安慰自己抹腿,他們只是感情好岛请,可當我...
    茶點故事閱讀 65,813評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著警绩,像睡著了一般崇败。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上肩祥,一...
    開封第一講書人閱讀 50,021評論 1 291
  • 那天后室,我揣著相機與錄音,去河邊找鬼混狠。 笑死岸霹,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的将饺。 我是一名探鬼主播贡避,決...
    沈念sama閱讀 39,120評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼痛黎,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了贸桶?” 一聲冷哼從身側(cè)響起舅逸,我...
    開封第一講書人閱讀 37,866評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎皇筛,沒想到半個月后琉历,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,308評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡水醋,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,633評論 2 327
  • 正文 我和宋清朗相戀三年旗笔,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片拄踪。...
    茶點故事閱讀 38,768評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡蝇恶,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出惶桐,到底是詐尸還是另有隱情撮弧,我是刑警寧澤,帶...
    沈念sama閱讀 34,461評論 4 333
  • 正文 年R本政府宣布姚糊,位于F島的核電站贿衍,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏救恨。R本人自食惡果不足惜贸辈,卻給世界環(huán)境...
    茶點故事閱讀 40,094評論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望肠槽。 院中可真熱鬧擎淤,春花似錦、人聲如沸秸仙。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,850評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽寂纪。三九已至席吴,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間弊攘,已是汗流浹背抢腐。 一陣腳步聲響...
    開封第一講書人閱讀 32,082評論 1 267
  • 我被黑心中介騙來泰國打工姑曙, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留襟交,地道東北人。 一個月前我還...
    沈念sama閱讀 46,571評論 2 362
  • 正文 我出身青樓伤靠,卻偏偏與公主長得像捣域,于是被迫代替她去往敵國和親啼染。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,666評論 2 350