spark streaming java簡單demo

java寫的程序饱苟,所以程序中使用的都是java的類和api,例如JavaStreamingContext弄匕,JavaReceiverInputDStream督函,JavaDStream。注意使用Java開頭的類嚣镜。
這個例子演示的是spark streaming接收socket數(shù)據(jù)爬迟。

    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("test");
        JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
        JavaReceiverInputDStream<String> lines = jsc.socketTextStream("127.0.0.1", 9999);
        JavaDStream<String> flatMap = lines.flatMap((FlatMapFunction<String, String>) s -> {
            String[] split = s.split(" ");
            return Arrays.asList(split).iterator();
        });

        JavaPairDStream<String, Integer> mapToPair = flatMap.mapToPair(
                (PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1)).reduceByKey(
                (Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2);
        mapToPair.print();

        jsc.start();
        jsc.awaitTermination();

    }

然后使用nc命令向9999端口發(fā)送數(shù)據(jù)。輸入命令nc -lk 9998菊匿,然后輸入數(shù)據(jù)即可雕旨。spark streaming程序就可以接收到數(shù)據(jù)了扮匠,spark streaming是把一段時間內(nèi)接收到的數(shù)據(jù)當(dāng)作一個批次,然后把這個批次轉(zhuǎn)為rdd凡涩,之后就是調(diào)用rdd的api進(jìn)行數(shù)據(jù)處理了棒搜。每個批次就執(zhí)行一次,rdd的轉(zhuǎn)換和輸出流程活箕。
注意:代碼中的.setMaster("local[4]").的含義力麸,該測試是在本地進(jìn)行的也就是local模式。下面這篇文件寫的很好:
https://blog.csdn.net/zpf336/article/details/82152286

StreamingListener
是spark提供的時間監(jiān)聽類育韩,實(shí)現(xiàn)該類的接口即可克蚂。
看一下源碼中都有哪些方法可以實(shí)現(xiàn):

trait StreamingListener {

  /** Called when the streaming has been started */
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted) { }

  /** Called when a receiver has been started */
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }

  /** Called when a receiver has reported an error */
  def onReceiverError(receiverError: StreamingListenerReceiverError) { }

  /** Called when a receiver has been stopped */
  def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }

  /** Called when a batch of jobs has been submitted for processing. */
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }

  /** Called when processing of a batch of jobs has started.  */
  def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }

  /** Called when processing of a batch of jobs has completed. */
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }

  /** Called when processing of a job of a batch has started. */
  def onOutputOperationStarted(
      outputOperationStarted: StreamingListenerOutputOperationStarted) { }

  /** Called when processing of a job of a batch has completed. */
  def onOutputOperationCompleted(
      outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }
}

下面用java實(shí)現(xiàn)一下該類,并測試這些方法:

public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("test");
        JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(15));
        jsc.addStreamingListener(new TestListener());
        JavaReceiverInputDStream<String> lines = jsc.socketTextStream("127.0.0.1", 9998);
        JavaDStream<String> flatMap = lines.flatMap((FlatMapFunction<String, String>) s -> {
            String[] split = s.split(" ");
            return Arrays.asList(split).iterator();
        });

        JavaPairDStream<String, Integer> mapToPair = flatMap.mapToPair(
                (PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1)).reduceByKey(
                (Function2<Integer, Integer, Integer>) Integer::sum);
        mapToPair.foreachRDD(new VoidFunction2<JavaPairRDD<String, Integer>, Time>() {
                                 @Override
                                 public void call(JavaPairRDD<String, Integer> v1, Time v2) throws Exception {
                                     System.out.println("pair:: " + v1.collect() + " time: " + v2);

                                 }
                             }
        );
//        mapToPair.print();

        jsc.start();
        jsc.awaitTermination();

    }

    static class TestListener implements StreamingListener {

        @Override
        public void onStreamingStarted(StreamingListenerStreamingStarted streamingStarted) {
            System.out.println("onStreamingStarted");
        }

        @Override
        public void onReceiverStarted(StreamingListenerReceiverStarted receiverStarted) {
            System.out.println("onReceiverStarted");
        }

        @Override
        public void onReceiverError(StreamingListenerReceiverError receiverError) {
            System.out.println("onReceiverError");
        }

        @Override
        public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {
            System.out.println("onReceiverStopped");
        }

        @Override
        public void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {
            System.out.println("onBatchSubmitted");
        }

        @Override
        public void onBatchStarted(StreamingListenerBatchStarted batchStarted) {
            System.out.println("onBatchStarted");
        }

        @Override
        public void onOutputOperationStarted(
                StreamingListenerOutputOperationStarted outputOperationStarted) {
            System.out.println("onOutputOperationStarted");
        }

        @Override
        public void onOutputOperationCompleted(
                StreamingListenerOutputOperationCompleted outputOperationCompleted) {
            System.out.println("onOutputOperationCompleted");
        }

        @Override
        public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
            BatchInfo batchInfo = batchCompleted.batchInfo();
            Long batchTime = batchInfo.batchTime().milliseconds();
            Long numRecords = batchInfo.numRecords();
            Long totalDelay = (Long) Optional.ofNullable(batchInfo.totalDelay().getOrElse(null))
                    .orElse(-1L);
            Long submissionTime = batchInfo.submissionTime();
            Long processingDelay = (Long) Optional
                    .ofNullable(batchInfo.processingDelay().getOrElse(null)).orElse(-1L);
            System.out.println("batchInfo :: " + batchInfo);
        }
    }

輸出:

19:04:08: onStreamingStarted
19:04:08: onReceiverStarted
19:04:15: onBatchSubmitted
19:04:15: onBatchStarted
19:04:15: onOutputOperationStarted
19:04:15: pair:: [(,1), (a,2), (v,1), (c,2)] time: 
19:04:15: onOutputOperationCompleted
19:04:15: batchInfo :: BatchInfo(1592996655000 ms,Map(0 -> StreamInputInfo(0,6,Map())),1592996655039,Some(1592996655044),Some(1592996655271),Map(0 -> OutputOperationInfo(1592996655000 ms,0,foreachRDD at DStreamingTest.java:56,org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.foreachRDD(JavaDStreamLike.scala:42)
SparkLockTest.DStreamingTest.main(DStreamingTest.java:56),Some(1592996655044),Some(1592996655271),None)))

19:04:30: onBatchSubmitted
19:04:30: onBatchStarted
19:04:30: onOutputOperationStarted
19:04:30: pair:: [(d,1), (,1), (a,1), (b,1), (c,1)] time: 
19:04:30: onOutputOperationCompleted
19:04:30: batchInfo :: BatchInfo(1592996670000 ms,Map(0 -> StreamInputInfo(0,5,Map())),1592996670014,Some(1592996670015),Some(1592996670068),Map(0 -> OutputOperationInfo(1592996670000 ms,0,foreachRDD at DStreamingTest.java:56,org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.foreachRDD(JavaDStreamLike.scala:42)
SparkLockTest.DStreamingTest.main(DStreamingTest.java:56),Some(1592996670015),Some(1592996670068),None)))
19:04:38: onReceiverStopped

可以看到在spark啟動時調(diào)用筋讨,onStreamingStarted和onReceiverStarted方法埃叭,表示啟動和開始接收數(shù)據(jù);一個批次結(jié)束時(該例子中也就是15秒時)調(diào)用onBatchSubmitted和onBatchStarted悉罕,表示批次開始和批次執(zhí)行赤屋;onOutputOperationStarted和onOutputOperationCompleted表示輸出開始和輸出結(jié)束,之間就是對數(shù)據(jù)的處理壁袄,在該例子中是數(shù)據(jù)打永嘣纭;當(dāng)整個批次完成時調(diào)用onBatchCompleted嗜逻;如果整個程序結(jié)束那么調(diào)用onReceiverStopped涩僻。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市栈顷,隨后出現(xiàn)的幾起案子逆日,更是在濱河造成了極大的恐慌,老刑警劉巖萄凤,帶你破解...
    沈念sama閱讀 222,729評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件屏富,死亡現(xiàn)場離奇詭異,居然都是意外死亡蛙卤,警方通過查閱死者的電腦和手機(jī)狠半,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,226評論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來颤难,“玉大人神年,你說我怎么就攤上這事⌒朽停” “怎么了已日?”我有些...
    開封第一講書人閱讀 169,461評論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀的道長栅屏。 經(jīng)常有香客問我飘千,道長堂鲜,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,135評論 1 300
  • 正文 為了忘掉前任护奈,我火速辦了婚禮缔莲,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘霉旗。我一直安慰自己痴奏,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,130評論 6 398
  • 文/花漫 我一把揭開白布厌秒。 她就那樣靜靜地躺著读拆,像睡著了一般。 火紅的嫁衣襯著肌膚如雪鸵闪。 梳的紋絲不亂的頭發(fā)上檐晕,一...
    開封第一講書人閱讀 52,736評論 1 312
  • 那天,我揣著相機(jī)與錄音蚌讼,去河邊找鬼辟灰。 笑死,一個胖子當(dāng)著我的面吹牛啦逆,可吹牛的內(nèi)容都是我干的伞矩。 我是一名探鬼主播笛洛,決...
    沈念sama閱讀 41,179評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼夏志,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了苛让?” 一聲冷哼從身側(cè)響起沟蔑,我...
    開封第一講書人閱讀 40,124評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎狱杰,沒想到半個月后瘦材,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,657評論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡仿畸,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,723評論 3 342
  • 正文 我和宋清朗相戀三年食棕,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片错沽。...
    茶點(diǎn)故事閱讀 40,872評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡簿晓,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出千埃,到底是詐尸還是另有隱情憔儿,我是刑警寧澤,帶...
    沈念sama閱讀 36,533評論 5 351
  • 正文 年R本政府宣布放可,位于F島的核電站谒臼,受9級特大地震影響朝刊,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜蜈缤,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,213評論 3 336
  • 文/蒙蒙 一拾氓、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧劫樟,春花似錦痪枫、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,700評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至附较,卻和暖如春吃粒,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背拒课。 一陣腳步聲響...
    開封第一講書人閱讀 33,819評論 1 274
  • 我被黑心中介騙來泰國打工徐勃, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人早像。 一個月前我還...
    沈念sama閱讀 49,304評論 3 379
  • 正文 我出身青樓僻肖,卻偏偏與公主長得像,于是被迫代替她去往敵國和親卢鹦。 傳聞我的和親對象是個殘疾皇子臀脏,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,876評論 2 361