kafka入門及其使用
Kafka是由LinkedIn開發(fā)的一個(gè)分布式基于發(fā)布/訂閱的消息系統(tǒng)岳服,使用Scala編寫钉嘹,它以可水平擴(kuò)展和高吞吐率而被廣泛使用剂娄。
關(guān)于kafka背景
最初是LinkedIn的一個(gè)內(nèi)部基礎(chǔ)設(shè)施系統(tǒng)蠢涝,發(fā)現(xiàn)數(shù)據(jù)庫難以處理持續(xù)數(shù)據(jù)流,因此產(chǎn)生了kafka阅懦,一開始用于社交網(wǎng)絡(luò)的實(shí)時(shí)應(yīng)用和數(shù)據(jù)流中和二。
可以認(rèn)為kafka是一個(gè)流平臺(tái):在這個(gè)平臺(tái)可以發(fā)布和訂閱流.并將它保存、處理
可以作為消息系統(tǒng)耳胎、有點(diǎn)像實(shí)時(shí)版的hadoop惯吕,支持集群、高性能擁有諸多優(yōu)點(diǎn)
消息中間件是干啥用的怕午,有啥好處
通過消息隊(duì)列達(dá)到將業(yè)務(wù)異步解耦废登,設(shè)計(jì)變得更簡(jiǎn)單可以分布式,通過消息一致性【只要不丟失消息】保證數(shù)據(jù)最終到用戶郁惜。增加業(yè)務(wù)系統(tǒng)異步能力堡距,較小并發(fā)問題。比如驗(yàn)證碼發(fā)送到用戶兆蕉。
【生產(chǎn)】和【消費(fèi)】速度或穩(wěn)定性不一致是使用消息中間件的重要原因
這邊舉個(gè)網(wǎng)上的例子
http://orchome.com/kafka/index
舉個(gè)例子羽戒,生產(chǎn)者消費(fèi)者,生產(chǎn)者生產(chǎn)雞蛋虎韵,消費(fèi)者消費(fèi)雞蛋易稠,生產(chǎn)者生產(chǎn)一個(gè)雞蛋,消費(fèi)者就消費(fèi)一個(gè)雞蛋包蓝,假設(shè)消費(fèi)者消費(fèi)雞蛋的時(shí)候噎住了(系統(tǒng)宕機(jī)了)驶社,生產(chǎn)者還在生產(chǎn)雞蛋企量,那新生產(chǎn)的雞蛋就丟失了。再比如生產(chǎn)者很強(qiáng)勁(大交易量的情況)亡电,生產(chǎn)者1秒鐘生產(chǎn)100個(gè)雞蛋届巩,消費(fèi)者1秒鐘只能吃50個(gè)雞蛋,那要不了一會(huì)逊抡,消費(fèi)者就吃不消了(消息堵塞姆泻,最終導(dǎo)致系統(tǒng)超時(shí)),消費(fèi)者拒絕再吃了冒嫡,”雞蛋“又丟失了,這個(gè)時(shí)候我們放個(gè)籃子在它們中間四苇,生產(chǎn)出來的雞蛋都放到籃子里孝凌,消費(fèi)者去籃子里拿雞蛋,這樣雞蛋就不會(huì)丟失了月腋,都在籃子里蟀架,而這個(gè)籃子就是”kafka“。
雞蛋其實(shí)就是“數(shù)據(jù)流”榆骚,系統(tǒng)之間的交互都是通過“數(shù)據(jù)流”來傳輸?shù)模ň褪莟cp片拍、http什么的),也稱為報(bào)文妓肢,也叫“消息”捌省。
kafka原理
幾個(gè)基本術(shù)語
Topic
Kafka將消息種子(Feed)分門別類,每一類的消息稱之為一個(gè)主題(Topic).kafka集群存儲(chǔ)消息是以top為類別記錄的
Producer
發(fā)布消息的對(duì)象稱之為主題生產(chǎn)者(Kafka topic producer)
Consumer
訂閱消息并處理發(fā)布的消息的種子的對(duì)象稱之為主題消費(fèi)者(consumers)
Broker
已發(fā)布的消息保存在一組服務(wù)器中碉钠,稱之為Kafka集群纲缓。集群中的每一個(gè)服務(wù)器都是一個(gè)代理(Broker). 消費(fèi)者可以訂閱一個(gè)或多個(gè)主題(topic),并從Broker拉數(shù)據(jù)喊废,從而消費(fèi)這些已發(fā)布的消息祝高。
4個(gè)核心API
應(yīng)用程序使用 Producer API
發(fā)布消息到1個(gè)或多個(gè)topic(主題)。
應(yīng)用程序使用 Consumer API
來訂閱一個(gè)或多個(gè)topic污筷,并處理產(chǎn)生的消息工闺。
應(yīng)用程序使用 Streams API
充當(dāng)一個(gè)流處理器,從1個(gè)或多個(gè)topic消費(fèi)輸入流瓣蛀,并生產(chǎn)一個(gè)輸出流到1個(gè)或多個(gè)輸出topic陆蟆,有效地將輸入流轉(zhuǎn)換到輸出流。
Connector API
允許構(gòu)建或運(yùn)行可重復(fù)使用的生產(chǎn)者或消費(fèi)者揪惦,將topic連接到現(xiàn)有的應(yīng)用程序或數(shù)據(jù)系統(tǒng)遍搞。例如,一個(gè)關(guān)系數(shù)據(jù)庫的連接器可捕獲每一個(gè)變化器腋。
kafka對(duì)比rabbitmq
就接觸過這兩個(gè)溪猿,簡(jiǎn)單對(duì)比了下
kafka內(nèi)部使用的zk達(dá)到分布式一致性钩杰,構(gòu)建分布式擴(kuò)展消息系統(tǒng)、且具有非常高的數(shù)據(jù)吞吐量诊县,底層采用scala編寫
因此對(duì)實(shí)時(shí)性要求高的話kafka是個(gè)不錯(cuò)的選擇
RabbitMQ是采用Erlang語言實(shí)現(xiàn)的AMQP協(xié)議的消息中間件讲弄,最初起源于金融系統(tǒng),用于在分布式系統(tǒng)中存儲(chǔ)轉(zhuǎn)發(fā)消息
其它幾個(gè)重要區(qū)別
都支持持久化依痊,kafka消息堆積效率更高避除,rmq積壓大會(huì)影響性能
rabbitmq常用于金融場(chǎng)景,具有較高嚴(yán)謹(jǐn)性胸嘁、安全性瓶摆,據(jù)說不會(huì)丟消息,但是高版本的kafka也支持性宏,雖說吞吐量?jī)?yōu)勢(shì)很大群井,但是嚴(yán)謹(jǐn)性不如amq,由于kafka保證每條消息最少發(fā)送一次毫胜,因此有重復(fù)發(fā)消息的可能书斜。
kafka安裝
#新建kafka用戶
adduser kafka
#為其設(shè)置密碼
passwd kafka
#賦予root權(quán)限
#方式1
修改/etc/sudoers,去掉對(duì)%wheel ALL=(ALL) ALL的注釋
然后 usermod -g root kafka使其屬于root組
#方式2
用root用戶 在root ALL=(ALL) ALL下一行加
kafka ALL=(ALL) ALL
#之后使用sudo即可獲取權(quán)限
#解壓kafka包
tar zxvf kafka_2.11-2.0.0.tgz
#先啟動(dòng)zk服務(wù)【kafka安裝包中自帶酵使,再啟用kafka服務(wù) 否則啟動(dòng)kafka服務(wù)將報(bào)連接錯(cuò)誤問題(kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING)】
#防火墻需開啟默認(rèn)需要的端口 zk:2181 kafka:9092
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
#停止kafka【需先停止kafka,再停zk】
bin/kafka-server-stop.sh
bin/zookeeper-server-stop.sh
安裝可視化管理工具kafka-manage
為了簡(jiǎn)化維護(hù)kafka集群荐吉,yahoo創(chuàng)建該web工具,github地址
環(huán)境要求java8+,zk 2+
安裝參考地址
https://www.cnblogs.com/frankdeng/p/9584870.html
https://www.cnblogs.com/dadonggg/p/8205302.html
#下載
git clone https://github.com/yahoo/kafka-manager kafka-manager #該安裝方式前提需要有g(shù)it【沒有的話yum install git-core 此處為centos系統(tǒng)】
#進(jìn)入kafka-manager 這步需要編譯要比較久【因?yàn)閟bt要越過GFW去拉取相關(guān)依賴庫,配置sbt源或許會(huì)快點(diǎn),這種用sbt的方式比較蛋疼搞了快半小時(shí)】
./sbt clean dist
#沒有sbt的話需安裝
curl https://bintray.com/sbt/rpm/rpm > bintray-sbt-rpm.repo
sudo mv bintray-sbt-rpm.repo /etc/yum.repos.d/
sudo yum install sbt
#解壓編譯好的
unzip kafka-manager-1.3.3.22.zip
生成zip即成功【很久很蛋疼】
配置與啟動(dòng)相關(guān)
#編輯kafka-manage內(nèi)部的 本機(jī)路徑為/home/kafka/kafka-manager/target/universal/kafka-manager-1.3.3.22/conf
#編輯配置文件
application.conf
如果zk地址不為本機(jī)或?yàn)榧耗J?可修改
#kafka-manager.zkhosts="localhost:2181" ##注釋這一行口渔,下面添加一行
kafka-manager.zkhosts="依賴zk的地址"
#啟動(dòng)kafka-manage 指定9099端口啟動(dòng)
nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=9099 &
#查看啟動(dòng)日志
tail -f nohup.out
這邊第一次啟動(dòng)報(bào)錯(cuò)样屠,并且日志瘋狂報(bào)[error] k.m.ApiError$ - error : Ask timed out on [ActorSelection[Anchor(akka://kafka-manager-system/), Path(/user/kafka-manager)]] after [5000 ms]錯(cuò)
就是因?yàn)闆]有配對(duì)kafka-manager.zkhosts地址 改為localhost:2181 [根據(jù)自己zk(單機(jī)/集群)地址來]
之后Cluster添加幾個(gè)配置,例如我這邊
之后點(diǎn)最下方的保存
為什么需要依賴zk
zk作為去中心化的集群模式
需要要消費(fèi)者知道現(xiàn)在那些生產(chǎn)者(對(duì)于消費(fèi)者而言搓劫,kafka就是生產(chǎn)者)是可用的瞧哟,如果沒有zk消費(fèi)者將不知道去哪里消費(fèi),這里zk作為解決分布式一致性問題的工具枪向,這里可以理解為kafka將zk作為數(shù)據(jù)庫使用
kafka常用命令
https://www.cnblogs.com/dragkiss/p/5668019.html
#查看kafka版本
find ./libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'
#查詢topic列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
#新建命名為garwer的topic主題 –partitions指定分區(qū)數(shù)勤揩,這個(gè)參數(shù)需要根據(jù)broker數(shù)和數(shù)據(jù)量決定,正常情況下秘蛔,每個(gè)broker上兩個(gè)partition最好陨亡; –replication-factor指定partition的replicas數(shù),建議設(shè)置為2深员;
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic garwer
#展示特定topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic garwer
#啟動(dòng)生產(chǎn)者 往garwer主題發(fā)送一些消息
bin/kafka-console-producer.sh --topic garwer --broker-list 47.98.176.212:9092
#啟動(dòng)消費(fèi)者 消費(fèi)garwer主題消息
#bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic garwer --from-beginnin #可能老版本用這個(gè) 這邊不行
#創(chuàng)建組為linjiawei topic為garwer的消費(fèi)者端
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --group linjiawei --topic garwer
#刪除topic
刪除topicbin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic garwer
刪除topic中存儲(chǔ)的內(nèi)容在config/server.properties中找到如下的位置
#kafka日志默認(rèn)保存路徑
springboot結(jié)合kafka的簡(jiǎn)單程序
java.io.IOException: Can't resolve address: aliyun-spark:9092 【這里剛開始用localhost被映射到這個(gè)地址负蠕,導(dǎo)致外網(wǎng)無法訪問】
可能會(huì)出現(xiàn)以下場(chǎng)景場(chǎng)景:kafka連接后在使用主機(jī)名導(dǎo)致連接失敗
這是因?yàn)闀?huì)先根據(jù)ip獲取主機(jī)名,由于這邊是外網(wǎng)倦畅,不能這樣需修改配置
當(dāng)Kafka broker啟動(dòng)時(shí)遮糖,它會(huì)在ZK上注冊(cè)自己的IP和端口號(hào),客戶端就通過這個(gè)IP和端口號(hào)來連接叠赐。
在AWS這種IaaS環(huán)境下欲账,由于`java.net.InetAddress.getCanonicalHostName`調(diào)用拿到的HostName是主機(jī)名屡江,所以默認(rèn)注冊(cè)到ZK上的是主機(jī)名
#具體配置如下
advertised.listeners=PLAINTEXT://ip:9092 #注意這邊的ip填自己的 我這邊是外網(wǎng) 用外網(wǎng)ip
配置server.properties完并重啟kafka、zk
依賴jar
compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.2.3.RELEASE'
compile('org.projectlombok:lombok:1.18.2')
controller層
package com.garwer.kafka.product.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author: Garwer
* @Date: 19/2/26 下午10:43
* @Version 1.0
*/
@RestController
@RequestMapping("/product")
@Slf4j
public class ProductController {
@Autowired
private KafkaTemplate kafkaTemplate;
@GetMapping("/sendMsg")
public void sendMsg(@RequestParam String msg) {
ListenableFuture future = kafkaTemplate.send("garwer", msg);
future.addCallback(o -> log.info("發(fā)送消息:{} success",msg), Throwable::printStackTrace);
}
}
yml簡(jiǎn)單配置
server:
port: 8083
spring:
kafka:
producer:
#這邊會(huì)先連47.98.176.212:9092 再匹配相應(yīng)的hostname 這邊我用aliyun外網(wǎng) 需修改kafka中的server.properties配置
bootstrap-servers: 47.98.176.212:9092
可能是由于重啟kafka赛不,這邊kafka-mange掛了也需要做重啟
#查看所有消費(fèi)組
bin/kafka-consumer-groups.sh --command-config config/consumer.properties --bootstrap-server localhost:9092 --list
#查看消費(fèi)情況
bin/kafka-consumer-groups.sh --command-config config/consumer.properties --describe --bootstrap-server localhost:9092 --group linjiawei
#查看實(shí)時(shí)消費(fèi)數(shù)據(jù)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic garwer
Kafka授權(quán)為kafka添加用戶/密碼-SASL配置
這部分網(wǎng)上資料大都講得好亂惩嘉,這邊自己整理并實(shí)踐了下
為了安全,最好像數(shù)據(jù)庫那樣有用戶/密碼的校驗(yàn)踢故,否則放開了防火墻文黎、安全組誰都能訪問并發(fā)送/消費(fèi)很不安全。
kafka提供了SASL/PLAIN配置來配置
Kafka使用Java認(rèn)證和授權(quán)服務(wù)(JAAS)進(jìn)行SASL配置殿较。
這邊我是單機(jī)模式耸峭,沒試過集群,看網(wǎng)上資料大致相同
參考:https://blog.csdn.net/u012842205/article/details/73188684
https://blog.csdn.net/javastart/article/details/78498884
為了不影響原有配置淋纲,普通方式和saal方式區(qū)分抓艳,這里采用新建文件、并以采用saal校驗(yàn)的sh啟動(dòng)方式
#先關(guān)閉kafka
bin/kafka-server-stop.sh
bin/zookeeper-server-stop.sh
#進(jìn)入kafka的config目錄
步驟1 基于server.properties新建server-saal.properties
cd config
#為了不影響原先的server.properties 新建一個(gè)文件 后續(xù)要用saal模式啟動(dòng)的話也采用這個(gè)
cp server.properties server-saal.properties
更改advertised.listeners 否則啟動(dòng)server會(huì)報(bào)inter.broker.listener.name must be a listener name defined in advertised.lis
#advertised.listeners=PLAINTEXT://47.98.176.212:9092
#這邊要用listeners 而且要用內(nèi)網(wǎng)ip而不是跟之前一樣用外網(wǎng)org.apache.kafka.common.KafkaException: Socket server failed to bind to 47.98.176.212:9092: Cannot assign requested address.
advertised.listeners=SASL_PLAINTEXT://47.98.176.212:9092
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin;User:garwer;User:alice
步驟2 config下新建文件
#1 kafka_server_jaas.conf
touch kafka_server_jaas.conf
#在文件內(nèi)配置以下內(nèi)容
# Kafka 定義了關(guān)鍵字KafkaServer字段用于指定服務(wù)端登錄配置
#這邊配置兩個(gè)用戶 一個(gè)admin 密碼為garwer 一個(gè)alice 密碼為alice user_garwer="garwer"指用戶為garwer 密碼為garwer
#兩個(gè)屬性帚戳,username和password,其中username是配置Zookeeper節(jié)點(diǎn)之間內(nèi)部認(rèn)證的用戶名儡首,password是對(duì)應(yīng)的密碼片任。
# 用戶通過usemame 和password 指定該代理與集群其他代理初始化連接的用戶名和密碼, 通過“ user_"為前綴后接用戶名方式創(chuàng)建連接代理的用戶名和密碼
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_garwer="garwer"
user_alice="alice";
};
#2 kafka_cilent_jaas.conf
touch kafka_client_jaas.conf
#在kafkaClient部分 username和password是配置連接broke的用戶 這邊即為kafka
#添加
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin";
};
步驟3 進(jìn)入bin目錄下新建生產(chǎn)者和消費(fèi)者sh文件
#添加帶有saal校驗(yàn)的kafka-server-start.sh文件
#為了不影響原有腳本蔬胯,同時(shí)也是為了備份 這邊通過復(fù)制
cp kafka-server-start.sh kafka-server-start-saal.sh
#之后修改kafka-server-start-saal.sh 在合理位置處【不要在最后一行】加上
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafka/kafka_2.11-2.0.0/config/kafka_server_jaas.conf"
cp kafka-console-consumer.sh kafka-console-consumer-saal.sh
#同上 加一行步驟2配置的kafka_client_jaas.conf
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafka/kafka_2.11-2.0.0/config/kafka_client_jaas.conf"
cp kafka-console-producer.sh kafka-console-producer-saal.sh
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafka/kafka_2.11-2.0.0/config/kafka_cilent_jaas.conf"
到此為止对供,saal基本配置已完成,還可以配置多節(jié)點(diǎn)zk認(rèn)證氛濒,這邊沒有产场,操作類似
校驗(yàn)saal配置
以saal的方式啟動(dòng)
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start-saal.sh config/server-saal.properties &
#啟動(dòng)saal認(rèn)證生產(chǎn)者 如果沒有正確的配置用戶密碼 會(huì)一直報(bào)invalid校驗(yàn)用戶密碼錯(cuò)誤,【Authentication failed: Invalid username or password (org.apache.kafka.clients.NetworkClient)可以參考https://stackoverflow.com/questions/39521691/kafka-authentication-producer-unable-to-connect-producer】
#啟動(dòng)生產(chǎn)者
bin/kafka-console-producer-saal.sh --broker-list 47.98.176.212:9092 --topic garwer --producer.config config/producer-saal.properties
#啟動(dòng)消費(fèi)者 創(chuàng)建組為linjiawei topic為garwer的消費(fèi)者端 這邊不知道為啥加group不行
bin/kafka-console-consumer-saal.sh --bootstrap-server 47.98.176.212:9092 --topic garwer --consumer.config config/consumer-saal.properties
java端校驗(yàn)
自此完成校驗(yàn)
總結(jié)
關(guān)于kafka,官方文檔已經(jīng)有很多資料舞竿,博主也是初學(xué)不久入個(gè)門京景,由于近期要把kafka放到個(gè)人做的一個(gè)程序中,用本機(jī)感覺麻煩,反正后面都要上生產(chǎn)骗奖,還不如就在生產(chǎn)搞确徙,但是用生產(chǎn)的話用外網(wǎng)又比較蛋疼【特別是sasl校驗(yàn)這塊】,但為了安全【密碼博主后續(xù)有做修改】也是無奈执桌。