背景:
項目需要讀取Hbase并把計算結(jié)果保存在Hbase里供其他接口獲取。
算法由pyspark實現(xiàn)言秸。
原先Hbase的Thrift接口三天兩頭宕筋搏,而且性能低下。
充滿糟點的background結(jié)束
結(jié)論:
先說結(jié)論驶社,想節(jié)約時間的可以跳過后面的"充滿糟點過程"部分。
前提:
1. 你要有個可以通repo的spark集群养晋,或通過某種手段可以通repo(自建內(nèi)網(wǎng) http://repo.hortonworks.com/content/groups/public/ 的clone衬吆,或proxy)
2. spark-shell/spark-submit部分:
spark-submit \ --conf "spark.driver.extraJavaOptions=-Dhttp.proxyHost=<代理服務器IP> \ -Dhttp.proxyPort=<代理服務器端口> -Dhttps.proxyHost=<代理服務器IP> \ -Dhttps.proxyPort=<代理服務器端口>"\ --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 \ --repositories http://repo.hortonworks.com/content/groups/public/ \ --files files:////hbase-site.xml script.py arg1, arg2
相關(guān)的lib依賴會自動下載, 如果實在沒有手段通外網(wǎng), 可以考慮先用此命令在通外網(wǎng)的機器上產(chǎn)生ivy2的cache,然后復制到master服務器上(client只需要master有相關(guān)lib即可)。
配置及注意點:
此方法未在Spark Yarn/Yarn Client 模式下測試绳泉,有童鞋搞定了且愿意分享的話可以補充逊抡。
代理只能HTTP或HTTPS協(xié)議(自己用polipo轉(zhuǎn)一個就可以,不贅述)。
Local及Standalone模式經(jīng)測試無問題零酪。
由于用的是shc-core冒嫡,因此保險起見推薦將hbase-site.xml復制進$SPARK_HOME/conf, 之前直接-files里提交文件,結(jié)果查庫的時候連接失敗了(emmmm)
用pyspark DataFrame操作Hbase:
和JAVA/Scala版的shc一樣四苇,先要定義catalog
catalog = ''.join("""{ "table":{"namespace":"test", "name":"test_table"}, "rowkey":"key", "columns":{ "col0":{"cf":"rowkey", "col":"key", "type":"string"}, "col1":{"cf":"result", "col":"class", "type":"string"} } }""".split())
造一個dataframe來測試寫入:
data_source_format = 'org.apache.spark.sql.execution.datasources.hbase' df = sc.parallelize([('a', '1.0'), ('b', '2.0')]).toDF(schema=['col0', 'col1']) df.show() df.write.options(catalog=catalog,newTable="5").format(data_source_format).save()
注: 推薦寫入時加上"newTable"選項孝凌,否則當表不存在時會報如下錯誤,也就是提醒你新表至少要有3個regions:
Py4JJavaError: An error occurred while calling o510.save.: org.apache.spark.sql.execution.datasources.hbase.InvalidRegionNumberException: Number of regions specified for new table must be greater than 3.
再讀取數(shù)據(jù):
df_read = spark.read.options(catalog=catalog).format(data_source_format).load() df_read.show()
注: 讀取的時候可以在load()之后加上各種select where 語句, 會自動轉(zhuǎn)換為各種不人性的scan filters月腋,并延遲加載到讀取時執(zhí)行蟀架。注意,因為spark是lazy執(zhí)行的榆骚,如果where特別復雜的話推薦先load().cache()完之后接一句dataframe.count()再進行where處理片拍,否則語句會被轉(zhuǎn)換為各種filter,并在Hbase中處理妓肢,value filter的速度有目共睹…當然Hbase集群夠強也可以無視捌省。
充滿糟點的過程:
在Thrift接口N**N次 OOM 之后,終于忍無可忍的想直接用原生spark-hbase讀寫數(shù)據(jù)碉钠。
OK纲缓,先去Hbase官網(wǎng)上找了reference,關(guān)于spark的部分只有Java和Scala有木有喊废?spark-hbase項目不知死活有木有祝高?最可氣的是reference上那個版本號(帶beta)全宇宙的repo都找不到有木有?github只有個空頁面有木有污筷?
雖然是開源項目褂策,好歹reference也稍微上點心好不好-_-||。
(未完待續(xù))