Spark Streaming
隨著大數(shù)據(jù)技術(shù)的不斷發(fā)展立帖,人們對(duì)于大數(shù)據(jù)的實(shí)時(shí)性處理要求也在不斷提高,傳統(tǒng) 的 MapReduce 等批處理框架在某些特定領(lǐng)域猪贪,例如實(shí)時(shí)用戶推薦姿锭、用戶行為分析這 些應(yīng)用場(chǎng)景上逐漸不能滿足人們對(duì)實(shí)時(shí)性的需求,因此誕生了一批如 S3汉矿、Samza、 Storm备禀、Flink等流式分析洲拇、實(shí)時(shí)計(jì)算框架奈揍。
Spark 由于其內(nèi)部?jī)?yōu)秀的調(diào)度機(jī)制、快速的分布式計(jì)算能力赋续,能夠以極快的速度進(jìn)行 迭代計(jì)算男翰。正是由于具有這樣的優(yōu)勢(shì),Spark 能夠在某些程度上進(jìn)行實(shí)時(shí)處理纽乱, Spark Streaming 正是構(gòu)建在此之上的流式框架蛾绎。
Spark Streaming 概述
什么是Spark Streaming
Spark Streaming 類似于Apache Storm(來(lái)一條數(shù)據(jù)處理一條,延遲低鸦列、響應(yīng)快租冠、吞吐量低),用于流式數(shù)據(jù)處理薯嗤。Spark Streaming 具有高吞吐量和容錯(cuò)能力強(qiáng)等特點(diǎn)顽爹,支持?jǐn)?shù)據(jù)源有很多,例如Kafka(最重要的數(shù)據(jù)源)、Flume、Twitter和TCP套接字螃诅,數(shù)據(jù)輸入后可用高度抽象API凿可,如map、reduce涉馅、join归园、window等進(jìn)行運(yùn)算,處理結(jié)果能保存在很多地方稚矿,如HDFS庸诱、數(shù)據(jù)庫(kù)等,Spark Streaming能與MLlib已經(jīng)Graphx融合
Spark Streaming 與Spark 基于RDD的概念比較類似晤揣,Spark Streaming使用離散流(Discretized Stream)作為抽象表示桥爽,稱為DStream,DStream是隨著時(shí)間推移而收到的數(shù)據(jù)的序列昧识。在內(nèi)部钠四,每個(gè)時(shí)間區(qū)間收到的數(shù)據(jù)都作為RDD存在,DStream是由這些RDD所組成的序列
DStream可以從各種輸入源創(chuàng)建跪楞,比如Flume缀去、Kafka或者HDFS。創(chuàng)建出來(lái)的DStream支持兩種操作
- 轉(zhuǎn)化操作甸祭,會(huì)生成一個(gè)新的DStream
- 輸出操作(output operation)缕碎,把數(shù)據(jù)寫入外部系統(tǒng)
DStream提供了許多與RDD所支持的操作相類似的操作支持,還增加了時(shí)間相關(guān)的新操作池户,比如滑動(dòng)窗口咏雌。
Spark Streaming架構(gòu)
Spark Streaming 使用 mini-batch架構(gòu)凡怎,把流式計(jì)算當(dāng)作一系列連續(xù)的小規(guī)模批處理來(lái)對(duì)待,Spark Streaming 從各種輸入源中讀取數(shù)據(jù)处嫌,并把數(shù)據(jù)分組為小的批次栅贴,新的批次按均勻的時(shí)間間隔創(chuàng)建處理,在每個(gè)時(shí)間區(qū)間開(kāi)始的時(shí)候熏迹,一個(gè)新的批次就創(chuàng)建出來(lái)檐薯,在該區(qū)間內(nèi)接收到的數(shù)據(jù)都會(huì)被添加到這個(gè)批次中。在時(shí)間區(qū)間結(jié)束時(shí)注暗,批次停止增長(zhǎng)坛缕。時(shí)間區(qū)間的大小是有批次間隔這個(gè)參數(shù)覺(jué)得決定的,批次間隔一般設(shè)在500毫秒到幾秒之間捆昏,有開(kāi)發(fā)者配置赚楚。每個(gè)輸入批次都形成一個(gè)RDD,以 Spark 作業(yè)的方式處理并生成其他的 RDD骗卜。 處理的結(jié)果可以以批處理的方式傳給外部系統(tǒng)宠页。
Spark Streaming 的編程抽象時(shí)離散化流,也就是DStream寇仓,是一個(gè)RDD序列举户,每個(gè)RDD代表數(shù)據(jù)流中的一個(gè)時(shí)間片的內(nèi)的數(shù)據(jù)。
應(yīng)用于DStream上的轉(zhuǎn)換操作遍烦,都會(huì)轉(zhuǎn)換為底層RDD上的操作俭嘁,如對(duì)行DStream中的每個(gè)RDD應(yīng)用flatMap操作以生成單詞DStream的RDD
這些底層的RDD轉(zhuǎn)換是由Spark引擎完成的。DStream操作隱藏了大部分這些細(xì)節(jié)服猪, 為開(kāi)發(fā)人員提供了更高級(jí)別的API以方便使用供填。
Spark Streaming為每個(gè)輸入源啟動(dòng)對(duì)應(yīng)的接收器。接收器運(yùn)行在Executor中罢猪,從輸入源收集數(shù)據(jù)并保存為 RDD近她。默認(rèn)情況下接收到的數(shù)據(jù)后會(huì)復(fù)制到另一個(gè)Executor中,進(jìn)行容錯(cuò); Driver 中的 StreamingContext 會(huì)周期性地運(yùn)行 Spark 作業(yè)來(lái)處理這些數(shù)據(jù)膳帕。
Spark Streaming運(yùn)行流程
客戶端提交Spark Streaming 作業(yè)后啟動(dòng)Driver粘捎,Driver啟動(dòng)Receiver,Receiver接收數(shù)據(jù)源的數(shù)據(jù)
每個(gè)作業(yè)保護(hù)多個(gè)Executor备闲,每個(gè)Executor以線程的方式運(yùn)行task晌端,Spark Streaming至少包含一個(gè)receiver task(一般情況下)
Receiver接收數(shù)據(jù)后生成Block,并把BlockId匯報(bào)給Driver恬砂,然后備份到另外一個(gè)Executor上
ReceiverTracker維護(hù)Reciver匯報(bào)的BlockId
Driver定時(shí)啟動(dòng)JobGenerator咧纠,根據(jù)DStream的關(guān)系生成邏輯RDD,然后創(chuàng)建JobSet泻骤,叫個(gè)JobScheduler
JobScheduler負(fù)責(zé)調(diào)度JobSet漆羔,交給DAGScheduler梧奢,DAGScheduler根據(jù)邏輯RDD,生成相應(yīng)的Stages演痒,每個(gè)stage包含一到多個(gè)Task亲轨,將TaskSet提交給TaskSchedule。
TaskScheduler負(fù)責(zé)把Task調(diào)度到Executor上鸟顺,并維護(hù)Task運(yùn)行狀態(tài)惦蚊。
總結(jié):
- 提交完spark作業(yè)后,driver就會(huì)去啟動(dòng)Receiver取接收數(shù)據(jù)讯嫂,Receiver接收數(shù)據(jù)的同時(shí)蹦锋,會(huì)將數(shù)據(jù)備份到另一個(gè)節(jié)點(diǎn),Receiver接收到數(shù)據(jù)會(huì)回報(bào)給Driver的欧芽,然后
Spark Streaming優(yōu)缺點(diǎn)
EOS:exactly onece semantic 處理且僅處理一次
與傳統(tǒng)流式框架相比莉掂,Spark Streaming最大的不同點(diǎn)在于它對(duì)待數(shù)據(jù)是粗粒度的處理方式,記一次處理一小批數(shù)據(jù)千扔,而其他框架往往采用細(xì)粒度的處理模式憎妙,即依次處理一條數(shù)據(jù),Spark Streaming這樣的設(shè)計(jì)即為其帶來(lái)了優(yōu)點(diǎn)曲楚,也帶來(lái)的確定
優(yōu)點(diǎn)
- Spark Streaming 內(nèi)部實(shí)現(xiàn)和調(diào)度方式高度依賴Spark的DAG調(diào)度器和RDD厘唾,這就決定了Spark Streaming的設(shè)計(jì)初衷必須是粗粒度方式的,同時(shí)洞渤,由于Spark 內(nèi)部調(diào)度器足夠快速和高效阅嘶,可以快速地處理小批量數(shù)據(jù)属瓣,這就獲得準(zhǔn)實(shí)時(shí)的特性
- Spark Streaming 的粗粒度執(zhí)行方式使其確痹仄“處理且僅處理一次”的特性(EOS),同時(shí)也可以方便地實(shí)現(xiàn)容錯(cuò)回復(fù)機(jī)制
- 由于Spark Streaming的DStream本質(zhì)是RDD在流式數(shù)據(jù)上的抽象抡蛙,因此基于RDD的各種操作也有相應(yīng)的基于DStream的版本护昧,這樣就大大降低了用戶對(duì)于新框架的學(xué)習(xí)成本,在了解Spark的情況下粗截,用戶將很容易使用Spark Streaming
- 由于DStream是在RDD上抽象惋耙,那么也跟容易與RDD進(jìn)行交互操作,在需要將流式數(shù)據(jù)和批處理數(shù)據(jù)結(jié)合進(jìn)行分析的情況下熊昌,將會(huì)變得非常方便
缺點(diǎn)
- Spark Streaming 的粗粒度處理方式也造成了不可避免的延遲绽榛。在細(xì)粒度處理方式下,理想情況下每一條記錄都會(huì)被實(shí)時(shí)處理婿屹,而在Spark Streaming中灭美,數(shù)據(jù)需要匯總到一定的量后在一次性處理,這就增加了數(shù)據(jù)處理的延遲昂利,這種延遲是由框架的設(shè)計(jì)引入的届腐,并不是由網(wǎng)絡(luò)或其他情況造成的
Structured Streaming
Spark Streaming計(jì)算邏輯是把數(shù)據(jù)按時(shí)間劃分為DStream铁坎,存在以下問(wèn)題:
- 框架自身只能根據(jù)Batch Time單元進(jìn)行數(shù)據(jù)處理,很難處理基于Event time(即時(shí)間戳)的數(shù)據(jù)犁苏,很難處理延遲硬萍、亂序的數(shù)據(jù)
- 流式和批量處理的API完全不一致,兩種使用場(chǎng)景中围详,程序代碼還是需要一定的轉(zhuǎn)換
- 端到端的數(shù)據(jù)容錯(cuò)保障邏輯需要用戶自己構(gòu)建朴乖,難以處理增量更新和持久化存儲(chǔ)等一致性問(wèn)題
基于以上問(wèn)題,提出了下一代Structure Streaming助赞。將數(shù)據(jù)映射為一張無(wú)界長(zhǎng)度的表寒砖,通過(guò)表的計(jì)算,輸出結(jié)果映射為另一張表嫉拐。
以結(jié)構(gòu)化的方式去操作流式數(shù)據(jù)哩都,簡(jiǎn)化了實(shí)時(shí)計(jì)算過(guò)程,同時(shí)還復(fù)用了Catalyst引擎來(lái)優(yōu)化SQL操作此外還能支持增量計(jì)算和基于event time 的計(jì)算婉徘。
DStream基礎(chǔ)數(shù)據(jù)源
基礎(chǔ)數(shù)據(jù)源包括:文件數(shù)據(jù)流漠嵌、socket數(shù)據(jù)流、RDD隊(duì)列流;這些數(shù)據(jù)源主要用于測(cè)試盖呼。
引入依賴:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
文件數(shù)據(jù)流
文件數(shù)據(jù)流:通過(guò)textFileStream(directory)方法進(jìn)行讀取HDFS兼容的文件系統(tǒng)文件
/**
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
* as Text and input format as TextInputFormat). Files must be written to the
* monitored directory by "moving" them from another location within the same
* file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
*/
def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") {
fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}
Spark Streaming 將會(huì)監(jiān)控directory目錄儒鹿,并不斷處理移動(dòng)進(jìn)來(lái)的文件
- 不支持嵌套目錄
- 文件需要相同的數(shù)據(jù)格式
- 文件進(jìn)入directory的方式需要通過(guò)移動(dòng)或者重命名來(lái)實(shí)現(xiàn)
- 一旦文件移動(dòng)進(jìn)目錄,則不能在修改几晤,即使修改了也不會(huì)讀取新數(shù)據(jù)
- 文件流不需要接收器(receiver)约炎,不需要單獨(dú)分配CPU核
package com.hhb.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @description: 只能監(jiān)控啟動(dòng)后的新增的文件
* @date: 2020-11-19 10:28
**/
object FileDStream {
def main(args: Array[String]): Unit = {
//1 創(chuàng)建SparkConf
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
//2 初始化入口,設(shè)置任務(wù)5秒鐘執(zhí)行一次
val ssc = new StreamingContext(conf, Seconds(5))
ssc.sparkContext.setLogLevel("warn")
//3蟹瘾、4圾浅、5
//3 讀取本地文件,創(chuàng)建DStream
val linesDStream = ssc.textFileStream("/Users/baiwang/myproject/spark/data/log/")
//4 DStream轉(zhuǎn)換
val wordDStream = linesDStream.flatMap(_.split("\\s+"))
val wordCountDStream = wordDStream.map((_, 1)).reduceByKey(_ + _)
//5 輸出DStream到屏幕
wordCountDStream.print()
//6 啟動(dòng)作業(yè)
ssc.start()
//啟動(dòng)作業(yè)后憾朴,等待狸捕,作業(yè)以5秒一次的頻率運(yùn)行
ssc.awaitTermination()
}
}
Socket數(shù)據(jù)流
Spark Streaming 可以通過(guò)Socket端口監(jiān)聽(tīng)并接受數(shù)據(jù),然后進(jìn)行相應(yīng)的處理
在linux120上執(zhí)行nc程序
nc -lk 9999
# yum install nc
隨后可以在nc窗口中隨意輸入一下單詞众雷,監(jiān)聽(tīng)窗口會(huì)自動(dòng)獲得單詞數(shù)據(jù)流信息灸拍,在監(jiān)聽(tīng)窗口每隔x秒就會(huì)打印出詞頻的統(tǒng)計(jì)信息,可以在屏幕上出現(xiàn)結(jié)果
備注:使用local[*]可能出現(xiàn)問(wèn)題
如果給虛擬機(jī)配置的cpu數(shù)為1砾省,使用local[*]也只會(huì)啟動(dòng)一個(gè)線程鸡岗,該線程用于receiver task,此時(shí)沒(méi)有資源處理接受打到的數(shù)據(jù)编兄。
【現(xiàn)象:程序正常執(zhí)行轩性,不會(huì)打印時(shí)間戳,屏幕上也不會(huì)有其他有效信息】
源碼:
/**
* Creates an input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
* lines.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
* @see [[socketStream]]
*/
def socketTextStream(
hostname: String,
port: Int,
//監(jiān)控到的數(shù)據(jù)翻诉,默認(rèn)存內(nèi)存炮姨,內(nèi)存存不下放到磁盤捌刮,并對(duì)數(shù)據(jù)進(jìn)行備份
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
注意:DStream的 StorageLevel 是 MEMORY_AND_DISK_SER_2;
package com.hhb.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @description:
* @author: huanghongbo
* @date: 2020-11-19 15:35
**/
object SocketDStream {
def main(args: Array[String]): Unit = {
//初始化StreamingContext
val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
val ssc = new StreamingContext(conf, Seconds(5))
//接受數(shù)據(jù),轉(zhuǎn)換成socketStream
//鏈接服務(wù)器舒岸,需要在服務(wù)安裝nc 執(zhí)行命令nc -lk 9999绅作,然后輸入數(shù)據(jù)
// val socketDStream = ssc.socketTextStream("linux120", 9999)
//鏈接本地
val socketDStream = ssc.socketTextStream("localhost", 9999)
//數(shù)據(jù)轉(zhuǎn)換
val wordDStream = socketDStream.flatMap(_.split("\\s+"))
val wordCountDStream = wordDStream.map((_, 1)).reduceByKey(_ + _)
//數(shù)據(jù)輸出
wordCountDStream.print()
//啟動(dòng)
ssc.start()
ssc.awaitTermination()
}
}
SocketServer程序(單線程),監(jiān)聽(tīng)本機(jī)指定端口蛾派,與socket連接后可發(fā)送信息:
package com.hhb.spark.streaming
import java.io.PrintWriter
import java.net.ServerSocket
/**
* @description:
* @author: huanghongbo
* @date: 2020-11-19 16:14
**/
object OneThreadSocket {
def main(args: Array[String]): Unit = {
val arr: Array[String] = "Hello World Hello Hadoop Hello spark kafka hive zookeeper hbase flume sqoop".split("\\s+")
val n = arr.length
val port = 9999
val random = scala.util.Random
val server = new ServerSocket(port)
val socket = server.accept()
println("鏈接成功俄认,鏈接地址:" + socket.getInetAddress)
while (true) {
val writer = new PrintWriter(socket.getOutputStream)
writer.println(arr(random.nextInt(n)) + " " + arr(random.nextInt(n)))
writer.flush()
Thread.sleep(100)
}
}
}
SocketServer程序(多線程)
RDD隊(duì)列流
調(diào)試Spark Streaming應(yīng)用程序的時(shí)候,可使用streamingContext.queueStream(queueOfRDD) 創(chuàng)建基于RDD隊(duì)列的DStream
源碼:
/**
* Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*
* @param queue Queue of RDDs. Modifications to this data structure must be synchronized.
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
* @tparam T Type of objects in the RDD
*
* @note Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
* those RDDs, so `queueStream` doesn't support checkpointing.
*/
def queueStream[T: ClassTag](
queue: Queue[RDD[T]],
//表示一次只處理一個(gè)RDD
oneAtATime: Boolean = true
): InputDStream[T] = {
queueStream(queue, oneAtATime, sc.makeRDD(Seq.empty[T], 1))
}
備注:
- oneAtTime :缺省為true洪乍,一次處理一個(gè)RDD眯杏,設(shè)為false,一次處理全部的RDD
- RDD隊(duì)列流可以使用local[1]
- 涉及到同時(shí)出隊(duì)和入隊(duì)操作壳澳,所以需要同步
每秒創(chuàng)建一個(gè)RDD(RDD存放1-100的整數(shù))岂贩,Streaming每隔1秒就對(duì)數(shù)據(jù)進(jìn)行處 理,計(jì)算RDD中數(shù)據(jù)除10取余的個(gè)數(shù)巷波。
package com.hhb.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
/**
* @description:
* @date: 2020-11-19 16:20
**/
object RDDDStream {
def main(args: Array[String]): Unit = {
//初始化ssc,沒(méi)個(gè)一秒處理一次數(shù)據(jù)
val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
//由于RDD是一秒產(chǎn)生一個(gè)萎津,所以執(zhí)行實(shí)現(xiàn)需要小于一秒,否則產(chǎn)生完RDD后直接停止了抹镊,
// 例如設(shè)置10秒锉屈,還沒(méi)等執(zhí)行,程序已經(jīng)結(jié)束
val ssc = new StreamingContext(conf, Seconds(1))
//組裝RDD的對(duì)流
val queue = mutable.Queue[RDD[Int]]()
// 讀取RDD生成DStream
val rddDStream = ssc.queueStream(queue)
//具體的業(yè)務(wù)處理邏輯
val countDStream = rddDStream.map(x => (x % 10, 1)).reduceByKey(_ + _)
//輸出
countDStream.print()
//啟動(dòng)
ssc.start()
//每秒產(chǎn)生一個(gè)RDD
val arr = 1 to 100
for (i <- 1 to 5) {
queue.synchronized {
queue += ssc.sparkContext.makeRDD(arr.map(_ * i))
}
Thread.sleep(1000)
}
ssc.stop()
}
}
DStream 轉(zhuǎn)換操作
DStream上的操作與RDD類似垮耳,分為Transformations(轉(zhuǎn)換) 和 Output Operations(輸出)兩種颈渊,池外轉(zhuǎn)換操作中還有一些比較特殊的方法,如:updateStateByKey终佛、transform以及各種Window相關(guān)的操作
Transformation | Meaning |
---|---|
map(func) | 將源DStream中的每個(gè)元素通過(guò)一個(gè)函數(shù)func從 而得到新的DStreams |
flatMap(func) | 和map類似俊嗽,但是每個(gè)輸入的項(xiàng)可以被映射為0 或更多項(xiàng) |
filter(func) | 選擇源DStream中函數(shù)func判為true的記錄作為 新DStreams |
repartition(numPartitions) | 通過(guò)創(chuàng)建更多或者更少的partition來(lái)改變此 DStream的并行級(jí)別 |
union(otherStream) | 聯(lián)合源DStreams和其他DStreams來(lái)得到新 DStream |
count() | 統(tǒng)計(jì)源DStreams中每個(gè)RDD所含元素的個(gè)數(shù)得 到單元素RDD的新DStreams |
reduce(func) | 通過(guò)函數(shù)func(兩個(gè)參數(shù)一個(gè)輸出)來(lái)整合源 DStreams中每個(gè)RDD元素得到單元素RDD的 DStreams。這個(gè)函數(shù)需要關(guān)聯(lián)從而可以被并行 計(jì)算 |
countByValue() | 對(duì)于DStreams中元素類型為K調(diào)用此函數(shù)查蓉,得到 包含(K,Long)對(duì)的新DStream乌询,其中Long值表明 相應(yīng)的K在源DStream中每個(gè)RDD出現(xiàn)的頻率 |
reduceByKey(func, [numTasks]) | 對(duì)(K,V)對(duì)的DStream調(diào)用此函數(shù)榜贴,返回同樣(K,V) 的新DStream豌研,新DStream中的對(duì)應(yīng)V為使用 reduce函數(shù)整合而來(lái)。默認(rèn)情況下唬党,這個(gè)操作使 用Spark默認(rèn)數(shù)量的并行任務(wù)(本地模式為2鹃共,集 群模式中的數(shù)量取決于配置參數(shù) spark.default.parallelism)。也可以傳入可選 的參數(shù)numTasks來(lái)設(shè)置不同數(shù)量的任務(wù) |
join(otherStream, [numTasks]) | 兩DStream分別為(K,V)和(K,W)對(duì)驶拱,返回(K,(V,W)) 對(duì)的新DStream |
cogroup(otherStream, [numTasks]) | 兩DStream分別為(K,V)和(K,W)對(duì)霜浴,返回(K, (Seq[V],Seq[W])對(duì)新DStreams |
transform(func) | 將RDD到RDD映射的函數(shù)func作用于源DStream 中每個(gè)RDD上得到新DStream。這個(gè)可用于在 DStream的RDD上做任意操作 |
updateStateByKey(func) | 得到”狀態(tài)”DStream蓝纲,其中每個(gè)key狀態(tài)的更新是 通過(guò)將給定函數(shù)用于此key的上一個(gè)狀態(tài)和新值 而得到阴孟。這個(gè)可用于保存每個(gè)key值的任意狀態(tài) 數(shù)據(jù) |
備注:
- 在DStream與RDD上的轉(zhuǎn)換操作非常類似(無(wú)狀態(tài)操作)
- DStream有自己特殊的操作(窗口操作晌纫、追蹤狀態(tài)變化操作)
- 在DStream上的轉(zhuǎn)換操作比RDD上的轉(zhuǎn)換操作少
DStream的轉(zhuǎn)化操作可以分為無(wú)狀態(tài)(stateless)和有狀態(tài)(stateful)兩種:
- 無(wú)狀態(tài)轉(zhuǎn)換操作,每個(gè)批次處理不依賴之前批次的數(shù)據(jù)永丝,常見(jiàn)的RDD轉(zhuǎn)化操作锹漱,例如map、filter慕嚷、reduceByKey等
- 有狀態(tài)轉(zhuǎn)化操作哥牍。需要使用之前批次的數(shù)據(jù) 或者 中間結(jié)果來(lái)計(jì)算當(dāng)前批次的數(shù)據(jù)。有狀態(tài)轉(zhuǎn)化操作包括:基于滑動(dòng)窗口的轉(zhuǎn)化操作 或 追蹤狀態(tài)變化的轉(zhuǎn)化操作
無(wú)狀態(tài)轉(zhuǎn)換
無(wú)狀態(tài)轉(zhuǎn)化操作就是把簡(jiǎn)單的RDD轉(zhuǎn)化操作應(yīng)用到每個(gè)批次上喝检,也就是轉(zhuǎn)化DStream中的每一個(gè)RDD嗅辣。創(chuàng)建的無(wú)狀態(tài)轉(zhuǎn)換包括:map、flatMap挠说、filter澡谭、repartition、reduceByKey损俭、groupByKey;直接作用在DStream上译暂。重要的轉(zhuǎn)換操作:transform。通過(guò)對(duì)源DStream的每個(gè)RDD應(yīng)用RDD-to-RDD函 數(shù)撩炊,創(chuàng)建一個(gè)新的DStream外永。支持在新的DStream中做任何RDD操作。
/**
* Create a new DStream in which each RDD is generated by applying a function on RDDs of
* the DStreams.
*/
def transform[T: ClassTag](
dstreams: Seq[DStream[_]],
transformFunc: (Seq[RDD[_]], Time) => RDD[T]
): DStream[T] = withScope {
new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
}
這是一個(gè)功能強(qiáng)大的函數(shù)拧咳,它可以允許開(kāi)發(fā)者直接操作其內(nèi)部的RDD伯顶。也就是說(shuō)開(kāi) 發(fā)者,可以提供任意一個(gè)RDD到RDD的函數(shù)骆膝,這個(gè)函數(shù)在數(shù)據(jù)流每個(gè)批次中都被調(diào) 用祭衩,生成一個(gè)新的流。
示例:黑名單過(guò)濾
假設(shè):arr1為黑名單數(shù)據(jù)(自定義)阅签,true表示數(shù)據(jù)生效掐暮,需要被過(guò)濾掉;false表示數(shù)據(jù) 未生效
val arr1 = Array(("spark", true), ("scala", false))
假設(shè):流式數(shù)據(jù)格式為"time word",需要根據(jù)黑名單中的數(shù)據(jù)對(duì)流式數(shù)據(jù)執(zhí)行過(guò)濾操 作政钟。如"2 spark"要被過(guò)濾掉
1 hadoop
2 spark
3 scala
4 java
5 hive
結(jié)果:"2 spark" 被過(guò)濾
方法一:使用外鏈接
package com.hhb.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @description:ConstantInputDStream 主要用于流式計(jì)算的測(cè)試
* @date: 2020-11-20 10:09
**/
object BlackListFilter1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
val ssc = new StreamingContext(conf, Seconds(5))
//準(zhǔn)備數(shù)據(jù)
val arr: Array[(String, Int)] = "Hello World Hello Hadoop Hello spark kafka hive zookeeper hbase flume sqoop hello scala".split("\\s+").map(_.toLowerCase).zipWithIndex
val data = arr.map { case (k, v) => v + " " + k }
val rdd = ssc.sparkContext.makeRDD(data)
// 黑名單
val blackList = Array(("spark", true), ("scala", false),("hbase", true),("hello", true))
val blackListRDD = ssc.sparkContext.makeRDD(blackList)
//new ConstantInputDStream[String](ssc, rdd) =》 1 hello
// new ConstantInputDStream[String](ssc, rdd).map(value => (value.split("\\s+")(1), value)) => (hello,(1 hello))
val rddDStream = new ConstantInputDStream[String](ssc, rdd).map(value => (value.split("\\s+")(1), value))
val resultDStream = rddDStream.transform { rdd =>
// 通過(guò)leftOuterJoin操作既保留了左側(cè)RDD的所有內(nèi)容路克,又獲得了內(nèi)容是否在黑名單中
rdd.leftOuterJoin(blackListRDD)
// (hello,(hello 1,option(none)),(spark,(spark 3,option(true)))
.filter { case (_, (_, rightValue)) => rightValue.getOrElse(false) != true }
.map { case (_, (leftValue, _)) => leftValue }
}
resultDStream.print()
// 啟動(dòng)流式作業(yè)
ssc.start()
ssc.awaitTermination()
}
}
方案二:使用SQL或者DSL
package com.hhb.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @description:使用SQL或者DSL
* @author: huanghongbo
* @date: 2020-11-20 10:39
**/
object BlackListFilter2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.sparkContext.setLogLevel("warn")
//準(zhǔn)備數(shù)據(jù)
val arr: Array[(String, Int)] = "Hello World Hello Hadoop Hello spark kafka hive zookeeper hbase flume sqoop hello scala".split("\\s+").map(_.toLowerCase).zipWithIndex
val arrRDD = ssc.sparkContext.makeRDD(arr).map { case (k, v) => v + " " + k }
val constantDStream = new ConstantInputDStream[String](ssc, arrRDD)
// 黑名單
val blackList = Array(("spark", true), ("scala", false), ("hbase", true), ("hello", true))
val blackListRDD = ssc.sparkContext.makeRDD(blackList)
val valueDStream = constantDStream.map(line => (line.split("\\s+")(1), line))
valueDStream.transform { rdd =>
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
val wordDF = rdd.toDF("word", "line")
val blackListDF = blackListRDD.toDF("word1", "flag")
wordDF.join(blackListDF, $"word" === $"word1", "left")
.filter("flag == false or flag is null")
.select("line")
.rdd
}.print()
ssc.start()
ssc.awaitTermination()
}
}
方案三:直接過(guò)濾,沒(méi)有shuffle
package com.hhb.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ConstantInputDStream
/**
* @description: 直接過(guò)濾养交,沒(méi)有shuffle
* @author: huanghongbo
* @date: 2020-11-20 10:39
**/
object BlackListFilter3 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.sparkContext.setLogLevel("warn")
//準(zhǔn)備數(shù)據(jù)
val arr: Array[(String, Int)] = "Hello World Hello Hadoop Hello spark kafka hive zookeeper hbase flume sqoop hello scala".split("\\s+").map(_.toLowerCase).zipWithIndex
val arrRDD = ssc.sparkContext.makeRDD(arr).map { case (k, v) => v + " " + k }
val constantDStream = new ConstantInputDStream[String](ssc, arrRDD)
// 黑名單
val blackList = Array(("spark", true), ("scala", false), ("hbase", true), ("hello", true))
.filter { case (_, v) => v }
.map { case (k, _) => k }
constantDStream.map(line => (line.split("\\s+")(1), line))
.filter { case (word, _) => !blackList.contains(word) }
.map { case (_, line) => line }
.print()
ssc.start()
ssc.awaitTermination()
}
}
有狀態(tài)轉(zhuǎn)換
有狀態(tài)轉(zhuǎn)化的主要有兩種操作:窗口操作精算、狀態(tài)跟蹤操作
窗口操作
Window Operations 可以設(shè)計(jì)窗口大小 和 滑動(dòng)窗口間隔。來(lái)動(dòng)態(tài)的獲取當(dāng)前Streaming的狀態(tài)碎连』矣穑基于窗口的操作會(huì)在一個(gè)比StreamingContext和batchDuration(批次間隔)更長(zhǎng)的時(shí)間范圍內(nèi),通過(guò)整合多個(gè)批次的結(jié)果,計(jì)算整個(gè)窗口的結(jié)果
基于窗口的操作需要兩個(gè)參數(shù):
- 窗口長(zhǎng)度(windowDuration)廉嚼∶蹈洌控制每次計(jì)算最近的多少個(gè)批次的數(shù)據(jù)
- 滑動(dòng)間隔(slideDuration)。用來(lái)控制對(duì)新的DStream進(jìn)行計(jì)算的間隔
兩者都必須是StreamingContext中批次間隔(batchDuration)的整數(shù)倍
每秒發(fā)送一個(gè)數(shù)字:
package com.hhb.spark.streaming
import java.io.PrintWriter
import java.net.ServerSocket
/**
* @description:
* @date: 2020-11-20 14:31
**/
object NumSocket {
def main(args: Array[String]): Unit = {
val server = new ServerSocket(9999)
val socket = server.accept()
var i = 0
println("服務(wù)注冊(cè):" + socket.getInetAddress)
while (true) {
i += 1
val writer = new PrintWriter(socket.getOutputStream)
writer.println(i)
writer.flush()
Thread.sleep(1000)
}
}
}
案例一:
觀察窗口的數(shù)據(jù)
觀察batchDuration怠噪、windowDuration摘悴、slideDuration三者之間的關(guān)系
使用窗口相關(guān)的操作
package com.hhb.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @description:
* @date: 2020-11-20 14:31
**/
object WindowDemo {
def main(args: Array[String]): Unit = {
//初始化
val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
//每5秒生成一個(gè)RDD
val ssc = new StreamingContext(conf, Seconds(5))
ssc.sparkContext.setLogLevel("error")
val lines = ssc.socketTextStream("localhost", 9999)
lines.foreachRDD { (rdd, time) =>
println(rdd.id + " time: " + time)
println("*" * 15)
rdd.foreach(x => print(x + "\t"))
}
// 窗口長(zhǎng)度為20s,每隔10s滑動(dòng)一次
lines.reduceByWindow(_ + " " + _, Seconds(20), Seconds(10))
.print()
println("=" * 15)
//對(duì)數(shù)據(jù)進(jìn)行求和
lines.map(_.toInt).window(Seconds(20), Seconds(10)).reduce(_ + _).print()
//對(duì)數(shù)據(jù)進(jìn)行求和
lines.map(_.toInt).reduceByWindow(_ + _, Seconds(20), Seconds(10)).print()
ssc.start()
ssc.awaitTermination()
}
}
案例二:熱點(diǎn)搜索詞實(shí)時(shí)統(tǒng)計(jì)舰绘。每隔 10 秒蹂喻,統(tǒng)計(jì)最近20秒的詞出現(xiàn)的次數(shù)
package com.hhb.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @description:
* @author: huanghongbo
* @date: 2020-11-20 15:12
**/
object HotWordStats {
def main(args: Array[String]): Unit = {
//初始化StreamingContext
val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
val ssc = new StreamingContext(conf, Seconds(5))
ssc.sparkContext.setLogLevel("error")
// 通過(guò)reduceByKeyAndWindow算子, 每隔10秒統(tǒng)計(jì)最近20秒的詞出現(xiàn)的次數(shù) ,后 3個(gè)參數(shù):窗口時(shí)間長(zhǎng)度、滑動(dòng)窗口時(shí)間捂寿、分區(qū)
val wordDStream = ssc.socketTextStream("localhost", 9999).flatMap(_.split("\\s+")).map((_, 1))
wordDStream.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(20), Seconds(10)).print()
println("*" * 20)
//設(shè)置檢查點(diǎn)口四,檢查點(diǎn)具有容錯(cuò)機(jī)制。生產(chǎn)環(huán)境中應(yīng)設(shè)置到HDFS
ssc.checkpoint("data/checkpoint/")
// 這里需要checkpoint的支持
wordDStream.reduceByKeyAndWindow(_ + _, _ - _, Seconds(20), Seconds(10)).print()
ssc.start()
ssc.awaitTermination()
}
}
狀態(tài)追蹤(updateStateByKey)
UpdateStateByKey的主要功能:
- 為Streaming中每一個(gè)key維護(hù)一份state狀態(tài)秦陋,state類型可以是任意類型的蔓彩,可以是自定義對(duì)象;更新函數(shù)也可以是自定義的
- 通過(guò)更新函數(shù)對(duì)該key的狀態(tài)不斷更新驳概,對(duì)于每個(gè)新的batch而言赤嚼,Spark Streaming會(huì)在使用updateStateKey的時(shí)候?yàn)橐呀?jīng)存在的key進(jìn)行state的狀態(tài)更新
- 使用updateStateByKey時(shí)要開(kāi)啟checkpoint功能
源碼:
/**
* Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* In every batch the updateFunc will be called for each state even if there are no new values.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated.
* @tparam S State type
*/
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S]
): DStream[(K, S)] = ssc.withScope {
updateStateByKey(updateFunc, defaultPartitioner())
}
流式程序啟動(dòng)后計(jì)算wordcount的累計(jì)值,將每個(gè)批次的結(jié)果保存到文件
package com.hhb.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @description:
* @author: huanghongbo
* @date: 2020-11-20 17:56
**/
object StateTracker1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
val ssc = new StreamingContext(conf, Seconds(5))
ssc.sparkContext.setLogLevel("error")
ssc.checkpoint("data/checkpoint")
//準(zhǔn)備數(shù)據(jù)
val wordDStream = ssc.socketTextStream("localhost", 9999).flatMap(_.split("\\s+")).map((_, 1))
// def updateStateByKey[S: ClassTag](updateFunc: (Seq[V]value的類型, Option[S]) => Option[S] 返回值
//定義狀態(tài)更新函數(shù)
//函數(shù)常量定義顺又,返回值類型是Some(Int)更卒,表示的含義是最新的狀態(tài)
//函數(shù)的功能是將當(dāng)前時(shí)間間隔產(chǎn)生的key的value的集合,加到上一個(gè)狀態(tài)稚照,得到最新?tīng)顟B(tài)
val updateFunc = (currVale: Seq[Int], preValue: Option[Int]) => {
//通過(guò)Spark內(nèi)部的reduceByKey按key規(guī)約蹂空,然后這里傳入某key當(dāng)前批次的Seq,在計(jì)算當(dāng)前批次的總和
val currSum = currVale.sum
//以前已經(jīng)累加的值
val preSum = preValue.getOrElse(0)
Option(currSum + preSum)
}
val resultDStream = wordDStream.updateStateByKey[Int](updateFunc)
resultDStream.cache()
resultDStream.print()
// 把DStream保存到文本文件中果录,會(huì)生成很多的小文件上枕。一個(gè)批次生成一個(gè)目錄
resultDStream.repartition(1).saveAsTextFiles("data/output1/")
ssc.start()
ssc.awaitTermination()
}
}
統(tǒng)計(jì)全局的key的狀態(tài),但是就算沒(méi)有數(shù)據(jù)輸入弱恒,也會(huì)在每一個(gè)批次的時(shí)候返回之前的key的狀態(tài)辨萍。這樣的確定:如果數(shù)據(jù)量很大的話,checkpoint數(shù)據(jù)會(huì)占用較大的存儲(chǔ)返弹,而且效率也不高锈玉。
mapWithState:也是用于全局統(tǒng)計(jì)key的狀態(tài),如果沒(méi)有數(shù)據(jù)輸入琉苇,便不會(huì)返回之前key的狀態(tài)嘲玫,有一點(diǎn)增量的感覺(jué),這樣做的好處是并扇,只關(guān)心那些已經(jīng)發(fā)生變化的key,對(duì)于沒(méi)有數(shù)據(jù)輸入抡诞,則不會(huì)返回那些沒(méi)有變化的key的數(shù)據(jù)穷蛹,即使數(shù)據(jù)量很大土陪,checkpoint也不會(huì)像updateStateByKey那樣,占用太多的存儲(chǔ)
package com.hhb.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
/**
* @description:
* @author: huanghongbo
* @date: 2020-11-20 17:56
**/
object StateTracker2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
val ssc = new StreamingContext(conf, Seconds(5))
ssc.sparkContext.setLogLevel("error")
ssc.checkpoint("data/checkpoint")
//準(zhǔn)備數(shù)據(jù)
val wordDStream = ssc.socketTextStream("localhost", 9999).flatMap(_.split("\\s+")).map((_, 1))
//(KeyType, Option[ValueType], State[StateType]) => MappedType
//(key的類型肴熏,輸入的數(shù)據(jù)鬼雀,中間匯總)=> 返回值類型
def func(key: String, option: Option[Int], state: State[Int]): (String, Int) = {
val sum = option.getOrElse(0) + state.getOption().getOrElse(0)
state.update(sum)
(key, sum)
}
val spec = StateSpec.function(func _)
//只統(tǒng)計(jì)本批次出現(xiàn)的數(shù)據(jù),不會(huì)把所有數(shù)據(jù)匯總
val resultDStream = wordDStream.mapWithState(spec)
//.stateSnapshots() 以快照的形式蛙吏,把所有的數(shù)據(jù)都顯示出來(lái),結(jié)果類似于updateStateByKey
.stateSnapshots()
resultDStream.cache()
resultDStream.print()
// 把DStream保存到文本文件中源哩,會(huì)生成很多的小文件。一個(gè)批次生成一個(gè)目錄
resultDStream.repartition(1).saveAsTextFiles("data/output2/")
ssc.start()
ssc.awaitTermination()
}
}
總結(jié):mapWithState按照時(shí)間線在每一個(gè)批次間隔返回之前的發(fā)生改變的或者新的key的狀態(tài)鸦做,不發(fā)生變化的不返回励烦;updateStateByKey統(tǒng)計(jì)的是全局Key的狀態(tài),就算沒(méi)有數(shù)據(jù)輸入也會(huì)在每個(gè)批次的時(shí)候返回之前的Key的狀態(tài)泼诱。也就是說(shuō)坛掠,mapWithState統(tǒng)計(jì)的是一個(gè)批次的數(shù)據(jù),updateStateByKey統(tǒng)計(jì)的是全局的key
DStream 輸出操作
輸出操作定義 DStream 的輸出操作治筒。與 RDD 中的惰性求值類似屉栓,如果一個(gè)DStream及其派生出的 DStream 都沒(méi)有被執(zhí)行輸出操作,那么這些 DStream 就都不會(huì)被求值耸袜。如果 StreamingContext 中沒(méi)有設(shè)定輸出操作友多,整個(gè)流式作業(yè)不會(huì)啟動(dòng)。
Output Operation | Meaning |
---|---|
print() | 在運(yùn)行流程序的Driver上堤框,輸出DStream中每一批次數(shù)據(jù)最開(kāi)始的10個(gè)元素夷陋,用于開(kāi)發(fā)和調(diào)試 |
saveAsTestFile(prefix,[suffix]) | 以text文件形成存儲(chǔ)DStream的內(nèi)容,每一批次的存儲(chǔ)文件名基于參數(shù)中的prefix和suffix |
saveAsObjectFiles(prefix,[suffix]) | 以 Java 對(duì)象序列化的方式將Stream中的數(shù)據(jù)保 存為 Sequence Files胰锌。每一批次的存儲(chǔ)文件名基 于參數(shù)中的為"prefix-TIME_IN_MS[.suffix]" |
saveAsHadoopFiles(prefix, [suffix]) | 將Stream中的數(shù)據(jù)保存為 Hadoop files骗绕。每一批 次的存儲(chǔ)文件名基于參數(shù)中的為"prefix- TIME_IN_MS[.suffix]" |
foreachRDD(func) | 最通用的輸出操作。將函數(shù) func 應(yīng)用于 DStream 的每一個(gè)RDD上 |
通用的輸出操作foreachRDD资昧,用來(lái)對(duì)DStream中的RDD進(jìn)行任意計(jì)算酬土,在foreachRDD中,可以重用Spark RDD 中所有的Action操作格带,需要注意的是:
- 鏈接不要定義在Driver
- 鏈接定義在RDD的foreach算子中撤缴,則遍歷RDD的每個(gè)元素時(shí)都創(chuàng)建鏈接,得不償失
- 應(yīng)該在RDD的foreachPartition中定義鏈接叽唱,每個(gè)分區(qū)創(chuàng)建一個(gè)鏈接
- 可以考慮使用連接池
與Kafka整合
針對(duì)不同的spark屈呕、kafka版本,集成處理數(shù)據(jù)的方式分為兩種:Receiver Approach 和Direct Approach棺亭,不同集成版本處理方式的支持虎眨,可參考下圖:
對(duì)Kafka的支持分為兩個(gè)版本08(在高版本中將被廢棄)、010,兩個(gè)版本不兼容嗽桩。
Kafka-08接口
Receiver based Approach
基于Receiver的方式使用kafka舊版消費(fèi)者高階API實(shí)現(xiàn)岳守。對(duì)于所有的Receiver,通過(guò)Kafka接收的數(shù)據(jù)被存儲(chǔ)于Spark的Executors上碌冶,底層是寫入BlockManager中湿痢,默認(rèn)200ms生成一個(gè)block(spark.streaming.blockInterval)。然后由spark Streaming提交的job構(gòu)建BlockRDD扑庞,最終以Spark Core任務(wù)的形式運(yùn)行譬重,對(duì)應(yīng)的Receiver方式,有以下幾點(diǎn)需要注意:
- Receiver 作為一個(gè)常駐線程調(diào)度到Executor上運(yùn)行罐氨,占用一個(gè)CPU
- Receiver個(gè)數(shù)由KafkaUtil.createStream調(diào)用次數(shù)來(lái)決定臀规,一次一個(gè)Receiver。
- Kafka中的topic分區(qū)并不能關(guān)聯(lián)產(chǎn)生在spark streaming中的rdd分區(qū)岂昭,增加在KafkaUtils.createStream()中的指定的topic分區(qū)數(shù)以现,僅僅增加了單個(gè)receiver消費(fèi)的topic線程數(shù),他不會(huì)正價(jià)處理數(shù)據(jù)中并行的Spark數(shù)量【即:topicMap[topic,num_threads]中约啊,value對(duì)應(yīng)的數(shù)值應(yīng)該是每個(gè)topic對(duì)應(yīng)的消費(fèi)線程數(shù)】
- receiver默認(rèn)200ms生成一個(gè)block邑遏,可根據(jù)數(shù)據(jù)量大小調(diào)整block生成周期,一個(gè)block對(duì)應(yīng)RDD分區(qū)恰矩。
- receiver接收的數(shù)據(jù)會(huì)放入到BlockManager记盒,每個(gè)Executor都會(huì)有一個(gè)BlockManager實(shí)例,由于數(shù)據(jù)本地性外傅,那些存在Receiver的Executor會(huì)被調(diào)度執(zhí)行更多的Task纪吮,就會(huì)導(dǎo)致某些executor比較空閑
- 默認(rèn)情況下,Receiver是可能丟失數(shù)據(jù)的萎胰,可以通過(guò)設(shè)置spark.streaming.receiver.writeAheadLog.enable為true開(kāi)啟預(yù)寫日志機(jī)制碾盟,將數(shù)據(jù)先寫入到一個(gè)可靠的分布式文件系統(tǒng)(如HDFS),確保數(shù)據(jù)不丟失技竟,但會(huì)損失一定性能冰肴。
Kafka-08接口(Receiver方式):
- Offset保持在ZK中,系統(tǒng)管理
- 對(duì)應(yīng)kafka的版本0.8.2.1+
- 接口底層實(shí)現(xiàn)使用Kafka舊版消費(fèi)者高階API
- DStream底層實(shí)現(xiàn)為BlockRDD
Kafka-08接口(Receiver with WAL)
- 增強(qiáng)了故障恢復(fù)的能力
- 接收的數(shù)據(jù)與Dirver的元數(shù)據(jù)保存到HDFS
- 增加了流式應(yīng)用處理的延遲
Direct Approach
Direct Approach是 Spark Streaming不使用Receiver集成kafka的方式榔组,在企業(yè)生產(chǎn)環(huán)境中使用較多熙尉。相較于Receiver,有以下特點(diǎn):
不使用Receiver搓扯。減少不必要的CPU占用检痰,減少了Receiver接收數(shù)據(jù)寫入BlockManager,然后運(yùn)行時(shí)再通過(guò)BlockId锨推、網(wǎng)絡(luò)傳輸铅歼、磁盤讀取等來(lái)獲取數(shù)據(jù)的整個(gè)過(guò)程公壤,提高了效率;無(wú)需WAL谭贪,進(jìn)一步減少磁盤IO
-
Direct方式生成了RDD時(shí)KafkaRDD境钟,他的分區(qū)數(shù)與Kafka分區(qū)數(shù)保存一致锦担,便于控制并行度
注意:在Shuffle或Repartition操作后生成的RDD俭识,這種對(duì)應(yīng)關(guān)系會(huì)失效
-
可以手動(dòng)維護(hù)offset,實(shí)現(xiàn)Exactly once 語(yǔ)義(處理僅處理一次)
Direct Approach.png
Kafka-010接口
Spark Streaming與kafka 0.10的整合洞渔,和0.8版本的 Direct 方式很像秕岛。Kafka的分區(qū) 和Spark的RDD分區(qū)是一一對(duì)應(yīng)的侵贵,可以獲取 offsets 和元數(shù)據(jù),API 使用起來(lái)沒(méi)有 顯著的區(qū)別。
添加依賴:
<!-- sparkStreaming 結(jié)合 kafka -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
不要手動(dòng)添加 org.apache.kafka 相關(guān)的依賴舔株,如kafka-clients。spark-streaming- kafka-0-10已經(jīng)包含相關(guān)的依賴了旨指,不同的版本會(huì)有不同程度的不兼容宏侍。
使用kafka010接口從Kafka中獲取數(shù)據(jù)
- Kafka集群
- Kakfa生產(chǎn)者發(fā)送數(shù)據(jù)
- Spark Streaming程序接收數(shù)據(jù)
package com.hhb.spark.streaming.kafka
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
/**
* @description:
* @author: huanghongbo
* @date: 2020-11-23 11:08
**/
object KafkaProducer {
def main(args: Array[String]): Unit = {
val brokers = "linux121:9092,linux122:9092,linux123:9092"
val topic = "topicA"
val prop = new Properties()
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.StringSerializer])
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.StringSerializer])
// 創(chuàng)建生產(chǎn)者
val producer = new KafkaProducer[String, String](prop)
//向生產(chǎn)者里面發(fā)送信息
for (i <- 0 to 1000000) {
val record = new ProducerRecord[String, String](topic, i.toString, i.toString)
producer.send(record)
println(s"i -> $i")
Thread.sleep(100)
}
producer.close()
}
}
消費(fèi):
package com.hhb.spark.streaming.kafka
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @description:
* @author: huanghongbo
* @date: 2020-11-23 12:35
**/
object KafkaDStream1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.sparkContext.setLogLevel("warn")
val topics = Array("topicA")
val groupId = "groupId"
//定義Kafka相關(guān)參數(shù)
val param: Map[String, Object] = getKafkaParam(groupId)
val kafkaDStream = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, param)
)
kafkaDStream.foreachRDD((rdd, time) =>
println(s"rdd.count=>${rdd.count()},time=> ${time}")
)
ssc.start()
ssc.awaitTermination()
}
/**
* 定義Kafka相關(guān)參數(shù)
*
* @return
*/
def getKafkaParam(groupId: String): Map[String, Object] = {
Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092,linux122:9092,linux123:9092",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[org.apache.kafka.common.serialization.StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[org.apache.kafka.common.serialization.StringDeserializer],
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)
}
}
LocationStrategies(本地策略)
- LocationStrategies.PreferBrokers :如果Executor在Kafka集群中的某些節(jié)點(diǎn)上,可以使用這種策略医增,那么Executor中的數(shù)據(jù)都會(huì)來(lái)之當(dāng)前broker節(jié)點(diǎn)
- LocationStrategies.PreferConsistent: 大多數(shù)情況下使用的策略慎皱,將Kafka分區(qū)均勻分布在Spark集群的Executor上
- LocationStrategies.PreferFixed:如果節(jié)點(diǎn)之間的分區(qū)明顯分布不均,使用這種策略叶骨。通過(guò)一個(gè)Map指定將topic分區(qū)分布在哪些節(jié)點(diǎn)中
ConsumerStrategies(消費(fèi)策略)
- ConsumerStrategies.Subscribe茫多,用來(lái)訂閱一組Topic
- ConsumerStrategies.SubscribePattern,使用正則來(lái)指定感興趣的topic
- ConsumerStrategies.Assign忽刽,指定固定分區(qū)的集合
這三種策略都有重載構(gòu)造函數(shù)天揖,允許指定特定分區(qū)的起始偏移量;使用 Subscribe 或 SubscribePattern 在運(yùn)行時(shí)能實(shí)現(xiàn)分區(qū)自動(dòng)發(fā)現(xiàn)。
kafka相關(guān)命令:
# 創(chuàng)建Topic
kafka-topics.sh --create --zookeeper localhost:2181/myKafka --topic topicA --partitions 3 --replication-factor 3
# 顯示topic信息
kafka-topics.sh --zookeeper localhost:2181/myKafka --topic topicA --describe
# 檢查 topic 的最大offset
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list linux121:9092,linux122:9092,linux123:9092 --topic topicA --time -1
# 檢查消費(fèi)者的offset
kafka-consumer-groups.sh --bootstrap-server linux121:9092 --describe --group groupId
# 重置消費(fèi)者offset
kafka-consumer-groups.sh --bootstrap-server linux121:9092,linux122:9092,linux123:9092 --group groupId --reset-offsets --execute --to-offset 0 --topic topicA
Offset管理
Spark Streaming集成Kafka跪帝,允許Kafka中讀取一個(gè)或者多個(gè)topic數(shù)據(jù)今膊,一個(gè)Kafka Topic包含一個(gè)或多個(gè)分區(qū),每個(gè)分區(qū)中的消息順序存儲(chǔ)伞剑,并使用offset來(lái)標(biāo)記消息的位置斑唬,開(kāi)發(fā)者可以在SparkStreaming應(yīng)用中通過(guò)offset來(lái)控制數(shù)據(jù)的讀取位置。
Offsets管理對(duì)于保證流式應(yīng)用在整個(gè)生命周期中數(shù)據(jù)的連貫性是非常重要的纸泄,如果在應(yīng)用停止或報(bào)錯(cuò)退出之前沒(méi)有將offset持久化保存赖钞,該信息就會(huì)丟失,那么Spark Streaming就沒(méi)有辦法從上次停止或報(bào)錯(cuò)的位置繼續(xù)消費(fèi)Kafka中的消息聘裁。
獲取偏移量(Obtaining Offsets)
Spark Streaming與kafka整合時(shí)雪营,允許獲取其消費(fèi)的 offset ,具體方法如下:
package com.hhb.spark.streaming.kafka
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @description:
* @author: huanghongbo
* @date: 2020-11-23 12:35
**/
object KafkaDStream1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.sparkContext.setLogLevel("warn")
val topics = Array("topicA")
val groupId = "groupId"
//定義Kafka相關(guān)參數(shù)
val param: Map[String, Object] = getKafkaParam(groupId)
val kafkaDStream = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, param)
)
//topic:topicA,partition:1,from: 0,to:90
//topic:topicA,partition:0,from: 0,to:122
//topic:topicA,partition:2,from: 0,to:98
//topic:topicA,partition:1,from: 90,to:90
//topic:topicA,partition:0,from: 122,to:122
//topic:topicA,partition:2,from: 98,to:98
kafkaDStream.foreachRDD { rdd =>
val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
ranges.foreach { range =>
println(s"topic:${range.topic},partition:${range.partition},from: ${range.fromOffset},to:${range.untilOffset}")
}
}
// kafkaDStream.foreachRDD((rdd, time) =>
// println(s"rdd.count=>${rdd.count()},time=> ${time}")
// )
ssc.start()
ssc.awaitTermination()
}
/**
* 定義Kafka相關(guān)參數(shù)
*
* @return
*/
def getKafkaParam(groupId: String): Map[String, Object] = {
Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092,linux122:9092,linux123:9092",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[org.apache.kafka.common.serialization.StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[org.apache.kafka.common.serialization.StringDeserializer],
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)
}
}
注意:對(duì)HasOffsetRanges的類型轉(zhuǎn)換只有在對(duì)createDirectStream調(diào)用的第一個(gè)方法中完成才會(huì)成功衡便,而不是在隨后的方法鏈中献起。RDD分區(qū)和Kafka分區(qū)之間的對(duì)應(yīng)關(guān)系在shuffle或重分區(qū)后丟失洋访,如reduceByKey或Window。
存儲(chǔ)偏移量(Storing Offsets)
在Streaming 程序失敗的情況下谴餐,kafka交付語(yǔ)義取決于如何以及何時(shí)存儲(chǔ)偏移量姻政。Spark 輸出操作的語(yǔ)義為 at-least-once
如果要失效EOS語(yǔ)義(Exactly Once Semantics),必須在冪等的輸出之后存儲(chǔ)偏移量或者將存儲(chǔ)偏移量與輸出放在一個(gè)事務(wù)中,可以安裝增加可靠性(和代碼復(fù)雜度)的順序使用一下選項(xiàng)來(lái)存儲(chǔ)偏移量
-
CheckPoint
CheckPoint是對(duì)Spark Steaming運(yùn)行過(guò)程中的元數(shù)據(jù)和每個(gè)RDDs的數(shù)據(jù)狀態(tài)保存到一個(gè)持久化系統(tǒng)中岂嗓,這里也包含了offset汁展,一般是HDFS、S3厌殉,如果應(yīng)用程序或者集群掛了食绿,可以迅速恢復(fù),如果Streamging 程序的代碼變了公罕,重新打包執(zhí)行就會(huì)出翔反序列化異常的問(wèn)題器紧。這是因?yàn)镃heckPoint首次持久化時(shí)會(huì)講整個(gè)jar序列化,以便重啟時(shí)恢復(fù)楼眷,重新打包之后铲汪,新舊代碼邏輯不同,就會(huì)報(bào)錯(cuò)或仍然執(zhí)行舊版代碼罐柳。要解決這個(gè)問(wèn)題掌腰,只能將HDFS上的checkpoint文件刪除,但是這樣也會(huì)刪除Kafka的offset信息
-
Kafka
默認(rèn)情況下硝清,消費(fèi)者定期自動(dòng)提交偏移量辅斟,它將偏移量存儲(chǔ)在一個(gè)特殊的Kafka主題中(__consumer_offsets)。但在某些情況下芦拿,這將導(dǎo)致問(wèn)題士飒,因?yàn)橄⒖赡芤呀?jīng) 被消費(fèi)者從Kafka拉去出來(lái),但是還沒(méi)被處理蔗崎〗湍唬可以將 enable.auto.commit 設(shè)置為 false ,在 Streaming 程序輸出結(jié)果之后缓苛,手動(dòng) 提交偏移到kafka芳撒。與檢查點(diǎn)相比,使用Kafka保存偏移量的優(yōu)點(diǎn)是無(wú)論應(yīng)用程序代碼如何更改未桥,偏移量 仍然有效笔刹。
stream.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // 在輸出操作完成之后,手工提交偏移量;此時(shí)將偏移量提交到 Kafka 的消息隊(duì)列中 stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) }
與HasOffsetRanges一樣冬耿,只有在createDirectStream的結(jié)果上調(diào)用時(shí)舌菜,轉(zhuǎn)換到 CanCommitOffsets才會(huì)成功,而不是在轉(zhuǎn)換之后亦镶。commitAsync調(diào)用是線程安全 的日月,但必須在輸出之后執(zhí)行袱瓮。
-
自定義存儲(chǔ)
Offsets可以通過(guò)多種方式來(lái)管理,但是一般來(lái)說(shuō)遵循下面的步驟:
- 在 DStream 初始化的時(shí)候爱咬,需要指定每個(gè)分區(qū)的offset用于從指定位置讀取數(shù)據(jù)
- 讀取并處理消息
- 處理完之后存儲(chǔ)結(jié)果數(shù)據(jù)
- 用虛線圈存儲(chǔ)和提交offset尺借,強(qiáng)調(diào)用戶可能會(huì)執(zhí)行一系列操作來(lái)滿足他們更加嚴(yán)格的語(yǔ)義要求。這包括冪等操作和通過(guò)原子操作的方式存儲(chǔ)offset
- 將 offsets 保存在外部持久化數(shù)據(jù)庫(kù)如 HBase精拟、Kafka燎斩、HDFS、ZooKeeper串前、 Redis瘫里、MySQL ... ...
可以將 Offsets 存儲(chǔ)到HDFS中实蔽,但這并不是一個(gè)好的方案荡碾。因?yàn)镠DFS延遲有點(diǎn)高, 此外將每批次數(shù)據(jù)的offset存儲(chǔ)到HDFS中還會(huì)帶來(lái)小文件問(wèn)題;
可以將 Offset 存儲(chǔ)到保存ZK中局装,但是將ZK作為存儲(chǔ)用坛吁,也并不是一個(gè)明智的選擇, 同時(shí)ZK也不適合頻繁的讀寫操作;
Redis管理的Offset
要想將Offset保存到外部存儲(chǔ)中铐尚,關(guān)鍵要實(shí)現(xiàn)以下幾個(gè)功能:
- Streaming程序啟動(dòng)時(shí)拨脉,從外部存儲(chǔ)獲取保存的Offsets(執(zhí)行一次)
- 在foreachRDD中,每個(gè)批次數(shù)據(jù)處理之后宣增,更新外部存儲(chǔ)的offsets(多次執(zhí)行)
案例一:使用自定義的offsets玫膀,從kafka讀數(shù)據(jù);處理完數(shù)據(jù)后打印offsets
package com.hhb.spark.streaming.kafka
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @description: 使用自定義offset,從kafka讀數(shù)據(jù)爹脾,處理完數(shù)據(jù)后打印offsets
* @date: 2020-11-23 16:07
**/
object KafkaDStream2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
val ssc = new StreamingContext(conf, Seconds(2))
ssc.sparkContext.setLogLevel("warn")
val topics = Array("topicA")
val groupId = "groupId"
val param: Map[String, Object] = getKafkaMap(groupId)
val offsets: Map[TopicPartition, Long] = Map(
new TopicPartition(topics(0), 0) -> 50,
new TopicPartition(topics(0), 1) -> 60,
new TopicPartition(topics(0), 2) -> 80,
)
val kafkaDStream = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, param, offsets)
)
kafkaDStream.foreachRDD { (rdd, times) =>
println(s"rdd.count = ${rdd.count()},time=>${times}")
val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
ranges.foreach { range =>
println(s"partition=>${range.partition},topic => ${range.topic},from=> ${range.fromOffset} ,to=>${range.untilOffset}")
}
}
ssc.start()
ssc.awaitTermination()
}
def getKafkaMap(groupId: String): Map[String, Object] = {
Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092,linux122:9092,linux123:9092",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)
}
}
案例二:根據(jù) key 從 Redis 獲取offsets帖旨,根據(jù)該offsets從kafka讀數(shù)據(jù);處理完數(shù)據(jù) 后將offsets保存到 Redis
Redis管理的Offsets:
1、數(shù)據(jù)結(jié)構(gòu)選擇:Hash;key灵妨、field解阅、value
Key:kafka:topic:TopicName:groupid
Field:partition
Value:offset
2、從 Redis 中獲取保存的offsets
3泌霍、消費(fèi)數(shù)據(jù)后將offsets保存到redis
工具類(Redis讀取货抄、保存offset)
package com.hhb.spark.streaming.kafka
import java.util
import java.util.{HashSet, Set}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.{HostAndPort, Jedis, JedisCluster, JedisPool, JedisPoolConfig}
/**
* @description:
* @author: huanghongbo
* @date: 2020-11-23 16:59
**/
object OffsetsRedisUtils {
private val config = new JedisPoolConfig
val jedisClusterNode: util.Set[HostAndPort] = new util.HashSet[HostAndPort]
jedisClusterNode.add(new HostAndPort("59.110.241.53", 7001))
jedisClusterNode.add(new HostAndPort("59.110.241.53", 7002))
jedisClusterNode.add(new HostAndPort("59.110.241.53", 7003))
jedisClusterNode.add(new HostAndPort("59.110.241.53", 7004))
jedisClusterNode.add(new HostAndPort("59.110.241.53", 7005))
jedisClusterNode.add(new HostAndPort("59.110.241.53", 7006))
jedisClusterNode.add(new HostAndPort("59.110.241.53", 7007))
//最大鏈接
config.setMaxTotal(30)
//最大空閑
config.setMaxIdle(10)
private val topicKeyPrefix = "kafka:topic"
private def getJcd() = new JedisCluster(jedisClusterNode, config)
//獲取key信息
private def getKey(topic: String, groupId: String): String = {
topicKeyPrefix + ":" + topic + ":" + groupId
}
def getOffset(topics: Array[String], groupId: String): Map[TopicPartition, Long] = {
val cluster = getJcd()
val result = topics.map { topic =>
import scala.collection.JavaConverters._
//獲取該topic、groupId下所有的信息
cluster.hgetAll(getKey(topic, groupId))
//轉(zhuǎn)換成scala
.asScala
//再次遍歷朱转,將分區(qū)蟹地,offset 轉(zhuǎn)換成(new TopicPartition(topic, k.toInt), v.toLong)結(jié)構(gòu)
.map { case (k, v) => (new TopicPartition(topic, k.toInt), v.toLong) }
}
cluster.close()
//壓平數(shù)據(jù)后轉(zhuǎn)map
result.flatten.toMap
}
//自己的方法
def saveOffset(ranges: Array[OffsetRange], groupId: String): Unit = {
val cluster = getJcd()
ranges.foreach { range =>
val key = getKey(range.topic, groupId)
println(s"key=>${key},part=>${range.partition.toString},offset=>${range.untilOffset.toString}")
cluster.hset(key, range.partition.toString, range.untilOffset.toString)
}
cluster.close()
}
//老師的方法
def saveOffset2(ranges: Array[OffsetRange], groupId: String): Unit = {
val cluster = getJcd()
val result: Map[String, Array[(String, String)]] = ranges.map(range => (range.topic, (range.partition.toString -> range.untilOffset.toString)))
.groupBy(_._1)
.map { case (topic, buffer) => (topic, buffer.map(_._2)) }
result.map { r =>
import scala.collection.JavaConverters._
val offsets = r._2.toMap.asJava
cluster.hmset(getKey(r._1, groupId), offsets)
}
}
}
KafkaDStream(從kafka獲取數(shù)據(jù),使用 Redis 保存offsets)
package com.hhb.spark.streaming.kafka
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
/**
* @description:
* @author: huanghongbo
* @date: 2020-11-23 16:59
**/
object KafkaDStream3 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
val ssc = new StreamingContext(conf, Seconds(2))
ssc.sparkContext.setLogLevel("warn")
val topics = Array("topicA")
val groupId = "groupId"
val param: Map[String, Object] = getKafkaMap(groupId)
val offsets: Map[TopicPartition, Long] = OffsetsRedisUtils.getOffset(topics, groupId)
val kafkaDStream = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, param, offsets)
)
kafkaDStream.foreachRDD { (rdd, times) =>
if(!rdd.isEmpty()){
//數(shù)據(jù)處理邏輯藤为,這里只是輸出
println(s"rdd.count = ${rdd.count()},time=>${times}")
val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
OffsetsRedisUtils.saveOffset(ranges, groupId)
}
}
ssc.start()
ssc.awaitTermination()
}
def getKafkaMap(groupId: String): Map[String, Object] = {
Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092,linux122:9092,linux123:9092",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)
}
}