背景介紹
本項(xiàng)目主要解決 check 和 opinion2 張歷史數(shù)據(jù)表(歷史數(shù)據(jù)是指當(dāng)業(yè)務(wù)發(fā)生過程中的完整中間流程和結(jié)果數(shù)據(jù))的在線查詢伐厌。原實(shí)現(xiàn)基于 Oracle 提供存儲(chǔ)查詢服務(wù),隨著數(shù)據(jù)量的不斷增加蝎毡,在寫入和讀取過程中面臨性能問題扰肌,且歷史數(shù)據(jù)僅供業(yè)務(wù)查詢參考抛寝,并不影響實(shí)際流程,從系統(tǒng)結(jié)構(gòu)上來說曙旭,放在業(yè)務(wù)鏈條上游比較重盗舰。本項(xiàng)目將其置于下游數(shù)據(jù)處理Hadoop分布式平臺(tái)來實(shí)現(xiàn)此需求。下面列一些具體的需求指標(biāo):
1桂躏、數(shù)據(jù)量:目前 check 表的累計(jì)數(shù)據(jù)量為 5000w+ 行钻趋,11GB;opinion 表的累計(jì)數(shù)據(jù)量為 3 億 +沼头,約 100GB爷绘。每日增量約為每張表 50 萬 + 行,只做 insert进倍,不做 update土至。
2、查詢要求:check 表的主鍵為 id(Oracle 全局 id)猾昆,查詢鍵為 check_id陶因,一個(gè) check_id 對應(yīng)多條記錄,所以需返回對應(yīng)記錄的 list垂蜗; opinion 表的主鍵也是 id楷扬,查詢鍵是 bussiness_no 和 buss_type,同理返回 list贴见。單筆查詢返回 List 大小約 50 條以下烘苹,查詢頻率為 100 筆 / 天左右,查詢響應(yīng)時(shí)間 2s片部。
技術(shù)選型
從數(shù)據(jù)量及查詢要求來看镣衡,分布式平臺(tái)上具備大數(shù)據(jù)量存儲(chǔ),且提供實(shí)時(shí)查詢能力的組件首選HBase档悠。根據(jù)需求做了初步的調(diào)研和評估后廊鸥,大致確定 HBase 作為主要存儲(chǔ)組件。將需求拆解為寫入和讀取 HBase 兩部分辖所。
讀取 HBase 相對來說方案比較確定惰说,基本根據(jù)需求設(shè)計(jì) RowKey,然后根據(jù) HBase 提供的豐富 API(get缘回,scan 等)來讀取數(shù)據(jù)吆视,滿足性能要求即可。
寫入 HBase 的方法大致有以下幾種:
1切诀、Java 調(diào)用 HBase 原生 API揩环,HTable.add(List(Put))。
2幅虑、MapReduce 作業(yè)丰滑,使用 TableOutputFormat 作為輸出。
3倒庵、Bulk Load褒墨,先將數(shù)據(jù)按照 HBase 的內(nèi)部數(shù)據(jù)格式生成持久化的 HFile 文件,然后復(fù)制到合適的位置并通知 RegionServer 擎宝,即完成海量數(shù)據(jù)的入庫郁妈。其中生成 Hfile 這一步可以選擇 MapReduce 或 Spark。
本文采用第 3 種方式绍申,Spark + Bulk Load 寫入 HBase噩咪。該方法相對其他 2 種方式有以下優(yōu)勢:
1顾彰、BulkLoad 不會(huì)寫 WAL,也不會(huì)產(chǎn)生 flush 以及 split胃碾。
2涨享、如果我們大量調(diào)用 PUT 接口插入數(shù)據(jù),可能會(huì)導(dǎo)致大量的 GC 操作仆百。除了影響性能之外厕隧,嚴(yán)重時(shí)甚至可能會(huì)對 HBase 節(jié)點(diǎn)的穩(wěn)定性造成影響,采用 BulkLoad 無此顧慮俄周。
3吁讨、過程中沒有大量的接口調(diào)用消耗性能。
4峦朗、可以利用 Spark 強(qiáng)大的計(jì)算能力建丧。
圖示如下:
設(shè)計(jì)
環(huán)境信息
Hadoop?2.5-2.7
HBase?0.98.6
Spark?2.0.0-2.1.1
Sqoop?1.4.6
表設(shè)計(jì)
本段的重點(diǎn)在于討論 HBase 表的設(shè)計(jì),其中 RowKey 是最重要的部分波势。為了方便說明問題茶鹃,我們先來看看數(shù)據(jù)格式。以下以 check 舉例艰亮,opinion 同理闭翩。
check 表(原表字段有 18 個(gè),為方便描述迄埃,本文截選 5 個(gè)字段示意)
如上圖所示疗韵,主鍵為 id,32 位字母和數(shù)字隨機(jī)組成侄非,業(yè)務(wù)查詢字段 check_id 為不定長字段(不超過 32 位)蕉汪,字母和數(shù)字組成,同一 check_id 可能對應(yīng)多條記錄逞怨,其他為相關(guān)業(yè)務(wù)字段者疤。眾所周知,HBase 是基于 RowKey 提供查詢叠赦,且要求 RowKey 是唯一的驹马。RowKey 的設(shè)計(jì)主要考慮的是數(shù)據(jù)將怎樣被訪問。初步來看除秀,我們有 2 種設(shè)計(jì)方法糯累。
1、拆成 2 張表册踩,一張表 id 作為 RowKey泳姐,列為 check 表對應(yīng)的各列;另一張表為索引表暂吉,RowKey 為 check_id胖秒,每一列對應(yīng)一個(gè) id缎患。查詢時(shí),先找到 check_id 對應(yīng)的 id list阎肝,然后根據(jù) id 找到對應(yīng)的記錄较锡。均為 HBase 的 get 操作。
2盗痒、將本需求可看成是一個(gè)范圍查詢,而不是單條查詢低散。將 check_id 作為 RowKey 的前綴俯邓,后面跟 id。查詢時(shí)設(shè)置 Scan 的 startRow 和 stopRow熔号,找到對應(yīng)的記錄 list稽鞭。
第一種方法優(yōu)點(diǎn)是表結(jié)構(gòu)簡單,RowKey 容易設(shè)計(jì)引镊,缺點(diǎn)為 1)數(shù)據(jù)寫入時(shí)朦蕴,一行原始數(shù)據(jù)需要寫入到 2 張表,且索引表寫入前需要先掃描該 RowKey 是否存在弟头,如果存在吩抓,則加入一列,否則新建一行赴恨,2)讀取的時(shí)候疹娶,即便是采用 List, 也至少需要讀取 2 次表。第二種設(shè)計(jì)方法伦连,RowKey 設(shè)計(jì)較為復(fù)雜雨饺,但是寫入和讀取都是一次性的。綜合考慮惑淳,我們采用第二種設(shè)計(jì)方法额港。
RowKey 設(shè)計(jì)
熱點(diǎn)問題
HBase 中的行是以 RowKey 的字典序排序的钙蒙,其熱點(diǎn)問題通常發(fā)生在大量的客戶端直接訪問集群的一個(gè)或極少數(shù)節(jié)點(diǎn)榜掌。默認(rèn)情況下兄旬,在開始建表時(shí)懒闷,表只會(huì)有一個(gè) region匙奴,并隨著 region 增大而拆分成更多的 region姊舵,這些 region 才能分布在多個(gè) regionserver 上從而使負(fù)載均分勤揩。對于我們的業(yè)務(wù)需求帝火,存量數(shù)據(jù)已經(jīng)較大痕貌,因此有必要在一開始就將 HBase 的負(fù)載均攤到每個(gè) regionserver风罩,即做 pre-split。常見的防治熱點(diǎn)的方法為加鹽舵稠,hash 散列超升,自增部分(如時(shí)間戳)翻轉(zhuǎn)等入宦。
RowKey 設(shè)計(jì)
Step1:確定預(yù)分區(qū)數(shù)目,創(chuàng)建 HBase Table
不同的業(yè)務(wù)場景及數(shù)據(jù)特點(diǎn)確定數(shù)目的方式不一樣室琢,我個(gè)人認(rèn)為應(yīng)該綜合考慮數(shù)據(jù)量大小和集群大小等因素乾闰。比如 check 表大小約為 11G,測試集群大小為 10 臺(tái)機(jī)器盈滴,hbase.hregion.max.filesize=3G(當(dāng) region 的大小超過這個(gè)數(shù)時(shí)涯肩,將拆分為 2 個(gè)),所以初始化時(shí)盡量使得一個(gè) region 的大小為 1~2G(不會(huì)一上來就 split)巢钓,region 數(shù)據(jù)分到 11G/2G=6 個(gè)病苗,但為了充分利用集群資源,本文中 check 表劃分為 10 個(gè)分區(qū)症汹。如果數(shù)據(jù)量為 100G硫朦,且不斷增長,集群情況不變背镇,則 region 數(shù)目增大到 100G/2G=50 個(gè)左右較合適咬展。Hbase check 表建表語句如下:
create?'tinawang:check',
{?NAME?=>?'f',?COMPRESSION?=>?'SNAPPY',DATA_BLOCK_ENCODING?=>?'FAST_DIFF',BLOOMFILTER=>'ROW'},
{SPLITS?=>?[?'1','2','3',?'4','5','6','7','8','9']}
其中,Column Family =‘f’瞒斩,越短越好破婆。
COMPRESSION => 'SNAPPY',HBase 支持 3 種壓縮 LZO, GZIP and Snappy胸囱。GZIP 壓縮率高荠割,但是耗 CPU。后兩者差不多旺矾,Snappy 稍微勝出一點(diǎn)蔑鹦,cpu 消耗的比 GZIP 少。一般在 IO 和 CPU 均衡下箕宙,選擇 Snappy嚎朽。
DATA_BLOCK_ENCODING => 'FAST_DIFF',本案例中 RowKey 較為接近柬帕,通過以下命令查看 key 長度相對 value 較長哟忍。
./hbase?org.apache.hadoop.hbase.io.hfile.HFile?-m?-f?/apps/hbase/data/data/tinawang/check/a661f0f95598662a53b3d8b1ae469fdf/f/a5fefc880f87492d908672e1634f2eed_SeqId_2_
Step2:RowKey 組成
Salt
讓數(shù)據(jù)均衡的分布到各個(gè) Region 上,結(jié)合 pre-split陷寝,我們對查詢鍵即 check 表的 check_id 求 hashcode 值锅很,然后 modulus(numRegions) 作為前綴,注意補(bǔ)齊數(shù)據(jù)凤跑。
StringUtils.leftPad(Integer.toString(Math.abs(check_id.hashCode()?%?numRegion)),1,’0’)
說明:如果數(shù)據(jù)量達(dá)上百 G 以上爆安,則 numRegions 自然到 2 位數(shù),則 salt 也為 2 位仔引。
Hash 散列
因?yàn)?check_id 本身是不定長的字符數(shù)字串扔仓,為使數(shù)據(jù)散列化褐奥,方便 RowKey 查詢和比較,我們對 check_id 采用 SHA1 散列化翘簇,并使之 32 位定長化撬码。
MD5Hash.getMD5AsHex(Bytes.toBytes(check_id))
唯一性
以上 salt+hash 作為 RowKey 前綴,加上 check 表的主鍵 id 來保障 RowKey 唯一性版保。綜上呜笑,check 表的 RowKey 設(shè)計(jì)如下:(check_id=A208849559)
為增強(qiáng)可讀性,中間還可以加上自定義的分割符彻犁,如’+’,’|’等叫胁。
7+7c9498b4a83974da56b252122b9752bf+56B63AB98C2E00B4E053C501380709AD
以上設(shè)計(jì)能保證對每次查詢而言,其 salt+hash 前綴值是確定的袖裕,并且落在同一個(gè) region 中。需要說明的是 HBase 中 check 表的各列同數(shù)據(jù)源 Oracle 中 check 表的各列存儲(chǔ)溉瓶。
WEB 查詢設(shè)計(jì)
RowKey 設(shè)計(jì)與查詢息息相關(guān)急鳄,查詢方式?jīng)Q定 RowKey 設(shè)計(jì),反之基于以上 RowKey 設(shè)計(jì)堰酿,查詢時(shí)通過設(shè)置 Scan 的 [startRow疾宏,stopRow], 即可完成掃描。以查詢 check_id=A208849559 為例触创,根據(jù) RowKey 的設(shè)計(jì)原則坎藐,對其進(jìn)行 salt+hash 計(jì)算,得前綴哼绑。
startRow?=?7+7c9498b4a83974da56b252122b9752bf
stopRow?=?7+7c9498b4a83974da56b252122b9752bg
代碼實(shí)現(xiàn)關(guān)鍵流程
Spark write to HBase
Step0: prepare work
因?yàn)槭菑纳嫌蜗到y(tǒng)承接的業(yè)務(wù)數(shù)據(jù)岩馍,存量數(shù)據(jù)采用 sqoop 抽到 hdfs;增量數(shù)據(jù)每日以文件的形式從 ftp 站點(diǎn)獲取抖韩。因?yàn)闃I(yè)務(wù)數(shù)據(jù)字段中包含一些換行符蛀恩,且 sqoop1.4.6 目前只支持單字節(jié),所以本文選擇’0x01’作為列分隔符茂浮,’0x10’作為行分隔符双谆。
Step1: Spark read hdfs text file
SparkContext.textfile() 默認(rèn)行分隔符為”\n”,此處我們用“0x10”席揽,需要在 Configuration 中配置顽馋。應(yīng)用配置,我們調(diào)用 newAPIHadoopFile 方法來讀取 hdfs 文件幌羞,返回 JavaPairRDD寸谜,其中 LongWritable 和 Text 分別為 Hadoop 中的 Long 類型和 String 類型(所有 Hadoop 數(shù)據(jù)類型和 java 的數(shù)據(jù)類型都很相像,除了它們是針對網(wǎng)絡(luò)序列化而做的特殊優(yōu)化)属桦。我們需要的數(shù)據(jù)文件放在 pairRDD 的 value 中程帕,即 Text 指代住练。為后續(xù)處理方便,可將 JavaPairRDD轉(zhuǎn)換為 JavaRDD< String >愁拭。
Step2: Transfer and sort RDD
① 將 avaRDD< String>轉(zhuǎn)換成 JavaPairRDD讲逛,其中參數(shù)依次表示為,RowKey岭埠,col盏混,value。做這樣轉(zhuǎn)換是因?yàn)?HBase 的基本原理是基于 RowKey 排序的惜论,并且當(dāng)采用 bulk load 方式將數(shù)據(jù)寫入多個(gè)預(yù)分區(qū)(region)時(shí)许赃,要求 Spark 各 partition 的數(shù)據(jù)是有序的,RowKey馆类,column family(cf)混聊,col name 均需要有序。在本案例中因?yàn)橹挥幸粋€(gè)列簇乾巧,所以將 RowKey 和 col name 組織出來為 Tuple2格式的 key句喜。請注意原本數(shù)據(jù)庫中的一行記錄(n 個(gè)字段),此時(shí)會(huì)被拆成 n 行沟于。
② 基于 JavaPairRDD進(jìn)行 RowKey咳胃,col 的二次排序。如果不做排序旷太,會(huì)報(bào)以下異常:
java.io.IOException:?Added?a?key?notlexically?larger?than?previous?key
③ 將數(shù)據(jù)組織成 HFile 要求的 JavaPairRDDhfileRDD展懈。
Step3:create hfile and bulk load to HBase
①主要調(diào)用 saveAsNewAPIHadoopFile 方法:
hfileRdd.saveAsNewAPIHadoopFile(hfilePath,ImmutableBytesWritable.class,
KeyValue.class,HFileOutputFormat2.class,config);
② hfilebulk load to HBase
final?Job?job?=?Job.getInstance();
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
HFileOutputFormat2.configureIncrementalLoad(job,htable);
LoadIncrementalHFiles?bulkLoader?=?newLoadIncrementalHFiles(config);
bulkLoader.doBulkLoad(newPath(hfilePath),htable);
注:如果集群開啟了 kerberos,step4 需要放置在 ugi.doAs()方法中供璧,在進(jìn)行如下驗(yàn)證后實(shí)現(xiàn)
UserGroupInformation?ugi?=?UserGroupInformation.loginUserFromKeytabAndReturnUGI(keyUser,keytabPath);
UserGroupInformation.setLoginUser(ugi);
訪問 HBase 集群的 60010 端口 web存崖,可以看到 region 分布情況。
Read from HBase
本文基于 spring boot 框架來開發(fā) web 端訪問 HBase 內(nèi)數(shù)據(jù)睡毒。
use connection pool(使用連接池)
創(chuàng)建連接是一個(gè)比較重的操作金句,在實(shí)際 HBase 工程中,我們引入連接池來共享 zk 連接吕嘀,meta 信息緩存违寞,region server 和 master 的連接。
HConnection?connection?=?HConnectionManager.createConnection(config);
HTableInterface?table?=?connection.getTable("table1");
try?{
//?Use?the?table?as?needed,?for?a?single?operation?and?a?single?thread
}?finally?{
table.close();
}
也可以通過以下方法偶房,覆蓋默認(rèn)線程池趁曼。
HConnection?createConnection(org.apache.hadoop.conf.Configuration?conf,ExecutorService?pool);
process query
Step1: 根據(jù)查詢條件,確定 RowKey 前綴
根據(jù) 3.3 RowKey 設(shè)計(jì)介紹棕洋,HBase 的寫和讀都遵循該設(shè)計(jì)規(guī)則挡闰。此處我們采用相同的方法,將 web 調(diào)用方傳入的查詢條件,轉(zhuǎn)化成對應(yīng)的 RowKey 前綴摄悯。例如赞季,查詢 check 表傳遞過來的 check_id=A208849559,生成前綴 7+7c9498b4a83974da56b252122b9752bf奢驯。
Step2:確定 scan 范圍
A208849559 對應(yīng)的查詢結(jié)果數(shù)據(jù)即在 RowKey 前綴為 7+7c9498b4a83974da56b252122b9752bf 對應(yīng)的 RowKey 及 value 中申钩。
scan.setStartRow(Bytes.toBytes(rowkey_pre));?//scan,?7+7c9498b4a83974da56b252122b9752bf
byte[]?stopRow?=?Bytes.toBytes(rowkey_pre);
stopRow[stopRow.length-1]++;
scan.setStopRow(stopRow);//?7+7c9498b4a83974da56b252122b9752bg
Step3:查詢結(jié)果組成返回對象
遍歷 ResultScanner 對象,將每一行對應(yīng)的數(shù)據(jù)封裝成 table entity瘪阁,組成 list 返回撒遣。
測試
從原始數(shù)據(jù)中隨機(jī)抓取 1000 個(gè) check_id,用于模擬測試管跺,連續(xù)發(fā)起 3 次請求數(shù)為 2000(200 個(gè)線程并發(fā)义黎,循環(huán) 10 次),平均響應(yīng)時(shí)間為 51ms豁跑,錯(cuò)誤率為 0廉涕。
如上圖,經(jīng)歷 N 次累計(jì)測試后艇拍,各個(gè) region 上的 Requests 數(shù)較為接近狐蜕,符合負(fù)載均衡設(shè)計(jì)之初。
踩坑記錄
1淑倾、kerberos 認(rèn)證問題
如果集群開啟了安全認(rèn)證馏鹤,那么在進(jìn)行 Spark 提交作業(yè)以及訪問 HBase 時(shí)征椒,均需要進(jìn)行 kerberos 認(rèn)證娇哆。
本文采用 yarn cluster 模式,像提交普通作業(yè)一樣勃救,可能會(huì)報(bào)以下錯(cuò)誤碍讨。
ERROR?StartApp:?job?failure,
java.lang.NullPointerException
at?com.tinawang.spark.hbase.utils.HbaseKerberos.(HbaseKerberos.java:18)
at?com.tinawang.spark.hbase.job.SparkWriteHbaseJob.run(SparkWriteHbaseJob.java:60)
定位到 HbaseKerberos.java:18,代碼如下:
this.keytabPath?=?(Thread.currentThread().getContextClassLoader().getResource(prop.getProperty("hbase.keytab"))).getPath();
這是因?yàn)?executor 在進(jìn)行 HBase 連接時(shí)蒙秒,需要重新認(rèn)證勃黍,通過 --keytab 上傳的 tina.keytab 并未被 HBase 認(rèn)證程序塊獲取到,所以認(rèn)證的 keytab 文件需要另外通過 --files 上傳晕讲。示意如下
--keytab?/path/tina.keytab?\
--principal?tina@GNUHPC.ORG?\
--files?"/path/tina.keytab.hbase"
其中 tina.keytab.hbase 是將 tina.keytab 復(fù)制并重命名而得覆获。因?yàn)?Spark 不允許同一個(gè)文件重復(fù)上傳。
2瓢省、序列化
org.apache.spark.SparkException:?Task?not?serializable
at?org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at?org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at?org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at?org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
at?org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at?org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
...
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)
Caused?by:?java.io.NotSerializableException:?org.apache.spark.api.java.JavaSparkContext
Serialization?stack:
-?object?not?serializable?(class:?org.apache.spark.api.java.JavaSparkContext,?value:?org.apache.spark.api.java.JavaSparkContext@24a16d8c)
-?field?(class:?com.tinawang.spark.hbase.processor.SparkReadFileRDD,?name:?sc,?type:?class?org.apache.spark.api.java.JavaSparkContext)
...
解決方法一:
如果 sc 作為類的成員變量弄息,在方法中被引用,則加 transient 關(guān)鍵字勤婚,使其不被序列化摹量。
private?transient?JavaSparkContext?sc;
解決方法二:
將 sc 作為方法參數(shù)傳遞,同時(shí)使涉及 RDD 操作的類 implements Serializable。 代碼中采用第二種方法缨称。詳見代碼凝果。
3、批量請求測試
Exception?in?thread?"http-nio-8091-Acceptor-0"?java.lang.NoClassDefFoundError:?org/apache/tomcat/util/ExceptionUtils
或者
Exception?in?thread?"http-nio-8091-exec-34"?java.lang.NoClassDefFoundError:?ch/qos/logback/classic/spi/ThrowableProxy
查看下面 issue 以及一次排查問題的過程睦尽,可能是 open file 超過限制器净。
https://github.com/spring-projects/spring-boot/issues/1106
http://mp.weixin.qq.com/s/34GVlaYDOdY1OQ9eZs-iXg
使用 ulimit-a 查看每個(gè)用戶默認(rèn)打開的文件數(shù)為 1024。
在系統(tǒng)文件 /etc/security/limits.conf 中修改這個(gè)數(shù)量限制骂删,在文件中加入以下內(nèi)容, 即可解決問題掌动。
soft nofile 65536
hard nofile 65536
大數(shù)據(jù)全套學(xué)習(xí)視頻資料領(lǐng)取加微信號(hào) “Lxiao_28”