In enterprise deployments, Spark Streaming usually reads data without a receiver, which for Kafka means the Direct approach. Going receiver-less improves read efficiency and makes it much easier to guarantee transactional (exactly-once) semantics, because the driver knows and controls exactly which offsets each batch consumes. Let's look at how Spark Streaming implements the Kafka Direct approach.
- First, a demo. The code is as follows:
import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectKafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    // StreamingExamples is a helper in the Spark examples project that sets sensible log levels
    StreamingExamples.setStreamingLogLevels()

    val Array(brokers, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
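Besides metadata.broker.list, the kafkaParams map accepts ordinary Kafka 0.8 consumer settings. A minimal sketch (the value is only an example) of how you could make a brand-new stream start from the earliest available offsets instead of the default latest ones:

// Sketch: extra consumer settings can be passed through kafkaParams.
// "auto.offset.reset" -> "smallest" starts a new stream from the earliest offsets.
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> brokers,
  "auto.offset.reset" -> "smallest")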
- Here KafkaUtils.createDirectStream is used to create a DirectKafkaInputDStream. The code of createDirectStream is as follows:
def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag] (
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String]
  ): InputDStream[(K, V)] = {
  val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
  val kc = new KafkaCluster(kafkaParams)
  val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
  new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
    ssc, kafkaParams, fromOffsets, messageHandler)
}
- First a KafkaCluster is instantiated; it encapsulates all the low-level Kafka API calls that Spark Streaming needs. A key point: if you ever need to read Kafka data from another project, KafkaCluster is the best reference. Next, getFromOffsets(kc, kafkaParams, topics) returns fromOffsets of type Map[TopicAndPartition, Long]. TopicAndPartition is a Kafka class that identifies a topic plus one of its partitions, and the Long is the offset into that partition. Kafka is a distributed message queue; each topic can have multiple partitions, and more partitions increase Kafka's throughput.
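For reference, getFromOffsets roughly does the following (a paraphrased sketch of the Spark 1.x implementation, not the verbatim source): it honours auto.offset.reset to decide whether to start each partition from its earliest or its latest offset.

// Paraphrased sketch of KafkaUtils.getFromOffsets (Spark 1.x); details may differ by version.
private[kafka] def getFromOffsets(
    kc: KafkaCluster,
    kafkaParams: Map[String, String],
    topics: Set[String]): Map[TopicAndPartition, Long] = {
  val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
  val result = for {
    topicPartitions <- kc.getPartitions(topics).right
    leaderOffsets <- (if (reset == Some("smallest")) {
      kc.getEarliestLeaderOffsets(topicPartitions)   // start from the earliest offsets
    } else {
      kc.getLatestLeaderOffsets(topicPartitions)     // default: start from the latest offsets
    }).right
  } yield {
    leaderOffsets.map { case (tp, lo) => (tp, lo.offset) }
  }
  KafkaCluster.checkErrors(result)
}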
- Next, look inside DirectKafkaInputDStream; its compute() method is as follows:
// No receiver here: each batch directly wraps a KafkaRDD to read the data
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
  // Ending offset of every partition of every topic; together with currentOffsets
  // (the starting offsets) this defines exactly what each partition will read
  val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
  // KafkaRDD reads data from Kafka, much like HadoopRDD reads from HDFS;
  // for other data sources you can define a custom RDD in the same way
  val rdd = KafkaRDD[K, V, U, T, R](
    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

  // Report the record number and metadata of this batch interval to InputInfoTracker.
  // offsetRanges describes the range of offsets read from each partition in this batch
  val offsetRanges = currentOffsets.map { case (tp, fo) =>
    val uo = untilOffsets(tp)
    OffsetRange(tp.topic, tp.partition, fo, uo.offset)
  }
  val description = offsetRanges.filter { offsetRange =>
    // Don't display empty ranges.
    offsetRange.fromOffset != offsetRange.untilOffset
  }.map { offsetRange =>
    s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
      s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
  }.mkString("\n")

  // Copy offsetRanges to immutable.List to prevent from being modified by the user
  val metadata = Map(
    "offsets" -> offsetRanges.toList,
    StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
  val inputInfo = StreamInputInfo(id, rdd.count, metadata)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

  // Update currentOffsets: this batch's end offsets become the next batch's start offsets
  currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
  Some(rdd)
}
The main job is computing untilOffsets; both currentOffsets and untilOffsets are passed into the KafkaRDD, so the KafkaRDD knows exactly which data to read.
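Because each batch's KafkaRDD carries its offset ranges, application code can read them back and store them together with the results, which is the usual building block for exactly-once output. A minimal sketch, reusing the messages stream from the demo above:

import org.apache.spark.streaming.kafka.HasOffsetRanges

// Minimal sketch: recover the per-partition offset ranges of each batch.
// Storing these atomically with your output is what enables exactly-once semantics.
messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"${o.topic} ${o.partition} ${o.fromOffset} -> ${o.untilOffset}")
  }
}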
- Now let's see what is inside KafkaRDD. The KafkaRDD companion object is used to create the KafkaRDD instance; its apply method is as follows:
def apply[
    K: ClassTag,
    V: ClassTag,
    U <: Decoder[_]: ClassTag,
    T <: Decoder[_]: ClassTag,
    R: ClassTag](
      sc: SparkContext,
      kafkaParams: Map[String, String],
      fromOffsets: Map[TopicAndPartition, Long],
      untilOffsets: Map[TopicAndPartition, LeaderOffset],
      messageHandler: MessageAndMetadata[K, V] => R
  ): KafkaRDD[K, V, U, T, R] = {
  val leaders = untilOffsets.map { case (tp, lo) =>
    tp -> (lo.host, lo.port)
  }.toMap

  // offsetRanges gives the starting and ending offset of the data read from each partition
  val offsetRanges = fromOffsets.map { case (tp, fo) =>
    val uo = untilOffsets(tp)
    OffsetRange(tp.topic, tp.partition, fo, uo.offset)
  }.toArray

  new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaders, messageHandler)
}
It mainly builds offsetRanges, plus leaders (the host and port of each partition's leader broker). messageHandler is also an interesting function; we will come back to it later.
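OffsetRange is also part of the public API: for a one-off batch job you can construct the ranges yourself and read them with KafkaUtils.createRDD. A sketch (sc is assumed to be a SparkContext; the topic name, partitions and offsets below are purely illustrative):

// Sketch: reading a fixed offset range as a plain RDD, outside of streaming.
val offsetRanges = Array(
  OffsetRange("some-topic", partition = 0, fromOffset = 0L, untilOffset = 100L),
  OffsetRange("some-topic", partition = 1, fromOffset = 0L, untilOffset = 100L))

val batchRdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)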
- Now look at the KafkaRDD code itself. For a custom RDD, the three key methods are getPartitions, getPreferredLocations, and compute.
(1) First, getPartitions; the code is as follows:
override def getPartitions: Array[Partition] = {
  offsetRanges.zipWithIndex.map { case (o, i) =>
    val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
    new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
  }.toArray
}
Each partition of each Kafka topic maps to exactly one Spark partition, so from this we can estimate how many cores a Kafka source (topic) needs for its processing.
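A quick way to confirm this 1:1 mapping at runtime (a sketch, reusing the messages stream from the demo): the number of tasks per batch equals the number of subscribed Kafka partitions.

// Sketch: each batch's KafkaRDD has exactly one Spark partition per Kafka partition.
messages.foreachRDD { rdd =>
  println(s"tasks in this batch: ${rdd.partitions.length}")
}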
(2) Next, the getPreferredLocations method; the code is as follows:
override def getPreferredLocations(thePart: Partition): Seq[String] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  // TODO is additional hostname resolution necessary here
  Seq(part.host)
}
getPreferredLocations determines data locality. If the Kafka brokers and the Spark executors run in the same cluster, this locality hint greatly improves efficiency because the data no longer has to be shipped across the network.
(3) Finally, the compute method; the code is as follows:
override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
  if (part.fromOffset == part.untilOffset) {
    log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
      s"skipping ${part.topic} ${part.partition}")
    Iterator.empty
  } else {
    new KafkaRDDIterator(part, context)
  }
}
The real work is encapsulated in KafkaRDDIterator; the code is as follows:
private class KafkaRDDIterator(
    part: KafkaRDDPartition,
    context: TaskContext) extends NextIterator[R] {

  context.addTaskCompletionListener { context => closeIfNeeded() }

  log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
    s"offsets ${part.fromOffset} -> ${part.untilOffset}")

  val kc = new KafkaCluster(kafkaParams)
  val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
    .newInstance(kc.config.props)
    .asInstanceOf[Decoder[K]]
  val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
    .newInstance(kc.config.props)
    .asInstanceOf[Decoder[V]]
  val consumer = connectLeader
  var requestOffset = part.fromOffset
  var iter: Iterator[MessageAndOffset] = null

  // The idea is to use the provided preferred host, except on task retry attempts,
  // to minimize number of kafka metadata requests
  private def connectLeader: SimpleConsumer = {
    if (context.attemptNumber > 0) {
      kc.connectLeader(part.topic, part.partition).fold(
        errs => throw new SparkException(
          s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
            errs.mkString("\n")),
        consumer => consumer
      )
    } else {
      kc.connect(part.host, part.port)
    }
  }

  private def handleFetchErr(resp: FetchResponse) {
    if (resp.hasError) {
      val err = resp.errorCode(part.topic, part.partition)
      if (err == ErrorMapping.LeaderNotAvailableCode ||
        err == ErrorMapping.NotLeaderForPartitionCode) {
        log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
          s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
        Thread.sleep(kc.config.refreshLeaderBackoffMs)
      }
      // Let normal rdd retry sort out reconnect attempts
      throw ErrorMapping.exceptionFor(err)
    }
  }

  // 3. Fetch a batch of messages from Kafka
  private def fetchBatch: Iterator[MessageAndOffset] = {
    // 4. Build a fetch request for this partition, starting at requestOffset
    val req = new FetchRequestBuilder()
      .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
      .build()
    // 5. Send the fetch request with the SimpleConsumer and get back a response
    val resp = consumer.fetch(req)
    handleFetchErr(resp)
    // kafka may return a batch that starts before the requested offset
    // 6. Turn the response into an iterator over the messages
    resp.messageSet(part.topic, part.partition)
      .iterator
      .dropWhile(_.offset < requestOffset)
  }

  override def close(): Unit = {
    if (consumer != null) {
      consumer.close()
    }
  }

  // 1. Start reading here: getNext() is called for every record the RDD produces
  override def getNext(): R = {
    if (iter == null || !iter.hasNext) {
      // 2. Fetch some data; it comes back wrapped in an iterator
      iter = fetchBatch
    }
    if (!iter.hasNext) {
      assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
      finished = true
      null.asInstanceOf[R]
    } else {
      // 7. Keep pulling messages from the iterator
      val item = iter.next()
      if (item.offset >= part.untilOffset) {
        assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
        finished = true
        null.asInstanceOf[R]
      } else {
        requestOffset = item.nextOffset
        // 8. Wrap the message in MessageAndMetadata and hand it to messageHandler
        messageHandler(new MessageAndMetadata(
          part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
      }
    }
  }
}
Follow the numbered steps (1 through 8) in KafkaRDDIterator to trace how the data is read.
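One practical note on step 4: kc.config.fetchMessageMaxBytes is a normal consumer fetch-size setting, so if your messages are large it can be raised through kafkaParams. A sketch, assuming the standard Kafka 0.8 consumer property fetch.message.max.bytes; the value is only an example:

// Sketch: raise the per-request fetch size used by fetchBatch for large messages.
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> brokers,
  "fetch.message.max.bytes" -> (8 * 1024 * 1024).toString)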
(4) The returned record ends up wrapped in a MessageAndMetadata. So what exactly is messageHandler? Tracing back up the call chain, it is defined in KafkaUtils.createDirectStream, as follows:
val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
So it simply extracts the key and the message from each record. If we want other fields as well, we can supply our own messageHandler function, as sketched below. Interesting, isn't it?
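For example, the overload of createDirectStream that takes fromOffsets and a messageHandler lets you keep the topic, partition and offset alongside the payload. A sketch only: Record is a made-up case class, and fromOffsets (Map[TopicAndPartition, Long]) is assumed to have been built beforehand, e.g. restored from your own offset storage.

// Sketch of a custom messageHandler; Record is hypothetical, fromOffsets is assumed to exist.
case class Record(topic: String, partition: Int, offset: Long, key: String, value: String)

val handler = (mmd: MessageAndMetadata[String, String]) =>
  Record(mmd.topic, mmd.partition, mmd.offset, mmd.key, mmd.message)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Record](
  ssc, kafkaParams, fromOffsets, handler)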