#### 1. Environment Setup
Hadoop cluster: 2.7.1
Flume: 1.7.0
Spark cluster: spark-2.0.1-bin-hadoop2.7.tgz
Setting up these environments is covered in my earlier posts, so I won't repeat it here.
Three machines: master, slave1, slave2
#### 2. Starting the Cluster Environment
1. Start the Hadoop cluster:
```
start-all.sh
```
2. Start the Spark cluster:
```
start-master.sh
start-slaves.sh
```
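To confirm the daemons are actually up before moving on, a quick `jps` check on each node helps (the process names below assume a standard Hadoop + Spark standalone layout):
```
jps
# expected on master (roughly): NameNode, SecondaryNameNode, ResourceManager, Master
# expected on slave1/slave2 (roughly): DataNode, NodeManager, Worker
```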
#### 3. Configuring Flume
Edit conf/flume-conf.properties and configure the source, channel, and sink.
The sink is the key part. There are two kinds of receivers:
one is the push-based receiver, where Flume pushes events to Spark Streaming through an avro sink;
the other is the pull-based receiver, where Spark Streaming pulls events from a custom Spark sink.
The Spark Streaming code that reads the data has to match whichever receiver you choose. Here I use the push-based receiver (a pull-based sketch is shown right after the sink config below), so the sink should be configured like this:
```
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
# hostname to bind to
a1.sinks.k1.hostname = master
# port to bind to
a1.sinks.k1.port = 9999
```
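For comparison, here is a minimal sketch of what the Spark side of the pull-based approach would look like (not used in this article). It assumes the Flume sink is instead of type org.apache.spark.streaming.flume.sink.SparkSink listening on master:9999, which Spark then polls with createPollingStream rather than receiving pushes:
```
# pull-based sketch: assumes a Flume SparkSink is running on master:9999
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

sc = SparkContext("local[2]", "FlumePullSketch")
ssc = StreamingContext(sc, 2)

# poll the Flume-hosted sink; addresses is a list of (host, port) pairs
events = FlumeUtils.createPollingStream(ssc, [("master", 9999)])
# each event is a (headers, body) pair, same as with createStream
bodies = events.map(lambda e: e[1])
bodies.pprint()

ssc.start()
ssc.awaitTermination()
```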
The source can be avro, syslog, and so on. For the first test, to make sure everything works, start with the avro source, which pairs with the avro sink:
**avro:**
```
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.31.131
a1.sources.r1.port = 4141
a1.sources.r1.channels = c1
```
**syslog:**
```
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = 192.168.31.131
a1.sources.r1.channels = c1
```
Both use a memory channel.
The complete configurations are as follows.
With an avro source:
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.31.131
a1.sources.r1.port = 4141
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 9999

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
```
(transactionCapacity = 100 means each channel transaction handles at most 100 events.)
With a syslog source:
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = 192.168.31.131
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 9999

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
```
(The same transactionCapacity note as above applies.)
#### 4. Writing FlumeWordCount.py
**Write the Spark Streaming code that reads the data collected by Flume and counts word frequencies.**
Code (Python implementation):
```
# -*- coding: UTF-8 -*-
### Spark Streaming && Flume
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

sc = SparkContext("local[2]", "FlumeWordCount")
# batch interval of 2 seconds
ssc = StreamingContext(sc, 2)

# open a TCP socket on the given address and port
# lines = ssc.socketTextStream("master", 9999)
lines = FlumeUtils.createStream(ssc, "master", 9999)
# each Flume event is a (headers, body) pair; keep only the body
lines1 = lines.map(lambda x: x[1])
# split the strings received in each batch into words
words = lines1.flatMap(lambda line: line.split(" "))
# map each word to a (word, 1) tuple
pairs = words.map(lambda word: (word, 1))
wordcounts = pairs.reduceByKey(lambda x, y: x + y)
# save output files under this prefix; a timestamp is appended automatically
wordcounts.saveAsTextFiles("/tmp/flume")
wordcounts.pprint()
# start the Spark Streaming application
ssc.start()
# wait for the computation to terminate
ssc.awaitTermination()
```
#### 5. Running
##### 1. Download the dependency jar
Note that you should get the jar matching your Spark version from the official site; for example, Spark 2.0.1 corresponds to:
![](http://upload-images.jianshu.io/upload_images/1908836-042927ced5e8bf74.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
Download spark-streaming-flume-assembly_2.11-2.0.1.jar; I put it under /home/cms.
(P.S. I had previously put it under flume/lib/, which caused a class-not-found error on the classpath.)
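As an alternative to downloading the jar by hand, if the submitting machine has internet access, spark-submit can resolve the Flume integration and its dependencies from Maven with --packages (the coordinates below assume Spark 2.0.1 and Scala 2.11; adjust to your versions):
```
spark-submit --packages org.apache.spark:spark-streaming-flume_2.11:2.0.1 FlumeWordCount.py 2> error_log.txt
```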
##### 2. Start the Flume agent
```
flume-ng agent --conf ./conf/ -f /home/cms/flume/conf/flume-conf.properties -n a1 -Dflume.root.logger=INFO,console
```
> --conf points to Flume's conf directory; -f specifies the agent configuration file; -n gives the agent name.

At this point the terminal will usually print errors, because neither the client nor the program consuming the sink's output has been started yet; this can be ignored for now.
##### 3. **If the source is avro**: starting the test
1) Prepare test data
Create a new file log_test.txt and enter some data:
![](http://upload-images.jianshu.io/upload_images/1908836-57970eaa4c5f7e80.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
2) Run Spark Streaming
Open another terminal:
```
spark-submit --jars spark-streaming-flume-assembly_2.11-2.0.1.jar FlumeWordCount.py 2> error_log.txt
```
3) Send the data
Open yet another terminal and send log_test.txt:
```
flume-ng avro-client --conf ./conf/ -H 192.168.31.131 -p 4141 -F /home/cms/log_test.txt
```
![](http://upload-images.jianshu.io/upload_images/1908836-4e64718c4b7a6436.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
4) Watch the output of the terminal running the Spark Streaming program
![](http://upload-images.jianshu.io/upload_images/1908836-ce0b33bc0a01e0d7.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
If the terminal only prints the Time: ... batch headers with no word counts, check the Flume configuration.
Check the results on HDFS:
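Since the script calls saveAsTextFiles("/tmp/flume"), each batch lands in an HDFS directory named /tmp/flume-<timestamp>; assuming that prefix, the results can also be inspected from the command line:
```
hdfs dfs -ls /tmp/ | grep flume
hdfs dfs -cat /tmp/flume-*/part-*
```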
![](http://upload-images.jianshu.io/upload_images/1908836-4f413506d9c19af5.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/1908836-40e7a2bd0e9b3516.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/1908836-d8b36c1143406ad3.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/1908836-7e347d968cb5302a.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
##### 4. **If the source is syslog**: starting the test
1) Run the Spark Streaming program:
```
spark-submit --jars spark-streaming-flume-assembly_2.11-2.0.1.jar FlumeWordCount.py 2> error_log.txt
```
2) Open another terminal and send some data:
```
echo "hello'\t'word" | nc 192.168.31.131 5140
```
![](http://upload-images.jianshu.io/upload_images/1908836-44c4113f614dae33.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
3) Watch the output of the terminal running the Spark Streaming program
![](http://upload-images.jianshu.io/upload_images/1908836-1ff470da2eb4603f.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
#### 6. Next Steps
Flume + Kafka + Spark Streaming
Reference: [official documentation](http://spark.apache.org/docs/latest/streaming-flume-integration.html)