The program is written in Java, so it uses the Java versions of the classes and APIs, such as JavaStreamingContext, JavaReceiverInputDStream, and JavaDStream. Note that the classes prefixed with Java are the ones to use.
This example demonstrates Spark Streaming receiving data from a socket.
public static void main(String[] args) throws InterruptedException {
    SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("test");
    // Create a streaming context with a 5-second batch interval
    JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
    // Receive text lines from a socket on port 9999
    JavaReceiverInputDStream<String> lines = jsc.socketTextStream("127.0.0.1", 9999);
    // Split each line into words
    JavaDStream<String> flatMap = lines.flatMap((FlatMapFunction<String, String>) s -> {
        String[] split = s.split(" ");
        return Arrays.asList(split).iterator();
    });
    // Word count: map each word to (word, 1) and sum the counts per word
    JavaPairDStream<String, Integer> mapToPair = flatMap.mapToPair(
            (PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1)).reduceByKey(
            (Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2);
    mapToPair.print();
    jsc.start();
    jsc.awaitTermination();
}
Then use the nc command to send data to port 9999: run nc -lk 9999 and type some data, and the Spark Streaming program will receive it. Spark Streaming treats everything received within one interval as a batch, turns that batch into an RDD, and then processes the data through the normal RDD API. The RDD transformation and output flow runs once per batch.
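To make the batch-to-RDD relationship concrete, here is a minimal sketch (reusing the lines DStream from the code above; the printed message is just for illustration) that runs a plain RDD action on every batch:
// Each batch interval yields one RDD; any RDD API call can be used inside foreachRDD.
lines.foreachRDD(rdd -> {
    long count = rdd.count(); // ordinary RDD action on this batch's data
    System.out.println("records in this batch: " + count);
});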
Note the meaning of .setMaster("local[4]") in the code: this test runs locally, i.e. in local mode, with 4 worker threads. The socket receiver permanently occupies one of those threads, so local mode needs more than one thread or no batches would ever get processed. The following article explains this well:
https://blog.csdn.net/zpf336/article/details/82152286
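As a quick illustration (a sketch only; the app name is arbitrary), these are the usual local master settings and what they mean for a streaming job with one receiver:
// "local"    -> 1 thread: the receiver takes it, leaving nothing to process batches
// "local[4]" -> 4 threads: 1 for the socket receiver, the rest for batch processing
// "local[*]" -> as many threads as there are CPU cores on the machine
SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("test");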
StreamingListener
This is the event listener interface provided by Spark Streaming; to use it, implement its callbacks.
Let's look at which methods the source code allows us to implement:
trait StreamingListener {
  /** Called when the streaming has been started */
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted) { }
  /** Called when a receiver has been started */
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }
  /** Called when a receiver has reported an error */
  def onReceiverError(receiverError: StreamingListenerReceiverError) { }
  /** Called when a receiver has been stopped */
  def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }
  /** Called when a batch of jobs has been submitted for processing. */
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }
  /** Called when processing of a batch of jobs has started. */
  def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }
  /** Called when processing of a batch of jobs has completed. */
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
  /** Called when processing of a job of a batch has started. */
  def onOutputOperationStarted(
      outputOperationStarted: StreamingListenerOutputOperationStarted) { }
  /** Called when processing of a job of a batch has completed. */
  def onOutputOperationCompleted(
      outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }
}
Below is a Java implementation of this listener, together with a program that exercises these callbacks:
public static void main(String[] args) throws InterruptedException {
    SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("test");
    // Create a streaming context with a 15-second batch interval
    JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(15));
    // Register the custom listener before the context is started
    jsc.addStreamingListener(new TestListener());
    JavaReceiverInputDStream<String> lines = jsc.socketTextStream("127.0.0.1", 9998);
    JavaDStream<String> flatMap = lines.flatMap((FlatMapFunction<String, String>) s -> {
        String[] split = s.split(" ");
        return Arrays.asList(split).iterator();
    });
    JavaPairDStream<String, Integer> mapToPair = flatMap.mapToPair(
            (PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1)).reduceByKey(
            (Function2<Integer, Integer, Integer>) Integer::sum);
    // Print each batch's word counts together with the batch time
    mapToPair.foreachRDD(new VoidFunction2<JavaPairRDD<String, Integer>, Time>() {
        @Override
        public void call(JavaPairRDD<String, Integer> v1, Time v2) throws Exception {
            System.out.println("pair:: " + v1.collect() + " time: " + v2);
        }
    });
    // mapToPair.print();
    jsc.start();
    jsc.awaitTermination();
}
static class TestListener implements StreamingListener {

    @Override
    public void onStreamingStarted(StreamingListenerStreamingStarted streamingStarted) {
        System.out.println("onStreamingStarted");
    }

    @Override
    public void onReceiverStarted(StreamingListenerReceiverStarted receiverStarted) {
        System.out.println("onReceiverStarted");
    }

    @Override
    public void onReceiverError(StreamingListenerReceiverError receiverError) {
        System.out.println("onReceiverError");
    }

    @Override
    public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {
        System.out.println("onReceiverStopped");
    }

    @Override
    public void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {
        System.out.println("onBatchSubmitted");
    }

    @Override
    public void onBatchStarted(StreamingListenerBatchStarted batchStarted) {
        System.out.println("onBatchStarted");
    }

    @Override
    public void onOutputOperationStarted(
            StreamingListenerOutputOperationStarted outputOperationStarted) {
        System.out.println("onOutputOperationStarted");
    }

    @Override
    public void onOutputOperationCompleted(
            StreamingListenerOutputOperationCompleted outputOperationCompleted) {
        System.out.println("onOutputOperationCompleted");
    }

    @Override
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
        BatchInfo batchInfo = batchCompleted.batchInfo();
        // Metrics exposed by BatchInfo (extracted here for illustration; only batchInfo itself is printed)
        Long batchTime = batchInfo.batchTime().milliseconds();
        Long numRecords = batchInfo.numRecords();
        Long totalDelay = (Long) Optional.ofNullable(batchInfo.totalDelay().getOrElse(null))
                .orElse(-1L);
        Long submissionTime = batchInfo.submissionTime();
        Long processingDelay = (Long) Optional
                .ofNullable(batchInfo.processingDelay().getOrElse(null)).orElse(-1L);
        System.out.println("batchInfo :: " + batchInfo);
    }
}
Output:
19:04:08: onStreamingStarted
19:04:08: onReceiverStarted
19:04:15: onBatchSubmitted
19:04:15: onBatchStarted
19:04:15: onOutputOperationStarted
19:04:15: pair:: [(,1), (a,2), (v,1), (c,2)] time:
19:04:15: onOutputOperationCompleted
19:04:15: batchInfo :: BatchInfo(1592996655000 ms,Map(0 -> StreamInputInfo(0,6,Map())),1592996655039,Some(1592996655044),Some(1592996655271),Map(0 -> OutputOperationInfo(1592996655000 ms,0,foreachRDD at DStreamingTest.java:56,org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.foreachRDD(JavaDStreamLike.scala:42)
SparkLockTest.DStreamingTest.main(DStreamingTest.java:56),Some(1592996655044),Some(1592996655271),None)))
19:04:30: onBatchSubmitted
19:04:30: onBatchStarted
19:04:30: onOutputOperationStarted
19:04:30: pair:: [(d,1), (,1), (a,1), (b,1), (c,1)] time:
19:04:30: onOutputOperationCompleted
19:04:30: batchInfo :: BatchInfo(1592996670000 ms,Map(0 -> StreamInputInfo(0,5,Map())),1592996670014,Some(1592996670015),Some(1592996670068),Map(0 -> OutputOperationInfo(1592996670000 ms,0,foreachRDD at DStreamingTest.java:56,org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.foreachRDD(JavaDStreamLike.scala:42)
SparkLockTest.DStreamingTest.main(DStreamingTest.java:56),Some(1592996670015),Some(1592996670068),None)))
19:04:38: onReceiverStopped
As the output shows, onStreamingStarted and onReceiverStarted are called when the application starts, indicating that streaming has started and the receiver has begun receiving data. When a batch interval ends (every 15 seconds in this example), onBatchSubmitted and onBatchStarted are called, marking the batch being submitted and its execution starting. onOutputOperationStarted and onOutputOperationCompleted mark the start and end of an output operation; the actual data processing happens in between, which in this example is printing the data. When the whole batch has finished, onBatchCompleted is called, and its BatchInfo carries metrics such as numRecords, processingDelay, and totalDelay. Finally, onReceiverStopped is called when the program shuts down.
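A common practical use of these callbacks is checking whether batches keep up with the batch interval. Below is a minimal sketch under that assumption: the 15000 ms threshold simply matches the batch interval used above, the class name DelayWarningListener is made up, and depending on the Spark/Scala version the other callbacks may also need empty overrides as in TestListener.
// Hypothetical listener that warns whenever a batch's total delay exceeds the 15-second batch interval.
static class DelayWarningListener implements StreamingListener {
    @Override
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
        BatchInfo info = batchCompleted.batchInfo();
        // Same null-safe extraction pattern as in TestListener above
        Long totalDelay = (Long) Optional.ofNullable(info.totalDelay().getOrElse(null)).orElse(-1L);
        if (totalDelay > 15000L) {
            System.out.println("batch " + info.batchTime() + " is falling behind, total delay: "
                    + totalDelay + " ms");
        }
    }
}
It would be registered the same way as TestListener, via jsc.addStreamingListener(new DelayWarningListener()).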