In an earlier article we implemented a Top-N case with Flink. Here, to review how Spark reads from Kafka and how DStream operations work, we will implement the following requirement:
Requirement: the top 3 most popular ads per region per day.
一谴古、數(shù)據(jù)源
[root@cdh101 kafka]# bin/kafka-console-consumer.sh --bootstrap-server cdh101:9092,cdh102:9092,cdh103:9092 --topic luchangyin --from-beginning
Output: (screenshot of the consumed records)
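Each record looks like "1616683286749,華東,上海,102,1" — a timestamp, a region, a city, and what the parsing code below treats as a user id and an ad id (the meaning of the last two fields is my inference). If you have no live feed, a small mock producer along the following lines can fill the topic. This is a sketch of my own, not part of the original setup; it assumes the broker list and topic name used in this article and the kafka-clients library already pulled in by the streaming dependency:

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import java.util.Properties
import scala.util.Random

// Mock producer (sketch): writes "timestamp,region,city,userId,adId" lines to the topic
object MockAdClickProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "cdh101:9092,cdh102:9092,cdh103:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    val areas = Seq("華東", "華北", "華中") // regions seen in the outputs below
    val rnd = new Random()
    while (true) {
      // City fixed to 上海 for simplicity; user id and ad id are random
      val line = s"${System.currentTimeMillis()},${areas(rnd.nextInt(areas.size))},上海,${100 + rnd.nextInt(10)},${1 + rnd.nextInt(5)}"
      producer.send(new ProducerRecord[String, String]("luchangyin", line))
      Thread.sleep(200)
    }
  }
}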
Prerequisite: add the dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
</dependencies>
2. Implementation
2.1 Consume the raw data:
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import java.text.SimpleDateFormat
import java.util.Date

// Requirement: top 3 ads per region per day
object RealTime_App01 {
  def main(args: Array[String]): Unit = {
    // Create the Spark configuration
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HighKafka")
    // Create the StreamingContext with a 3-second batch interval
    val ssc = new StreamingContext(conf, Seconds(3))

    // Kafka parameters
    val brokers = "cdh101:9092,cdh102:9092,cdh103:9092"
    val topic = "luchangyin"
    val group = "cloudera_mirrormaker"
    val deserialization = "org.apache.kafka.common.serialization.StringDeserializer"
    val autooffsetreset = "latest"
    val kafkaParams = Map(
      ConsumerConfig.GROUP_ID_CONFIG -> group,
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> autooffsetreset,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization
    )

    // Set the checkpoint directory (required by updateStateByKey later on)
    ssc.checkpoint("D:\\MySoftware\\StudySoftware\\MyIdea\\luchangyin2021\\MyFirstBigScreen\\TestFSLJavaDemon\\src\\main\\ck1")

    // Create the direct stream against Kafka
    val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set(topic), kafkaParams)
    )

    // Take the value from each Kafka (key, value) pair, e.g. 1616683286749,華東,上海,102,1
    val dataDS = kafkaDS.map(_.value())
    dataDS.print()

    // 2.2 Transform the raw Kafka data ==> (day_area_ad, 1)  -- the code of the following sections goes here

    ssc.start()
    ssc.awaitTermination()
  }
}
Output: (screenshot)
2.2 Transform the raw data from Kafka:
    // Transform each record into (day_area_ad, 1)
    val mapDS: DStream[(String, Int)] = dataDS.map {
      line => {
        val fields: Array[String] = line.split(",")
        // Get the timestamp
        val timeStamp: Long = fields(0).toLong
        // Build a Date object from the timestamp
        val day = new Date(timeStamp)
        // Create a SimpleDateFormat to format the date
        val sdf = new SimpleDateFormat("yyyy-MM-dd")
        // Convert the date to a string
        val dayStr: String = sdf.format(day)
        // Get the area
        val area = fields(1)
        // Get the ad id
        val adv = fields(4)
        // Wrap into a tuple
        (dayStr + "_" + area + "_" + adv, 1)
      }
    }
    mapDS.print() // (2021-03-25_華東_4,1)

    // 2.3 Aggregate the ad clicks per day and region ==> (day_area_ad, sum)
Output: (screenshot)
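As a quick sanity check of the date handling above, this tiny standalone snippet (my own illustration) formats the sample timestamp from the Kafka record into the day string used as part of the key:

import java.text.SimpleDateFormat
import java.util.Date

object DayStringDemo {
  def main(args: Array[String]): Unit = {
    val sdf = new SimpleDateFormat("yyyy-MM-dd")
    // Sample timestamp from the record 1616683286749,華東,上海,102,1
    println(sdf.format(new Date(1616683286749L))) // 2021-03-25 in the UTC+8 timezone
  }
}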
2.3 Aggregate the ad clicks per day and per region:
    // Aggregate the ad clicks per day and region ==> (day_area_ad, sum)
    // Note: we want totals for a whole day, so the counts from every batch interval must be accumulated.
    // That requires carrying state across batches, hence updateStateByKey.
    val updateDS: DStream[(String, Int)] = mapDS.updateStateByKey(
      (seq: Seq[Int], buffer: Option[Int]) => {
        Option(seq.sum + buffer.getOrElse(0))
      }
    )
    updateDS.print() // (2021-03-25_華東_3,138)

    // 2.4 Group records of the same day and region together
Output: (screenshot)
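To make the updateStateByKey semantics concrete, here is a plain-Scala sketch of my own (not part of the original code): for every key, Spark hands the update function the new values of the current batch (seq) and the previous state (buffer), and stores whatever Option it returns as the new state.

object UpdateStateDemo {
  // The same update function that is passed to updateStateByKey above
  val update: (Seq[Int], Option[Int]) => Option[Int] =
    (seq, buffer) => Option(seq.sum + buffer.getOrElse(0))

  def main(args: Array[String]): Unit = {
    // Batch 1: the key saw 2 clicks, no previous state yet
    val afterBatch1 = update(Seq(1, 1), None)            // Some(2)
    // Batch 2: 3 more clicks arrive, the previous state is Some(2)
    val afterBatch2 = update(Seq(1, 1, 1), afterBatch1)  // Some(5)
    println(s"after batch 1: $afterBatch1, after batch 2: $afterBatch2")
  }
}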
2.4 Group the same day and region together:
    // Reshape again: (day_area_ad, sum) ==> (day_area, (ad, sum))
    val mapDS1: DStream[(String, (String, Int))] = updateDS.map {
      case (k, sum) => {
        val fields: Array[String] = k.split("_")
        (fields(0) + "_" + fields(1), (fields(2), sum))
      }
    }
    // mapDS1.print() // (2021-03-25_華北,(1,98))

    // Group records of the same day and region together
    val groupDS: DStream[(String, Iterable[(String, Int)])] = mapDS1.groupByKey()
    groupDS.print() // (2021-03-25_華中,ArrayBuffer((1,18), (2,21), (3,20), (4,22), (5,23)))

    // 2.5 Sort the data within each group
Output: (screenshot)
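For intuition, the grouping behaves like a plain Scala groupBy on (key, value) pairs; the following standalone sketch of mine (with counts taken from the printed outputs above) shows how all (ad, count) pairs sharing the same day_area key end up in one collection:

object GroupByKeyDemo {
  def main(args: Array[String]): Unit = {
    val pairs = List(
      ("2021-03-25_華中", ("1", 18)),
      ("2021-03-25_華中", ("2", 21)),
      ("2021-03-25_華北", ("1", 96))
    )
    // Group by the day_area key and keep only the (ad, count) values
    val grouped: Map[String, List[(String, Int)]] =
      pairs.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2)) }
    println(grouped("2021-03-25_華中")) // List((1,18), (2,21))
  }
}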
2.5 Sort the data within each group:
    // Sort each group by click count (descending) and keep the top 3
    val resDS: DStream[(String, List[(String, Int)])] = groupDS.mapValues {
      datas => {
        datas.toList.sortBy(-_._2).take(3)
      }
    }
    // Print the result
    resDS.print() // (2021-03-25_華北,List((5,107), (1,96), (3,92)))
This gives us the final result: (screenshot)
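To isolate the ranking step itself, this minimal snippet (my own illustration) applies the same sortBy(-_._2).take(3) logic to one group's (ad, count) values:

object TopNDemo {
  def main(args: Array[String]): Unit = {
    // One group's (adId, clicks) values, as in the 2021-03-25_華中 example above
    val clicks = List(("1", 18), ("2", 21), ("3", 20), ("4", 22), ("5", 23))
    // Sort by click count descending and keep the top 3
    val top3 = clicks.sortBy(-_._2).take(3)
    println(top3) // List((5,23), (4,22), (2,21))
  }
}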
Through this case we have seen how Spark is applied in a project. Looking at the code, Scala is still far more concise than Java, and Scala remains the dominant language in real-world Spark projects, though that also depends on your team's conventions and personal preferences. Enough talk: read it a few more times, type it out a few more times, and it will all become familiar. Practice makes perfect. Come on ~