spark streaming + flume+python(編程)初探

一廉嚼、環(huán)境部署

hadoop集群2.7.1
flume 1.7.0
spark集群:spark-2.0.1-bin-hadoop2.7.tgz
環(huán)境搭建可參考我前面幾篇文章。不再贅述
三臺機器:master,slave1,slave2

二、啟動集群環(huán)境

1.啟動hadoop集群
start-all.sh
2.啟動spark集群
start-master.sh
start-slaves.sh

三光羞、配置flume

編輯conf/flume-conf.properties
配置source厉颤、channel穴豫、sink
sink是關(guān)鍵
一種是推式接收器
一種是拉式接收器
相對于的spark streaming 讀取代碼也要相應(yīng)改變
這里我采用的是推式接收器
sink則應(yīng)這樣配置

a1.sinks = k1
a1.sinks.k1.type = avro  
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = master  #綁定的主機名
a1.sinks.k1.port = 9999  #綁定的端口號```
source可謂avro,syslog等等,第一次測試為確保成功逼友,可先考慮avro,與sink對應(yīng)
**avro:**

a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.31.131
a1.sources.r1.port = 4141
a1.sources.r1.channels = c1

**syslog:**

a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = 192.168.31.131
a1.sources.r1.channels = c1

channel均為內(nèi)存

最后完整配置如下:
source為avro:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

Describe/configure the source

a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.31.131
a1.sources.r1.port = 4141
a1.sources.r1.channels = c1

Describe the sink

a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 9999

Use a channel which buffers events in memory

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

yi bai tiao jiu submit

source 為syslog:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

Describe/configure the source

a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = 192.168.31.131
a1.sources.r1.channels = c1

Describe the sink

a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 9999

Use a channel which buffers events in memory

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

yi bai tiao jiu submit

####四精肃、編程FlumeWordCount.py
**編寫spark steaming 代碼,讀取flume采集的數(shù)據(jù)帜乞,并統(tǒng)計詞頻**

代碼(python 實現(xiàn)):
```
# -*- coding: UTF-8 -*-
###spark streaming&&Flume 
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

sc=SparkContext("local[2]","FlumeWordCount")
#處理時間間隔為2s
ssc=StreamingContext(sc,2)

#打開一個TCP socket 地址 和 端口號
#lines=ssc.socketTextStream("master",9999)
lines = FlumeUtils.createStream(ssc, "master",9999)
lines1=lines.map(lambda x:x[1])
#對1s內(nèi)收到的字符串進行分割
words=lines1.flatMap(lambda line:line.split(" "))

#映射為(word司抱,1)元祖
pairs=words.map(lambda word:(word,1))

wordcounts=pairs.reduceByKey(lambda x,y:x+y)

#輸出文件,前綴+自動加日期
wordcounts.saveAsTextFiles("/tmp/flume")

wordcounts.pprint()

#啟動spark streaming應(yīng)用
ssc.start()
#等待計算終止
ssc.awaitTermination()
```
####五黎烈、運行
#####1.下載依賴的jars包
注意习柠,應(yīng)該去官網(wǎng)找對應(yīng)的jar包匀谣,例如
kafka2.01對應(yīng)

![](http://upload-images.jianshu.io/upload_images/1908836-042927ced5e8bf74.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
下載spark-streaming-flume_2.11.jar 我放在了/home/cms下
(ps:之前放在了flume/lib/下,就會報找不到classpath的錯
#####2.啟動flume agent 
```
flume-ng agent --conf ./conf/ -f /home/cms/flume/conf/flume-conf.properties -n a1 -Dflume.root.logger=INFO,console
```
>--conf對于的是flume下的conf目錄-f 對應(yīng)的是agent配置文件-n 對于的是agent的名字
基本上終端都會報錯 资溃,因為沒有啟動client和sink落地的程序振定,暫時不用理

#####3.**若source為avro:**,啟動測試
1)準備測試數(shù)據(jù)
新建一個log_test.txt
輸入數(shù)據(jù):

![](http://upload-images.jianshu.io/upload_images/1908836-57970eaa4c5f7e80.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

2)運行spark streaming
開啟另一個終端
```
 spark-submit --jars spark-streaming-flume-assembly_2.11-2.0.1.jar FlumeWordCount.py 2> error_log.txt```

3)發(fā)送數(shù)據(jù)
開啟另外一個終端肉拓,將log_test.txt發(fā)送出去
```
flume-ng  avro-client --conf ./conf/ -H 192.168.31.131 -p 4141 -F /home/cms/log_test.txt```

![](http://upload-images.jianshu.io/upload_images/1908836-4e64718c4b7a6436.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

4)觀察運行spark streaming的程序的終端的輸出

![](http://upload-images.jianshu.io/upload_images/1908836-ce0b33bc0a01e0d7.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
若只是輸出time后频。。暖途。等等卑惜,則檢查flume的配置
hdfs上查看:

![](http://upload-images.jianshu.io/upload_images/1908836-4f413506d9c19af5.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

![](http://upload-images.jianshu.io/upload_images/1908836-40e7a2bd0e9b3516.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

![](http://upload-images.jianshu.io/upload_images/1908836-d8b36c1143406ad3.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

![](http://upload-images.jianshu.io/upload_images/1908836-7e347d968cb5302a.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
#####4.**若source為syslog:**,啟動測試
1)運行spark streaming 程序
```
 spark-submit --jars spark-streaming-flume-assembly_2.11-2.0.1.jar FlumeWordCount.py 2> error_log.txt```
2)開啟另一個終端驻售,發(fā)送數(shù)據(jù)
```
echo "hello'\t'word" | nc 192.168.31.131 5140
```

![](http://upload-images.jianshu.io/upload_images/1908836-44c4113f614dae33.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
3)觀察運行spark streaming的程序的終端的輸出

![](http://upload-images.jianshu.io/upload_images/1908836-1ff470da2eb4603f.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

####四露久、下一步
flume+kafka+spark streaming

參考:[官網(wǎng)](http://spark.apache.org/docs/latest/streaming-flume-integration.html)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市欺栗,隨后出現(xiàn)的幾起案子毫痕,更是在濱河造成了極大的恐慌,老刑警劉巖迟几,帶你破解...
    沈念sama閱讀 211,743評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件消请,死亡現(xiàn)場離奇詭異,居然都是意外死亡类腮,警方通過查閱死者的電腦和手機臊泰,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,296評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蚜枢,“玉大人缸逃,你說我怎么就攤上這事〕С椋” “怎么了需频?”我有些...
    開封第一講書人閱讀 157,285評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長筷凤。 經(jīng)常有香客問我昭殉,道長,這世上最難降的妖魔是什么嵌施? 我笑而不...
    開封第一講書人閱讀 56,485評論 1 283
  • 正文 為了忘掉前任饲化,我火速辦了婚禮莽鸭,結(jié)果婚禮上吗伤,老公的妹妹穿的比我還像新娘。我一直安慰自己硫眨,他們只是感情好足淆,可當我...
    茶點故事閱讀 65,581評論 6 386
  • 文/花漫 我一把揭開白布巢块。 她就那樣靜靜地躺著,像睡著了一般巧号。 火紅的嫁衣襯著肌膚如雪族奢。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,821評論 1 290
  • 那天丹鸿,我揣著相機與錄音越走,去河邊找鬼。 笑死靠欢,一個胖子當著我的面吹牛廊敌,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播门怪,決...
    沈念sama閱讀 38,960評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼骡澈,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了掷空?” 一聲冷哼從身側(cè)響起肋殴,我...
    開封第一講書人閱讀 37,719評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎坦弟,沒想到半個月后护锤,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,186評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡酿傍,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,516評論 2 327
  • 正文 我和宋清朗相戀三年蔽豺,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片拧粪。...
    茶點故事閱讀 38,650評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡修陡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出可霎,到底是詐尸還是另有隱情魄鸦,我是刑警寧澤,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布癣朗,位于F島的核電站拾因,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏旷余。R本人自食惡果不足惜绢记,卻給世界環(huán)境...
    茶點故事閱讀 39,936評論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望正卧。 院中可真熱鬧蠢熄,春花似錦、人聲如沸炉旷。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,757評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至饥追,卻和暖如春图仓,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背但绕。 一陣腳步聲響...
    開封第一講書人閱讀 31,991評論 1 266
  • 我被黑心中介騙來泰國打工救崔, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人捏顺。 一個月前我還...
    沈念sama閱讀 46,370評論 2 360
  • 正文 我出身青樓帚豪,卻偏偏與公主長得像,于是被迫代替她去往敵國和親草丧。 傳聞我的和親對象是個殘疾皇子狸臣,可洞房花燭夜當晚...
    茶點故事閱讀 43,527評論 2 349

推薦閱讀更多精彩內(nèi)容