Spark Streaming(五):與Spark SQL整合

Spark Streaming最強(qiáng)大的地方在于尘颓,可以與Spark Core佃声、Spark SQL整合使用菲嘴,之前已經(jīng)通過(guò)transform虏辫、foreachRDD等算子看到蚌吸,如何將DStream中的RDD使用Spark Core執(zhí)行批處理操作。現(xiàn)在就來(lái)看看乒裆,如何將DStream中的RDD與Spark SQL結(jié)合起來(lái)使用套利。

Demo:每隔10秒推励,統(tǒng)計(jì)最近60秒的鹤耍,每個(gè)種類的每個(gè)商品的點(diǎn)擊次數(shù),然后統(tǒng)計(jì)出每個(gè)種類top3熱門的商品验辞。

package cn.spark.study.streaming;

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

/**
 * 與Spark SQL整合使用稿黄,top3熱門商品實(shí)時(shí)統(tǒng)計(jì)
 */
public class Top3HotProduct {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("Top3HotProduct");  
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        
        // 輸入日志的格式
        // leo iphone mobile_phone
        
        // 首先,獲取輸入數(shù)據(jù)流               
        JavaReceiverInputDStream<String> productClickLogsDStream = jssc.socketTextStream("hadoop1", 9999);
        
        // 然后跌造,應(yīng)該是做一個(gè)映射杆怕,將每個(gè)種類的每個(gè)商品,映射為(category_product, 1)的這種格式
        // 從而在后面可以使用window操作壳贪,對(duì)窗口中的這種格式的數(shù)據(jù)陵珍,進(jìn)行reduceByKey操作
        // 從而統(tǒng)計(jì)出來(lái),一個(gè)窗口中的每個(gè)種類的每個(gè)商品的违施,點(diǎn)擊次數(shù)
        JavaPairDStream<String, Integer> categoryProductPairsDStream = productClickLogsDStream
                .mapToPair(new PairFunction<String, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(String productClickLog)
                            throws Exception {
                        String[] productClickLogSplited = productClickLog.split(" "); 
                        return new Tuple2<String, Integer>(productClickLogSplited[2] + "_" + 
                                productClickLogSplited[1], 1);
                    }
                    
                });
        
        // 然后執(zhí)行window操作
        // 到這里互纯,就可以做到,每隔10秒鐘磕蒲,對(duì)最近60秒的數(shù)據(jù)留潦,執(zhí)行reduceByKey操作
        // 計(jì)算出來(lái)這60秒內(nèi),每個(gè)種類的每個(gè)商品的點(diǎn)擊次數(shù)
        JavaPairDStream<String, Integer> categoryProductCountsDStream = 
                categoryProductPairsDStream.reduceByKeyAndWindow(
                        
                        new Function2<Integer, Integer, Integer>() {

                            private static final long serialVersionUID = 1L;
                
                            @Override
                            public Integer call(Integer v1, Integer v2) throws Exception {
                                return v1 + v2;
                            }
                            
                        }, Durations.seconds(60), Durations.seconds(10));  
        
        // 然后針對(duì)60秒內(nèi)的每個(gè)種類的每個(gè)商品的點(diǎn)擊次數(shù)
        // foreachRDD辣往,在內(nèi)部兔院,使用Spark SQL執(zhí)行top3熱門商品的統(tǒng)計(jì)
        categoryProductCountsDStream.foreachRDD(new Function<JavaPairRDD<String,Integer>, Void>() {
            
            private static final long serialVersionUID = 1L;

            @Override
            public Void call(JavaPairRDD<String, Integer> categoryProductCountsRDD) throws Exception {
                // 將該RDD,轉(zhuǎn)換為JavaRDD<Row>的格式
                JavaRDD<Row> categoryProductCountRowRDD = categoryProductCountsRDD.map(
                        
                        new Function<Tuple2<String,Integer>, Row>() {

                            private static final long serialVersionUID = 1L;

                            @Override
                            public Row call(Tuple2<String, Integer> categoryProductCount)
                                    throws Exception {
                                String category = categoryProductCount._1.split("_")[0];
                                String product = categoryProductCount._1.split("_")[1];
                                Integer count = categoryProductCount._2;
                                return RowFactory.create(category, product, count);   
                            }
                            
                        });
                
                // 然后站削,執(zhí)行DataFrame轉(zhuǎn)換
                List<StructField> structFields = new ArrayList<StructField>();
                structFields.add(DataTypes.createStructField("category", DataTypes.StringType, true)); 
                structFields.add(DataTypes.createStructField("product", DataTypes.StringType, true));  
                structFields.add(DataTypes.createStructField("click_count", DataTypes.IntegerType, true));  
                StructType structType = DataTypes.createStructType(structFields);
                
                HiveContext hiveContext = new HiveContext(categoryProductCountsRDD.context());
                
                DataFrame categoryProductCountDF = hiveContext.createDataFrame(
                        categoryProductCountRowRDD, structType);
                
                // 將60秒內(nèi)的每個(gè)種類的每個(gè)商品的點(diǎn)擊次數(shù)的數(shù)據(jù)坊萝,注冊(cè)為一個(gè)臨時(shí)表
                categoryProductCountDF.registerTempTable("product_click_log");  
                
                // 執(zhí)行SQL語(yǔ)句,針對(duì)臨時(shí)表,統(tǒng)計(jì)出來(lái)每個(gè)種類下屹堰,點(diǎn)擊次數(shù)排名前3的熱門商品
                DataFrame top3ProductDF = hiveContext.sql(
                        "SELECT category,product,click_count "
                        + "FROM ("
                            + "SELECT "
                                + "category,"
                                + "product,"
                                + "click_count,"
                                + "row_number() OVER (PARTITION BY category ORDER BY click_count DESC) rank "
                            + "FROM product_click_log"  
                        + ") tmp "
                        + "WHERE rank<=3");
                

                // 接下來(lái)應(yīng)該將數(shù)據(jù)保存到redis緩存肛冶、或者是mysql db中
                // 然后,配合一個(gè)J2EE系統(tǒng)扯键,進(jìn)行數(shù)據(jù)的展示和查詢睦袖、圖形報(bào)表
                
                top3ProductDF.show();      
                
                return null;
            }
            
        });
        
        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }   
}
package cn.spark.study.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.hive.HiveContext


object Top3HotProduct {
  
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
        .setMaster("local[2]")  
        .setAppName("Top3HotProduct")
    val ssc = new StreamingContext(conf, Seconds(1))
    
    val productClickLogsDStream = ssc.socketTextStream("spark1", 9999)  
    val categoryProductPairsDStream = productClickLogsDStream
        .map { productClickLog => (productClickLog.split(" ")(2) + "_" + productClickLog.split(" ")(1), 1)}
    val categoryProductCountsDStream = categoryProductPairsDStream.reduceByKeyAndWindow(
        (v1: Int, v2: Int) => v1 + v2, 
        Seconds(60), 
        Seconds(10))  
    
    categoryProductCountsDStream.foreachRDD(categoryProductCountsRDD => {
      val categoryProductCountRowRDD = categoryProductCountsRDD.map(tuple => {
        val category = tuple._1.split("_")(0)
        val product = tuple._1.split("_")(1)  
        val count = tuple._2
        Row(category, product, count)  
      })
      
      val structType = StructType(Array(
          StructField("category", StringType, true),
          StructField("product", StringType, true),
          StructField("click_count", IntegerType, true)))
          
      val hiveContext = new HiveContext(categoryProductCountsRDD.context)
      
      val categoryProductCountDF = hiveContext.createDataFrame(categoryProductCountRowRDD, structType)  
      
      categoryProductCountDF.registerTempTable("product_click_log")  
      
      val top3ProductDF = hiveContext.sql(
            "SELECT category,product,click_count "
            + "FROM ("
              + "SELECT "
                + "category,"
                + "product,"
                + "click_count,"
                + "row_number() OVER (PARTITION BY category ORDER BY click_count DESC) rank "
              + "FROM product_click_log"  
            + ") tmp "
            + "WHERE rank<=3")
            
      top3ProductDF.show()
    })
    
    ssc.start()
    ssc.awaitTermination()
  } 
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市荣刑,隨后出現(xiàn)的幾起案子馅笙,更是在濱河造成了極大的恐慌,老刑警劉巖厉亏,帶你破解...
    沈念sama閱讀 218,036評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件董习,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡爱只,警方通過(guò)查閱死者的電腦和手機(jī)皿淋,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)恬试,“玉大人窝趣,你說(shuō)我怎么就攤上這事⊙挡瘢” “怎么了哑舒?”我有些...
    開(kāi)封第一講書人閱讀 164,411評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)幻馁。 經(jīng)常有香客問(wèn)我洗鸵,道長(zhǎng),這世上最難降的妖魔是什么仗嗦? 我笑而不...
    開(kāi)封第一講書人閱讀 58,622評(píng)論 1 293
  • 正文 為了忘掉前任膘滨,我火速辦了婚禮,結(jié)果婚禮上稀拐,老公的妹妹穿的比我還像新娘火邓。我一直安慰自己,他們只是感情好钩蚊,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,661評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布贡翘。 她就那樣靜靜地躺著,像睡著了一般砰逻。 火紅的嫁衣襯著肌膚如雪鸣驱。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書人閱讀 51,521評(píng)論 1 304
  • 那天蝠咆,我揣著相機(jī)與錄音踊东,去河邊找鬼北滥。 笑死,一個(gè)胖子當(dāng)著我的面吹牛闸翅,可吹牛的內(nèi)容都是我干的再芋。 我是一名探鬼主播,決...
    沈念sama閱讀 40,288評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼坚冀,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼济赎!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起记某,我...
    開(kāi)封第一講書人閱讀 39,200評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤司训,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后液南,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體壳猜,經(jīng)...
    沈念sama閱讀 45,644評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,837評(píng)論 3 336
  • 正文 我和宋清朗相戀三年滑凉,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了统扳。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,953評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡畅姊,死狀恐怖咒钟,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情涡匀,我是刑警寧澤盯腌,帶...
    沈念sama閱讀 35,673評(píng)論 5 346
  • 正文 年R本政府宣布溉知,位于F島的核電站陨瘩,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏级乍。R本人自食惡果不足惜舌劳,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,281評(píng)論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望玫荣。 院中可真熱鬧甚淡,春花似錦、人聲如沸捅厂。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,889評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)焙贷。三九已至撵割,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間辙芍,已是汗流浹背啡彬。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 33,011評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工羹与, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人庶灿。 一個(gè)月前我還...
    沈念sama閱讀 48,119評(píng)論 3 370
  • 正文 我出身青樓纵搁,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親往踢。 傳聞我的和親對(duì)象是個(gè)殘疾皇子腾誉,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,901評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容