Hadoop生態(tài)系統(tǒng)為大數(shù)據(jù)領(lǐng)域提供了開源的分布式存儲(chǔ)和分布式計(jì)算的平臺(tái)使鹅,這一章我們進(jìn)行Hadoop生態(tài)系統(tǒng)的入門學(xué)習(xí)锅锨,介紹其中分布式文件系統(tǒng)HDFS侨颈、分布式資源調(diào)度YARN逗威、分布式計(jì)算框架MapReduce(包含Spark的入門以及和MapReduce的比較),最后通過Spring Boot集成Hadoop來訪問文件系統(tǒng)糜值。
大數(shù)據(jù)的應(yīng)用
本人喜歡體育運(yùn)動(dòng),以體育中來舉列子坯墨。
足球點(diǎn)球大戰(zhàn)
2006年世界杯中德國(guó)隊(duì)和阿根廷隊(duì)的點(diǎn)球大戰(zhàn)中寂汇,德國(guó)隊(duì)守門員教練科普克,給了門將萊曼一張紙條捣染,萊曼看過紙條將計(jì)就計(jì)骄瓣。阿根廷隊(duì)的每個(gè)點(diǎn)球,幾乎都被萊曼判斷對(duì)了方向液斜,并成功撲出坎比亞索和阿亞拉的點(diǎn)球累贤,幫助德國(guó)隊(duì)打進(jìn)四強(qiáng)。而這張紙條就是根據(jù)數(shù)據(jù)分析記錄了阿根廷隊(duì)員點(diǎn)球的習(xí)慣方向少漆。
金州勇士的崛起
如今越來越多的籃球隊(duì)開始重視和應(yīng)用籃球大數(shù)據(jù)臼膏,比如NBA中的金州勇士隊(duì)
。勇士隊(duì)曾長(zhǎng)期以來一直是NBA里最爛的球隊(duì)之一示损,2009年它的成績(jī)排名倒數(shù)第二渗磅。沒有任何執(zhí)教N(yùn)BA經(jīng)驗(yàn)的史蒂夫·科爾,因突出的投籃優(yōu)勢(shì)被委任為教練检访∈加悖科爾在執(zhí)掌勇士隊(duì)之后,堅(jiān)持用數(shù)據(jù)說話而不是憑經(jīng)驗(yàn)脆贵。他根據(jù)數(shù)據(jù)工程師對(duì)歷年來NBA比賽的統(tǒng)計(jì)医清,發(fā)現(xiàn)最有效的進(jìn)攻是眼花繚亂的傳球和準(zhǔn)確的投籃,而不是彰顯個(gè)人能力的突破和扣籃卖氨。在這個(gè)思想的指導(dǎo)下会烙,勇士隊(duì)隊(duì)員苦練神投技。這其中最亮眼的新打法是盡可能地從24英尺(大約7.3米)外的三分線投籃筒捺,這樣可以得3分柏腻。于是開發(fā)了小球時(shí)代,在2015-2018摘下三枚總冠軍系吭,成功成為NBA的霸主五嫂。
大數(shù)據(jù)的基本概率
有人說大數(shù)據(jù)的特點(diǎn)就是數(shù)據(jù)量大,這個(gè)不是非常的正確,數(shù)據(jù)量大不是關(guān)鍵沃缘,通過數(shù)據(jù)分析在數(shù)據(jù)中提取出價(jià)值躯枢,最終帶來商業(yè)上的利益,這才是大數(shù)據(jù)分析的最終目標(biāo)孩灯。大數(shù)據(jù)有4個(gè)特點(diǎn)闺金,一般我們稱之為4V,分別為:
- Volume(大量):隨著信息技術(shù)的高速發(fā)展峰档,數(shù)據(jù)開始爆發(fā)性增長(zhǎng)败匹。社交網(wǎng)絡(luò)(微博、推特讥巡、臉書)掀亩、移動(dòng)網(wǎng)絡(luò)、各種智能工具欢顷,服務(wù)工具等槽棍,都成為數(shù)據(jù)的來源。數(shù)據(jù)的存儲(chǔ)也從過去的GB到TB抬驴,乃至現(xiàn)在的PB炼七、EB級(jí)別。
- Variety(多樣):廣泛的數(shù)據(jù)來源布持,決定了大數(shù)據(jù)形式的多樣性豌拙。任何形式的數(shù)據(jù)都可以產(chǎn)生作用。
- Velocity(高速):大數(shù)據(jù)的產(chǎn)生非常迅速题暖,主要通過互聯(lián)網(wǎng)傳輸按傅。大數(shù)據(jù)對(duì)處理速度有非常嚴(yán)格的要求,服務(wù)器中大量的資源都用于處理和計(jì)算數(shù)據(jù)胧卤,很多平臺(tái)都需要做到實(shí)時(shí)分析唯绍。數(shù)據(jù)無時(shí)無刻不在產(chǎn)生,誰的速度更快枝誊,誰就有優(yōu)勢(shì)况芒。
- Value(價(jià)值):這也是大數(shù)據(jù)的核心特征。通過從大量不相關(guān)的各種類型的數(shù)據(jù)中叶撒,挖掘出對(duì)未來趨勢(shì)與模式預(yù)測(cè)分析有價(jià)值的數(shù)據(jù)绝骚,并通過機(jī)器學(xué)習(xí)方法、人工智能方法或數(shù)據(jù)挖掘方法深度分析痊乾,發(fā)現(xiàn)新規(guī)律和新知識(shí)皮壁,并運(yùn)用于農(nóng)業(yè)椭更、金融哪审、醫(yī)療等各個(gè)領(lǐng)域,從而最終達(dá)到改善社會(huì)治理虑瀑、提高生產(chǎn)效率湿滓、推進(jìn)科學(xué)研究的效果滴须。
大數(shù)據(jù)涉及到的技術(shù)
- 數(shù)據(jù)采集:把海量數(shù)據(jù)收集到數(shù)據(jù)平臺(tái)上來,才能做后續(xù)的數(shù)據(jù)分析叽奥。
- 數(shù)據(jù)存儲(chǔ):數(shù)據(jù)的存儲(chǔ)位置扔水。由于數(shù)據(jù)量巨大,一般為分布式存儲(chǔ)系統(tǒng)朝氓,較通用用的為魔市。
- 數(shù)據(jù)分析:對(duì)數(shù)據(jù)進(jìn)行有效性分析(數(shù)據(jù)分析框架MapReduce,spark等)赵哲。
- 可視化:把分析結(jié)果可視化展示待德。
分布式文件系統(tǒng)HDFS
HDFS概述及設(shè)計(jì)目標(biāo)
HDFS源于Google的GFS論文,設(shè)計(jì)目標(biāo)為
- 非常巨大的分布式文件系統(tǒng)枫夺。
- 運(yùn)行在普通廉價(jià)的硬件上将宪。
- 易擴(kuò)展、為用戶提供性能不錯(cuò)的文件存儲(chǔ)服務(wù)橡庞。
HDFS架構(gòu)
HDFS是一種master/slave的架構(gòu)较坛。一個(gè)HDFS集群包含一個(gè)唯一的NameNode(NN),這個(gè)master server管理著整個(gè)文件系統(tǒng)的命名空間并且調(diào)節(jié)客戶端對(duì)文件的訪問扒最。同時(shí)丑勤,還擁有一系列的DataNode(DN),每個(gè)都管理著他們運(yùn)行的對(duì)應(yīng)節(jié)點(diǎn)的數(shù)據(jù)存儲(chǔ)扼倘。HDFS提供了一個(gè)文件系統(tǒng)的命名空間同時(shí)允許用戶將數(shù)據(jù)存在這些文件上确封。通常,一個(gè)文件被拆分成一個(gè)或多個(gè)數(shù)據(jù)塊再菊,并且這些數(shù)據(jù)塊被保存在一系列的DataNode上爪喘。NameNode執(zhí)行文件系統(tǒng)的命名空間的相關(guān)操作比如打開、關(guān)閉纠拔、重命名目錄或者文件秉剑。同時(shí)決定了數(shù)據(jù)塊到DataNode的映射。DataNode為客戶端的讀取寫入需求提供服務(wù)稠诲,同時(shí)處理NameNode發(fā)來的數(shù)據(jù)塊的創(chuàng)建侦鹏、刪除、復(fù)制等需求臀叙。
HDFS副本機(jī)制
在前面說過HDFS使用相對(duì)廉價(jià)的計(jì)算機(jī)略水,那么宕機(jī)就是一種必然事件,我們需要讓數(shù)據(jù)避免丟失劝萤,就只有采取冗余數(shù)據(jù)存儲(chǔ)渊涝,而具體的實(shí)現(xiàn)就是副本機(jī)制。具體為把一個(gè)文件分為很多的塊,一個(gè)塊默認(rèn)為128M跨释,而這些塊是以多副本的形式存儲(chǔ)胸私。比如存儲(chǔ)三個(gè)副本:
- 第一副本:如果上傳節(jié)點(diǎn)是DataNode(DN),則上傳該節(jié)點(diǎn)鳖谈;如果上傳節(jié)點(diǎn)是NameNode(NN)岁疼,則隨機(jī)選擇DataNode(DN) 。
- 第二副本:放置在不同機(jī)架的DataNode(DN)上 缆娃。
-
第三副本:放置在與第二副本相同機(jī)架的不同DataNode(DN)上捷绒。
這種方式可以極大程度上避免了宕機(jī)所造成的數(shù)據(jù)丟失。而數(shù)據(jù)庫(kù)的存儲(chǔ)的元數(shù)據(jù)是存儲(chǔ)在NameNode(NN)中贯要,在數(shù)據(jù)讀取是可以知道在那些節(jié)點(diǎn)上讀取文件疙驾。下面是官方架構(gòu)圖。
HDFS環(huán)境搭建
Apache Hadoop 有版本管理混亂郭毕,部署過程繁瑣它碎、升級(jí)過程復(fù)雜,兼容性差等缺點(diǎn)显押,而CDH是Hadoop眾多分支中的一種扳肛,由Cloudera維護(hù),基于穩(wěn)定版本的Apache Hadoop構(gòu)建乘碑。使用CDH可以避免在使用過程中的依賴包沖突問題挖息,對(duì)版本的升級(jí)也很方便,所以我們使用Hadoop-2.6.0-cdh5.7.0版本進(jìn)行安裝兽肤。本人是使用虛擬機(jī)進(jìn)行偽分布式模式的搭建套腹,即在一臺(tái)機(jī)器上安裝,集群模式其實(shí)和偽分布式模式差不太多资铡。
- 第一步安裝JDK
//把壓縮包jdk-8u181-linux-i586.tar.gz上傳到虛擬機(jī)
rz 選擇文件上傳
//在根目錄下建立解壓文件夾
mkdir apps
//解壓jdk到apps目錄
tar -zxvf jdk-8u181-linux-i586.tar.gz -C ~/apps/
//添加環(huán)境變量
vi ~/.bash_profile
//在編輯模式添加以下內(nèi)容
export JAVA_HOME=/root/apps/jdk1.8.0_181
export PATH=$JAVA_HOME/bin:$PATH
//編輯完成后电禀,保存退出
//使配置生效
source ~/.bash_profile
//驗(yàn)證
java -version
//若出現(xiàn)版本信息表示安裝成功
openjdk version "1.8.0_181"
OpenJDK Runtime Environment (build 1.8.0_181-b13)
OpenJDK 64-Bit Server VM (build 25.181-b13, mixed mode)
- 第二步安裝SSH
//安裝SSH
sudo yum install ssh
//生成密鑰文件
ssh-keygen -t rsa
//配置免密鑰登錄:復(fù)制公鑰到authorized_keys,因?yàn)镹ameNode(NN)需要連接訪問DataNode(DN)笤休,配置后可直接訪問不用登錄尖飞,在集群環(huán)境下需要復(fù)制到其它節(jié)點(diǎn)。
cp ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys
- 安裝hadoop
下載:直接去cdh網(wǎng)站下載 [http://archive.cloudera.com/cdh5/cdh/5/]
//解壓
tar -zxvf hadoop-2.6.0-cdh5.7.0.tar.gz -C ~/app
//配置環(huán)境變量
export HADOOP_HOME=/root/apps/hadoop
export PATH=$HADOOP_HOME/bin:$PATH
//hadoop配置文件的修改(hadoop_home/etc/hadoop/)
hadoop-env.sh:
export JAVA_HOME=/root/apps/jdk1.8.0_181
core-site.xml:
<property>
<name>fs.defaultFS</name>
<value>hdfs://192.168.30.130:8092</value>//配置NameNode(NN)地址
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/home/hadoop/app/tmp</value>//因?yàn)镠DFS默認(rèn)是存儲(chǔ)在零時(shí)文件中店雅,關(guān)機(jī)后會(huì)丟失政基,這個(gè)配置持久化文件
</property>
hdfs-site.xml:
<property>
<name>dfs.replication</name>//設(shè)置副本系數(shù),這里因?yàn)橹挥幸粋€(gè)節(jié)點(diǎn)闹啦,所以設(shè)置為1
<value>1</value>
</property>
slaves:
localhost
在里面配置節(jié)點(diǎn)沮明,現(xiàn)配置為虛擬機(jī)本地。如果為集群模式窍奋,則需要在里面配置其它節(jié)點(diǎn)的地址荐健。
啟動(dòng)hdfs:
//格式化文件系統(tǒng)(僅第一次執(zhí)行即可圣勒,不要重復(fù)執(zhí)行)
hdfs/hadoop namenode -format
//啟動(dòng)
hdfs: sbin/start-dfs.sh
//驗(yàn)證是否啟動(dòng)成功:
jps //該命令會(huì)有以下三個(gè)進(jìn)程
DataNode
SecondaryNameNode
NameNode
瀏覽器訪問方式: http://虛擬機(jī)地址:50070 有hdfs主界面
//停止hdfs
sbin/stop-dfs.sh
HDFS shell
HDFS shell的命令有很多,輸入hadoop fs回車摧扇,會(huì)有有多命令的提示。
Usage: hadoop fs [generic options]
[-appendToFile <localsrc> ... <dst>]
[-cat [-ignoreCrc] <src> ...]
[-checksum <src> ...]
[-chgrp [-R] GROUP PATH...]
[-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
[-chown [-R] [OWNER][:[GROUP]] PATH...]
[-copyFromLocal [-f] [-p] [-l] <localsrc> ... <dst>]
[-copyToLocal [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
[-count [-q] [-h] [-v] <path> ...]
[-cp [-f] [-p | -p[topax]] <src> ... <dst>]
[-createSnapshot <snapshotDir> [<snapshotName>]]
[-deleteSnapshot <snapshotDir> <snapshotName>]
[-df [-h] [<path> ...]]
[-du [-s] [-h] <path> ...]
[-expunge]
[-find <path> ... <expression> ...]
[-get [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
[-getfacl [-R] <path>]
[-getfattr [-R] {-n name | -d} [-e en] <path>]
[-getmerge [-nl] <src> <localdst>]
這里介紹HDFS shell常用命令的使用(其實(shí)和Linux命令相似)
//查看文件目錄ls
hadoop fs -ls / 查看根目錄
//建立目錄mkdir
hadoop fs -mkdir 文件目錄
//上傳文件 put
hadoop fs -put 本地文件路徑 hdfs文件路徑
//get 下載文件到本地
hadoop fs -get hdfs文件路徑
//刪除文件/文件夾rm
hadoop fs -rm 文件
hadoop fs -rm -R 文件夾 //級(jí)聯(lián)刪除
Java API操作HDFS
IDEA+Maven創(chuàng)建Java工程
添加HDFS相關(guān)依賴
<properties>
<java.version>1.8</java.version>
<hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>cloudera</id>
<url>http://repository.cloudera.com/artifactory/cloudera-repos/</url>//hadoop倉(cāng)庫(kù)
</repository>
</repositories>
開發(fā)Java Api操作HDFS文件使用Junit測(cè)試的方式挚歧,代碼如下
public class JunitTest {
FileSystem fileSystem = null;
Configuration configuration = null;
public static final String URL = "hdfs://192.168.30.130:8092";
@Before
public void setUp() throws IOException, InterruptedException {
//初始化配置
configuration = new Configuration();
//初始化FileSystem扛稽,其中包含了文件操作的函數(shù),其中root為虛擬機(jī)用戶名
fileSystem = FileSystem.get(URI.create(URL),configuration,"root");
}
@Test
public void mkdir() throws IOException {
//創(chuàng)建目錄
fileSystem.mkdirs(new Path("/test"));
}
@Test
public void create() throws IOException {
//創(chuàng)建文件滑负,得到輸出流對(duì)象
FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path("/test/test.txt"));
//寫入文件內(nèi)容
fsDataOutputStream.write("hello dzy".getBytes());
//關(guān)閉流
fsDataOutputStream.close();
}
@After
public void setDown() throws IOException {
configuration.clear();
fileSystem.close();
}
}
在以上只是實(shí)驗(yàn)了幾個(gè)API還有很多其它的API在张,感興趣的可以繼續(xù)學(xué)習(xí)。
HDFS文件讀寫流程
三個(gè)角色的交互 客戶端/NameNode/DataNode
HDFS寫流程
- 客戶端向NameNode發(fā)出寫文件請(qǐng)求矮慕。
- 檢查是否已存在文件帮匾、檢查權(quán)限。若通過檢查痴鳄,直接先將操作寫入EditLog瘟斜,并返回輸出流對(duì)象。
(注:先寫Log痪寻,再寫內(nèi)存螺句,因?yàn)镋ditLog記錄的是最新的HDFS客戶端執(zhí)行所有的寫操作。如果后續(xù)真實(shí)寫操作失敗了橡类,由于在真實(shí)寫操作之前蛇尚,操作就被寫入EditLog中了,故EditLog中仍會(huì)有記錄顾画,我們不用擔(dān)心后續(xù)client讀不到相應(yīng)的數(shù)據(jù)塊取劫,因?yàn)樵诘?步中DataNode收到塊后會(huì)有一返回確認(rèn)信息,若沒寫成功研侣,發(fā)送端沒收到確認(rèn)信息谱邪,會(huì)一直重試,直到成功) - client端按128MB的塊切分文件庶诡。
- client將NameNode返回的分配的可寫的DataNode列表和Data數(shù)據(jù)一同發(fā)送給最近的第一個(gè)DataNode節(jié)點(diǎn)虾标,此后client端和NameNode分配的多個(gè)DataNode構(gòu)成pipeline管道,client端向輸出流對(duì)象中寫數(shù)據(jù)灌砖。client每向第一個(gè)DataNode寫入一個(gè)packet璧函,這個(gè)packet便會(huì)直接在pipeline里傳給第二個(gè)、第三個(gè)…DataNode基显。
(注:并不是寫好一個(gè)塊或一整個(gè)文件后才向后分發(fā)) - 每個(gè)DataNode寫完一個(gè)塊后蘸吓,會(huì)返回確認(rèn)信息。
- 寫完數(shù)據(jù)撩幽,關(guān)閉輸輸出流库继。
- 發(fā)送完成信號(hào)給NameNode箩艺。
(注:發(fā)送完成信號(hào)的時(shí)機(jī)取決于集群是強(qiáng)一致性還是最終一致性,強(qiáng)一致性則需要所有DataNode寫完后才向NameNode匯報(bào)宪萄。最終一致性則其中任意一個(gè)DataNode寫完后就能單獨(dú)向NameNode匯報(bào)艺谆,HDFS一般情況下都是強(qiáng)調(diào)強(qiáng)一致性)
HDFS讀流程
- client訪問NameNode,查詢?cè)獢?shù)據(jù)信息拜英,獲得這個(gè)文件的數(shù)據(jù)塊位置列表静汤,返回輸入流對(duì)象。
- 就近挑選一臺(tái)datanode服務(wù)器居凶,請(qǐng)求建立輸入流 虫给。
- DataNode向輸入流中中寫數(shù)據(jù),以packet為單位來校驗(yàn)侠碧。
- 關(guān)閉輸入流
HDFS優(yōu)缺點(diǎn)
優(yōu)點(diǎn)
- 支持超大文件抹估。
- 檢測(cè)和快速應(yīng)對(duì)硬件故障:在集群的環(huán)境中,硬件故障是常見的問題弄兜。因?yàn)橛猩锨_(tái)服務(wù)器連接在一起药蜻,這樣會(huì)導(dǎo)致高故障率。因此故障檢測(cè)和自動(dòng)恢復(fù)是hdfs文件系統(tǒng)的一個(gè)設(shè)計(jì)目標(biāo)替饿。
- 流式數(shù)據(jù)訪問:Hdfs的數(shù)據(jù)處理規(guī)模比較大谷暮,應(yīng)用一次需要訪問大量的數(shù)據(jù),同時(shí)這些應(yīng)用一般都是批量處理盛垦,而不是用戶交互式處理湿弦。應(yīng)用程序能以流的形式訪問數(shù)據(jù)集。主要的是數(shù)據(jù)的吞吐量腾夯,而不是訪問速度颊埃。
- 簡(jiǎn)化的一致性模型:在hdfs中,一個(gè)文件一旦經(jīng)過創(chuàng)建蝶俱、寫入班利、關(guān)閉后,一般就不需要修改了榨呆。這樣簡(jiǎn)單的一致性模型罗标,有利于提高吞吐量。
缺點(diǎn)
- 低延遲數(shù)據(jù)訪問:如和用戶進(jìn)行交互的應(yīng)用积蜻,需要數(shù)據(jù)在毫秒或秒的范圍內(nèi)得到響應(yīng)闯割。由于hadoop針對(duì)高數(shù)據(jù)吞吐量做了優(yōu)化,犧牲了獲取數(shù)據(jù)的延遲竿拆,所以對(duì)于低延遲來說宙拉,不適合用hadoop來做。
- 大量的小文件:Hdfs的NameNode中存儲(chǔ)了文件分塊的元數(shù)據(jù)信息丙笋,如果有大量的小文件谢澈,導(dǎo)致元數(shù)據(jù)信息增加煌贴,增加NameNode負(fù)荷。
- 不支持超強(qiáng)的事務(wù):沒有像關(guān)系型數(shù)據(jù)庫(kù)那樣锥忿,對(duì)事務(wù)有強(qiáng)有力的支持牛郑。
資源調(diào)度框架YARN
YARN概述
YARN是Hadoop 2.0中的資源管理系統(tǒng),可以讓不同計(jì)算框架(MapReduce\Spark等)可以共享同一個(gè)HDFS集群上的數(shù)據(jù)敬鬓,享受整體的資源調(diào)度淹朋。
YARN架構(gòu)
YARN主要由ResourceManager、NodeManager列林、ApplicationMaster和Container等幾個(gè)組件構(gòu)成。
- ResourceManager是Master上一個(gè)獨(dú)立運(yùn)行的進(jìn)程酪惭,負(fù)責(zé)集群統(tǒng)一的資源管理希痴、調(diào)度、分配等等春感。
- NodeManager是Slave上一個(gè)獨(dú)立運(yùn)行的進(jìn)程砌创,負(fù)責(zé)上報(bào)節(jié)點(diǎn)的狀態(tài),處理單個(gè)節(jié)點(diǎn)的資源管理 鲫懒、處理來自ResouceManager的命令 嫩实、處理來自ApplicationMaster的命令。
- ApplicationMaster和Container是運(yùn)行在Slave上的組件窥岩,為應(yīng)用程序申請(qǐng)資源甲献,并分配給內(nèi)部任務(wù) ,任務(wù)的監(jiān)控和容錯(cuò)等颂翼。
- Container是yarn中分配資源的一個(gè)單位晃洒,包涵內(nèi)存、CPU等等資源朦乏,yarn以Container為單位分配資源球及。
YARN執(zhí)行流程
YARN總體上仍然是master/slave結(jié)構(gòu),在整個(gè)資源管理框架中呻疹,resourcemanager為master吃引,nodemanager是slave。Resourcemanager負(fù)責(zé)對(duì)各個(gè)nademanger上資源進(jìn)行統(tǒng)一管理和調(diào)度刽锤。當(dāng)用戶提交一個(gè)應(yīng)用程序時(shí)镊尺,需要提供一個(gè)用以跟蹤和管理這個(gè)程序的ApplicationMaster,它負(fù)責(zé)向ResourceManager申請(qǐng)資源并思,并要求NodeManger啟動(dòng)可以占用一定資源的任務(wù)鹅心。由于不同的ApplicationMaster被分布到不同的節(jié)點(diǎn)上,因此它們之間不會(huì)相互影響纺荧。
流程:
- 客戶端向RM中提交程序
- RM向NM中分配一個(gè)container旭愧,并在該container中啟動(dòng)AM
- AM向RM注冊(cè)颅筋,這樣用戶可以直接通過RM査看應(yīng)用程序的運(yùn)行狀態(tài)(然后它將為各個(gè)任務(wù)申請(qǐng)資源,并監(jiān)控它的運(yùn)行狀態(tài)输枯,直到運(yùn)行結(jié)束)
- AM采用輪詢的方式通過RPC協(xié)議向RM申請(qǐng)和領(lǐng)取資源议泵,資源的協(xié)調(diào)通過異步完成
- AM申請(qǐng)到資源后,便與對(duì)應(yīng)的NM通信桃熄,要求它啟動(dòng)任務(wù)
- NM為任務(wù)設(shè)置好運(yùn)行環(huán)境(包括環(huán)境變量先口、JAR包、二進(jìn)制程序等)后瞳收,將任務(wù)啟動(dòng)命令寫到一個(gè)腳本中碉京,并通過運(yùn)行該腳本啟動(dòng)任務(wù)
- 各個(gè)任務(wù)通過某個(gè)RPC協(xié)議向AM匯報(bào)自己的狀態(tài)和進(jìn)度,以讓AM隨時(shí)掌握各個(gè)任務(wù)的運(yùn)行狀態(tài)螟深,從而可以在任務(wù)失敗時(shí)重新啟動(dòng)任務(wù)
-
應(yīng)用程序運(yùn)行完成后谐宙,AM向RM注銷并關(guān)閉自己
YARN環(huán)境搭建
使用的版本為hadoop-2.6.0-cdh5.7.0
//修改配置文件,配置如下(配置文件在/etc/hadoop下)
mapred-site.xml界弧,在默認(rèn)情況下是沒有mapred-site.xml文件的凡蜻,只有mapred-site.xml.template文件,所以在修改配置之前需要復(fù)制一份
cp mapred-site.xml.template mapred-site.xml
vi mapred-site.xml
//添加配置
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>//配置計(jì)算框架運(yùn)行在yarn之上垢箕,為固定配置
</property>
yarn-site.xml:
vi yarn-site.xml
//添加配置
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>//nodemanager的services划栓,目前為固定配置,當(dāng)學(xué)習(xí)Spark時(shí)条获,其中有動(dòng)態(tài)資源調(diào)度忠荞,這里才需要修改
</property>
//啟動(dòng)YARN相關(guān)的進(jìn)程
sbin/start-yarn.sh
//驗(yàn)證
jps
ResourceManager
NodeManager
可訪問http://虛擬機(jī)IP:8088可以看到Y(jié)ARN的主界面。
//停止YARN相關(guān)的進(jìn)程
sbin/stop-yarn.sh
提交作業(yè)到Y(jié)ARN上執(zhí)行
在 share目錄為我們提供了MapReduce作業(yè)的案例帅掘,我們可以使用其中的作業(yè)在YARN上執(zhí)行钻洒。
提交mr作業(yè)到Y(jié)ARN上運(yùn)行,我的作業(yè)jar包路徑為:
/root/apps/hadoop-2.6.0-cdh5.7.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar
//使用hadoop jar命令可提交作業(yè)到Y(jié)ARN,具體命令為
hadoop jar jar包路徑 有效的程序名字 有效程序的參數(shù)
// 選用其中PI程序進(jìn)行運(yùn)行锄开,在執(zhí)行之前要記到啟動(dòng)hdfs和yarn
hadoop jar hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar pi 2 3
//訪問yarn主界面可看到作業(yè)的執(zhí)行情況
分布式處理框架MapReduce/Spark
MapReduce概述
源于Google的MapReduce論文素标,Hadoop MapReduce是Google MapReduce的一個(gè)克隆版。MapReduce用于大規(guī)模數(shù)據(jù)集(通常大于1TB)的并行運(yùn)算萍悴,實(shí)現(xiàn)了Map和Reduce兩個(gè)功能头遭。MapReduce的思想是“分而治之”⊙⒂眨“分”是把復(fù)雜的任務(wù)分解為若干個(gè)“簡(jiǎn)單的任務(wù)”執(zhí)行计维,由map負(fù)責(zé)∷河瑁“簡(jiǎn)單的任務(wù)”指數(shù)據(jù)或計(jì)算規(guī)模相對(duì)于原任務(wù)要大大縮婿昊獭;就近計(jì)算实抡,即會(huì)被分配到存放了所需數(shù)據(jù)的節(jié)點(diǎn)進(jìn)行計(jì)算欠母;這些小任務(wù)可以并行計(jì)算欢策,彼此間幾乎沒有依賴關(guān)系。Reducer負(fù)責(zé)對(duì)map階段的結(jié)果進(jìn)行匯總赏淌。
特點(diǎn):
- 海量數(shù)據(jù)離線處理
- 橫向擴(kuò)展踩寇,而非縱向擴(kuò)展,平滑無縫的可擴(kuò)展性六水。
- 易開發(fā):用戶不用考慮進(jìn)程間的通信和套接字編程俺孙,已經(jīng)為我們封裝好框架。這一點(diǎn)只是相對(duì)于傳統(tǒng)來講掷贾,現(xiàn)在主流的Spark框架更為簡(jiǎn)單(Spark為本人知識(shí)盲區(qū)睛榄,后續(xù)學(xué)習(xí))。
- 易運(yùn)行:可運(yùn)行在廉價(jià)的硬件之上想帅。
MapReduce編程模型
MapReduce編程模型給出了分布式編程方法的5個(gè)步驟:
- 迭代场靴,遍歷輸入數(shù)據(jù),將其解析成key/value對(duì)博脑;
- 將輸入key/value對(duì)映射map成另外一些key/value對(duì)憎乙;
- 根據(jù)key對(duì)中間結(jié)果進(jìn)行分組(grouping)票罐;
- 以組為單位對(duì)數(shù)據(jù)進(jìn)行歸約叉趣;
- 迭代,將最終產(chǎn)生的key/value對(duì)保存到輸出文件中该押。
MapReduce架構(gòu)
和HDFS一樣疗杉,MapReduce也是采用Master/Slave的架構(gòu)
MapReduce包含四個(gè)組成部分,分別為Client蚕礼、JobTracker烟具、TaskTracker和Task
- Client 客戶端
每一個(gè) Job 都會(huì)在用戶端通過 Client 類將應(yīng)用程序以及配置參數(shù) Configuration 打包成 JAR 文件存儲(chǔ)在 HDFS,并把路徑提交到 JobTracker 的 master 服務(wù)奠蹬,然后由 master 創(chuàng)建每一個(gè) Task(即 MapTask 和 ReduceTask) 將它們分發(fā)到各個(gè) TaskTracker 服務(wù)中去執(zhí)行朝聋。 - JobTracker
JobTracke負(fù)責(zé)資源監(jiān)控和作業(yè)調(diào)度。JobTracker 監(jiān)控所有TaskTracker 與job的健康狀況囤躁,一旦發(fā)現(xiàn)失敗冀痕,就將相應(yīng)的任務(wù)轉(zhuǎn)移到其他節(jié)點(diǎn);同時(shí)狸演,JobTracker 會(huì)跟蹤任務(wù)的執(zhí)行進(jìn)度言蛇、資源使用量等信息,并將這些信息告訴任務(wù)調(diào)度器宵距,而調(diào)度器會(huì)在資源出現(xiàn)空閑時(shí)腊尚,選擇合適的任務(wù)使用這些資源。在Hadoop中满哪,任務(wù)調(diào)度器是一個(gè)可插拔的模塊婿斥,用戶可以根據(jù)自己的需要設(shè)計(jì)相應(yīng)的調(diào)度器劝篷。 -
TaskTracker
TaskTracker 會(huì)周期性地通過Heartbeat 將本節(jié)點(diǎn)上資源的使用情況和任務(wù)的運(yùn)行進(jìn)度匯報(bào)給JobTracker,同時(shí)接收J(rèn)obTracker 發(fā)送過來的命令并執(zhí)行相應(yīng)的操作(如啟動(dòng)新任務(wù)受扳、殺死任務(wù)等)携龟。TaskTracker 使用"slot"等量劃分本節(jié)點(diǎn)上的資源量。"slot"代表計(jì)算資源(CPU勘高、內(nèi)存等)峡蟋。一個(gè)Task 獲取到一個(gè)slot 后才有機(jī)會(huì)運(yùn)行,而Hadoop 調(diào)度器的作用就是將各個(gè)TaskTracker 上的空閑slot分配給Task 使用华望。slot分為Map slot 和Reduce slot 兩種蕊蝗,分別供Map Task 和Reduce Task 使用。TaskTracker 通過slot 數(shù)目(可配置參數(shù))限定Task 的并發(fā)度赖舟。
-Task
Task 分為Map Task 和Reduce Task 兩種蓬戚,均由TaskTracker 啟動(dòng)。HDFS 以固定大小的block 為基本單位存儲(chǔ)數(shù)據(jù)宾抓,而對(duì)于MapReduce 而言子漩,其處理單位是split。split 是一個(gè)邏輯概念石洗,它只包含一些元數(shù)據(jù)信息幢泼,比如數(shù)據(jù)起始位置、數(shù)據(jù)長(zhǎng)度讲衫、數(shù)據(jù)所在節(jié)點(diǎn)等缕棵。它的劃分方法完全由用戶自己決定。但需要注意的是涉兽,split 的多少?zèng)Q定了Map Task 的數(shù)目招驴,因?yàn)槊總€(gè)split 只會(huì)交給一個(gè)Map Task 處理。
MapReduce編程
通過wordcount詞頻統(tǒng)計(jì)分析案例入門枷畏。
這個(gè)程序能夠計(jì)算一個(gè)文本中相同單詞出現(xiàn)的次數(shù)别厘。
我們?cè)谥把菔居肑ava API操作HDFS的項(xiàng)目上編寫代碼。具體代碼及解釋如下
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* WordCount統(tǒng)計(jì)單詞數(shù)量
*
* @author zhiying.dong 2019/04/09 20:50
*/
public class WordCountApp {
/**
* 第一步是對(duì)文件進(jìn)行Map操作拥诡,繼承Mapper類復(fù)寫它的map方法即可
* 參數(shù)是4個(gè)触趴,前兩個(gè)LongWritable表示文件偏移量,Text表示文件的數(shù)量
* 后兩個(gè)Text表示每一個(gè)單詞袋倔,LongWritable為給每一個(gè)單詞做一個(gè)賦值1的操作雕蔽,而相同單詞出現(xiàn)次數(shù)的累加在Reduce中
*/
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
LongWritable one = new LongWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//把文件內(nèi)容轉(zhuǎn)換為字符串
String line = value.toString();
//通過空格分割文件文本(本人實(shí)驗(yàn)的文本單詞之間是以空格分開)
String[] words = line.split(" ");
for (String word : words) {
//為每個(gè)單詞進(jìn)行賦值操作,以map的形式輸出
context.write(new Text(word), one);
}
}
}
/**
* 第二步是對(duì)文件進(jìn)行Reduce操作宾娜,繼承Reducer類復(fù)寫它的reduce方法即可
* 參數(shù)是4個(gè)批狐,前兩個(gè)Text表示輸入map的key,在這里為單詞字符串,LongWritable表示輸入map的value嚣艇,這里表示在上一步為每個(gè)單詞的賦值
* 后兩個(gè)Text為輸出map的key,這里表示每一個(gè)單詞承冰,LongWritable為輸出map的value,這里表示每一個(gè)單詞出現(xiàn)次數(shù)的累加
*/
public static class MyReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
//注意這里reduce輸入map的value為Iterable<LongWritable> 類型食零,
MapRedece為我們自動(dòng)根據(jù)相同key進(jìn)行了分類
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long sum = 0;
//相同key出現(xiàn)次數(shù)的疊加
for (LongWritable value : values) {
sum += value.get();
}
context.write(key, new LongWritable(sum));
}
}
public static void main(String[] args) throws Exception {
//初始化配置
Configuration configuration = new Configuration();
//定義job
Job job = Job.getInstance(configuration, "wordcount");
//設(shè)置job的執(zhí)行類
job.setJarByClass(WordCountApp.class);
//設(shè)置需要統(tǒng)計(jì)的文件路徑困乒,在執(zhí)行時(shí)傳入
FileInputFormat.setInputPaths(job, new Path(args[0]));
//設(shè)置map操作時(shí)的相關(guān)參數(shù)
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//設(shè)置reduce操作時(shí)的相關(guān)參數(shù)
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//設(shè)置輸出文件的路徑,在執(zhí)行時(shí)傳入
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
開發(fā)完成后
編譯:mvn clean package -DskipTests
上傳到服務(wù)器然后運(yùn)行(根據(jù)自己上傳的路徑進(jìn)行命令參數(shù)的調(diào)整)
hadoop jar /root/hadoop/lib/hadoop-train-1.0.jar WordCountApp hdfs://192.168.30.130:9020/hello.txt hdfs://192.168.30.130:9020/output/wc
運(yùn)行完畢后可在輸出文件中查看統(tǒng)計(jì)的結(jié)果(親測(cè)可以成功)
注意
:
相同的代碼和腳本再次執(zhí)行贰谣,會(huì)報(bào)錯(cuò)
security.UserGroupInformation:
PriviledgedActionException as:hadoop (auth:SIMPLE) cause:
org.apache.hadoop.mapred.FileAlreadyExistsException:
Output directory hdfs://92.168.30.130:9020/output/wc already exists
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException:
Output directory hdfs:/92.168.30.130:9020/output/wc already exists
在MR中娜搂,輸出文件是不能事先存在的。有兩種解決方式
1)先手工通過shell的方式將輸出文件夾先刪除
hadoop fs -rm -r /output/wc
2) 在代碼中完成自動(dòng)刪除功能: 推薦大家使用這種方式
Path outputPath = new Path(args[1]);
FileSystem fileSystem = FileSystem.get(configuration);
if(fileSystem.exists(outputPath)){
fileSystem.delete(outputPath, true);
System.out.println("output file exists, but is has deleted");
}
Spark簡(jiǎn)單入門
Spark概述
MapReduce框架局限性
- 僅支持Map和Reduce兩種操作吱抚,提供給用戶的只有這兩種操作
- 處理效率低效:Map中間結(jié)果寫磁盤百宇,Reduce寫HDFS,多個(gè)MR之間通過HDFS交換數(shù)據(jù)
- 任務(wù)調(diào)度和啟動(dòng)開銷大:mr的啟動(dòng)開銷一秘豹,客戶端需要把應(yīng)用程序提交給resourcesManager携御,resourcesManager去選擇節(jié)點(diǎn)去運(yùn)行,快的話幾秒鐘既绕,慢的話1分鐘左右啄刹,開銷二,maptask和reducetask的啟動(dòng)凄贩,當(dāng)他倆被resourcesManager調(diào)度的時(shí)候誓军,會(huì)先啟動(dòng)一個(gè)container進(jìn)程,然后讓他倆運(yùn)行起來怎炊,每一個(gè)task都要經(jīng)歷jvm的啟動(dòng)谭企,銷毀等廓译,這兩點(diǎn)就是mr啟動(dòng)開銷
- 無法充分利用內(nèi)存
- Map端和Reduce端均需要排序:map和Reduce是都需要進(jìn)行排序的评肆,但是有的程序完全不需要排序(比如求最大值求最小值,聚合等)非区,所以就造成了性能的低效
- 不適合迭代計(jì)算(如機(jī)器學(xué)習(xí)瓜挽、圖計(jì)算等),交互式處理(數(shù)據(jù)挖掘)和流式處理(點(diǎn)擊日志分析):因?yàn)槿蝿?wù)調(diào)度和啟動(dòng)開銷大征绸,所以不適合交互式處理久橙,比如你啟動(dòng)要一分鐘,任務(wù)調(diào)度要幾分鐘管怠,我得等半天淆衷,這不適合
- MapReduce編程不夠靈活:map和reduce輸入輸出類型格式限制死了,可嘗試scala函數(shù)式編程語言
- MapReduce采用了多進(jìn)程模型渤弛,而Spark采用了多線程模型:運(yùn)行更快祝拯,更加節(jié)約資源。
Spark特點(diǎn)
- 高效(比MapReduce快10~100倍)
- 內(nèi)存計(jì)算引擎,提供Cache機(jī)制來支持需要反復(fù)迭代計(jì)算或者多次數(shù)據(jù)共享佳头,減少數(shù)據(jù)讀取的IO開銷鹰贵;DAG引擎,這種引擎的特點(diǎn)是康嘉,不同任務(wù)之間互相依賴碉输,減少多次計(jì)算之間中間結(jié)果寫到HDFS的開銷;使用多線程池模型來減少task啟動(dòng)開稍(特指MR中每個(gè)task都要經(jīng)歷JVM啟動(dòng)運(yùn)行銷毀操作亭珍,Spark的做法是敷钾,啟動(dòng)一些常駐的進(jìn)程,在進(jìn)程內(nèi)部會(huì)有多個(gè)線程去計(jì)算task肄梨,來一個(gè)task闰非,計(jì)算task,并回收線程峭范,以此循環(huán)财松,這樣就沒有JVM的開銷),shuffle過程中避免不必要的sort操作以及減少磁盤IO操作
- 易用:提供了豐富的API纱控,支持Java辆毡,Scala晶丘,Python和R四種語言
代碼量比MapReduce少2~5倍 - 與Hadoop集成:讀寫HDFS/Hbase與YARN集成
環(huán)境搭建
//在安裝之前先要安裝maven和scala環(huán)境绪爸。由于在之前本人虛擬機(jī)上已經(jīng)安裝了maven所以跳過maven的安裝。
// 解壓scala安裝包
tar -zxvf scala-2.11.8.tgz -C ../apps/
//配置環(huán)境變量
pwd //查看路徑
/root/apps/scala-2.11.8
vi ~/.bash.profile
//添加配置
export SCALA_HOME=/root/apps/scala-2.11.8
export PATH=$SCALA_HOME/bin:$PATH
//使配置生效
source ~/.bash.profile
//檢驗(yàn)是否安裝成功
scala -version
//出現(xiàn)以下信息徒恋,說明安裝成功
Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL
//解壓spark
tar -zxvf spark-2.1.0-bin-2.6.0-cdh5.7.0.tgz -C ../apps
//進(jìn)入安裝目錄的bin目錄尔店,通過spark-shell可運(yùn)行spark
//若不熟悉命令眨攘,可用以下方式
./spark-shell --help
--master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.
//表示在本地以兩個(gè)線程啟動(dòng)
./spark-shell --master local[2]
local:默認(rèn)一個(gè)線程
local[n]:n個(gè)線程
local[*]:全部線程
Spark編程
這里重新實(shí)現(xiàn)在上文實(shí)現(xiàn)的wordcount程序,代碼如下
sc.textFile("file://root/data/hello.txt").flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _).collect
沒有看錯(cuò)嚣州,只是一行代碼鲫售。因?yàn)镾park封裝了很多方便的函數(shù),相比于MapReduce開發(fā)更為方便该肴。這也是它成為主流的原因情竹。
Hadoop整合Spring-boot的使用
Spring Hadoop概述
Spring for hadoop提供了統(tǒng)一的配置模式以簡(jiǎn)化Apache Hadoop的開發(fā),并也易于調(diào)用HDFS匀哄、Mapreduce秦效、Pig和Hive的API。它還提供了與Spring生態(tài)圈的其他項(xiàng)目集成的能力涎嚼,例如Spring Intergration 和Spring Batch阱州,讓你可以優(yōu)雅地開發(fā)大數(shù)據(jù)的提取/導(dǎo)出和Hadoop工作流項(xiàng)目。
Spring Hadoop開發(fā)環(huán)境搭建及訪問HDFS
第一步加入依賴
<properties>
<java.version>1.8</java.version>
<hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-hadoop-boot</artifactId>
<version>2.5.0.RELEASE</version>
</dependency>
</dependencies>
<!--配置下載的倉(cāng)庫(kù)-->
<repositories>
<repository>
<id>cloudera</id>
<url>http://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
在配置文件中配置HDFS地址
spring:
hadoop:
fs-uri: hdfs://192.168.30.130:8092
在代碼中訪問HDFS
import org.apache.hadoop.fs.FileStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.data.hadoop.fs.FsShell;
import org.springframework.stereotype.Component;
/**
* Spring Boot操作HDFS
* @author zhiying.dong 2019/04/13 16:32
*/
@Component
public class SpringBootHadoopTest implements CommandLineRunner {
//自動(dòng)注入FsShell法梯,其中封裝了HDFS的訪問函數(shù)
@Autowired
private FsShell fsShell;
@Override
public void run(String... strings) throws Exception {
System.out.println("=========run start============");
//獲取根目錄下的文件列表
for (FileStatus fileStatus : fsShell.lsr("/")) {
System.out.println(">" + fileStatus.getPath());
}
System.out.println("===========run end===========");
}
}
執(zhí)行代碼后發(fā)現(xiàn)沒有權(quán)限訪問HDFS苔货,本人解決的方式為在項(xiàng)目啟動(dòng)時(shí)模擬用戶,網(wǎng)上有多種解決方式,可自行選擇
VM options:
-DHADOOP_USER_NAME=虛擬機(jī)用戶名
總結(jié)
大數(shù)據(jù)已經(jīng)涉及到生活的方方面面蒲赂,所以學(xué)習(xí)和了解大數(shù)據(jù)知識(shí)是很有必要的阱冶。Hadoop生態(tài)系統(tǒng)為大數(shù)據(jù)提供了開源的分布式存儲(chǔ)和分布式計(jì)算的平臺(tái),而這一章只是對(duì)其中基礎(chǔ)的知識(shí)進(jìn)行了入門滥嘴,后續(xù)還需要深入的學(xué)習(xí)木蹬。