SparkStreaming之寫數(shù)據(jù)到Kafka

本文主要記錄使用SparkStreaming從Kafka里讀取數(shù)據(jù),并使用Redis保存Offset孔飒,并監(jiān)聽Redis中的某個(gè)Key是否存在來停止程序秽荞,將讀取到的數(shù)據(jù)轉(zhuǎn)換為json寫入到Kafka

相關(guān)文章:
1.Spark之PI本地
2.Spark之WordCount集群
3.SparkStreaming之讀取Kafka數(shù)據(jù)
4.SparkStreaming之使用redis保存Kafka的Offset
5.SparkStreaming之優(yōu)雅停止
6.SparkStreaming之寫數(shù)據(jù)到Kafka
7.Spark計(jì)算《西虹市首富》短評(píng)詞云

KafkaSink

對(duì)KafkaProducer進(jìn)行封裝便于廣播

import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] = {
    producer.send(new ProducerRecord[K, V](topic, key, value))
  }

  def send(topic: String, value: V): Future[RecordMetadata] = {
    producer.send(new ProducerRecord[K, V](topic, value))
  }
}

object KafkaSink {

  import scala.collection.JavaConversions._

  def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)
      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }
      producer
    }
    new KafkaSink(createProducerFunc)
  }

  def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
}

初始化KafkaSink,并廣播

// 初始化KafkaSink,并廣播
val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
      p.setProperty("bootstrap.servers", bootstrapServers)
      p.setProperty("key.serializer", classOf[StringSerializer].getName)
      p.setProperty("value.serializer", classOf[StringSerializer].getName)
      p
  }
  if (LOG.isInfoEnabled)
    LOG.info("kafka producer init done!")
    ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
}

變量Partition并使用廣播變量發(fā)送到Kafka

// 使用廣播變量發(fā)送到Kafka
partition.foreach(record => {
  kafkaProducer.value.send("Test_Json", new Gson().toJson(record))
})

完整程序 Kafka2KafkaStreaming

import com.google.gson.Gson
import me.jinkun.scala.util.{InternalRedisClient, KafkaSink}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.slf4j.LoggerFactory

/**
  *
  */
object Kafka2KafkaStreaming {
  private val LOG = LoggerFactory.getLogger("Kafka2KafkaStreaming")

  private val STOP_FLAG = "TEST_STOP_FLAG"

  def initRedisPool() = {
    // Redis configurations
    val maxTotal = 20
    val maxIdle = 10
    val minIdle = 1
    val redisHost = "47.98.119.122"
    val redisPort = 6379
    val redisTimeout = 30000
    InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)
  }

  /**
    * 從redis里獲取Topic的offset值
    *
    * @param topicName
    * @param partitions
    * @return
    */
  def getLastCommittedOffsets(topicName: String, partitions: Int): Map[TopicPartition, Long] = {
    if (LOG.isInfoEnabled())
      LOG.info("||--Topic:{},getLastCommittedOffsets from Redis--||", topicName)

    //從Redis獲取上一次存的Offset
    val jedis = InternalRedisClient.getPool.getResource
    val fromOffsets = collection.mutable.HashMap.empty[TopicPartition, Long]
    for (partition <- 0 to partitions - 1) {
      val topic_partition_key = topicName + "_" + partition
      val lastSavedOffset = jedis.get(topic_partition_key)
      val lastOffset = if (lastSavedOffset == null) 0L else lastSavedOffset.toLong
      fromOffsets += (new TopicPartition(topicName, partition) -> lastOffset)
    }
    jedis.close()

    fromOffsets.toMap
  }

  def main(args: Array[String]): Unit = {
    //初始化Redis Pool
    initRedisPool()

    val conf = new SparkConf()
      .setAppName("ScalaKafkaStream")
      .setMaster("local[3]")

    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val ssc = new StreamingContext(sc, Seconds(3))

    val bootstrapServers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"
    val groupId = "kafka-test-group"
    val topicName = "Test"
    val maxPoll = 1000

    val kafkaParams = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPoll.toString,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    // 這里指定Topic的Partition的總數(shù)
    val fromOffsets = getLastCommittedOffsets(topicName, 3)

    // 初始化KafkaDS
    val kafkaTopicDS =
      KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets))

    // 初始化KafkaSink,并廣播
    val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
      val kafkaProducerConfig = {
        val p = new Properties()
        p.setProperty("bootstrap.servers", bootstrapServers)
        p.setProperty("key.serializer", classOf[StringSerializer].getName)
        p.setProperty("value.serializer", classOf[StringSerializer].getName)
        p
      }
      if (LOG.isInfoEnabled)
        LOG.info("kafka producer init done!")
      ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
    }


    kafkaTopicDS.foreachRDD(rdd => {
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // 如果rdd有數(shù)據(jù)
      if (!rdd.isEmpty()) {
        // 在每個(gè)Partition里執(zhí)行
        rdd
          .map(_.value())
          .flatMap(_.split(" "))
          .map(x => (x, 1L))
          .reduceByKey(_ + _)
          .foreachPartition(partition => {

            val jedis = InternalRedisClient.getPool.getResource
            val p = jedis.pipelined()
            p.multi() //開啟事務(wù)

            // 使用廣播變量發(fā)送到Kafka
            partition.foreach(record => {
              kafkaProducer.value.send("Test_Json", new Gson().toJson(record))
            })

            val offsetRange = offsetRanges(TaskContext.get.partitionId)
            println("partition : " + offsetRange.partition + " fromOffset:  " + offsetRange.fromOffset + " untilOffset: " + offsetRange.untilOffset)
            val topic_partition_key = offsetRange.topic + "_" + offsetRange.partition
            p.set(topic_partition_key, offsetRange.untilOffset + "")

            p.exec() //提交事務(wù)
            p.sync //關(guān)閉pipeline
            jedis.close()
          })
      }
    })

    ssc.start()

    // 優(yōu)雅停止
    stopByMarkKey(ssc)

    ssc.awaitTermination()
  }

  /**
    * 優(yōu)雅停止
    *
    * @param ssc
    */
  def stopByMarkKey(ssc: StreamingContext): Unit = {
    val intervalMills = 10 * 1000 // 每隔10秒掃描一次消息是否存在
    var isStop = false
    while (!isStop) {
      isStop = ssc.awaitTerminationOrTimeout(intervalMills)
      if (!isStop && isExists(STOP_FLAG)) {
        LOG.warn("2秒后開始關(guān)閉sparstreaming程序.....")
        Thread.sleep(2000)
        ssc.stop(true, true)
      }
    }
  }

  /**
    * 判斷Key是否存在
    *
    * @param key
    * @return
    */
  def isExists(key: String): Boolean = {
    val jedis = InternalRedisClient.getPool.getResource
    val flag = jedis.exists(key)
    jedis.close()
    flag
  }
}

創(chuàng)建名為Test_Json的Topic

kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka --topic Test_Json --partitions 3 --replication-factor 3

運(yùn)行結(jié)果如下:


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末蛾方,一起剝皮案震驚了整個(gè)濱河市像捶,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌桩砰,老刑警劉巖拓春,帶你破解...
    沈念sama閱讀 211,265評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異亚隅,居然都是意外死亡硼莽,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門煮纵,熙熙樓的掌柜王于貴愁眉苦臉地迎上來懂鸵,“玉大人,你說我怎么就攤上這事行疏〈夜猓” “怎么了?”我有些...
    開封第一講書人閱讀 156,852評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵酿联,是天一觀的道長(zhǎng)终息。 經(jīng)常有香客問我,道長(zhǎng)贞让,這世上最難降的妖魔是什么周崭? 我笑而不...
    開封第一講書人閱讀 56,408評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮喳张,結(jié)果婚禮上续镇,老公的妹妹穿的比我還像新娘。我一直安慰自己蹲姐,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,445評(píng)論 5 384
  • 文/花漫 我一把揭開白布人柿。 她就那樣靜靜地躺著柴墩,像睡著了一般。 火紅的嫁衣襯著肌膚如雪凫岖。 梳的紋絲不亂的頭發(fā)上江咳,一...
    開封第一講書人閱讀 49,772評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音哥放,去河邊找鬼歼指。 笑死爹土,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的踩身。 我是一名探鬼主播胀茵,決...
    沈念sama閱讀 38,921評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼挟阻!你這毒婦竟也來了琼娘?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,688評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤附鸽,失蹤者是張志新(化名)和其女友劉穎脱拼,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體坷备,經(jīng)...
    沈念sama閱讀 44,130評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡熄浓,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,467評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了省撑。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片赌蔑。...
    茶點(diǎn)故事閱讀 38,617評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖丁侄,靈堂內(nèi)的尸體忽然破棺而出惯雳,到底是詐尸還是另有隱情,我是刑警寧澤鸿摇,帶...
    沈念sama閱讀 34,276評(píng)論 4 329
  • 正文 年R本政府宣布石景,位于F島的核電站,受9級(jí)特大地震影響拙吉,放射性物質(zhì)發(fā)生泄漏潮孽。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,882評(píng)論 3 312
  • 文/蒙蒙 一往史、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧佛舱,春花似錦椎例、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,740評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至肆捕,卻和暖如春刷晋,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,967評(píng)論 1 265
  • 我被黑心中介騙來泰國(guó)打工眼虱, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留喻奥,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,315評(píng)論 2 360
  • 正文 我出身青樓捏悬,卻偏偏與公主長(zhǎng)得像撞蚕,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子邮破,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,486評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • kafka的定義:是一個(gè)分布式消息系統(tǒng)诈豌,由LinkedIn使用Scala編寫,用作LinkedIn的活動(dòng)流(Act...
    時(shí)待吾閱讀 5,309評(píng)論 1 15
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理抒和,服務(wù)發(fā)現(xiàn)矫渔,斷路器,智...
    卡卡羅2017閱讀 134,629評(píng)論 18 139
  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,713評(píng)論 13 425
  • kafka數(shù)據(jù)可靠性深度解讀 Kafka起初是由LinkedIn公司開發(fā)的一個(gè)分布式的消息系統(tǒng)摧莽,后成為Apache...
    it_zzy閱讀 2,002評(píng)論 2 20
  • 熬夜看球系列之三 看球喝酒吃肉而已庙洼,做法依然賊簡(jiǎn)單,菜名也許依然污镊辕。NO MORE BULLSHIT TODAY ...
    非狒狒肥閱讀 487評(píng)論 2 2