MapReduce data flow:
Hadoop Streaming:
Hadoop本身是用Java開發(fā)的蠢箩,程序也需要用Java編寫印颤,但是通過Hadoop Streaming,我們可以使用任意語言來編寫程序订歪,讓Hadoop運行脖祈。
Hadoop Streaming的優(yōu)缺點:
優(yōu)點
1.可以使用自己喜歡的語言來編寫MapReduce程序(換句話說,不必寫Java XD)
2.不需要像寫Java的MR程序那樣import一大堆庫刷晋,在代碼里做一大堆配置盖高,很多東西都抽象到了stdio上,代碼量顯著減少
3.因為沒有庫的依賴眼虱,調(diào)試方便喻奥,并且可以脫離Hadoop先在本地用管道模擬調(diào)試
缺點
1.只能通過命令行參數(shù)來控制MapReduce框架,不像Java的程序那樣可以在代碼里使用API捏悬,控制力比較弱撞蚕,有些東西鞭長莫及
2.因為中間隔著一層處理,效率會比較慢
3.所以Hadoop Streaming比較適合做一些簡單的任務(wù)过牙,比如用python寫只有一兩百行的腳本甥厦。如果項目比較復(fù)雜,或者需要進行比較細致的優(yōu)化寇钉,使用Streaming就容易出現(xiàn)一些束手束腳的地方刀疙。
使用python編寫Hadoop Streaming程序有幾點需要注意:
1.在能使用iterator的情況下,盡量使用iterator扫倡,避免將stdin的輸入大量儲存在內(nèi)存里谦秧,否則會嚴(yán)重降低性能
2.streaming不會幫你分割key和value傳進來,傳進來的只是一個個字符串而已镊辕,需要你自己在代碼里手動調(diào)用split()
3.從stdin得到的每一行數(shù)據(jù)末尾似乎會有\(zhòng)n油够,保險起見一般都需要使用rstrip()或者strip()來去掉
4.在想獲得K-V list而不是一個個處理key-value pair時,可以使用groupby配合itemgetter將key相同的k-v pair組成一個個group征懈,得到類似Java編寫的reduce可以直接獲取一個Text類型的key和一個iterable作為value的效果石咬。注意itemgetter的效率比lambda表達式要高,所以如果需求不是很復(fù)雜的話卖哎,盡量用itemgetter比較好鬼悠。
本地調(diào)試:
$ cat <input path> | python <path to mapper script> | sort -t $'\t' -k1,1 | python <path to reducer script> > <output path>
Tips:
Hadoop默認按照tab來分割key和value删性,以第一個分割出的部分為key,按key進行排序焕窝,因此這里使用
sort -t $'\t' -k1,1
在集群上運行與監(jiān)控:
為了更好地模擬集群環(huán)境蹬挺,我們可以在mapred-site.xml中增設(shè)reducer和mapper的最大數(shù)目(默認為2,實際可用數(shù)目大約是CPU核數(shù)-1)
首先需要知道用于streaming的java程序在哪里它掂。
hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
用Hadoop Streaming執(zhí)行python程序的一般步驟是:
1.將輸入文件放到HDFS上巴帮,建議使用copyFromLocal而不是put命令,參見Difference between hadoop fs -put and hadoop fs -copyFromLocal
一般可以新建一個文件夾用于存放輸入文件虐秋,假設(shè)叫input
$ hadoop fs -mkdir input
$ hadoop fs -ls
查看目錄榕茧,可以看到出現(xiàn)了一個/user/hadoop/input文件夾。/user/hadoop是默認的用戶文件夾客给,相當(dāng)于本地文件系統(tǒng)中的/home/hadoop
再使用
$ hadoop fs -copyFromLocal <PATH TO LOCAL FILE(S)> input/
將本地文件放到input文件夾下
2.開始MR作業(yè)
hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
-file mapper.py \
-mapper mapper.py \
-file reducer.py \
-reducer reducer.py \
-input $1 \
-output $2 \
一般來說要檢查運行狀況用押,都是去jobtracker的webUI。如果在master上靶剑,用瀏覽器訪問http://localhost:50030 即可 (如果你在配置hadoop的時候修改了mapred-site.xml的mapred.job.tracker.http.address蜻拨,請訪問對應(yīng)的其他地址)
在webUI里你可以看到running jobs, completed jobs和retired jobs。點擊Jobid下的超鏈接桩引,可以看到對應(yīng)job的執(zhí)行狀況缎讼。進去后如果看到Failed/Killed Task Attempts下非空,你可以點進對應(yīng)的超鏈接阐污,找到對應(yīng)的log去進行debug休涤。
成功執(zhí)行完這個任務(wù)之后咱圆,你用output參數(shù)在HDFS上指定的輸出文件夾里就會多出幾個文件
一個空白文件_SUCCESS笛辟,表明job運行成功,這個文件可以讓其他程序只要查看一下HDFS就能判斷這次job是否成功運行序苏,從而進行相關(guān)處理手幢。
一個_logs文件夾,顧名思義里面放著任務(wù)日志
part-00000, .... part-xxxxx文件忱详,有多少個reducer后面的數(shù)字就會有多大围来,對應(yīng)每個reducer的輸出結(jié)果。
如何串聯(lián)多趟MR
如果你有多次任務(wù)要執(zhí)行匈睁,下一步需要用上一步的任務(wù)做輸入监透,解決辦法其實很簡單。假設(shè)上一步在HDFS的輸出文件夾是output1航唆,那么在下一步的運行命令中胀蛮,指明
-input output1/part-*
即指定上一次的所有輸出為本次任務(wù)的輸入即可。注意這里假設(shè)你不需要對上一步的輸出做額外處理
控制partitioner
partitioning指的是數(shù)據(jù)經(jīng)過mapper處理后糯钙,被分發(fā)到reducer上的過程粪狼。partitioner控制的退腥,就是“怎樣的mapper輸出會被分發(fā)到哪一個reducer上”。Hadoop有幾個自帶的partitioner再榄。默認的是HashPartitioner狡刘,也就是把第一個tab前的key做hash之后用于分配partition。寫Hadoop Streaming程序是可以選擇其他partitioner的困鸥,你可以選擇自帶的其他幾種里的一種嗅蔬,也可以自己寫一個繼承Partitioner的java類然后編譯成jar,在運行參數(shù)里指定為你用的partitioner疾就。官方自帶的partitioner里最常用的是KeyFieldBasedPartitioner购城。它會按照key的一部分來做partition,而不是用整個key來做partition虐译。在學(xué)會用KeyFieldBasedPartitioner之前瘪板,必然要先學(xué)怎么控制key-value的分割。分割key的步驟可以分為兩步漆诽,用python來描述一下大約是
fields = output.split(seperator)
key = fields[:numKeyfields]
1.選擇用什么符號來分割key侮攀,也就是選擇seperator
map.output.key.field.separator可以指定用于分隔key的符號。比如指定為一點的話厢拭,就要加上參數(shù)
-D stream.map.output.field.separator=.
假設(shè)你的mapper輸出是
11.22.33.44
這時會先看準(zhǔn)[11, 22, 33, 44]這里的其中一個或幾個作為key
2.選擇key的范圍兰英,也就是選擇numKeyfields
控制key的范圍的參數(shù)是這個,假設(shè)我要設(shè)置被分割出的前2個元素為key:
-D stream.num.map.output.key.fields=2
那么key就是上面的 1122供鸠。值得注意的是假如這個數(shù)字設(shè)置到覆蓋整個輸出畦贸,在這個例子里是4的話,那么整一行都會變成key楞捂。
假設(shè)在上一步我們通過使用
-D stream.map.output.field.separator=. \
-D stream.num.map.output.key.fields=4 \
將11.22.33.44的整個字符串都設(shè)置成了key薄坏,下一步就是在這個key的內(nèi)部再進行一次分割。map.output.key.field.separator可以用來設(shè)置第二次分割用的分割符寨闹,mapred.text.key.partitioner.options可以接受參數(shù)來劃分被分割出來的partition key胶坠,比如:
-D map.output.key.field.separator=. \
-D mapred.text.key.partitioner.options=-k1,2 \
指的就是在key的內(nèi)部里,將第1到第2個被點分割的元素作為partition key繁堡,這個例子里也就是1122沈善。這里的值-ki,j表示從i到j(luò)個元素(inclusive)會作為partition key。如果終點省略不寫椭蹄,像-ki的話闻牡,那么i和i之后的元素都會作為partition key。
partition key相同的輸出會保證分到同一個reducer上绳矩,也就是所有11.22.xx.xx的輸出都會到同一個partitioner罩润,11.22換成其他各種組合也是一樣。
命令格式大約就是長這樣:
hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-D stream.map.output.field.separator=. \
-D stream.num.map.output.key.fields=4 \
-D map.output.key.field.separator=. \
-D mapred.text.key.partitioner.options=-k1,2 \
-input inputDir \
-output outputDir \
-mapper mapper.py -file mapper.py \
-reducer reducer.py -file reducer.py \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
控制comparator與自定義排序
用來控制的參數(shù)是mapred.text.key.comparator.options埋酬,接受的值格式類似于unix sort哨啃。比如我要按第二個元素的數(shù)字序(默認字典序)+倒序來排元素的話烧栋,就用
-D mapred.text.key.comparator.options=-k2,2nr
n表示數(shù)字序,r表示倒序拳球。這樣一來
11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2
就會被排成
11.14.2.3
11.14.2.2
11.12.1.2
11.12.1.1
11.11.4.1