Kafka學(xué)習(xí)筆記

Kafka 學(xué)習(xí)筆記

內(nèi)容大部分引用自Info - Apache Kafka:下一代分布式消息系統(tǒng)

原文作者Abhishek Sharma的項目Kafka-Message-Server

介紹

Kafka是使用scala語言開發(fā)镜盯,類似于RabbitMQ的分布式消息系統(tǒng)。
Kafka是分布式的,它通過可以多個broker組成一個集群骡显。
Kafka依賴于Zookeeper。

概念

Topic(話題) 特定類型的消息流奸鬓。消息是字節(jié)的有效負(fù)載(Payload)撇眯,話題是消息的分類或種子(Feed)名。

Producer(生產(chǎn)者) 能夠發(fā)布消息到話題的任何對象旷祸。

Broker(代理) 或稱Kafka集群。用于保存消息的服務(wù)器讼昆。

Consumer(消費(fèi)者) 可以訂閱一個或多個話題托享,并從Broker拉取數(shù)據(jù),從而消費(fèi)這些已發(fā)布的消息。

Kafka生產(chǎn)者, 消費(fèi)者 & 代理

架構(gòu)

生產(chǎn)者使用自己的序列化方法對消息內(nèi)容進(jìn)行編碼闰围。然后向broker發(fā)起消息赃绊。為了提高效率,一個發(fā)布請求中可以包含一組消息羡榴。

消費(fèi)者訂閱話題碧查,并為話題創(chuàng)建一個或多個消息流。發(fā)布到該話題的消息被均衡的分發(fā)到這些流中校仑。

每個消息流為不斷產(chǎn)生的消息提供了迭代接口忠售。

消費(fèi)者迭代流中每一條消息,并處理消息的有效負(fù)載迄沫。

迭代器不會停止稻扬。如果當(dāng)前沒有消息,迭代器將阻塞直至有新的消息發(fā)布到該話題羊瘩。

Kafka支持點(diǎn)到點(diǎn)分發(fā)模型(Proint-to-point delivery model)泰佳,即多個消費(fèi)者共同消費(fèi)隊列中某個消息的單個副本;也支持發(fā)布-訂閱模型(Publish-subscribe model)尘吗,即多個消費(fèi)者接收自己的消息副本逝她。

kafka-proc-topic-part-con.png

存儲

kafka的存儲,話題的每個分區(qū)對應(yīng)一個邏輯日志摇予。物理上汽绢,一個日志為相同大小的一段分組文件。

每次生產(chǎn)者發(fā)布消息到一個分區(qū)侧戴,代理就將消息追加到最后一個段文件中宁昭。

當(dāng)發(fā)布的消息數(shù)量達(dá)到設(shè)定值或經(jīng)過一段時間后,段文件真正寫入磁盤中酗宋。

寫入完成后积仗,消息公開給消費(fèi)者。

與傳統(tǒng)的消息不同蜕猫,kafka系統(tǒng)中存儲的消息沒有明確的id寂曹,而是通過日志的邏輯偏移量來公開。相比其他方式回右,這種處理更為高效隆圆。

消費(fèi)者始終從特殊分區(qū)順序的獲取消息。

代理

不同于其他消息系統(tǒng)翔烁,kafka代理是無狀態(tài)的渺氧,即消費(fèi)者必須維護(hù)已消費(fèi)的狀態(tài)消息,而代理完全不管蹬屹。

這種設(shè)計的創(chuàng)新在于:

  • 代理以一個基于時間的SLA應(yīng)用于保留策略侣背。當(dāng)消息在代理中超過一定時間后白华,將會被自動刪除。

  • 消費(fèi)者可以故意倒回到老的偏移量再次消費(fèi)數(shù)據(jù)贩耐。雖然這違法了隊列的常見約定弧腥,但常見于許多業(yè)務(wù)中。

與zookeeper的關(guān)系

kafka使用ZooKeeper用于管理潮太、協(xié)調(diào)代理管搪。每個Kafka代理通過Zookeeper協(xié)調(diào)其他Kafka代理。

當(dāng)Kafka系統(tǒng)中新增了代理或某個代理失效時消别,Zookeeper服務(wù)將通知生產(chǎn)者和消費(fèi)者抛蚤。

生產(chǎn)者與消費(fèi)者據(jù)此開始與其他代理協(xié)調(diào)工作。

安裝

wget http://mirrors.cnnic.cn/apache/kafka/0.8.2.1/kafka_2.11-0.8.2.1.tgz
tar -xzvf kafka_2.11-0.8.2.1.tgz

配置

config/server.properties

# 如果配置多個kafka節(jié)點(diǎn)寻狂,id需設(shè)置為不同的值
broker.id=1

# !!務(wù)必將host.name配置為ip地址。
# 在java代碼里連接kafka時朋沮,服務(wù)端會把host.name的值傳給zookeeper
# 如果使用默認(rèn)配置的localhost蛇券,會出現(xiàn)連接失敗的異常
host.name=192.168.1.1

port=9092

log.dir=./logs

# 如果有多個zookeeper服務(wù),用,號隔開即可樊拓。
# zookeeper使用默認(rèn)配置的2181端口
zookeeper.connect=192.168.1.1:2181
zookeeper.connection.timeout.ms=6000

啟動

bin/

# 先啟動zookeeper服務(wù)
./zookeeper-server-start.sh ../config/zookeeper.properties &
# 再啟動kafka服務(wù)
./kafka-server-start.sh ../config/server.properties

測試

bin/

創(chuàng)建一個Topic

./kafka-topics.sh --create --zookeeper 192.168.1.1:2181 --replication-factor 2 --partitions 1 --topic test

查看Topic信息

./kafka-topics.sh --describe --zookeeper 192.168.1.1:2181 --topic test

啟動生產(chǎn)者(producer)生產(chǎn)該Topic的消息

./kafka-console-producer.sh --broker-list 192.168.1.1:9092 --topic test

啟動消費(fèi)者(consumer)消費(fèi)該Topic的消息

./kafka-console-consumer.sh --zookeeper 192.168.1.1:2181 --from-beginning -topic test

Java代碼

Producer 測試

import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

public class KafkaProducer {
    
    public static void testProducer() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.1.1:9092");
        props.put("serializer.class", StringEncoder.class.getName());
        //props.put("partitioner.class", );
        props.put("request.required.arks", "1");
        
        ProducerConfig config = new ProducerConfig(props);
        
        Producer<String, String> producer = new Producer<String, String>(config);
        
        String msg = new Date() + " - hello world : 測試 " ;
        KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", msg);
        producer.send(data);
        producer.close();
        System.out.println("--> producer sended: " + msg);
    }
    
    public static void main(String[] args) {
        testProducer();
    }
}

Consumer 測試

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer {

    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public KafkaConsumer(String a_zookeeper, String a_groupId, String a_topic) {
        this.consumer = kafka.consumer.Consumer
                .createJavaConsumerConnector(createConsumerConfig(a_zookeeper,
                        a_groupId));

        this.topic = a_topic;
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper,
            String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "1000");
        props.put("zookeeper.sync.time.ms", "1000");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");

        return new ConsumerConfig(props);
    }

    public void shutdown() {
        if (consumer != null)
            consumer.shutdown();
        if (executor != null)
            executor.shutdown();
    }

    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
                .createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        System.out.println("streams.size = " + streams.size());

        // now launch all the threads
        //
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        //
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }

    public static void main(String[] args) {

        String zooKeeper = "192.168.212.100:2181";
        String groupId = "group1";
        String topic = "test";

        int threads = 3;

        KafkaConsumer example = new KafkaConsumer(zooKeeper, groupId, topic);

        example.run(threads);

    }
    
    public class ConsumerTest implements Runnable {

        private KafkaStream m_stream;
        private int m_threadNumber;

        public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
            m_threadNumber = a_threadNumber;
            m_stream = a_stream;
        }

        public void run() {
            System.out.println("calling ConsumerTest.run()");
            ConsumerIterator<byte[], byte[]> it = m_stream.iterator();

            while (it.hasNext()) {
                System.out.println("--> consumer  Thread " + m_threadNumber + ": "
                        + new String(it.next().message()));
            }

            System.out.println("Shutting down Thread: " + m_threadNumber);
        }
    }

}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末纠亚,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子筋夏,更是在濱河造成了極大的恐慌蒂胞,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,755評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件条篷,死亡現(xiàn)場離奇詭異骗随,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)赴叹,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評論 3 395
  • 文/潘曉璐 我一進(jìn)店門鸿染,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人乞巧,你說我怎么就攤上這事涨椒。” “怎么了绽媒?”我有些...
    開封第一講書人閱讀 165,138評論 0 355
  • 文/不壞的土叔 我叫張陵蚕冬,是天一觀的道長。 經(jīng)常有香客問我是辕,道長囤热,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,791評論 1 295
  • 正文 為了忘掉前任免糕,我火速辦了婚禮赢乓,結(jié)果婚禮上忧侧,老公的妹妹穿的比我還像新娘。我一直安慰自己牌芋,他們只是感情好蚓炬,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著躺屁,像睡著了一般肯夏。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上犀暑,一...
    開封第一講書人閱讀 51,631評論 1 305
  • 那天驯击,我揣著相機(jī)與錄音,去河邊找鬼耐亏。 笑死徊都,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的广辰。 我是一名探鬼主播暇矫,決...
    沈念sama閱讀 40,362評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼择吊!你這毒婦竟也來了李根?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,264評論 0 276
  • 序言:老撾萬榮一對情侶失蹤几睛,失蹤者是張志新(化名)和其女友劉穎房轿,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體所森,經(jīng)...
    沈念sama閱讀 45,724評論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡囱持,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了必峰。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片洪唐。...
    茶點(diǎn)故事閱讀 40,040評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖吼蚁,靈堂內(nèi)的尸體忽然破棺而出凭需,到底是詐尸還是另有隱情,我是刑警寧澤肝匆,帶...
    沈念sama閱讀 35,742評論 5 346
  • 正文 年R本政府宣布粒蜈,位于F島的核電站,受9級特大地震影響旗国,放射性物質(zhì)發(fā)生泄漏枯怖。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評論 3 330
  • 文/蒙蒙 一能曾、第九天 我趴在偏房一處隱蔽的房頂上張望度硝。 院中可真熱鬧肿轨,春花似錦、人聲如沸蕊程。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽藻茂。三九已至驹暑,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間辨赐,已是汗流浹背优俘。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留掀序,地道東北人帆焕。 一個月前我還...
    沈念sama閱讀 48,247評論 3 371
  • 正文 我出身青樓,卻偏偏與公主長得像森枪,于是被迫代替她去往敵國和親视搏。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 一县袱、Kafka簡介 Kafka (科技術(shù)語)。Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng)佑力,它可以處理消費(fèi)者規(guī)...
    邊學(xué)邊記閱讀 1,744評論 0 14
  • 原文:InfoQ 作者 郭俊 簡介 Kafka是一種分布式的式散,基于發(fā)布/訂閱的消息系統(tǒng)。使用Scala編寫...
    小小少年Boy閱讀 388評論 0 1
  • ** 今天看了一下kafka官網(wǎng)打颤,嘗試著在自己電腦上安裝和配置暴拄,然后學(xué)一下官方document。** Introd...
    RainChang閱讀 5,005評論 1 30
  • kafka的定義:是一個分布式消息系統(tǒng)编饺,由LinkedIn使用Scala編寫乖篷,用作LinkedIn的活動流(Act...
    時待吾閱讀 5,324評論 1 15
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)透且,斷路器撕蔼,智...
    卡卡羅2017閱讀 134,659評論 18 139