這篇文章是給Spark初學者寫的怔毛,老手就不要看了契耿。文章談及如何和HBase/Redis/MySQL/Kafka等進行交互的方法靡羡,主要是為了讓大家明白其內(nèi)部機制
一些概念
一個partition 對應一個task,一個task 必定存在于一個Executor,一個Executor 對應一個JVM.
- Partition 是一個可迭代數(shù)據(jù)集合
- Task 本質(zhì)是作用于Partition的線程
問題
Task 里如何使用Kafka Producer 將數(shù)據(jù)發(fā)送到Kafaka呢系洛。 其他譬如HBase/Redis/MySQL 也是如此俊性。
解決方案
直觀的解決方案自然是能夠在Executor(JVM)里有個Prodcuer Pool(或者共享單個Producer實例),但是我們的代碼都是
現(xiàn)在Driver端執(zhí)行描扯,然后將一些函數(shù)序列化到Executor端執(zhí)行定页,這里就有序列化問題,正常如Pool,Connection都是無法序列化的绽诚。
一個簡單的解決辦法是定義個Object 類典徊,
譬如
object SimpleHBaseClient {
private val DEFAULT_ZOOKEEPER_QUORUM = "127.0.0.1:2181"
private lazy val (table, conn) = createConnection
def bulk(items:Iterator) = {
items.foreach(conn.put(_))
conn.flush....
}
......
}
然后保證這個類在map,foreachRDD等函數(shù)下使用,譬如:
dstream.foreachRDD{ rdd =>
rdd.foreachPartition{iter=>
SimpleHBaseClient.bulk(iter)
}
}
為什么要保證放到foreachRDD /map 等這些函數(shù)里呢恩够?
Spark的機制是先將用戶的程序作為一個單機運行(運行者是Driver)卒落,Driver通過序列化機制,將對應算子規(guī)定的函數(shù)發(fā)送到Executor進行執(zhí)行蜂桶。這里儡毕,foreachRDD/map 等函數(shù)都是會發(fā)送到Executor執(zhí)行的,Driver端并不會執(zhí)行扑媚。里面引用的object 類 會作為一個stub 被序列化過去腰湾,object內(nèi)部屬性的的初始化其實是在Executor端完成的,所以可以避過序列化的問題疆股。
Pool也是類似的做法费坊。然而我們并不建議使用pool,因為Spark 本身已經(jīng)是分布式的,舉個例子可能有100個executor,如果每個executor再搞10個connection
的pool,則會有100*10 個鏈接旬痹,Kafka也受不了附井。一個Executor 維持一個connection就好。
關(guān)于Executor掛掉丟數(shù)據(jù)的問題唱凯,其實就看你什么時候flush,這是一個性能的權(quán)衡羡忘。