docker 部署kafka(測試環(huán)境)

  1. zookeeper 鏡像選擇官方鏡像
docker pull zookeeper
docker run --name zoo -p 2181:2181 -d zookeeper
  1. kafka 鏡像選擇 bitnami/kafka
docker pull bitnami/kafka
docker run --name kafka -p 9092:9092  -e KAFKA_ZOOKEEPER_CONNECT=192.168.1.3:2181 -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -d  bitnami/kafka

docker容器部署必須指定以下環(huán)境變量:

  • KAFKA_ZOOKEEPER_CONNECT 指定 zookeeper 的地址:端口。
  • ALLOW_PLAINTEXT_LISTENER 允許使用PLAINTEXT偵聽器褥民。
  • KAFKA_ADVERTISED_LISTENERS 是指向Kafka代理的可用地址列表爸黄。 Kafka將在初次連接時將它們發(fā)送給客戶葱轩。格式為 PLAINTEXT://host:port 屠阻,此處已將容器9092端口映射到宿主機9092端口,所以host指定為localhost策菜,便可在宿主機執(zhí)行測試程序連接 kafka晶疼。
  • KAFKA_LISTENERS 是 Kafka 代理將偵聽傳入連接的地址列表。格式為 PLAINTEXT://host:port 又憨, 0.0.0.0代表接受所有地址翠霍。設(shè)置了上個變量就要設(shè)置此變量。
  1. 使用 docker-compose 集群部署
    docker-compose.yml
version: '2'

services:
  zoo1:
    image: zookeeper
    container_name: zoo
    ports:
      - 2181:2181

  kafka1:
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
    container_name: kafka1
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zoo1:2181
      - KAFKA_BROKER_ID=1
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.152.48:9092
    depends_on:
      - zoo1

  kafka2:
    image: 'bitnami/kafka:latest'
    ports:
      - '9093:9092'
    container_name: kafka2
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zoo1:2181
      - KAFKA_BROKER_ID=2
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.152.48:9093
    depends_on:
      - zoo1

  kafka3:
    image: 'bitnami/kafka:latest'
    ports:
      - '9094:9092'
    container_name: kafka3
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zoo1:2181
      - KAFKA_BROKER_ID=3
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.152.48:9094
    depends_on:
      - zoo1

將 KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.152.48:9094 中的 192.168.152.48 替換為docker宿主機的 ip 蠢莺。

  1. golang 連接 kafka
    此處使用 Shopify/sarama
go get github.com/Shopify/sarama

生產(chǎn)者

package main

import (
    "fmt"
    "time"

    "github.com/Shopify/sarama"
)

func main() {
    addrs := []string{"192.168.152.48:9092", "192.168.152.48:9093", "192.168.152.48:9094"}
    //addrs := []string{"192.168.152.48:9092"}
    config := sarama.NewConfig()
    config.Version = sarama.V2_1_0_0
    admin, err := sarama.NewClusterAdmin(addrs, config)
    if err != nil {
        fmt.Println(err)
    }
    err = admin.CreateTopic("tp33", &sarama.TopicDetail{NumPartitions: 1, ReplicationFactor: 3}, false)
    if err != nil {
        fmt.Println(err)
    }

    err = admin.Close()
    if err != nil {
        fmt.Println(err)
    }

    producer, err := sarama.NewSyncProducer(addrs, nil)
    if err != nil {
        fmt.Println(err)
    }
    defer func() {
        if err := producer.Close(); err != nil {
            fmt.Println(err)
        }
    }()

    msg := &sarama.ProducerMessage{Topic: "tp33", Value: sarama.StringEncoder("testing 123")}
    for {
        partition, offset, err := producer.SendMessage(msg)
        if err != nil {
            fmt.Println("failed to send message: ", err)
        } else {
            fmt.Printf("message sent to partition %d at offset %d\n", partition, offset)
        }
        time.Sleep(1500 * time.Millisecond)
    }
}

消費者

package main

import (
    "fmt"
    "os"
    "os/signal"

    "github.com/Shopify/sarama"
)

func main() {
    addrs := []string{"192.168.152.48:9092", "192.168.152.48:9093", "192.168.152.48:9094"}
    //addrs := []string{"192.168.152.48:9092"}
    consumer, err := sarama.NewConsumer(addrs, nil)
    if err != nil {
        panic(err)
    }
    defer func() {
        if err := consumer.Close(); err != nil {
            fmt.Println(err)
        }
    }()

    partitionConsumer, err := consumer.ConsumePartition("tp33", 0, sarama.OffsetNewest)
    if err != nil {
        panic(err)
    }
    defer func() {
        if err := partitionConsumer.Close(); err != nil {
            fmt.Println(err)
        }
    }()

    signals := make(chan os.Signal)
    signal.Notify(signals, os.Interrupt)

    consumed := 0
ConsumerLoop:
    for {
        select {
        case msg := <-partitionConsumer.Messages():
            fmt.Println("Consumed message offset", msg.Offset)
            fmt.Println(string(msg.Value))
            consumed++
        case <-signals:
            break ConsumerLoop
        }
    }

    fmt.Println("Consumed:", consumed)
}

  1. 啟動生產(chǎn)者
  • 在宿主機執(zhí)行以下命令(其中 kafka1 是上邊 yml 文件中的 container_name )進入容器 kafka1
docker exec -it kafka1 /bin/bash
cd opt/bitnami/kafka/bin
  • 執(zhí)行以下命令創(chuàng)建 topic
./kafka-topics.sh --bootstrap-server localhost:9092 --create --replication-factor 3 --partitions 3 --topic tp33

  • 執(zhí)行以下命令查看 topic 列表
./kafka-topics.sh --bootstrap-server localhost:9092 --list

輸出
tp33

  • 執(zhí)行以下命令查看指定 topic 相關(guān)信息
./kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic tp33

輸出
Topic:tp33 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: tp33 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
第一行給出了所有分區(qū)的摘要寒匙,每個附加行提供有關(guān)一個分區(qū)的信息。由于此主題只有一個分區(qū)躏将,因此只有一行锄弱。
“l(fā)eader”是負責給定分區(qū)的所有讀寫的節(jié)點。每個節(jié)點將成為隨機選擇的分區(qū)部分的領(lǐng)導(dǎo)者祸憋。
“replicas”是復(fù)制此分區(qū)日志的節(jié)點列表会宪,無論它們是否為領(lǐng)導(dǎo)者,或者即使它們當前處于活動狀態(tài)蚯窥。
“isr”是“同步”復(fù)制品的集合掸鹅。這是副本列表的子集,該列表當前處于活躍狀態(tài)并且已經(jīng)被領(lǐng)導(dǎo)者捕獲拦赠。

  • 執(zhí)行以下命令像指定 topic 發(fā)送消息
./kafka-console-producer.sh --broker-list localhost:9092 --topic tp33
>this test1
>test2
  • 執(zhí)行以下命令消費消息
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic tp33
this test1
test2
  • 現(xiàn)在讓我們測試一下容錯性巍沙。broker3 充當 leader 所以讓我們殺了它:
docker container rm kafka3

此時 kafka3 節(jié)點已經(jīng)關(guān)閉,但我們依舊可以發(fā)送/接受消息荷鼠。
再次查看 topic 相關(guān)信息

./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic tp33

輸出
Topic:tp33 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: tp33 Partition: 0 Leader: 2 Replicas: 3,2,1 Isr: 2,1

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末句携,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子颊咬,更是在濱河造成了極大的恐慌务甥,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件喳篇,死亡現(xiàn)場離奇詭異敞临,居然都是意外死亡,警方通過查閱死者的電腦和手機麸澜,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進店門挺尿,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人,你說我怎么就攤上這事编矾∈焓罚” “怎么了?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵窄俏,是天一觀的道長蹂匹。 經(jīng)常有香客問我,道長凹蜈,這世上最難降的妖魔是什么限寞? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮仰坦,結(jié)果婚禮上履植,老公的妹妹穿的比我還像新娘。我一直安慰自己悄晃,他們只是感情好玫霎,可當我...
    茶點故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著妈橄,像睡著了一般庶近。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上眷蚓,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天拦盹,我揣著相機與錄音,去河邊找鬼溪椎。 笑死,一個胖子當著我的面吹牛恬口,可吹牛的內(nèi)容都是我干的校读。 我是一名探鬼主播,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼祖能,長吁一口氣:“原來是場噩夢啊……” “哼歉秫!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起养铸,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤雁芙,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后钞螟,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體兔甘,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年鳞滨,在試婚紗的時候發(fā)現(xiàn)自己被綠了洞焙。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖澡匪,靈堂內(nèi)的尸體忽然破棺而出熔任,到底是詐尸還是另有隱情,我是刑警寧澤唁情,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布疑苔,位于F島的核電站,受9級特大地震影響甸鸟,放射性物質(zhì)發(fā)生泄漏惦费。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一哀墓、第九天 我趴在偏房一處隱蔽的房頂上張望趁餐。 院中可真熱鬧,春花似錦篮绰、人聲如沸后雷。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽臀突。三九已至,卻和暖如春贾漏,著一層夾襖步出監(jiān)牢的瞬間候学,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工纵散, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留梳码,地道東北人。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓伍掀,卻偏偏與公主長得像掰茶,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子蜜笤,可洞房花燭夜當晚...
    茶點故事閱讀 44,700評論 2 354

推薦閱讀更多精彩內(nèi)容