Main content: resolving problems encountered during deployment and operation
Q1 Why does the deployed Storm topology produce no logs, i.e. why is no data being collected?
A: Step 1 [Background] What do emitted, transferred, ack and fail mean?
- The emitted column shows the number of times OutputCollector's emit method was called.
- The transferred column shows the number of tuples actually transferred to a task of the next component.
- Meaning of acked and failed
Reliability guarantees start at the spout
In Storm, message-processing reliability begins at the spout. To guarantee that data is processed correctly, Storm can track every tuple a spout produces, which is where ack/fail handling comes in: if a tuple is processed successfully the spout's ack method is called, and if it fails the fail method is called. Every bolt in the topology that processes the tuple likewise tells Storm, through its OutputCollector, whether its own processing succeeded.
When a tuple is created, whether by a spout or a bolt, it is given a 64-bit id, and the acker uses that id to track every tuple. Each tuple knows the id of its root tuple (the one emitted by the spout), and whenever a new tuple is emitted the root id is passed along to it. So when a tuple is acked, a message is sent to the acker describing how the tuple tree has changed.
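A minimal sketch of how a bolt drives these counters, using hypothetical class and field names and assuming Storm 1.x package names (older releases use backtype.storm.*): an anchored emit adds the new tuple to the input tuple's tree, and ack/fail report the outcome to the acker.

```java
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Hypothetical bolt for illustration only -- not the bolt from this post.
public class PassThroughBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            // Anchored emit: counted in "emitted", and linked to the input tuple's tree
            // so the acker can trace it back to the root spout tuple.
            collector.emit(input, new Values(input.getString(0)));
            // Tell Storm this bolt processed the input tuple successfully.
            collector.ack(input);
        } catch (Exception e) {
            // On failure, the spout's fail() is eventually called for the root tuple.
            collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}
```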
A: Step 2 Check whether Flume is collecting data and whether anything has reached Kafka
- Check the Flume log:
tail -f /usr/local/apache-flume-1.7.0-bin/logs/flume.log
14 Jul 2017 20:46:27,733 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:177) - Shutdown Metric for type: SOURCE, name: r1. src.open-connection.count == 0
- ps -ef |grep flume
Flume turned out not to be running at all, so restart the agent:
flume-ng agent --conf conf -f /usr/local/apache-flume-1.7.0-bin/conf/flume-conf.properties -n agent&
- Check whether data has been collected into Kafka:
[root@VM-10-112-179-18 logs]# kafka-console-producer.sh --broker-list 10.112.179.18:9092 --topic gome
Flume was not collecting any data at all after it started.
Switch the sink to a file sink temporarily, to confirm that the source side is collecting:
agent.sinks.s1.type = file_roll
agent.sinks.s1.sink.directory =/usr/local/apache-flume-1.7.0-bin/data
# Specify the channel the sink should use
agent.sinks.s1.channel = c1
- Inspect the contents of the Kafka log segment on disk:
kafka-run-class.sh kafka.tools.DumpLogSegments --files \
/usr/local/kafka_2.12-0.10.2.1/logs/gome-0/00000000000000000000.log --print-data-log
Dumping /usr/local/kafka_2.12-0.10.2.1/logs/gome-0/00000000000000000000.log
Starting offset: 0
Check the Flume log again:
org.apache.flume.EventDeliveryException: Failed to publish events
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:212)
... 3 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
This shows that Flume did collect data but could not deliver it to Kafka.
Monitor Kafka and re-check the Flume configuration; after migrating Kafka to another host everything worked, so the firewall on the original host is the likely cause.
Inter-host communication (iptables):
Stop the firewall: service iptables stop
Disable it permanently: chkconfig iptables off
Check its status: service iptables status
Another possibility is that the topic is configured incorrectly in the Kafka sink; a quick way to separate the two causes is to produce to the broker directly, as in the sketch below.
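To tell a Flume misconfiguration apart from a plain connectivity problem, here is a hedged sketch that produces one record directly from Java, using the broker address and topic from the output above and assuming a kafka-clients version matching the 0.10.2 broker. If the firewall or the broker's listener is the problem, it fails with the same "Failed to update metadata" timeout.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaConnectivityCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.112.179.18:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Fail after 10s instead of waiting the default 60s for metadata.
        props.put("max.block.ms", "10000");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send(...).get() blocks until the broker acknowledges the record,
            // or throws the same TimeoutException seen in the Flume log if it cannot.
            RecordMetadata md = producer.send(new ProducerRecord<>("gome", "connectivity-check")).get();
            System.out.println("Wrote to " + md.topic() + "-" + md.partition() + " at offset " + md.offset());
        }
    }
}
```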
Error when submitting the Storm topology
Caused by: java.lang.RuntimeException: java.io.NotSerializableException: org.apache.log4j.Logger
Cause analysis: the effect of the static modifier.
Java serialization does not serialize static fields, so a static Logger is simply skipped, whereas a non-static Logger field fails because org.apache.log4j.Logger is not serializable.
Fix:
Offending code:
import java.io.Serializable;
import org.apache.log4j.Logger;
public class Customer implements Serializable {
    // instance field: serialization tries to write it, but Logger is not serializable
    private Logger logger = Logger.getLogger(Customer.class);
}
Corrected code:
import java.io.Serializable;
import org.apache.log4j.Logger;
public class Customer implements Serializable {
    // static final field: excluded from serialization, so no NotSerializableException
    private static final Logger logger = Logger.getLogger(Customer.class);
}
Storm: persisting data in JSON format -- missing dependencies
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.4</version>
<classifier>jdk15</classifier>
</dependency>
1. java.lang.NoClassDefFoundError: net/sf/json/JSONObject at gome.storm.model.SrsLogServeInfo.toJsonString(SrsLogServeInfo.java:31) at gome.storm.bolt.PersistentSrsLog.getEdgeTtoSrsToClitInfo
----->json-lib-2.4-jdk15.jar
2. java.lang.NoClassDefFoundError: org/apache/commons/lang/exception/NestableRuntimeException at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:
---->commons-lang-2.5.jar
3. java.lang.NoClassDefFoundError: net/sf/ezmorph/Morpher at
----> ezmorph-1.0.6.jar
4. java.lang.NoClassDefFoundError: org/apache/commons/logging/LogFactory at net.sf.json.AbstractJSON.<clinit>(AbstractJSON.java:53)
---->commons-logging-1.1.1.jar
5. java.lang.NoClassDefFoundError: org/apache/commons/beanutils/DynaBean at
net.sf.json.JSONObject.fromObject(JSONObject.java:147) at
net.sf.json.JSONObject.fromObject(JSONObject.java:134)
------>commons-beanutils 1.8.0
A2:
json-lib official site: http://json-lib.sourceforge.net/
json-lib can convert objects, maps, arrays, XML and so on into JSON structures and parse them back; a usage sketch follows the dependency list below.
In summary, even the simplest use of json-lib needs the following 6 jars on the classpath:
Json-lib requires (at least) the following dependencies in your classpath:
commons-lang 2.5
commons-beanutils 1.8.0
commons-collections 3.2.1
commons-logging 1.1.1
ezmorph 1.0.6
json-lib-2.4-jdk15.jar
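A hedged usage sketch of the call that was failing with NoClassDefFoundError; the bean and its fields are invented stand-ins, and only JSONObject.fromObject itself comes from the stack traces above.

```java
import net.sf.json.JSONObject;

public class JsonLibDemo {

    // Hypothetical bean standing in for a model class such as SrsLogServeInfo.
    public static class ServeInfo {
        private String host = "10.112.179.18";
        private long bytes = 1024L;

        public String getHost() { return host; }
        public long getBytes() { return bytes; }
    }

    public static void main(String[] args) {
        // fromObject reflects over the bean's getters; that reflection path is what pulls in
        // commons-lang, commons-beanutils, commons-collections, commons-logging and ezmorph.
        JSONObject json = JSONObject.fromObject(new ServeInfo());
        System.out.println(json.toString()); // e.g. {"bytes":1024,"host":"10.112.179.18"}
    }
}
```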
- Error inserting into Redis while the Storm topology is running
Could not connect to Redis at 10.77.88.99:6379: Connection refused
The firewall was suspected first.
Inter-host communication (iptables):
Stop the firewall: service iptables stop
Disable it permanently: chkconfig iptables off
Check its status: service iptables status
That did not help. The real problem was on the Redis server; the explanation is in redis.conf:
################################## NETWORK #####################################
# By default, if no "bind" configuration directive is specified, Redis listens
# for connections from all the network interfaces available on the server.
# It is possible to listen to just one or multiple selected interfaces using
# the "bind" configuration directive, followed by one or more IP addresses.
#
# Examples:
#
# bind 192.168.1.100 10.0.0.1
# bind 127.0.0.1 ::1
#
# ~~~ WARNING ~~~ If the computer running Redis is directly exposed to the
# internet, binding to all the interfaces is dangerous and will expose the
# instance to everybody on the internet. So by default we uncomment the
# following bind directive, that will force Redis to listen only into
# the IPv4 lookback interface address (this means Redis will be able to
# accept connections only from clients running into the same computer it
# is running).
#
# IF YOU ARE SURE YOU WANT YOUR INSTANCE TO LISTEN TO ALL THE INTERFACES
# JUST COMMENT THE FOLLOWING LINE.
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#bind 127.0.0.1
In plain terms:
- If configured as bind 127.0.0.1,
the Redis server only accepts connections from the local machine (the "same computer"):
Redis will be able to
accept connections only from clients running into the same computer it is running
- If the directive is left commented out (#bind 127.0.0.1),
Redis accepts connections from anywhere (all interfaces); this is the default:
By default,
if no "bind" configuration directive is specified, Redis listens
for connections from all the network interfaces available on the server.
- bind <ip ...> lists the addresses on which clients, including remote hosts, are allowed to connect
(followed by one or more IP addresses):
It is possible to listen to just one or multiple selected interfaces using
the "bind" configuration directive, followed by one or more IP addresses.
Examples:
bind 192.168.1.100 10.0.0.1
Adjust the bind directive accordingly, restart Redis, and compare.
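Once Redis is reachable, a quick hedged sketch for verifying connectivity from the Storm host; this post does not name the Redis client, so Jedis is assumed here, and the address comes from the error above.

```java
import redis.clients.jedis.Jedis;

public class RedisConnectivityCheck {
    public static void main(String[] args) {
        // Address and port taken from the "Connection refused" error above.
        try (Jedis jedis = new Jedis("10.77.88.99", 6379)) {
            System.out.println("PING -> " + jedis.ping()); // expect PONG once bind/firewall allow it
            jedis.set("storm:connectivity-check", "ok");
            System.out.println(jedis.get("storm:connectivity-check"));
        }
    }
}
```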
Q2 After a restart, the Storm topology still reads Kafka records from the very beginning
https://github.com/apache/storm/tree/master/external/storm-kafka
How KafkaSpout recovers in case of failures
- kafka.api.OffsetRequest.EarliestTime(): read from the beginning of the topic (i.e. from the oldest messages onwards)
- kafka.api.OffsetRequest.LatestTime(): read from the end of the topic (i.e. any new messages that are being written to the topic)
- If you want to force the spout to ignore any consumer state information stored in ZooKeeper, then you should set the parameter KafkaConfig.ignoreZkOffsets to true. If true, the spout will always begin reading from the offset defined by KafkaConfig.startOffsetTime
In other words:
spoutConf.ignoreZkOffsets = false; // on restart, always resume from the offsets stored in ZooKeeper
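A minimal hedged sketch of the corresponding storm-kafka spout configuration. Package names assume the Storm 1.x storm-kafka module (older releases use storm.kafka.*); the ZooKeeper address, zkRoot and consumer id are placeholders, and only the topic name comes from this post.

```java
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;

public class KafkaSpoutOffsetConfig {

    public static KafkaSpout buildSpout() {
        // Placeholder ZooKeeper address, zkRoot and consumer id.
        ZkHosts zkHosts = new ZkHosts("zk-host:2181");
        SpoutConfig spoutConf = new SpoutConfig(zkHosts, "gome", "/kafka-offsets", "gome-storm");

        // false: on restart, resume from the offsets this consumer id committed to ZooKeeper.
        spoutConf.ignoreZkOffsets = false;
        // Only consulted when no stored offset exists (or when ignoreZkOffsets is true).
        spoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

        return new KafkaSpout(spoutConf);
    }
}
```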
References
1. http://blog.csdn.net/yangyutong0506/article/details/46742601
2. https://github.com/dibbhatt/kafka-spark-consumer/issues/16
3. Storm message reliability and the ack mechanism: http://blog.jassassin.com/2014/10/22/storm/storm-ack/
4. Kafka guide: http://wdxtub.com/2016/08/15/kafka-guide/
5. Java serialization: https://www.ibm.com/developerworks/cn/java/j-lo-serial/index.html