sparkstreaming實(shí)時(shí)寫入hive

最近一直在研究presto接口hive和mysql的一些使用和功能,因此,我在想是否能將數(shù)據(jù)實(shí)時(shí)的寫入到hive呢歹叮,剛好公司項(xiàng)目有需求數(shù)據(jù)實(shí)時(shí)寫入到hive中贿堰,對(duì)此辙芍,我特定實(shí)現(xiàn)了一下。



pom文件

spark-streaming-kafka-0-10_2.112.1.0

spark-core_2.11

spark-sql_2.11

scala-library

采用的是scala2.11.8

實(shí)現(xiàn)邏輯:

實(shí)時(shí)的獲取kafka中的數(shù)據(jù)羹与,然后保存偏移量故硅,實(shí)時(shí)的寫入到hive中

實(shí)現(xiàn)過程如下:

以下代碼是針對(duì)topic的分區(qū)只有一個(gè)的情況下:

object FamilyBandService {

val logger = LoggerFactory.getLogger(this.getClass)

def main(args: Array[String]): Unit = {

val conf =new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local[*]")

conf.set("spark.defalut.parallelism","500")

//每秒鐘每個(gè)分區(qū)kafka拉取消息的速率

? ? ? .set("spark.streaming.kafka.maxRatePerPartition","500")

// 序列化

? ? ? .set("spark.serilizer","org.apache.spark.serializer.KryoSerializer")

// 建議開啟rdd的壓縮

? ? ? .set("spark.rdd.compress","true")

val sc =new SparkContext(conf)

val ssc =new StreamingContext(sc,Seconds(3))

val brokers = PropertyUtil.getInstance().getProperty("brokerList","") ?//配置文件讀取工具類需自行編寫

val groupId = PropertyUtil.getInstance().getProperty("groupid","")

val topic = PropertyUtil.getInstance().getProperty("topic","")

var topics =Array(topic)

Logger.getLogger("org").setLevel(Level.ERROR) //臨時(shí)測(cè)試的時(shí)候只開啟error級(jí)別,方便排錯(cuò)纵搁。

//封裝參數(shù)

? ? val kafkaParams =Map[String, Object](

"bootstrap.servers" -> brokers,

"key.deserializer" ->classOf[StringDeserializer],

"value.deserializer" ->classOf[StringDeserializer],

"group.id" -> groupId,

"auto.offset.reset" ->"latest",

"enable.auto.commit" -> (false: java.lang.Boolean))

//從redis中獲取到偏移量

? ? val offsets: Long = RedisUtil.hashGet("offset","offsets").toLong

val topicPartition: TopicPartition =new TopicPartition(topic,0)

val partitionoffsets:Map[TopicPartition, Long] =Map(topicPartition -> offsets)

//獲取到實(shí)時(shí)流對(duì)象

? ? val kafkaStream =if (offsets ==0) {

KafkaUtils.createDirectStream[String,String](

ssc,

PreferConsistent, ?//這里有3種模式,一般情況下吃衅,都是使用PreferConsistent

//LocationStrategies.PreferConsistent:將在可用的執(zhí)行器之間均勻分配分區(qū)。

//PreferBrokers? 執(zhí)行程序與Kafka代理所在的主機(jī)相同腾誉,將更喜歡在該分區(qū)的Kafka leader上安排分區(qū)

//PreferFixed 如果您在分區(qū)之間的負(fù)載有顯著偏差徘层,這允許您指定分區(qū)到主機(jī)的顯式映射(任何未指定的分區(qū)將使用一致的位置)。

Subscribe[String,String](topics, kafkaParams) //消息訂閱

)

}else {

KafkaUtils.createDirectStream[String,String](

ssc,

PreferConsistent,

Subscribe[String,String](topics, kafkaParams, partitionoffsets) ?//此種方式是針對(duì)具體某個(gè)分區(qū)或者topic只有一個(gè)分區(qū)的情況

)

}

//業(yè)務(wù)處理

? ? kafkaStream.foreachRDD(rdd => {

val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges ?//獲取到分區(qū)和偏移量信息

val events: RDD[Some[String]] = rdd.map(x => {

val data = x.value()

Some(data)

})

val session = SQLContextSingleton.getSparkSession(events.sparkContext) ?//構(gòu)建一個(gè)Sparksession的單例

session.sql("set hive.exec.dynamic.partition=true") ? ? ?//配置hive支持動(dòng)態(tài)分區(qū)

session.sql("set hive.exec.dynamic.partition.mode=nonstrict") ? //配置hive動(dòng)態(tài)分區(qū)為非嚴(yán)格模式

//如果將數(shù)據(jù)轉(zhuǎn)換為Seq(xxxx),然后倒入隱式轉(zhuǎn)換import session.implicalit._ ?是否能實(shí)現(xiàn)呢妄辩,答案是否定的惑灵。

val dataRow = events.map(line => { ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //構(gòu)建row

val temp = line.get.split("###") ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??

Row(temp(0), temp(1), temp(2), temp(3), temp(4), temp(5))

})

//"deviceid","code","time","info","sdkversion","appversion"

?val structType =StructType(Array( ? ? ? ? ? ? ? ? ? ? ? ? ?//確定字段的類別

StructField("deviceid", StringType,true),

StructField("code", StringType,true),

StructField("time", StringType,true),

StructField("info", StringType,true),

StructField("sdkversion", StringType,true),

StructField("appversion", StringType,true)

))

val df = session.createDataFrame(dataRow, structType) ? //構(gòu)建df

df.createOrReplaceTempView("jk_device_info")

session.sql("insert into test.jk_device_info select * from jk_device_info")

for (rs <- ranges) {

//實(shí)時(shí)保存偏移量到redis

? ? ? ? val value = rs.untilOffset.toString

RedisUtil.hashSet("offset","offsets", value) ? //偏移量保存

println(s"the offset:${value}")

}

})

println("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")

ssc.start()

ssc.awaitTermination()

}

}

上面說到了單分區(qū)情況下的實(shí)時(shí)寫入hive的情況,此種情況要求數(shù)據(jù)具有很高的時(shí)序性眼耀,但是并發(fā)會(huì)受到影響英支。那么我們采用多分區(qū)的情況下,如何實(shí)現(xiàn)呢哮伟,代碼如下:

//單分區(qū)情況

/*? ? //從redis中獲取到偏移量

val offsets: Long = RedisUtil.hashGet("offset", "offsets").toLong ? ?//redis工具類需要自行編寫

val topicPartition: TopicPartition = new TopicPartition(topic, 0)

val partitionoffsets: Map[TopicPartition, Long] = Map(topicPartition -> offsets)*/

//多分區(qū)情況

val partitions = 3

?var fromdbOffset =Map[TopicPartition, Long]()

for (partition <-0 until partitions) {

val topicPartition =new TopicPartition(topic, partition)

val offsets = RedisUtil.hashGet("offset",s"${topic}_${partition}").toLong

fromdbOffset += (topicPartition -> offsets)

}

//獲取到實(shí)時(shí)流對(duì)象

? ? val kafkaStream =if (fromdbOffset.size==0) {

KafkaUtils.createDirectStream[String,String](

ssc,

PreferConsistent,

Subscribe[String,String](topics, kafkaParams)

)

}else {

KafkaUtils.createDirectStream[String,String](

ssc,

PreferConsistent,

//? ? ? ? Subscribe[String, String](topics, kafkaParams, partitionoffsets) 訂閱具體某個(gè)分區(qū)

? ? ? ? ConsumerStrategies.Assign[String,String](fromdbOffset.keys, kafkaParams, fromdbOffset)

)

}

針對(duì)Assign 和?Subscribe 干花,我們來看下官方源碼

Assign:

//2個(gè)參數(shù)

def Assign[K,V](

topicPartitions:Iterable[TopicPartition],

kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K,V] = {

new Assign[K,V](

new ju.ArrayList(topicPartitions.asJavaCollection),

new ju.HashMap[String, Object](kafkaParams.asJava),

ju.Collections.emptyMap[TopicPartition, jl.Long]())

}

//3個(gè)參數(shù)

def Assign[K,V](

topicPartitions:Iterable[TopicPartition],

kafkaParams: collection.Map[String, Object],

offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K,V] = {

new Assign[K,V](

new ju.ArrayList(topicPartitions.asJavaCollection),

new ju.HashMap[String, Object](kafkaParams.asJava),

new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l =>new jl.Long(l)).asJava))

}

Subscribe :

2個(gè)參數(shù)

def Subscribe[K,V](

topics:Iterable[jl.String],

kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K,V] = {

new Subscribe[K,V](

new ju.ArrayList(topics.asJavaCollection),

new ju.HashMap[String, Object](kafkaParams.asJava),

ju.Collections.emptyMap[TopicPartition, jl.Long]())

}

3個(gè)參數(shù)

def Subscribe[K,V](

topics:Iterable[jl.String],

kafkaParams: collection.Map[String, Object],

offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K,V] = {

new Subscribe[K,V](

new ju.ArrayList(topics.asJavaCollection),

new ju.HashMap[String, Object](kafkaParams.asJava),

new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l =>new jl.Long(l)).asJava))

}

經(jīng)過對(duì)比,我們發(fā)現(xiàn)楞黄,區(qū)別就在第一個(gè)參數(shù)上池凄,一個(gè)為topics,一個(gè)為topicPartitions

SQLContextSingleton單例

def getSparkSession(sc: SparkContext): SparkSession = {

if (sparkSession ==null) {

sparkSession = SparkSession

.builder()

.enableHiveSupport()

.master("local[*]")

.config(sc.getConf)

.config("spark.files.openCostInBytes", PropertyUtil.getInstance().getProperty("spark.files.openCostInBytes"))

//連接到hive元數(shù)據(jù)庫

? ? ? .config("hive.metastore.uris","thrift://192.168.1.61:9083")

//--files hdfs:///user/processuser/hive-site.xml 集群上運(yùn)行需要指定hive-site.xml的位置

? ? ? .config("spark.sql.warehouse.dir","hdfs://192.168.1.61:8020/user/hive/warehouse")

.getOrCreate()

}

sparkSession

}

如果需要連接到hive必須要注意的幾個(gè)事項(xiàng):

1鬼廓,指定hive的元數(shù)據(jù)地址

2肿仑,指定spark.sql.warehouse.dir的數(shù)據(jù)存儲(chǔ)位置

3,enableHiveSupport()

4,resource下要放hive-site.xml文件

xml文件需要配置的信息尤慰,以下信息均可從集群的配置中得到:

hive.exec.scratchdir

hive.metastore.warehouse.dir

hive.querylog.location

hive.metastore.uris ? ?

javax.jdo.option.ConnectionURL

javax.jdo.option.ConnectionDriverName

javax.jdo.option.ConnectionUserName

javax.jdo.option.ConnectionPassword

5馏锡,本地執(zhí)行要指定hadoop的目錄

System.setProperty("hadoop.home.dir", PropertyUtil.getInstance().getProperty("localMode"))

#hadoop info

localMode=D://hadoop-2.6.5//hadoop-2.6.5

clusterMode=file://usr//hdp//2.6.2.0-205//hadoop

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市伟端,隨后出現(xiàn)的幾起案子杯道,更是在濱河造成了極大的恐慌,老刑警劉巖责蝠,帶你破解...
    沈念sama閱讀 218,451評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件党巾,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡霜医,警方通過查閱死者的電腦和手機(jī)齿拂,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,172評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來支子,“玉大人创肥,你說我怎么就攤上這事≈蹬螅” “怎么了叹侄?”我有些...
    開封第一講書人閱讀 164,782評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)昨登。 經(jīng)常有香客問我趾代,道長(zhǎng),這世上最難降的妖魔是什么丰辣? 我笑而不...
    開封第一講書人閱讀 58,709評(píng)論 1 294
  • 正文 為了忘掉前任撒强,我火速辦了婚禮,結(jié)果婚禮上笙什,老公的妹妹穿的比我還像新娘飘哨。我一直安慰自己,他們只是感情好琐凭,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,733評(píng)論 6 392
  • 文/花漫 我一把揭開白布芽隆。 她就那樣靜靜地躺著,像睡著了一般统屈。 火紅的嫁衣襯著肌膚如雪胚吁。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,578評(píng)論 1 305
  • 那天愁憔,我揣著相機(jī)與錄音腕扶,去河邊找鬼。 笑死吨掌,一個(gè)胖子當(dāng)著我的面吹牛半抱,可吹牛的內(nèi)容都是我干的脓恕。 我是一名探鬼主播,決...
    沈念sama閱讀 40,320評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼代虾,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼进肯!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起棉磨,我...
    開封第一講書人閱讀 39,241評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎学辱,沒想到半個(gè)月后乘瓤,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,686評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡策泣,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,878評(píng)論 3 336
  • 正文 我和宋清朗相戀三年衙傀,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片萨咕。...
    茶點(diǎn)故事閱讀 39,992評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡统抬,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出危队,到底是詐尸還是另有隱情聪建,我是刑警寧澤,帶...
    沈念sama閱讀 35,715評(píng)論 5 346
  • 正文 年R本政府宣布茫陆,位于F島的核電站金麸,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏簿盅。R本人自食惡果不足惜挥下,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,336評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望桨醋。 院中可真熱鬧棚瘟,春花似錦、人聲如沸喜最。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,912評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽返顺。三九已至禀苦,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間遂鹊,已是汗流浹背振乏。 一陣腳步聲響...
    開封第一講書人閱讀 33,040評(píng)論 1 270
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留秉扑,地道東北人慧邮。 一個(gè)月前我還...
    沈念sama閱讀 48,173評(píng)論 3 370
  • 正文 我出身青樓调限,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親误澳。 傳聞我的和親對(duì)象是個(gè)殘疾皇子耻矮,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,947評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容