docker搭建Kafka集群及監(jiān)控、可視化部署實(shí)戰(zhàn)

下載zookeeper鏡像

docker pull wurstmeister/zookeeper

下載kafka鏡像

docker pull wurstmeister/kafka

啟動(dòng)zk鏡像生成容器

docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2  --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper

啟動(dòng)kafka1鏡像生成容器

docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kafka  -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.31.131:2181   -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.31.131:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092   -v /etc/localtime:/etc/localtime wurstmeister/kafka

啟動(dòng)kafka2鏡像生成容器

docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kafka  -p 9093:9093 -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=192.168.31.131:2181   -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.31.131:9093 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093   -v /etc/localtime:/etc/localtime wurstmeister/kafka

查看docker進(jìn)程

docker ps -a

向kafka docker中拷貝測(cè)試數(shù)據(jù)日志文件

docker cp /home/test/test.log kafka:/opt

進(jìn)入kafka docker進(jìn)程中,就可以使用命令操作kafka

docker exec -it kafka bash

進(jìn)入kafka docker的進(jìn)程中,執(zhí)行命令向kafka寫入test.log的測(cè)試數(shù)據(jù)

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test < /opt/test.log

用代碼操作kafka
生產(chǎn)消息

public class KafkaProducerService {
    public static Properties props = new Properties();
    public final static String topic = "test";
    static {
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.31.131:9092,192.168.31.131:9093");
        props.put(ProducerConfig.ACKS_CONFIG,"all");
        props.put(ProducerConfig.RETRIES_CONFIG,"3");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
        props.put(ProducerConfig.LINGER_MS_CONFIG,"1");
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    }
    public static Runnable runnable = () -> {
        try {
            Producer<String,String> producer = new KafkaProducer<>(props);
            for(int i=0;i<1000;i++){
                ProducerRecord<String,String> record =
                        new ProducerRecord<>(topic,"key-"+i,"kafka-value-"+i);
                producer.send(record, (recordMetadata, e) -> {
                      if (e==null){
                        System.out.println("消息發(fā)送成功");
                        System.out.println("partition : "+recordMetadata.partition()+" , offset : "+recordMetadata.offset()+",topic"+recordMetadata.topic());
                    }else {
                        System.out.println("消息發(fā)送失敗");
                    }
                });
            }
            // 所有的通道打開都需要關(guān)閉
            producer.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    };
      public static void runService() {
        int producer_num = 10;
        ExecutorService executor = Executors.newFixedThreadPool(producer_num);
        for (int i=0;i<producer_num;i++){
            executor.submit(runnable);
        }
    }
}

消費(fèi)消息

@Slf4j
public class KafkaConsumerService {
    public static Properties props = new Properties();
    public final static String topic = "test";
    static {
        props.put("bootstrap.servers","192.168.31.131:9092,192.168.31.131:9093");
        props.put("group.id", "test_consumer");
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("auto.offset.reset", "latest");
        props.put("deserializer.encoding", "UTF-8");
    }

   public static Runnable runnable = () -> {
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(10000);
                records.partitions().forEach(topicPartition -> {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
                    partitionRecords.forEach(record -> {
                        log.info("kafka的消費(fèi)日志{}",record.toString());
                    });
                });
            }
    };
     public static void runService() {
        int producer_num = 2;
        ExecutorService executor = Executors.newFixedThreadPool(producer_num);
        for (int i=0;i<producer_num;i++){
            executor.submit(runnable);
        }
    }
}

kafka可視化工具 offsetexplorer

下載地址:http://www.kafkatool.com/download.html

kafka監(jiān)控工具 Kafka Eagle

下載地址:http://download.kafka-eagle.org/
解壓出來(lái)的路徑:/usr/local/kafka-eagle-web-2.0.6
修改配置

vim /usr/local/kafka-eagle-web-2.0.6/conf/system-config.properties

修改的地方是cluster1.zk.list和kafka.eagle.url

kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=192.168.31.131:2181
....
kafka.eagle.webui.port=8048
kafka.eagle.url=jdbc:sqlite:/usr/local/kafka-eagle-web-2.0.6/db/ke.db

添加環(huán)境變量

vim ~/.bash_profile

export KE_HOME=/usr/local/kafka-eagle-web-2.0.6
export PATH=$KE_HOME/bin:$PATH

source ~/.bash_profile

進(jìn)入kafka docker中修改kafka-server-sta

docker exec -it kafka bash
docker exec -it kafka2 bash
cd /opt/kafka_2.13-2.7.0/bin/
vim kafka-server-start.sh

添加配置

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    # 這里的端口不一定非要設(shè)置成9999,端口只要可用,均可。
    export JMX_PORT="9999" 
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

啟動(dòng)程序

chmod a+x /usr/local/kafka-eagle-web-2.0.6/bin/*
./ke.sh start
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末勿她,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子阵翎,更是在濱河造成了極大的恐慌逢并,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,682評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件郭卫,死亡現(xiàn)場(chǎng)離奇詭異砍聊,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)贰军,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門玻蝌,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人词疼,你說(shuō)我怎么就攤上這事俯树。” “怎么了贰盗?”我有些...
    開封第一講書人閱讀 165,083評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵许饿,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我舵盈,道長(zhǎng)陋率,這世上最難降的妖魔是什么球化? 我笑而不...
    開封第一講書人閱讀 58,763評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮翘贮,結(jié)果婚禮上赊窥,老公的妹妹穿的比我還像新娘爆惧。我一直安慰自己狸页,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,785評(píng)論 6 392
  • 文/花漫 我一把揭開白布扯再。 她就那樣靜靜地躺著芍耘,像睡著了一般。 火紅的嫁衣襯著肌膚如雪熄阻。 梳的紋絲不亂的頭發(fā)上斋竞,一...
    開封第一講書人閱讀 51,624評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音秃殉,去河邊找鬼坝初。 笑死,一個(gè)胖子當(dāng)著我的面吹牛钾军,可吹牛的內(nèi)容都是我干的鳄袍。 我是一名探鬼主播,決...
    沈念sama閱讀 40,358評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼吏恭,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼拗小!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起樱哼,我...
    開封第一講書人閱讀 39,261評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤哀九,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后搅幅,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體阅束,經(jīng)...
    沈念sama閱讀 45,722評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年茄唐,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了围俘。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,030評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡琢融,死狀恐怖界牡,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情漾抬,我是刑警寧澤宿亡,帶...
    沈念sama閱讀 35,737評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站纳令,受9級(jí)特大地震影響挽荠,放射性物質(zhì)發(fā)生泄漏克胳。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,360評(píng)論 3 330
  • 文/蒙蒙 一圈匆、第九天 我趴在偏房一處隱蔽的房頂上張望漠另。 院中可真熱鬧,春花似錦跃赚、人聲如沸笆搓。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)满败。三九已至,卻和暖如春叹括,著一層夾襖步出監(jiān)牢的瞬間算墨,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工汁雷, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留净嘀,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,237評(píng)論 3 371
  • 正文 我出身青樓侠讯,卻偏偏與公主長(zhǎng)得像挖藏,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子继低,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,976評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容