其實本章并不是解釋快的原因不同,只是我?guī)е@個問題,看完了這篇博客颤殴。這篇博客是google拿來的觅廓,做了些許改動。寫的不錯涵但,看了很多遍才看明白杈绸。感興趣的可以留言交流
它解釋了:
1、hive的操作符是mr一個線程驅(qū)動的
2矮瘟、最終結(jié)果是有一個Stage-0瞳脓,它是依賴于Stage-1的文件讀取操作,它不是MR作業(yè)芥永,只是一個基于hadoop文件系統(tǒng)客戶端的分布式文件讀取程序篡殷。
3、limit時的數(shù)據(jù)流
select deviceid from t_aa_pc_log where pt='2012-07-07-00' limit 1;? 從根本上說埋涧, hive是hadoop提交作業(yè)的客戶端板辽,它使用antlr詞法語法分析工具,對SQL進行分析優(yōu)化后棘催,成一系列MapReduce作業(yè)劲弦,向hadoop提交運行作業(yè)以得到結(jié)果。
? 這條語句指定分區(qū)字段 pt為2012-07-07-00, 限制結(jié)果為 limit 1. 假設(shè)運行這個MR作業(yè)需要5個map, 那么每個map應(yīng)該輸出一條記錄醇坝,從jobtrack 的 jobdetails頁面中的計數(shù)器中 Map Input Records 一項應(yīng)該顯示為5(即該作業(yè)中Map階段總共輸入5條記錄),結(jié)果是否如預(yù)計的那樣邑跪, 通過運行改SQL來驗證:
? > select deviceid from t_aa_pc_log where pt='2012-07-07-00' limit 1;? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_201205162059_1547550, Tracking URL = http://jt.dc.sh-wgq.sdo.com:50030/jobdetails.jsp?jobid=job_201205162059_1547550
Kill Command = /home/hdfs/hadoop-current/bin/hadoop job? -Dmapred.job.tracker=10.133.10.103:50020 -kill job_201205162059_1547550
2012-07-07 16:22:42,570 Stage-1 map = 0%,? reduce = 0%
2012-07-07 16:22:48,628 Stage-1 map = 80%,? reduce = 0%
2012-07-07 16:22:49,640 Stage-1 map = 100%,? reduce = 0%
2012-07-07 16:22:50,654 Stage-1 map = 100%,? reduce = 100%
Ended Job = job_201205162059_1547550
OK
0cf49387a23d9cec25da3d76d6988546
Time taken: 13.499 seconds
hive>?
正如limit 1限制,輸出一條記錄呼猪,再通過 http://jt.dc.sh-wgq.sdo.com:50030/jobdetails.jsp?jobid=job_201205162059_1547550
查看Map Input Records項:
? 上圖顯示Map Input Records實際上是35画畅,并非之前設(shè)想的每個MAP一條,總共5條宋距,那多出來的30條記錄又是怎么來的? 實際上這個跟hive mapreduce實現(xiàn)有關(guān)轴踱,先來看看上面這條SQL的執(zhí)行計劃:
? ? > explain select deviceid from t_aa_pc_log where pt='2012-07-07-00' limit 1;
OK
STAGE DEPENDENCIES:
? Stage-1 is a root stage
? Stage-0 is a root stage
STAGE PLANS:
? Stage: Stage-1
? ? Map Reduce
? ? ? Alias -> Map Operator Tree:
? ? ? ? t_aa_pc_log?
? ? ? ? ? TableScan
? ? ? ? ? ? alias: t_aa_pc_log
? ? ? ? ? ? Filter Operator
? ? ? ? ? ? ? predicate:
? ? ? ? ? ? ? ? ? expr: (pt = '2012-07-07-00')
? ? ? ? ? ? ? ? ? type: boolean
? ? ? ? ? ? ? Select Operator
? ? ? ? ? ? ? ? expressions:
? ? ? ? ? ? ? ? ? ? ? expr: deviceid
? ? ? ? ? ? ? ? ? ? ? type: string
? ? ? ? ? ? ? ? outputColumnNames: _col0
? ? ? ? ? ? ? ? Limit
? ? ? ? ? ? ? ? ? File Output Operator
? ? ? ? ? ? ? ? ? ? compressed: false
? ? ? ? ? ? ? ? ? ? GlobalTableId: 0
? ? ? ? ? ? ? ? ? ? table:
? ? ? ? ? ? ? ? ? ? ? ? input format: org.apache.hadoop.mapred.TextInputFormat
? ? ? ? ? ? ? ? ? ? ? ? output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
? Stage: Stage-0
? ? Fetch Operator
? ? ? limit: 1
Time taken: 0.418 seconds
? 該執(zhí)行計劃顯示,Stage-1 是一個MR程序谚赎,且只有map過程淫僻, 沒有reduce過程,也就是說在Map過程就直接將結(jié)果輸出到HDFS文件系統(tǒng), Stage-0是依賴于Stage-1的文件讀取操作,它不是MR作業(yè)壶唤,只是一個基于hadoop文件系統(tǒng)客戶端的分布式文件讀取程序雳灵。
? 重點分析Stage-1過程,一條記錄被讀取后調(diào)用hive自定義mapper函數(shù)闸盔,依次經(jīng)過
TableScan Operator -> Filter Operator -> Select Operator -> Limit Operator-> File Output Operator, 以上每一個Operator都是hive定義的一個處理過程, 每一個 Operator都定義有:
這樣就構(gòu)成了一個 Operator圖悯辙,hive正是基于這些圖關(guān)系來處理諸如limit, group by, join等操作. Operator 基類定義一個:
protected boolean done; // 初始化值為false
這個字段指示某一個層級的Operator是否已經(jīng)處理完成,每當一條記錄進入特定的Operator操作時,當前Operator會判斷自己的childOperators 的done是否全部為true, 如果是笑撞, 表示childOperators已去全部處理完畢岛啸, 當前這個Operator也把自己的 done設(shè)置為true, 這樣層層返回,直到最外層的Operator, 這個查詢中涉及的部分Operator如下圖:
該hive MR作業(yè)中指定的mapper是:
mapred.mapper.class = org.apache.hadoop.hive.ql.exec.ExecMapper
input format是:
hive.input.format? org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
部分時序圖:
可見MapRunner是一個驅(qū)動線程茴肥,來讀取數(shù)據(jù)和執(zhí)行后面的任務(wù)坚踩。A調(diào)用B,B調(diào)用C瓤狐,最總追到 LimitOperator瞬铸。例如先讀出一條數(shù)據(jù),然后再進行select础锐、接著filter嗓节。如果在filter階段過濾掉,那么計數(shù)器(currCount 是一個記錄處理完成的計數(shù)器)會減去1.
MapRunner會循環(huán)調(diào)用CombineHiveRecordReader的doNext方法讀入行記錄皆警,直到doNext方法返回false, doNext方法中有一個重要的邏輯來控制記錄讀取是否結(jié)束
@Override
public boolean doNext(K key, V value) throws IOException {
? if (ExecMapper.getDone()) {
? ? return false;
? }
? return recordReader.next(key, value);
}
? 每讀取一條記錄都會判斷 MapRunner.getDone()是否為真拦宣, 如果是則結(jié)束Mapper讀取過程,? ExecMapper類中定義了一個靜態(tài)變量done(靜態(tài)非常重要,因為在hadoop框架下執(zhí)行時 CombineHiveRecordReader無法拿到 ExecMapper實例), 當 MapRunner讀取一條記錄后就會調(diào)用 MapRunner的map函數(shù),? ExecMapper中定義了一個MapOperator信姓,MapOperator的 childOperators 列表中持有TableScanOperator實例鸵隧,依次類推, 各Operator遞歸包含.
? ? ExecMapper的map函數(shù)被調(diào)用時會先判斷 MapOperator的done是否為true, 如果是意推,則將自己的靜態(tài)變量done設(shè)置為true(這樣 CombineHiveRecordReader在下一次讀取記錄時發(fā)現(xiàn) ExecMapper的done為true, 結(jié)束mapper記錄讀取),? 否則執(zhí)行MapOperator的process方法, 具體邏輯如下:
public void map(Object key, Object value, OutputCollector output,
? ? ? Reporter reporter) throws IOException {
? ? if (oc == null) {
? ? ? oc = output;
? ? ? rp = reporter;
? ? ? mo.setOutputCollector(oc);
? ? ? mo.setReporter(rp);
? ? }
? ? // reset the execContext for each new row
? ? execContext.resetRow();
? ? try {
? ? ? if (mo.getDone()) {
? ? ? ? done = true;
? ? ? } else {
? ? ? ? // Since there is no concept of a group, we don't invoke
? ? ? ? // startGroup/endGroup for a mapper
? ? ? ? mo.process((Writable)value);
接下來再看看各Operator如何判斷自己狀態(tài)是否為執(zhí)行完成:
int childrenDone = 0;
for (int i = 0; i < childOperatorsArray.length; i++) {
? Operator o = childOperatorsArray[i];
? if (o.getDone()) {
? ? childrenDone++;
? } else {
? ? o.process(row, childOperatorsTag[i]);
? }
}
// if all children are done, this operator is also done
if (childrenDone == childOperatorsArray.length) {
? setDone(true);
}
每個Operator都判斷自己的子Operator狀態(tài)是否全部完成豆瘫, 如果是則把自己的狀態(tài)也設(shè)置成done=true.
最后再看LimitOperator的判斷邏輯:
@Override
public void processOp(Object row, int tag) throws HiveException {
? if (currCount < limit) {
? ? forward(row, inputObjInspectors[tag]);
? ? currCount++;
? } else {
? ? setDone(true);
? }
}
currCount 是一個記錄處理完成的計數(shù)器, 初始值為0菊值, 當該值大于等于limit后外驱,將自己標識成處理完成狀態(tài),即設(shè)置done=true.
? 分析到現(xiàn)在, 已經(jīng)可以非常清晰的解釋最初的疑問了腻窒, 為什么 limit 1昵宇, map數(shù)為5的前提下, Map Input Records 是35而不是5
1. 第一條記錄進入LimitOperator done 為false
2. 第二條記錄進入LimitOperator done 為true
3. 第三條記錄進入SelectOperator done 設(shè)置為true
4. 第四條記錄進入FilterOperator done設(shè)置為true
5. 第五條記錄進入TableScanOperator done設(shè)置為true
6. 第六條記錄進入MapOperator done設(shè)置為true
7. 第7條記錄進入ExecMapper 靜態(tài)變量done設(shè)置為true
8. 讀取第八條記錄時 CombineHiveRecordReader 發(fā)現(xiàn) ExecMapper 的done已經(jīng)為true, 結(jié)束數(shù)據(jù)讀取儿子,從而 MapRunner 退出循環(huán)趟薄, 結(jié)束mapper過程.
從上面8個步驟看出, 每個map會讀取7條記錄典徊, 5個map, 正好是35條記錄.
? 在平時工作中恩够, 通過分析 hive 執(zhí)行計劃可以讓我們清楚的知道MR中的每一個過程卒落,理解HIVE執(zhí)行過程, 進而對SQL優(yōu)化.
所以后面的幾條數(shù)據(jù)是沒有用的蜂桶,只把第一條數(shù)據(jù)拿來處理