Structured Streaming自定義Output Sink到多個(gè)輸出源

最近有個(gè)需求砚作,需要把我們Structured Streaming處理后的實(shí)時(shí)數(shù)據(jù),發(fā)送到Redis一份嘹锁。官網(wǎng)并沒有提供redis輸出方式葫录。之前我們使用的是foreachBatch這種方式,可以同時(shí)輸出到關(guān)系型數(shù)據(jù)庫领猾,kafka等米同,但是官方?jīng)]提供輸出方法的redis就有點(diǎn)難處理。后來看官方文檔摔竿,官方推薦我們使用foreach進(jìn)行輸出窍霞。對(duì)于我們這種需要往多個(gè)數(shù)據(jù)源同時(shí)輸出的情況,我們需要自定義Output Sink:
自定義sink需要繼承自ForeachWriter拯坟。以下是我寫的同時(shí)輸出到kafka,redis和mysql的sink類

package xds.DataCleaning_201905

import java.sql.{Connection, PreparedStatement}
import java.util
import java.util.Date
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.{ForeachWriter, Row}
import org.json4s.jackson.JsonMethods.{compact, render}
import redis.clients.jedis.{Jedis}
import xds.Utils.{DateUtils, KafkaProducerUtils, MysqlManager, RedisClient}
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

/**
  * wh 20190621
  *
  *
  * 對(duì)于partition_id的每個(gè)分區(qū):
  *
  * 對(duì)于epoch_id的流數(shù)據(jù)的每個(gè)批次/紀(jì)元:
  *
  * 方法open(partitionId但金,epochId)被調(diào)用。
  *
  * 如果open(...)返回true郁季,則對(duì)于分區(qū)和批處理/紀(jì)元中的每一行冷溃,將調(diào)用方法進(jìn)程(行)。
  *
  * 調(diào)用方法close(錯(cuò)誤)梦裂,在處理行時(shí)看到錯(cuò)誤(如果有)似枕。
  */
class MySink extends ForeachWriter [Row]{
  val kafkaTopic : String = "LS_VD_CL"
  var jedis: Jedis = _
  var connection :Connection = _
  var statementToInsert : PreparedStatement = _
  var kafkaProducer : KafkaProducer[String, String] = _
  override def open(partitionId: Long, version: Long): Boolean = {
    jedis = RedisClient.pool.getResource
    connection = MysqlManager.getMysqlManager.getConnection
    kafkaProducer = KafkaProducerUtils.getProducer
    connection.setAutoCommit(false)
    statementToInsert = connection.prepareStatement("insert into t_videodata_1min (CreateTime,VehicleCount,Speed,ID_Link,ID_Station,ID_Lane,ID_TrafficSource,Type)" +
      "values (?,?,?,?,?,?,?,?)")
    println("open connection !")
    true
  }
  override def process(value: Row): Unit = {
    //獲取row中每一個(gè)字段
    val CreateTime:Date = value.getAs[Date](0)
    val VehicleCount:Float = value.getAs[Float](1)
    val Speed:Float= value.getAs[Float](2)
    val ID_Link:String = value.getAs[String](3)
    val ID_Station:String = value.getAs[String](4)
    val ID_Lane:String = value.getAs[String](5)
    val ID_TrafficSource:String = value.getAs[String](6)
    val Type:Integer = value.getAs[Integer](7)
//以下為存入redis
    val map :util.HashMap[String,String]= new util.HashMap[String,String]
    map.put("VehicleCount",VehicleCount.toString)
    map.put("Speed",Speed.toString)
    map.put("ID_Lane",ID_Lane)
    val hourMin = DateUtils.dateToStr(CreateTime,"HHmm")
    jedis.hmset("C"+hourMin+ID_Link+"#"+ID_TrafficSource,map)
    val createTimeStr = DateUtils.dateToStr(CreateTime,"yyyy-MM-dd HH:mm:ss")
//以下為存入mysql
    statementToInsert.setObject(1,CreateTime)
    statementToInsert.setObject(2,VehicleCount)
    statementToInsert.setObject(3,Speed)
    statementToInsert.setObject(4,ID_Link)
    statementToInsert.setObject(5,ID_Station)
    statementToInsert.setObject(6,ID_Lane)
    statementToInsert.setObject(7,ID_TrafficSource)
    statementToInsert.setObject(8,Type)
    statementToInsert.addBatch()
//以下為發(fā)至kafka
    val messageToKafka = ("ID_TrafficSource" -> ID_TrafficSource) ~
      ("CreateTime" -> createTimeStr)~
      ("ID_Station" -> ID_Station) ~
      ("ID_Link" -> ID_Link)~
      ("ID_Lane" -> ID_Lane)~
      ("VehicleCount" -> VehicleCount) ~
      ("Speed" -> Speed) ~
      ("Type" -> Type.toString)
    val jsonToKafka = compact(render(messageToKafka))//封裝成json
    kafkaProducer.send(new ProducerRecord(kafkaTopic,jsonToKafka))
  }
//記得關(guān)閉各種連接
  override def close(errorOrNull: Throwable): Unit = {
    //關(guān)閉連接
    println("close connection !")
    statementToInsert.executeBatch() //批量執(zhí)行
    connection.commit //提交
    //注意關(guān)閉各種連接
    statementToInsert.close()
    connection.close()
    jedis.close()
  }
}

主函數(shù)里面我們只需要如下調(diào)用即可:

val query = df.writeStream.outputMode("append").foreach(new MySink).start()
query.awaitTermination()
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市年柠,隨后出現(xiàn)的幾起案子凿歼,更是在濱河造成了極大的恐慌,老刑警劉巖冗恨,帶你破解...
    沈念sama閱讀 222,681評(píng)論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件咖为,死亡現(xiàn)場離奇詭異策彤,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,205評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門挠羔,熙熙樓的掌柜王于貴愁眉苦臉地迎上來吠勘,“玉大人币厕,你說我怎么就攤上這事〕情唬” “怎么了?”我有些...
    開封第一講書人閱讀 169,421評(píng)論 0 362
  • 文/不壞的土叔 我叫張陵态兴,是天一觀的道長狠持。 經(jīng)常有香客問我,道長瞻润,這世上最難降的妖魔是什么工坊? 我笑而不...
    開封第一講書人閱讀 60,114評(píng)論 1 300
  • 正文 為了忘掉前任,我火速辦了婚禮敢订,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘罢吃。我一直安慰自己楚午,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,116評(píng)論 6 398
  • 文/花漫 我一把揭開白布尿招。 她就那樣靜靜地躺著矾柜,像睡著了一般。 火紅的嫁衣襯著肌膚如雪就谜。 梳的紋絲不亂的頭發(fā)上怪蔑,一...
    開封第一講書人閱讀 52,713評(píng)論 1 312
  • 那天,我揣著相機(jī)與錄音丧荐,去河邊找鬼缆瓣。 笑死,一個(gè)胖子當(dāng)著我的面吹牛虹统,可吹牛的內(nèi)容都是我干的弓坞。 我是一名探鬼主播,決...
    沈念sama閱讀 41,170評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼车荔,長吁一口氣:“原來是場噩夢啊……” “哼渡冻!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起忧便,我...
    開封第一講書人閱讀 40,116評(píng)論 0 277
  • 序言:老撾萬榮一對(duì)情侶失蹤族吻,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后珠增,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體超歌,經(jīng)...
    沈念sama閱讀 46,651評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,714評(píng)論 3 342
  • 正文 我和宋清朗相戀三年蒂教,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了握础。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,865評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡悴品,死狀恐怖禀综,靈堂內(nèi)的尸體忽然破棺而出简烘,到底是詐尸還是另有隱情,我是刑警寧澤定枷,帶...
    沈念sama閱讀 36,527評(píng)論 5 351
  • 正文 年R本政府宣布孤澎,位于F島的核電站,受9級(jí)特大地震影響欠窒,放射性物質(zhì)發(fā)生泄漏覆旭。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,211評(píng)論 3 336
  • 文/蒙蒙 一岖妄、第九天 我趴在偏房一處隱蔽的房頂上張望型将。 院中可真熱鬧,春花似錦荐虐、人聲如沸七兜。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,699評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽腕铸。三九已至,卻和暖如春铛碑,著一層夾襖步出監(jiān)牢的瞬間狠裹,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,814評(píng)論 1 274
  • 我被黑心中介騙來泰國打工汽烦, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留涛菠,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,299評(píng)論 3 379
  • 正文 我出身青樓撇吞,卻偏偏與公主長得像碗暗,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子梢夯,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,870評(píng)論 2 361