SparkSQL讀取HBase數(shù)據(jù)

這里的SparkSQL是指整合了Hive的spark-sql cli(關(guān)于SparkSQL和Hive的整合玛荞,見文章后面的參考閱讀).

本質(zhì)上就是通過Hive訪問HBase表吠各,具體就是通過hive-hbase-handler .

環(huán)境篇

hadoop-2.3.0-cdh5.0.0

apache-hive-0.13.1-bin

spark-1.4.0-bin-hadoop2.3

hbase-0.96.1.1-cdh5.0.0

部署情況如下圖:


測試集群劲妙,將Spark Worker部署在每臺DataNode上和泌,是為了最大程度的任務(wù)本地化荧呐,Spark集群為Standalone模式部署。

其中有三臺機(jī)器上也部署了RegionServer擅憔。

這個部署情況對理解后面提到的任務(wù)本地化調(diào)度有幫助鸵闪。


配置篇


1. 拷貝以下HBase的相關(guān)jar包到Spark Master和每個Spark Worker節(jié)點(diǎn)上的$SPARK_HOME/lib目錄下.

(我嘗試用–jars的方式添加之后,不work暑诸,所以采用這種土辦法)

$HBASE_HOME/lib/hbase-client-0.96.1.1-cdh5.0.0.jar

$HBASE_HOME/lib/hbase-common-0.96.1.1-cdh5.0.0.jar

$HBASE_HOME/lib/hbase-protocol-0.96.1.1-cdh5.0.0.jar

$HBASE_HOME/lib/hbase-server-0.96.1.1-cdh5.0.0.jar

$HBASE_HOME/lib/htrace-core-2.01.jar

$HBASE_HOME/lib/protobuf-java-2.5.0.jar

$HBASE_HOME/lib/guava-12.0.1.jar


$HIVE_HOME/lib/hive-hbase-handler-0.13.1.jar


2.配置每個節(jié)點(diǎn)上的$SPARK_HOME/conf/spark-env.sh,將上面的jar包添加到SPARK_CLASSPATH

export?SPARK_CLASSPATH=$SPARK_HOME/lib/hbase-client-0.96.1.1-cdh5.0.0.jar:

$SPARK_HOME/lib/hbase-common-0.96.1.1-cdh5.0.0.jar:

$SPARK_HOME/lib/hbase-protocol-0.96.1.1-cdh5.0.0.jar:

$SPARK_HOME/lib/hbase-server-0.96.1.1-cdh5.0.0.jar:

$SPARK_HOME/lib/htrace-core-2.01.jar:

$SPARK_HOME/lib/protobuf-java-2.5.0.jar:

$SPARK_HOME/lib/guava-12.0.1.jar:

$SPARK_HOME/lib/hive-hbase-handler-0.13.1.jar:

${SPARK_CLASSPATH}

3.將hbase-site.xml拷貝至${HADOOP_CONF_DIR},由于spark-env.sh中配置了Hadoop配置文件目錄${HADOOP_CONF_DIR},因此會將hbase-site.xml加載蚌讼。

hbase-site.xml中主要是以下幾個參數(shù)的配置:

hbase.zookeeper.quorum

zkNode1:2181,zkNode2:2181,zkNode3:2181

HBase使用的zookeeper節(jié)點(diǎn)

hbase.client.scanner.caching

5000

HBase客戶端掃描緩存辟灰,對查詢性能有很大幫助

另外還有一個參數(shù):zookeeper.znode.parent=/hbase

是HBase在zk中的根目錄,默認(rèn)為/hbase篡石,視實(shí)際情況進(jìn)行配置伞矩。

4.重啟Spark集群。

?大數(shù)據(jù)學(xué)習(xí)交流群:724693112 歡迎想學(xué)習(xí)大數(shù)據(jù)和需要大數(shù)據(jù)學(xué)習(xí)資料的同學(xué)來一起學(xué)習(xí)夏志。

使用篇

hbase中有表lxw1234,數(shù)據(jù)如下:

hbase(main):025:0*?scan?'lxw1234'

ROW COLUMN+CELL

lxw1234.com column=f1:c1,?timestamp=1435624625198,?value=name1

lxw1234.com column=f1:c2,?timestamp=1435624591717,?value=name2

lxw1234.com column=f2:c1,?timestamp=1435624608759,?value=age1

lxw1234.com column=f2:c2,?timestamp=1435624635261,?value=age2

lxw1234.com column=f3:c1,?timestamp=1435624662282,?value=job1

lxw1234.com column=f3:c2,?timestamp=1435624697028,?value=job2

lxw1234.com column=f3:c3,?timestamp=1435624697065,?value=job3

1?row(s)?in?0.0350?seconds

進(jìn)入spark-sql,使用如下語句建表:

CREATE EXTERNAL TABLE lxw1234?(

rowkey?string,

f1 map,

f2 map,

f3 map

)?STORED BY?'org.apache.hadoop.hive.hbase.HBaseStorageHandler'

WITH SERDEPROPERTIES?("hbase.columns.mapping"?=?":key,f1:,f2:,f3:")

TBLPROPERTIES?("hbase.table.name"?=?"lxw1234");

建好之后,就可以查詢了:

spark-sql>?select?*?from?lxw1234;

lxw1234.com?{"c1":"name1","c2":"name2"}?{"c1":"age1","c2":"age2"}?{"c1":"job1","c2":"job2","c3":"job3"}

Time?taken:?4.726?seconds,?Fetched?1?row(s)

spark-sql>?select?count(1)?from?lxw1234;

1

Time?taken:?2.46?seconds,?Fetched?1?row(s)

spark-sql>

大表查詢苛让,消耗的時間和通過Hive用MapReduce查詢差不多沟蔑。

spark-sql>?select?count(1)?from?lxw1234_hbase;

53609638

Time?taken:?335.474?seconds,?Fetched?1?row(s)

在spark-sql中通過insert插入數(shù)據(jù)到HBase表時候報錯:

INSERT INTO TABLE lxw1234

SELECT?'row1'?AS rowkey,

map('c3','name3')?AS f1,

map('c3','age3')?AS f2,

map('c4','job3')?AS f3

FROM lxw1234_a

limit?1;


org.apache.spark.SparkException:?Job?aborted due to stage failure:?Task?0?in?stage?10.0?failed?4?times,

most recent failure:?Lost?task?0.3?in?stage?10.0?(TID?23,?slave013.uniclick.cloud):

java.lang.ClassCastException:?org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat?cannot be cast to org.apache.hadoop.hive.ql.io.HiveOutputFormat

at org.apache.spark.sql.hive.SparkHiveWriterContainer.outputFormat$lzycompute(hiveWriterContainers.scala:74)

at org.apache.spark.sql.hive.SparkHiveWriterContainer.outputFormat(hiveWriterContainers.scala:73)

at org.apache.spark.sql.hive.SparkHiveWriterContainer.getOutputName(hiveWriterContainers.scala:93)

at org.apache.spark.sql.hive.SparkHiveWriterContainer.initWriters(hiveWriterContainers.scala:117)

at org.apache.spark.sql.hive.SparkHiveWriterContainer.executorSideSetup(hiveWriterContainers.scala:86)

at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:99)

at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:83)

at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:83)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)

at org.apache.spark.scheduler.Task.run(Task.scala:70)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:744)


Driver?stacktrace:

at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)

at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)

at scala.Option.foreach(Option.scala:236)

at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)

at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)

at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)

at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

這個還有待分析。


關(guān)于Spark任務(wù)本地化運(yùn)行

先看這張圖狱杰,該圖為運(yùn)行select * from lxw1234_hbase;這張大表查詢時候的任務(wù)運(yùn)行圖瘦材。


Spark和Hadoop MapReduce一樣,在任務(wù)調(diào)度時候都會考慮數(shù)據(jù)本地化仿畸,即”任務(wù)向數(shù)據(jù)靠攏”食棕,盡量將任務(wù)分配到數(shù)據(jù)所在的節(jié)點(diǎn)上運(yùn)行。

基于這點(diǎn)错沽,lxw1234_hbase為HBase中的外部表簿晓,Spark在解析時候,通過org.apache.hadoop.hive.hbase.HBaseStorageHandler獲取到表lxw1234_hbase在HBase中的region所在的RegionServer千埃,即:slave004憔儿、slave005、slave006 (上面的部署圖中提到了放可,總共只有三臺RegionServer谒臼,就是這三臺),所以耀里,在調(diào)度任務(wù)時候蜈缤,首先考慮要往這三臺節(jié)點(diǎn)上分配任務(wù)。

表lxw1234_hbase共有10個region冯挎,因此需要10個map task來運(yùn)行底哥。

再看一張圖,這是spark-sql cli指定的Executor配置:



每臺機(jī)器上Worker的實(shí)例為2個房官,每個Worker實(shí)例中運(yùn)行的Executor為1個叠艳,因此,每臺機(jī)器上運(yùn)行兩個Executor.

那么salve004易阳、slave005附较、slave006上各運(yùn)行2個Executor交煞,總共6個兼贡,很好,Spark已經(jīng)第一時間將這6個Task交給這6個Executor去執(zhí)行了(NODE_LOCAL Tasks)。

剩下4個Task辜妓,沒辦法,想NODE_LOCAL運(yùn)行桨踪,但那三臺機(jī)器上沒有剩余的Executor了掠归,只能分配給其他Worker上的Executor,這4個Task為ANY Tasks卢鹦。

正如那張任務(wù)運(yùn)行圖中所示臀脏。


寫在后面

通過Hive和spark-sql去訪問HBase表,只是為統(tǒng)計(jì)分析提供了一定的便捷性冀自,個人覺得性能上的優(yōu)勢并不明顯揉稚。

可能Spark通過API去讀取HBase數(shù)據(jù),性能更好些吧熬粗,以后再試搀玖。

另外,spark-sql有一點(diǎn)好處驻呐,就是可以先把HBase中的數(shù)據(jù)cache到一張內(nèi)存表中灌诅,然后在這張內(nèi)存表中,

通過SQL去統(tǒng)計(jì)分析含末,那就爽多了猜拾。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市佣盒,隨后出現(xiàn)的幾起案子关带,更是在濱河造成了極大的恐慌,老刑警劉巖沼撕,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件宋雏,死亡現(xiàn)場離奇詭異,居然都是意外死亡务豺,警方通過查閱死者的電腦和手機(jī)磨总,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來笼沥,“玉大人蚪燕,你說我怎么就攤上這事”记常” “怎么了馆纳?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長汹桦。 經(jīng)常有香客問我鲁驶,道長,這世上最難降的妖魔是什么舞骆? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任钥弯,我火速辦了婚禮径荔,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘脆霎。我一直安慰自己总处,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布睛蛛。 她就那樣靜靜地躺著鹦马,像睡著了一般。 火紅的嫁衣襯著肌膚如雪忆肾。 梳的紋絲不亂的頭發(fā)上荸频,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天,我揣著相機(jī)與錄音难菌,去河邊找鬼。 笑死蔑滓,一個胖子當(dāng)著我的面吹牛郊酒,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播键袱,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼燎窘,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了蹄咖?” 一聲冷哼從身側(cè)響起褐健,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎澜汤,沒想到半個月后蚜迅,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡俊抵,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年谁不,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片徽诲。...
    茶點(diǎn)故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡刹帕,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出谎替,到底是詐尸還是另有隱情偷溺,我是刑警寧澤,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布钱贯,位于F島的核電站挫掏,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏秩命。R本人自食惡果不足惜砍濒,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一淋肾、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧爸邢,春花似錦樊卓、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至券敌,卻和暖如春唾戚,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背待诅。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工叹坦, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人卑雁。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓募书,卻偏偏與公主長得像,于是被迫代替她去往敵國和親测蹲。 傳聞我的和親對象是個殘疾皇子莹捡,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,700評論 2 354

推薦閱讀更多精彩內(nèi)容