Recently I have been looking into how Presto interfaces with Hive and MySQL, which got me wondering whether data could be written into Hive in real time. As it happens, a company project needed exactly that, so I went ahead and implemented it.
pom file dependencies:
spark-streaming-kafka-0-10_2.11 (2.1.0)
spark-core_2.11
spark-sql_2.11
scala-library
Scala 2.11.8 is used.
Implementation logic:
Consume data from Kafka in real time, save the offsets, and write the data into Hive in real time.
The implementation is as follows.
The following code handles the case where the topic has only one partition:
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object FamilyBandService {

  val logger = LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local[*]")
    conf.set("spark.default.parallelism", "500")
      // maximum rate (messages per second per partition) at which Kafka is polled
      .set("spark.streaming.kafka.maxRatePerPartition", "500")
      // serialization
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // enabling RDD compression is recommended
      .set("spark.rdd.compress", "true")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(3))
    val brokers = PropertyUtil.getInstance().getProperty("brokerList", "")  // config-file reader utility, to be written yourself
    val groupId = PropertyUtil.getInstance().getProperty("groupid", "")
    val topic = PropertyUtil.getInstance().getProperty("topic", "")
    val topics = Array(topic)
    Logger.getLogger("org").setLevel(Level.ERROR)  // only ERROR level during ad-hoc testing, to make troubleshooting easier
    // assemble the Kafka parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))
    // read the saved offset from redis
    val offsets: Long = RedisUtil.hashGet("offset", "offsets").toLong
    val topicPartition: TopicPartition = new TopicPartition(topic, 0)
    val partitionoffsets: Map[TopicPartition, Long] = Map(topicPartition -> offsets)
    // create the direct stream
    val kafkaStream = if (offsets == 0) {
      KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,  // there are 3 location strategies; PreferConsistent is the usual choice
        // LocationStrategies.PreferConsistent: distribute partitions evenly across the available executors.
        // PreferBrokers: when the executors run on the same hosts as the Kafka brokers, prefer scheduling a partition on that partition's Kafka leader.
        // PreferFixed: if the load is heavily skewed across partitions, this lets you give an explicit partition-to-host mapping (unmapped partitions fall back to a consistent location).
        Subscribe[String, String](topics, kafkaParams)  // subscribe to the topic
      )
    } else {
      KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Subscribe[String, String](topics, kafkaParams, partitionoffsets)  // this form targets specific partitions, or a topic that has a single partition
      )
    }
    // business logic
    kafkaStream.foreachRDD(rdd => {
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges  // partition and offset information for this batch
      val events: RDD[Some[String]] = rdd.map(x => {
        val data = x.value()
        Some(data)
      })
      val session = SQLContextSingleton.getSparkSession(events.sparkContext)  // singleton SparkSession
      session.sql("set hive.exec.dynamic.partition=true")            // let hive use dynamic partitions
      session.sql("set hive.exec.dynamic.partition.mode=nonstrict")  // set hive dynamic partitioning to non-strict mode
      // Could this be done by converting the data to a Seq(...) and importing the implicit conversions (import session.implicits._)? The answer is no.
      val dataRow = events.map(line => {                             // build the rows
        val temp = line.get.split("###")
        Row(temp(0), temp(1), temp(2), temp(3), temp(4), temp(5))
      })
      // "deviceid","code","time","info","sdkversion","appversion"
      val structType = StructType(Array(                             // declare the field types
        StructField("deviceid", StringType, true),
        StructField("code", StringType, true),
        StructField("time", StringType, true),
        StructField("info", StringType, true),
        StructField("sdkversion", StringType, true),
        StructField("appversion", StringType, true)
      ))
      val df = session.createDataFrame(dataRow, structType)          // build the DataFrame
      df.createOrReplaceTempView("jk_device_info")
      session.sql("insert into test.jk_device_info select * from jk_device_info")
      for (rs <- ranges) {
        // persist the offset to redis after each batch
        val value = rs.untilOffset.toString
        RedisUtil.hashSet("offset", "offsets", value)                // save the offset
        println(s"the offset:${value}")
      }
    })
    println("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
    ssc.start()
    ssc.awaitTermination()
  }
}
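The code above leans on two helper classes that, as the comments note, you have to write yourself: PropertyUtil for reading the configuration file and RedisUtil for keeping offsets in Redis. A minimal sketch of what they could look like follows, assuming a Jedis client and a properties file named config.properties on the classpath; the file name, host and port are placeholders, not part of the original project.

import java.util.Properties
import redis.clients.jedis.Jedis

// Sketch only: no connection pooling or error handling; adjust for real use.
object PropertyUtil {
  private lazy val props: Properties = {
    val p = new Properties()
    p.load(getClass.getClassLoader.getResourceAsStream("config.properties")) // assumed file name
    p
  }
  def getInstance(): Properties = props
}

object RedisUtil {
  private lazy val jedis = new Jedis("127.0.0.1", 6379) // a JedisPool would be preferable in practice

  def hashGet(key: String, field: String): String = {
    val v = jedis.hget(key, field)
    if (v == null) "0" else v // "0" keeps .toLong working before any offset has been saved
  }

  def hashSet(key: String, field: String, value: String): Unit = {
    jedis.hset(key, field, value)
  }
}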
The above covers real-time writes into Hive with a single partition. That approach gives strong ordering of the data, but concurrency suffers. So how do we handle multiple partitions? The code is as follows:
// single-partition case
/*  // read the saved offset from redis
val offsets: Long = RedisUtil.hashGet("offset", "offsets").toLong   // a redis utility you need to write yourself
val topicPartition: TopicPartition = new TopicPartition(topic, 0)
val partitionoffsets: Map[TopicPartition, Long] = Map(topicPartition -> offsets)*/
// multi-partition case
val partitions = 3
var fromdbOffset = Map[TopicPartition, Long]()
for (partition <- 0 until partitions) {
  val topicPartition = new TopicPartition(topic, partition)
  val offsets = RedisUtil.hashGet("offset", s"${topic}_${partition}").toLong
  fromdbOffset += (topicPartition -> offsets)
}
// create the direct stream
val kafkaStream = if (fromdbOffset.isEmpty) {
  KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )
} else {
  KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    // Subscribe[String, String](topics, kafkaParams, partitionoffsets)  subscribes to specific partitions of a topic
    ConsumerStrategies.Assign[String, String](fromdbOffset.keys, kafkaParams, fromdbOffset)
  )
}
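Note that this variant reads the offsets back from Redis under per-partition keys (${topic}_${partition}), so the save step inside foreachRDD should write them under the same keys rather than the single "offsets" field used in the single-partition example. A sketch of the adjusted save loop, under the same RedisUtil assumption as above:

kafkaStream.foreachRDD(rdd => {
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... same Hive write logic as in the single-partition example ...
  for (rs <- ranges) {
    // one offset per partition, matching the key layout read at startup
    RedisUtil.hashSet("offset", s"${rs.topic}_${rs.partition}", rs.untilOffset.toString)
    println(s"partition ${rs.partition} offset: ${rs.untilOffset}")
  }
})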
Let's look at the official source code for Assign and Subscribe.
Assign:
// two-parameter version
def Assign[K, V](
    topicPartitions: Iterable[TopicPartition],
    kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
  new Assign[K, V](
    new ju.ArrayList(topicPartitions.asJavaCollection),
    new ju.HashMap[String, Object](kafkaParams.asJava),
    ju.Collections.emptyMap[TopicPartition, jl.Long]())
}
// three-parameter version
def Assign[K, V](
    topicPartitions: Iterable[TopicPartition],
    kafkaParams: collection.Map[String, Object],
    offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
  new Assign[K, V](
    new ju.ArrayList(topicPartitions.asJavaCollection),
    new ju.HashMap[String, Object](kafkaParams.asJava),
    new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
}
Subscribe:
// two-parameter version
def Subscribe[K, V](
    topics: Iterable[jl.String],
    kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
  new Subscribe[K, V](
    new ju.ArrayList(topics.asJavaCollection),
    new ju.HashMap[String, Object](kafkaParams.asJava),
    ju.Collections.emptyMap[TopicPartition, jl.Long]())
}
// three-parameter version
def Subscribe[K, V](
    topics: Iterable[jl.String],
    kafkaParams: collection.Map[String, Object],
    offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
  new Subscribe[K, V](
    new ju.ArrayList(topics.asJavaCollection),
    new ju.HashMap[String, Object](kafkaParams.asJava),
    new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
}
Comparing the two, the only difference is the first parameter: Subscribe takes topics, while Assign takes topicPartitions.
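Put side by side, the two strategies are used like this (the topic name and partition numbers below are only placeholders):

// Subscribe: hand over topic names and let Kafka's group management assign the partitions
val byTopic = Subscribe[String, String](Array("my_topic"), kafkaParams)
// Assign: pin the consumer to explicit TopicPartitions, e.g. when restoring saved offsets per partition
val byPartition = ConsumerStrategies.Assign[String, String](
  Seq(new TopicPartition("my_topic", 0), new TopicPartition("my_topic", 1)),
  kafkaParams)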
The SQLContextSingleton singleton:
object SQLContextSingleton {

  @transient private var sparkSession: SparkSession = _

  def getSparkSession(sc: SparkContext): SparkSession = {
    if (sparkSession == null) {
      sparkSession = SparkSession
        .builder()
        .enableHiveSupport()
        .master("local[*]")
        .config(sc.getConf)
        .config("spark.files.openCostInBytes", PropertyUtil.getInstance().getProperty("spark.files.openCostInBytes"))
        // connect to the hive metastore
        .config("hive.metastore.uris", "thrift://192.168.1.61:9083")
        // when running on the cluster, point to hive-site.xml with --files hdfs:///user/processuser/hive-site.xml
        .config("spark.sql.warehouse.dir", "hdfs://192.168.1.61:8020/user/hive/warehouse")
        .getOrCreate()
    }
    sparkSession
  }
}
A few things to watch out for when connecting to Hive:
1. Specify the Hive metastore address.
2. Specify the data storage location via spark.sql.warehouse.dir.
3. Call enableHiveSupport().
4. Put the hive-site.xml file under the resources directory.
The xml file needs the following settings, all of which can be taken from the cluster configuration:
hive.exec.scratchdir
hive.metastore.warehouse.dir
hive.querylog.location
hive.metastore.uris
javax.jdo.option.ConnectionURL
javax.jdo.option.ConnectionDriverName
javax.jdo.option.ConnectionUserName
javax.jdo.option.ConnectionPassword
5. For local execution, the Hadoop home directory must be specified:
System.setProperty("hadoop.home.dir", PropertyUtil.getInstance().getProperty("localMode"))
#hadoop info
localMode=D://hadoop-2.6.5//hadoop-2.6.5
clusterMode=file://usr//hdp//2.6.2.0-205//hadoop