前幾天簡單了解了Hadoop(HDFS,MR,YRAN)之后敢会,進(jìn)一步了解一下現(xiàn)在使用比較多的Spark生態(tài)--Sprak Streaming疏橄。
簡介
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.
Spark Streaming 是基于核心Spark API擴(kuò)展的一個可擴(kuò)展幔嫂,高吞吐量,容錯處理的實時流處理框架旅东。
其核心思想是 將接受的數(shù)據(jù)流進(jìn)行數(shù)據(jù)分批處理混聊,然后經(jīng)過Spark 引擎處理以數(shù)據(jù)分片方式輸出到結(jié)果流。
核心概念
StreamingContext
要初始化 Spark Streaming 程序茸塞,必須創(chuàng)建一個 StreamingContext 對象,它是所有 Spark Streaming 功能的主要入口點查剖。
Discretized Streams (DStreams)
Discretized Stream 或 DStream 是 Spark Streaming 提供的基本抽象钾虐。 它表示一個連續(xù)的數(shù)據(jù)流,可以是從源接收到的輸入數(shù)據(jù)流笋庄,也可以是通過轉(zhuǎn)換輸入流生成的處理后的數(shù)據(jù)流效扫。 在內(nèi)部,DStream 由一系列連續(xù)的 RDD 表示直砂,這是 Spark 對不可變的分布式數(shù)據(jù)集的抽象菌仁。 DStream 中的每個 RDD 都包含來自某個區(qū)間的數(shù)據(jù),如下圖所示静暂。
對 DStream 應(yīng)用的任何操作都會轉(zhuǎn)換為對底層 RDD 的操作济丘。對行 DStream 中的每個 RDD 應(yīng)用 flatMap 操作以生成單詞 DStream 的 RDD。 這如下圖所示。
這塊就比較像Spring5的 WebFlux里中的 FlatMap操作摹迷。
Input DStreams and Receivers
Input DStreams是DStreams 從流源接收的輸入數(shù)據(jù)流的代表疟赊。
每個輸入 DStream(除了文件流,本節(jié)稍后討論)都與一個接相關(guān)聯(lián)的Receiver峡碉。
詞頻統(tǒng)計代碼
搭建Kafka
可以參考我另外一篇云服務(wù)器安裝Kafka集群
本地生產(chǎn)者測試代碼
為了測試方便近哟,我使用SpringBoot 去搭建一個Kafka生產(chǎn)者的demo
工程目錄如下
pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>demok</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demok</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
YML 配置文件
spring:
kafka:
bootstrap-servers: xxx.xxx.xxx.xxx:9092 #你的KAFKA地址
producer:
# 發(fā)生錯誤后,消息重發(fā)的次數(shù)鲫寄。
retries: 0
#當(dāng)有多個消息需要被發(fā)送到同一個分區(qū)時吉执,生產(chǎn)者會把它們放在同一個批次里。該參數(shù)指定了一個批次可以使用的內(nèi)存大小地来,按照字節(jié)數(shù)計算鼠证。
batch-size: 16384
# 設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小。
buffer-memory: 33554432
# 鍵的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# acks=0 : 生產(chǎn)者在成功寫入消息之前不會等待任何來自服務(wù)器的響應(yīng)靠抑。
# acks=1 : 只要集群的首領(lǐng)節(jié)點收到消息,生產(chǎn)者就會收到一個來自服務(wù)器成功響應(yīng)适掰。
# acks=all :只有當(dāng)所有參與復(fù)制的節(jié)點全部收到消息時颂碧,生產(chǎn)者才會收到一個來自服務(wù)器的成功響應(yīng)。
acks: 1
consumer:
# 自動提交的時間間隔 在spring boot 2.X 版本中這里采用的是值的類型為Duration 需要符合特定的格式类浪,如1S,1M,2H,5D
auto-commit-interval: 1S
# 該屬性指定了消費者在讀取一個沒有偏移量的分區(qū)或者偏移量無效的情況下該作何處理:
# latest(默認(rèn)值)在偏移量無效的情況下载城,消費者將從最新的記錄開始讀取數(shù)據(jù)(在消費者啟動之后生成的記錄)
# earliest :在偏移量無效的情況下,消費者將從起始位置讀取分區(qū)的記錄
auto-offset-reset: earliest
# 是否自動提交偏移量费就,默認(rèn)值是true,為了避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失诉瓦,可以把它設(shè)置為false,然后手動提交偏移量
enable-auto-commit: false
# 鍵的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 在偵聽器容器中運行的線程數(shù)。
concurrency: 5
#listner負(fù)責(zé)ack力细,每調(diào)用一次睬澡,就立即commit
ack-mode: manual_immediate
missing-topics-fatal: false
KafkaProducer
package com.example.demok.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* description: KafkaProducer <br>
* date: 2021/10/22 9:32 <br>
* author: Neal <br>
* version: 1.0 <br>
*/
@Component
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
//自定義topic
public static final String TOPIC_TEST = "test";
public void send(String str) {
//發(fā)送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, str);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
System.out.println(TOPIC_TEST + " - 生產(chǎn)者 發(fā)送消息失敗:" + throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
System.out.println(TOPIC_TEST + " - 生產(chǎn)者 發(fā)送消息成功:" + stringObjectSendResult.toString());
}
});
}
}
測試類 DemokApplicationTests
package com.example.demok;
import com.example.demok.producer.KafkaProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class DemokApplicationTests {
@Autowired
private KafkaProducer kafkaProducer;
@Test
void contextLoads() {
//聲明三條消息
String[] arrStr = {"Deer,Bear,River","Car,Car,River","Deer,Car,Bear"};
//循環(huán)向Kafka發(fā)送消息
for (int i = 0; i < 3; i++) {
kafkaProducer.send(arrStr[i]);
}
}
}
生產(chǎn)者代碼樣例很簡單眠蚂,就是像上一篇文章一樣煞聪,將3行數(shù)據(jù)寫入數(shù)組中,循環(huán)發(fā)送到Kafka的test
Topic 中逝慧。
Spark Streaming 詞頻統(tǒng)計代碼
詞頻統(tǒng)計代碼工程目錄很簡單 就一個類 我就不貼圖了直接貼代碼,我使用的是Scala 2.11.8 版本
pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>kafkaSparkStream</artifactId>
<version>1.0-SNAPSHOT</version>
<name>kafkaSparkStream</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<spark.version>3.2.0</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
統(tǒng)計代碼
package org.example.ss
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
/**
* description: App <br>
* date: 2021/10/23 14:47 <br>
* author: Neal <br>
* version: 1.0 <br>
*/
object App {
def main(args: Array[String]): Unit = {
//Spark配置
val sparkConfig = new SparkConf()
.setAppName(this.getClass.getSimpleName)
.setMaster("local[2]")
//初始化 Stream上下文 10秒計算一次
val ssc = new StreamingContext(sparkConfig,Seconds(10))
//Kafka 參數(shù)配置
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "xxx.xxx.xxxx.xxx:9092", //kafka地址昔脯,集群可以以逗號分隔
"key.deserializer" -> classOf[StringDeserializer], //序列化類
"value.deserializer" -> classOf[StringDeserializer], //序列化類
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest", //消費最新的消息
"enable.auto.commit" -> (false: java.lang.Boolean) //不自動提交
)
//設(shè)置消費主題
val topics = Array("test")
//初始化 連接Kafka 獲取到InputStream
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream
//取出消息,逗號分隔后,將每一個值放入Map中 key 為值,value 為1
.flatMap(_.value().split(",").map((_,1)))
//使用key 值進(jìn)行計算
.reduceByKey(_ + _).print()
//啟動 SparkStreaming
ssc.start()
ssc.awaitTermination()
}
}
這里由于Spark 是Scala語言寫的笛臣,因此如果想理解里面函數(shù)以及對應(yīng)的參數(shù)符號含義云稚,建議去簡單看一下Scala 的相關(guān)教程入門一下就可以看懂了。
測試以及結(jié)果
啟動Spark Streaming 詞頻統(tǒng)計程序
啟動后可以看到如下輸出
該輸出會10秒一刷新沈堡,因為代碼中配置的是每10S計算一次
啟動生產(chǎn)者測試代碼,向Kafka發(fā)送數(shù)據(jù)
可以看到生產(chǎn)者發(fā)送了3條消息到Kafka
查看計算結(jié)果
我們回到頻次統(tǒng)計程序的輸出頁面静陈,可以看到其計算后的輸出結(jié)果。
小結(jié)
由于是簡單的Spark Streaming的應(yīng)用诞丽,只是為了熟悉Spark 流處理計算框架窿给,為以后的技術(shù)方案做技術(shù)積累贵白。