Spark Streaming's receiver-less createDirectStream API does not use receivers; instead it creates an input stream that pulls messages directly from the Kafka cluster nodes. The input stream guarantees that each message pulled from Kafka is transformed exactly once, which keeps the semantics consistent. However, when the job fails or is restarted and must resume from the current consumption position (i.e., exactly-once semantics), Spark Streaming's built-in mechanisms alone are not ideal, so in production the Kafka consumption position is usually maintained by managing offsets manually. This article explains how to manage Kafka offsets manually and covers the following topics:
- How to manage Kafka offsets with MySQL
- How to manage Kafka offsets with Redis
How to manage Kafka offsets with MySQL
We can manage Kafka offsets manually from within the Spark Streaming application itself. The offsets can be obtained from the RDDs generated for each micro-batch:
KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
// obtain the offset ranges of this batch
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
...
}
Once the offsets have been obtained, they can be saved to an external store (MySQL, Redis, Zookeeper, HBase, etc.).
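The order of operations inside foreachRDD matters: write the batch output first and persist the offsets last, so that a crash in between at worst reprocesses a batch instead of silently skipping one. Below is a minimal sketch of that pattern, assuming stream is the DStream returned by createDirectStream and saveToExternalStore is a hypothetical persistence helper (the full MySQL/Redis versions follow later in this article):
stream.foreachRDD { rdd =>
  // 1. capture this batch's offset ranges before any transformation
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // 2. produce the batch output (write the results to the sink)
  rdd.foreach(record => println(record.value())) // placeholder sink
  // 3. persist the offsets only after the output has succeeded
  saveToExternalStore(offsetRanges) // hypothetical helper backed by MySQL/Redis/...
}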
Example code
- The MySQL table used to store offsets
CREATE TABLE `topic_par_group_offset` (
`topic` varchar(255) NOT NULL,
`partition` int(11) NOT NULL,
`groupid` varchar(255) NOT NULL,
`offset` bigint(20) DEFAULT NULL,
PRIMARY KEY (`topic`,`partition`,`groupid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ;
- Constant configuration class: ConfigConstants
object ConfigConstants {
// Kafka settings
val kafkaBrokers = "kms-2:9092,kms-3:9092,kms-4:9092"
val groupId = "group_test"
val kafkaTopics = "test"
val batchInterval = Seconds(5)
val streamingStorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
val kafkaKeySer = "org.apache.kafka.common.serialization.StringSerializer"
val kafkaValueSer = "org.apache.kafka.common.serialization.StringSerializer"
val sparkSerializer = "org.apache.spark.serializer.KryoSerializer"
val batchSize = 16384
val lingerMs = 1
val bufferMemory = 33554432
// MySQL settings
val user = "root"
val password = "123qwe"
val url = "jdbc:mysql://localhost:3306/kafka_offset"
val driver = "com.mysql.jdbc.Driver"
// checkpoint settings
val checkpointDir = "file:///e:/checkpoint"
val checkpointInterval = Seconds(10)
// Redis settings
val redisAddress = "192.168.10.203"
val redisPort = 6379
val redisAuth = "123qwe"
val redisTimeout = 3000
}
- JDBC connection pool utility class: JDBCConnPool
object JDBCConnPool {
val log: Logger = Logger.getLogger(JDBCConnPool.getClass)
var dataSource: BasicDataSource = null
/**
* Create the data source
*
* @return
*/
def getDataSource(): BasicDataSource = {
if (dataSource == null) {
dataSource = new BasicDataSource()
dataSource.setDriverClassName(ConfigConstants.driver)
dataSource.setUrl(ConfigConstants.url)
dataSource.setUsername(ConfigConstants.user)
dataSource.setPassword(ConfigConstants.password)
dataSource.setMaxTotal(50)
dataSource.setInitialSize(3)
dataSource.setMinIdle(3)
dataSource.setMaxIdle(10)
dataSource.setMaxWaitMillis(2 * 10000)
dataSource.setRemoveAbandonedTimeout(180)
dataSource.setRemoveAbandonedOnBorrow(true)
dataSource.setRemoveAbandonedOnMaintenance(true)
dataSource.setTestOnReturn(true)
dataSource.setTestOnBorrow(true)
}
return dataSource
}
/**
* Close the data source
*/
def closeDataSource() = {
if (dataSource != null) {
dataSource.close()
}
}
/**
* Obtain a database connection
*
* @return
*/
def getConnection(): Connection = {
var conn: Connection = null
try {
if (dataSource != null) {
conn = dataSource.getConnection()
} else {
conn = getDataSource().getConnection()
}
} catch {
case e: Exception =>
log.error(e.getMessage(), e)
}
conn
}
/**
* Close the statement and connection
*/
def closeConnection (ps:PreparedStatement , conn:Connection ) {
if (ps != null) {
try {
ps.close();
} catch {
case e:Exception =>
log.error("預(yù)編譯SQL語(yǔ)句對(duì)象PreparedStatement關(guān)閉異常葫男!" + e.getMessage(), e);
}
}
if (conn != null) {
try {
conn.close();
} catch {
case e:Exception =>
log.error("關(guān)閉連接對(duì)象Connection異常抱冷!" + e.getMessage(), e);
}
}
}
}
- Kafka producer: KafkaProducerTest
object KafkaProducerTest {
def main(args: Array[String]): Unit = {
val props : Properties = new Properties()
props.put("bootstrap.servers", ConfigConstants.kafkaBrokers)
props.put("batch.size", ConfigConstants.batchSize.asInstanceOf[Integer])
props.put("linger.ms", ConfigConstants.lingerMs.asInstanceOf[Integer])
props.put("buffer.memory", ConfigConstants.bufferMemory.asInstanceOf[Integer])
props.put("key.serializer",ConfigConstants.kafkaKeySer)
props.put("value.serializer", ConfigConstants.kafkaValueSer)
val producer : Producer[String, String] = new KafkaProducer[String, String](props)
val startTime : Long = System.currentTimeMillis()
for ( i <- 1 to 100) {
producer.send(new ProducerRecord[String, String](ConfigConstants.kafkaTopics, "Spark", Integer.toString(i)))
}
println("消耗時(shí)間:" + (System.currentTimeMillis() - startTime))
producer.close()
}
}
- Reading and saving offsets:
This object reads offsets from and writes offsets to external stores, namely MySQL and Redis.
object OffsetReadAndSave {
/**
* Read offsets from MySQL
*
* @param groupid
* @param topic
* @return
*/
def getOffsetMap(groupid: String, topic: String): mutable.Map[TopicPartition, Long] = {
val conn = JDBCConnPool.getConnection()
val selectSql = "select * from topic_par_group_offset where groupid = ? and topic = ?"
val ppst = conn.prepareStatement(selectSql)
ppst.setString(1, groupid)
ppst.setString(2, topic)
val result: ResultSet = ppst.executeQuery()
// offsets keyed by topic-partition
val topicPartitionOffset = mutable.Map[TopicPartition, Long]()
while (result.next()) {
val topicPartition: TopicPartition = new TopicPartition(result.getString("topic"), result.getInt("partition"))
topicPartitionOffset += (topicPartition -> result.getLong("offset"))
}
JDBCConnPool.closeConnection(ppst, conn)
topicPartitionOffset
}
/**
* Read offsets from Redis
*
* @param groupid
* @param topic
* @return
*/
def getOffsetFromRedis(groupid: String, topic: String): Map[TopicPartition, Long] = {
val jedis: Jedis = JedisConnPool.getConnection()
var offsets = mutable.Map[TopicPartition, Long]()
val key = s"${topic}_${groupid}"
val fields : java.util.Map[String, String] = jedis.hgetAll(key)
for (partition <- JavaConversions.mapAsScalaMap(fields)) {
offsets.put(new TopicPartition(topic, partition._1.toInt), partition._2.toLong)
}
// return the borrowed connection to the pool
jedis.close()
offsets.toMap
}
/**
* Write offsets to MySQL
*
* @param groupid     consumer group id
* @param offsetRange offset ranges of the batch
*/
def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]) = {
val conn = JDBCConnPool.getConnection()
val insertSql = "replace into topic_par_group_offset(`topic`, `partition`, `groupid`, `offset`) values(?,?,?,?)"
val ppst = conn.prepareStatement(insertSql)
for (offset <- offsetRange) {
ppst.setString(1, offset.topic)
ppst.setInt(2, offset.partition)
ppst.setString(3, groupid)
ppst.setLong(4, offset.untilOffset)
ppst.executeUpdate()
}
JDBCConnPool.closeConnection(ppst, conn)
}
/**
* Write offsets to Redis
* @param groupid
* @param offsetRange
*/
def saveOffsetToRedis(groupid: String, offsetRange: Array[OffsetRange]) = {
val jedis :Jedis = JedisConnPool.getConnection()
for (range <- offsetRange) {
val topic = range.topic
val partition = range.partition
val offset = range.untilOffset
// key is topic_groupid, field is partition, value is offset
jedis.hset(s"${topic}_${groupid}", partition.toString, offset.toString)
}
// return the borrowed connection to the pool
jedis.close()
}
}
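saveOffsetRanges above gives at-least-once behavior: if the application crashes after the batch output but before the offsets are written, the last batch is simply reprocessed. If the batch results themselves also go into MySQL, the result write and the offset update can share one JDBC transaction to get closer to exactly-once. The following is only a sketch of the idea; the writeResults helper and its results argument are hypothetical and not part of the code in this article.
def saveResultsAndOffsets(groupid: String, results: Seq[(String, Int)], offsetRange: Array[OffsetRange]): Unit = {
  val conn = JDBCConnPool.getConnection()
  try {
    conn.setAutoCommit(false)
    writeResults(conn, results) // hypothetical: insert the batch results in the same transaction
    val ppst = conn.prepareStatement(
      "replace into topic_par_group_offset(`topic`, `partition`, `groupid`, `offset`) values(?,?,?,?)")
    for (offset <- offsetRange) {
      ppst.setString(1, offset.topic)
      ppst.setInt(2, offset.partition)
      ppst.setString(3, groupid)
      ppst.setLong(4, offset.untilOffset)
      ppst.executeUpdate()
    }
    ppst.close()
    // results and offsets become visible together, or not at all
    conn.commit()
  } catch {
    case e: Exception => conn.rollback(); throw e
  } finally {
    conn.setAutoCommit(true)
    conn.close()
  }
}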
- Business logic class
This object contains the processing logic: it consumes data from Kafka, processes it, and then manually saves the offsets to MySQL. On startup it checks whether offsets exist in the external store; on the first run it consumes from the earliest position, and if offsets already exist it resumes from them.
What to observe: on the first start the data is consumed from the beginning; stop the program manually and start it again, and you will see it resume from the last committed offsets.
object ManualCommitOffset {
def main(args: Array[String]): Unit = {
val brokers = ConfigConstants.kafkaBrokers
val groupId = ConfigConstants.groupId
val topics = ConfigConstants.kafkaTopics
val batchInterval = ConfigConstants.batchInterval
val conf = new SparkConf()
.setAppName(ManualCommitOffset.getClass.getSimpleName)
.setMaster("local[1]")
.set("spark.serializer",ConfigConstants.sparkSerializer)
val ssc = new StreamingContext(conf, batchInterval)
// checkpointing must be enabled, otherwise an error is thrown
ssc.checkpoint(ConfigConstants.checkpointDir)
ssc.sparkContext.setLogLevel("OFF")
// create a direct Kafka stream from the brokers and topics
val topicSet = topics.split(" ").toSet
// Kafka connection parameters
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
)
// read the partition offsets for this topic and consumer group from MySQL
val offsetMap = OffsetReadAndSave.getOffsetMap(groupId, topics)
var inputDStream: InputDStream[ConsumerRecord[String, String]] = null
// if offsets already exist in MySQL, resume consumption from them
if (offsetMap.size > 0) {
println("存在偏移量拥褂,從該偏移量處進(jìn)行消費(fèi)D锼!")
inputDStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap))
} else {
// if no offsets exist in MySQL, start consuming from the earliest position
inputDStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))
}
// checkpoint interval, must be an integer multiple of batchInterval
inputDStream.checkpoint(ConfigConstants.checkpointInterval)
// holds the offset ranges of the current batch
var offsetRanges = Array[OffsetRange]()
// capture the offset ranges of the current DStream
val transformDS = inputDStream.transform { rdd =>
// get the offsets
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}
/**
* State update function
* @param newValues : new values for the key in this batch
* @param stateValue : current state value
* @return
*/
def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {
var oldvalue = stateValue.getOrElse(0) // current state value
// iterate over the new values and update the state
for (newValue <- newValues) {
oldvalue += newValue
}
// return the updated state
Option(oldvalue)
}
// business logic
// this example keeps a running sum of the message values, which makes it easy to see whether consumption resumed from the committed offsets
transformDS.map(meg => ("spark", meg.value().toInt)).updateStateByKey(updateFunc).print()
// print offsets and records to observe the output
transformDS.foreachRDD { (rdd, time) =>
// print every record of this RDD
rdd.foreach { record =>
println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
}
// print the consumed offset ranges
for (o <- offsetRanges) {
println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")
}
// save the offsets to MySQL
OffsetReadAndSave.saveOffsetRanges(groupId, offsetRanges)
}
ssc.start()
ssc.awaitTermination()
}
}
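For completeness: the spark-streaming-kafka-0-10 integration also lets you commit the captured offsets back to Kafka itself (into the __consumer_offsets topic) instead of an external store, via the CanCommitOffsets interface. Note that commitAsync must be called on the stream returned by createDirectStream (inputDStream here), not on a transformed stream, and enable.auto.commit must stay false as it is above. A minimal sketch of that variant:
import org.apache.spark.streaming.kafka010.CanCommitOffsets
// inside foreachRDD, instead of OffsetReadAndSave.saveOffsetRanges(groupId, offsetRanges):
inputDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)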
How to manage Kafka offsets with Redis
- Redis connection class
object JedisConnPool {
val config = new JedisPoolConfig
// maximum number of connections
config.setMaxTotal(60)
// maximum number of idle connections
config.setMaxIdle(10)
config.setTestOnBorrow(true)
// server ip
val redisAddress :String = ConfigConstants.redisAddress.toString
// port
val redisPort:Int = ConfigConstants.redisPort.toInt
// password
val redisAuth :String = ConfigConstants.redisAuth.toString
// maximum time to wait for an available connection
val redisTimeout:Int = ConfigConstants.redisTimeout.toInt
val pool = new JedisPool(config,redisAddress,redisPort,redisTimeout,redisAuth)
def getConnection():Jedis = {
pool.getResource
}
}
- Business logic class
This object is essentially the same as the one above, except that the offsets are stored in Redis. The data is stored as a Redis hash in the form [key field value] -> [topic_groupid partition offset], i.e. the key is topic_groupid, the field is the partition, and the value is the offset.
object ManualCommitOffsetToRedis {
def main(args: Array[String]): Unit = {
val brokers = ConfigConstants.kafkaBrokers
val groupId = ConfigConstants.groupId
val topics = ConfigConstants.kafkaTopics
val batchInterval = ConfigConstants.batchInterval
val conf = new SparkConf()
.setAppName(ManualCommitOffsetToRedis.getClass.getSimpleName)
.setMaster("local[1]")
.set("spark.serializer", ConfigConstants.sparkSerializer)
val ssc = new StreamingContext(conf, batchInterval)
// checkpointing must be enabled, otherwise an error is thrown
ssc.checkpoint(ConfigConstants.checkpointDir)
ssc.sparkContext.setLogLevel("OFF")
// create a direct Kafka stream from the brokers and topics
val topicSet = topics.split(" ").toSet
// Kafka connection parameters
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
)
// read the partition offsets for this topic and consumer group from Redis
val offsetMap = OffsetReadAndSave.getOffsetFromRedis(groupId, topics)
var inputDStream: InputDStream[ConsumerRecord[String, String]] = null
// if offsets already exist in Redis, resume consumption from them
if (offsetMap.size > 0) {
println("存在偏移量,從該偏移量處進(jìn)行消費(fèi)F雌夏伊!")
inputDStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap))
} else {
// if no offsets exist in Redis, start consuming from the earliest position
inputDStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))
}
// checkpoint interval, must be an integer multiple of batchInterval
inputDStream.checkpoint(ConfigConstants.checkpointInterval)
// holds the offset ranges of the current batch
var offsetRanges = Array[OffsetRange]()
// capture the offset ranges of the current DStream
val transformDS = inputDStream.transform { rdd =>
// get the offsets
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}
/**
* State update function
*
* @param newValues  : new values for the key in this batch
* @param stateValue : current state value
* @return
*/
def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {
var oldvalue = stateValue.getOrElse(0) // current state value
// iterate over the new values and update the state
for (newValue <- newValues) {
oldvalue += newValue
}
// return the updated state
Option(oldvalue)
}
// business logic
// this example keeps a running sum of the message values, which makes it easy to see whether consumption resumed from the committed offsets
transformDS.map(meg => ("spark", meg.value().toInt)).updateStateByKey(updateFunc).print()
// print offsets and records to observe the output
transformDS.foreachRDD { (rdd, time) =>
// print every record of this RDD
rdd.foreach { record =>
println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
}
// print the consumed offset ranges
for (o <- offsetRanges) {
println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")
}
// save the offsets to Redis
OffsetReadAndSave.saveOffsetToRedis(groupId, offsetRanges)
}
ssc.start()
ssc.awaitTermination()
}
}
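One detail worth noting for the Redis variant: saveOffsetToRedis issues one HSET per partition, so a crash in the middle of the loop can leave the hash partially updated. Jedis transactions (MULTI/EXEC) can write all partitions of a batch in a single atomic step. A minimal sketch, reusing JedisConnPool from above:
def saveOffsetToRedisAtomically(groupid: String, offsetRange: Array[OffsetRange]): Unit = {
  val jedis: Jedis = JedisConnPool.getConnection()
  try {
    // queue all HSETs and execute them atomically
    val tx = jedis.multi()
    for (range <- offsetRange) {
      tx.hset(s"${range.topic}_${groupid}", range.partition.toString, range.untilOffset.toString)
    }
    tx.exec()
  } finally {
    jedis.close() // return the connection to the pool
  }
}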
Summary
This article has shown how to store Kafka consumption positions in external storage, with detailed code examples for managing offsets with MySQL and Redis. Many other external stores can of course be used for the same purpose, such as Zookeeper or HBase, and the basic approach is much the same.
大數(shù)據(jù)技術(shù)與數(shù)倉(cāng)