為什么要有Hadoop饲梭?
????? 從計算機誕生到現(xiàn)今,積累了海量的數(shù)據(jù)早歇,這些海量的數(shù)據(jù)有結(jié)構(gòu)化倾芝、半結(jié)構(gòu)化、非
結(jié)構(gòu)的數(shù)據(jù)箭跳,并且這些海量的數(shù)據(jù)存儲和檢索就成為了一大問題蛀醉。
????? 我們都知道大數(shù)據(jù)技術(shù)難題在于一個數(shù)據(jù)復(fù)雜性、數(shù)據(jù)量衅码、大規(guī)模的數(shù)據(jù)計算拯刁。
Hadoop就是為了解決這些問題而出現(xiàn)的。
Hadoop的誕生
????? DougCutting是Lucene的作者逝段,當(dāng)時Lucene面臨和谷歌同樣的問題垛玻,就是海量的數(shù)據(jù)存儲和檢索,于是就誕生了Nutch奶躯。
????? 在這之后帚桩,谷歌的大牛就為解決這個問題發(fā)了三篇論文(GFS、Map-Reduce嘹黔、BigTable)账嚎,這三篇論文總體表達(dá)的意思就是部署多臺廉價的服務(wù)器集群,通過分布式的方式將海量數(shù)據(jù)存儲在這個集群上儡蔓,然后利用集群上的所有機器進行數(shù)據(jù)計算郭蕉,這樣谷歌就不用買很多很貴的服務(wù)器,只需要把普通的機器組合在一起喂江。
Doug Cutting等人就去研究這三篇論文召锈,發(fā)現(xiàn)價值巨大,于是Doug Cutting等人在Nutch上實現(xiàn)了GFS和Map-Reduce获询,使得Nutch的性能飆升涨岁。
????? 于是Doug Cutting等人就把這兩部分納入到Hadoop項目中姓言,主要還是為了將Hadoop項目作為一個大數(shù)據(jù)整體化的解決方案缎浇。
????? 所以為什么后面就出現(xiàn)了Hadoop而不是在Nutch上去做整體化大數(shù)據(jù)解決方案仗扬。
????? 這三篇論文對應(yīng)Hadoop的組件:
????? GFS-> HDFS?????????????????? 文件系統(tǒng)
????? Map-Reduce-> MR??????? ?????計算框架
????? BigTable-> Hbase?????????????? 數(shù)據(jù)庫系統(tǒng)
什么是Hadoop着绷?
????? Hadoop是Apache下的一個分布式系統(tǒng)基礎(chǔ)架構(gòu)税手,主要是為了解決海量數(shù)據(jù)存儲和海量的數(shù)據(jù)計算問題帕棉。
????? 在這個基礎(chǔ)之上發(fā)展出了的更多的技術(shù)灵迫,使得Hadoop稱為大數(shù)據(jù)技術(shù)生態(tài)圈之一空凸。
Hadoop發(fā)行版本
1、Apache版本最原始的版本
2畜疾、Clodera版本赴邻,在大型互聯(lián)網(wǎng)企業(yè)中用的比較多,軟件免費啡捶,通過服務(wù)收費姥敛。
3、Hortonworks文檔比較好
特點
????? 高可靠:維護多個副本瞎暑,假設(shè)計算元素和存儲出現(xiàn)故障時彤敛,可以對失敗節(jié)點重新分布處理
????? 高擴展:在集群間分配任務(wù)數(shù)據(jù),可方便的擴展數(shù)以千計的節(jié)點
????? 高效性:并行工作
????? 高容錯:自動保存多個副本了赌,并且能夠?qū)κ∪蝿?wù)重新分配
Hadoop組成
HDFS:一個高可靠高吞吐量的分布式文件系統(tǒng)
?? NameNode(nn):存儲文件的元數(shù)據(jù)墨榄,如:文件名、文件目錄結(jié)構(gòu)等信息
?? DataNode(dn):在文件系統(tǒng)存儲文件塊數(shù)據(jù)勿她,以及數(shù)據(jù)的校驗和袄秩,也就是真正存儲文件內(nèi)容的,只是文件大的時候會切割成一小塊一小塊的逢并。
?? SecondayNameNode(2nn):用于監(jiān)控HDFS狀態(tài)的輔助后臺程序之剧,每隔一段時間就獲取HDFS的快照,就是備份和監(jiān)控狀態(tài)
Yarn:作業(yè)調(diào)度與集群資源管理框架砍聊。(Hadoop2.0加入)
?? ResourceManager(rm):處理客戶端請求背稼、啟動和監(jiān)控MRAppMaster、監(jiān)控NodeManager玻蝌,以及資源分配和調(diào)度蟹肘。
?? NodeManager(nn):單個節(jié)點上的資源管理、處理來自ResourceManager的命令俯树,處理來自MRAppMaster的命令帘腹。
?? MRAppMaster:數(shù)據(jù)切分、為應(yīng)用程序申請資源聘萨,并分配內(nèi)部任務(wù)竹椒、任務(wù)監(jiān)控和容錯。
?? Container:對任務(wù)運行環(huán)境的抽象米辐,封裝了CPU、內(nèi)存等多維資源以及環(huán)境變量书释、啟動命令等任務(wù)運行相關(guān)信息(hadoop內(nèi)部文件操作命令和Liunx差不多)
MapReduce:分布式離線并行計算框架翘贮。
?? Map階段:并行處理數(shù)據(jù)
?? Reduce階段:對Map階段處理的結(jié)果數(shù)據(jù)進行匯總
?Common:支持其他模塊的工具模塊。
理解Hadoop組成
????? 有一個建筑工地的建造時間很緊急爆惧,設(shè)立了一個支持小組狸页,支援各個小分隊(Common),首先1000包水泥,這些水泥要進行存儲(HDFS)芍耘,假設(shè)這些水泥有防水的和不防水的址遇,防水的水泥存到倉庫1(HDFS-dn),不防水的存儲到倉庫2(HDFS-dn)斋竞,那么就要進行記錄倔约,哪些水泥存放到哪里了(HDFS-nn),因為趕工期擔(dān)心水泥可能會因為潮濕那些問題坝初,出現(xiàn)不可用浸剩,所以又準(zhǔn)備了1000包水泥,并且每天都要對這些水泥進行檢查(HDFS-2nn)鳄袍。
????? 如果一個小分隊要領(lǐng)取水泥就要和工地倉儲管理人員申請绢要,倉儲管理人員同意了,就要向公司申請人員來搬水泥(Yarn-MRAppMaster)拗小,開始調(diào)動這些人員搬運水泥(Yarn-rm)重罪,小分隊領(lǐng)取到了水泥之后,開始決定給修外墻的多少包水泥(Yarn-nm)哀九。
????? 修外墻小組就開始拿著水泥干活了(MapReduce-Map)剿配,直到整棟樓的外墻修好了(MapReduce-Reduce),第N棟也是如此(MapReduce-Map)勾栗。
Hadoop內(nèi)為什么要如此劃分惨篱?
數(shù)據(jù)存放在Hadoop,那么Hadoop必然需要對數(shù)據(jù)進行管理围俘,如果沒有一個專門管理數(shù)據(jù)存儲的組件或數(shù)據(jù)運算的組件砸讳,全部都融合在一個東西里面就會顯得很臃腫,并且組件之間只需要通過接口進行溝通界牡,那么各自的組件就可以僅僅自身的需求做優(yōu)化等簿寂,那么就不會影響到其他的組件。
各自的組件只需要做好自己的事情宿亡,對外提供接口接收相應(yīng)的數(shù)據(jù)及返回數(shù)據(jù)常遂,只要符合我組件規(guī)范的就運行,不符合就不運行挽荠,而不需要關(guān)心其他克胳,專心做自己的事情,也可以使得組件之間可以單獨的運行圈匆。
Hadoop目錄
????? bin:程序級命令(hdfs漠另、Yarn等)
????? etc:配置文件
????? include:類庫等文件
????? lib:類庫等文件
????? libexec:類庫等文件
????? sbin:hadoop系統(tǒng)命令(關(guān)閉、啟動等)
????? share:官方提供的案例等
Hadoop運行模式
????? 本地模式:不需要啟動單獨進程跃赚,直接運行笆搓,一般測試和開發(fā)使用,一臺機器就可以運行,如果是在Liunx满败,跑的是本地肤频,可以直接通過命令運行相應(yīng)的jar包。
偽分布式模式:等同于分布式算墨,但只有一個節(jié)點宵荒,具有集群的配置信息和運行,由于偽分布式只有一臺機器米同,可以不啟動Yarn骇扇,那么也就算是Hadoop的HDFS啟動了,直接運行MapReduce程序的話面粮,結(jié)果都在HDFS上少孝,不在是在本地,如果需要交由YARN上進行資源調(diào)度和分配任務(wù)熬苍,則需要配置Yarn地址稍走,以及指定數(shù)據(jù)獲取方式。
????? 完全分布式模式:多個節(jié)點一起運行柴底,可以指定不同節(jié)點干不同的活婿脸,比如機器1干NameNode的活,機器2干ResourceManger的活柄驻。
注意:啟動NameNode時狐树,DataNode會記錄NameNode信息(id),當(dāng)緩存的NameNode記錄刪除了鸿脓,這個時候啟動就會報錯抑钟,這個時候就需要將NameNode格式化(刪除所有數(shù)據(jù)),之后在重新啟動野哭。
HDFS
HDFS是什么在塔?
????? HDFS就是一個分布式文件存儲系統(tǒng),通過目錄樹來定位文件拨黔,由于分布式特點那么集群中的服務(wù)器就有各自的角色蛔溃。
特點
????? 低成本:由于是眾多服務(wù)器組成的,那么在某服務(wù)器掛了篱蝇,只需要付出一臺廉價的服務(wù)器贺待。
????? 高容錯性:HDFS是由眾多服務(wù)器實現(xiàn)的分布式存儲,每個文件都會有冗余備份零截,那么如果存儲數(shù)據(jù)的某個服務(wù)器掛了狠持,那么還有備份的數(shù)據(jù),允許服務(wù)器出現(xiàn)故障瞻润。
????? 高吞吐量:HDFS是一次寫多次讀的訪問模型,不允許修改文件,并簡化了數(shù)據(jù)的一致性問題绍撞。
????? 就近原則:在數(shù)據(jù)附近執(zhí)行程序正勒,也體現(xiàn)出來移動計算比移動數(shù)據(jù)效率高。
????? 可移植性:HDFS可以實現(xiàn)不同平臺之間的移植傻铣。
應(yīng)用場景
????? 一次寫入章贞,多次讀取,且不支持文件的修改非洲。
????? 適合數(shù)據(jù)分析場景鸭限,不適合網(wǎng)盤應(yīng)用。
HDFS數(shù)據(jù)塊
????? HDFS的文件在物理上是分塊存儲的两踏,1.x版本的數(shù)據(jù)塊默認(rèn)大小是64MB败京,2.x版本的數(shù)據(jù)塊默認(rèn)塊大小是128MB,這個值是可以通過配置參數(shù)(dfs.blocksize)進行調(diào)整的梦染。
????? HDFS的塊比磁盤的塊大赡麦,目的就在于要減少尋址的開銷(標(biāo)準(zhǔn):尋址時間只占傳輸時間的1%),如果塊設(shè)置的夠大帕识,從磁盤傳輸數(shù)據(jù)的時間明顯就大于定位這個塊開始位置所需要的文件泛粹,因此傳輸一個由多個塊組成的文件的時間取決于磁盤傳輸速率。
HDFS常用命令(和Liunx差不多)
基本命令:hadoop fs
查看幫助:hadoop fs 或 hadoop fs -help(詳情)
創(chuàng)建目錄:hadoop fs -mkdir /usr
查看目錄信息:hadoop fs -ls /usr
本地剪切肮疗,粘貼到集群:hadoop fs -moveFromLocal test.txt /usr/
追加一個文件到已存在文件的末尾:hadoop fs -appendToFile test2.txt /usr/test.txt
顯示文件內(nèi)容:hadoop fs -cat /usr/test.txt
顯示一個文件末尾:hadoop fs -tail /usr/ test.txt
以字符形式打印一個文件內(nèi)容:hadoop fs -text /usr/test.txt
修改文件所屬權(quán)限(-chgrp晶姊、-chomd、chown)(liunx一樣用法):hadoop fs -chmod777 /usr/test.txt
從本地復(fù)制到hdfs:hadoopfs -copyFormLocal text.txt /usr/test
hdfs復(fù)制到本地:hadoop fs -copyToLocal /usr/ text.txt ./
從hdfs路徑拷貝到hdfs另一個路徑:hadoop fs -cp /usr/dir1 /usr/dir2
在hdfs目錄中移動文件:hadoop fs -mv /usr/test.txt /usr/dir
從hdfs下載文件到本地:hadoop fs -get /usr/test.txt ./
合并下載多個文件:hadoop fs -getmerge /usr /*.txt ./result.txt
上傳文件等于copyFormLocal:hadoop fs -put test.txt /usr
刪除文件或文件夾:hadoop fs -rmr /usr/test.txt
刪除空目錄:hadoop fs -rmdir /usr/test3
統(tǒng)計文件系統(tǒng)可用空間信息(-h格式化信息):hadoop fs -df -h
統(tǒng)計文件夾大小信息:hadoop fs -du -h /
統(tǒng)計制定目錄下的文件節(jié)點數(shù)據(jù)量(嵌套級伪货,當(dāng)前文件個數(shù)们衙,大小):hadoop fs -count -h /usr
設(shè)置文件的副本數(shù):hadoop fs -setrep 3 /usr/test.txt
NameNode
NameNode和SecondaryNameNode工作機制
第一階段:NameNode的工作
????? 1、第一次啟動namenode格式化后超歌,創(chuàng)建fsimage和edits文件砍艾,如果不是第一次啟動,直接加載編輯日志和鏡像文件到內(nèi)存巍举。
????? 2脆荷、客戶端對元數(shù)據(jù)進行操作請求
????? 3、NameNode記錄操作日志懊悯,更新滾動日志蜓谋。
????? 4、NameNode在內(nèi)存中對數(shù)據(jù)進行操作
第二階段:Secondary NameNode的工作
????? 1炭分、Secondary NameNode詢問NameNode是否需要checkpoint桃焕,直接帶回NameNode檢查結(jié)果。
????? 2捧毛、Secondary NameNode請求執(zhí)行checkpoint
????? 3观堂、NameNode滾動正在寫的eits日志
????? 4让网、將滾動前的編輯日志和鏡像文件拷貝到Secondary NameNode
????? 5、Secondary NameNode加載編輯日志和鏡像文件到內(nèi)存并且合并
????? 6师痕、生成新的鏡像文件fsimage.chkpoint
????? 7溃睹、拷貝fsimage.chkpoint到NameNode
????? 8、NameNode將fsimage.chkpoint重命名為fsimage
說明
Fsimage文件:HDFS文件系統(tǒng)元數(shù)據(jù)的一個永久檢查點胰坟,其中包含HDFS文件系統(tǒng)所有目錄和文件因篇,以及node序列化信息。
Edits文件:存放HDFS文件系統(tǒng)的所有更新操作笔横,文件系統(tǒng)客戶端執(zhí)行的所有寫操作日志都會記錄到edits文件竞滓。
Secondary
NameNode:在主NameNode掛了,可以從Secondary NameNode中恢復(fù)數(shù)據(jù)吹缔,但是由于同步的條件限制商佑,會出現(xiàn)數(shù)據(jù)不一致。
DataNode
工作機制
集群安全模式
????? NameNode啟動時涛菠,受限將鏡像文件加載進去內(nèi)存莉御,并編輯日志文件中的各項操作,一旦內(nèi)存中成功建立文件系統(tǒng)元數(shù)據(jù)鏡像俗冻,則創(chuàng)建一個新的fsimage文件和一個空的編輯日志礁叔。
????? 此時的NameNode開始監(jiān)聽DataNode請求,但此刻迄薄,NameNode是運行在安全模式琅关,則此時NameNode文件系統(tǒng)對于客戶端來說是只可讀。
????? 系統(tǒng)中數(shù)據(jù)塊文件并不是由NameNode維護的讥蔽,而是以塊列表的形式存儲在DataNode涣易,在系統(tǒng)正常操作期間,NameNode會在內(nèi)存中保留所有塊位置影像信息冶伞。
????? 在安全模式下新症,各個DataNode會向NameNode發(fā)送最新的塊列表信息,NameNode了解到足夠多的塊信息之后响禽,即可高效運行文件系統(tǒng)徒爹。
????? 如果滿足最小復(fù)本條件,NameNode會在30秒后就退出安全模式芋类,最小復(fù)本條件指的是整個文件系統(tǒng)中99%的塊都滿足最小復(fù)本級別隆嗅,在啟動一個剛剛格式化的HDFS集群時,因為系統(tǒng)中還沒有塊侯繁,所以NameNode不會進入安全模式胖喳。
????? 集群啟動完成后自動退出安全模式。
安全模式的應(yīng)用場景
????? 銀行對賬贮竟、維護丽焊。
Java操作HDFS
Demo
public static void main(String[] args) throws IllegalArgumentException, IOException,
InterruptedException, URISyntaxException {
??????? //配置信息
??????? Configurationconfiguration = new Configuration();
??????? //獲取文件系統(tǒng)
??????? FileSystemfileSystem= FileSystem.get(new URI("hdfs://hadoop102:8020"), configuration, "levi");
??????? //拷貝本地文件到集群
??????? fileSystem.copyFromLocalFile(new Path("e:/hdfs/test.txt"), new Path("/usr/hdfs/test.txt"));
??????? //關(guān)閉
??????? fileSystem.close();
}
HDFS數(shù)據(jù)流
IO流寫流程
IO流方式上傳文件 (Java)
public void fileUpload() throws IOException, InterruptedException,
URISyntaxException {
??????? //配置
??????? Configurationconfiguration = new Configuration();
??????? //文件系統(tǒng)
??????? FileSystemfileSystem= FileSystem.get(new URI("hdfs://hadoop102:8020"),configuration,"levi");
??????? //獲取輸出流(上傳到服務(wù)器) - 服務(wù)器
??????? FSDataOutputStreamfsDataOutputStream = fileSystem.create(new Path("/usr/hdfs/test03.txt"));
??????? //文件輸入流(本地上傳)
??????? FileInputStreamfileInputStream = new java.io.FileInputStream(new File("E:/hdfs/test03.txt"));
??????? //流對接
??????? IOUtils.copyBytes(fileInputStream, fsDataOutputStream, configuration);
??????? fsDataOutputStream.hflush();
??????? IOUtils.closeStream(fileInputStream);
??????? IOUtils.closeStream(fsDataOutputStream);
??????? //關(guān)閉
??????? fileSystem.close();
??? }
IO流讀流程
IO流方式下載文件 (Java)
public void readFile() throws IOException, InterruptedException,
URISyntaxException {
?????? //配置
?????? Configurationconfiguration = new Configuration();
?????? //文件系統(tǒng)
?????? FileSystemfileSystem= FileSystem.get(new URI("hdfs://hadoop102:8020"), configuration, "levi");
?????? //輸入流(下載) 服務(wù)器
?????? FSDataInputStreamfsDataInputStream = fileSystem.open(new Path("/usr/hdfs/hadoop-2.7.2.tar.gz"));
?????? //輸出(本地)
?????? FileOutputStreamfileOutputStream = new FileOutputStream(new File("E:/hdfs/block.txt"));
?????? //流對接
?????? //--- 第一塊
?????? /*byte[]buf= newbyte[1024];
?????? for (inti = 0; i <1024*128; i++) {
?????????? fsDataInputStream.read(buf);
?????????? fileOutputStream.write(buf);
?????? }
?????? //關(guān)閉
?????? fsDataInputStream.close();
?????? fileOutputStream.close();*/
?????? //--- 第二塊
?????? fsDataInputStream.seek(1024 * 1024 * 128);//定位這個位置開始讀
?????? IOUtils.copyBytes(fsDataInputStream, fileOutputStream, 1024);
?????? IOUtils.closeStream(fileOutputStream);
?????? IOUtils.closeStream(fsDataInputStream);
?????? fileSystem.close();
??? }
副本節(jié)點選擇
????? 在海量數(shù)據(jù)的處理中较剃,節(jié)點之間的數(shù)據(jù)傳輸速率是很重要,特別是在帶寬很稀缺的情況下粹懒,而節(jié)點和節(jié)點之間的距離越遠(yuǎn)重付,那么必然會影響數(shù)據(jù)的傳輸。
????? 在成千的服務(wù)器集群中凫乖,Hadoop是怎么選擇副本節(jié)點呢?
低版本Hadoop
第一個副本在客戶端所處的節(jié)點上弓颈,但是如果客戶端是在集群外帽芽,隨機選取一個節(jié)點
第二個副本和第一個副本位于不同機架的隨機節(jié)點上,也就是不和第一個副本在相同機架翔冀。
第三個副本和第二個副本位于相同機架导街,節(jié)點隨機
Hadoop2.5版本以上
????? 第一個副本在客戶端所處節(jié)點上。如果客戶端在集群外纤子,隨機選一個
????? 第二個副本和第一個副本位于相同機架搬瑰,隨機節(jié)點
????? 第三個副本位于不同機架,節(jié)點隨機
HDFS誤區(qū)
小文件存儲
每個文件均按照塊存儲控硼,每個塊的元數(shù)據(jù)存儲在NamNode的內(nèi)存中(一個文件/目錄/文件塊一般占有150字節(jié)的元數(shù)據(jù)內(nèi)存空間)泽论,因此Hadoop存儲小文件會非常低效,因為大量小文件會耗盡NameNode中大部分內(nèi)存卡乾,但存儲小文件所需要的磁盤容量和存儲這些文件原始內(nèi)容所需要的磁盤空間相比也不會增多翼悴。
例如:上傳一個文件1MB,那么這個文件會在HDFS中的一個塊存儲著幔妨,這個塊默認(rèn)是128MB鹦赎,那么是不是占用了128MB的磁盤空間呢?
????? 每一個塊128MB只是HDFS的邏輯上的劃分误堡,所以在磁盤占用空間還是1MB古话,只有當(dāng)一個或多個文件在一個塊內(nèi)超過128MB,之后將這個文件進行切割锁施。
副節(jié)點處理
HDFS是先把當(dāng)前這個節(jié)點處理完陪踩,在去處理副本節(jié)點的。
回收站
????? 回收站默認(rèn)是不啟用的沾谜,在core-site.xml文件中的配置fs.trash.interval默認(rèn)是為0.
HDFS全過程
MapReduce
MapReduce是什么膊毁?
????? MapReduce是一個分布式運算程序的編程框架,是用戶開發(fā)基于Hadoop的數(shù)據(jù)分析應(yīng)用的核心框架基跑。
????? MapReduce核心功能是將用戶編寫的業(yè)務(wù)邏輯代碼和自帶默認(rèn)組件整合成一個完整的分布式運算程序婚温,并發(fā)的運行在一個Hadoop集群上。
作用
????? 由于硬件資源限制媳否,海量數(shù)據(jù)無法在單機上處理栅螟,單機版程序擴展到集群進行分布式運算荆秦,增加程序的復(fù)雜度和開發(fā)難度。
????? MapReduce框架就是要使得開發(fā)人員開源將絕大部分工作集中在業(yè)務(wù)邏輯的開發(fā)上力图,而分布式運算的復(fù)雜性交由MapReduce來處理步绸。
特點
????? 適合數(shù)據(jù)復(fù)雜度運算
????? 不適合算法復(fù)雜度運算
????? 不適合實時計算、流式計算
核心思想
分布式的運算程序最少需要分成兩個階段:
第一個階段:MapTask并發(fā)實例吃媒,完全并行運行瓤介,互不相干
第二個階段:ReduceTask并發(fā)實例,互不相干赘那,但是他們的數(shù)據(jù)依賴于上一個階段的所有MapTask并發(fā)實例的輸出
MapReduce編程模型只能包含一個Map階段和Reduce階段刑桑,如果用戶的業(yè)務(wù)邏輯非常復(fù)雜,那就只能多個MapReduce程序募舟,串行運行祠斧。
?
總結(jié)
????? Map:并行處理任務(wù)(運算)。
????? Reduce:等待相關(guān)的所有Map處理完任務(wù)拱礁,在將任務(wù)數(shù)據(jù)匯總輸出琢锋。
????? MRAppMaster:負(fù)責(zé)整個程序的過程調(diào)度和狀態(tài)協(xié)調(diào)。
MapReduce進程
????? 一個完整的MapReduce程序在分布式允許時有三類實例進程:
????? MRAppMaster:負(fù)責(zé)整個程序的過程調(diào)度和狀態(tài)協(xié)調(diào)呢灶。
????? MapTask:負(fù)責(zé)Map階段的整個數(shù)據(jù)處理流程吴超。
????? ReduceTask:負(fù)責(zé)Reduce階段的整個數(shù)據(jù)處理流程。
序列化
????? 序列化就是把內(nèi)存中的對象轉(zhuǎn)換成字節(jié)序列(或其他數(shù)據(jù)傳輸協(xié)議)填抬,以便于存儲(持久化)和網(wǎng)絡(luò)傳輸烛芬。
????? 而序列化就是Map到Reducer的橋梁。
????? Java序列化是一個重量級的序列化框架(Serializable)飒责,使用這個框架進行序列化后會附帶很多額外信息(各種校驗信息赘娄、header等),不便于網(wǎng)絡(luò)傳輸宏蛉,所以Hadoop自己開發(fā)了一套序列化機制(Writable)遣臼,精確、高效拾并。
Java類型Hadoop
? Writable類型
booleanBooleanWritable
byteByteWritable
intIntWritable
floatFloatWritable
longLongWritable
doubleDoubleWritable
stringText
mapMapWritable
arrayArrayWritable
備注:自定義的反序列類中的write方法和read方法中DataOutput和DataInput這兩個類所提供的方法中揍堰,對應(yīng)Java類型String的方法,分別是writeUTF()和readUTF()嗅义。
實例(統(tǒng)計單詞)
public class WordCountMapper extends Mapper<LongWritable, Text, Text,
IntWritable> {
??? private Text key = new Text();
??? @Override
??? protected void map(LongWritable key, Text value, Context context)
?????????? throws IOException, InterruptedException {
?????? //讀取每一行
?????? Stringline = value.toString();
?????? //切割出每一個單詞
?????? String []words = line.split("\t");
?????? //將讀取到的每一個單詞都寫出屏歹,并且值都為1,因為是在map計算完后到reduce進行匯總之碗,形成Key 多個Value
?????? for (String word : words) {
?????????? //每次文件內(nèi)的讀取一行都調(diào)用一次map蝙眶,那樣就形成了調(diào)用多次map,那樣的話就不用創(chuàng)建多個key對象了
?????????? this.key.set(word);
?????????? context.write(this.key, new IntWritable(1));
?????? }
??? }
}
public class WorkCountReducer extends Reducer<Text, IntWritable, Text,
IntWritable>{
??? @Override
??? protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
?????? //這里就形成 多個值的匯總結(jié)果褪那,那么將這個值多個進行匯總后幽纷,統(tǒng)一歸并到一個key式塌,就形成了一個key對應(yīng)多個value
?????? int count = 0;
?????? for (IntWritable value : values) {
?????????? count += value.get();
?????? }
?????? context.write(key, new IntWritable(count));
??? }
}
public class WordCountDriver {
??? public static void main(String[] args) throws Exception {
?????? //配置
?????? Configurationconfiguration = new Configuration();
?????? //任務(wù)運行
?????? Jobjob= Job.getInstance(configuration);
?????? job.setJarByClass(WordCountDriver.class);
?????? //運算類和匯總類
?????? job.setMapperClass(WordCountMapper.class);
?????? job.setReducerClass(WorkCountReducer.class);
?????? //運算和匯總輸入和輸出
?????? job.setMapOutputKeyClass(Text.class);
?????? job.setMapOutputValueClass(IntWritable.class);
?????? //最終輸出
?????? job.setOutputKeyClass(Text.class);
?????? job.setOutputValueClass(IntWritable.class);
?????? //運算文件的輸入和結(jié)果輸出
?????? FileInputFormat.setInputPaths(job, new Path("E:/hadooptest/mapreduce/input"));
?????? FileOutputFormat.setOutputPath(job, new Path("E:/hadooptest/mapreduce/output"));
?????? //提交
?????? job.submit();
?????? //等待
?????? boolean result = job.waitForCompletion(true);
?????? System.exit(result ? 0 : 1);
??? }
}
程序流程分析
1、MapReduce程序讀取輸入目錄存放的相應(yīng)文件友浸。
2峰尝、客戶端在submit方法執(zhí)行之前,獲取到待處理的數(shù)據(jù)信息收恢,讓后根據(jù)急群眾參數(shù)配置形成一個任務(wù)分配規(guī)劃武学。
????? 1、建立連接
????? 2派诬、創(chuàng)建提交任務(wù)的代理(本地:LocalRunner劳淆、遠(yuǎn)程:YarnRunner)
????? 3、創(chuàng)建給集群提交數(shù)據(jù)的stag路徑
????? 4默赂、獲取到任務(wù)id,并創(chuàng)建任務(wù)路徑
????? 5括勺、獲取到任務(wù)jar包缆八,拷貝jar包到集群(這個jar就是程序運行的業(yè)務(wù)代碼)
????? 6、計算切片疾捍,生成切片規(guī)劃文件
computeSliteSize(Math.max(minSize,Math.max(maxSize,blocksize)))=blocksize=128MB
????? 7奈辰、提交任務(wù),返回提交狀態(tài)
3乱豆、客戶端提交job.split奖恰、jar包、job.xml等文件給Yarn宛裕,Yarn中的resourcemanager啟動MRAppMater瑟啃。
4矩肩、MRAppMater啟動后根據(jù)job的描述信息散劫,計算出需要的MapTask實例數(shù)量,然后向集群申請機器放钦,啟動相應(yīng)數(shù)量的Map Task進程岩榆。
5错负、MapTask利用客戶指定的InputFormat來讀取數(shù)據(jù),形成KV對勇边。
6犹撒、MapTask將輸入KV對傳遞給客戶定義的map()方法,做邏輯運算粒褒。
7识颊、map()運算完畢后將運算結(jié)果的KV對,手機到MapTask緩存怀浆。
8谊囚、MapTask緩存中的KV對按照K分區(qū)排序后不斷寫到磁盤文件怕享。
9、MRAppMaster監(jiān)控到所有MapTask進程任務(wù)完成后镰踏,會根據(jù)用戶指定的參數(shù)啟動相應(yīng)數(shù)量的ReduceTask進程函筋,并告知ReduceTask進程要處理的數(shù)據(jù)分區(qū)。
10奠伪、ReduceTask進程啟動后跌帐,根據(jù)MRAppMaster告知待處理數(shù)據(jù)所在位置,從N臺MapTask運行所在的機器上獲取到N個MapTask輸出結(jié)果文件绊率,并在本地運行重新歸并排序谨敛,按照相同Key的KV為一個組,調(diào)用客戶定義的reduce()方法進行邏輯運算滤否。
11脸狸、ReduceTask運算完畢后,調(diào)用客戶指定的OuputFormat將結(jié)果數(shù)據(jù)輸出(文件)到外部存儲藐俺。
說明:
????? 切片是邏輯上的切片
????? 規(guī)劃文件就是里面描述了切多少個片炊甲,每個片是怎么樣的。
數(shù)據(jù)切片
????? MapTask的并行任務(wù)是否越多越好欲芹?并行度是如何決定的卿啡?MapTask到底開多少個合適?
????? 1菱父、一個job的map()階段并行度(MapTask開幾個)颈娜,由客戶端在提交job時決定。
????? 2浙宜、每一個Split切片分配一個MapTask并行實例處理官辽。
????? 3、默認(rèn)情況下切片大小=塊大小(blocksize)
????? 4梆奈、切片時不考慮數(shù)據(jù)集整體野崇,而是針對每一個文件單獨切片(這個是邏輯上的劃分)
切片流程
????? 1、獲取到數(shù)據(jù)存儲目錄
????? 2亩钟、找到要便利處理目錄下的每一個文件
????? 3乓梨、讀取第一個文件test.txt(257MB)
??????????? 1、獲取文件大小
??????????? 2清酥、計算分片大小扶镀,每次切片時,都要判斷剩下的部分是否大于塊大小的1.1倍焰轻,大于就在劃分一個塊切片
??????????? 切片:
第一塊:128MB
????????????????? 第二塊:129MB / 128MB = 1.0078125
????????????????? 1.0078125< 1.1 =不在切片臭觉,反之繼續(xù)切
????????????????? 源碼:computeSliteSize(Math.max(minSize,Math.max(naxSize,blocksize)));
??????????? 3、將切片信息寫到一個切片規(guī)劃文件(說明文件)中
??????????? 4、整個切片的核心過程在于getSplit()方法(看submit()源碼)中完成蝠筑,數(shù)據(jù)切片只是邏輯上對輸入數(shù)據(jù)進行切片狞膘,并不會在磁盤上,將文件切分進行存儲什乙。
??????????? InputSplit只是記錄了分片的元數(shù)據(jù)信息挽封。比如:起始位置、長度臣镣、所在的節(jié)點列表等辅愿。
注意:塊是HDFS上物理存儲的數(shù)據(jù),切片只是邏輯上的劃分忆某。
??????????? 5点待、提交切片規(guī)劃文件(說明文件)到Y(jié)arn上,Yarn上的MrAppMaster就根據(jù)切片規(guī)劃文件(說明文件)計算開啟的MapTask個數(shù)(多少個切片就多少個MapTask)弃舒。
FileInputFormat中默認(rèn)的切片機制
????? 1癞埠、簡單按照文件內(nèi)容長度切片
????? 2、切片大小聋呢,默認(rèn)是塊大小
????? 3燕差、切片時不考慮數(shù)據(jù)集整體性,而是逐個文件的單獨切片坝冕,循環(huán)遍歷每一個文件。
????? MaxSize(切片最大值):如果比塊大小還小瓦呼,則會讓切片變小喂窟。
MinSize(切片最小值):如果比塊大小還大,則會讓切片變得比塊還大央串。
假設(shè):塊大小128MB
?????????????????????? MaxSize設(shè)為100MB
?????????????????????? 切片后的存儲占塊大小100MB
小文件切片處理
????? 如果有大量的小文件磨澡,而每一個文件都是一個單獨的切片,都會各自交給一個MapTask處理质和,那么需要開啟大量的MapTask稳摄,則會產(chǎn)生大量的MapTask,導(dǎo)致處理效率低下饲宿。
解決方案
????? 1厦酬、在數(shù)據(jù)處理前端,先把小文件合并成大文件瘫想,在上傳到HDFS做后續(xù)分析
????? 2仗阅、如果已經(jīng)有大量的小文件存在HDFS,使用CombineFileInputFormat進行處理国夜,CombineFileInputFormat的切片邏輯跟TextFileInputFormat不同减噪,他可以將多個小文件邏輯上規(guī)劃到一個切片中,這樣多個小文件就可以交給一個MapTask。
????? 3筹裕、優(yōu)先滿足最小切片大小醋闭,不超過最大切片大小的前提下。
文件合并
//-------- 使用提供的自定義類朝卒,指定切片大小
job.setInputFormatClass(CombineTextInputFormat.class);
//最大輸入切片大小证逻,一個文件的大小是4M就開始切,算法是1.1倍
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
//最小輸入切片大小扎运,多個文件合并到了一起瑟曲,到了2M就切,算法是1.1倍豪治,優(yōu)先滿足最小切片大小
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);
備注:在運行日志中查找number of就可以看到了
Shuffle機制
1洞拨、在MapReduce中,Map階段處理的數(shù)據(jù)如何傳遞給Reduce階段額负拟,是MapReduce框架中關(guān)機的一個流程烦衣,這個流程就叫Shuffle。
????? 2掩浙、Shuffle(洗牌花吟、發(fā)牌):核心就是數(shù)據(jù)分區(qū)、排序厨姚、緩存
????? 3衅澈、MapTaks輸出處理結(jié)果數(shù)據(jù),分發(fā)給ReduceTask并在分發(fā)過程中對數(shù)據(jù)按照Key進行分區(qū)和排序谬墙。
Shuffle機制緩存流程圖
????? Shuffle是MapReduce處理流程中一個過程今布,每一個步驟都是分散在各個MapTask和ReduceTask節(jié)點上。
MapReduce詳細(xì)運行流程
總結(jié)
????? MapReduce詳細(xì)運行流程圖就是一個流水線一般的作業(yè)拭抬,從左向右過去部默,而在開發(fā)的過程中,需要使用到什么組件造虎,這些組件會起到什么作用傅蹂,在哪一個時間起作用,都可以在這個圖中詳細(xì)的描述
分區(qū)
自定義分區(qū)
public classMyPartitionerextends Partitioner<Text, FlowBean>{
??? @Override
??? public int getPartition(Text key, FlowBean value, int numPartitions) {
?????? //拿到手機號碼前三位
?????? StringphoneNum = key.toString().substring(0, 3);
?????? //建立5個分區(qū)算凿,從0開始
?????? int partition = 4;
?????? //判斷
?????? if("135".equals(phoneNum)) {
?????????? partition = 0;
?????? }else if("136".equals(phoneNum)) {
?????????? partition = 1;
?????? }else if("137".equals(phoneNum)) {
?????????? partition = 2;
?????? }else if("138".equals(phoneNum)) {
?????????? partition = 3;
?????? }
?????? return partition;
??? }
}
?//設(shè)置分區(qū)類
?????? //如果沒有設(shè)置分區(qū)份蝴,那么則會按照塊大小去計算什么時候進行分區(qū)
?????? job.setPartitionerClass(MyPartitioner.class);
?????? //設(shè)置ReduceTask數(shù)量
?????? job.setNumReduceTasks(5);
總結(jié)
reduce數(shù)量小于分區(qū)數(shù)量就會報錯。
reduce數(shù)量是1澎媒,那么則所有結(jié)果輸出到一個文件內(nèi)搞乏,即便配置了分區(qū)也不會去跑分區(qū)的代碼(執(zhí)行分區(qū))
reduce數(shù)量大于分區(qū)數(shù)量,輸出的其他文件為空
分區(qū)數(shù)量 = reduce數(shù)量戒努,按照分區(qū)數(shù)量輸出結(jié)果文件數(shù)量
分區(qū)就是對map的結(jié)果數(shù)據(jù)進行二次處理请敦,從而再去決定是否影響輸出的reduce結(jié)果輸出镐躲。
排序
????? MapTask和ReduceTask均會對數(shù)據(jù)(按Key排序)進行排序,這個操作屬于Hadoop默認(rèn)行為侍筛,任何應(yīng)用程序中的數(shù)據(jù)均會被排序萤皂,而不管邏輯上是否需要。
????? 對于MapTask匣椰,他會把處理的結(jié)果暫時放到一個緩沖區(qū)裆熙,當(dāng)緩沖區(qū)使用率達(dá)到了閾值就對緩沖區(qū)的數(shù)據(jù)進行排序,并將這些有序的數(shù)據(jù)寫到磁盤上禽笑,而當(dāng)數(shù)據(jù)處理完后入录,他會對磁盤上所有文件進行一次合并,將這些文件合并成一個有序的文件佳镜。
????? 對于ReduceTask僚稿,他從每個MapTask遠(yuǎn)程拷貝相應(yīng)的數(shù)據(jù)文件,如果文件大小超過一定閾值蟀伸,則放到磁盤上蚀同,否則放到內(nèi)存中,如果磁盤上文件數(shù)目達(dá)到一定閾值啊掏,則進行一次合并蠢络,生成一個更大的文件,如果內(nèi)存文件大小或數(shù)目超過閾值迟蜜,則進行合并后將數(shù)據(jù)寫出到磁盤上,當(dāng)所有的數(shù)據(jù)拷貝完畢后娜睛,再統(tǒng)一的對內(nèi)存核磁盤上的所有文件進行一次合并芦疏。
自定義排序
public class FlowBean implements WritableComparable<FlowBean>
{
??? private Long sum;
??? public FlowBean() {
?????? super();
??? }
??? @Override
??? public void write(DataOutput dataOutput) throws IOException {
?????? dataOutput.writeLong(sum);
??? }
??? @Override
??? public void readFields(DataInput dataInput) throws IOException {
?????? sum = dataInput.readLong();
??? }
??? public Long getSum() {
?????? return sum;
??? }
??? public void setSum(Long sum) {
?????? this.sum = sum;
??? }
??? @Override
??? public String toString() {
?????? return sum.toString();
??? }
??? @Override
??? public int compareTo(FlowBean o) {
?????? return this.sum > o.getSum() ? -1 : 1;
??? }
}
總結(jié)
Shullt規(guī)定Key是要進行排序的,如果作為Key是必須要實現(xiàn)WritableComparable接口的微姊。
Combiner合并
ReducrTask是接收總的MapTask結(jié)果,Combiner在每一個MapTask運行的分预,對每每個MapTask的結(jié)果匯總(局部匯總)兢交,將MapTask匯總后之后進行壓縮傳輸,可以減少網(wǎng)絡(luò)傳輸量笼痹。
????? 但是Combiner的前提是不能影響到最終的業(yè)務(wù)邏輯配喳,如果是累加求和是沒有問題的,如果是求平均值就有問題的凳干。
如:
1晴裹、在每一個MapTask進行求平均值之后在ReduceTask再求一次平均值,結(jié)果是不一樣的救赐。
????? 2涧团、將MapTask的數(shù)據(jù)全部匯總到ReduceTask之后再求平均值。
這兩種結(jié)果是不一樣的。
自定義Combiner合并
public class WordCountCombiner extends Reducer<Text, IntWritable, Text,
IntWritable>{
??? @Override
??? protected void reduce(Text key, Iterable<IntWritable> values,
?????????? Contextcontext)throws IOException, InterruptedException {
?????? int count = 0;
?????? for (IntWritable intWritable : values) {
?????????? count += intWritable.get();
?????? }
?????? context.write(key, new IntWritable(count));
??? }
}
//reduce是接收總的MapTask匯總泌绣,combiner在每一個maptask運行的钮追,對每一個maptask匯總
//如:每一個maptask都進行匯總,之后進行壓縮傳輸
job.setCombinerClass(WordCountCombiner.class);
分組
????? 就是對分區(qū)排序好的數(shù)據(jù)阿迈,在進行一次合并分類開來元媚,再一次合并的話,就有個比較標(biāo)識苗沧,如果兩個數(shù)據(jù)標(biāo)識是一樣的刊棕,就認(rèn)為是一組數(shù)據(jù),最后過濾去重待逞,最終得到有哪些組甥角。
自定義分組
public classOrderGoupingComparatorextends WritableComparator {
??? protected OrderGoupingComparator() {
?????? super(OrderBean.class, true);
??? }
??? @Override
??? public intcompare(WritableComparablea,WritableComparableb) {
?????? OrderBeanabean = (OrderBean) a;
?????? OrderBeanbbean = (OrderBean) b;
?????? // 將orderId相同的bean都視為一組
?????? return abean.getOrderId().compareTo(bbean.getOrderId());
??? }
}
//設(shè)置Reduce端分組
job.setGroupingComparatorClass(OrderGoupingComparator.class);
//分區(qū)
job.setPartitionerClass(OrderPartition.class);
job.setNumReduceTasks(3);
自定義InputFormat
????? 對小文件的輸入進行合并處理。
1飒焦、設(shè)置文件不可切割
2蜈膨、讀取到整個文件,并且整個文件的數(shù)據(jù)作為value輸出給MapTask(將分片傳進去去讀取后牺荠,將讀取到的所有分片數(shù)據(jù)合并給到MapTask)
3翁巍、MapTask在對合并后的數(shù)據(jù)做操作
public class DistriutedCacheMapper extends Mapper<LongWritable, Text, Text,
NullWritable>{
??? private Map<String, String> map = new HashMap<>();
??? private Text key = new Text();
??? @Override
??? protected void setup(Mapper<LongWritable, Text,
Text, NullWritable>.Context context)
?????????? throws IOException, InterruptedException {
?????? //獲取緩存文件,這個文件給加載進了hadoop系統(tǒng)了休雌,在緩存中的根灶壶,可以直接通過名字調(diào)用
?????? BufferedReaderbufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(new File("pd.txt"))));
?????? Stringline;
?????? while(StringUtils.isNotEmpty(line = bufferedReader.readLine())) {
?????????? //數(shù)據(jù)處理
?????????? String []strings = line.split("\t");
?????????? //將數(shù)據(jù)放到緩存集合中
?????????? map.put(strings[0], strings[1]);
?????? }
?????? bufferedReader.close();
??? }
??? @Override
??? protected void map(LongWritable key, Text value, Context context)
?????????? throws IOException, InterruptedException {
?????? //讀取到order的每一行
?????? String[]strings = value.toString().split("\t");
?????? StringorderId = strings[0];
?????? Stringname = map.get(orderId);
?????? this.key.set(value.toString() + "\t" + name);
?????? context.write(this.key, NullWritable.get());
??? }
}
自定義OutputFormat
????? 獲取到ReduceTask的運行結(jié)果,自定義要輸出的結(jié)果數(shù)據(jù)和文件
public class FilterRecordWriter extends RecordWriter<Text,
NullWritable>{
??? private FileSystem fileSystem;
??? private FSDataOutputStream aCreate;
??? private FSDataOutputStream oCreate;
??? publicFilterRecordWriter(TaskAttemptContext job) {
?????? try {
?????????? fileSystem= FileSystem.get(job.getConfiguration());
?????????? //創(chuàng)建輸出文件路徑
?????????? PathaPath = new Path("E:\\hadooptest\\mapreduce\\a.log");
?????????? PathoPath = new Path("E:\\hadooptest\\mapreduce\\o.log");
?????????? //創(chuàng)建輸出流
?????????? aCreate = fileSystem.create(aPath);
?????????? oCreate = fileSystem.create(oPath);
?????? }catch (IOException e) {
?????????? e.printStackTrace();
?????? }
??? }
??? @Override
??? public void write(Text key, NullWritable value) throws IOException, InterruptedException {
?????? if(key.toString().contains("levi")) {
?????????? aCreate.write(key.toString().getBytes());
?????? }else {
?????????? oCreate.write(key.toString().getBytes());
?????? }
??? }
??? @Override
??? public void close(TaskAttemptContext context) throws IOException, InterruptedException {
?????? if(null != aCreate) {
?????????? aCreate.close();
?????? }
?????? if(null != oCreate) {
?????????? oCreate.close();
?????? }
??? }
}
//設(shè)置輸出類為自定義輸出類
job.setOutputFormatClass(FliterOutputFormat.class);
//雖然自定義了一個輸出杈曲,但是還是要輸出驰凛,因為有個成功狀態(tài)標(biāo)識文件要輸出,不然會報錯
FileOutputFormat.setOutputPath(job, new Path("E:\\hadooptest\\mapreduce\\output"));
計數(shù)器
Hadoop為每一個作業(yè)維護了若干個內(nèi)置計算器担扑,以描述多項指標(biāo)恰响,例如:某些計數(shù)器記錄已處理的字節(jié)數(shù)等。
計數(shù)器的使用
context.getCounter("counterGroup組名","countera變量"),increment(1);
說明:計數(shù)器的結(jié)果在程序運行后的控制臺日志中可查看
總結(jié)
????? HDFS根據(jù)配置在各個節(jié)點存儲數(shù)據(jù)涌献,并且存儲相應(yīng)的副本數(shù)據(jù)胚宦。
????? MapReduce就是在需要執(zhí)行無論是MapTask或ReduceTask的時候,會先去ResouceManager去詢問燕垃,任務(wù)要在哪里運行枢劝,其實ResourceManger就是看要運行這個任務(wù)的輸入數(shù)據(jù)在哪個節(jié)點,從而去告知這個節(jié)點執(zhí)行任務(wù)卜壕,那么就形成了直接移動計算您旁,而不是移動數(shù)據(jù)的方式。
因為數(shù)據(jù)可能存儲在服務(wù)器1或服務(wù)器2…服務(wù)器轴捎,那么不需要移動數(shù)據(jù)鹤盒,負(fù)責(zé)執(zhí)行任務(wù)的服務(wù)器蚕脏,到指定的路徑,下載要運算的任務(wù)jar包昨悼,直接在本地運行蝗锥,那么當(dāng)數(shù)據(jù)非常大的時候就不用去移動數(shù)據(jù)。
YARN
Yarn是什么率触?
????? Yarn是一個資源調(diào)度平臺终议,負(fù)責(zé)為運算程序提供服務(wù)器運算資源,相當(dāng)于一個分布式的操作系統(tǒng)平臺葱蝗,在之前說了穴张,可以配置MapReduce在Yarn之上運行,所以MapReduce等運算程序則相當(dāng)于運行于操作系統(tǒng)之上的應(yīng)用程序两曼。
Yarn機制
????? 1皂甘、不需要知道用戶提交的程序運行機制,只要符合Yarn規(guī)范的資源請求機制即可使用Yarn悼凑,Spark偿枕、Storm等運算框架都可以整合在Yarn上運行,意味著與用戶程序完全解耦户辫。
????? 2渐夸、只提供運算資源的調(diào)度,程序向Yarn申請資源渔欢,Yarn負(fù)責(zé)分配資源
????? 3墓塌、Yarn總的資源調(diào)度是ResourceManager,提供運算資源的角色叫NodeManager奥额。
????? 4苫幢、Yarn作為一個通用的資源調(diào)度平臺,企業(yè)以前存在的各種運算集群都可以整合在一個物理集群上垫挨,提高資源利用率韩肝,方便數(shù)據(jù)共享。
Yarn作業(yè)流程
1九榔、客戶端將MapReduce程序提交到客戶端所在的節(jié)點伞梯。
2、YarnRunner就向RsourceManager申請一個Application帚屉。
3、RsourceManager內(nèi)部運行一下漾峡,看看哪個節(jié)點離提交申請節(jié)點近攻旦,以及系統(tǒng)資源等,內(nèi)部運行完了生逸,就將應(yīng)用程序資源路徑返回給YarnRunner牢屋。
4且预、程序就將運行程序所需要的資源提交到HDFS上。
5烙无、程序資源提交完后锋谐,申請運行MRAppMaster。
6截酷、RsourceManager將用戶請求轉(zhuǎn)化為一個task(任務(wù))涮拗,并尋找最適合的NodeManager,并將任務(wù)分配給這個NodeManager迂苛。
7三热、NodeManager領(lǐng)取到任務(wù)后,創(chuàng)建容器(Container)三幻,并產(chǎn)生MRAppMaster就漾。
8、MRAppMaster向RsourceManager申請運行N個MapTask容器(切片文件中有說明)念搬。
9抑堡、RsourceManager又尋找了一下,將MapTask分配給另外兩個NodeManager朗徊,這兩個NodeManager領(lǐng)取到任務(wù)首妖,并且創(chuàng)建容器(Container)。
10荣倾、RsourceManager告知申請運行MapTask容器的NodeManger悯搔,向那兩個接受到任務(wù)的NodeManager發(fā)送程序啟動腳本,這兩個NodeManger就分別啟動MapTask舌仍,MapTask對數(shù)據(jù)進行分區(qū)排序妒貌。
11、MRAppMaster看程序都跑完了铸豁,趕緊申請2個容器灌曙,運行ReduceTask。
12节芥、ReduceTask的容器向MapTask容器獲取相應(yīng)分區(qū)的數(shù)據(jù)在刺,并執(zhí)行任務(wù)。
13头镊、程序運行完畢后蚣驼,MapResource會向RsourceManager注銷自己。
Hadoop – HelloWorld
準(zhǔn)備
1相艇、三臺機器
2颖杏、ssh
3、防火墻
配置
JAVA_HOME
hadoop-env.sh
yarn-env.sh
mapred-env.sh
core-site.xml
<!-- 指定HDFS中NameNode的地址-->
fs.defaultFS
hdfs://hadoop-senior00-levi.com:8082
<!-- 指定hadoop運行時產(chǎn)生文件的存儲目錄-->
hadoop.tmp.dir
/opt/module/hadoop-2.5.0-cdh5.3.6/data/tmp
yarn-site.xml
<!-- reducer獲取數(shù)據(jù)的方式-->
yarn.nodemanager.aux-services
mapreduce_shuffle
<!-- 指定YARN的ResourceManager的地址-->
yarn.resourcemanager.hostname
hadoop-senior01-levi.com
<!-- 任務(wù)歷史服務(wù)-->
?????
??????????? yarn.log.server.url
??????????? http://hadoop-senior00-levi.com:19888/jobhistory/logs/
hdfs-site.xml
<!-- 指定seconddaryNameNode地址坛芽,主要這個是避免NameNode掛了? -->
dfs.namenode.secondary.http-address
hadoop-senior02-levi.com:50090
<!-- 指定name.dir留储,默認(rèn)就是翼抠,但是避免未啟用,設(shè)置一下-->
?dfs.namenode.name.dir
?/opt/module/hadoop-2.5.0-cdh5.3.6/data/tmp/name
mapred-site.xml
<!-- 指定mr運行在yarn上-->
mapreduce.framework.name
yarn
<!-- 配置 MapReduce JobHistory Server 地址 获讳,默認(rèn)端口10020-->
????? mapreduce.jobhistory.address
????? hadoop-senior00-levi.com:10020
<!-- 配置 MapReduce JobHistory Server web ui 地址阴颖, 默認(rèn)端口19888-->
????? mapreduce.jobhistory.webapp.address
????? hadoop-senior00-levi.com:19888
slaves
hadoop-senior00-levi.com
hadoop-senior01-levi.com
hadoop-senior02-levi.com
Hadoop-HA
Hadoop為什么要有HA?
????? 我們都知道NameNode是存儲了所有數(shù)據(jù)的路徑丐膝,在Hadoop第一個版本是沒有HA量愧,單臺的NameNode節(jié)點掛了,那么整個數(shù)據(jù)就沒辦法訪問了尤误,
????? 那個的工程師就自己寫一個腳本去解決這個問題侠畔,定時的拷貝NameNode的fsimage和edits到別的服務(wù)器,但是數(shù)據(jù)量大的時候损晤,拷貝就很慢了软棺,而且工程師半夜正在和周公下棋的時候,NameNode掛了尤勋,那就很尷尬了喘落。
????? 雖然可以到第二天早上來恢復(fù),但是數(shù)據(jù)量那么大的時候最冰,太慢了瘦棋,滿足不了需求。
????? 所以Hadoop為了解決這個問題暖哨,在后面的版本繼承了HA(高可用)赌朋。
Hadoop-HA是什么?
????? Hadoop-HA(高可用)就是在一臺服務(wù)器掛了篇裁,第二臺服務(wù)器可以馬上頂上去沛慢。
????? 兩個基本問題:
1、第一臺服務(wù)器和第二臺服務(wù)器的數(shù)據(jù)必須要同步达布。
Hadoop-HA通過edits-log的變化团甲,來將數(shù)據(jù)寫入到JournalNode節(jié)點里面去,以分享給其他的NameNode黍聂。
2躺苦、要解決第一臺和第二臺服務(wù)器同時啟用的情況,在這種情況下产还,子節(jié)點怎么提交數(shù)據(jù)匹厘,會提交到兩臺服務(wù)器,但是又會出現(xiàn)搶占資源的情況脐区,(給一個人送東西和給兩個人送東西所耗費的體力是不言而喻的)愈诚,
這個問題在Hadoop-HA中稱為腦裂,借助第三方框架(Zookeeper)實現(xiàn)隔離機制來解決腦裂這個問題。
Hadoop–HA 實現(xiàn)
NameNode
hdfs-site.xml
???
???????dfs.replication
???? 3
????
????????? dfs.namenode.secondary.http-address
????????? hadoop104:50090
????
???? ?dfs.namenode.checkpoint.period
???? ?120
????
???? ?dfs.namenode.name.dir
???? ?/opt/module/hadoop-2.7.2/data/tmp/dfs/name
????
???? ? ?? dfs.hosts
???? ???? /opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts
????
???? ?dfs.hosts.exclude
???? ?/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts.exclude
???
????????? dfs.nameservices
????????? mycluster
???
????????? dfs.ha.namenodes.mycluster
????????? nn1,nn2
???
????????? dfs.namenode.rpc-address.mycluster.nn1
????????? hadoop102:8020
???
????????? dfs.namenode.rpc-address.mycluster.nn2
????????? hadoop103:8020
???
????????? dfs.namenode.http-address.mycluster.nn1
????????? hadoop102:50070
???
????????? dfs.namenode.http-address.mycluster.nn2
????????? hadoop103:50070
???
????????? dfs.namenode.shared.edits.dir
????????? qjournal://hadoop102:8485;hadoop103:8485;hadoop104:8485/mycluster
???
????????? dfs.ha.fencing.methods
????????? sshfence
???
????????? dfs.ha.fencing.ssh.private-key-files
????????? /home/levi/.ssh/id_rsa
???
????????? dfs.journalnode.edits.dir
????????? /opt/module/hadoop/data/jn
???
????????? dfs.permissions.enable
????????? false
???
????????? dfs.client.failover.proxy.provider.mycluster
???? ???? org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
????
????????? dfs.ha.automatic-failover.enabled
????????? true
core-site.xml
????????? fs.defaultFS
???? ??????hdfs://hadoop102:8020
???? -->
????
????????? fs.defaultFS
????????? hdfs://mycluster
????
????????? hadoop.tmp.dir
????????? /opt/module/hadoop-2.7.2/data/tmp
????
???? ?fs.trash.interval
???? ?1
???? ?
???? ?hadoop.http.staticuser.user
???? ?levi
????
????????? ha.zookeeper.quorum
????????? hadoop102:2181,hadoop103:2181,hadoop104:2181
ResourceManager
hdfs-site.xml
????????? yarn.resourcemanager.hostname
????????? hadoop103
???? -->
???
???????yarn.nodemanager.aux-services
???????mapreduce_shuffle
???
???????yarn.log-aggregation-enable
??????? true
???
???????yarn.log.server.url
???????http://hadoop103:19888/jobhistory/logs/
???
???????yarn.log-aggregation.retain-seconds
??????? 86400
???
???????yarn.resourcemanager.ha.enabled
??????? true
???
???????yarn.resourcemanager.cluster-id
??????? cluster-yarn1
???
???????yarn.resourcemanager.ha.rm-ids
??????? rm1,rm2
???
???????yarn.resourcemanager.hostname.rm1
?????? ?hadoop103
???
???????yarn.resourcemanager.hostname.rm2
??????? hadoop104
???
???????yarn.resourcemanager.zk-address
???????hadoop102:2181,hadoop103:2181,hadoop104:2181
???
???????yarn.resourcemanager.recovery.enabled
??????? true
???
???????yarn.resourcemanager.store.class????
????????? org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore