Consider the following program: Spark Streaming reads data from Kafka, processes it, and writes the result to HBase.
import java.util.ArrayList
import scala.util.Random

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

/**
 * Author: Jed
 * Description: Spark Streaming reads data from Kafka and writes it to HBase in real time
 * Create: 2018-05-04 14:50
 */
object HBaseTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))

    val kafkaParams = Map[String, AnyRef](
      "bootstrap.servers" -> "172.16.26.6:9092,172.16.26.10:9092,172.16.26.13:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "latest",
      "group.id" -> s"GROUP${new Random().nextInt(1000)}"
    )
    val topics = Array("baihe")

    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // Each Kafka message is a pipe-delimited record
    val values: DStream[Array[String]] = stream.map(_.value.split("\\|"))

    values.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        val connection = HBaseUtil.getConnection()
        val tableName = TableName.valueOf("test")
        val table = connection.getTable(tableName)
        val puts = new ArrayList[Put]
        // Names of the 11 fields in each record, in order
        val enNames = Array[String]("touched", "user_number", "start_time", "end_time", "channel_id", "binding_flag", "click_path", "result", "interf_name", "channel_category", "by_operator")
        try {
          partition.foreach(arr => {
            // CustomerFunction.genRowkey is the author's own helper (not shown in this post)
            val put = new Put(CustomerFunction.genRowkey(arr(0)))
            // Join the first 11 fields back into one pipe-delimited value
            val value = enNames.indices.map(i => arr(i)).mkString("|")
            put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes(value))
            puts.add(put)
            // For better performance, write to HBase once every 10,000 records
            if (puts.size % 10000 == 0) {
              table.put(puts)
              puts.clear()
            }
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          // Write whatever is left in the buffer, then release the resources
          table.put(puts)
          table.close()
          connection.close()
        }
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

object HBaseUtil {
  var conf: Configuration = null
  // A single connection, shared by every caller in the same JVM
  var connection: Connection = null

  def getConnection(): Connection = {
    if (conf == null) {
      conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.quorum", "172.16.26.6:2181,172.16.26.10:2181,172.16.26.13:2181")
    }
    if ((connection == null || connection.isClosed()) && conf != null) {
      try {
        connection = ConnectionFactory.createConnection(conf)
      } catch {
        case e: Exception => e.printStackTrace()
      }
    }
    connection
  }

  def close(): Unit = {
    if (connection != null) {
      try {
        connection.close()
      } catch {
        case e: Exception => e.printStackTrace()
      }
    }
  }
}
Running the program above fails partway through with this error:
2018-05-29 16:21:40 883 [ERROR] org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:432) Failed to get region location
org.apache.hadoop.hbase.DoNotRetryIOException: hconnection-0x6432ad81 closed
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1174)
at org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:422)
at org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:371)
at org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:245)
at org.apache.hadoop.hbase.client.BufferedMutatorImpl.flush(BufferedMutatorImpl.java:197)
at org.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:1461)
at org.apache.hadoop.hbase.client.HTable.put(HTable.java:1029)
The key part is: hconnection-0x6432ad81 closed
The problem lies in the utility class that hands out connections. Each partition of the DStream asks for an HBase connection, and to improve "efficiency" all partitions were made to share a single connection. That sharing is exactly what causes the failure. Suppose partition A holds 10,000 records and partition B holds 20,000, and both write to HBase in parallel over the same connection. When A finishes its 10,000 records, it closes the connection. If B has also written 10,000 records by then, it still has 10,000 left to write, but the connection is already closed, so the program raises the error above and those 10,000 records are lost.
The fix is to give each partition its own connection. Only the HBaseUtil class needs to change, as follows:
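The A/B scenario can be replayed without a cluster. The following is only a minimal sketch: `FakeConnection` and `SharedConnectionDemo` are hypothetical names invented for this illustration, not HBase classes, and the two "partitions" are interleaved by hand instead of running on real threads so the outcome is deterministic.

```scala
// Hypothetical stand-in for an HBase connection; it only tracks open/closed state.
final class FakeConnection {
  private var closed = false

  def close(): Unit = closed = true

  // Pretends to write `rows` records; fails once the connection is closed.
  def write(rows: Int): Int = {
    if (closed) throw new IllegalStateException("connection closed")
    rows
  }
}

object SharedConnectionDemo {
  /** Replays the A/B scenario and returns (records B wrote, records B lost). */
  def run(): (Int, Int) = {
    val shared = new FakeConnection // one connection used by both partitions
    val batch = 10000
    val totalB = 20000

    shared.write(batch)                  // partition A writes its only batch
    var writtenByB = shared.write(batch) // partition B writes its first batch
    shared.close()                       // A is done and closes the shared connection

    // B still has one batch left, but the connection it shares with A is gone.
    try {
      writtenByB += shared.write(batch)
    } catch {
      case _: IllegalStateException => () // the simulated "connection closed" failure
    }

    (writtenByB, totalB - writtenByB)
  }
}
```

Calling `SharedConnectionDemo.run()` returns `(10000, 10000)`: B wrote its first 10,000 records and lost the second 10,000, which matches the loss described above.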
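import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

object HBaseUtil {
  val conf: Configuration = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "192.168.42.101:2181,192.168.42.102:2181,192.168.42.101:2181")

  // Every call creates a new connection, so each partition owns (and closes) its own
  def getConnection(): Connection = ConnectionFactory.createConnection(conf)
}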
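Once every partition owns its connection, it also has to close it reliably, even when a write throws. One way to express that is a small loan-pattern helper. This is a sketch under assumptions, not part of the original program: `Loan`, `using`, and `DemoResource` are names made up for this example.

```scala
object Loan {
  /** Opens a resource, runs `body` on it, and always closes it afterwards. */
  def using[R <: AutoCloseable, A](open: => R)(body: R => A): A = {
    val resource = open
    try body(resource)
    finally resource.close()
  }
}

// Hypothetical stand-in for a per-partition connection, used only to show the behaviour.
final class DemoResource extends AutoCloseable {
  var closed = false
  override def close(): Unit = closed = true
}

object LoanDemo {
  /** Returns true if the resource was closed even though the body threw. */
  def closedAfterFailure(): Boolean = {
    val resource = new DemoResource
    try {
      Loan.using(resource)(_ => throw new RuntimeException("write failed"))
    } catch {
      case _: RuntimeException => () // the failure still propagates to the caller
    }
    resource.closed
  }
}
```

Because HBase's `Connection` and `Table` both implement `java.io.Closeable`, the same helper should work inside `foreachPartition` with the revised utility class, for example `Loan.using(HBaseUtil.getConnection()) { connection => ... }`.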