環(huán)境
環(huán)境使用:hadoop3.1玉罐,Python3.6,ubuntu18.04
Hadoop是使用Java開(kāi)發(fā)的脐雪,推薦使用Java操作HDFS。
有時(shí)候也需要我們使用Python操作HDFS恢共。
本次我們來(lái)討論如何使用Python操作HDFS战秋,進(jìn)行文件上傳,下載讨韭,查看文件夾脂信,以及如何使用Python進(jìn)行MapReduce編程。
使用Python操作HDFS
首先需要安裝和導(dǎo)入hdfs
庫(kù)透硝,使用pip install hdfs
狰闪。
1. 連接并查看指定路徑下的數(shù)據(jù)
from hdfs import *
client = Client('http://ip:port') #2.X版本port 使用50070 3.x版本port 使用9870
client.list('/') #查看hdfs /下的目錄
2. 創(chuàng)建目錄
client.makedirs('/test')
client.makedirs('/test',permision = 777 ) # permision可以設(shè)置參數(shù)
3. 重命名、刪除
client.rename('/test','123') #將/test 目錄改名為123
client.delete('/test',True) #第二個(gè)參數(shù)表示遞歸刪除
4.下載
將/test/log.txt
文件下載至/home
目錄下濒生。
client.download('/test/log.txt','/home')
5. 讀取
with client.read("/test/[PPT]Google Protocol Buffers.pdf") as reader:
print reader.read()
其他參數(shù):
- read(*args, **kwds)
- hdfs_path:hdfs路徑
- offset:設(shè)置開(kāi)始的字節(jié)位置
- l- ength:讀取的長(zhǎng)度(字節(jié)為單位)
- buffer_size:用于傳輸數(shù)據(jù)的字節(jié)的緩沖區(qū)的大小埋泵。默認(rèn)值設(shè)置在HDFS配置。
- encoding:指定編碼
- chunk_size:如果設(shè)置為正數(shù)罪治,上下文管理器將返回一個(gè)發(fā)生器產(chǎn)生的每一chunk_size字節(jié)而不是一個(gè)類似文件的對(duì)象
- delimiter:如果設(shè)置丽声,上下文管理器將返回一個(gè)發(fā)生器產(chǎn)生每次遇到分隔符。此參數(shù)要求指定的編碼觉义。
- progress:回調(diào)函數(shù)來(lái)跟蹤進(jìn)度雁社,為每一chunk_size字節(jié)(不可用,如果塊大小不是指定)晒骇。它將傳遞兩個(gè)參數(shù)霉撵,文件上傳的路徑和傳輸?shù)淖止?jié)數(shù)。稱為一次與- 1作為第二個(gè)參數(shù)洪囤。
6.上傳數(shù)據(jù)
將文件上傳至hdfs的 /test
下徒坡。
client.upload(‘/test’,’/home/test/a.log’)
Python-MapReduce
編寫mapper
代碼,map.py
:
import sys
for line in sys.stdin:
fields = line.strip().split()
for item in fields:
print(item + ' ' + '1')
編寫reducer
代碼瘤缩,reduce.py
:
import sys
result = {}
for line in sys.stdin:
kvs = line.strip().split(' ')
k = kvs[0]
v = kvs[1]
if k in result:
result[k]+=1
else:
result[k] = 1
for k,v in result.items():
print("%s\t%s" %(k,v))
添加測(cè)試文本崭参,test1.txt
:
tale as old as time
true as it can be
beauty and the beast
本地測(cè)試執(zhí)行map
代碼:
cat test1.txt | python map.py
結(jié)果:
tale 1
as 1
old 1
as 1
time 1
true 1
as 1
it 1
can 1
be 1
beauty 1
and 1
the 1
beast 1
本地測(cè)試執(zhí)行reduce
代碼:
cat test1.txt | python map.py | sort -k1,1 | python reduce.py
執(zhí)行結(jié)果:
and 1
be 1
old 1
beauty 1
true 1
it 1
beast 1
as 3
can 1
time 1
the 1
tale 1
在Hadoop平臺(tái)執(zhí)行map-reduce
程序
本地測(cè)試完畢,編寫腳本在HDFS中執(zhí)行程序
腳本:run.sh
(請(qǐng)根據(jù)本機(jī)環(huán)境修改)
HADOOP_CMD="/app/hadoop-3.1.2/bin/hadoop"
STREAM_JAR_PATH="/app/hadoop-3.1.2/share/hadoop/tools/lib/hadoop-streaming-3.1.2.jar"
INPUT_FILE_PATH_1="/py/input/"
OUTPUT_PATH="/output"
$HADOOP_CMD fs -rmr-skipTrash $OUTPUT_PATH
# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_1 \
-output $OUTPUT_PATH \
-mapper "python map.py" \
-reducer "python reduce.py" \
-file ./map.py \
-file ./reduce.py \
添加執(zhí)行權(quán)限chmod a+x run.sh
款咖;
執(zhí)行測(cè)試:bash run.sh
何暮,查看結(jié)果:
練習(xí)
1. 文件合并去重
輸入文件file1
的樣例如下:
20150101 x
20150102 y
20150103 x
20150104 y
20150105 z
20150106 x
輸入文件file2
的樣例如下:
20150101 y
20150102 y
20150103 x
20150104 z
20150105 y
根據(jù)輸入文件file1
和file2
合并得到的輸出文件file3
的樣例如下:
20150101 x
20150101 y
20150102 y
20150103 x
20150104 y
20150104 z
20150105 y
20150105 z
20150106 x
對(duì)于兩個(gè)輸入文件奄喂,即文件file1
和文件file2
,請(qǐng)編寫MapReduce
程序海洼,對(duì)兩個(gè)文件進(jìn)行合并跨新,并剔除其中重復(fù)的內(nèi)容,得到一個(gè)新的輸出文件file3
坏逢。
為了完成文件合并去重的任務(wù)域帐,你編寫的程序要能將含有重復(fù)內(nèi)容的不同文件合并到一個(gè)沒(méi)有重復(fù)的整合文件,規(guī)則如下:
- 第一列按學(xué)號(hào)排列是整;
- 學(xué)號(hào)相同肖揣,按
x,y,z
排列。
2. 挖掘父子關(guān)系
輸入文件內(nèi)容如下:
child parent
Steven Lucy
Steven Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Frank
Jack Alice
Jack Jesse
David Alice
David Jesse
Philip David
Philip Alma
Mark David
Mark Alma
輸出文件內(nèi)容如下:
grandchild grandparent
Steven Alice
Steven Jesse
Jone Alice
Jone Jesse
Steven Mary
Steven Frank
Jone Mary
Jone Frank
Philip Alice
Philip Jesse
Mark Alice
Mark Jesse
你編寫的程序要能挖掘父子輩關(guān)系浮入,給出祖孫輩關(guān)系的表格龙优。規(guī)則如下:
- 孫子在前,祖父在后
- 孫子相同事秀,祖父的名字按照
A-Z
排列