Java Spark 簡單示例(六)Spark Streaming Window

大數(shù)據(jù)學(xué)習(xí)交流微信群

前兩天分享的Flink 學(xué)習(xí)筆記中有介紹滾動窗口滑動窗口Spark Streaming也是支持的昔头。
Java Spark 簡單示例(五)Spark Streaming 演示了Spark Streaming的常規(guī)用法就是滾動窗口。我們設(shè)置了批處理的時(shí)間長度芯肤,Spark 默認(rèn)每隔一段時(shí)間滾動一次窗口脓匿,窗口之間不存在重復(fù)數(shù)據(jù)。

//批處理時(shí)間舱卡,即一個滾動窗口的長度辅肾,滾動間隔等于該長度
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(3));

本篇將結(jié)合官方文檔基于上一篇demo演示如何實(shí)現(xiàn)滑動窗口

官網(wǎng)介紹: window(windowLength, slideInterval) Return a new DStream which is computed based on windowed batches of the source DStream.其中windowLength表示滑動窗口的長度,slideInterval表示滑動間隔轮锥。windowLengthslideInterval 必須是批處理時(shí)間的整數(shù)倍矫钓,即上述定義的3s的整數(shù)倍.slideInterval不填默認(rèn)是批處理時(shí)間長度即上述定義的3s.

package com.yzy.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class demo7 {
    private static String appName = "spark.streaming.demo";
    private static String master = "local[*]";
    private static String host = "localhost";
    private static int port = 9999;

    public static void main(String[] args) {
        //初始化sparkConf
        SparkConf sparkConf = SparkConfig.getSparkConf().setMaster(master).setAppName(appName);

        //獲得JavaStreamingContext
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(3));

        //從socket源獲取數(shù)據(jù)
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);

        //拆分行成單詞
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });

        //調(diào)用window函數(shù),生成新的DStream舍杜,每隔3秒聚合過去6秒內(nèi)的源數(shù)據(jù)新娜,滑動間隔不填默認(rèn)3秒
        //等價(jià)于words.window(Durations.seconds(6),Durations.seconds(3));
        JavaDStream<String> newWords = words.window(Durations.seconds(6));

        //計(jì)算每個單詞出現(xiàn)的個數(shù)
        JavaPairDStream<String, Integer> wordCounts = newWords.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });

        //輸出結(jié)果
        wordCounts.print();

        //開始作業(yè)
        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ssc.close();
        }
    }
}

輸出結(jié)果:

-------------------------------------------
Time: 1537934565000 ms
-------------------------------------------
(spark,1)
(1,1)
(test,1)
(streaming,1)

-------------------------------------------
Time: 1537934568000 ms
-------------------------------------------
(spark,4)
(1,1)
(2,1)
(3,1)
(4,1)
(test,4)
(streaming,4)

-------------------------------------------
Time: 1537934571000 ms
-------------------------------------------
(spark,6)
(2,1)
(3,1)
(4,1)
(5,1)
(6,1)
(7,1)
(test,6)
(streaming,6)

-------------------------------------------
Time: 1537934574000 ms
-------------------------------------------
(spark,6)
(10,1)
(5,1)
(6,1)
(7,1)
(8,1)
(9,1)
(test,6)
(streaming,6)

-------------------------------------------
Time: 1537934577000 ms
-------------------------------------------
(spark,6)
(10,1)
(11,1)
(12,1)
(13,1)
(8,1)
(9,1)
(test,6)
(streaming,6)
....

除了調(diào)用window()來轉(zhuǎn)化Dstream,還可以直接調(diào)用reduceByKeyAndWindow()函數(shù)既绩,使聚合函數(shù)按照滑動窗口來執(zhí)行概龄。如下:

//.....省略
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s,1);
            }
        });

JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }, Durations.seconds(6), Durations.seconds(3));//用法相同

windowedWordCounts.print();
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市饲握,隨后出現(xiàn)的幾起案子旁钧,更是在濱河造成了極大的恐慌吸重,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,454評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件歪今,死亡現(xiàn)場離奇詭異嚎幸,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)寄猩,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評論 3 385
  • 文/潘曉璐 我一進(jìn)店門嫉晶,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人田篇,你說我怎么就攤上這事替废。” “怎么了泊柬?”我有些...
    開封第一講書人閱讀 157,921評論 0 348
  • 文/不壞的土叔 我叫張陵椎镣,是天一觀的道長。 經(jīng)常有香客問我兽赁,道長状答,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,648評論 1 284
  • 正文 為了忘掉前任刀崖,我火速辦了婚禮惊科,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘亮钦。我一直安慰自己馆截,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,770評論 6 386
  • 文/花漫 我一把揭開白布蜂莉。 她就那樣靜靜地躺著蜡娶,像睡著了一般。 火紅的嫁衣襯著肌膚如雪映穗。 梳的紋絲不亂的頭發(fā)上窖张,一...
    開封第一講書人閱讀 49,950評論 1 291
  • 那天,我揣著相機(jī)與錄音男公,去河邊找鬼。 笑死合陵,一個胖子當(dāng)著我的面吹牛枢赔,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播拥知,決...
    沈念sama閱讀 39,090評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼踏拜,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了低剔?” 一聲冷哼從身側(cè)響起速梗,我...
    開封第一講書人閱讀 37,817評論 0 268
  • 序言:老撾萬榮一對情侶失蹤肮塞,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后姻锁,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體枕赵,經(jīng)...
    沈念sama閱讀 44,275評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,592評論 2 327
  • 正文 我和宋清朗相戀三年位隶,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了拷窜。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,724評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡涧黄,死狀恐怖篮昧,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情笋妥,我是刑警寧澤懊昨,帶...
    沈念sama閱讀 34,409評論 4 333
  • 正文 年R本政府宣布,位于F島的核電站春宣,受9級特大地震影響酵颁,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜信认,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,052評論 3 316
  • 文/蒙蒙 一材义、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧嫁赏,春花似錦其掂、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,815評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至攘乒,卻和暖如春贤牛,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背则酝。 一陣腳步聲響...
    開封第一講書人閱讀 32,043評論 1 266
  • 我被黑心中介騙來泰國打工殉簸, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人沽讹。 一個月前我還...
    沈念sama閱讀 46,503評論 2 361
  • 正文 我出身青樓般卑,卻偏偏與公主長得像,于是被迫代替她去往敵國和親爽雄。 傳聞我的和親對象是個殘疾皇子蝠检,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,627評論 2 350

推薦閱讀更多精彩內(nèi)容