Kafka+Spark Streaming本地詞頻統(tǒng)計

前幾天簡單了解了Hadoop(HDFS,MR,YRAN)之后敢会,進(jìn)一步了解一下現(xiàn)在使用比較多的Spark生態(tài)--Sprak Streaming疏橄。

簡介

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.

Spark Streaming 是基于核心Spark API擴(kuò)展的一個可擴(kuò)展幔嫂,高吞吐量,容錯處理的實時流處理框架旅东。

SparkStreamingProccess.jpg

其核心思想是 將接受的數(shù)據(jù)流進(jìn)行數(shù)據(jù)分批處理混聊,然后經(jīng)過Spark 引擎處理以數(shù)據(jù)分片方式輸出到結(jié)果流。

核心概念

StreamingContext

要初始化 Spark Streaming 程序茸塞,必須創(chuàng)建一個 StreamingContext 對象,它是所有 Spark Streaming 功能的主要入口點查剖。

Discretized Streams (DStreams)

Discretized Stream 或 DStream 是 Spark Streaming 提供的基本抽象钾虐。 它表示一個連續(xù)的數(shù)據(jù)流,可以是從源接收到的輸入數(shù)據(jù)流笋庄,也可以是通過轉(zhuǎn)換輸入流生成的處理后的數(shù)據(jù)流效扫。 在內(nèi)部,DStream 由一系列連續(xù)的 RDD 表示直砂,這是 Spark 對不可變的分布式數(shù)據(jù)集的抽象菌仁。 DStream 中的每個 RDD 都包含來自某個區(qū)間的數(shù)據(jù),如下圖所示静暂。

RDD.jpg

對 DStream 應(yīng)用的任何操作都會轉(zhuǎn)換為對底層 RDD 的操作济丘。對行 DStream 中的每個 RDD 應(yīng)用 flatMap 操作以生成單詞 DStream 的 RDD。 這如下圖所示。

RDDOPERATION.jpg

這塊就比較像Spring5的 WebFlux里中的 FlatMap操作摹迷。

Input DStreams and Receivers

Input DStreams是DStreams 從流源接收的輸入數(shù)據(jù)流的代表疟赊。

每個輸入 DStream(除了文件流,本節(jié)稍后討論)都與一個接相關(guān)聯(lián)的Receiver峡碉。

詞頻統(tǒng)計代碼

搭建Kafka

可以參考我另外一篇云服務(wù)器安裝Kafka集群

本地生產(chǎn)者測試代碼

為了測試方便近哟,我使用SpringBoot 去搭建一個Kafka生產(chǎn)者的demo

工程目錄如下

producerDemo.jpg

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demok</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demok</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

YML 配置文件

spring:
  kafka:
    bootstrap-servers: xxx.xxx.xxx.xxx:9092  #你的KAFKA地址
    producer:
      # 發(fā)生錯誤后,消息重發(fā)的次數(shù)鲫寄。
      retries: 0
      #當(dāng)有多個消息需要被發(fā)送到同一個分區(qū)時吉执,生產(chǎn)者會把它們放在同一個批次里。該參數(shù)指定了一個批次可以使用的內(nèi)存大小地来,按照字節(jié)數(shù)計算鼠证。
      batch-size: 16384
      # 設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小。
      buffer-memory: 33554432
      # 鍵的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0 : 生產(chǎn)者在成功寫入消息之前不會等待任何來自服務(wù)器的響應(yīng)靠抑。
      # acks=1 : 只要集群的首領(lǐng)節(jié)點收到消息,生產(chǎn)者就會收到一個來自服務(wù)器成功響應(yīng)适掰。
      # acks=all :只有當(dāng)所有參與復(fù)制的節(jié)點全部收到消息時颂碧,生產(chǎn)者才會收到一個來自服務(wù)器的成功響應(yīng)。
      acks: 1
    consumer:
      # 自動提交的時間間隔 在spring boot 2.X 版本中這里采用的是值的類型為Duration 需要符合特定的格式类浪,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 該屬性指定了消費者在讀取一個沒有偏移量的分區(qū)或者偏移量無效的情況下該作何處理:
      # latest(默認(rèn)值)在偏移量無效的情況下载城,消費者將從最新的記錄開始讀取數(shù)據(jù)(在消費者啟動之后生成的記錄)
      # earliest :在偏移量無效的情況下,消費者將從起始位置讀取分區(qū)的記錄
      auto-offset-reset: earliest
      # 是否自動提交偏移量费就,默認(rèn)值是true,為了避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失诉瓦,可以把它設(shè)置為false,然后手動提交偏移量
      enable-auto-commit: false
      # 鍵的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 在偵聽器容器中運行的線程數(shù)。
      concurrency: 5
      #listner負(fù)責(zé)ack力细,每調(diào)用一次睬澡,就立即commit
      ack-mode: manual_immediate
      missing-topics-fatal: false


KafkaProducer

package com.example.demok.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * description: KafkaProducer <br>
 * date: 2021/10/22 9:32 <br>
 * author: Neal <br>
 * version: 1.0 <br>
 */
@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    //自定義topic
    public static final String TOPIC_TEST = "test";


    public void send(String str) {
        //發(fā)送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, str);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println(TOPIC_TEST + " - 生產(chǎn)者 發(fā)送消息失敗:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                System.out.println(TOPIC_TEST + " - 生產(chǎn)者 發(fā)送消息成功:" + stringObjectSendResult.toString());
            }
        });
    }

}

測試類 DemokApplicationTests

package com.example.demok;

import com.example.demok.producer.KafkaProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class DemokApplicationTests {

    @Autowired
    private KafkaProducer kafkaProducer;

    @Test
    void contextLoads() {
        //聲明三條消息
        String[] arrStr = {"Deer,Bear,River","Car,Car,River","Deer,Car,Bear"};
        //循環(huán)向Kafka發(fā)送消息
        for (int i = 0; i < 3; i++) {
            kafkaProducer.send(arrStr[i]);
        }
    }
}

生產(chǎn)者代碼樣例很簡單眠蚂,就是像上一篇文章一樣煞聪,將3行數(shù)據(jù)寫入數(shù)組中,循環(huán)發(fā)送到Kafka的test Topic 中逝慧。

Spark Streaming 詞頻統(tǒng)計代碼

詞頻統(tǒng)計代碼工程目錄很簡單 就一個類 我就不貼圖了直接貼代碼,我使用的是Scala 2.11.8 版本

pom文件

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>kafkaSparkStream</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>kafkaSparkStream</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spark.version>3.2.0</spark.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

統(tǒng)計代碼

package org.example.ss
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
/**
 * description: App <br>
 * date: 2021/10/23 14:47 <br>
 * author: Neal <br>
 * version: 1.0 <br>
 */
object App {

  def main(args: Array[String]): Unit = {
    //Spark配置
    val sparkConfig  = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[2]")
    //初始化 Stream上下文 10秒計算一次
    val ssc = new StreamingContext(sparkConfig,Seconds(10))

    //Kafka 參數(shù)配置
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "xxx.xxx.xxxx.xxx:9092",      //kafka地址昔脯,集群可以以逗號分隔
      "key.deserializer" -> classOf[StringDeserializer],    //序列化類
      "value.deserializer" -> classOf[StringDeserializer], //序列化類
      "group.id" -> "use_a_separate_group_id_for_each_stream",  
      "auto.offset.reset" -> "latest",   //消費最新的消息
      "enable.auto.commit" -> (false: java.lang.Boolean)   //不自動提交
    )
    //設(shè)置消費主題
    val topics = Array("test")
    //初始化 連接Kafka 獲取到InputStream
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream
      //取出消息,逗號分隔后,將每一個值放入Map中 key 為值,value 為1
      .flatMap(_.value().split(",").map((_,1)))
      //使用key 值進(jìn)行計算
      .reduceByKey(_ + _).print()
    //啟動 SparkStreaming
    ssc.start()
    ssc.awaitTermination()
  }
}

這里由于Spark 是Scala語言寫的笛臣,因此如果想理解里面函數(shù)以及對應(yīng)的參數(shù)符號含義云稚,建議去簡單看一下Scala 的相關(guān)教程入門一下就可以看懂了。

測試以及結(jié)果

啟動Spark Streaming 詞頻統(tǒng)計程序

啟動后可以看到如下輸出

ss_re1.jpg

該輸出會10秒一刷新沈堡,因為代碼中配置的是每10S計算一次

啟動生產(chǎn)者測試代碼,向Kafka發(fā)送數(shù)據(jù)

producer_re.jpg

可以看到生產(chǎn)者發(fā)送了3條消息到Kafka

查看計算結(jié)果

我們回到頻次統(tǒng)計程序的輸出頁面静陈,可以看到其計算后的輸出結(jié)果。

ss_re2.jpg

小結(jié)

由于是簡單的Spark Streaming的應(yīng)用诞丽,只是為了熟悉Spark 流處理計算框架窿给,為以后的技術(shù)方案做技術(shù)積累贵白。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市崩泡,隨后出現(xiàn)的幾起案子禁荒,更是在濱河造成了極大的恐慌,老刑警劉巖角撞,帶你破解...
    沈念sama閱讀 206,839評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件呛伴,死亡現(xiàn)場離奇詭異,居然都是意外死亡谒所,警方通過查閱死者的電腦和手機(jī)热康,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來劣领,“玉大人姐军,你說我怎么就攤上這事〖馓裕” “怎么了奕锌?”我有些...
    開封第一講書人閱讀 153,116評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長村生。 經(jīng)常有香客問我惊暴,道長,這世上最難降的妖魔是什么趁桃? 我笑而不...
    開封第一講書人閱讀 55,371評論 1 279
  • 正文 為了忘掉前任辽话,我火速辦了婚禮,結(jié)果婚禮上卫病,老公的妹妹穿的比我還像新娘油啤。我一直安慰自己,他們只是感情好蟀苛,可當(dāng)我...
    茶點故事閱讀 64,384評論 5 374
  • 文/花漫 我一把揭開白布村砂。 她就那樣靜靜地躺著,像睡著了一般屹逛。 火紅的嫁衣襯著肌膚如雪础废。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,111評論 1 285
  • 那天罕模,我揣著相機(jī)與錄音评腺,去河邊找鬼。 笑死淑掌,一個胖子當(dāng)著我的面吹牛蒿讥,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播,決...
    沈念sama閱讀 38,416評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼芋绸,長吁一口氣:“原來是場噩夢啊……” “哼媒殉!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起摔敛,我...
    開封第一講書人閱讀 37,053評論 0 259
  • 序言:老撾萬榮一對情侶失蹤廷蓉,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后马昙,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體桃犬,經(jīng)...
    沈念sama閱讀 43,558評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,007評論 2 325
  • 正文 我和宋清朗相戀三年行楞,在試婚紗的時候發(fā)現(xiàn)自己被綠了攒暇。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,117評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡子房,死狀恐怖形用,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情证杭,我是刑警寧澤田度,帶...
    沈念sama閱讀 33,756評論 4 324
  • 正文 年R本政府宣布,位于F島的核電站躯砰,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏携丁。R本人自食惡果不足惜琢歇,卻給世界環(huán)境...
    茶點故事閱讀 39,324評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望梦鉴。 院中可真熱鬧李茫,春花似錦、人聲如沸肥橙。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽存筏。三九已至宠互,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間椭坚,已是汗流浹背予跌。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留善茎,地道東北人券册。 一個月前我還...
    沈念sama閱讀 45,578評論 2 355
  • 正文 我出身青樓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親烁焙。 傳聞我的和親對象是個殘疾皇子航邢,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,877評論 2 345