Differences Between the Kafka Direct and Receiver Approaches
Receiver
The receiver approach is implemented with Kafka's high-level consumer API. The data the receiver fetches from Kafka is stored in the Spark executors' memory (if the inflow surges and data piles up, this can easily lead to OOM), and the jobs launched by Spark Streaming then process that data.
Under the default configuration, this approach can lose data when something in the underlying layers fails. To enable high reliability with zero data loss, you must turn on Spark Streaming's write-ahead log (WAL) mechanism, which synchronously writes the received Kafka data to a write-ahead log on a distributed file system (such as HDFS or S3). When a node fails, the data can then be recovered from the WAL, but throughput drops (a sketch of the relevant settings follows).
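A minimal sketch of what enabling the WAL looks like. The app name is an assumption, the checkpoint path reuses the one from the examples further down, and the WAL is written under the checkpoint directory, so a reliable file system such as HDFS is required:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("receiver-with-wal") // hypothetical app name
  // turn on the write-ahead log for receiver-based input streams
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(5))
// the WAL is stored under the checkpoint directory, so it must point at a
// reliable file system (HDFS, S3, ...)
ssc.checkpoint("hdfs://node01:9000/kafka-ck")

// with the WAL enabled, the in-memory replication of the *_2 storage levels is
// redundant; a serialized, non-replicated level is usually enough
val storageLevel = StorageLevel.MEMORY_AND_DISK_SER
```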
Notes on usage:
1. Simple to use with little code, and offsets do not have to be managed manually. The WAL mechanism must be enabled to guarantee no data loss, but it lowers throughput, and to achieve that guarantee each record is stored twice, which wastes resources.
2. Exactly-once processing cannot be guaranteed: if the job crashes after data has been written to external storage but before the offsets have been updated in ZooKeeper, that data will be consumed again.
3. Kafka topic partitions are unrelated to the RDD partitions Spark Streaming generates. Increasing the number of topic partitions only increases the number of threads the receiver uses to read those partitions; it does not raise Spark's processing parallelism (a common workaround is sketched below).
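Regarding the third point, a common workaround is to create several receiver streams on the same topic and union them. A minimal sketch under assumed names (consumer group `receiver-group`, topic `receiver-test`, three receivers):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

// three receivers need at least four cores: one per receiver plus one for processing
val conf = new SparkConf().setMaster("local[4]").setAppName("multi-receiver")
val ssc = new StreamingContext(conf, Seconds(5))

// several receiver streams reading the same topic in the same consumer group
val numReceivers = 3
val kafkaStreams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc,
    "node01:2181,node02:2181,node03:2181", // ZooKeeper quorum
    "receiver-group",                      // consumer group id (assumption)
    Map("receiver-test" -> 1))             // topic -> threads per receiver
}

// union the per-receiver streams and repartition to raise processing parallelism
val unified: DStream[(String, String)] = ssc.union(kafkaStreams).repartition(6)
```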
Direct
The direct approach uses Kafka's low-level consumer API to read the latest offset of each topic+partition, which defines the offset range of each batch. When the job that processes a batch starts, it uses the low-level consumer API to fetch exactly that offset range from Kafka.
Notes on usage:
1. When reading a topic, RDD partitions are generated to match the topic's partitions and read from Kafka in parallel; there is a one-to-one mapping between Kafka partitions and RDD partitions.
2. The WAL mechanism is not needed: as long as the data is replicated in Kafka, it can be recovered from Kafka's replicas.
3. Spark itself is internally synchronous, so you can track the offsets yourself and save them in a checkpoint, which guarantees that data is not consumed more than once.
4. More complex to use and more code; you have to monitor and maintain the offsets yourself, which raises development cost.
Receiver combined with the WAL mechanism achieves high reliability with zero data loss, but it cannot guarantee that data is processed once and only once; records may be processed twice, because Spark and ZooKeeper can be out of sync. With the direct approach, which uses Kafka's simple API, Spark Streaming itself tracks the consumed offsets and saves them in a checkpoint. Spark is certainly consistent with itself, so it can guarantee that data is consumed exactly once.
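A minimal sketch of that checkpoint-based offset tracking for the direct approach, assuming the same checkpoint directory as the full example below (the stream-building code is elided):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs://node01:9000/kafka-ck"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setMaster("local[2]").setAppName("direct-checkpoint")
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint(checkpointDir)
  // build the createDirectStream pipeline here; the offsets consumed in each
  // batch are persisted to the checkpoint together with the DStream graph
  ssc
}

// on restart the context, including the last consumed offsets, is restored from
// the checkpoint instead of being rebuilt from scratch
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
```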
Comparison
Receiver | Direct |
---|---|
WAL must be enabled | No WAL needed |
Uses the high-level API | Uses the simple (low-level) API |
Offsets maintained automatically in ZooKeeper | Offsets maintained manually |
Cannot guarantee exactly-once processing | Data is processed exactly once |
Simple code, small amount of it | Complex code, more of it |
Topic partitions and RDD partitions are not mapped one-to-one | Topic partitions and RDD partitions are mapped one-to-one |
Data is pulled from Kafka by the receiver | Each RDD partition pulls its corresponding Kafka partition (when Kafka and RDD partition counts are equal) |
Examples of the two ways to connect to Kafka (receiver & direct)
Maven dependencies
<properties>
<scala.version>2.11.8</scala.version>
<spark.version>2.1.3</spark.version>
<scala.binary.version>2.11</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- spark-streaming kafka -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
Receiver
package xzw.shuai.kafka.demo
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object SparkKafkaReceiver {
private val topics = "receiver-test"
private val HDFS_PATH = "hdfs://node01:9000/kafka-ck"
private val numThreads = 1
def main(args: Array[String]): Unit = {
// when the application is stopped, finish processing the current batch before shutting down
System.setProperty("spark.streaming.stopGracefullyOnShutdown", "true")
// records pulled per batch = rate (1000) * number of partitions * batch interval in seconds;
// note: spark.streaming.kafka.maxRatePerPartition throttles the direct API, the receiver-based equivalent is spark.streaming.receiver.maxRate
System.setProperty("spark.streaming.kafka.maxRatePerPartition", "1000")
val conf = new SparkConf().setMaster("local[2]").setAppName("receiver")
// register the JVM metrics source for executor monitoring
.set("spark.metrics.conf.executor.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))
ssc.checkpoint(HDFS_PATH)
val kafkaParams = Map(
"metadata.broker.list" -> "node01:9091,node02:9092,node03:9092",
"zookeeper.connect" -> "node01:2181,node02:2181,node03:2181",
"group.id" -> "receiver"
)
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val kafkaDStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_2)
// word count
kafkaDStream
.map(_._2) // _1 is the message key, _2 is the message value
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.print(10) // print the first 10 results
ssc.start()
ssc.awaitTermination()
}
}
Direct
package xzw.shuai.kafka.demo
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.zookeeper.data.Stat
object SparkKafkaDirect {
private val zkHosts = "node01:2181,node02:2181,node03:2181"
private val logger = Logger.getLogger("SparkKafkaDirect")
private val zkPath = "/kafka-direct-test"
private val topic = Set("direct-test")
private val HDFS_PATH="hdfs://node01:9000/kafka-ck"
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("direct")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))
val kafkaParams = Map(
"metadata.broker.list" -> "node01:9091,node02:9092,node03:9092",
"group.id" -> "direct"
)
val zkClient: ZkClient = new ZkClient(zkHosts)
// read the previously saved offsets
val offsets: Option[Map[TopicAndPartition, Long]] = readOffset(zkClient)
// build the Kafka DStream
val kafkaDStream: InputDStream[(String, String)] = offsets match {
// consume the Kafka data with the direct approach
case None =>
print("start from scratch")
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
case Some(offset) =>
print("start with the offset")
val messageHeader = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder, (String, String)](ssc, kafkaParams, offset, messageHeader)
}
// word count
kafkaDStream.map(_._2) // _1 is the message key, _2 is the message value
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.print(10) // print the first 10 results
// save the offsets to ZooKeeper; any other storage backend could be used instead
kafkaDStream.foreachRDD(rdd =>
saveOffset(zkClient, zkHosts, zkPath, rdd)
)
ssc.start()
ssc.awaitTermination()
}
// save offsets
def saveOffset(zkClient: ZkClient, zkHost: String, zkPath: String, rdd: RDD[_]): Unit = {
logger.info("save offsets to Zookeeper")
val stopwatch = new Stopwatch()
val offsetsRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
offsetsRanges.foreach(offsetRange => logger.debug(s" Using $offsetRange"))
val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.fromOffset}")
.mkString(",")
logger.info("writing offsets to Zookeeper zkClient=" + zkClient + " zkHosts=" + zkHosts + "zkPath=" + zkPath + " offsetsRangesStr:" + offsetsRangesStr)
updatePersistentPath(zkClient, zkPath, offsetsRangesStr)
logger.info("done updating offsets in zookeeper. took " + stopwatch)
}
// read offsets
def readOffset(zkClient: ZkClient): Option[Map[TopicAndPartition, Long]] = {
val stopwatch = new Stopwatch()
val stat = new Stat()
val dataAndStat: (Option[String], Stat) = try {
(Some(zkClient.readData(zkPath, stat)), stat)
} catch {
case _: ZkNoNodeException => (None, stat)
case e2: Throwable => throw e2
}
// parse the offsets
dataAndStat._1 match {
case Some(offsetsRangeStr) =>
logger.info(s" Read offset ranges: $offsetsRangeStr")
val offset: Map[TopicAndPartition, Long] = offsetsRangeStr.split(",")
.map(str => str.split(":"))
.map {
case Array(partitions, offset) =>
TopicAndPartition(topic.last, partitions.toInt) -> offset.toLong
}.toMap
logger.info("Done reading offsets from Zookeeper. Took " + stopwatch)
Some(offset)
case None =>
logger.info(" No offsets found in Zookeeper. Took " + stopwatch)
None
}
}
// update the offsets stored in ZooKeeper
def updatePersistentPath(zkClient: ZkClient, zkPath: String, offsetsRangesStr: String): Unit = {
try {
zkClient.writeData(zkPath, offsetsRangesStr)
} catch {
// the write failed because the node does not exist, so create it
case _: ZkNoNodeException =>
createParentPath(zkClient, zkPath)
try {
// create a persistent node (i.e. the path) and write the offsets into it
zkClient.createPersistent(zkPath, offsetsRangesStr)
} catch {
case _: ZkNodeExistsException =>
zkClient.writeData(zkPath, offsetsRangesStr)
case e2: Throwable => throw e2
}
case e2: Throwable => throw e2
}
}
// create the parent path if it does not exist
def createParentPath(zkClient: ZkClient, zkPath: String): Unit = {
val parentDir = zkPath.substring(0, zkPath.lastIndexOf('/'))
if (parentDir.length != 0)
zkClient.createPersistent(parentDir, true)
}
// elapsed-time helper
class Stopwatch {
private val start = System.currentTimeMillis()
override def toString: String = (System.currentTimeMillis() - start) + " ms"
}
}