Integrating with Spark SQL
One of the most powerful aspects of Spark Streaming is that it can be used together with Spark Core and Spark SQL. Through operators such as transform and foreachRDD, we have already seen how to run Spark Core batch operations on the RDDs inside a DStream. Now let's look at how to combine the RDDs in a DStream with Spark SQL.
Example: every 10 seconds, count the clicks on each product in each category over the last 60 seconds, then report the top 3 hottest products in each category.
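The program below reads lines from a socket. Each line is assumed to carry three space-separated fields, with the category and product in positions 1 and 2; the meaning of field 0 (e.g. a user id) is a hypothetical assumption, since the code never uses it. Sample input might look like:

user1 phone iphone
user2 phone xiaomi
user3 tv sony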
Java version
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class Top3HotProduct {

    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        SparkConf conf = new SparkConf().setAppName("Top3HotProductJava").setMaster("local[2]");
        // One batch every 10 seconds
        JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(10));
        streamingContext.checkpoint("hdfs://hadoop-100:9000/streamingCheckpoint");

        JavaReceiverInputDStream<String> productVisitDstream =
                streamingContext.socketTextStream("hadoop-100", 10000);

        // Map each input line to ("category_product", 1)
        JavaPairDStream<String, Integer> productVisitNumsDstream = productVisitDstream.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) throws Exception {
                        String[] strings = s.split(" ");
                        return new Tuple2<>(strings[1] + "_" + strings[2], 1);
                    }
                });

        // Sum the counts over a 60-second window, sliding every 10 seconds
        JavaPairDStream<String, Integer> tempResultDstream = productVisitNumsDstream.reduceByKeyAndWindow(
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                }, Durations.seconds(60), Durations.seconds(10));

        tempResultDstream.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, Integer> productVisitRDD) throws Exception {
                // Split "category_product" back into two columns and build a Row RDD
                JavaRDD<Row> productVisitRowRDD = productVisitRDD.map(
                        new Function<Tuple2<String, Integer>, Row>() {
                            @Override
                            public Row call(Tuple2<String, Integer> v1) throws Exception {
                                String[] parts = v1._1.split("_");
                                return RowFactory.create(parts[0], parts[1], v1._2);
                            }
                        });

                // Schema for the DataFrame: category, product, visit
                List<StructField> fieldList = new ArrayList<StructField>();
                fieldList.add(DataTypes.createStructField("category", DataTypes.StringType, true));
                fieldList.add(DataTypes.createStructField("product", DataTypes.StringType, true));
                fieldList.add(DataTypes.createStructField("visit", DataTypes.IntegerType, true));
                StructType structType = DataTypes.createStructType(fieldList);

                HiveContext hiveContext = new HiveContext(productVisitRDD.context());
                DataFrame productVisitDF = hiveContext.createDataFrame(productVisitRowRDD, structType);
                productVisitDF.show();

                // Rank products within each category by visits; rank < 4 keeps the top 3
                productVisitDF.registerTempTable("product_visit");
                DataFrame top3DF = hiveContext.sql("select category, product, visit " +
                        "from ( " +
                        "select category, product, visit, " +
                        "row_number() over(partition by category order by visit desc) rank " +
                        "from product_visit " +
                        ") tmp " +
                        "where rank < 4");
                top3DF.show();
                return null;
            }
        });

        streamingContext.start();
        streamingContext.awaitTermination();
        streamingContext.close();
    }
}
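Two notes on the example above. First, the inner query uses the row_number() window function: within each category it numbers the products 1, 2, 3, ... in descending order of visits, and the outer where rank < 4 filter keeps only the top 3. Second, the example constructs a new HiveContext for every batch, which works but is wasteful; the Spark Streaming programming guide instead recommends a lazily instantiated singleton that is created once and reused by every batch. A minimal sketch of that pattern, with a hypothetical class name of my own:

import org.apache.spark.SparkContext;
import org.apache.spark.sql.hive.HiveContext;

// Lazily instantiated singleton: created on first use, then shared by all batches.
class HiveContextSingleton {
    private static transient HiveContext instance = null;

    static HiveContext getInstance(SparkContext sparkContext) {
        if (instance == null) {
            instance = new HiveContext(sparkContext);
        }
        return instance;
    }
}

Inside foreachRDD you would then call HiveContextSingleton.getInstance(productVisitRDD.context()) instead of constructing a new HiveContext per batch.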
Scala version