Spark Streaming

Spark Streaming

隨著大數(shù)據(jù)技術(shù)的不斷發(fā)展立帖,人們對(duì)于大數(shù)據(jù)的實(shí)時(shí)性處理要求也在不斷提高,傳統(tǒng) 的 MapReduce 等批處理框架在某些特定領(lǐng)域猪贪,例如實(shí)時(shí)用戶推薦姿锭、用戶行為分析這 些應(yīng)用場(chǎng)景上逐漸不能滿足人們對(duì)實(shí)時(shí)性的需求,因此誕生了一批如 S3汉矿、Samza、 Storm备禀、Flink等流式分析洲拇、實(shí)時(shí)計(jì)算框架奈揍。

Spark 由于其內(nèi)部?jī)?yōu)秀的調(diào)度機(jī)制、快速的分布式計(jì)算能力赋续,能夠以極快的速度進(jìn)行 迭代計(jì)算男翰。正是由于具有這樣的優(yōu)勢(shì),Spark 能夠在某些程度上進(jìn)行實(shí)時(shí)處理纽乱, Spark Streaming 正是構(gòu)建在此之上的流式框架蛾绎。

Spark Streaming 概述

什么是Spark Streaming

Spark Streaming 類似于Apache Storm(來(lái)一條數(shù)據(jù)處理一條,延遲低鸦列、響應(yīng)快租冠、吞吐量低),用于流式數(shù)據(jù)處理薯嗤。Spark Streaming 具有高吞吐量和容錯(cuò)能力強(qiáng)等特點(diǎn)顽爹,支持?jǐn)?shù)據(jù)源有很多,例如Kafka(最重要的數(shù)據(jù)源)、Flume、Twitter和TCP套接字螃诅,數(shù)據(jù)輸入后可用高度抽象API凿可,如map、reduce涉馅、join归园、window等進(jìn)行運(yùn)算,處理結(jié)果能保存在很多地方稚矿,如HDFS庸诱、數(shù)據(jù)庫(kù)等,Spark Streaming能與MLlib已經(jīng)Graphx融合

什么是Spark Streaming1.png

Spark Streaming 與Spark 基于RDD的概念比較類似晤揣,Spark Streaming使用離散流(Discretized Stream)作為抽象表示桥爽,稱為DStream,DStream是隨著時(shí)間推移而收到的數(shù)據(jù)的序列昧识。在內(nèi)部钠四,每個(gè)時(shí)間區(qū)間收到的數(shù)據(jù)都作為RDD存在,DStream是由這些RDD所組成的序列

什么是Spark Streaming2.png

DStream可以從各種輸入源創(chuàng)建跪楞,比如Flume缀去、Kafka或者HDFS。創(chuàng)建出來(lái)的DStream支持兩種操作

  • 轉(zhuǎn)化操作甸祭,會(huì)生成一個(gè)新的DStream
  • 輸出操作(output operation)缕碎,把數(shù)據(jù)寫入外部系統(tǒng)

DStream提供了許多與RDD所支持的操作相類似的操作支持,還增加了時(shí)間相關(guān)的新操作池户,比如滑動(dòng)窗口咏雌。

Spark Streaming架構(gòu)

Spark Streaming 使用 mini-batch架構(gòu)凡怎,把流式計(jì)算當(dāng)作一系列連續(xù)的小規(guī)模批處理來(lái)對(duì)待,Spark Streaming 從各種輸入源中讀取數(shù)據(jù)处嫌,并把數(shù)據(jù)分組為小的批次栅贴,新的批次按均勻的時(shí)間間隔創(chuàng)建處理,在每個(gè)時(shí)間區(qū)間開(kāi)始的時(shí)候熏迹,一個(gè)新的批次就創(chuàng)建出來(lái)檐薯,在該區(qū)間內(nèi)接收到的數(shù)據(jù)都會(huì)被添加到這個(gè)批次中。在時(shí)間區(qū)間結(jié)束時(shí)注暗,批次停止增長(zhǎng)坛缕。時(shí)間區(qū)間的大小是有批次間隔這個(gè)參數(shù)覺(jué)得決定的,批次間隔一般設(shè)在500毫秒到幾秒之間捆昏,有開(kāi)發(fā)者配置赚楚。每個(gè)輸入批次都形成一個(gè)RDD,以 Spark 作業(yè)的方式處理并生成其他的 RDD骗卜。 處理的結(jié)果可以以批處理的方式傳給外部系統(tǒng)宠页。

Spark Streaming架構(gòu)1.png

Spark Streaming 的編程抽象時(shí)離散化流,也就是DStream寇仓,是一個(gè)RDD序列举户,每個(gè)RDD代表數(shù)據(jù)流中的一個(gè)時(shí)間片的內(nèi)的數(shù)據(jù)。

Spark Streaming架構(gòu)2.png

應(yīng)用于DStream上的轉(zhuǎn)換操作遍烦,都會(huì)轉(zhuǎn)換為底層RDD上的操作俭嘁,如對(duì)行DStream中的每個(gè)RDD應(yīng)用flatMap操作以生成單詞DStream的RDD

Spark Streaming架構(gòu)3.png

這些底層的RDD轉(zhuǎn)換是由Spark引擎完成的。DStream操作隱藏了大部分這些細(xì)節(jié)服猪, 為開(kāi)發(fā)人員提供了更高級(jí)別的API以方便使用供填。

Spark Streaming架構(gòu)4.png

Spark Streaming為每個(gè)輸入源啟動(dòng)對(duì)應(yīng)的接收器。接收器運(yùn)行在Executor中罢猪,從輸入源收集數(shù)據(jù)并保存為 RDD近她。默認(rèn)情況下接收到的數(shù)據(jù)后會(huì)復(fù)制到另一個(gè)Executor中,進(jìn)行容錯(cuò); Driver 中的 StreamingContext 會(huì)周期性地運(yùn)行 Spark 作業(yè)來(lái)處理這些數(shù)據(jù)膳帕。

Spark Streaming架構(gòu)5.png

Spark Streaming運(yùn)行流程

  1. 客戶端提交Spark Streaming 作業(yè)后啟動(dòng)Driver粘捎,Driver啟動(dòng)Receiver,Receiver接收數(shù)據(jù)源的數(shù)據(jù)

  2. 每個(gè)作業(yè)保護(hù)多個(gè)Executor备闲,每個(gè)Executor以線程的方式運(yùn)行task晌端,Spark Streaming至少包含一個(gè)receiver task(一般情況下)

  3. Receiver接收數(shù)據(jù)后生成Block,并把BlockId匯報(bào)給Driver恬砂,然后備份到另外一個(gè)Executor上

  4. ReceiverTracker維護(hù)Reciver匯報(bào)的BlockId

  5. Driver定時(shí)啟動(dòng)JobGenerator咧纠,根據(jù)DStream的關(guān)系生成邏輯RDD,然后創(chuàng)建JobSet泻骤,叫個(gè)JobScheduler

  6. JobScheduler負(fù)責(zé)調(diào)度JobSet漆羔,交給DAGScheduler梧奢,DAGScheduler根據(jù)邏輯RDD,生成相應(yīng)的Stages演痒,每個(gè)stage包含一到多個(gè)Task亲轨,將TaskSet提交給TaskSchedule。

  7. TaskScheduler負(fù)責(zé)把Task調(diào)度到Executor上鸟顺,并維護(hù)Task運(yùn)行狀態(tài)惦蚊。

總結(jié):

  1. 提交完spark作業(yè)后,driver就會(huì)去啟動(dòng)Receiver取接收數(shù)據(jù)讯嫂,Receiver接收數(shù)據(jù)的同時(shí)蹦锋,會(huì)將數(shù)據(jù)備份到另一個(gè)節(jié)點(diǎn),Receiver接收到數(shù)據(jù)會(huì)回報(bào)給Driver的欧芽,然后

Spark Streaming優(yōu)缺點(diǎn)

EOS:exactly onece semantic 處理且僅處理一次

與傳統(tǒng)流式框架相比莉掂,Spark Streaming最大的不同點(diǎn)在于它對(duì)待數(shù)據(jù)是粗粒度的處理方式,記一次處理一小批數(shù)據(jù)千扔,而其他框架往往采用細(xì)粒度的處理模式憎妙,即依次處理一條數(shù)據(jù),Spark Streaming這樣的設(shè)計(jì)即為其帶來(lái)了優(yōu)點(diǎn)曲楚,也帶來(lái)的確定

優(yōu)點(diǎn)
  • Spark Streaming 內(nèi)部實(shí)現(xiàn)和調(diào)度方式高度依賴Spark的DAG調(diào)度器和RDD厘唾,這就決定了Spark Streaming的設(shè)計(jì)初衷必須是粗粒度方式的,同時(shí)洞渤,由于Spark 內(nèi)部調(diào)度器足夠快速和高效阅嘶,可以快速地處理小批量數(shù)據(jù)属瓣,這就獲得準(zhǔn)實(shí)時(shí)的特性
  • Spark Streaming 的粗粒度執(zhí)行方式使其確痹仄“處理且僅處理一次”的特性(EOS),同時(shí)也可以方便地實(shí)現(xiàn)容錯(cuò)回復(fù)機(jī)制
  • 由于Spark Streaming的DStream本質(zhì)是RDD在流式數(shù)據(jù)上的抽象抡蛙,因此基于RDD的各種操作也有相應(yīng)的基于DStream的版本护昧,這樣就大大降低了用戶對(duì)于新框架的學(xué)習(xí)成本,在了解Spark的情況下粗截,用戶將很容易使用Spark Streaming
  • 由于DStream是在RDD上抽象惋耙,那么也跟容易與RDD進(jìn)行交互操作,在需要將流式數(shù)據(jù)和批處理數(shù)據(jù)結(jié)合進(jìn)行分析的情況下熊昌,將會(huì)變得非常方便
缺點(diǎn)
  • Spark Streaming 的粗粒度處理方式也造成了不可避免的延遲绽榛。在細(xì)粒度處理方式下,理想情況下每一條記錄都會(huì)被實(shí)時(shí)處理婿屹,而在Spark Streaming中灭美,數(shù)據(jù)需要匯總到一定的量后在一次性處理,這就增加了數(shù)據(jù)處理的延遲昂利,這種延遲是由框架的設(shè)計(jì)引入的届腐,并不是由網(wǎng)絡(luò)或其他情況造成的

Structured Streaming

Spark Streaming計(jì)算邏輯是把數(shù)據(jù)按時(shí)間劃分為DStream铁坎,存在以下問(wèn)題:

  • 框架自身只能根據(jù)Batch Time單元進(jìn)行數(shù)據(jù)處理,很難處理基于Event time(即時(shí)間戳)的數(shù)據(jù)犁苏,很難處理延遲硬萍、亂序的數(shù)據(jù)
  • 流式和批量處理的API完全不一致,兩種使用場(chǎng)景中围详,程序代碼還是需要一定的轉(zhuǎn)換
  • 端到端的數(shù)據(jù)容錯(cuò)保障邏輯需要用戶自己構(gòu)建朴乖,難以處理增量更新和持久化存儲(chǔ)等一致性問(wèn)題

基于以上問(wèn)題,提出了下一代Structure Streaming助赞。將數(shù)據(jù)映射為一張無(wú)界長(zhǎng)度的表寒砖,通過(guò)表的計(jì)算,輸出結(jié)果映射為另一張表嫉拐。

以結(jié)構(gòu)化的方式去操作流式數(shù)據(jù)哩都,簡(jiǎn)化了實(shí)時(shí)計(jì)算過(guò)程,同時(shí)還復(fù)用了Catalyst引擎來(lái)優(yōu)化SQL操作此外還能支持增量計(jì)算和基于event time 的計(jì)算婉徘。

DStream基礎(chǔ)數(shù)據(jù)源

基礎(chǔ)數(shù)據(jù)源包括:文件數(shù)據(jù)流漠嵌、socket數(shù)據(jù)流、RDD隊(duì)列流;這些數(shù)據(jù)源主要用于測(cè)試盖呼。

DStream基礎(chǔ)數(shù)據(jù)源.png

引入依賴:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>${spark.version}</version>
</dependency>

文件數(shù)據(jù)流

文件數(shù)據(jù)流:通過(guò)textFileStream(directory)方法進(jìn)行讀取HDFS兼容的文件系統(tǒng)文件

/**
 * Create an input stream that monitors a Hadoop-compatible filesystem
 * for new files and reads them as text files (using key as LongWritable, value
 * as Text and input format as TextInputFormat). Files must be written to the
 * monitored directory by "moving" them from another location within the same
 * file system. File names starting with . are ignored.
 * @param directory HDFS directory to monitor for new file
 */
def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") {
  fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}

Spark Streaming 將會(huì)監(jiān)控directory目錄儒鹿,并不斷處理移動(dòng)進(jìn)來(lái)的文件

  • 不支持嵌套目錄
  • 文件需要相同的數(shù)據(jù)格式
  • 文件進(jìn)入directory的方式需要通過(guò)移動(dòng)或者重命名來(lái)實(shí)現(xiàn)
  • 一旦文件移動(dòng)進(jìn)目錄,則不能在修改几晤,即使修改了也不會(huì)讀取新數(shù)據(jù)
  • 文件流不需要接收器(receiver)约炎,不需要單獨(dú)分配CPU核
package com.hhb.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @description: 只能監(jiān)控啟動(dòng)后的新增的文件
 * @date: 2020-11-19 10:28
 **/
object FileDStream {

  def main(args: Array[String]): Unit = {

    //1 創(chuàng)建SparkConf
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
    //2 初始化入口,設(shè)置任務(wù)5秒鐘執(zhí)行一次
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("warn")
    //3蟹瘾、4圾浅、5
    //3 讀取本地文件,創(chuàng)建DStream
    val linesDStream = ssc.textFileStream("/Users/baiwang/myproject/spark/data/log/")
    //4 DStream轉(zhuǎn)換
    val wordDStream = linesDStream.flatMap(_.split("\\s+"))
    val wordCountDStream = wordDStream.map((_, 1)).reduceByKey(_ + _)
    //5 輸出DStream到屏幕
    wordCountDStream.print()
    //6 啟動(dòng)作業(yè)
    ssc.start()
    //啟動(dòng)作業(yè)后憾朴,等待狸捕,作業(yè)以5秒一次的頻率運(yùn)行
    ssc.awaitTermination()
  }
}

Socket數(shù)據(jù)流

Spark Streaming 可以通過(guò)Socket端口監(jiān)聽(tīng)并接受數(shù)據(jù),然后進(jìn)行相應(yīng)的處理

在linux120上執(zhí)行nc程序

nc -lk 9999
# yum install nc

隨后可以在nc窗口中隨意輸入一下單詞众雷,監(jiān)聽(tīng)窗口會(huì)自動(dòng)獲得單詞數(shù)據(jù)流信息灸拍,在監(jiān)聽(tīng)窗口每隔x秒就會(huì)打印出詞頻的統(tǒng)計(jì)信息,可以在屏幕上出現(xiàn)結(jié)果

備注:使用local[*]可能出現(xiàn)問(wèn)題

如果給虛擬機(jī)配置的cpu數(shù)為1砾省,使用local[*]也只會(huì)啟動(dòng)一個(gè)線程鸡岗,該線程用于receiver task,此時(shí)沒(méi)有資源處理接受打到的數(shù)據(jù)编兄。

【現(xiàn)象:程序正常執(zhí)行轩性,不會(huì)打印時(shí)間戳,屏幕上也不會(huì)有其他有效信息】

源碼:

/**
 * Creates an input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
 * lines.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param storageLevel  Storage level to use for storing the received objects
 *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 * @see [[socketStream]]
 */
def socketTextStream(
    hostname: String,
    port: Int,
   //監(jiān)控到的數(shù)據(jù)翻诉,默認(rèn)存內(nèi)存炮姨,內(nèi)存存不下放到磁盤捌刮,并對(duì)數(shù)據(jù)進(jìn)行備份
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

注意:DStream的 StorageLevel 是 MEMORY_AND_DISK_SER_2;

package com.hhb.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @description:
 * @author: huanghongbo
 * @date: 2020-11-19 15:35
 **/
object SocketDStream {

  def main(args: Array[String]): Unit = {

    //初始化StreamingContext
    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
    val ssc = new StreamingContext(conf, Seconds(5))

    //接受數(shù)據(jù),轉(zhuǎn)換成socketStream
    //鏈接服務(wù)器舒岸,需要在服務(wù)安裝nc 執(zhí)行命令nc -lk 9999绅作,然后輸入數(shù)據(jù)
//    val socketDStream = ssc.socketTextStream("linux120", 9999)
    //鏈接本地
    val socketDStream = ssc.socketTextStream("localhost", 9999)
    //數(shù)據(jù)轉(zhuǎn)換
    val wordDStream = socketDStream.flatMap(_.split("\\s+"))
    val wordCountDStream = wordDStream.map((_, 1)).reduceByKey(_ + _)

    //數(shù)據(jù)輸出
    wordCountDStream.print()

    //啟動(dòng)
    ssc.start()
    ssc.awaitTermination()
  }

}

SocketServer程序(單線程),監(jiān)聽(tīng)本機(jī)指定端口蛾派,與socket連接后可發(fā)送信息:

package com.hhb.spark.streaming


import java.io.PrintWriter
import java.net.ServerSocket

/**
 * @description:
 * @author: huanghongbo
 * @date: 2020-11-19 16:14
 **/
object OneThreadSocket {

  def main(args: Array[String]): Unit = {

    val arr: Array[String] = "Hello World Hello Hadoop Hello spark kafka hive zookeeper hbase flume sqoop".split("\\s+")
    val n = arr.length
    val port = 9999
    val random = scala.util.Random
    val server = new ServerSocket(port)
    val socket = server.accept()
    println("鏈接成功俄认,鏈接地址:" + socket.getInetAddress)
    while (true) {
      val writer = new PrintWriter(socket.getOutputStream)
      writer.println(arr(random.nextInt(n)) + " " + arr(random.nextInt(n)))
      writer.flush()
      Thread.sleep(100)
    }
  }
}

SocketServer程序(多線程)


RDD隊(duì)列流

調(diào)試Spark Streaming應(yīng)用程序的時(shí)候,可使用streamingContext.queueStream(queueOfRDD) 創(chuàng)建基于RDD隊(duì)列的DStream

源碼:

/**
 * Create an input stream from a queue of RDDs. In each batch,
 * it will process either one or all of the RDDs returned by the queue.
 *
 * @param queue      Queue of RDDs. Modifications to this data structure must be synchronized.
 * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
 * @tparam T         Type of objects in the RDD
 *
 * @note Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
 * those RDDs, so `queueStream` doesn't support checkpointing.
 */
def queueStream[T: ClassTag](
    queue: Queue[RDD[T]],
  //表示一次只處理一個(gè)RDD
    oneAtATime: Boolean = true
  ): InputDStream[T] = {
  queueStream(queue, oneAtATime, sc.makeRDD(Seq.empty[T], 1))
}

備注:

  • oneAtTime :缺省為true洪乍,一次處理一個(gè)RDD眯杏,設(shè)為false,一次處理全部的RDD
  • RDD隊(duì)列流可以使用local[1]
  • 涉及到同時(shí)出隊(duì)和入隊(duì)操作壳澳,所以需要同步

每秒創(chuàng)建一個(gè)RDD(RDD存放1-100的整數(shù))岂贩,Streaming每隔1秒就對(duì)數(shù)據(jù)進(jìn)行處 理,計(jì)算RDD中數(shù)據(jù)除10取余的個(gè)數(shù)巷波。

package com.hhb.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

/**
 * @description:
 * @date: 2020-11-19 16:20
 **/
object RDDDStream {

  def main(args: Array[String]): Unit = {
    //初始化ssc,沒(méi)個(gè)一秒處理一次數(shù)據(jù)
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    //由于RDD是一秒產(chǎn)生一個(gè)萎津,所以執(zhí)行實(shí)現(xiàn)需要小于一秒,否則產(chǎn)生完RDD后直接停止了抹镊,
    // 例如設(shè)置10秒锉屈,還沒(méi)等執(zhí)行,程序已經(jīng)結(jié)束
    val ssc = new StreamingContext(conf, Seconds(1))
    //組裝RDD的對(duì)流
    val queue = mutable.Queue[RDD[Int]]()
    // 讀取RDD生成DStream
    val rddDStream = ssc.queueStream(queue)
    //具體的業(yè)務(wù)處理邏輯
    val countDStream = rddDStream.map(x => (x % 10, 1)).reduceByKey(_ + _)

    //輸出
    countDStream.print()

    //啟動(dòng)
    ssc.start()

    //每秒產(chǎn)生一個(gè)RDD
    val arr = 1 to 100
    for (i <- 1 to 5) {
      queue.synchronized {
        queue += ssc.sparkContext.makeRDD(arr.map(_ * i))
      }
      Thread.sleep(1000)
    }
    ssc.stop()
  }
}

DStream 轉(zhuǎn)換操作

DStream上的操作與RDD類似垮耳,分為Transformations(轉(zhuǎn)換) 和 Output Operations(輸出)兩種颈渊,池外轉(zhuǎn)換操作中還有一些比較特殊的方法,如:updateStateByKey终佛、transform以及各種Window相關(guān)的操作

Transformation Meaning
map(func) 將源DStream中的每個(gè)元素通過(guò)一個(gè)函數(shù)func從 而得到新的DStreams
flatMap(func) 和map類似俊嗽,但是每個(gè)輸入的項(xiàng)可以被映射為0 或更多項(xiàng)
filter(func) 選擇源DStream中函數(shù)func判為true的記錄作為 新DStreams
repartition(numPartitions) 通過(guò)創(chuàng)建更多或者更少的partition來(lái)改變此 DStream的并行級(jí)別
union(otherStream) 聯(lián)合源DStreams和其他DStreams來(lái)得到新 DStream
count() 統(tǒng)計(jì)源DStreams中每個(gè)RDD所含元素的個(gè)數(shù)得 到單元素RDD的新DStreams
reduce(func) 通過(guò)函數(shù)func(兩個(gè)參數(shù)一個(gè)輸出)來(lái)整合源 DStreams中每個(gè)RDD元素得到單元素RDD的 DStreams。這個(gè)函數(shù)需要關(guān)聯(lián)從而可以被并行 計(jì)算
countByValue() 對(duì)于DStreams中元素類型為K調(diào)用此函數(shù)查蓉,得到 包含(K,Long)對(duì)的新DStream乌询,其中Long值表明 相應(yīng)的K在源DStream中每個(gè)RDD出現(xiàn)的頻率
reduceByKey(func, [numTasks]) 對(duì)(K,V)對(duì)的DStream調(diào)用此函數(shù)榜贴,返回同樣(K,V) 的新DStream豌研,新DStream中的對(duì)應(yīng)V為使用 reduce函數(shù)整合而來(lái)。默認(rèn)情況下唬党,這個(gè)操作使 用Spark默認(rèn)數(shù)量的并行任務(wù)(本地模式為2鹃共,集 群模式中的數(shù)量取決于配置參數(shù) spark.default.parallelism)。也可以傳入可選 的參數(shù)numTasks來(lái)設(shè)置不同數(shù)量的任務(wù)
join(otherStream, [numTasks]) 兩DStream分別為(K,V)和(K,W)對(duì)驶拱,返回(K,(V,W)) 對(duì)的新DStream
cogroup(otherStream, [numTasks]) 兩DStream分別為(K,V)和(K,W)對(duì)霜浴,返回(K, (Seq[V],Seq[W])對(duì)新DStreams
transform(func) 將RDD到RDD映射的函數(shù)func作用于源DStream 中每個(gè)RDD上得到新DStream。這個(gè)可用于在 DStream的RDD上做任意操作
updateStateByKey(func) 得到”狀態(tài)”DStream蓝纲,其中每個(gè)key狀態(tài)的更新是 通過(guò)將給定函數(shù)用于此key的上一個(gè)狀態(tài)和新值 而得到阴孟。這個(gè)可用于保存每個(gè)key值的任意狀態(tài) 數(shù)據(jù)

備注:

  • 在DStream與RDD上的轉(zhuǎn)換操作非常類似(無(wú)狀態(tài)操作)
  • DStream有自己特殊的操作(窗口操作晌纫、追蹤狀態(tài)變化操作)
  • 在DStream上的轉(zhuǎn)換操作比RDD上的轉(zhuǎn)換操作少

DStream的轉(zhuǎn)化操作可以分為無(wú)狀態(tài)(stateless)和有狀態(tài)(stateful)兩種:

  • 無(wú)狀態(tài)轉(zhuǎn)換操作,每個(gè)批次處理不依賴之前批次的數(shù)據(jù)永丝,常見(jiàn)的RDD轉(zhuǎn)化操作锹漱,例如map、filter慕嚷、reduceByKey等
  • 有狀態(tài)轉(zhuǎn)化操作哥牍。需要使用之前批次的數(shù)據(jù) 或者 中間結(jié)果來(lái)計(jì)算當(dāng)前批次的數(shù)據(jù)。有狀態(tài)轉(zhuǎn)化操作包括:基于滑動(dòng)窗口的轉(zhuǎn)化操作 或 追蹤狀態(tài)變化的轉(zhuǎn)化操作

無(wú)狀態(tài)轉(zhuǎn)換

無(wú)狀態(tài)轉(zhuǎn)化操作就是把簡(jiǎn)單的RDD轉(zhuǎn)化操作應(yīng)用到每個(gè)批次上喝检,也就是轉(zhuǎn)化DStream中的每一個(gè)RDD嗅辣。創(chuàng)建的無(wú)狀態(tài)轉(zhuǎn)換包括:map、flatMap挠说、filter澡谭、repartition、reduceByKey损俭、groupByKey;直接作用在DStream上译暂。重要的轉(zhuǎn)換操作:transform。通過(guò)對(duì)源DStream的每個(gè)RDD應(yīng)用RDD-to-RDD函 數(shù)撩炊,創(chuàng)建一個(gè)新的DStream外永。支持在新的DStream中做任何RDD操作。

/**
* Create a new DStream in which each RDD is generated by applying a function on RDDs of
* the DStreams.
*/
def transform[T: ClassTag](
  dstreams: Seq[DStream[_]],
  transformFunc: (Seq[RDD[_]], Time) => RDD[T]
): DStream[T] = withScope {
new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
}

這是一個(gè)功能強(qiáng)大的函數(shù)拧咳,它可以允許開(kāi)發(fā)者直接操作其內(nèi)部的RDD伯顶。也就是說(shuō)開(kāi) 發(fā)者,可以提供任意一個(gè)RDD到RDD的函數(shù)骆膝,這個(gè)函數(shù)在數(shù)據(jù)流每個(gè)批次中都被調(diào) 用祭衩,生成一個(gè)新的流。

示例:黑名單過(guò)濾

假設(shè):arr1為黑名單數(shù)據(jù)(自定義)阅签,true表示數(shù)據(jù)生效掐暮,需要被過(guò)濾掉;false表示數(shù)據(jù) 未生效
val arr1 = Array(("spark", true), ("scala", false))
假設(shè):流式數(shù)據(jù)格式為"time word",需要根據(jù)黑名單中的數(shù)據(jù)對(duì)流式數(shù)據(jù)執(zhí)行過(guò)濾操 作政钟。如"2 spark"要被過(guò)濾掉
1 hadoop
2 spark
3 scala
4 java
5 hive
結(jié)果:"2 spark" 被過(guò)濾

方法一:使用外鏈接

package com.hhb.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @description:ConstantInputDStream 主要用于流式計(jì)算的測(cè)試
 * @date: 2020-11-20 10:09
 **/
object BlackListFilter1 {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
    val ssc = new StreamingContext(conf, Seconds(5))
    //準(zhǔn)備數(shù)據(jù)
    val arr: Array[(String, Int)] = "Hello World Hello Hadoop Hello spark kafka hive zookeeper hbase flume sqoop hello scala".split("\\s+").map(_.toLowerCase).zipWithIndex
    val data = arr.map { case (k, v) => v + " " + k }
    val rdd = ssc.sparkContext.makeRDD(data)
    //  黑名單
    val blackList = Array(("spark", true), ("scala", false),("hbase", true),("hello", true))
    val blackListRDD = ssc.sparkContext.makeRDD(blackList)
    //new ConstantInputDStream[String](ssc, rdd) =》 1 hello
    //    new ConstantInputDStream[String](ssc, rdd).map(value => (value.split("\\s+")(1), value))  => (hello,(1 hello))
    val rddDStream = new ConstantInputDStream[String](ssc, rdd).map(value => (value.split("\\s+")(1), value))
    val resultDStream = rddDStream.transform { rdd =>
      // 通過(guò)leftOuterJoin操作既保留了左側(cè)RDD的所有內(nèi)容路克,又獲得了內(nèi)容是否在黑名單中
      rdd.leftOuterJoin(blackListRDD)
        //      (hello,(hello 1,option(none)),(spark,(spark 3,option(true)))
        .filter { case (_, (_, rightValue)) => rightValue.getOrElse(false) != true }
        .map { case (_, (leftValue, _)) => leftValue }
    }
    resultDStream.print()
    // 啟動(dòng)流式作業(yè)
    ssc.start()
    ssc.awaitTermination()
  }
}

方案二:使用SQL或者DSL

package com.hhb.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @description:使用SQL或者DSL
 * @author: huanghongbo
 * @date: 2020-11-20 10:39
 **/
object BlackListFilter2 {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.sparkContext.setLogLevel("warn")
    //準(zhǔn)備數(shù)據(jù)
    val arr: Array[(String, Int)] = "Hello World Hello Hadoop Hello spark kafka hive zookeeper hbase flume sqoop hello scala".split("\\s+").map(_.toLowerCase).zipWithIndex
    val arrRDD = ssc.sparkContext.makeRDD(arr).map { case (k, v) => v + " " + k }
    val constantDStream = new ConstantInputDStream[String](ssc, arrRDD)

    //  黑名單
    val blackList = Array(("spark", true), ("scala", false), ("hbase", true), ("hello", true))
    val blackListRDD = ssc.sparkContext.makeRDD(blackList)

    val valueDStream = constantDStream.map(line => (line.split("\\s+")(1), line))
    valueDStream.transform { rdd =>
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._

      val wordDF = rdd.toDF("word", "line")
      val blackListDF = blackListRDD.toDF("word1", "flag")

      wordDF.join(blackListDF, $"word" === $"word1", "left")
        .filter("flag == false or flag is null")
        .select("line")
        .rdd

    }.print()


    ssc.start()
    ssc.awaitTermination()
  }
}

方案三:直接過(guò)濾,沒(méi)有shuffle

package com.hhb.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ConstantInputDStream

/**
 * @description: 直接過(guò)濾养交,沒(méi)有shuffle
 * @author: huanghongbo
 * @date: 2020-11-20 10:39
 **/
object BlackListFilter3 {


  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.sparkContext.setLogLevel("warn")
    //準(zhǔn)備數(shù)據(jù)
    val arr: Array[(String, Int)] = "Hello World Hello Hadoop Hello spark kafka hive zookeeper hbase flume sqoop hello scala".split("\\s+").map(_.toLowerCase).zipWithIndex
    val arrRDD = ssc.sparkContext.makeRDD(arr).map { case (k, v) => v + " " + k }
    val constantDStream = new ConstantInputDStream[String](ssc, arrRDD)

    //  黑名單
    val blackList = Array(("spark", true), ("scala", false), ("hbase", true), ("hello", true))
      .filter { case (_, v) => v }
      .map { case (k, _) => k }

   constantDStream.map(line => (line.split("\\s+")(1), line))
      .filter { case (word, _) => !blackList.contains(word) }
      .map { case (_, line) => line }
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}

有狀態(tài)轉(zhuǎn)換

有狀態(tài)轉(zhuǎn)化的主要有兩種操作:窗口操作精算、狀態(tài)跟蹤操作

窗口操作

Window Operations 可以設(shè)計(jì)窗口大小滑動(dòng)窗口間隔。來(lái)動(dòng)態(tài)的獲取當(dāng)前Streaming的狀態(tài)碎连』矣穑基于窗口的操作會(huì)在一個(gè)比StreamingContext和batchDuration(批次間隔)更長(zhǎng)的時(shí)間范圍內(nèi),通過(guò)整合多個(gè)批次的結(jié)果,計(jì)算整個(gè)窗口的結(jié)果

窗口操作.png

基于窗口的操作需要兩個(gè)參數(shù):

  • 窗口長(zhǎng)度(windowDuration)廉嚼∶蹈洌控制每次計(jì)算最近的多少個(gè)批次的數(shù)據(jù)
  • 滑動(dòng)間隔(slideDuration)。用來(lái)控制對(duì)新的DStream進(jìn)行計(jì)算的間隔

兩者都必須是StreamingContext中批次間隔(batchDuration)的整數(shù)倍

每秒發(fā)送一個(gè)數(shù)字:

package com.hhb.spark.streaming

import java.io.PrintWriter
import java.net.ServerSocket

/**
 * @description:
 * @date: 2020-11-20 14:31
 **/
object NumSocket {

  def main(args: Array[String]): Unit = {

    val server = new ServerSocket(9999)
    val socket = server.accept()
    var i = 0
    println("服務(wù)注冊(cè):" + socket.getInetAddress)
    while (true) {
      i += 1
      val writer = new PrintWriter(socket.getOutputStream)
      writer.println(i)
      writer.flush()
      Thread.sleep(1000)
    }
  }

}

案例一:

  • 觀察窗口的數(shù)據(jù)

  • 觀察batchDuration怠噪、windowDuration摘悴、slideDuration三者之間的關(guān)系

  • 使用窗口相關(guān)的操作

package com.hhb.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @description:
 * @date: 2020-11-20 14:31
 **/
object WindowDemo {

  def main(args: Array[String]): Unit = {

    //初始化
    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
    //每5秒生成一個(gè)RDD
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("error")

    val lines = ssc.socketTextStream("localhost", 9999)

    lines.foreachRDD { (rdd, time) =>
      println(rdd.id + "   time: " + time)
      println("*" * 15)
      rdd.foreach(x => print(x + "\t"))
    }
    // 窗口長(zhǎng)度為20s,每隔10s滑動(dòng)一次
    lines.reduceByWindow(_ + " " + _, Seconds(20), Seconds(10))
      .print()
    println("=" * 15)

    //對(duì)數(shù)據(jù)進(jìn)行求和
    lines.map(_.toInt).window(Seconds(20), Seconds(10)).reduce(_ + _).print()
    //對(duì)數(shù)據(jù)進(jìn)行求和
    lines.map(_.toInt).reduceByWindow(_ + _, Seconds(20), Seconds(10)).print()

    ssc.start()
    ssc.awaitTermination()

  }
}

案例二:熱點(diǎn)搜索詞實(shí)時(shí)統(tǒng)計(jì)舰绘。每隔 10 秒蹂喻,統(tǒng)計(jì)最近20秒的詞出現(xiàn)的次數(shù)

package com.hhb.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @description:
 * @author: huanghongbo
 * @date: 2020-11-20 15:12
 **/
object HotWordStats {

  def main(args: Array[String]): Unit = {
    //初始化StreamingContext
    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("error")

    // 通過(guò)reduceByKeyAndWindow算子, 每隔10秒統(tǒng)計(jì)最近20秒的詞出現(xiàn)的次數(shù) ,后 3個(gè)參數(shù):窗口時(shí)間長(zhǎng)度、滑動(dòng)窗口時(shí)間捂寿、分區(qū)
    val wordDStream = ssc.socketTextStream("localhost", 9999).flatMap(_.split("\\s+")).map((_, 1))
    wordDStream.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(20), Seconds(10)).print()

    println("*" * 20)
    //設(shè)置檢查點(diǎn)口四,檢查點(diǎn)具有容錯(cuò)機(jī)制。生產(chǎn)環(huán)境中應(yīng)設(shè)置到HDFS
    ssc.checkpoint("data/checkpoint/")
    // 這里需要checkpoint的支持
    wordDStream.reduceByKeyAndWindow(_ + _, _ - _, Seconds(20), Seconds(10)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
狀態(tài)追蹤(updateStateByKey)

UpdateStateByKey的主要功能:

  • 為Streaming中每一個(gè)key維護(hù)一份state狀態(tài)秦陋,state類型可以是任意類型的蔓彩,可以是自定義對(duì)象;更新函數(shù)也可以是自定義的
  • 通過(guò)更新函數(shù)對(duì)該key的狀態(tài)不斷更新驳概,對(duì)于每個(gè)新的batch而言赤嚼,Spark Streaming會(huì)在使用updateStateKey的時(shí)候?yàn)橐呀?jīng)存在的key進(jìn)行state的狀態(tài)更新
  • 使用updateStateByKey時(shí)要開(kāi)啟checkpoint功能

源碼:

/**
* Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* In every batch the updateFunc will be called for each state even if there are no new values.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param updateFunc State update function. If `this` function returns None, then
*                   corresponding state key-value pair will be eliminated.
* @tparam S State type
*/
def updateStateByKey[S: ClassTag](
  updateFunc: (Seq[V], Option[S]) => Option[S]
): DStream[(K, S)] = ssc.withScope {
updateStateByKey(updateFunc, defaultPartitioner())
}

流式程序啟動(dòng)后計(jì)算wordcount的累計(jì)值,將每個(gè)批次的結(jié)果保存到文件

package com.hhb.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @description:
 * @author: huanghongbo
 * @date: 2020-11-20 17:56
 **/
object StateTracker1 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("error")
    ssc.checkpoint("data/checkpoint")
    //準(zhǔn)備數(shù)據(jù)
    val wordDStream = ssc.socketTextStream("localhost", 9999).flatMap(_.split("\\s+")).map((_, 1))

    // def updateStateByKey[S: ClassTag](updateFunc: (Seq[V]value的類型, Option[S]) => Option[S] 返回值
    //定義狀態(tài)更新函數(shù)
    //函數(shù)常量定義顺又,返回值類型是Some(Int)更卒,表示的含義是最新的狀態(tài)
    //函數(shù)的功能是將當(dāng)前時(shí)間間隔產(chǎn)生的key的value的集合,加到上一個(gè)狀態(tài)稚照,得到最新?tīng)顟B(tài)
    val updateFunc = (currVale: Seq[Int], preValue: Option[Int]) => {
      //通過(guò)Spark內(nèi)部的reduceByKey按key規(guī)約蹂空,然后這里傳入某key當(dāng)前批次的Seq,在計(jì)算當(dāng)前批次的總和
      val currSum = currVale.sum
      //以前已經(jīng)累加的值
      val preSum = preValue.getOrElse(0)
      Option(currSum + preSum)
    }
    val resultDStream = wordDStream.updateStateByKey[Int](updateFunc)
    resultDStream.cache()

    resultDStream.print()
    // 把DStream保存到文本文件中果录,會(huì)生成很多的小文件上枕。一個(gè)批次生成一個(gè)目錄
    resultDStream.repartition(1).saveAsTextFiles("data/output1/")

    ssc.start()
    ssc.awaitTermination()
  }
}

統(tǒng)計(jì)全局的key的狀態(tài),但是就算沒(méi)有數(shù)據(jù)輸入弱恒,也會(huì)在每一個(gè)批次的時(shí)候返回之前的key的狀態(tài)辨萍。這樣的確定:如果數(shù)據(jù)量很大的話,checkpoint數(shù)據(jù)會(huì)占用較大的存儲(chǔ)返弹,而且效率也不高锈玉。

mapWithState:也是用于全局統(tǒng)計(jì)key的狀態(tài),如果沒(méi)有數(shù)據(jù)輸入琉苇,便不會(huì)返回之前key的狀態(tài)嘲玫,有一點(diǎn)增量的感覺(jué),這樣做的好處是并扇,只關(guān)心那些已經(jīng)發(fā)生變化的key,對(duì)于沒(méi)有數(shù)據(jù)輸入抡诞,則不會(huì)返回那些沒(méi)有變化的key的數(shù)據(jù)穷蛹,即使數(shù)據(jù)量很大土陪,checkpoint也不會(huì)像updateStateByKey那樣,占用太多的存儲(chǔ)

package com.hhb.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

/**
 * @description:
 * @author: huanghongbo
 * @date: 2020-11-20 17:56
 **/
object StateTracker2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("error")
    ssc.checkpoint("data/checkpoint")
    //準(zhǔn)備數(shù)據(jù)
    val wordDStream = ssc.socketTextStream("localhost", 9999).flatMap(_.split("\\s+")).map((_, 1))


    //(KeyType, Option[ValueType], State[StateType]) => MappedType
    //(key的類型肴熏,輸入的數(shù)據(jù)鬼雀,中間匯總)=> 返回值類型
    def func(key: String, option: Option[Int], state: State[Int]): (String, Int) = {
      val sum = option.getOrElse(0) + state.getOption().getOrElse(0)
      state.update(sum)
      (key, sum)
    }

    val spec = StateSpec.function(func _)
    //只統(tǒng)計(jì)本批次出現(xiàn)的數(shù)據(jù),不會(huì)把所有數(shù)據(jù)匯總
    val resultDStream = wordDStream.mapWithState(spec)
      //.stateSnapshots() 以快照的形式蛙吏,把所有的數(shù)據(jù)都顯示出來(lái),結(jié)果類似于updateStateByKey
      .stateSnapshots()

    resultDStream.cache()

    resultDStream.print()
    // 把DStream保存到文本文件中源哩,會(huì)生成很多的小文件。一個(gè)批次生成一個(gè)目錄
    resultDStream.repartition(1).saveAsTextFiles("data/output2/")

    ssc.start()
    ssc.awaitTermination()

  }
}

總結(jié):mapWithState按照時(shí)間線在每一個(gè)批次間隔返回之前的發(fā)生改變的或者新的key的狀態(tài)鸦做,不發(fā)生變化的不返回励烦;updateStateByKey統(tǒng)計(jì)的是全局Key的狀態(tài),就算沒(méi)有數(shù)據(jù)輸入也會(huì)在每個(gè)批次的時(shí)候返回之前的Key的狀態(tài)泼诱。也就是說(shuō)坛掠,mapWithState統(tǒng)計(jì)的是一個(gè)批次的數(shù)據(jù),updateStateByKey統(tǒng)計(jì)的是全局的key

DStream 輸出操作

輸出操作定義 DStream 的輸出操作治筒。與 RDD 中的惰性求值類似屉栓,如果一個(gè)DStream及其派生出的 DStream 都沒(méi)有被執(zhí)行輸出操作,那么這些 DStream 就都不會(huì)被求值耸袜。如果 StreamingContext 中沒(méi)有設(shè)定輸出操作友多,整個(gè)流式作業(yè)不會(huì)啟動(dòng)。

Output Operation Meaning
print() 在運(yùn)行流程序的Driver上堤框,輸出DStream中每一批次數(shù)據(jù)最開(kāi)始的10個(gè)元素夷陋,用于開(kāi)發(fā)和調(diào)試
saveAsTestFile(prefix,[suffix]) 以text文件形成存儲(chǔ)DStream的內(nèi)容,每一批次的存儲(chǔ)文件名基于參數(shù)中的prefix和suffix
saveAsObjectFiles(prefix,[suffix]) 以 Java 對(duì)象序列化的方式將Stream中的數(shù)據(jù)保 存為 Sequence Files胰锌。每一批次的存儲(chǔ)文件名基 于參數(shù)中的為"prefix-TIME_IN_MS[.suffix]"
saveAsHadoopFiles(prefix, [suffix]) 將Stream中的數(shù)據(jù)保存為 Hadoop files骗绕。每一批 次的存儲(chǔ)文件名基于參數(shù)中的為"prefix- TIME_IN_MS[.suffix]"
foreachRDD(func) 最通用的輸出操作。將函數(shù) func 應(yīng)用于 DStream 的每一個(gè)RDD上

通用的輸出操作foreachRDD资昧,用來(lái)對(duì)DStream中的RDD進(jìn)行任意計(jì)算酬土,在foreachRDD中,可以重用Spark RDD 中所有的Action操作格带,需要注意的是:

  • 鏈接不要定義在Driver
  • 鏈接定義在RDD的foreach算子中撤缴,則遍歷RDD的每個(gè)元素時(shí)都創(chuàng)建鏈接,得不償失
  • 應(yīng)該在RDD的foreachPartition中定義鏈接叽唱,每個(gè)分區(qū)創(chuàng)建一個(gè)鏈接
  • 可以考慮使用連接池

與Kafka整合

官網(wǎng)

針對(duì)不同的spark屈呕、kafka版本,集成處理數(shù)據(jù)的方式分為兩種:Receiver Approach 和Direct Approach棺亭,不同集成版本處理方式的支持虎眨,可參考下圖:

與Kafka整合.png

對(duì)Kafka的支持分為兩個(gè)版本08(在高版本中將被廢棄)、010,兩個(gè)版本不兼容嗽桩。

Kafka-08接口

Receiver based Approach

基于Receiver的方式使用kafka舊版消費(fèi)者高階API實(shí)現(xiàn)岳守。對(duì)于所有的Receiver,通過(guò)Kafka接收的數(shù)據(jù)被存儲(chǔ)于Spark的Executors上碌冶,底層是寫入BlockManager中湿痢,默認(rèn)200ms生成一個(gè)block(spark.streaming.blockInterval)。然后由spark Streaming提交的job構(gòu)建BlockRDD扑庞,最終以Spark Core任務(wù)的形式運(yùn)行譬重,對(duì)應(yīng)的Receiver方式,有以下幾點(diǎn)需要注意:

  • Receiver 作為一個(gè)常駐線程調(diào)度到Executor上運(yùn)行罐氨,占用一個(gè)CPU
  • Receiver個(gè)數(shù)由KafkaUtil.createStream調(diào)用次數(shù)來(lái)決定臀规,一次一個(gè)Receiver。
  • Kafka中的topic分區(qū)并不能關(guān)聯(lián)產(chǎn)生在spark streaming中的rdd分區(qū)岂昭,增加在KafkaUtils.createStream()中的指定的topic分區(qū)數(shù)以现,僅僅增加了單個(gè)receiver消費(fèi)的topic線程數(shù),他不會(huì)正價(jià)處理數(shù)據(jù)中并行的Spark數(shù)量【即:topicMap[topic,num_threads]中约啊,value對(duì)應(yīng)的數(shù)值應(yīng)該是每個(gè)topic對(duì)應(yīng)的消費(fèi)線程數(shù)】
  • receiver默認(rèn)200ms生成一個(gè)block邑遏,可根據(jù)數(shù)據(jù)量大小調(diào)整block生成周期,一個(gè)block對(duì)應(yīng)RDD分區(qū)恰矩。
  • receiver接收的數(shù)據(jù)會(huì)放入到BlockManager记盒,每個(gè)Executor都會(huì)有一個(gè)BlockManager實(shí)例,由于數(shù)據(jù)本地性外傅,那些存在Receiver的Executor會(huì)被調(diào)度執(zhí)行更多的Task纪吮,就會(huì)導(dǎo)致某些executor比較空閑
  • 默認(rèn)情況下,Receiver是可能丟失數(shù)據(jù)的萎胰,可以通過(guò)設(shè)置spark.streaming.receiver.writeAheadLog.enable為true開(kāi)啟預(yù)寫日志機(jī)制碾盟,將數(shù)據(jù)先寫入到一個(gè)可靠的分布式文件系統(tǒng)(如HDFS),確保數(shù)據(jù)不丟失技竟,但會(huì)損失一定性能冰肴。
Kafka-08接口.png

Kafka-08接口(Receiver方式):

  • Offset保持在ZK中,系統(tǒng)管理
  • 對(duì)應(yīng)kafka的版本0.8.2.1+
  • 接口底層實(shí)現(xiàn)使用Kafka舊版消費(fèi)者高階API
  • DStream底層實(shí)現(xiàn)為BlockRDD
Receiver based Approach.png

Kafka-08接口(Receiver with WAL)

  • 增強(qiáng)了故障恢復(fù)的能力
  • 接收的數(shù)據(jù)與Dirver的元數(shù)據(jù)保存到HDFS
  • 增加了流式應(yīng)用處理的延遲
Direct Approach

Direct Approach是 Spark Streaming不使用Receiver集成kafka的方式榔组,在企業(yè)生產(chǎn)環(huán)境中使用較多熙尉。相較于Receiver,有以下特點(diǎn):

  • 不使用Receiver搓扯。減少不必要的CPU占用检痰,減少了Receiver接收數(shù)據(jù)寫入BlockManager,然后運(yùn)行時(shí)再通過(guò)BlockId锨推、網(wǎng)絡(luò)傳輸铅歼、磁盤讀取等來(lái)獲取數(shù)據(jù)的整個(gè)過(guò)程公壤,提高了效率;無(wú)需WAL谭贪,進(jìn)一步減少磁盤IO

  • Direct方式生成了RDD時(shí)KafkaRDD境钟,他的分區(qū)數(shù)與Kafka分區(qū)數(shù)保存一致锦担,便于控制并行度

    注意:在Shuffle或Repartition操作后生成的RDD俭识,這種對(duì)應(yīng)關(guān)系會(huì)失效

  • 可以手動(dòng)維護(hù)offset,實(shí)現(xiàn)Exactly once 語(yǔ)義(處理僅處理一次)


    Direct Approach.png

Kafka-010接口

Spark Streaming與kafka 0.10的整合洞渔,和0.8版本的 Direct 方式很像秕岛。Kafka的分區(qū) 和Spark的RDD分區(qū)是一一對(duì)應(yīng)的侵贵,可以獲取 offsets 和元數(shù)據(jù),API 使用起來(lái)沒(méi)有 顯著的區(qū)別。

添加依賴:

<!-- sparkStreaming 結(jié)合 kafka -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  <version>${spark.version}</version>
</dependency>

不要手動(dòng)添加 org.apache.kafka 相關(guān)的依賴舔株,如kafka-clients。spark-streaming- kafka-0-10已經(jīng)包含相關(guān)的依賴了旨指,不同的版本會(huì)有不同程度的不兼容宏侍。

使用kafka010接口從Kafka中獲取數(shù)據(jù)

  • Kafka集群
  • Kakfa生產(chǎn)者發(fā)送數(shù)據(jù)
  • Spark Streaming程序接收數(shù)據(jù)
package com.hhb.spark.streaming.kafka

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

/**
 * @description:
 * @author: huanghongbo
 * @date: 2020-11-23 11:08
 **/
object KafkaProducer {


  def main(args: Array[String]): Unit = {

    val brokers = "linux121:9092,linux122:9092,linux123:9092"
    val topic = "topicA"
    val prop = new Properties()

    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.StringSerializer])
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.StringSerializer])

    // 創(chuàng)建生產(chǎn)者
    val producer = new KafkaProducer[String, String](prop)

    //向生產(chǎn)者里面發(fā)送信息
    for (i <- 0 to 1000000) {
      val record = new ProducerRecord[String, String](topic, i.toString, i.toString)
      producer.send(record)
      println(s"i -> $i")
      Thread.sleep(100)
    }
    producer.close()
  }
}

消費(fèi):

package com.hhb.spark.streaming.kafka

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @description:
 * @author: huanghongbo
 * @date: 2020-11-23 12:35
 **/
object KafkaDStream1 {


  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.sparkContext.setLogLevel("warn")
    val topics = Array("topicA")
    val groupId = "groupId"
    //定義Kafka相關(guān)參數(shù)
    val param: Map[String, Object] = getKafkaParam(groupId)

    val kafkaDStream = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, param)
    )
    kafkaDStream.foreachRDD((rdd, time) =>
      println(s"rdd.count=>${rdd.count()},time=> ${time}")
    )
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * 定義Kafka相關(guān)參數(shù)
   *
   * @return
   */
  def getKafkaParam(groupId: String): Map[String, Object] = {
    Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092,linux122:9092,linux123:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[org.apache.kafka.common.serialization.StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[org.apache.kafka.common.serialization.StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    )
  }
}

LocationStrategies(本地策略)

  • LocationStrategies.PreferBrokers :如果Executor在Kafka集群中的某些節(jié)點(diǎn)上,可以使用這種策略医增,那么Executor中的數(shù)據(jù)都會(huì)來(lái)之當(dāng)前broker節(jié)點(diǎn)
  • LocationStrategies.PreferConsistent: 大多數(shù)情況下使用的策略慎皱,將Kafka分區(qū)均勻分布在Spark集群的Executor上
  • LocationStrategies.PreferFixed:如果節(jié)點(diǎn)之間的分區(qū)明顯分布不均,使用這種策略叶骨。通過(guò)一個(gè)Map指定將topic分區(qū)分布在哪些節(jié)點(diǎn)中

ConsumerStrategies(消費(fèi)策略)

  • ConsumerStrategies.Subscribe茫多,用來(lái)訂閱一組Topic
  • ConsumerStrategies.SubscribePattern,使用正則來(lái)指定感興趣的topic
  • ConsumerStrategies.Assign忽刽,指定固定分區(qū)的集合

這三種策略都有重載構(gòu)造函數(shù)天揖,允許指定特定分區(qū)的起始偏移量;使用 Subscribe 或 SubscribePattern 在運(yùn)行時(shí)能實(shí)現(xiàn)分區(qū)自動(dòng)發(fā)現(xiàn)。

kafka相關(guān)命令:

# 創(chuàng)建Topic
kafka-topics.sh --create --zookeeper localhost:2181/myKafka --topic topicA --partitions 3 --replication-factor 3

# 顯示topic信息
kafka-topics.sh --zookeeper localhost:2181/myKafka --topic topicA --describe
 
# 檢查 topic 的最大offset
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list linux121:9092,linux122:9092,linux123:9092 --topic topicA --time -1


# 檢查消費(fèi)者的offset
kafka-consumer-groups.sh --bootstrap-server linux121:9092 --describe --group groupId

# 重置消費(fèi)者offset
kafka-consumer-groups.sh --bootstrap-server linux121:9092,linux122:9092,linux123:9092 --group groupId --reset-offsets --execute --to-offset 0 --topic topicA

Offset管理

Spark Streaming集成Kafka跪帝,允許Kafka中讀取一個(gè)或者多個(gè)topic數(shù)據(jù)今膊,一個(gè)Kafka Topic包含一個(gè)或多個(gè)分區(qū),每個(gè)分區(qū)中的消息順序存儲(chǔ)伞剑,并使用offset來(lái)標(biāo)記消息的位置斑唬,開(kāi)發(fā)者可以在SparkStreaming應(yīng)用中通過(guò)offset來(lái)控制數(shù)據(jù)的讀取位置。

Offsets管理對(duì)于保證流式應(yīng)用在整個(gè)生命周期中數(shù)據(jù)的連貫性是非常重要的纸泄,如果在應(yīng)用停止或報(bào)錯(cuò)退出之前沒(méi)有將offset持久化保存赖钞,該信息就會(huì)丟失,那么Spark Streaming就沒(méi)有辦法從上次停止或報(bào)錯(cuò)的位置繼續(xù)消費(fèi)Kafka中的消息聘裁。

獲取偏移量(Obtaining Offsets)

Spark Streaming與kafka整合時(shí)雪营,允許獲取其消費(fèi)的 offset ,具體方法如下:

 package com.hhb.spark.streaming.kafka

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @description:
 * @author: huanghongbo
 * @date: 2020-11-23 12:35
 **/
object KafkaDStream1 {


  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.sparkContext.setLogLevel("warn")
    val topics = Array("topicA")
    val groupId = "groupId"
    //定義Kafka相關(guān)參數(shù)
    val param: Map[String, Object] = getKafkaParam(groupId)


    val kafkaDStream = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, param)
    )

    //topic:topicA,partition:1,from: 0,to:90
    //topic:topicA,partition:0,from: 0,to:122
    //topic:topicA,partition:2,from: 0,to:98
    //topic:topicA,partition:1,from: 90,to:90
    //topic:topicA,partition:0,from: 122,to:122
    //topic:topicA,partition:2,from: 98,to:98
    kafkaDStream.foreachRDD { rdd =>
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach { range =>
        println(s"topic:${range.topic},partition:${range.partition},from: ${range.fromOffset},to:${range.untilOffset}")
      }
    }

    //    kafkaDStream.foreachRDD((rdd, time) =>
    //      println(s"rdd.count=>${rdd.count()},time=> ${time}")
    //    )


    ssc.start()
    ssc.awaitTermination()
  }


  /**
   * 定義Kafka相關(guān)參數(shù)
   *
   * @return
   */
  def getKafkaParam(groupId: String): Map[String, Object] = {
    Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092,linux122:9092,linux123:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[org.apache.kafka.common.serialization.StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[org.apache.kafka.common.serialization.StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    )
  }
}

注意:對(duì)HasOffsetRanges的類型轉(zhuǎn)換只有在對(duì)createDirectStream調(diào)用的第一個(gè)方法中完成才會(huì)成功衡便,而不是在隨后的方法鏈中献起。RDD分區(qū)和Kafka分區(qū)之間的對(duì)應(yīng)關(guān)系在shuffle或重分區(qū)后丟失洋访,如reduceByKey或Window。

存儲(chǔ)偏移量(Storing Offsets)

在Streaming 程序失敗的情況下谴餐,kafka交付語(yǔ)義取決于如何以及何時(shí)存儲(chǔ)偏移量姻政。Spark 輸出操作的語(yǔ)義為 at-least-once

如果要失效EOS語(yǔ)義(Exactly Once Semantics),必須在冪等的輸出之后存儲(chǔ)偏移量或者將存儲(chǔ)偏移量與輸出放在一個(gè)事務(wù)中,可以安裝增加可靠性(和代碼復(fù)雜度)的順序使用一下選項(xiàng)來(lái)存儲(chǔ)偏移量

  • CheckPoint

    CheckPoint是對(duì)Spark Steaming運(yùn)行過(guò)程中的元數(shù)據(jù)和每個(gè)RDDs的數(shù)據(jù)狀態(tài)保存到一個(gè)持久化系統(tǒng)中岂嗓,這里也包含了offset汁展,一般是HDFS、S3厌殉,如果應(yīng)用程序或者集群掛了食绿,可以迅速恢復(fù),如果Streamging 程序的代碼變了公罕,重新打包執(zhí)行就會(huì)出翔反序列化異常的問(wèn)題器紧。這是因?yàn)镃heckPoint首次持久化時(shí)會(huì)講整個(gè)jar序列化,以便重啟時(shí)恢復(fù)楼眷,重新打包之后铲汪,新舊代碼邏輯不同,就會(huì)報(bào)錯(cuò)或仍然執(zhí)行舊版代碼罐柳。要解決這個(gè)問(wèn)題掌腰,只能將HDFS上的checkpoint文件刪除,但是這樣也會(huì)刪除Kafka的offset信息

  • Kafka

    默認(rèn)情況下硝清,消費(fèi)者定期自動(dòng)提交偏移量辅斟,它將偏移量存儲(chǔ)在一個(gè)特殊的Kafka主題中(__consumer_offsets)。但在某些情況下芦拿,這將導(dǎo)致問(wèn)題士飒,因?yàn)橄⒖赡芤呀?jīng) 被消費(fèi)者從Kafka拉去出來(lái),但是還沒(méi)被處理蔗崎〗湍唬可以將 enable.auto.commit 設(shè)置為 false ,在 Streaming 程序輸出結(jié)果之后缓苛,手動(dòng) 提交偏移到kafka芳撒。與檢查點(diǎn)相比,使用Kafka保存偏移量的優(yōu)點(diǎn)是無(wú)論應(yīng)用程序代碼如何更改未桥,偏移量 仍然有效笔刹。

    stream.foreachRDD { rdd => val offsetRanges =
    rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // 在輸出操作完成之后,手工提交偏移量;此時(shí)將偏移量提交到 Kafka 的消息隊(duì)列中
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) }
    

    與HasOffsetRanges一樣冬耿,只有在createDirectStream的結(jié)果上調(diào)用時(shí)舌菜,轉(zhuǎn)換到 CanCommitOffsets才會(huì)成功,而不是在轉(zhuǎn)換之后亦镶。commitAsync調(diào)用是線程安全 的日月,但必須在輸出之后執(zhí)行袱瓮。

  • 自定義存儲(chǔ)

    Offsets可以通過(guò)多種方式來(lái)管理,但是一般來(lái)說(shuō)遵循下面的步驟:

    • 在 DStream 初始化的時(shí)候爱咬,需要指定每個(gè)分區(qū)的offset用于從指定位置讀取數(shù)據(jù)
    • 讀取并處理消息
    • 處理完之后存儲(chǔ)結(jié)果數(shù)據(jù)
    • 用虛線圈存儲(chǔ)和提交offset尺借,強(qiáng)調(diào)用戶可能會(huì)執(zhí)行一系列操作來(lái)滿足他們更加嚴(yán)格的語(yǔ)義要求。這包括冪等操作和通過(guò)原子操作的方式存儲(chǔ)offset
    • 將 offsets 保存在外部持久化數(shù)據(jù)庫(kù)如 HBase精拟、Kafka燎斩、HDFS、ZooKeeper串前、 Redis瘫里、MySQL ... ...
自定義存儲(chǔ).png

可以將 Offsets 存儲(chǔ)到HDFS中实蔽,但這并不是一個(gè)好的方案荡碾。因?yàn)镠DFS延遲有點(diǎn)高, 此外將每批次數(shù)據(jù)的offset存儲(chǔ)到HDFS中還會(huì)帶來(lái)小文件問(wèn)題;

可以將 Offset 存儲(chǔ)到保存ZK中局装,但是將ZK作為存儲(chǔ)用坛吁,也并不是一個(gè)明智的選擇, 同時(shí)ZK也不適合頻繁的讀寫操作;

Redis管理的Offset

要想將Offset保存到外部存儲(chǔ)中铐尚,關(guān)鍵要實(shí)現(xiàn)以下幾個(gè)功能:

  • Streaming程序啟動(dòng)時(shí)拨脉,從外部存儲(chǔ)獲取保存的Offsets(執(zhí)行一次)
  • 在foreachRDD中,每個(gè)批次數(shù)據(jù)處理之后宣增,更新外部存儲(chǔ)的offsets(多次執(zhí)行)

案例一:使用自定義的offsets玫膀,從kafka讀數(shù)據(jù);處理完數(shù)據(jù)后打印offsets

package com.hhb.spark.streaming.kafka

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @description: 使用自定義offset,從kafka讀數(shù)據(jù)爹脾,處理完數(shù)據(jù)后打印offsets
 * @date: 2020-11-23 16:07
 **/
object KafkaDStream2 {


  def main(args: Array[String]): Unit = {


    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.sparkContext.setLogLevel("warn")
    val topics = Array("topicA")
    val groupId = "groupId"
    val param: Map[String, Object] = getKafkaMap(groupId)
    val offsets: Map[TopicPartition, Long] = Map(
      new TopicPartition(topics(0), 0) -> 50,
      new TopicPartition(topics(0), 1) -> 60,
      new TopicPartition(topics(0), 2) -> 80,
    )

    val kafkaDStream = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, param, offsets)
    )
    kafkaDStream.foreachRDD { (rdd, times) =>
      println(s"rdd.count = ${rdd.count()},time=>${times}")
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach { range =>
        println(s"partition=>${range.partition},topic => ${range.topic},from=> ${range.fromOffset} ,to=>${range.untilOffset}")
      }
    }

    ssc.start()
    ssc.awaitTermination()

  }

  def getKafkaMap(groupId: String): Map[String, Object] = {
    Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092,linux122:9092,linux123:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    )
  }
}

案例二:根據(jù) key 從 Redis 獲取offsets帖旨,根據(jù)該offsets從kafka讀數(shù)據(jù);處理完數(shù)據(jù) 后將offsets保存到 Redis

Redis管理的Offsets:

1、數(shù)據(jù)結(jié)構(gòu)選擇:Hash;key灵妨、field解阅、value 
Key:kafka:topic:TopicName:groupid 
Field:partition
Value:offset
2、從 Redis 中獲取保存的offsets 
3泌霍、消費(fèi)數(shù)據(jù)后將offsets保存到redis

工具類(Redis讀取货抄、保存offset)

package com.hhb.spark.streaming.kafka

import java.util
import java.util.{HashSet, Set}

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.{HostAndPort, Jedis, JedisCluster, JedisPool, JedisPoolConfig}

/**
 * @description:
 * @author: huanghongbo
 * @date: 2020-11-23 16:59
 **/
object OffsetsRedisUtils {

  private val config = new JedisPoolConfig

  val jedisClusterNode: util.Set[HostAndPort] = new util.HashSet[HostAndPort]
  jedisClusterNode.add(new HostAndPort("59.110.241.53", 7001))
  jedisClusterNode.add(new HostAndPort("59.110.241.53", 7002))
  jedisClusterNode.add(new HostAndPort("59.110.241.53", 7003))
  jedisClusterNode.add(new HostAndPort("59.110.241.53", 7004))
  jedisClusterNode.add(new HostAndPort("59.110.241.53", 7005))
  jedisClusterNode.add(new HostAndPort("59.110.241.53", 7006))
  jedisClusterNode.add(new HostAndPort("59.110.241.53", 7007))

  //最大鏈接
  config.setMaxTotal(30)

  //最大空閑
  config.setMaxIdle(10)

  private val topicKeyPrefix = "kafka:topic"


  private def getJcd() = new JedisCluster(jedisClusterNode, config)


  //獲取key信息
  private def getKey(topic: String, groupId: String): String = {
    topicKeyPrefix + ":" + topic + ":" + groupId
  }


  def getOffset(topics: Array[String], groupId: String): Map[TopicPartition, Long] = {
    val cluster = getJcd()
    val result = topics.map { topic =>
      import scala.collection.JavaConverters._
      //獲取該topic、groupId下所有的信息
      cluster.hgetAll(getKey(topic, groupId))
        //轉(zhuǎn)換成scala
        .asScala
        //再次遍歷朱转,將分區(qū)蟹地,offset 轉(zhuǎn)換成(new TopicPartition(topic, k.toInt), v.toLong)結(jié)構(gòu)
        .map { case (k, v) => (new TopicPartition(topic, k.toInt), v.toLong) }
    }
    cluster.close()
    //壓平數(shù)據(jù)后轉(zhuǎn)map
    result.flatten.toMap
  }

  //自己的方法
  def saveOffset(ranges: Array[OffsetRange], groupId: String): Unit = {
    val cluster = getJcd()
    ranges.foreach { range =>
      val key = getKey(range.topic, groupId)
      println(s"key=>${key},part=>${range.partition.toString},offset=>${range.untilOffset.toString}")
      cluster.hset(key, range.partition.toString, range.untilOffset.toString)
    }
    cluster.close()
  }

  //老師的方法
  def saveOffset2(ranges: Array[OffsetRange], groupId: String): Unit = {
    val cluster = getJcd()
    val result: Map[String, Array[(String, String)]] = ranges.map(range => (range.topic, (range.partition.toString -> range.untilOffset.toString)))
      .groupBy(_._1)
      .map { case (topic, buffer) => (topic, buffer.map(_._2)) }
    result.map { r =>
      import scala.collection.JavaConverters._
      val offsets = r._2.toMap.asJava
      cluster.hmset(getKey(r._1, groupId), offsets)
    }
  }
}

KafkaDStream(從kafka獲取數(shù)據(jù),使用 Redis 保存offsets)

package com.hhb.spark.streaming.kafka

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}

/**
 * @description:
 * @author: huanghongbo
 * @date: 2020-11-23 16:59
 **/
object KafkaDStream3 {

  def main(args: Array[String]): Unit = {


    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.sparkContext.setLogLevel("warn")
    val topics = Array("topicA")
    val groupId = "groupId"
    val param: Map[String, Object] = getKafkaMap(groupId)
    val offsets: Map[TopicPartition, Long] = OffsetsRedisUtils.getOffset(topics, groupId)
    val kafkaDStream = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, param, offsets)
    )
    kafkaDStream.foreachRDD { (rdd, times) =>
      if(!rdd.isEmpty()){
        //數(shù)據(jù)處理邏輯藤为,這里只是輸出
        println(s"rdd.count = ${rdd.count()},time=>${times}")
        val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        OffsetsRedisUtils.saveOffset(ranges, groupId)
      }
    }

    ssc.start()
    ssc.awaitTermination()

  }

  def getKafkaMap(groupId: String): Map[String, Object] = {
    Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092,linux122:9092,linux123:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    )
  }

}
spark下任務(wù)一錯(cuò)題1.png
spark下任務(wù)一錯(cuò)題2.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末怪与,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子凉蜂,更是在濱河造成了極大的恐慌琼梆,老刑警劉巖性誉,帶你破解...
    沈念sama閱讀 217,277評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異茎杂,居然都是意外死亡错览,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門煌往,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)倾哺,“玉大人,你說(shuō)我怎么就攤上這事刽脖⌒吆#” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 163,624評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵曲管,是天一觀的道長(zhǎng)却邓。 經(jīng)常有香客問(wèn)我,道長(zhǎng)院水,這世上最難降的妖魔是什么腊徙? 我笑而不...
    開(kāi)封第一講書人閱讀 58,356評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮檬某,結(jié)果婚禮上撬腾,老公的妹妹穿的比我還像新娘。我一直安慰自己恢恼,他們只是感情好民傻,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,402評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著场斑,像睡著了一般漓踢。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上和簸,一...
    開(kāi)封第一講書人閱讀 51,292評(píng)論 1 301
  • 那天彭雾,我揣著相機(jī)與錄音,去河邊找鬼锁保。 笑死薯酝,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的爽柒。 我是一名探鬼主播吴菠,決...
    沈念sama閱讀 40,135評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼浩村!你這毒婦竟也來(lái)了做葵?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 38,992評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤心墅,失蹤者是張志新(化名)和其女友劉穎酿矢,沒(méi)想到半個(gè)月后榨乎,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,429評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡瘫筐,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,636評(píng)論 3 334
  • 正文 我和宋清朗相戀三年蜜暑,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片策肝。...
    茶點(diǎn)故事閱讀 39,785評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡肛捍,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出之众,到底是詐尸還是另有隱情拙毫,我是刑警寧澤,帶...
    沈念sama閱讀 35,492評(píng)論 5 345
  • 正文 年R本政府宣布棺禾,位于F島的核電站缀蹄,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏帘睦。R本人自食惡果不足惜袍患,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,092評(píng)論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望竣付。 院中可真熱鬧,春花似錦滞欠、人聲如沸古胆。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,723評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)逸绎。三九已至,卻和暖如春夭谤,著一層夾襖步出監(jiān)牢的瞬間棺牧,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 32,858評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工朗儒, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留颊乘,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,891評(píng)論 2 370
  • 正文 我出身青樓醉锄,卻偏偏與公主長(zhǎng)得像乏悄,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子恳不,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,713評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容