Spark系列 - 實(shí)時(shí)數(shù)倉之top3熱門廣告實(shí)戰(zhàn)(二)

??在之前的文章中我們使用 Flink 也實(shí)現(xiàn)過 topn 的案例怕轿;這里断傲,為了溫習(xí) Spark 如何訪問 kafka 以及 DStream 的操作肄满,我們實(shí)現(xiàn)一個(gè)需求:

需求:每天每地區(qū)熱門廣告 top3
一谴古、數(shù)據(jù)源
[root@cdh101 kafka]# bin/kafka-console-consumer.sh --bootstrap-server cdh101:9092,cdh102:9092,cdh103:9092 --topic luchangyin --from-beginning

運(yùn)行結(jié)果:
image.png
前置-引入依賴
<dependencies>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

    </dependencies>
二、代碼實(shí)現(xiàn)

2.1 消費(fèi)原始數(shù)據(jù) :

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.text.SimpleDateFormat
import java.util.Date

// 需求:每天每地區(qū)熱門廣告 top3
object RealTime_App01 {

  def main(args: Array[String]): Unit = {

    //創(chuàng)建配置文件對象
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HighKafka")
    //創(chuàng)建SparkStreaming執(zhí)行的上下文
    val ssc = new StreamingContext(conf, Seconds(3))

    //kafka參數(shù)聲明
    val brokers = "cdh101:9092,cdh102:9092,cdh103:9092"
    val topic = "luchangyin"
    val group = "cloudera_mirrormaker"
    val deserialization = "org.apache.kafka.common.serialization.StringDeserializer"
    val autooffsetreset = "latest"
    val kafkaParams = Map(
      ConsumerConfig.GROUP_ID_CONFIG -> group,
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.AUTO_OFFSET_RESET_DOC -> autooffsetreset,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization
    )

    //設(shè)置檢查點(diǎn)目錄
    ssc.checkpoint("D:\\MySoftware\\StudySoftware\\MyIdea\\luchangyin2021\\MyFirstBigScreen\\TestFSLJavaDemon\\src\\main\\ck1")

    //創(chuàng)建DS
    val kafkaDS: InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String,String](Set(topic), kafkaParams)
    )

    //從kafka的kv值中取value     1616683286749,華東,上海,102,1
    val dataDS = kafkaDS.map(_.value())
    dataDS.print()
    
    # 2.2 從kafka獲取到的原始數(shù)據(jù)進(jìn)行轉(zhuǎn)換  ==>(天_地區(qū)_廣告,1)

    ssc.start()
    ssc.awaitTermination()
  }

}

輸出為:
image.png

2.2 從kafka獲取到的原始數(shù)據(jù)進(jìn)行轉(zhuǎn)換 :

  val mapDS: DStream[(String, Int)] =dataDS.map{
      line => {
        val fields: Array[String] = line.split(",")
        //獲取時(shí)間戳
        val timeStamp: Long = fields(0).toLong
        //根據(jù)時(shí)間戳創(chuàng)建日期對象
        val day = new Date(timeStamp)
        //創(chuàng)建SimpleDataFormat稠歉,對日期對象進(jìn)行轉(zhuǎn)換
        val sdf = new SimpleDateFormat("yyyy-MM-dd")
        //將日期對象轉(zhuǎn)換為字符串
        val dayStr: String = sdf.format(day)
        //獲取地區(qū)
        var area = fields(1)
        // 獲取廣告
        var adv = fields(4)
        //封裝元組
        (dayStr +"_"+ area +"_"+ adv, 1)
      }
    }

    mapDS.print()  //(2021-03-25_華東_4,1)

    # 2.3 對每天每地區(qū)廣告點(diǎn)擊數(shù)進(jìn)行聚合處理   (天_地區(qū)_廣告,sum)

數(shù)據(jù)結(jié)果為:
image.png

2.3 對每天每地區(qū)廣告點(diǎn)擊數(shù)進(jìn)行聚合處理:

    //對每天每地區(qū)廣告點(diǎn)擊數(shù)進(jìn)行聚合處理   (天_地區(qū)_廣告,sum)
    //注意:這里要統(tǒng)計(jì)的是一天的數(shù)據(jù)掰担,所以要將每一個(gè)采集周期的數(shù)據(jù)都統(tǒng)計(jì),需要傳遞狀態(tài)怒炸,所以要用udpateStateByKey
    val updateDS: DStream[(String, Int)] = mapDS.updateStateByKey(
      (seq: Seq[Int], buffer: Option[Int]) => {
        Option(seq.sum + buffer.getOrElse(0))
      }
    )

    updateDS.print()  //(2021-03-25_華東_3,138)
    
    # 2.4 將相同的天和地區(qū)放到一組

運(yùn)行結(jié)果:
image.png

2.4 將相同的天和地區(qū)放到一組 :

    //再次對結(jié)構(gòu)進(jìn)行轉(zhuǎn)換
    val mapDS1: DStream[(String, (String, Int))] = updateDS.map{
      case (k, sum) => {
        val fields: Array[String] = k.split("_")
        (fields(0) +"_"+ fields(1), (fields(2), sum))
      }
    }

    //mapDS1.print() //  (2021-03-25_華北,(1,98))

    //將相同的天和地區(qū)放到一組
    val groupDS: DStream[(String, Iterable[(String, Int)])] = mapDS1.groupByKey()
    groupDS.print() //(2021-03-25_華中,ArrayBuffer((1,18), (2,21), (3,20), (4,22), (5,23)))

  # 2.5 對分組中的數(shù)據(jù)進(jìn)行排序

運(yùn)行結(jié)果:
image.png

2.5 對分組中的數(shù)據(jù)進(jìn)行排序:

    // 對分組中的數(shù)據(jù)進(jìn)行排序
    val resDS: DStream[(String, List[(String, Int)])] = groupDS.mapValues{
      datas => {
        datas.toList.sortBy(-_._2).take(3)
      }
    }

    // 打印結(jié)果
    resDS.print() // (2021-03-25_華北,List((5,107), (1,96), (3,92)))

我們得到最終的結(jié)果:
image.png

?? 通過這個(gè)案例带饱,我們曉得了 Spark 在項(xiàng)目中的應(yīng)用,看代碼感覺 scala 還是比 Java 簡潔了好多,在實(shí)際Spark 的項(xiàng)目中 scala 還是占主導(dǎo)地位的勺疼,當(dāng)然這也要看公司的規(guī)范以及個(gè)人的習(xí)慣了教寂;好了,廢話不多說了执庐,多看幾遍多敲幾遍都會(huì)慢慢熟悉的酪耕,實(shí)踐出真知,come on 轨淌, 少年 ~

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末迂烁,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子递鹉,更是在濱河造成了極大的恐慌盟步,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件躏结,死亡現(xiàn)場離奇詭異却盘,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)窜觉,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進(jìn)店門谷炸,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人禀挫,你說我怎么就攤上這事旬陡。” “怎么了语婴?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵描孟,是天一觀的道長。 經(jīng)常有香客問我砰左,道長匿醒,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任缠导,我火速辦了婚禮廉羔,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘僻造。我一直安慰自己憋他,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布髓削。 她就那樣靜靜地躺著竹挡,像睡著了一般。 火紅的嫁衣襯著肌膚如雪立膛。 梳的紋絲不亂的頭發(fā)上揪罕,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天梯码,我揣著相機(jī)與錄音,去河邊找鬼好啰。 笑死轩娶,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的坎怪。 我是一名探鬼主播罢坝,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼搅窿!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起隙券,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤男应,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后娱仔,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體沐飘,經(jīng)...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年牲迫,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了耐朴。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,785評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡盹憎,死狀恐怖筛峭,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情陪每,我是刑警寧澤影晓,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站檩禾,受9級(jí)特大地震影響挂签,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜盼产,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一饵婆、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧戏售,春花似錦侨核、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至紧卒,卻和暖如春侥衬,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工轴总, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留直颅,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓怀樟,卻偏偏與公主長得像功偿,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子往堡,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,713評論 2 354

推薦閱讀更多精彩內(nèi)容