As we know, Spark Streaming is no longer being actively developed; since Spark 2.2 the effort has gone into Structured Streaming. Below we look at how Structured Streaming reads data from Kafka.
Structured Streaming can read data either as a batch query or as a streaming query. The example below uses the streaming API (readStream); a batch variant is sketched right after it.
package com.ky.service

import org.apache.log4j.lf5.LogLevel
import org.apache.spark.sql.{Dataset, SparkSession}

/**
  * @Author: xwj
  * @Date: 2019/1/31 0031 13:48
  * @Version 1.0
  */
object KafkaStreaming {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName(s"${this.getClass.getSimpleName}").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel(LogLevel.ERROR.getLabel)
    import spark.implicits._

    val topic = "kafka"
    val df = spark
      // read is a batch read, readStream a streaming read; write is a batch write, writeStream a streaming write.
      // The default for startingOffsets is "latest" for streaming queries and "earliest" for batch queries.
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.1.10:6667,192.168.1.11:6667")
      .option("subscribe", topic) // subscribe accepts several topics; use assign to consume specific partitions, subscribe to consume whole topics
      // .option("startingOffsets", "earliest") // reading a fixed offset range (with endingOffsets) is only supported for batch queries
      // .option("endingOffsets", "latest")
      .load()

    val kafkaDf: Dataset[(String, String)] = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
    // check whether this Dataset is a streaming one
    println(kafkaDf.isStreaming)
    kafkaDf.printSchema()

    val words = kafkaDf.flatMap(_._2.split(","))
    val wordCounts = words.groupBy("value").count()
    val query = wordCounts
      .writeStream
      .outputMode("complete")
      .format("console")
      .start()
    query.awaitTermination()
  }
}
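If you only need a one-off read, the batch variant mentioned above simply swaps readStream for read and lets you pin an explicit offset range. A minimal sketch, reusing the brokers and topic from the example (the offset values shown are the batch defaults):

// batch read: runs once over the chosen offset range instead of running continuously
val batchDf = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "192.168.1.10:6667,192.168.1.11:6667")
  .option("subscribe", topic)
  .option("startingOffsets", "earliest") // batch queries may also specify explicit per-partition offsets
  .option("endingOffsets", "latest")     // endingOffsets is only valid for batch queries
  .load()
batchDf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show(false)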
Combining it with Spark SQL:
object KafkaStreaming2 {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName(s"${this.getClass.getSimpleName}").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel(LogLevel.ERROR.toString)
    val topic = "kafka"
    import spark.implicits._

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.1.10:6667,192.168.1.11:6667")
      .option("subscribe", topic)
      .load()

    val kafkaDf: Dataset[(String, String)] = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
    val value = kafkaDf.filter(_._2.split(",").length == 3)
    val deviceDf: Dataset[DeviceData] = value.map(line => {
      val arr = line._2.split(",")
      DeviceData(arr(0), arr(1), arr(2).toDouble)
    })
    deviceDf.createOrReplaceTempView("test")
    val frame = spark.sql("select * from test").where("signal > 0.5")
    // outputMode("complete") cannot be used here: complete mode requires an aggregation, and this query has none
    val query = frame.writeStream.format("console").start()
    query.awaitTermination()
  }
}

case class DeviceData(device: String, deviceType: String, signal: Double)
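The console sink is mainly for debugging; as the comment in the first example notes, writeStream can also push results back to Kafka. A minimal sketch under assumed names (the output topic "kafka-out" and the checkpoint path are illustrative); the Kafka sink expects a string or binary value column and needs a checkpointLocation:

import org.apache.spark.sql.functions.{struct, to_json}

val toKafka = deviceDf
  .select(to_json(struct($"device", $"deviceType", $"signal")).alias("value")) // serialize each row as a JSON string
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "192.168.1.10:6667,192.168.1.11:6667")
  .option("topic", "kafka-out")                         // illustrative output topic
  .option("checkpointLocation", "/tmp/kafka-out-ckpt")  // illustrative checkpoint path
  .start()
toKafka.awaitTermination()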
Compared with the traditional (SparkContext + SQLContext) approach:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object Test {

  def main(args: Array[String]): Unit = {
    // create a SparkConf() and set the application name
    val conf = new SparkConf().setAppName("SQL-2")
    // SQLContext depends on SparkContext
    val sc = new SparkContext(conf)
    // create the SQLContext
    val sqlContext = new SQLContext(sc)
    // create an RDD from the given path
    val personRDD = sc.textFile(args(0)).map(_.split(" "))
    // specify the schema of each field directly with StructType
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )
    // map the RDD to an RDD of Rows
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    // apply the schema to the row RDD
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    // register a temporary view
    personDataFrame.createOrReplaceTempView("t_person")
    // run SQL
    val df = sqlContext.sql("select * from t_person order by age desc limit 4")
    // save the result to the given path as JSON
    df.write.json(args(1))
    // stop the SparkContext
    sc.stop()
  }
}
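For reference, the same batch job can be written against SparkSession, the Spark 2.x entry point used in the streaming examples, instead of SQLContext. A minimal sketch with the same schema and command-line arguments:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object Test2 {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SQL-2").getOrCreate()
    // the same explicit schema as above
    val schema = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    ))
    // build the row RDD from the input path and apply the schema
    val rowRDD = spark.sparkContext.textFile(args(0))
      .map(_.split(" "))
      .map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    val personDataFrame = spark.createDataFrame(rowRDD, schema)
    personDataFrame.createOrReplaceTempView("t_person")
    // run the same SQL and write the result as JSON
    spark.sql("select * from t_person order by age desc limit 4").write.json(args(1))
    spark.stop()
  }
}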