目錄
一.Spark Streaming接收Flume數(shù)據(jù)
????1.基于Flume的Push模式
????2.基于Custom Sink的Pull模式
二.Spark Streaming接收Kafka數(shù)據(jù)
????1.搭建ZooKeeper(Standalone):
????2.搭建Kafka環(huán)境(單機單broker):
????3.搭建Spark Streaming和Kafka的集成開發(fā)環(huán)境
????4.基于Receiver的方式
????5.直接讀取方式
零.數(shù)據(jù)源:
1.基本的數(shù)據(jù)源:
- 文件流(監(jiān)控文件系統(tǒng)的變化新娜,如果文件有增加液荸,讀取新的文件內(nèi)容)
- RDD隊列流(DStream本質(zhì)就是RDD)
- 套接字流 socketTextStream
一.Spark Streaming接收Flume數(shù)據(jù)
1.基于Flume的Push模式
????Flume被用于在Flume agents之間推送數(shù)據(jù).在這種方式下,Spark Streaming可以很方便的建立一個receiver,起到一個Avro agent的作用.Flume可以將數(shù)據(jù)推送到改receiver.
(1)第一步:Flume的配置文件
(2)第二步:Spark Streaming程序
(3)第三步:注意除了需要使用Flume的lib的jar包以外永品,還需要以下jar包:
spark-streaming-flume_2.1.0.jar
(4)第四步:測試
- 啟動Spark Streaming程序
- 啟動Flume
- 拷貝日志文件到/root/training/logs目錄
- 觀察輸出滚躯,采集到數(shù)據(jù)
2.基于Custom Sink的Pull模式
????不同于Flume直接將數(shù)據(jù)推送到Spark Streaming中,第二種模式通過以下條件運行一個正常的Flume sink柳畔。Flume將數(shù)據(jù)推送到sink中馍管,并且數(shù)據(jù)保持buffered狀態(tài)。Spark Streaming使用一個可靠的Flume接收器和轉(zhuǎn)換器從sink拉取數(shù)據(jù)薪韩。只要當(dāng)數(shù)據(jù)被接收并且被Spark Streaming備份后确沸,轉(zhuǎn)換器才運行成功。
????這樣,與第一種模式相比,保證了很好的健壯性和容錯能力俘陷。然而,這種模式需要為Flume配置一個正常的sink罗捎。
以下為配置步驟:
(1)第一步:Flume的配置文件
(2)第二步:Spark Streaming程序
(3)第三步:需要的jar包
- 將Spark的jar包拷貝到Flume的lib目錄下
- 下面的這個jar包也需要拷貝到Flume的lib目錄下,同時加入IDEA工程的classpath
spark-streaming-flume-sink_2.1.0.jar
(4)第四步:測試
- 啟動Flume
- 在IDEA中啟動FlumeLogPull
- 將測試數(shù)據(jù)拷貝到/root/training/logs
- 觀察IDEA中的輸出
二.Spark Streaming接收Kafka數(shù)據(jù)
Apache Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng)拉盾。
1.搭建ZooKeeper(Standalone):
(1)配置/root/training/zookeeper-3.4.10/conf/zoo.cfg文件
dataDir=/root/training/zookeeper-3.4.10/tmp
server.1=spark81:2888:3888
(2)在/root/training/zookeeper-3.4.10/tmp目錄下創(chuàng)建一個myid的空文件
echo 1 > /root/training/zookeeper-3.4.6/tmp/myid
2.搭建Kafka環(huán)境(單機單broker):
(1)修改server.properties文件
(2)啟動Kafka
bin/kafka-server-start.sh config/server.properties &
出現(xiàn)以下錯誤:
需要修改bin/kafka-run-class.sh文件桨菜,將這個選項注釋掉。
(3)測試Kafka
- 創(chuàng)建Topic
bin/kafka-topics.sh --create --zookeeper spark81:2181 -replication-factor 1 --partitions 3 --topic mydemo1
- 發(fā)送消息
bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1
- 接收消息
bin/kafka-console-consumer.sh --zookeeper spark81:2181 --topic mydemo1
3.搭建Spark Streaming和Kafka的集成開發(fā)環(huán)境
????由于Spark Streaming和Kafka集成的時候捉偏,依賴的jar包比較多倒得,而且還會產(chǎn)生沖突。強烈建議使用Maven的方式來搭建項目工程夭禽。
下面是依賴的pom.xml文件:
4.基于Receiver的方式
????這個方法使用了Receivers來接收數(shù)據(jù)霞掺。Receivers的實現(xiàn)使用到Kafka高層次的消費者API。對于所有的Receivers讹躯,接收到的數(shù)據(jù)將會保存在Spark executors中菩彬,然后由Spark Streaming啟動的Job來處理這些數(shù)據(jù)。
(1)開發(fā)Spark Streaming的Kafka Receivers
(2)測試
- 啟動Kafka消息的生產(chǎn)者
bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1
- 在IDEA中啟動任務(wù)蜀撑,接收Kafka消息
5.直接讀取方式
????和基于Receiver接收數(shù)據(jù)不一樣挤巡,這種方式定期地從Kafka的topic+partition中查詢最新的偏移量,再根據(jù)定義的偏移量范圍在每個batch里面處理數(shù)據(jù)酷麦。當(dāng)作業(yè)需要處理的數(shù)據(jù)來臨時矿卑,spark通過調(diào)用Kafka的簡單消費者API讀取一定范圍的數(shù)據(jù)。
(1)開發(fā)Spark Streaming的程序
(2)測試
- 啟動Kafka消息的生產(chǎn)者
bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1
- 在IDEA中啟動任務(wù)沃饶,接收Kafka消息