storm學習第二天QA(flume-kafa-storm-redis 運行)

主要內(nèi)容:解決部署運行遇到問題

Q1 為什么storm 部署的任務沒有日志 也就是說沒有采集任何數(shù)據(jù)

image.png

A:

A:step1 【知識點補充】 emit transferred ack和fail是什么概念

  • emitted欄顯示的數(shù)字表示的是調(diào)用OutputCollector的emit方法的次數(shù).
  • transferred欄顯示的數(shù)字表示的是實際tuple發(fā)送到下一個task的計數(shù).
  • ack和failed 含義

Spout的可靠性保證
在Storm中夹囚,消息處理可靠性從Spout開始的抄沮。storm為了保證數(shù)據(jù)能正確的被處理, 對于spout產(chǎn)生的每一個tuple,storm都能夠進行跟蹤,這里面涉及到了ack/fail的處理, 如果一個tuple被處理成功,那么spout便會調(diào)用其ack方法,如果失敗,則會調(diào)用fail方法墓卦。而topology中處理tuple的每一個bolt都會通過OutputCollector來告知storm,當前bolt處理是否成功
當一個tuple被創(chuàng)建扒袖, 不管是spout還是bolt創(chuàng)建的辆它, 它會被賦予一個64位的id ,而acker就是利用這個id去跟蹤所有的tuple的执虹。 每個tuple知道它的祖宗的id(從spout發(fā)出來的那個tuple的id), 每當你新發(fā)射一個tuple丝格, 它的祖宗id都會傳給這個新的tuple。 所以當一個tuple被ack的時候碴萧,它會發(fā)一個消息給acker乙嘀,告訴它這個tuple樹發(fā)生了怎么樣的變化

A:step2 判斷flume有沒有采集 kafka有沒有數(shù)據(jù)

  • 查看flume 日志:

tail -f /usr/local/apache-flume-1.7.0-bin/logs/flume.log

14 Jul 2017 20:46:27,733 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:177) - Shutdown Metric for type: SOURCE, name: r1. src.open-connection.count == 0

  • ps -ef |grep flume

竟然沒有啟動 重啟flume程序

flume-ng agent --conf conf -f /usr/local/apache-flume-1.7.0-bin/conf/flume-conf.properties -n agent&

-查看kafa數(shù)據(jù)搜集情況

[root@VM-10-112-179-18 logs]# kafka-console-producer.sh --broker-list 10.112.179.18:9092 --topic gome

flume啟動后根本沒有采集數(shù)據(jù)
修改成文件形式
agent.sinks.s1.type = file_roll
agent.sinks.s1.sink.directory =/usr/local/apache-flume-1.7.0-bin/data
Specify the channel the sink should use
agent.sinks.s1.channel = c1

image.png
  • 查看kafkalog日志內(nèi)容
 kafka-run-class.sh kafka.tools.DumpLogSegments --files \
 /usr/local/kafka_2.12-0.10.2.1/logs/gome-0/00000000000000000000.log --print-data-log 

Dumping /usr/local/kafka_2.12-0.10.2.1/logs/gome-0/00000000000000000000.log
Starting offset: 0

再次查看flume 日志

org.apache.flume.EventDeliveryException: Failed to publish events
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
        at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:212)
        ... 3 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

問題在于 這說明已經(jīng)采取到 數(shù)據(jù)沒有傳輸?shù)絢afka中去

對kafa進行監(jiān)控

image.png

重新檢查flume配置文件 kafa遷移其他主機 正常


image.png

估計是防火墻的原因

主機間通信:
關(guān)閉命令: service iptables stop
永久關(guān)閉防火墻:chkconfig iptables off
查看狀態(tài):service iptables status

或者topic 配置的不正確

發(fā)布storm程序出錯

Caused by: java.lang.RuntimeException: java.io.NotSerializableException: org.apache.log4j.Logger

原因分析:static修飾符作用
java序列化是不能序列化static變量的
解決辦法:

出錯代碼:
public class Customer implements Serializable{
private Logger logger =   Logger.getLogger(Customer.class)
}
修正代碼:
public class Customer implements Serializable{
private static final Logger logger =   Logger.getLogger(Customer.class)
 
}

storm storm 持久化引入json格式的數(shù)據(jù) --缺少依賴

<dependency>   
       <groupId>net.sf.json-lib</groupId>   
       <artifactId>json-lib</artifactId>   
       <version>2.4</version>   
       <classifier>jdk15</classifier>   
    </dependency> 

1. java.lang.NoClassDefFoundError: net/sf/json/JSONObject at gome.storm.model.SrsLogServeInfo.toJsonString(SrsLogServeInfo.java:31) at gome.storm.bolt.PersistentSrsLog.getEdgeTtoSrsToClitInfo
    ----->json-lib-2.4-jdk15.jar
2.  java.lang.NoClassDefFoundError: org/apache/commons/lang/exception/NestableRuntimeException at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:
   ---->commons-lang-2.5.jar 
3.  java.lang.NoClassDefFoundError: net/sf/ezmorph/Morpher at 
    ----> ezmorph-1.0.6.jar
4. java.lang.NoClassDefFoundError: org/apache/commons/logging/LogFactory at net.sf.json.AbstractJSON.<clinit>(AbstractJSON.java:53) 
   ---->commons-logging-1.1.1.jar
5. java.lang.NoClassDefFoundError: org/apache/commons/beanutils/DynaBean at     
      net.sf.json.JSONObject.fromObject(JSONObject.java:147) at 
      net.sf.json.JSONObject.fromObject(JSONObject.java:134) 
   ------>commons-beanutils 1.8.0

A2:

json-lib官方網(wǎng)站

作用能把對象 map 數(shù)組 xml等轉(zhuǎn)換成json結(jié)構(gòu) 并且解析

http://json-lib.sourceforge.net/

image.png

綜上,想用一個最簡單的JSON也得導入以下的6個包:
Json-lib requires (at least) the following dependencies in your classpath:
commons-lang 2.5
commons-beanutils 1.8.0
commons-collections 3.2.1
commons-logging 1.1.1
ezmorph 1.0.6
json-lib-2.4-jdk15.jar

  • strom 運行插入redis錯誤

Could not connect to Redis at 10.77.88.99:6379: Connection refused
估計是防火墻的原因

主機間通信:

關(guān)閉命令: service iptables stop
永久關(guān)閉防火墻:chkconfig iptables off
查看狀態(tài):service iptables status

結(jié)果不行 問題在redis服務器上查看redis.conf 說明

################################## NETWORK #####################################
# By default, if no "bind" configuration directive is specified, Redis listens
# for connections from all the network interfaces available on the server.
# It is possible to listen to just one or multiple selected interfaces using
# the "bind" configuration directive, followed by one or more IP addresses.
#
# Examples:
#
# bind 192.168.1.100 10.0.0.1
# bind 127.0.0.1 ::1
#
# ~~~ WARNING ~~~ If the computer running Redis is directly exposed to the
# internet, binding to all the interfaces is dangerous and will expose the
# instance to everybody on the internet. So by default we uncomment the
# following bind directive, that will force Redis to listen only into
# the IPv4 lookback interface address (this means Redis will be able to
# accept connections only from clients running into the same computer it
# is running).
#
# IF YOU ARE SURE YOU WANT YOUR INSTANCE TO LISTEN TO ALL THE INTERFACES
# JUST COMMENT THE FOLLOWING LINE.
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#bind 127.0.0.1 

翻譯:

  • 如果配置成bind 127.0.0.1方式

redis服務器只能接受本地連接的請求(same computer)
Redis will be able to
accept connections only from clients running into the same computer it is running

  • 如果配置成#bind 127.0.0.1 方式

redis接受任何服務器的連接(all ) 默認配置
By default
if no "bind" configuration directive is specified, Redis listens

for connections from all the network interfaces available on the server.

  • bind 運行訪問的遠程主機 ip xxxx

bing 允許外網(wǎng)訪問的ip(followed by one or more IP addresses)
It is possible to listen to just one or multiple selected interfaces using
the "bind" configuration directive, followed by one or more IP addresses.
Examples:
bind 192.168.1.100 10.0.0.1

對比啟動

只允許本地訪問.png

支持遠程訪問.png

Q2 重啟之后從strom仍然從頭開始讀取kafka記錄

https://github.com/apache/storm/tree/master/external/storm-kafka

https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIaccuratelygetoffsetsofmessagesforacertaintimestampusingOffsetRequest?

How KafkaSpout recovers in case of failures

  • kafka.api.OffsetRequest.EarliestTime(): read from the beginning of the topic (i.e. from the oldest messages onwards)
  • kafka.api.OffsetRequest.LatestTime(): read from the end of the topic (i.e. any new messsages that are being written to the topic)
  • If you want to force the spout to ignore any consumer state information stored in ZooKeeper, then you should set the parameter KafkaConfig.ignoreZkOffsets to true. If true, the spout will always begin reading from the offset defined by KafkaConfig.startOffsetTime
    意思是說:
    spoutConf.ignoreZkOffsets = false; // 重啟均是從offset中讀取

參考
1 http://blog.csdn.net/yangyutong0506/article/details/46742601
2 https://github.com/dibbhatt/kafka-spark-consumer/issues/16
3 Storm消息可靠性與Ack機制
http://blog.jassassin.com/2014/10/22/storm/storm-ack/
4 Kafka 指南
http://wdxtub.com/2016/08/15/kafka-guide/
5 序列化
https://www.ibm.com/developerworks/cn/java/j-lo-serial/index.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末破喻,一起剝皮案震驚了整個濱河市虎谢,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌曹质,老刑警劉巖婴噩,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異羽德,居然都是意外死亡几莽,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進店門宅静,熙熙樓的掌柜王于貴愁眉苦臉地迎上來章蚣,“玉大人,你說我怎么就攤上這事姨夹∠舜梗” “怎么了?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵磷账,是天一觀的道長峭沦。 經(jīng)常有香客問我,道長逃糟,這世上最難降的妖魔是什么吼鱼? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮绰咽,結(jié)果婚禮上蛉抓,老公的妹妹穿的比我還像新娘。我一直安慰自己剃诅,他們只是感情好,可當我...
    茶點故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布驶忌。 她就那樣靜靜地躺著矛辕,像睡著了一般笑跛。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上聊品,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天飞蹂,我揣著相機與錄音,去河邊找鬼翻屈。 笑死陈哑,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的伸眶。 我是一名探鬼主播惊窖,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼厘贼!你這毒婦竟也來了界酒?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤嘴秸,失蹤者是張志新(化名)和其女友劉穎毁欣,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體岳掐,經(jīng)...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡凭疮,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了串述。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片执解。...
    茶點故事閱讀 38,018評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖剖煌,靈堂內(nèi)的尸體忽然破棺而出材鹦,到底是詐尸還是另有隱情,我是刑警寧澤耕姊,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布桶唐,位于F島的核電站,受9級特大地震影響茉兰,放射性物質(zhì)發(fā)生泄漏尤泽。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一规脸、第九天 我趴在偏房一處隱蔽的房頂上張望坯约。 院中可真熱鬧,春花似錦莫鸭、人聲如沸闹丐。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽卿拴。三九已至衫仑,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間堕花,已是汗流浹背文狱。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留缘挽,地道東北人瞄崇。 一個月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像壕曼,于是被迫代替她去往敵國和親苏研。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,762評論 2 345

推薦閱讀更多精彩內(nèi)容