Hadoop詳解 - HDFS - MapReduce - YARN - HA

為什么要有Hadoop饲梭?

????? 從計算機誕生到現(xiàn)今,積累了海量的數(shù)據(jù)早歇,這些海量的數(shù)據(jù)有結(jié)構(gòu)化倾芝、半結(jié)構(gòu)化、非

結(jié)構(gòu)的數(shù)據(jù)箭跳,并且這些海量的數(shù)據(jù)存儲和檢索就成為了一大問題蛀醉。

????? 我們都知道大數(shù)據(jù)技術(shù)難題在于一個數(shù)據(jù)復(fù)雜性、數(shù)據(jù)量衅码、大規(guī)模的數(shù)據(jù)計算拯刁。

Hadoop就是為了解決這些問題而出現(xiàn)的。


Hadoop的誕生

????? DougCutting是Lucene的作者逝段,當(dāng)時Lucene面臨和谷歌同樣的問題垛玻,就是海量的數(shù)據(jù)存儲和檢索,于是就誕生了Nutch奶躯。

????? 在這之后帚桩,谷歌的大牛就為解決這個問題發(fā)了三篇論文(GFS、Map-Reduce嘹黔、BigTable)账嚎,這三篇論文總體表達(dá)的意思就是部署多臺廉價的服務(wù)器集群,通過分布式的方式將海量數(shù)據(jù)存儲在這個集群上儡蔓,然后利用集群上的所有機器進行數(shù)據(jù)計算郭蕉,這樣谷歌就不用買很多很貴的服務(wù)器,只需要把普通的機器組合在一起喂江。

Doug Cutting等人就去研究這三篇論文召锈,發(fā)現(xiàn)價值巨大,于是Doug Cutting等人在Nutch上實現(xiàn)了GFS和Map-Reduce获询,使得Nutch的性能飆升涨岁。

????? 于是Doug Cutting等人就把這兩部分納入到Hadoop項目中姓言,主要還是為了將Hadoop項目作為一個大數(shù)據(jù)整體化的解決方案缎浇。

????? 所以為什么后面就出現(xiàn)了Hadoop而不是在Nutch上去做整體化大數(shù)據(jù)解決方案仗扬。

????? 這三篇論文對應(yīng)Hadoop的組件:

????? GFS-> HDFS?????????????????? 文件系統(tǒng)

????? Map-Reduce-> MR??????? ?????計算框架

????? BigTable-> Hbase?????????????? 數(shù)據(jù)庫系統(tǒng)



什么是Hadoop着绷?

????? Hadoop是Apache下的一個分布式系統(tǒng)基礎(chǔ)架構(gòu)税手,主要是為了解決海量數(shù)據(jù)存儲和海量的數(shù)據(jù)計算問題帕棉。

????? 在這個基礎(chǔ)之上發(fā)展出了的更多的技術(shù)灵迫,使得Hadoop稱為大數(shù)據(jù)技術(shù)生態(tài)圈之一空凸。


Hadoop發(fā)行版本

1、Apache版本最原始的版本

2畜疾、Clodera版本赴邻,在大型互聯(lián)網(wǎng)企業(yè)中用的比較多,軟件免費啡捶,通過服務(wù)收費姥敛。

3、Hortonworks文檔比較好



特點

????? 高可靠:維護多個副本瞎暑,假設(shè)計算元素和存儲出現(xiàn)故障時彤敛,可以對失敗節(jié)點重新分布處理

????? 高擴展:在集群間分配任務(wù)數(shù)據(jù),可方便的擴展數(shù)以千計的節(jié)點

????? 高效性:并行工作

????? 高容錯:自動保存多個副本了赌,并且能夠?qū)κ∪蝿?wù)重新分配


Hadoop組成

HDFS:一個高可靠高吞吐量的分布式文件系統(tǒng)

?? NameNode(nn):存儲文件的元數(shù)據(jù)墨榄,如:文件名、文件目錄結(jié)構(gòu)等信息

?? DataNode(dn)在文件系統(tǒng)存儲文件塊數(shù)據(jù)勿她,以及數(shù)據(jù)的校驗和袄秩,也就是真正存儲文件內(nèi)容的,只是文件大的時候會切割成一小塊一小塊的逢并。

?? SecondayNameNode(2nn)用于監(jiān)控HDFS狀態(tài)的輔助后臺程序之剧,每隔一段時間就獲取HDFS的快照,就是備份和監(jiān)控狀態(tài)



Yarn:作業(yè)調(diào)度與集群資源管理框架砍聊。(Hadoop2.0加入)

?? ResourceManager(rm)處理客戶端請求背稼、啟動和監(jiān)控MRAppMaster、監(jiān)控NodeManager玻蝌,以及資源分配和調(diào)度蟹肘。

?? NodeManager(nn)單個節(jié)點上的資源管理、處理來自ResourceManager的命令俯树,處理來自MRAppMaster的命令帘腹。

?? MRAppMaster數(shù)據(jù)切分、為應(yīng)用程序申請資源聘萨,并分配內(nèi)部任務(wù)竹椒、任務(wù)監(jiān)控和容錯。

?? Container對任務(wù)運行環(huán)境的抽象米辐,封裝了CPU、內(nèi)存等多維資源以及環(huán)境變量书释、啟動命令等任務(wù)運行相關(guān)信息(hadoop內(nèi)部文件操作命令和Liunx差不多)



MapReduce:分布式離線并行計算框架翘贮。

?? Map階段:并行處理數(shù)據(jù)

?? Reduce階段:對Map階段處理的結(jié)果數(shù)據(jù)進行匯總

?Common:支持其他模塊的工具模塊


理解Hadoop組成

????? 有一個建筑工地的建造時間很緊急爆惧,設(shè)立了一個支持小組狸页,支援各個小分隊(Common),首先1000包水泥,這些水泥要進行存儲(HDFS)芍耘,假設(shè)這些水泥有防水的和不防水的址遇,防水的水泥存到倉庫1(HDFS-dn),不防水的存儲到倉庫2(HDFS-dn)斋竞,那么就要進行記錄倔约,哪些水泥存放到哪里了(HDFS-nn),因為趕工期擔(dān)心水泥可能會因為潮濕那些問題坝初,出現(xiàn)不可用浸剩,所以又準(zhǔn)備了1000包水泥,并且每天都要對這些水泥進行檢查(HDFS-2nn)鳄袍。

????? 如果一個小分隊要領(lǐng)取水泥就要和工地倉儲管理人員申請绢要,倉儲管理人員同意了,就要向公司申請人員來搬水泥(Yarn-MRAppMaster)拗小,開始調(diào)動這些人員搬運水泥(Yarn-rm)重罪,小分隊領(lǐng)取到了水泥之后,開始決定給修外墻的多少包水泥(Yarn-nm)哀九。

????? 修外墻小組就開始拿著水泥干活了(MapReduce-Map)剿配,直到整棟樓的外墻修好了(MapReduce-Reduce),第N棟也是如此(MapReduce-Map)勾栗。



Hadoop內(nèi)為什么要如此劃分惨篱?

數(shù)據(jù)存放在Hadoop,那么Hadoop必然需要對數(shù)據(jù)進行管理围俘,如果沒有一個專門管理數(shù)據(jù)存儲的組件或數(shù)據(jù)運算的組件砸讳,全部都融合在一個東西里面就會顯得很臃腫,并且組件之間只需要通過接口進行溝通界牡,那么各自的組件就可以僅僅自身的需求做優(yōu)化等簿寂,那么就不會影響到其他的組件。

各自的組件只需要做好自己的事情宿亡,對外提供接口接收相應(yīng)的數(shù)據(jù)及返回數(shù)據(jù)常遂,只要符合我組件規(guī)范的就運行,不符合就不運行挽荠,而不需要關(guān)心其他克胳,專心做自己的事情,也可以使得組件之間可以單獨的運行圈匆。



Hadoop目錄

????? bin程序級命令(hdfs漠另、Yarn等)

????? etc配置文件

????? include類庫等文件

????? lib類庫等文件

????? libexec類庫等文件

????? sbinhadoop系統(tǒng)命令(關(guān)閉、啟動等)

????? share官方提供的案例等



Hadoop運行模式

????? 本地模式:不需要啟動單獨進程跃赚,直接運行笆搓,一般測試和開發(fā)使用,一臺機器就可以運行,如果是在Liunx满败,跑的是本地肤频,可以直接通過命令運行相應(yīng)的jar包。


偽分布式模式:等同于分布式算墨,但只有一個節(jié)點宵荒,具有集群的配置信息和運行,由于偽分布式只有一臺機器米同,可以不啟動Yarn骇扇,那么也就算是Hadoop的HDFS啟動了,直接運行MapReduce程序的話面粮,結(jié)果都在HDFS上少孝,不在是在本地,如果需要交由YARN上進行資源調(diào)度和分配任務(wù)熬苍,則需要配置Yarn地址稍走,以及指定數(shù)據(jù)獲取方式。


????? 完全分布式模式:多個節(jié)點一起運行柴底,可以指定不同節(jié)點干不同的活婿脸,比如機器1干NameNode的活,機器2干ResourceManger的活柄驻。


注意:啟動NameNode時狐树,DataNode會記錄NameNode信息(id),當(dāng)緩存的NameNode記錄刪除了鸿脓,這個時候啟動就會報錯抑钟,這個時候就需要將NameNode格式化(刪除所有數(shù)據(jù)),之后在重新啟動野哭。


HDFS

HDFS是什么在塔?

????? HDFS就是一個分布式文件存儲系統(tǒng),通過目錄樹來定位文件拨黔,由于分布式特點那么集群中的服務(wù)器就有各自的角色蛔溃。



特點

????? 低成本:由于是眾多服務(wù)器組成的,那么在某服務(wù)器掛了篱蝇,只需要付出一臺廉價的服務(wù)器贺待。

????? 高容錯性:HDFS是由眾多服務(wù)器實現(xiàn)的分布式存儲,每個文件都會有冗余備份零截,那么如果存儲數(shù)據(jù)的某個服務(wù)器掛了狠持,那么還有備份的數(shù)據(jù),允許服務(wù)器出現(xiàn)故障瞻润。

????? 高吞吐量:HDFS是一次寫多次讀的訪問模型,不允許修改文件,并簡化了數(shù)據(jù)的一致性問題绍撞。

????? 就近原則:在數(shù)據(jù)附近執(zhí)行程序正勒,也體現(xiàn)出來移動計算比移動數(shù)據(jù)效率高。

????? 可移植性:HDFS可以實現(xiàn)不同平臺之間的移植傻铣。



應(yīng)用場景

????? 一次寫入章贞,多次讀取,且不支持文件的修改非洲。

????? 適合數(shù)據(jù)分析場景鸭限,不適合網(wǎng)盤應(yīng)用。



HDFS數(shù)據(jù)塊

????? HDFS的文件在物理上是分塊存儲的两踏,1.x版本的數(shù)據(jù)塊默認(rèn)大小是64MB败京,2.x版本的數(shù)據(jù)塊默認(rèn)塊大小是128MB,這個值是可以通過配置參數(shù)(dfs.blocksize)進行調(diào)整的梦染。

????? HDFS的塊比磁盤的塊大赡麦,目的就在于要減少尋址的開銷(標(biāo)準(zhǔn):尋址時間只占傳輸時間的1%),如果塊設(shè)置的夠大帕识,從磁盤傳輸數(shù)據(jù)的時間明顯就大于定位這個塊開始位置所需要的文件泛粹,因此傳輸一個由多個塊組成的文件的時間取決于磁盤傳輸速率。



HDFS常用命令(和Liunx差不多)

基本命令:hadoop fs

查看幫助:hadoop fs 或 hadoop fs -help(詳情)

創(chuàng)建目錄:hadoop fs -mkdir /usr

查看目錄信息:hadoop fs -ls /usr

本地剪切肮疗,粘貼到集群:hadoop fs -moveFromLocal test.txt /usr/

追加一個文件到已存在文件的末尾:hadoop fs -appendToFile test2.txt /usr/test.txt

顯示文件內(nèi)容:hadoop fs -cat /usr/test.txt

顯示一個文件末尾:hadoop fs -tail /usr/ test.txt

以字符形式打印一個文件內(nèi)容:hadoop fs -text /usr/test.txt

修改文件所屬權(quán)限(-chgrp晶姊、-chomd、chown)(liunx一樣用法):hadoop fs -chmod777 /usr/test.txt

從本地復(fù)制到hdfs:hadoopfs -copyFormLocal text.txt /usr/test

hdfs復(fù)制到本地:hadoop fs -copyToLocal /usr/ text.txt ./

從hdfs路徑拷貝到hdfs另一個路徑:hadoop fs -cp /usr/dir1 /usr/dir2

在hdfs目錄中移動文件:hadoop fs -mv /usr/test.txt /usr/dir

從hdfs下載文件到本地:hadoop fs -get /usr/test.txt ./

合并下載多個文件:hadoop fs -getmerge /usr /*.txt ./result.txt

上傳文件等于copyFormLocal:hadoop fs -put test.txt /usr

刪除文件或文件夾:hadoop fs -rmr /usr/test.txt

刪除空目錄:hadoop fs -rmdir /usr/test3

統(tǒng)計文件系統(tǒng)可用空間信息(-h格式化信息):hadoop fs -df -h

統(tǒng)計文件夾大小信息:hadoop fs -du -h /

統(tǒng)計制定目錄下的文件節(jié)點數(shù)據(jù)量(嵌套級伪货,當(dāng)前文件個數(shù)们衙,大小):hadoop fs -count -h /usr

設(shè)置文件的副本數(shù):hadoop fs -setrep 3 /usr/test.txt


NameNode

NameNode和SecondaryNameNode工作機制


第一階段:NameNode的工作

????? 1、第一次啟動namenode格式化后超歌,創(chuàng)建fsimage和edits文件砍艾,如果不是第一次啟動,直接加載編輯日志和鏡像文件到內(nèi)存巍举。

????? 2脆荷、客戶端對元數(shù)據(jù)進行操作請求

????? 3、NameNode記錄操作日志懊悯,更新滾動日志蜓谋。

????? 4、NameNode在內(nèi)存中對數(shù)據(jù)進行操作


第二階段:Secondary NameNode的工作

????? 1炭分、Secondary NameNode詢問NameNode是否需要checkpoint桃焕,直接帶回NameNode檢查結(jié)果。

????? 2捧毛、Secondary NameNode請求執(zhí)行checkpoint

????? 3观堂、NameNode滾動正在寫的eits日志

????? 4让网、將滾動前的編輯日志和鏡像文件拷貝到Secondary NameNode

????? 5、Secondary NameNode加載編輯日志和鏡像文件到內(nèi)存并且合并

????? 6师痕、生成新的鏡像文件fsimage.chkpoint

????? 7溃睹、拷貝fsimage.chkpoint到NameNode

????? 8、NameNode將fsimage.chkpoint重命名為fsimage



說明

Fsimage文件:HDFS文件系統(tǒng)元數(shù)據(jù)的一個永久檢查點胰坟,其中包含HDFS文件系統(tǒng)所有目錄和文件因篇,以及node序列化信息。


Edits文件:存放HDFS文件系統(tǒng)的所有更新操作笔横,文件系統(tǒng)客戶端執(zhí)行的所有寫操作日志都會記錄到edits文件竞滓。


Secondary

NameNode在主NameNode掛了,可以從Secondary NameNode中恢復(fù)數(shù)據(jù)吹缔,但是由于同步的條件限制商佑,會出現(xiàn)數(shù)據(jù)不一致。



DataNode

工作機制


集群安全模式

????? NameNode啟動時涛菠,受限將鏡像文件加載進去內(nèi)存莉御,并編輯日志文件中的各項操作,一旦內(nèi)存中成功建立文件系統(tǒng)元數(shù)據(jù)鏡像俗冻,則創(chuàng)建一個新的fsimage文件和一個空的編輯日志礁叔。

????? 此時的NameNode開始監(jiān)聽DataNode請求,但此刻迄薄,NameNode是運行在安全模式琅关,則此時NameNode文件系統(tǒng)對于客戶端來說是只可讀

????? 系統(tǒng)中數(shù)據(jù)塊文件并不是由NameNode維護的讥蔽,而是以塊列表的形式存儲在DataNode涣易,在系統(tǒng)正常操作期間,NameNode會在內(nèi)存中保留所有塊位置影像信息冶伞。

????? 在安全模式下新症,各個DataNode會向NameNode發(fā)送最新的塊列表信息,NameNode了解到足夠多的塊信息之后响禽,即可高效運行文件系統(tǒng)徒爹。

????? 如果滿足最小復(fù)本條件,NameNode會在30秒后就退出安全模式芋类,最小復(fù)本條件指的是整個文件系統(tǒng)中99%的塊都滿足最小復(fù)本級別隆嗅,在啟動一個剛剛格式化的HDFS集群時,因為系統(tǒng)中還沒有塊侯繁,所以NameNode不會進入安全模式胖喳。

????? 集群啟動完成后自動退出安全模式。


安全模式的應(yīng)用場景

????? 銀行對賬贮竟、維護丽焊。



Java操作HDFS

Demo

public static void main(String[] args) throws IllegalArgumentException, IOException,

InterruptedException, URISyntaxException {


??????? //配置信息

??????? Configurationconfiguration = new Configuration();


??????? //獲取文件系統(tǒng)

??????? FileSystemfileSystem= FileSystem.get(new URI("hdfs://hadoop102:8020"), configuration, "levi");


??????? //拷貝本地文件到集群

??????? fileSystem.copyFromLocalFile(new Path("e:/hdfs/test.txt"), new Path("/usr/hdfs/test.txt"));


??????? //關(guān)閉

??????? fileSystem.close();

}


HDFS數(shù)據(jù)流

IO流寫流程


IO流方式上傳文件 (Java)

public void fileUpload() throws IOException, InterruptedException,

URISyntaxException {

??????? //配置

??????? Configurationconfiguration = new Configuration();


??????? //文件系統(tǒng)

??????? FileSystemfileSystem= FileSystem.get(new URI("hdfs://hadoop102:8020"),configuration,"levi");


??????? //獲取輸出流(上傳到服務(wù)器) - 服務(wù)器

??????? FSDataOutputStreamfsDataOutputStream = fileSystem.create(new Path("/usr/hdfs/test03.txt"));


??????? //文件輸入流(本地上傳)

??????? FileInputStreamfileInputStream = new java.io.FileInputStream(new File("E:/hdfs/test03.txt"));


??????? //流對接

??????? IOUtils.copyBytes(fileInputStream, fsDataOutputStream, configuration);


??????? fsDataOutputStream.hflush();

??????? IOUtils.closeStream(fileInputStream);

??????? IOUtils.closeStream(fsDataOutputStream);


??????? //關(guān)閉

??????? fileSystem.close();


??? }


IO流讀流程



IO流方式下載文件 (Java)

public void readFile() throws IOException, InterruptedException,

URISyntaxException {

?????? //配置

?????? Configurationconfiguration = new Configuration();


?????? //文件系統(tǒng)

?????? FileSystemfileSystem= FileSystem.get(new URI("hdfs://hadoop102:8020"), configuration, "levi");


?????? //輸入流(下載) 服務(wù)器

?????? FSDataInputStreamfsDataInputStream = fileSystem.open(new Path("/usr/hdfs/hadoop-2.7.2.tar.gz"));


?????? //輸出(本地)

?????? FileOutputStreamfileOutputStream = new FileOutputStream(new File("E:/hdfs/block.txt"));


?????? //流對接

?????? //--- 第一塊

?????? /*byte[]buf= newbyte[1024];

?????? for (inti = 0; i <1024*128; i++) {

?????????? fsDataInputStream.read(buf);

?????????? fileOutputStream.write(buf);

?????? }

?????? //關(guān)閉

?????? fsDataInputStream.close();

?????? fileOutputStream.close();*/


?????? //--- 第二塊

?????? fsDataInputStream.seek(1024 * 1024 * 128);//定位這個位置開始讀

?????? IOUtils.copyBytes(fsDataInputStream, fileOutputStream, 1024);

?????? IOUtils.closeStream(fileOutputStream);

?????? IOUtils.closeStream(fsDataInputStream);


?????? fileSystem.close();

??? }


副本節(jié)點選擇

????? 在海量數(shù)據(jù)的處理中较剃,節(jié)點之間的數(shù)據(jù)傳輸速率是很重要,特別是在帶寬很稀缺的情況下粹懒,而節(jié)點和節(jié)點之間的距離越遠(yuǎn)重付,那么必然會影響數(shù)據(jù)的傳輸。

????? 在成千的服務(wù)器集群中凫乖,Hadoop是怎么選擇副本節(jié)點呢?


低版本Hadoop

第一個副本在客戶端所處的節(jié)點上弓颈,但是如果客戶端是在集群外帽芽,隨機選取一個節(jié)點

第二個副本和第一個副本位于不同機架的隨機節(jié)點上,也就是不和第一個副本在相同機架翔冀。

第三個副本和第二個副本位于相同機架导街,節(jié)點隨機


Hadoop2.5版本以上

????? 第一個副本在客戶端所處節(jié)點上。如果客戶端在集群外纤子,隨機選一個

????? 第二個副本和第一個副本位于相同機架搬瑰,隨機節(jié)點

????? 第三個副本位于不同機架,節(jié)點隨機


HDFS誤區(qū)

小文件存儲

每個文件均按照塊存儲控硼,每個塊的元數(shù)據(jù)存儲在NamNode的內(nèi)存中(一個文件/目錄/文件塊一般占有150字節(jié)的元數(shù)據(jù)內(nèi)存空間)泽论,因此Hadoop存儲小文件會非常低效,因為大量小文件會耗盡NameNode中大部分內(nèi)存卡乾,但存儲小文件所需要的磁盤容量和存儲這些文件原始內(nèi)容所需要的磁盤空間相比也不會增多翼悴。


例如:上傳一個文件1MB,那么這個文件會在HDFS中的一個塊存儲著幔妨,這個塊默認(rèn)是128MB鹦赎,那么是不是占用了128MB的磁盤空間呢?

????? 每一個塊128MB只是HDFS的邏輯上的劃分误堡,所以在磁盤占用空間還是1MB古话,只有當(dāng)一個或多個文件在一個塊內(nèi)超過128MB,之后將這個文件進行切割锁施。




副節(jié)點處理

HDFS是先把當(dāng)前這個節(jié)點處理完陪踩,在去處理副本節(jié)點的。



回收站

????? 回收站默認(rèn)是不啟用的沾谜,在core-site.xml文件中的配置fs.trash.interval默認(rèn)是為0.


HDFS全過程


MapReduce

MapReduce是什么膊毁?

????? MapReduce是一個分布式運算程序的編程框架,是用戶開發(fā)基于Hadoop的數(shù)據(jù)分析應(yīng)用的核心框架基跑。

????? MapReduce核心功能是將用戶編寫的業(yè)務(wù)邏輯代碼和自帶默認(rèn)組件整合成一個完整的分布式運算程序婚温,并發(fā)的運行在一個Hadoop集群上。


作用

????? 由于硬件資源限制媳否,海量數(shù)據(jù)無法在單機上處理栅螟,單機版程序擴展到集群進行分布式運算荆秦,增加程序的復(fù)雜度和開發(fā)難度。

????? MapReduce框架就是要使得開發(fā)人員開源將絕大部分工作集中在業(yè)務(wù)邏輯的開發(fā)上力图,而分布式運算的復(fù)雜性交由MapReduce來處理步绸。



特點

????? 適合數(shù)據(jù)復(fù)雜度運算

????? 不適合算法復(fù)雜度運算

????? 不適合實時計算、流式計算


核心思想


分布式的運算程序最少需要分成兩個階段:

第一個階段:MapTask并發(fā)實例吃媒,完全并行運行瓤介,互不相干

第二個階段:ReduceTask并發(fā)實例,互不相干赘那,但是他們的數(shù)據(jù)依賴于上一個階段的所有MapTask并發(fā)實例的輸出


MapReduce編程模型只能包含一個Map階段和Reduce階段刑桑,如果用戶的業(yè)務(wù)邏輯非常復(fù)雜,那就只能多個MapReduce程序募舟,串行運行祠斧。

?

總結(jié)

????? Map并行處理任務(wù)(運算)。

????? Reduce:等待相關(guān)的所有Map處理完任務(wù)拱礁,在將任務(wù)數(shù)據(jù)匯總輸出琢锋。

????? MRAppMaster負(fù)責(zé)整個程序的過程調(diào)度和狀態(tài)協(xié)調(diào)。



MapReduce進程

????? 一個完整的MapReduce程序在分布式允許時有三類實例進程:

????? MRAppMaster負(fù)責(zé)整個程序的過程調(diào)度和狀態(tài)協(xié)調(diào)呢灶。

????? MapTask負(fù)責(zé)Map階段的整個數(shù)據(jù)處理流程吴超。

????? ReduceTask負(fù)責(zé)Reduce階段的整個數(shù)據(jù)處理流程。



序列化

????? 序列化就是把內(nèi)存中的對象轉(zhuǎn)換成字節(jié)序列(或其他數(shù)據(jù)傳輸協(xié)議)填抬,以便于存儲(持久化)和網(wǎng)絡(luò)傳輸烛芬。

????? 而序列化就是Map到Reducer的橋梁。


????? Java序列化是一個重量級的序列化框架(Serializable)飒责,使用這個框架進行序列化后會附帶很多額外信息(各種校驗信息赘娄、header等),不便于網(wǎng)絡(luò)傳輸宏蛉,所以Hadoop自己開發(fā)了一套序列化機制(Writable)遣臼,精確、高效拾并。


Java類型Hadoop

? Writable類型

booleanBooleanWritable

byteByteWritable

intIntWritable

floatFloatWritable

longLongWritable

doubleDoubleWritable

stringText

mapMapWritable

arrayArrayWritable


備注:自定義的反序列類中的write方法和read方法中DataOutput和DataInput這兩個類所提供的方法中揍堰,對應(yīng)Java類型String的方法,分別是writeUTF()和readUTF()嗅义。



實例(統(tǒng)計單詞)

public class WordCountMapper extends Mapper<LongWritable, Text, Text,

IntWritable> {


??? private Text key = new Text();


??? @Override

??? protected void map(LongWritable key, Text value, Context context)

?????????? throws IOException, InterruptedException {

?????? //讀取每一行

?????? Stringline = value.toString();


?????? //切割出每一個單詞

?????? String []words = line.split("\t");


?????? //將讀取到的每一個單詞都寫出屏歹,并且值都為1,因為是在map計算完后到reduce進行匯總之碗,形成Key 多個Value

?????? for (String word : words) {

?????????? //每次文件內(nèi)的讀取一行都調(diào)用一次map蝙眶,那樣就形成了調(diào)用多次map,那樣的話就不用創(chuàng)建多個key對象了

?????????? this.key.set(word);

?????????? context.write(this.key, new IntWritable(1));

?????? }

??? }

}


public class WorkCountReducer extends Reducer<Text, IntWritable, Text,

IntWritable>{


??? @Override

??? protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {

?????? //這里就形成 多個值的匯總結(jié)果褪那,那么將這個值多個進行匯總后幽纷,統(tǒng)一歸并到一個key式塌,就形成了一個key對應(yīng)多個value

?????? int count = 0;

?????? for (IntWritable value : values) {

?????????? count += value.get();

?????? }

?????? context.write(key, new IntWritable(count));


??? }

}


public class WordCountDriver {

??? public static void main(String[] args) throws Exception {

?????? //配置

?????? Configurationconfiguration = new Configuration();


?????? //任務(wù)運行

?????? Jobjob= Job.getInstance(configuration);

?????? job.setJarByClass(WordCountDriver.class);


?????? //運算類和匯總類

?????? job.setMapperClass(WordCountMapper.class);

?????? job.setReducerClass(WorkCountReducer.class);


?????? //運算和匯總輸入和輸出

?????? job.setMapOutputKeyClass(Text.class);

?????? job.setMapOutputValueClass(IntWritable.class);


?????? //最終輸出

?????? job.setOutputKeyClass(Text.class);

?????? job.setOutputValueClass(IntWritable.class);


?????? //運算文件的輸入和結(jié)果輸出

?????? FileInputFormat.setInputPaths(job, new Path("E:/hadooptest/mapreduce/input"));

?????? FileOutputFormat.setOutputPath(job, new Path("E:/hadooptest/mapreduce/output"));


?????? //提交

?????? job.submit();


?????? //等待

?????? boolean result = job.waitForCompletion(true);

?????? System.exit(result ? 0 : 1);

??? }

}


程序流程分析


1、MapReduce程序讀取輸入目錄存放的相應(yīng)文件友浸。

2峰尝、客戶端在submit方法執(zhí)行之前,獲取到待處理的數(shù)據(jù)信息收恢,讓后根據(jù)急群眾參數(shù)配置形成一個任務(wù)分配規(guī)劃武学。

????? 1、建立連接

????? 2派诬、創(chuàng)建提交任務(wù)的代理(本地:LocalRunner劳淆、遠(yuǎn)程:YarnRunner)

????? 3、創(chuàng)建給集群提交數(shù)據(jù)的stag路徑

????? 4默赂、獲取到任務(wù)id,并創(chuàng)建任務(wù)路徑

????? 5括勺、獲取到任務(wù)jar包缆八,拷貝jar包到集群(這個jar就是程序運行的業(yè)務(wù)代碼)

????? 6、計算切片疾捍,生成切片規(guī)劃文件

computeSliteSize(Math.max(minSize,Math.max(maxSize,blocksize)))=blocksize=128MB

????? 7奈辰、提交任務(wù),返回提交狀態(tài)


3乱豆、客戶端提交job.split奖恰、jar包、job.xml等文件給Yarn宛裕,Yarn中的resourcemanager啟動MRAppMater瑟啃。

4矩肩、MRAppMater啟動后根據(jù)job的描述信息散劫,計算出需要的MapTask實例數(shù)量,然后向集群申請機器放钦,啟動相應(yīng)數(shù)量的Map Task進程岩榆。

5错负、MapTask利用客戶指定的InputFormat來讀取數(shù)據(jù),形成KV對勇边。

6犹撒、MapTask將輸入KV對傳遞給客戶定義的map()方法,做邏輯運算粒褒。

7识颊、map()運算完畢后將運算結(jié)果的KV對,手機到MapTask緩存怀浆。

8谊囚、MapTask緩存中的KV對按照K分區(qū)排序后不斷寫到磁盤文件怕享。

9、MRAppMaster監(jiān)控到所有MapTask進程任務(wù)完成后镰踏,會根據(jù)用戶指定的參數(shù)啟動相應(yīng)數(shù)量的ReduceTask進程函筋,并告知ReduceTask進程要處理的數(shù)據(jù)分區(qū)。

10奠伪、ReduceTask進程啟動后跌帐,根據(jù)MRAppMaster告知待處理數(shù)據(jù)所在位置,從N臺MapTask運行所在的機器上獲取到N個MapTask輸出結(jié)果文件绊率,并在本地運行重新歸并排序谨敛,按照相同Key的KV為一個組,調(diào)用客戶定義的reduce()方法進行邏輯運算滤否。

11脸狸、ReduceTask運算完畢后,調(diào)用客戶指定的OuputFormat將結(jié)果數(shù)據(jù)輸出(文件)到外部存儲藐俺。


說明:

????? 切片是邏輯上的切片

????? 規(guī)劃文件就是里面描述了切多少個片炊甲,每個片是怎么樣的。



數(shù)據(jù)切片

????? MapTask的并行任務(wù)是否越多越好欲芹?并行度是如何決定的卿啡?MapTask到底開多少個合適?


????? 1菱父、一個job的map()階段并行度(MapTask開幾個)颈娜,由客戶端在提交job時決定。

????? 2浙宜、每一個Split切片分配一個MapTask并行實例處理官辽。

????? 3、默認(rèn)情況下切片大小=塊大小(blocksize)

????? 4梆奈、切片時不考慮數(shù)據(jù)集整體野崇,而是針對每一個文件單獨切片(這個是邏輯上的劃分)



切片流程

????? 1、獲取到數(shù)據(jù)存儲目錄

????? 2亩钟、找到要便利處理目錄下的每一個文件

????? 3乓梨、讀取第一個文件test.txt(257MB)

??????????? 1、獲取文件大小

??????????? 2清酥、計算分片大小扶镀,每次切片時,都要判斷剩下的部分是否大于塊大小的1.1倍焰轻,大于就在劃分一個塊切片

??????????? 切片:

第一塊:128MB

????????????????? 第二塊:129MB / 128MB = 1.0078125

????????????????? 1.0078125< 1.1 =不在切片臭觉,反之繼續(xù)切

????????????????? 源碼:computeSliteSize(Math.max(minSize,Math.max(naxSize,blocksize)));

??????????? 3、將切片信息寫到一個切片規(guī)劃文件(說明文件)中

??????????? 4、整個切片的核心過程在于getSplit()方法(看submit()源碼)中完成蝠筑,數(shù)據(jù)切片只是邏輯上對輸入數(shù)據(jù)進行切片狞膘,并不會在磁盤上,將文件切分進行存儲什乙。

??????????? InputSplit只是記錄了分片的元數(shù)據(jù)信息挽封。比如:起始位置、長度臣镣、所在的節(jié)點列表等辅愿。


注意:塊是HDFS上物理存儲的數(shù)據(jù),切片只是邏輯上的劃分忆某。

??????????? 5点待、提交切片規(guī)劃文件(說明文件)到Y(jié)arn上,Yarn上的MrAppMaster就根據(jù)切片規(guī)劃文件(說明文件)計算開啟的MapTask個數(shù)(多少個切片就多少個MapTask)弃舒。



FileInputFormat中默認(rèn)的切片機制

????? 1癞埠、簡單按照文件內(nèi)容長度切片

????? 2、切片大小聋呢,默認(rèn)是塊大小

????? 3燕差、切片時不考慮數(shù)據(jù)集整體性,而是逐個文件的單獨切片坝冕,循環(huán)遍歷每一個文件。


????? MaxSize(切片最大值):如果比塊大小還小瓦呼,則會讓切片變小喂窟。

MinSize(切片最小值):如果比塊大小還大,則會讓切片變得比塊還大央串。


假設(shè):塊大小128MB

?????????????????????? MaxSize設(shè)為100MB

?????????????????????? 切片后的存儲占塊大小100MB



小文件切片處理

????? 如果有大量的小文件磨澡,而每一個文件都是一個單獨的切片,都會各自交給一個MapTask處理质和,那么需要開啟大量的MapTask稳摄,則會產(chǎn)生大量的MapTask,導(dǎo)致處理效率低下饲宿。


解決方案

????? 1厦酬、在數(shù)據(jù)處理前端,先把小文件合并成大文件瘫想,在上傳到HDFS做后續(xù)分析

????? 2仗阅、如果已經(jīng)有大量的小文件存在HDFS,使用CombineFileInputFormat進行處理国夜,CombineFileInputFormat的切片邏輯跟TextFileInputFormat不同减噪,他可以將多個小文件邏輯上規(guī)劃到一個切片中,這樣多個小文件就可以交給一個MapTask。

????? 3筹裕、優(yōu)先滿足最小切片大小醋闭,不超過最大切片大小的前提下。




文件合并

//-------- 使用提供的自定義類朝卒,指定切片大小

job.setInputFormatClass(CombineTextInputFormat.class);

//最大輸入切片大小证逻,一個文件的大小是4M就開始切,算法是1.1倍

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

//最小輸入切片大小扎运,多個文件合并到了一起瑟曲,到了2M就切,算法是1.1倍豪治,優(yōu)先滿足最小切片大小

CombineTextInputFormat.setMinInputSplitSize(job, 2097152);


備注:在運行日志中查找number of就可以看到了


Shuffle機制

1洞拨、在MapReduce中,Map階段處理的數(shù)據(jù)如何傳遞給Reduce階段額负拟,是MapReduce框架中關(guān)機的一個流程烦衣,這個流程就叫Shuffle。

????? 2掩浙、Shuffle(洗牌花吟、發(fā)牌):核心就是數(shù)據(jù)分區(qū)、排序厨姚、緩存

????? 3衅澈、MapTaks輸出處理結(jié)果數(shù)據(jù),分發(fā)給ReduceTask并在分發(fā)過程中對數(shù)據(jù)按照Key進行分區(qū)和排序谬墙。



Shuffle機制緩存流程圖

????? Shuffle是MapReduce處理流程中一個過程今布,每一個步驟都是分散在各個MapTask和ReduceTask節(jié)點上。


MapReduce詳細(xì)運行流程



總結(jié)

????? MapReduce詳細(xì)運行流程圖就是一個流水線一般的作業(yè)拭抬,從左向右過去部默,而在開發(fā)的過程中,需要使用到什么組件造虎,這些組件會起到什么作用傅蹂,在哪一個時間起作用,都可以在這個圖中詳細(xì)的描述


分區(qū)

自定義分區(qū)

public classMyPartitionerextends Partitioner<Text, FlowBean>{

??? @Override

??? public int getPartition(Text key, FlowBean value, int numPartitions) {

?????? //拿到手機號碼前三位

?????? StringphoneNum = key.toString().substring(0, 3);


?????? //建立5個分區(qū)算凿,從0開始

?????? int partition = 4;


?????? //判斷

?????? if("135".equals(phoneNum)) {

?????????? partition = 0;


?????? }else if("136".equals(phoneNum)) {

?????????? partition = 1;


?????? }else if("137".equals(phoneNum)) {

?????????? partition = 2;


?????? }else if("138".equals(phoneNum)) {

?????????? partition = 3;

?????? }


?????? return partition;

??? }

}


?//設(shè)置分區(qū)類

?????? //如果沒有設(shè)置分區(qū)份蝴,那么則會按照塊大小去計算什么時候進行分區(qū)

?????? job.setPartitionerClass(MyPartitioner.class);


?????? //設(shè)置ReduceTask數(shù)量

?????? job.setNumReduceTasks(5);


總結(jié)

reduce數(shù)量小于分區(qū)數(shù)量就會報錯

reduce數(shù)量是1澎媒,那么則所有結(jié)果輸出到一個文件內(nèi)搞乏,即便配置了分區(qū)也不會去跑分區(qū)的代碼(執(zhí)行分區(qū))

reduce數(shù)量大于分區(qū)數(shù)量,輸出的其他文件為空

分區(qū)數(shù)量 = reduce數(shù)量戒努,按照分區(qū)數(shù)量輸出結(jié)果文件數(shù)量


分區(qū)就是對map的結(jié)果數(shù)據(jù)進行二次處理请敦,從而再去決定是否影響輸出的reduce結(jié)果輸出镐躲。


排序

????? MapTask和ReduceTask均會對數(shù)據(jù)(按Key排序)進行排序,這個操作屬于Hadoop默認(rèn)行為侍筛,任何應(yīng)用程序中的數(shù)據(jù)均會被排序萤皂,而不管邏輯上是否需要。

????? 對于MapTask匣椰,他會把處理的結(jié)果暫時放到一個緩沖區(qū)裆熙,當(dāng)緩沖區(qū)使用率達(dá)到了閾值就對緩沖區(qū)的數(shù)據(jù)進行排序,并將這些有序的數(shù)據(jù)寫到磁盤上禽笑,而當(dāng)數(shù)據(jù)處理完后入录,他會對磁盤上所有文件進行一次合并,將這些文件合并成一個有序的文件佳镜。

????? 對于ReduceTask僚稿,他從每個MapTask遠(yuǎn)程拷貝相應(yīng)的數(shù)據(jù)文件,如果文件大小超過一定閾值蟀伸,則放到磁盤上蚀同,否則放到內(nèi)存中,如果磁盤上文件數(shù)目達(dá)到一定閾值啊掏,則進行一次合并蠢络,生成一個更大的文件,如果內(nèi)存文件大小或數(shù)目超過閾值迟蜜,則進行合并后將數(shù)據(jù)寫出到磁盤上,當(dāng)所有的數(shù)據(jù)拷貝完畢后娜睛,再統(tǒng)一的對內(nèi)存核磁盤上的所有文件進行一次合并芦疏。


自定義排序

public class FlowBean implements WritableComparable<FlowBean>

{


??? private Long sum;


??? public FlowBean() {

?????? super();

??? }


??? @Override

??? public void write(DataOutput dataOutput) throws IOException {

?????? dataOutput.writeLong(sum);

??? }


??? @Override

??? public void readFields(DataInput dataInput) throws IOException {

?????? sum = dataInput.readLong();

??? }


??? public Long getSum() {

?????? return sum;

??? }


??? public void setSum(Long sum) {

?????? this.sum = sum;

??? }


??? @Override

??? public String toString() {

?????? return sum.toString();

??? }


??? @Override

??? public int compareTo(FlowBean o) {

?????? return this.sum > o.getSum() ? -1 : 1;

??? }

}


總結(jié)

Shullt規(guī)定Key是要進行排序的,如果作為Key是必須要實現(xiàn)WritableComparable接口的微姊。

Combiner合并

ReducrTask是接收總的MapTask結(jié)果,Combiner在每一個MapTask運行的分预,對每每個MapTask的結(jié)果匯總(局部匯總)兢交,將MapTask匯總后之后進行壓縮傳輸,可以減少網(wǎng)絡(luò)傳輸量笼痹。

????? 但是Combiner的前提是不能影響到最終的業(yè)務(wù)邏輯配喳,如果是累加求和是沒有問題的,如果是求平均值就有問題的凳干。

如:

1晴裹、在每一個MapTask進行求平均值之后在ReduceTask再求一次平均值,結(jié)果是不一樣的救赐。

????? 2涧团、將MapTask的數(shù)據(jù)全部匯總到ReduceTask之后再求平均值。


這兩種結(jié)果不一樣的。



自定義Combiner合并

public class WordCountCombiner extends Reducer<Text, IntWritable, Text,

IntWritable>{

??? @Override

??? protected void reduce(Text key, Iterable<IntWritable> values,

?????????? Contextcontext)throws IOException, InterruptedException {

?????? int count = 0;

?????? for (IntWritable intWritable : values) {

?????????? count += intWritable.get();

?????? }

?????? context.write(key, new IntWritable(count));

??? }

}

//reduce是接收總的MapTask匯總泌绣,combiner在每一個maptask運行的钮追,對每一個maptask匯總

//如:每一個maptask都進行匯總,之后進行壓縮傳輸

job.setCombinerClass(WordCountCombiner.class);


分組

????? 就是對分區(qū)排序好的數(shù)據(jù)阿迈,在進行一次合并分類開來元媚,再一次合并的話,就有個比較標(biāo)識苗沧,如果兩個數(shù)據(jù)標(biāo)識是一樣的刊棕,就認(rèn)為是一組數(shù)據(jù),最后過濾去重待逞,最終得到有哪些組甥角。


自定義分組

public classOrderGoupingComparatorextends WritableComparator {


??? protected OrderGoupingComparator() {

?????? super(OrderBean.class, true);

??? }

??? @Override

??? public intcompare(WritableComparablea,WritableComparableb) {

?????? OrderBeanabean = (OrderBean) a;

?????? OrderBeanbbean = (OrderBean) b;

?????? // 將orderId相同的bean都視為一組

?????? return abean.getOrderId().compareTo(bbean.getOrderId());


??? }

}

//設(shè)置Reduce端分組

job.setGroupingComparatorClass(OrderGoupingComparator.class);


//分區(qū)

job.setPartitionerClass(OrderPartition.class);

job.setNumReduceTasks(3);


自定義InputFormat

????? 對小文件的輸入進行合并處理。


1飒焦、設(shè)置文件不可切割

2蜈膨、讀取到整個文件,并且整個文件的數(shù)據(jù)作為value輸出給MapTask(將分片傳進去去讀取后牺荠,將讀取到的所有分片數(shù)據(jù)合并給到MapTask)

3翁巍、MapTask在對合并后的數(shù)據(jù)做操作

public class DistriutedCacheMapper extends Mapper<LongWritable, Text, Text,

NullWritable>{


??? private Map<String, String> map = new HashMap<>();


??? private Text key = new Text();


??? @Override

??? protected void setup(Mapper<LongWritable, Text,

Text, NullWritable>.Context context)

?????????? throws IOException, InterruptedException {

?????? //獲取緩存文件,這個文件給加載進了hadoop系統(tǒng)了休雌,在緩存中的根灶壶,可以直接通過名字調(diào)用

?????? BufferedReaderbufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(new File("pd.txt"))));

?????? Stringline;

?????? while(StringUtils.isNotEmpty(line = bufferedReader.readLine())) {

?????????? //數(shù)據(jù)處理

?????????? String []strings = line.split("\t");


?????????? //將數(shù)據(jù)放到緩存集合中

?????????? map.put(strings[0], strings[1]);

?????? }

?????? bufferedReader.close();

??? }


??? @Override

??? protected void map(LongWritable key, Text value, Context context)

?????????? throws IOException, InterruptedException {

?????? //讀取到order的每一行

?????? String[]strings = value.toString().split("\t");


?????? StringorderId = strings[0];


?????? Stringname = map.get(orderId);


?????? this.key.set(value.toString() + "\t" + name);

?????? context.write(this.key, NullWritable.get());

??? }

}


自定義OutputFormat

????? 獲取到ReduceTask的運行結(jié)果,自定義要輸出的結(jié)果數(shù)據(jù)和文件

public class FilterRecordWriter extends RecordWriter<Text,

NullWritable>{


??? private FileSystem fileSystem;

??? private FSDataOutputStream aCreate;

??? private FSDataOutputStream oCreate;


??? publicFilterRecordWriter(TaskAttemptContext job) {

?????? try {

?????????? fileSystem= FileSystem.get(job.getConfiguration());

?????????? //創(chuàng)建輸出文件路徑

?????????? PathaPath = new Path("E:\\hadooptest\\mapreduce\\a.log");

?????????? PathoPath = new Path("E:\\hadooptest\\mapreduce\\o.log");


?????????? //創(chuàng)建輸出流

?????????? aCreate = fileSystem.create(aPath);

?????????? oCreate = fileSystem.create(oPath);

?????? }catch (IOException e) {

?????????? e.printStackTrace();

?????? }

??? }


??? @Override

??? public void write(Text key, NullWritable value) throws IOException, InterruptedException {

?????? if(key.toString().contains("levi")) {

?????????? aCreate.write(key.toString().getBytes());

?????? }else {

?????????? oCreate.write(key.toString().getBytes());

?????? }

??? }


??? @Override

??? public void close(TaskAttemptContext context) throws IOException, InterruptedException {

?????? if(null != aCreate) {

?????????? aCreate.close();

?????? }

?????? if(null != oCreate) {

?????????? oCreate.close();

?????? }

??? }

}


//設(shè)置輸出類為自定義輸出類

job.setOutputFormatClass(FliterOutputFormat.class);

//雖然自定義了一個輸出杈曲,但是還是要輸出驰凛,因為有個成功狀態(tài)標(biāo)識文件要輸出,不然會報錯

FileOutputFormat.setOutputPath(job, new Path("E:\\hadooptest\\mapreduce\\output"));


計數(shù)器

Hadoop為每一個作業(yè)維護了若干個內(nèi)置計算器担扑,以描述多項指標(biāo)恰响,例如:某些計數(shù)器記錄已處理的字節(jié)數(shù)等。


計數(shù)器的使用

context.getCounter("counterGroup組名","countera變量"),increment(1);

說明:計數(shù)器的結(jié)果在程序運行后的控制臺日志中可查看



總結(jié)

????? HDFS根據(jù)配置在各個節(jié)點存儲數(shù)據(jù)涌献,并且存儲相應(yīng)的副本數(shù)據(jù)胚宦。

????? MapReduce就是在需要執(zhí)行無論是MapTask或ReduceTask的時候,會先去ResouceManager去詢問燕垃,任務(wù)要在哪里運行枢劝,其實ResourceManger就是看要運行這個任務(wù)的輸入數(shù)據(jù)在哪個節(jié)點,從而去告知這個節(jié)點執(zhí)行任務(wù)卜壕,那么就形成了直接移動計算您旁,而不是移動數(shù)據(jù)的方式。

因為數(shù)據(jù)可能存儲在服務(wù)器1或服務(wù)器2…服務(wù)器轴捎,那么不需要移動數(shù)據(jù)鹤盒,負(fù)責(zé)執(zhí)行任務(wù)的服務(wù)器蚕脏,到指定的路徑,下載要運算的任務(wù)jar包昨悼,直接在本地運行蝗锥,那么當(dāng)數(shù)據(jù)非常大的時候就不用去移動數(shù)據(jù)。


YARN

Yarn是什么率触?

????? Yarn是一個資源調(diào)度平臺终议,負(fù)責(zé)為運算程序提供服務(wù)器運算資源,相當(dāng)于一個分布式的操作系統(tǒng)平臺葱蝗,在之前說了穴张,可以配置MapReduce在Yarn之上運行,所以MapReduce等運算程序則相當(dāng)于運行于操作系統(tǒng)之上的應(yīng)用程序两曼。


Yarn機制

????? 1皂甘、不需要知道用戶提交的程序運行機制,只要符合Yarn規(guī)范的資源請求機制即可使用Yarn悼凑,Spark偿枕、Storm等運算框架都可以整合在Yarn上運行,意味著與用戶程序完全解耦户辫。

????? 2渐夸、只提供運算資源的調(diào)度,程序向Yarn申請資源渔欢,Yarn負(fù)責(zé)分配資源

????? 3墓塌、Yarn總的資源調(diào)度是ResourceManager,提供運算資源的角色叫NodeManager奥额。

????? 4苫幢、Yarn作為一個通用的資源調(diào)度平臺,企業(yè)以前存在的各種運算集群都可以整合在一個物理集群上垫挨,提高資源利用率韩肝,方便數(shù)據(jù)共享。


Yarn作業(yè)流程



1九榔、客戶端將MapReduce程序提交到客戶端所在的節(jié)點伞梯。

2、YarnRunner就向RsourceManager申請一個Application帚屉。

3、RsourceManager內(nèi)部運行一下漾峡,看看哪個節(jié)點離提交申請節(jié)點近攻旦,以及系統(tǒng)資源等,內(nèi)部運行完了生逸,就將應(yīng)用程序資源路徑返回給YarnRunner牢屋。

4且预、程序就將運行程序所需要的資源提交到HDFS上。

5烙无、程序資源提交完后锋谐,申請運行MRAppMaster。

6截酷、RsourceManager將用戶請求轉(zhuǎn)化為一個task(任務(wù))涮拗,并尋找最適合的NodeManager,并將任務(wù)分配給這個NodeManager迂苛。

7三热、NodeManager領(lǐng)取到任務(wù)后,創(chuàng)建容器(Container)三幻,并產(chǎn)生MRAppMaster就漾。

8、MRAppMaster向RsourceManager申請運行N個MapTask容器(切片文件中有說明)念搬。

9抑堡、RsourceManager又尋找了一下,將MapTask分配給另外兩個NodeManager朗徊,這兩個NodeManager領(lǐng)取到任務(wù)首妖,并且創(chuàng)建容器(Container)。

10荣倾、RsourceManager告知申請運行MapTask容器的NodeManger悯搔,向那兩個接受到任務(wù)的NodeManager發(fā)送程序啟動腳本,這兩個NodeManger就分別啟動MapTask舌仍,MapTask對數(shù)據(jù)進行分區(qū)排序妒貌。

11、MRAppMaster看程序都跑完了铸豁,趕緊申請2個容器灌曙,運行ReduceTask。

12节芥、ReduceTask的容器向MapTask容器獲取相應(yīng)分區(qū)的數(shù)據(jù)在刺,并執(zhí)行任務(wù)。

13头镊、程序運行完畢后蚣驼,MapResource會向RsourceManager注銷自己。


Hadoop – HelloWorld

準(zhǔn)備

1相艇、三臺機器

2颖杏、ssh

3、防火墻



配置

JAVA_HOME

hadoop-env.sh

yarn-env.sh

mapred-env.sh



core-site.xml

<!-- 指定HDFS中NameNode的地址-->

fs.defaultFS

hdfs://hadoop-senior00-levi.com:8082


<!-- 指定hadoop運行時產(chǎn)生文件的存儲目錄-->

hadoop.tmp.dir

/opt/module/hadoop-2.5.0-cdh5.3.6/data/tmp



yarn-site.xml


<!-- reducer獲取數(shù)據(jù)的方式-->

yarn.nodemanager.aux-services

mapreduce_shuffle


<!-- 指定YARN的ResourceManager的地址-->

yarn.resourcemanager.hostname

hadoop-senior01-levi.com


<!-- 任務(wù)歷史服務(wù)-->

?????

??????????? yarn.log.server.url

??????????? http://hadoop-senior00-levi.com:19888/jobhistory/logs/



hdfs-site.xml

<!-- 指定seconddaryNameNode地址坛芽,主要這個是避免NameNode掛了? -->

dfs.namenode.secondary.http-address

hadoop-senior02-levi.com:50090


<!-- 指定name.dir留储,默認(rèn)就是翼抠,但是避免未啟用,設(shè)置一下-->

?dfs.namenode.name.dir

?/opt/module/hadoop-2.5.0-cdh5.3.6/data/tmp/name



mapred-site.xml

<!-- 指定mr運行在yarn上-->

mapreduce.framework.name

yarn


<!-- 配置 MapReduce JobHistory Server 地址 获讳,默認(rèn)端口10020-->

????? mapreduce.jobhistory.address

????? hadoop-senior00-levi.com:10020


<!-- 配置 MapReduce JobHistory Server web ui 地址阴颖, 默認(rèn)端口19888-->

????? mapreduce.jobhistory.webapp.address

????? hadoop-senior00-levi.com:19888




slaves

hadoop-senior00-levi.com

hadoop-senior01-levi.com

hadoop-senior02-levi.com



Hadoop-HA

Hadoop為什么要有HA?

????? 我們都知道NameNode是存儲了所有數(shù)據(jù)的路徑丐膝,在Hadoop第一個版本是沒有HA量愧,單臺的NameNode節(jié)點掛了,那么整個數(shù)據(jù)就沒辦法訪問了尤误,

????? 那個的工程師就自己寫一個腳本去解決這個問題侠畔,定時的拷貝NameNode的fsimage和edits到別的服務(wù)器,但是數(shù)據(jù)量大的時候损晤,拷貝就很慢了软棺,而且工程師半夜正在和周公下棋的時候,NameNode掛了尤勋,那就很尷尬了喘落。

????? 雖然可以到第二天早上來恢復(fù),但是數(shù)據(jù)量那么大的時候最冰,太慢了瘦棋,滿足不了需求。

????? 所以Hadoop為了解決這個問題暖哨,在后面的版本繼承了HA(高可用)赌朋。


Hadoop-HA是什么?

????? Hadoop-HA(高可用)就是在一臺服務(wù)器掛了篇裁,第二臺服務(wù)器可以馬上頂上去沛慢。

????? 兩個基本問題:

1、第一臺服務(wù)器和第二臺服務(wù)器的數(shù)據(jù)必須要同步达布。

Hadoop-HA通過edits-log的變化团甲,來將數(shù)據(jù)寫入到JournalNode節(jié)點里面去,以分享給其他的NameNode黍聂。

2躺苦、要解決第一臺和第二臺服務(wù)器同時啟用的情況,在這種情況下产还,子節(jié)點怎么提交數(shù)據(jù)匹厘,會提交到兩臺服務(wù)器,但是又會出現(xiàn)搶占資源的情況脐区,(給一個人送東西和給兩個人送東西所耗費的體力是不言而喻的)愈诚,

這個問題在Hadoop-HA中稱為腦裂,借助第三方框架(Zookeeper)實現(xiàn)隔離機制來解決腦裂這個問題。


Hadoop–HA 實現(xiàn)

NameNode

hdfs-site.xml


???

???????dfs.replication

???? 3




????

????????? dfs.namenode.secondary.http-address

????????? hadoop104:50090




????

???? ?dfs.namenode.checkpoint.period

???? ?120




????

???? ?dfs.namenode.name.dir

???? ?/opt/module/hadoop-2.7.2/data/tmp/dfs/name




????

???? ? ?? dfs.hosts

???? ???? /opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts




????

???? ?dfs.hosts.exclude

???? ?/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts.exclude




???

????????? dfs.nameservices

????????? mycluster



???

????????? dfs.ha.namenodes.mycluster

????????? nn1,nn2



???

????????? dfs.namenode.rpc-address.mycluster.nn1

????????? hadoop102:8020



???

????????? dfs.namenode.rpc-address.mycluster.nn2

????????? hadoop103:8020



???

????????? dfs.namenode.http-address.mycluster.nn1

????????? hadoop102:50070



???

????????? dfs.namenode.http-address.mycluster.nn2

????????? hadoop103:50070



???

????????? dfs.namenode.shared.edits.dir

????????? qjournal://hadoop102:8485;hadoop103:8485;hadoop104:8485/mycluster



???

????????? dfs.ha.fencing.methods

????????? sshfence



???

????????? dfs.ha.fencing.ssh.private-key-files

????????? /home/levi/.ssh/id_rsa



???

????????? dfs.journalnode.edits.dir

????????? /opt/module/hadoop/data/jn



???

????????? dfs.permissions.enable

????????? false




???

????????? dfs.client.failover.proxy.provider.mycluster

???? ???? org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider



????

????????? dfs.ha.automatic-failover.enabled

????????? true


core-site.xml



????????? fs.defaultFS

???? ??????hdfs://hadoop102:8020

???? -->



????

????????? fs.defaultFS

????????? hdfs://mycluster




????

????????? hadoop.tmp.dir

????????? /opt/module/hadoop-2.7.2/data/tmp




????

???? ?fs.trash.interval

???? ?1



???? ?

???? ?hadoop.http.staticuser.user

???? ?levi




????

????????? ha.zookeeper.quorum

????????? hadoop102:2181,hadoop103:2181,hadoop104:2181


ResourceManager

hdfs-site.xml



????????? yarn.resourcemanager.hostname

????????? hadoop103

???? -->



???

???????yarn.nodemanager.aux-services

???????mapreduce_shuffle




???

???????yarn.log-aggregation-enable

??????? true



???

???????yarn.log.server.url

???????http://hadoop103:19888/jobhistory/logs/



???

???????yarn.log-aggregation.retain-seconds

??????? 86400




???

???????yarn.resourcemanager.ha.enabled

??????? true




???

???????yarn.resourcemanager.cluster-id

??????? cluster-yarn1



???

???????yarn.resourcemanager.ha.rm-ids

??????? rm1,rm2




???

???????yarn.resourcemanager.hostname.rm1

?????? ?hadoop103




???

???????yarn.resourcemanager.hostname.rm2

??????? hadoop104




???

???????yarn.resourcemanager.zk-address

???????hadoop102:2181,hadoop103:2181,hadoop104:2181




???

???????yarn.resourcemanager.recovery.enabled

??????? true




???

???????yarn.resourcemanager.store.class????

????????? org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末扰路,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子倔叼,更是在濱河造成了極大的恐慌汗唱,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,816評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件丈攒,死亡現(xiàn)場離奇詭異哩罪,居然都是意外死亡,警方通過查閱死者的電腦和手機巡验,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評論 3 385
  • 文/潘曉璐 我一進店門际插,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人显设,你說我怎么就攤上這事框弛。” “怎么了捕捂?”我有些...
    開封第一講書人閱讀 158,300評論 0 348
  • 文/不壞的土叔 我叫張陵瑟枫,是天一觀的道長。 經(jīng)常有香客問我指攒,道長慷妙,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,780評論 1 285
  • 正文 為了忘掉前任允悦,我火速辦了婚禮膝擂,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘隙弛。我一直安慰自己架馋,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,890評論 6 385
  • 文/花漫 我一把揭開白布驶鹉。 她就那樣靜靜地躺著绩蜻,像睡著了一般。 火紅的嫁衣襯著肌膚如雪室埋。 梳的紋絲不亂的頭發(fā)上办绝,一...
    開封第一講書人閱讀 50,084評論 1 291
  • 那天,我揣著相機與錄音姚淆,去河邊找鬼孕蝉。 笑死,一個胖子當(dāng)著我的面吹牛腌逢,可吹牛的內(nèi)容都是我干的降淮。 我是一名探鬼主播,決...
    沈念sama閱讀 39,151評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了焊唬?” 一聲冷哼從身側(cè)響起单寂,我...
    開封第一講書人閱讀 37,912評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎来庭,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體穿挨,經(jīng)...
    沈念sama閱讀 44,355評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡月弛,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,666評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了科盛。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片帽衙。...
    茶點故事閱讀 38,809評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖贞绵,靈堂內(nèi)的尸體忽然破棺而出厉萝,到底是詐尸還是另有隱情,我是刑警寧澤但壮,帶...
    沈念sama閱讀 34,504評論 4 334
  • 正文 年R本政府宣布冀泻,位于F島的核電站,受9級特大地震影響蜡饵,放射性物質(zhì)發(fā)生泄漏弹渔。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 40,150評論 3 317
  • 文/蒙蒙 一溯祸、第九天 我趴在偏房一處隱蔽的房頂上張望肢专。 院中可真熱鬧,春花似錦焦辅、人聲如沸博杖。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽剃根。三九已至,卻和暖如春前方,著一層夾襖步出監(jiān)牢的瞬間狈醉,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,121評論 1 267
  • 我被黑心中介騙來泰國打工惠险, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留苗傅,地道東北人。 一個月前我還...
    沈念sama閱讀 46,628評論 2 362
  • 正文 我出身青樓班巩,卻偏偏與公主長得像渣慕,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,724評論 2 351

推薦閱讀更多精彩內(nèi)容

  • 首先逊桦,我們在使用前先看看HDFS是什麼眨猎?這將有助于我們是以后的運維使用和故障排除思路的獲得。 HDFS采用mast...
    W_Bousquet閱讀 4,187評論 0 2
  • HDFS入門 hadoop架構(gòu) Hadoop 1.0中的資源管理方案 Hadoop 1.0指的是版本為Apache...
    依天立業(yè)閱讀 1,047評論 0 1
  • 一强经、系統(tǒng)參數(shù)配置優(yōu)化 1宵呛、系統(tǒng)內(nèi)核參數(shù)優(yōu)化配置 修改文件/etc/sysctl.conf,添加如下配置夕凝,然后執(zhí)行s...
    張偉科閱讀 3,727評論 0 14
  • 翻譯: https://www.cloudera.com/documentation/enterprise/lat...
    金剛_30bf閱讀 2,618評論 1 1
  • 《父愛如山》 作者:你的父親覺空 仁慈出義子, 愛在厲嚴(yán)生户秤, 如存心疑惑码秉, 山水共月明。
    管雪彤閱讀 85評論 0 0