Hadoop
Hadoop 是一個(gè)提供分布式存儲(chǔ)和分布式計(jì)算的框架镊绪,為大量數(shù)據(jù)的存儲(chǔ)和計(jì)算提供了一個(gè)可靠的平臺(tái)支持。現(xiàn)在 Hadoop 和其它相關(guān)的衍生產(chǎn)品構(gòu)成了大數(shù)據(jù)生態(tài)系統(tǒng)极谊。
HDFS
HDFS 是 Hadoop 提供的一個(gè)分布式存儲(chǔ)的文件系統(tǒng)偷厦,基本思想就是分而存之雄右,讓多臺(tái)計(jì)算機(jī)分別存儲(chǔ)一個(gè)大文件的一部分,這樣就解決了大文件無法在單臺(tái)計(jì)算機(jī)上存儲(chǔ)和無法在單臺(tái)計(jì)算機(jī)上快速計(jì)算的問題。
與磁盤一樣,HDFS 也有塊的概念婿着,將一個(gè)大文件進(jìn)行拆分,每一部分就稱為一個(gè)塊 (block)醋界。在 Hadoop 2.x 中竟宋,默認(rèn)一個(gè)塊大小為 128M。當(dāng)有一個(gè) 1M 的數(shù)據(jù)存放到 HDFS 中時(shí)并不會(huì)占用一個(gè) 128M 的數(shù)據(jù)空間形纺,而是占有 1M丘侠,這是和磁盤塊不同的地方。
一個(gè)塊大小為 128M 主要原因是為了讓尋道時(shí)間小于傳輸時(shí)間的 1%逐样,假設(shè)傳輸速率為 100m/s蜗字,尋道時(shí)間為 10ms,則 (10ms / 1000) / (1%) * 100m/s = 100M官研。
HDFS 默認(rèn)情況下秽澳,會(huì)為每個(gè)數(shù)據(jù)塊存儲(chǔ) 3 份闯睹,并存儲(chǔ)在不同的計(jì)算機(jī)上戏羽,這就是其備份機(jī)制。HDFS 的可靠性就體現(xiàn)在這里楼吃。
這些數(shù)據(jù)塊分別由多臺(tái)不同的計(jì)算機(jī)存儲(chǔ)始花,而記錄這些塊在那臺(tái)機(jī)器上存儲(chǔ)、塊大小是多少孩锡、屬于哪個(gè)文件的信息稱為元數(shù)據(jù)酷宵,也可以稱為是描述數(shù)據(jù)的數(shù)據(jù)。
元數(shù)據(jù)由一個(gè)單獨(dú)的進(jìn)程來維護(hù)躬窜,這個(gè)進(jìn)程稱為 NameNode浇垦。一般會(huì)由一臺(tái)單獨(dú)的計(jì)算機(jī)作為 NameNode 節(jié)點(diǎn),也就是在那臺(tái)計(jì)算機(jī)上啟動(dòng)一個(gè) NameNode 進(jìn)程荣挨。
管理這些數(shù)據(jù)塊的工作也是由一個(gè)單獨(dú)的進(jìn)程來完成男韧,這個(gè)進(jìn)程為 DataNode朴摊。數(shù)據(jù)塊則是由多臺(tái)計(jì)算機(jī)協(xié)同存儲(chǔ)的,在每臺(tái)計(jì)算機(jī)上都會(huì)啟動(dòng)一個(gè) DataNode 進(jìn)程此虑,并時(shí)刻與 NameNode 進(jìn)行通信甚纲。
-
NameNode
NameNode 用于維護(hù)著元數(shù)據(jù)信息,為了提高查找效率朦前,NameNode 會(huì)將元數(shù)據(jù)信息存放在內(nèi)存中介杆。所以,NameNode 節(jié)點(diǎn)必須是大內(nèi)存的韭寸,這也是 HDFS 的一個(gè)瓶頸春哨。
內(nèi)存是不可靠的,所以元數(shù)據(jù)信息也會(huì)持久化到磁盤一份恩伺,這個(gè)稍后再說悲靴。
由于每個(gè)數(shù)據(jù)塊都會(huì)產(chǎn)生一條元數(shù)據(jù)信息,如果 HDFS 中存放大量小文件莫其,就會(huì)產(chǎn)生大量的元數(shù)據(jù)信息癞尚,這樣 NameNode 的內(nèi)存就會(huì)很快就會(huì)撐爆。(最簡單的解決辦法就是將文件進(jìn)行合并乱陡,然后上傳到 HDFS 中)
-
DataNode
DataNode 是一個(gè)維護(hù)其所在計(jì)算機(jī)的數(shù)據(jù)塊的進(jìn)程浇揩,主要工作可以分為讀和寫兩部分,也就是檢索和存儲(chǔ)功能憨颠。同時(shí) DataNode 也會(huì)定時(shí)向 NameNode 報(bào)告其健康狀況和其所維護(hù)的數(shù)據(jù)塊列表胳徽。
NameNode 是 HDFS 的命門,如果 NameNode 掛掉之后 HDFS 就徹底的無法提供服務(wù)了爽彤,并且存儲(chǔ)在內(nèi)存中的元數(shù)據(jù)信息也會(huì)丟失养盗,那樣就永遠(yuǎn)無法提供服務(wù)了。所以适篙,為防止元數(shù)據(jù)丟失問題往核,HDFS 有了就 SecondaryNameNode 來幫助 NameNode 進(jìn)行元數(shù)據(jù)的持久化,有高可用機(jī)制來進(jìn)行 NameNode 的主備切換工作嚷节。
-
SecondaryNameNode
SecondaryNameNode 可以說是 NameNode 的輔助節(jié)點(diǎn)聂儒,在一定程度上也可以起到備份節(jié)點(diǎn)的作用。在 NameNode 中會(huì)有一個(gè)名為 FSimage 的舊的元數(shù)據(jù)持久化文件和一個(gè)名為 Edits Log 的預(yù)寫日志硫痰。
SecondaryNameNode 會(huì)定期詢問 NameNode 是否需要將 FSimage 和 Edits Log 進(jìn)行合并 (稱為檢查點(diǎn))衩婚,通過設(shè)置間隔時(shí)間和 Edits Log 的文件大小閾值來限定是否需要合并。
SecondaryNameNode 從 NameNode 拉取過來 FSimage 和 Edits Log 后效斑,會(huì)根據(jù)預(yù)寫日志進(jìn)行重演非春,然后合并到 FSimage 中,最后將合并后的 FSimage 發(fā)給 NameNode,并且自身也會(huì)存儲(chǔ)一份奇昙。
當(dāng) NameNode 重啟的時(shí)候坐搔,就會(huì)讀取 FSimage 中的持久化文件進(jìn)行元數(shù)據(jù)的恢復(fù)。當(dāng) NameNode 節(jié)點(diǎn)磁盤也壞的時(shí)候敬矩,SecondaryNameNode 保存的 FSimage 也可以一定程度上進(jìn)行元數(shù)據(jù)的恢復(fù)概行,但是會(huì)丟失一部分?jǐn)?shù)據(jù)數(shù)據(jù)。
在介紹完 HDFS 基本組成后弧岳,我們再看看 HDFS 讀寫操作流程:
-
寫操作
當(dāng)客戶端上傳文件的時(shí)候凳忙,會(huì)先向 NameNode 發(fā)送一個(gè)寫請求,NameNode 會(huì)先對身份和時(shí)候已經(jīng)存在這個(gè)文件進(jìn)行一個(gè)校驗(yàn)禽炬。校驗(yàn)通過后涧卵,就會(huì)給客戶端一個(gè)確認(rèn)消息,告訴它可以上傳腹尖。
客戶端收到確認(rèn)消息后柳恐,就會(huì)對文件進(jìn)行分塊,分塊操作是在客戶端進(jìn)行的热幔。分塊完成后乐设,就會(huì)向 NameNode 詢問第一個(gè)數(shù)據(jù)塊的存放地址,NameNode 會(huì)根據(jù)動(dòng)態(tài)感知機(jī)制绎巨,為這個(gè)數(shù)據(jù)塊找到一個(gè)合適的存儲(chǔ)位置近尚,然后將 DataNode 的地址返回給客戶端。
客戶端在收到上傳地址后场勤,就會(huì)與 DataNode 進(jìn)行通信并上傳戈锻,由于有備份機(jī)制, DataNode 在收到數(shù)據(jù)后和媳,會(huì)發(fā)送給備份 DataNode格遭。
數(shù)據(jù)塊傳輸完成后,就會(huì)給客戶端發(fā)送一個(gè)確認(rèn)消息留瞳。然后客戶端告訴 NameNode 上傳完成拒迅,再向 NameNode 發(fā)送上傳第二個(gè)數(shù)據(jù)塊的請求,以此類推撼港。
由此可見 HDFS 的寫操作是串行進(jìn)行的坪它。
-
讀操作
客戶端會(huì)向 NameNode 發(fā)送一個(gè)讀操作請求骤竹,NameNode 會(huì)返回最近通信的存儲(chǔ)請求文件的 DataNode 的節(jié)點(diǎn)地址帝牡。
客戶端收到地址后,就會(huì)從 DataNode 中并行讀取蒙揣,然后將讀取到的數(shù)據(jù)在客戶端進(jìn)行合并靶溜。
如果,讀取失敗后,客戶端就會(huì)想 NameNode 重新發(fā)送請求罩息,NameNode 會(huì)從其它備份節(jié)點(diǎn)中選擇一個(gè)嗤详,返回給客戶端。
HDFS 就簡單說到這里瓷炮。
MapReduce
MapReduce 是 Hadoop 提供的一個(gè)分布式計(jì)算框架葱色,基本思想就是分而算之和移動(dòng)程序不移動(dòng)數(shù)據(jù),也就是針對每個(gè)數(shù)據(jù)塊進(jìn)行運(yùn)算 (MapTask)娘香,最后將每個(gè)節(jié)點(diǎn)的運(yùn)算結(jié)果進(jìn)行匯總 (ReduceTask)苍狰。
MapReduce 的工作可以基本分為讀取、Shuffle和輸出三步:
-
分片
分片是 MR 程序?qū)斎胛募指畹囊粋€(gè)文件集的引用烘绽。不同于 HDFS 中的數(shù)據(jù)塊淋昭,一個(gè)分片就代表著一個(gè) MapTask 輸入數(shù)據(jù)的引用。
MapReduce 程序會(huì)為每個(gè)分片都會(huì)啟動(dòng)一個(gè) MapTask 進(jìn)程安接,讓其專門處理這個(gè)分片所引用的數(shù)據(jù)翔忽。
默認(rèn)情況下,一個(gè)數(shù)據(jù)塊就對應(yīng)一個(gè)分片盏檐,這樣主要是為了避免數(shù)據(jù)在網(wǎng)絡(luò)上傳輸歇式,只需要將 MapTask 程序發(fā)送到數(shù)據(jù)塊所在的節(jié)點(diǎn)就行了,這就是數(shù)據(jù)不動(dòng)程序動(dòng)胡野。
這樣就會(huì)產(chǎn)生數(shù)據(jù)塊過大和數(shù)據(jù)塊過小兩種情況:
- 如果數(shù)據(jù)塊過小(大量小文件)贬丛,這樣每個(gè)數(shù)據(jù)塊作為一個(gè)分片,就會(huì)啟動(dòng)大量的 MapTask给涕。而每個(gè) MapTask 都是一個(gè)進(jìn)程豺憔,這樣就把大量的時(shí)間花費(fèi)在了創(chuàng)建線程銷毀線程上了。MapReduce 提供了 CombineFileInputFormat 類够庙,將所有數(shù)據(jù)塊作為一個(gè)分片恭应,也就是只啟動(dòng)一個(gè) MapTask。
- 如果數(shù)據(jù)塊過大耘眨,那就降低了并行度昼榛,無法發(fā)揮分布式計(jì)算的優(yōu)勢√弈眩可以根據(jù)具體的業(yè)務(wù)胆屿,將數(shù)據(jù)塊大小調(diào)整為合適的尺寸。
-
輸入 / InputFormat
InputFormat 用來為 MR 程序提供計(jì)算分片和獲取對應(yīng)分片的 Reader 服務(wù):
public abstract class InputFormat<K, V> { // 計(jì)算分片 public abstract List<InputSplit> getSplits(JobContext context); // 根據(jù) split 獲取數(shù)據(jù) Reader public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context); }
getSplits() 方法是由客戶端進(jìn)行調(diào)用的偶宫,然后將分片信息存放到 HDFS 中非迹。ApplicationMaster 會(huì)從 HDFS 中進(jìn)行拉取,并根據(jù)分片信息纯趋,選擇最優(yōu)的位置在 Worker 上啟動(dòng) MapTask憎兽。
-
MapTask
我們寫 MR 程序的時(shí)候冷离,Map 端都會(huì)繼承 Mapper:
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { public abstract class Context implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { } // 初始化 protected void setup(Context context) { } // 我們一般都會(huì)重寫這個(gè)方法 // 默認(rèn)是,將 key-value 原樣輸出 protected void map(KEYIN key, VALUEIN value, Context context){ context.write((KEYOUT) key, (VALUEOUT) value); } // 清理工作 protected void cleanup(Context context) { } // Map 任務(wù)啟動(dòng)的時(shí)候纯命,就會(huì)調(diào)用這個(gè)方法 public void run(Context context) throws IOException, InterruptedException { setup(context); try { // 內(nèi)部就是調(diào)用 RecordReader 的 nextKeyValue() 方法 while (context.nextKeyValue()) { // RecordReader 的 getCurrentKey() 和 getCurrentValue() 方法 map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } } }
這里只是簡單的看一下工作流程西剥,具體的細(xì)節(jié)在剖析源碼的時(shí)候再看。
Mapper 將從 RecordReader 中獲取到的 key-value 交給 map() 方法去做具體的運(yùn)算工作亿汞,最后我們會(huì)調(diào)用 context.write() 方法將處理后的 key-value 寫出瞭空。
這樣就進(jìn)行到了 Map 端的 Shuffle 操作。
-
Shuffle
Shuffle 操作可以分為 Map 端和 Reduce 端兩部分疗我,總的來說匙铡,Shuffle 就做了分區(qū)、聚合和排序三件事碍粥。
Map 端調(diào)用完 context.write() 方法后鳖眼,就會(huì)通過 RecordWriter 將 key-value 按 key 進(jìn)行分區(qū),并寫入到環(huán)形緩沖區(qū)中嚼摩,并會(huì)在環(huán)形緩沖區(qū)中進(jìn)行一次快排操作钦讳。
緩沖區(qū)大小默認(rèn)為 100M,閾值為 80%枕面,也就是緩沖區(qū)寫滿 80% 的時(shí)候就會(huì)發(fā)生溢寫操作愿卒,將緩沖區(qū)的數(shù)據(jù)溢寫到磁盤,每次溢寫都會(huì)產(chǎn)生一個(gè)新的文件潮秘∏砜可以將緩沖區(qū)大小設(shè)的更大一些,盡量避免溢寫的發(fā)生枕荞。
當(dāng) Map 端將數(shù)據(jù)寫完后柜候,會(huì)將溢寫文件進(jìn)行合并,然后按在進(jìn)行一次歸并排序躏精,這樣就產(chǎn)生了分區(qū)且排序后的 key-value渣刷,來等待 Reduce 端的拉取。
如果設(shè)置了 Combiner 的話矗烛,就會(huì)在溢寫的時(shí)候執(zhí)行和最后合并數(shù)據(jù)的時(shí)候執(zhí)行辅柴,并不是只執(zhí)行一次。
分區(qū)數(shù)是由 ReduceTasK 的數(shù)量來決定的瞭吃,默認(rèn)使用 HashPartiton 進(jìn)行分區(qū)操作碌嘀,當(dāng)然,也可以根據(jù)業(yè)務(wù)需求進(jìn)行自定義分區(qū):
// 自定義 partition public class PhonePartition extends Partitioner<Text, Text> { @Override public int getPartition(Text key, Text value, int numReduceTask) { String phoneNumber = key.toString(); if(phoneNumber.startsWith("137")){ return 0; } if(phoneNumber.startsWith("138")){ return 1; } if(phoneNumber.startsWith("139")){ return 2; } if(phoneNumber.startsWith("135")){ return 3; } if(phoneNumber.startsWith("136")){ return 4; } return 5; } }
排序操作默認(rèn)使用的是字典排序歪架,也可以自定義排序器:
// 方式一 public class MyPair implements WritableComparable<SortPair> { private String first; private int second; // ... @Override public int compareTo(SortPair sortPair) { String anoFirst = sortPair.getFirst(); int firstComp = first.compareTo(anoFirst); if(firstComp != 0){ return firstComp; } else { int anoSecond = sortPair.getSecond(); return second - anoSecond; } } // ... } // 方式二 public class MyPartitioner extends Partitioner<Text, Text> { @Override public int getPartition(Text text, Text text2, int i) { return text.compareTo(text2); } } // Job job.setPartitionerClass(MyPartitioner.class);
Reduce 端 Shuffle 就是對從 Map 端拉取過來的數(shù)據(jù)進(jìn)行一次聚合操作股冗,將相同 key 的 value 方法一起,并暴露給 ReduceTask 一個(gè) value 迭代器牡拇。
-
輸出
輸出就比較簡單了魁瞪,將數(shù)據(jù)輸出到指定介質(zhì)中穆律。
YARN
Yarn 是一個(gè)獨(dú)立的資源調(diào)度框架惠呼,由 ResourceManger 和 NodeManager 兩部分構(gòu)成:
-
ResourceManger
ResourceManger 用來處理用戶提交的任務(wù)請求,并維護(hù) NodeManager 的節(jié)點(diǎn)信息剔蹋。
-
NodeManager
NodeManager 主要用于資源管理旅薄、任務(wù)管理和 Container (容器) 管理、
-
ApplicationMaster
ApplicationMaster 是 ResourceManger 在 NodeManager 上啟動(dòng)個(gè)一個(gè)負(fù)責(zé)管理特定任務(wù)的進(jìn)程,ResourceManger 只負(fù)責(zé)分派任務(wù)矫付,不復(fù)制管理任務(wù)杀赢,這個(gè)工作就是由 ApplicationMaster 來完成的脖咐。
-
Container
資源管理單位,ApplicationMaster 向 ResourceManager 請求任務(wù)所需要的資源的時(shí)候,ResourceManager 分配給它的資源就可以理解為一個(gè) Container折欠。