The Flink study notes I shared a couple of days ago introduced tumbling windows and sliding windows. Spark Streaming supports both as well.
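The previous post, Java Spark Simple Example (5): Spark Streaming, demonstrated the usual way of using Spark Streaming, which is a tumbling window. We set a batch interval, and by default Spark rolls the window once per batch interval, so no data is shared between windows.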
// Batch interval, i.e. the length of one tumbling window; the roll interval equals this length
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(3));
This post builds on the demo from the previous post and, following the official documentation, shows how to implement a sliding window.
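From the official documentation: window(windowLength, slideInterval) Return a new DStream which is computed based on windowed batches of the source DStream. Here windowLength is the length of the sliding window and slideInterval is the interval at which the window slides. Both windowLength and slideInterval must be integer multiples of the batch interval, i.e. multiples of the 3s defined above. If slideInterval is omitted, it defaults to the batch interval, i.e. the 3s defined above.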
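The multiple-of-the-batch-interval rule can be checked with a little arithmetic. The following is a minimal sketch in plain Java, not Spark code: it uses java.time.Duration instead of Spark's Durations, and the class and method names (WindowParams, isValid, batchesPerWindow, sharedBatches) are hypothetical names chosen for illustration.

```java
import java.time.Duration;

final class WindowParams {
    // True when d is a positive whole multiple of the batch interval.
    static boolean isMultiple(Duration d, Duration batch) {
        long b = batch.toMillis();
        long v = d.toMillis();
        return b > 0 && v > 0 && v % b == 0;
    }

    // True when both the window length and the slide interval are multiples of the batch interval.
    static boolean isValid(Duration batch, Duration window, Duration slide) {
        return isMultiple(window, batch) && isMultiple(slide, batch);
    }

    // Number of batches covered by one window.
    static long batchesPerWindow(Duration batch, Duration window) {
        return window.toMillis() / batch.toMillis();
    }

    // Number of batches shared by two consecutive windows (0 for a tumbling window).
    static long sharedBatches(Duration batch, Duration window, Duration slide) {
        long shared = (window.toMillis() - slide.toMillis()) / batch.toMillis();
        return Math.max(0, shared);
    }

    public static void main(String[] args) {
        Duration batch = Duration.ofSeconds(3);
        Duration window = Duration.ofSeconds(6);
        Duration slide = Duration.ofSeconds(3);
        System.out.println("valid: " + isValid(batch, window, slide));
        System.out.println("batches per window: " + batchesPerWindow(batch, window));
        System.out.println("batches shared by consecutive windows: " + sharedBatches(batch, window, slide));
    }
}
```

With a 3s batch interval, a 6s window sliding every 3s covers two batches and shares one batch with the previous window, while a 3s window sliding every 3s shares none, which is the tumbling case.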
package com.yzy.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
public class demo7 {
    private static String appName = "spark.streaming.demo";
    private static String master = "local[*]";
    private static String host = "localhost";
    private static int port = 9999;

    public static void main(String[] args) {
        // Initialize the SparkConf
        SparkConf sparkConf = SparkConfig.getSparkConf().setMaster(master).setAppName(appName);
        // Create the JavaStreamingContext with a 3-second batch interval
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(3));
        // Read data from the socket source
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);
        // Split each line into words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        // Call window() to produce a new DStream that, every 3 seconds, aggregates the source data
        // from the past 6 seconds; the slide interval defaults to 3 seconds when omitted.
        // Equivalent to words.window(Durations.seconds(6), Durations.seconds(3));
        JavaDStream<String> newWords = words.window(Durations.seconds(6));
        // Count the occurrences of each word
        JavaPairDStream<String, Integer> wordCounts = newWords.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        // Print the result
        wordCounts.print();
        // Start the job
        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ssc.close();
        }
    }
}
Output:
-------------------------------------------
Time: 1537934565000 ms
-------------------------------------------
(spark,1)
(1,1)
(test,1)
(streaming,1)
-------------------------------------------
Time: 1537934568000 ms
-------------------------------------------
(spark,4)
(1,1)
(2,1)
(3,1)
(4,1)
(test,4)
(streaming,4)
-------------------------------------------
Time: 1537934571000 ms
-------------------------------------------
(spark,6)
(2,1)
(3,1)
(4,1)
(5,1)
(6,1)
(7,1)
(test,6)
(streaming,6)
-------------------------------------------
Time: 1537934574000 ms
-------------------------------------------
(spark,6)
(10,1)
(5,1)
(6,1)
(7,1)
(8,1)
(9,1)
(test,6)
(streaming,6)
-------------------------------------------
Time: 1537934577000 ms
-------------------------------------------
(spark,6)
(10,1)
(11,1)
(12,1)
(13,1)
(8,1)
(9,1)
(test,6)
(streaming,6)
....
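The output above shows each 6-second window covering the two most recent 3-second batches: once a batch is more than one slide old, its numbers drop out of the counts. The sketch below reproduces that pattern in plain Java without Spark. The input lines are an assumption inferred from the printed counts (the post does not show what was typed into the socket), and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SlidingWindowSimulation {
    // Counts words across the last `batchesPerWindow` batches ending at index `end` (inclusive).
    static Map<String, Integer> countWindow(List<List<String>> batches, int end, int batchesPerWindow) {
        Map<String, Integer> counts = new TreeMap<>();
        int start = Math.max(0, end - batchesPerWindow + 1);
        for (int i = start; i <= end; i++) {
            for (String line : batches.get(i)) {
                for (String word : line.split(" ")) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    // Assumed input: one list of lines per 3-second batch (inferred from the output, not given in the post).
    static List<List<String>> sampleBatches() {
        return Arrays.asList(
            Arrays.asList("spark streaming test 1"),
            Arrays.asList("spark streaming test 2", "spark streaming test 3", "spark streaming test 4"),
            Arrays.asList("spark streaming test 5", "spark streaming test 6", "spark streaming test 7"),
            Arrays.asList("spark streaming test 8", "spark streaming test 9", "spark streaming test 10"),
            Arrays.asList("spark streaming test 11", "spark streaming test 12", "spark streaming test 13"));
    }

    public static void main(String[] args) {
        List<List<String>> batches = sampleBatches();
        // A 6s window over 3s batches covers 2 batches; it slides forward by one batch each time.
        for (int end = 0; end < batches.size(); end++) {
            System.out.println("window ending at batch " + (end + 1) + ": " + countWindow(batches, end, 2));
        }
    }
}
```

With these assumed inputs, the second window counts spark 4 times and still contains the number 1, while the third window counts spark 6 times and no longer contains 1, matching the printed results. The simulation sorts keys alphabetically, so the order of entries differs from Spark's output.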
Besides calling window() to transform the DStream, you can also call reduceByKeyAndWindow() directly, so that the aggregation function is applied over the sliding window, as follows:
// ..... (omitted)
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String s) throws Exception {
        return new Tuple2<String, Integer>(s, 1);
    }
});
// Window length of 6 seconds, sliding every 3 seconds; the parameters mean the same as in window()
JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer integer, Integer integer2) throws Exception {
        return integer + integer2;
    }
}, Durations.seconds(6), Durations.seconds(3));
windowedWordCounts.print();
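Both approaches should give the same counts for a given window, because an associative reduce function such as addition does not care whether values are combined per batch first or across the whole window at once. The following plain-Java sketch illustrates that equivalence only. It is a mental model, not Spark's implementation, and the class name, method names, and sample words are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

final class ReduceByKeyAndWindowModel {
    // Models window() followed by reduceByKey(): gather every word in the window, then reduce by key.
    static Map<String, Integer> windowThenReduce(List<List<String>> batches, int end, int size,
                                                 BinaryOperator<Integer> reduce) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int i = Math.max(0, end - size + 1); i <= end; i++) {
            for (String word : batches.get(i)) {
                counts.merge(word, 1, reduce);
            }
        }
        return counts;
    }

    // Models reducing over the window directly: reduce each batch by key, then merge the per-batch results.
    static Map<String, Integer> reducePerBatchThenMerge(List<List<String>> batches, int end, int size,
                                                        BinaryOperator<Integer> reduce) {
        Map<String, Integer> merged = new TreeMap<>();
        for (int i = Math.max(0, end - size + 1); i <= end; i++) {
            Map<String, Integer> perBatch = new TreeMap<>();
            for (String word : batches.get(i)) {
                perBatch.merge(word, 1, reduce);
            }
            for (Map.Entry<String, Integer> e : perBatch.entrySet()) {
                merged.merge(e.getKey(), e.getValue(), reduce);
            }
        }
        return merged;
    }

    // Made-up sample data: one list of words per batch.
    static List<List<String>> sampleBatches() {
        return Arrays.asList(
            Arrays.asList("spark", "test"),
            Arrays.asList("spark", "spark", "streaming"),
            Arrays.asList("test", "spark"));
    }

    public static void main(String[] args) {
        List<List<String>> batches = sampleBatches();
        BinaryOperator<Integer> sum = Integer::sum;
        // Window of 2 batches ending at the third batch.
        System.out.println(windowThenReduce(batches, 2, 2, sum));
        System.out.println(reducePerBatchThenMerge(batches, 2, 2, sum));
    }
}
```

For the made-up data, both methods return the same map for every window position, which is why the two Spark snippets in this post can be used interchangeably for a word count.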