(1)sparkstreaming結(jié)合sparksql讀取socket實時數(shù)據(jù)流

Spark Streaming是構(gòu)建在Spark Core的RDD基礎(chǔ)之上的,與此同時Spark Streaming引入了一個新的概念:DStream(Discretized Stream,離散化數(shù)據(jù)流),表示連續(xù)不斷的數(shù)據(jù)流带欢。DStream抽象是Spark Streaming的流處理模型漩绵,在內(nèi)部實現(xiàn)上鼻由,Spark Streaming會對輸入數(shù)據(jù)按照時間間隔(如1秒)分段寇仓,每一段數(shù)據(jù)轉(zhuǎn)換為Spark中的RDD,這些分段就是Dstream器联,并且對DStream的操作都最終轉(zhuǎn)變?yōu)閷ο鄳?yīng)的RDD的操作二汛。
Spark SQL 是 Spark 用于結(jié)構(gòu)化數(shù)據(jù)(structured data)處理的 Spark 模塊。Spark SQL 的前身是Shark拨拓,Shark是基于 Hive 所開發(fā)的工具肴颊,它修改了下圖所示的右下角的內(nèi)存管理、物理計劃渣磷、執(zhí)行三個模塊婿着,并使之能運行在 Spark 引擎上。


1.png

(1)pom依賴:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.11</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.3.1</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.66</version>
    </dependency>
</dependencies>

(2)定義消息對象

package com.pojo;

import java.io.Serializable;
import java.util.Date;

/**
 * Created by lj on 2022-07-13.
 */
public class WaterSensor implements Serializable {
    public String id;
    public long ts;
    public int vc;

    public WaterSensor(){

    }

    public WaterSensor(String id,long ts,int vc){
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public int getVc() {
        return vc;
    }

    public void setVc(int vc) {
        this.vc = vc;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public long getTs() {
        return ts;
    }

    public void setTs(long ts) {
        this.ts = ts;
    }
}

(3)構(gòu)建數(shù)據(jù)生產(chǎn)者

package com.producers;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;

/**
 * Created by lj on 2022-07-12.
 */
public class Socket_Producer {
    public static void main(String[] args) throws IOException {

        try {
            ServerSocket ss = new ServerSocket(9999);
            System.out.println("啟動 server ....");
            Socket s = ss.accept();
            BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
            String response = "java,1,2";

            //每 2s 發(fā)送一次消息
            int i = 0;
            Random r=new Random();   //不傳入種子
            String[] lang = {"flink","spark","hadoop","hive","hbase","impala","presto","superset","nbi"};

            while(true){
                response= lang[r.nextInt(lang.length)]+ i + "," + i + "," + i+"\n";
                System.out.println(response);
                try{
                    bw.write(response);
                    bw.flush();
                    i++;
                }catch (Exception ex){
                    System.out.println(ex.getMessage());
                }
                Thread.sleep(1000 * 30);
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

(4)通過sparkstreaming接入socket數(shù)據(jù)源醋界,sparksql計算結(jié)果打印輸出:

package com.examples;

import com.pojo.WaterSensor;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * Created by lj on 2022-07-16.
 */
public class SparkSql_Socket1 {
    private static String appName = "spark.streaming.demo";
    private static String master = "local[*]";
    private static String host = "localhost";
    private static int port = 9999;

    public static void main(String[] args) {
        //初始化sparkConf
        SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);

        //獲得JavaStreamingContext
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(1));

        //從socket源獲取數(shù)據(jù)
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);

        //將 DStream 轉(zhuǎn)換成 DataFrame 并且運行sql查詢
        lines.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
            @Override
            public void call(JavaRDD<String> rdd, Time time) {
                SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());

                //通過反射將RDD轉(zhuǎn)換為DataFrame
                JavaRDD<WaterSensor> rowRDD = rdd.map(new Function<String, WaterSensor>() {
                    @Override
                    public WaterSensor call(String line) {
                        String[] cols = line.split(",");
                        WaterSensor waterSensor = new WaterSensor(cols[0],Long.parseLong(cols[1]),Integer.parseInt(cols[2]));
                        return waterSensor;
                    }
                });

                Dataset<Row> dataFrame = spark.createDataFrame(rowRDD, WaterSensor.class);
                // 創(chuàng)建臨時表
                dataFrame.createOrReplaceTempView("log");
                Dataset<Row> result = spark.sql("select * from log");
                System.out.println("========= " + time + "=========");
                //輸出前20條數(shù)據(jù)
                result.show();
            }
        });

        //開始作業(yè)
        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ssc.close();
        }
    }
}

(5)效果演示:


2.png

代碼中定義的是1分鐘的批處理間隔祟身,所以每1分鐘會觸發(fā)一次計算:


3.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市物独,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌氯葬,老刑警劉巖挡篓,帶你破解...
    沈念sama閱讀 206,311評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異,居然都是意外死亡官研,警方通過查閱死者的電腦和手機(jī)秽澳,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來戏羽,“玉大人担神,你說我怎么就攤上這事∈蓟ǎ” “怎么了妄讯?”我有些...
    開封第一講書人閱讀 152,671評論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長酷宵。 經(jīng)常有香客問我亥贸,道長,這世上最難降的妖魔是什么浇垦? 我笑而不...
    開封第一講書人閱讀 55,252評論 1 279
  • 正文 為了忘掉前任炕置,我火速辦了婚禮,結(jié)果婚禮上男韧,老公的妹妹穿的比我還像新娘朴摊。我一直安慰自己,他們只是感情好此虑,可當(dāng)我...
    茶點故事閱讀 64,253評論 5 371
  • 文/花漫 我一把揭開白布甚纲。 她就那樣靜靜地躺著,像睡著了一般寡壮。 火紅的嫁衣襯著肌膚如雪贩疙。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,031評論 1 285
  • 那天况既,我揣著相機(jī)與錄音这溅,去河邊找鬼。 笑死棒仍,一個胖子當(dāng)著我的面吹牛悲靴,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播莫其,決...
    沈念sama閱讀 38,340評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼癞尚,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了乱陡?” 一聲冷哼從身側(cè)響起浇揩,我...
    開封第一講書人閱讀 36,973評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎憨颠,沒想到半個月后胳徽,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體积锅,經(jīng)...
    沈念sama閱讀 43,466評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,937評論 2 323
  • 正文 我和宋清朗相戀三年养盗,在試婚紗的時候發(fā)現(xiàn)自己被綠了缚陷。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,039評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡往核,死狀恐怖箫爷,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情聂儒,我是刑警寧澤虎锚,帶...
    沈念sama閱讀 33,701評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站薄货,受9級特大地震影響翁都,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜谅猾,卻給世界環(huán)境...
    茶點故事閱讀 39,254評論 3 307
  • 文/蒙蒙 一柄慰、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧税娜,春花似錦坐搔、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至弧岳,卻和暖如春凳忙,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背禽炬。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工涧卵, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人腹尖。 一個月前我還...
    沈念sama閱讀 45,497評論 2 354
  • 正文 我出身青樓柳恐,卻偏偏與公主長得像,于是被迫代替她去往敵國和親热幔。 傳聞我的和親對象是個殘疾皇子乐设,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,786評論 2 345

推薦閱讀更多精彩內(nèi)容