Spark Streaming is built on top of Spark Core's RDDs and introduces a new abstraction: the DStream (Discretized Stream), which represents a continuous flow of data. The DStream is Spark Streaming's stream-processing model: internally, Spark Streaming splits the input data into segments by time interval (for example, 1 second), turns each segment into a Spark RDD, and these segments together form the DStream. Every operation on a DStream is ultimately translated into operations on the underlying RDDs.
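As a minimal sketch of that model (the class name, the 5-second interval, and the socket address below are illustrative, not part of this article's example), the following program reads lines from a socket and applies a flatMap; Spark Streaming runs that transformation on the RDD it produces for each batch interval:
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class DStreamSketch {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("dstream.sketch");
        // every 5 seconds of input becomes one RDD inside the DStream
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", 9999);
        // this DStream operation is applied to each per-batch RDD as an ordinary RDD transformation
        JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(",")).iterator());
        words.print(); // prints the first elements of every batch
        ssc.start();
        ssc.awaitTermination();
    }
}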
Spark SQL is the Spark module for processing structured data. Its predecessor was Shark, a tool built on top of Hive: Shark modified Hive's memory management, physical planning, and execution modules so that queries could run on the Spark engine.
(1) pom dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.11</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.3.1</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.66</version>
    </dependency>
</dependencies>
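The ${scala.version} and ${spark.version} placeholders assume a <properties> block elsewhere in the pom, which the original listing does not show. A sketch consistent with the hard-coded spark-streaming-kafka-0-10_2.11 / 2.3.1 coordinates might look like the following (note that the value used in the artifact suffixes must be the Scala binary version, 2.11, not the full 2.11.11):
<properties>
    <!-- assumed values: must match the _2.11 artifact suffixes and the Kafka connector version above -->
    <scala.version>2.11</scala.version>
    <spark.version>2.3.1</spark.version>
</properties>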
(2) Define the message object
package com.pojo;

import java.io.Serializable;
import java.util.Date;

/**
 * Created by lj on 2022-07-13.
 */
public class WaterSensor implements Serializable {
    public String id;
    public long ts;
    public int vc;

    public WaterSensor() {
    }

    public WaterSensor(String id, long ts, int vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public int getVc() {
        return vc;
    }

    public void setVc(int vc) {
        this.vc = vc;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public long getTs() {
        return ts;
    }

    public void setTs(long ts) {
        this.ts = ts;
    }
}
(3) Build the data producer
package com.producers;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;

/**
 * Created by lj on 2022-07-12.
 */
public class Socket_Producer {
    public static void main(String[] args) throws IOException {
        try {
            ServerSocket ss = new ServerSocket(9999);
            System.out.println("Server started ....");
            Socket s = ss.accept();
            BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
            String response = "java,1,2";
            // send one message every 30 seconds
            int i = 0;
            Random r = new Random(); // no seed, so a different sequence on every run
            String[] lang = {"flink", "spark", "hadoop", "hive", "hbase", "impala", "presto", "superset", "nbi"};
            while (true) {
                response = lang[r.nextInt(lang.length)] + i + "," + i + "," + i + "\n";
                System.out.println(response);
                try {
                    bw.write(response);
                    bw.flush();
                    i++;
                } catch (Exception ex) {
                    System.out.println(ex.getMessage());
                }
                Thread.sleep(1000 * 30);
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
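Note that the producer accepts exactly one client connection (a single accept() call), so whichever process connects first receives the whole stream. It can be checked quickly with a plain TCP client such as nc localhost 9999, but the producer then has to be restarted before launching the Spark Streaming job in step (4).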
(4) Ingest the socket source with Spark Streaming and print the Spark SQL query results (the JavaSparkSessionSingleton helper used in the code is sketched after the class):
package com.examples;

import com.pojo.WaterSensor;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * Created by lj on 2022-07-16.
 */
public class SparkSql_Socket1 {
    private static String appName = "spark.streaming.demo";
    private static String master = "local[*]";
    private static String host = "localhost";
    private static int port = 9999;

    public static void main(String[] args) {
        // initialize SparkConf
        SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);
        // create the JavaStreamingContext with a 1-minute batch interval
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(1));
        // read data from the socket source
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);
        // convert each batch of the DStream into a DataFrame and run a SQL query on it
        lines.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
            @Override
            public void call(JavaRDD<String> rdd, Time time) {
                SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
                // convert the RDD to a DataFrame via reflection on the WaterSensor bean
                JavaRDD<WaterSensor> rowRDD = rdd.map(new Function<String, WaterSensor>() {
                    @Override
                    public WaterSensor call(String line) {
                        String[] cols = line.split(",");
                        WaterSensor waterSensor = new WaterSensor(cols[0], Long.parseLong(cols[1]), Integer.parseInt(cols[2]));
                        return waterSensor;
                    }
                });
                Dataset<Row> dataFrame = spark.createDataFrame(rowRDD, WaterSensor.class);
                // register a temporary view
                dataFrame.createOrReplaceTempView("log");
                Dataset<Row> result = spark.sql("select * from log");
                System.out.println("========= " + time + "=========");
                // show() prints the first 20 rows
                result.show();
            }
        });
        // start the job
        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ssc.close();
        }
    }
}
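The listing above calls JavaSparkSessionSingleton.getInstance(...), a helper that the original code does not include. A minimal sketch, following the lazily instantiated singleton pattern from Spark's official JavaSqlNetworkWordCount example, could look like this:
package com.examples;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

/** Lazily instantiated singleton SparkSession, shared by all micro-batches. */
class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;

    public static SparkSession getInstance(SparkConf sparkConf) {
        if (instance == null) {
            instance = SparkSession
                    .builder()
                    .config(sparkConf)
                    .getOrCreate();
        }
        return instance;
    }
}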
(5) Demo:
The code defines a 1-minute batch interval, so a computation is triggered once every minute: