問題引入
- 數據生產者的分組策略膜廊?
- 如何保證數據的完全生產?
- partition數量和broker數量關系?
- 每個partition的數據如何保存到硬盤亮隙?
- Kafka有什么獨特的特點?
- 消費者如何標記消費狀態(tài)垢夹?
- 消費者負載均衡的策略咱揍?
- 如何保證消費者消費的數據是有序的?
生產者是集群模式棚饵,全局序號管理器煤裙。
broker端只設置一個partition,kafka的高并發(fā)下的負載均衡噪漾。
消費者如果是一個組硼砰,如何保證消息有序?消費者來一個線程(自定義一個數據結構來排序)
模擬應用需求
- 采集訂單系統應用打印的日志文件欣硼。
日志文件使用log4j生成题翰,滾動生成。使用tail -F xxx.log
來監(jiān)控文件名稱诈胜,理解tail -f和tail -F的區(qū)別豹障。
- 將采集的日志文件保存到Kafka中。
(source)輸入:tail -F xxx.log
(channel)存儲:內存
(sink)輸出:Kafka
config樣例焦匈,
a1.source = s1
a1.channel = c1
a1.sink = k1
source exec tail -F xxx.log
channel RAM
sink xxxx.xxxx.xxxx.KafkaSink // 該類必須存放lib目錄
sink.topic = orderMq
sink.itcast = itcast
map = getConfig();
value = map.get("itcast")
- 通過Storm程序消費Kafka中數據血公。
KafkaSpout
Bolt1()
Bolt2()
業(yè)務模擬:統計雙十一當前的訂單金額、訂單數量缓熟、訂單人數累魔。訂單金額/數量/人數(整個網站、各個業(yè)務線够滑、各個品類垦写、各個店鋪、各個品牌彰触、每個商品)梯投。
環(huán)境配置
應用安裝的一般流程:下載、解壓、配置分蓖、分發(fā)吮龄。
在Flume官方網站下載Flume,解壓Flume安裝包咆疗,
tar -zxvf apache-flume-1.6.0-bin.tar.gz -C /export/servers/
cd /export/servers/
ln -s apache-flume-1.6.0-bin flume
配置Flume環(huán)境變量漓帚,
vi /etc/profile
export FLUME_HOME=/export/servers/apache-flume-1.6.0-bin
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$ZK_HOME/bin:$STORM_HOME/bin:$KAFKA_HOME/bin:$FLUME_HOME/bin
source /etc/profile
創(chuàng)建Flume配置文件昧辽,
cd /export/servers/flume/conf/
mkdir myconf
cd myconf/
vi exec.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/data/flume_sources/click_log/1.log
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = orderMq
a1.sinks.k1.brokerList = kafka01:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1
準備模擬應用日志數據的目錄,
mkdir -p /export/data/flume_sources/click_log
通過腳本模擬生產應用日志數據,
# click_log_out.sh
for((i=0;i<=50000;i++));
do echo "message-"+$i >> /export/data/flume_sources/click_log/1.log;
done
chomd +x click_log_out.sh
打通所有流程,第一步拿撩,啟動zk集群涎显,
zkServer.sh start
zkServer.sh status
第二步讨勤,封裝Kafka集群啟動和停止的腳本屉来,啟動Kafka集群蝶桶,
start-kafka.sh
第三步恢共,啟動Flume客戶端,監(jiān)控日志數據生成旁振,
./bin/flume-ng agent -n a1 -c conf -f conf/myconf/exec.conf -Dflume.root.logger=INFO,console
第四步获询,創(chuàng)建一個topic并開啟consumer,在客戶端模擬消費拐袜,
kafka-console-consumer.sh --zookeeper zk01:2181 --topic orderMq
第五步吉嚣,執(zhí)行應用日志數據生產腳本,
sh click_log_out.sh
整合Storm程序的bug解決
服務端沒有啟動ZooKeeper蹬铺,
ERROR org.apache.curator.ConnectionState - Connection timed out for connection string (zk01:2181,zk02:2181,zk03:2181) and timeout (15000) / elapsed (15071)
本地調試Storm程序尝哆,本機沒有配置kafka的解析,
kafka.consumer.SimpleConsumer - Reconnect due to error:
java.nio.channels.ClosedChannelException: null
放到Storm集群運行甜攀,相關環(huán)境和jar包沖突秋泄,把Storm相關的依賴去掉。
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
Exception in thread "main" java.lang.ExceptionInInitializerError
at backtype.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:106)
at cn.itcast.storm.kafkaAndStorm.KafkaAndStormTopologyMain.main(KafkaAndStormTopologyMain.java:27)
Caused by: java.lang.RuntimeException: Found multiple defaults.yaml resources. You're probably bundling the Storm jars with your topology jar. [jar:file:/home/hadoop/kafka2storm.jar!/defaults.yaml, jar:file:/export/servers/apache-storm-0.9.5/lib/storm-core-0.9.5.jar!/defaults.yaml]
at backtype.storm.utils.Utils.findAndReadConfigFile(Utils.java:133)
at backtype.storm.utils.Utils.readDefaultConfig(Utils.java:160)
at backtype.storm.utils.Utils.readStormConfig(Utils.java:184)
at backtype.storm.utils.Utils.<clinit>(Utils.ja
本地模式规阀,提供storm的核心jar包恒序,
java.lang.NoClassDefFoundError: backtype/storm/topology/IRichSpout
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2615)
at java.lang.Class.getMethod0(Class.java:2856)
at java.lang.Class.getMethod(Class.java:1668)
at sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
Caused by: java.lang.ClassNotFoundException: backtype.storm.topology.IRichSpout
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 6 more
Exception in thread "main"
Process finished with exit code 1
本文首發(fā)于steem,感謝閱讀谁撼,轉載請注明歧胁。
微信公眾號「數據分析」,分享數據科學家的自我修養(yǎng),既然遇見喊巍,不如一起成長屠缭。
讀者交流電報群
知識星球交流群