2016-01-23 Hadoop the Definitive 4th

摘自:http://staticor.io/post/hadoop/2016-01-23hadoop-definitive-guide-note#toc_7

Developing a MapReduce ApplicationRunning on a ClusterLaunching a Job
MapReduce Web UI

Tuning a JobMapReduce Workflows

How MapReduce WorksJob Submission
Job Intialization
Task Assignment
Task ExecutionStreaming

Progress and Status UpdatesJOB Completion

Failures

Shuffle and Sort

MapReduce Types and FormatsTypes
Default MapReduce Jobstreaming

Input FormatsInputSplitFileInputFormat
split size 公式
Small files and CombineFileInputFormat
Preventing splitting
Processing a whole file as a record

Text Input
Multiple Inputs

Output FormatsMultiple Outputs

MapReduce FeaturesCounterTask counters
Job counters

sortingPREPARATION
PARITIAL SORT
TOTAL SORT
Secondary SortSTREAMING

joining datasets

Developing a MapReduce Application
Writing a program in MapReduce follows a certain pattern. Before we start writing a MapReduce program, however, we need to set up and configure the development.
Configuration class ( org.apache.hadoop.conf package)

Running on a Cluster
In a distributed setting, things are a little more complex. For a start, a jobs' classes must be packaged into a jobJAR file to send to the cluster. Hadoop will find the job JAR automactically by searching for the JAR on the drivers' classpath that contains the class set in the setJarByClass() method ( on JobConf or Job).
The client class pathuser's client-side classpath set by hadoop jar is made up of:

  • The job Jar file- Any JAR files in the lib directory of the job JAR file, and the classes direcotry ( if present)- The classpath defined by HADOOP_CLASSPATH, if set.

Launching a Job
$ hadoop jar hadoop-examples.jar v2.MaxTemperatureDriver \ -conf conf/hadoop-cluster.xml input/ncdc/all max-temp

Job, Task, and Task Attempt IDs

job ID 根據(jù) YARN application ID 生成. YARN application ID 由 YARN RM 生成 (與 RM 的composed time, counter maintained, 以及后面添加的app唯一標(biāo)識(shí)序號(hào) 有關(guān)), app ID 一般長得樣子是application_141041231231_0003
, 對應(yīng)的jobID 就是把這前的application 替換為 job_. ==> job_141041231231_0003一個(gè)Job再細(xì)分為幾個(gè)Task, 它們的ID是 將job_前綴替換為 task ==> task_141041231231_0003_m_00003. 前后綴加上任務(wù)編號(hào), 以區(qū)別不同的task.
MapReduce Web UI
YARN Page: resource-manager-host:8088
resource manager page
job history page
mapreduce job page

Tuning a Job
" Can I Make it run faster" may come out of our minds, after our job is working.
You should think about these profile in checklist before you start trying to optimize at the task level.
Number of mappers 如果每個(gè)mapper只運(yùn)行幾秒就停了, 你應(yīng)該讓他們運(yùn)行更長時(shí)間, 1分鐘甚至更長. 對于small files 去看看 CombineFileInputFormat (本博客中有單獨(dú)文章提及)
Number of reducers 每個(gè)reducer的建議時(shí)長至少是5分鐘, 產(chǎn)出數(shù)據(jù)大小應(yīng)是blocksize水平的. 書中后面會(huì)單獨(dú)講解.

Combiners 在 mapper-reducer 之間的shuffle過程中能否使用某Combiner 提升效率

Intermidiate compression map輸出進(jìn)行壓縮可能提升效率

Custom serialization

Shuffle tweaks

MapReduce Workflows
這里繼 求當(dāng)年最高溫度之后再嘗試尋找一個(gè)新的例子 -- 求一年366天中, 歷年在多種天氣狀態(tài)下的平均最大溫度. 1.1號(hào)為例, 先取下雨天氣的天氣, 然后求1901年至2000年 每年1.1號(hào)的下雨天氣的最大溫度, 找到最大值, 再對各類天氣狀態(tài)的最大求均值.
這個(gè)分析需要進(jìn)行分解:
計(jì)算 (日期-無年, 天氣狀態(tài)) 為key下的最大溫度
求上面輸出的key下平均值

結(jié)合之前的分析, 能看到這里的任務(wù)要分2步或以上的MapReduce完成.
How MapReduce Works
MapReduce Job 的Lifetime:
hadoop客戶端, 用戶提交MapReduce Job
YARN RM(resource manager) 來分配cluster上的資源
YARN NM(NameNode) 準(zhǔn)備用于運(yùn)算的containers
MapRededuce application master, 在job運(yùn)行過程中的"協(xié)調(diào)人".
HDFS share job間的文件


Job Submission
submit() 完成: 向 RM 申領(lǐng)一個(gè)新的 application Id, 用來指向給MapReduce Job ID; 檢測Job, 例如輸出的目錄(output files)是否已經(jīng)存在, 如果已經(jīng)存在則會(huì)報(bào)錯(cuò); 運(yùn)行Input的Split, 若這過程有問題, 例如Input Path 不存在, 則會(huì)報(bào)錯(cuò)返回給 MapReduce Program; 對運(yùn)行Job時(shí)需要的文件: JAR file, 配置文件, 計(jì)算好的 Input Splits, 進(jìn)行copy, 待JobId分配成功后將放入到以Id命名的Share directory. 在集群運(yùn)行該JOB時(shí)會(huì)有大量NM對Jar 進(jìn)行訪問, 因此其copy的量會(huì)較大, 程序中用 mapreduce.client.submit.file.replication來的控制, 默認(rèn)因子設(shè)置為10. 提交成功.
Job Intialization
RM 接收到 某項(xiàng) submit的請求后, 會(huì)將由 YARN 的 scheduler 處理該請求 -- 給其分配container(RM 啟動(dòng)任務(wù), NM管理的地方). 該項(xiàng)MapReduce Job的直接master其實(shí)是 Java application 中的主類 MRAppMaster , 這個(gè)類中會(huì)創(chuàng)建一系列的bookkeeping objects用來跟蹤記錄Job的處理進(jìn)度. 因?yàn)镴ob是會(huì)以被再細(xì)分為若干項(xiàng)Task, 所以每項(xiàng)Task都會(huì)單獨(dú)向MRAppMaster匯報(bào)其完成情況. 另外, 它還會(huì)接收到用集群對輸入切分好的Input Splits, 然后為每Input Split創(chuàng)建mapper task, 同樣也完成reducer的初始化. (reducer個(gè)數(shù)由 mapreduce.job.reducers指定).
uberized
application master 決定如何運(yùn)行MapReduce job下的各項(xiàng)task. 一般來說, 每一項(xiàng)task單獨(dú)被申請各自container, 等task執(zhí)行完畢, container被NM回收. 這種情況下, 每個(gè)JVM僅僅執(zhí)行一次task. 如果Job太小, application master 可能會(huì)用相同的JVM執(zhí)行多個(gè)任務(wù), 實(shí)現(xiàn)JVM的重用 -- 每個(gè)task依次在這個(gè)container里的JVM里順序執(zhí)行, 直到所有task被執(zhí)行完畢. 這樣master不必申請多次, 達(dá)到了uberlized 的效果.

application master 在 OutputCommitter上 執(zhí)行 setupJob() 方法, 為task的輸出創(chuàng)建臨時(shí)woking space及輸出目錄. 詳情還要查詢 Output Committers
Task Assignment
non-uber task. application master 開始向 RM 為他創(chuàng)建好的 mapper/reducer申請container 資源完成任務(wù). mapper的請求優(yōu)先級(jí)會(huì)高于reducer -- 這是因?yàn)樵趫?zhí)行sort, reducer 任務(wù)之前 所有mapper結(jié)果要完成. 對于 reduce的請求, 至少要等 5% 的map任務(wù)完成 才會(huì)開始接受受理. Reduce 任務(wù)可以在集群的任意位置執(zhí)行, 但map task 受到數(shù)據(jù)局部性(data locality)制約. map, reduce task默認(rèn)的分配內(nèi)存是 1024MB. 該值的properties: mapreduce.map.memory.mb mapreduce.reduce.memory.mb, mapreduce.map.cpo.vcores, mapreduce.reduce.cpu.vcores
data locality , 在quora中找到了一個(gè)答案中的解釋:
Data locality is a core concept of Hadoop, based on several assumptions around the use of MapReduce. In short, keep data on disks that are close to the CPU and RAM that will be used to process and store it. If you had a cluster of 100 machines, and needed to read a selection of records, the records should be adjacent on disk, fit into RAM and be processable (e.g sorted or computed) using that machine's CPU.

Task Execution
Streaming


Progress and Status Updates
user有必要獲得他提交的job目錄的運(yùn)行情況. 包括每個(gè)task的status(運(yùn)行中, 成功, 失敗), Mapper和Reducer的完成進(jìn)度. Mapper的完成度就是和task完成個(gè)數(shù)比例有關(guān). Reducer則要復(fù)雜一些, 涉及到shuffle, reduce.

JOB Completion
application master, task containers 負(fù)責(zé)打掃, 清理. 執(zhí)行OutputCommitter 's commitJob, 讓用戶看到預(yù)期的歷史記錄, 服務(wù)器使用信息.
Failures
任務(wù)失敗, 遇到這種情況很正常, 我們應(yīng)該學(xué)習(xí)怎樣避免失敗. 或者說如何解決這樣的問題, 并避免.
失敗的維度: task, application master, node manager, resource manger.
task failure 正常來說, 是最易看到的一類錯(cuò)誤, 如果一個(gè)map/reduce task 拋出 runtime型的異常, JVM將會(huì)在其exit之前先把該信息報(bào)告給application master. app master 標(biāo)記這項(xiàng)task為 failed. 然后釋放container資源, 供下一個(gè)task使用.

另, 對于一個(gè)failed的task, app master 會(huì)再給他4次重試的機(jī)會(huì)(4 可進(jìn)行修改, 參數(shù)為:mapreduce.map.maxattempts) app master 會(huì)在重新調(diào)度的時(shí)候盡可能的使用與先前不同的Node來執(zhí)行這個(gè)失敗的任務(wù) 4次不行的話 則整個(gè)Job標(biāo)記為失敗.
如果失敗的task 超過一定個(gè)數(shù), 則會(huì)激活 job failure的改變. Profiles: mapreudce.map.failures.maxpercent, mapreduce.reduce.failures.maxpercent (也就是說失敗的個(gè)數(shù)小于這個(gè)百分比, 那么 appication將會(huì)繼續(xù)執(zhí)行其它的task.
application master failure

node manager failure 當(dāng)node manager fail RM上看不到其返回的heartbeat. (比如10分鐘內(nèi)看不到, 時(shí)間配置:yearn.resourcemanager.nm.liveness-monitor.expiry-interval-ms)

resource manager failure

這個(gè)問題就比較嚴(yán)重了. 暫時(shí)先不想了解.
Shuffle and Sort

mapper side mapper的輸出經(jīng)過排序(按key)后傳給reducer, 在這個(gè)排序并轉(zhuǎn)移數(shù)據(jù)的過程, 叫做shuffle.

注, 上圖中能看出, map的輸出結(jié)果不是直接寫到disk中, 而是先到一個(gè)內(nèi)存中的緩存區(qū)(memory buffer), 這個(gè)默認(rèn)大小是100MB(可通過 mapreduce.task.io.sort.mb 參數(shù)配置), 當(dāng)緩存區(qū)的空間達(dá)到一定水平(默認(rèn)是 80%mapreduce.map.sort.splill.percent 0.80) , 將會(huì)啟動(dòng)spill寫到disk. (map持續(xù)向buffer中寫 , 不會(huì)停止) , 要是 buffer達(dá)到100%了, 則map的輸出則會(huì)暫停, 直到spill完成. 不過還要注意的就是, 寫向disk之前, data也會(huì)按照reducer的要求進(jìn)行partition. 按照給的或默認(rèn)的方式, 在每個(gè)partition內(nèi)執(zhí)行combine.
compress map output通過來說, 對map的結(jié)果進(jìn)行壓縮處理, 將會(huì)提高Job的效率. 因?yàn)檫@樣會(huì)節(jié)約磁盤空間, 減少向reducer轉(zhuǎn)移的數(shù)據(jù)量大小. 默認(rèn), 是不進(jìn)行壓縮. 但可通過mapreduce.map.output.compress = true.

reducer side

MapReduce Types and Formats
Types
map: (K1, V1) -> list(K2, V2)reduce: (K2, list(V2)) -> list(K3, V3)
若增加了combiner
map: (K1, V1) -> list(K2, V2)combiner: (K2, list(V2)) -> list(K2, V2)reduce: (K2, list(V2)) -> list(K3, V3)
Input types: 輸入的格式被 input format 指定, 例如: TextInputFormat 指定 是

Default MapReduce Job
如果MapReduce 沒有 mapper, reducer會(huì)是怎樣?

public MinimalMapReduce extends Configured implements Tool{ @Override public int run(String[] args)throws Exception { if (args.length != 2) { System.err.printf("Usage: %s [generic options] <input> <output> \n", getClass().getSimpleName()); ToolRunner.printGenericCommandUsage(System.err); return -1; } Job job = new Job(getConf()); job.setJarByClass(getClass()); FileInputFormat.addInputPath(job, new Path(args[0]); FileoutputFormat.addOutputPath(job, new Path(args[1]); return job.waitForCompletion(true) ? 0 : 1 ; } public static void main(String[] args ) throws Exception { int exitCode = ToolRunner.run(new MinimalMapReduce(), args); System.exit(exitCode); }}


每行是一個(gè)record(key,value分別是 line's offset + line ) 組成. 下面是MapReduce 的 driver程序, 使用精確的配置參數(shù):
public class MinimalMapReduceWithDefaults extends Configured implements Tool { @Override public int run(String[] args) throws Exception{ Job job = JobBuilder.parseInputAndOuput(this, getConf(), args); if (job == null) { return -1; } job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(Mapper.Class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); job.setPartitionerClass(HashPartitioner.class); job.setNumReduceTasks(1); job.setReducerClass(Reducer.class); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(Text.class); job.setOutputFormatClass(TextOutputFormat.class); return job.waitForCompletion(true)? :0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MinimalMapReduceWithDefaults(), args); System.exit(exitCode); }}

以上就是MapReduce 一個(gè)Job的框架代碼. 默認(rèn)的input format 是 TextInputFormat , 提供了
keys the offset of the beginning of the line in the file.

values the line of text

mapper的泛用型定義
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); }}

默認(rèn)partitioner : HashPartitioner 根據(jù) map keyout 的key -hash值 & Integer.Max_VALUE, mod % reduce個(gè)數(shù), 進(jìn)行partition. 原理是讓每個(gè)reducer處理的 mapOutput key的種類數(shù)是even的. (特殊情況當(dāng)然也能想象: 某幾種output key的個(gè)數(shù)非常多, 則就造成 個(gè)別reducer 處理的數(shù)據(jù)量非常非常大)
public class HashPartitioner<K, V> extends Partitioner<K, V> { public int getPartition(K key, V value, int numReduceTasks){ return (key.hashCode() & Integer.Max_VALUE ) % numReduceTasks; } }

Choose the number of reducers
很多新人(說我呢) 都覺得reducer個(gè)數(shù)越多越好 -- 這樣對于map/reduce之間的數(shù)據(jù)流有好處. 選擇合適的reducer個(gè)數(shù)也不是一件容易的事. 使用多的reducer固然增大并行化, 讓每個(gè)reducer處理的數(shù)據(jù)量減少. 然而, 這樣你會(huì)得到很多的小文件 -- 相對來說, 把這種文件控制在一定水平內(nèi)才是最優(yōu)的策略. 通常來說, 一個(gè)reducer的處理時(shí)間控制在5分鐘左右, 產(chǎn)出的數(shù)據(jù)量大小應(yīng)該是和HDFS的blocksize相當(dāng)?shù)?

reducer generic type
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException { for (VALUEIN value: values) { context.write((KEYOUT) key, (VALUEOUT) value); } }}

在reducer處理之前數(shù)據(jù)會(huì)先經(jīng)過shuffle的排序.
streaming


Input Formats
為了處理輸入數(shù)據(jù)更加效率, Hadoop也提供了不僅僅TextInputFormat一種方式. 為了講的更詳盡一點(diǎn), 還得從它的繼承和父類開始.
InputSplit
Hadoop里對于輸入的分割, 定義了InputSplit這個(gè)抽象類, 表示一個(gè)mapper處理的輸入數(shù)據(jù), 其中有2個(gè)抽象方法需要實(shí)現(xiàn):
public abstract class InputSplit{ public abstract long getLength() throws IOException, InterruptedException; public abstract String[] getLocations() throws IOException, InterruptedException; }

這里2個(gè)方法功能也非常簡單, 目的是為了獲得輸入的字節(jié)長度大小和位置信息. 位置用來分割時(shí)將map task處理的數(shù)據(jù)更接近, 大小信息方便將較大的先被處理, 這樣方便減少job的運(yùn)行時(shí)間. (貪心算法)
MapReduce里用InputFormat完成對InputSplits的創(chuàng)建. 也是一個(gè)抽象類:
public abstract class InputFormat<K, V> { public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException; public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException; }

這里的getSplits()方法對輸入進(jìn)行分割, 然后把結(jié)果發(fā)送到application master, am分配給mapper. mapper將會(huì)使用 createRecordReader() 實(shí)現(xiàn)的結(jié)果, 獲得Split的結(jié)果 -- RecordReader, 原來輸入中record的數(shù)據(jù)流. 下面是mapper類中run()方法, 完成對 record流的解析處理.
public void run(Context context) throws IOException, InterruptedException{ setup(context) while (context.nextKeyValue()){ map(context.getCurrentKey(), context.getCurrentValue(), context); cleanup(context); }}

FileInputFormat
FileInputFormat 是對 InputFormat類的繼承, 用于對文件輸入的指定, 它能干兩件事: 1. 定義文件在Job中的輸入格式; 2. 完成文件的Split. 不過一般的, 在實(shí)際工作是使用的它的子類. 見下圖:



input paths

FileInputFormat 首先對輸入的路徑定義了幾個(gè)常用方法:
public static void addInputPath(Job job, Path path)public static void addInputPaths(Job job, String commaSeparatedPaths)public static void setInputPath(Job job, Path ... input Paths)public static void setInputPaths(Job job, String commaSeparatedPaths)

add~方法用來將一個(gè)路徑或一組路徑添加到輸入的列表中. set~方法則是用新的參數(shù)替換掉原來已有的輸入列表. (比如add 加了3個(gè), set之后加的3個(gè)將 不再)
注, 這里路徑支持 glob pattern
注, 雖然用戶沒有自己指定, 但FileInputFormat將自動(dòng)過濾那些隱藏文件(以 . 或者 _ 開頭的文件)
FileInputFormat input splits

FileInputFormat 如何產(chǎn)生input splits> ? 它只能split那些"large" files. 這里大小的定義是和HDFS block 大小有關(guān). 當(dāng)然這里也有知道有這幾個(gè)property

  • mapreduce.input.fileinputformat.split.minsize 最小分割值, 默認(rèn)為1- mapreduce.input.fileinputformat.split.maxsize 最大分割值, 默認(rèn)為 Long.MAX_VALUE- dfs.blocksize long 一般是128 MB(即為 134217728)

最小分割常常是1個(gè)byte, 有時(shí)有的format 會(huì)定義一個(gè)下界. Application 可能也會(huì)對minSplitSize進(jìn)行設(shè)置, 把它設(shè)為一個(gè)大于HDFS block 的值, 但這未必是一個(gè)好方式. 最大分割, 只有當(dāng)這個(gè)值小于 block size才會(huì)起作用.
split size 公式
max(minimumSize, min(maximumSize, blockSize))
默認(rèn)情況: minimumSize < blockSize < maximumSize
Small files and CombineFileInputFormat
HDFS 是更擅長于處理個(gè)數(shù)少, 而塊頭大的數(shù)據(jù)文件, 相較于個(gè)數(shù)多,而卻很小的小文件簇來說.
如果是面臨著非常多的小文件, 比如10000個(gè)大小均小于10MB的輸入文件群, 再直接用FileInputFormat就不合適了 -- 它是對每個(gè)文件進(jìn)行分割.
為什么說小文件太多 HDFS反而應(yīng)付不好>>?MapReduce工作模式對集群上磁盤間的文件轉(zhuǎn)移要求很高, 小文件過多, 無形之中加大了對文件的seek工作. 再者, 太多的小文件加大了namenode的管理mapping的工作. 一個(gè)可取的策略是將這些小文件通過合并形成sequencedfiles -- key作為文件名, value為文件內(nèi)容. 但要是目前已經(jīng)有這些小文件了, 這時(shí)應(yīng)考慮使用 CombineFileInputFormat.

Preventing splitting
有時(shí)我們想對一整個(gè)文件作為一個(gè)Input SPlit, 而不想切分. 方法1: 將minimum split size 設(shè)置的非常大. 方法2: 對FileInputFormat實(shí)現(xiàn)的子類 重寫它的 isSplittable()方法. 例子
import org.apache.hadoop.fs.Path;import org.apache.hadoop.mapreduce.JobContext;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;public class NonSplittableTextInputFormat extends TextInputFormat{ @Override protected boolean isSplitable(JobContext context, Path, file){ return false; }}

Processing a whole file as a record
有些特殊情況, 要求我們以多個(gè)目錄中各個(gè)文件作為record. (比如某個(gè)時(shí)間戳下產(chǎn)生的數(shù)據(jù), 以時(shí)間戳命名) 對于這樣的場景, 可使用 WholeFileInputFormat
public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> { @Override protected boolean isSplitable(JobContext context, Path file){ return false; } @Override publick RecordReader<NullWrtable, BytesWritable> createRecordReader( InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { WholeFileRecordReader reader = new WholeFileRecordReader(); reader.initialize(split, context); return reader; }}

注, 由于整個(gè)文件內(nèi)容作為這里record的value, 因此肯定是不可分的. 同之前, 重寫isSplitable()方法.
The RecordReader used by WholeFileInputFormat for reading a whole file as a record

class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {private FileSplit fileSplit;private Configuration conf;private BytesWritable value = new BytesWritable();private boolean processed = false; @Overridepublic void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { this.fileSplit = (FileSplit) split; this.conf = context.getConfiguration();}@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException { if (!processed) { byte[] contents = new byte[(int) fileSplit.getLength()]; Path file = fileSplit.getPath(); FileSystem fs = file.getFileSystem(conf); FSDataInputStream in = null; try { in = fs.open(file); IOUtils.readFully(in, contents, 0, contents.length); value.set(contents, 0, contents.length); }finally{ IOUtils.closeStream(in); } processed = true; return true } return false;}@Overridepublic NullWritable getCurrentKey() throws IOException, InterruptedException { return NullWritable.get();}@Overridepublic BytesWritable getCurrentValue() throws IOException, InterruptedException { return value;}@Overridepublic float getProgress() throws IOException{ return processed ? 1.0f : 0.0f;}@Override public void close() throws IOException { //do nothing}}

Text Input
TextInputFormat 示例文本 共四行文本

On the top of the Crumpetty Tree The Quangle Wangle sat, But his face you could not see, On account of his Bea ver Hat.

經(jīng)TextInputFormat, 得到的 對是這樣的對:

(0, On the top of the Crumpetty Tree) (33, The Quangle Wangle sat,) (57, But his face you could not see,) (89, On account of his Bea ver Hat.)

The Relationship Between Input Splits and HDFS Blocks
以這里的record形式(一行一行的文本)為例, HDFS中的block并不會(huì)考慮太多 - 不會(huì)強(qiáng)制要求一行文本全部放在同一個(gè)block中. 但Split不會(huì)把同一行文本分配在2個(gè)不同的Input Split中.


幾點(diǎn)要注意的地方:

  • Controlling the maximum line length 如果使用 TextInputFormat, User可以對每行的長度進(jìn)行控制. -- mapreduce.input.linerecordreader.line.maxlength - KeyValueTextInputFormat 若每一行已經(jīng)具備了明文的<Key,Value> 結(jié)構(gòu), 可使用這個(gè)子類方便的實(shí)現(xiàn)Input的格式指定. 利用參數(shù) mapreduce.input.keyvaluelinerecordreader.key.value.separator 指定每行的分隔符. 默認(rèn)是tab鍵. - 若將多行指定為一條 record, 請研究 NLineInputFormat .

Multiple Inputs
MultipleInputs.addInputPath(job, path1, TextInputFormat.class, MaxTemperatureMapper.class);MultipleInputs.addInputPath(job, path2, TextInputFormat.class, MaxTemperatureMapper.class);

Output Formats
OutputFormat 的類結(jié)構(gòu)圖如下所示:


直接跳到多目標(biāo)輸出:
Multiple Outputs
默認(rèn)情況下, FileOutputFormat 和 其子類將會(huì)將reducer的輸出結(jié)果以 這樣的形式命名: "part-r-00000", 如果我們要想達(dá)到"不同reducer輸出到不同的路徑", 研究一下 MultipleOutputs 這個(gè)類.
example Partitioning data

還是之前 天氣數(shù)據(jù)的例子, 根據(jù)天氣狀態(tài)(station)進(jìn)行partition.
方法: 在reducer處理過程中, 對station進(jìn)行處理 -- 1. 把擁有相同station的map輸出 partition到同一個(gè); 2. 設(shè)置reducer個(gè)數(shù)等于station的種類數(shù). partitioner的長相:
public class StationPartitioner extends Partitioner<LongWritable, Text> { private NcdcRecordParse parser = new NcdcRecordParser(); @Override pubic int getPartition(LongWritable key, Text value, int numPartitions) { parser.parse(value); } private int getPartition(String stationId) { /// }}

這里省略掉的 getPartition 方法, 簡單的說是把已有的stationID 轉(zhuǎn)化成 partition 的索引.
思考: 這樣做的方式存在哪些不足?
實(shí)現(xiàn)partition需要我們已經(jīng)對輸出信息掌握, 比如這里 要先知道都有哪些station, 才能進(jìn)行處理. 即使事先給你一個(gè)參考字典, 但是若出現(xiàn)了未知的情況, 難免會(huì)有麻煩.
Partition的個(gè)數(shù)被人為的指定了, 這樣極可能導(dǎo)致uneven-sized partitions -- 絕大多數(shù)reducer處理非常少量的數(shù)據(jù), 這決不是一種高效的思路. 如果極個(gè)別的reducer 消耗時(shí)間明顯長于其它的reducer, 那么這樣reducer將直接決定job的執(zhí)行時(shí)間.

為了讓Job盡快完成, 默認(rèn)使用 HashPartitioner 完成Partition (盡量以免 unevenly-sized partition)
multipleoutputs 例子
public class PartitionByStationUsingMultipleOutputs extends Configured implement Tool { static class StationMapper extends Mapper<LongWritable, Text, Text, Text> { private NcdcRecord parser = new NcdcRecordParser(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { parser.parse(value); context.write(new Text(parser.getStationId()), value); } } static class MultipleOutputsReducer extends Reducers<Text, Text, NullWritable, Text> { private MultipleOutputs<NullWritable, Text> multipleOutputs; @Override protected void setup(Context context) throws IOException, InterruptedException { multipleOutputs = new MultipleOutputs<NullWritable, Text>(context); } @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value: values) { multipleOutputs.write(NullWritable.get(), value, key.toString()); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { multipleOutputs.close(); } @Override public int run(String[] args) throws Exception { Job job = JobBuilder.parseInputANdOutput(this, getConf(), args); if (job == null) return -1; job.setMapperClass(StationMapper.class); job.setMapOutputKeyClass(Text.class); job.setReducerClass(MultipleOutputReducer.class); job.setOutputKeyClass(NullWritable.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new PartitionByStationUsingMultipleOutputs(), args); System.exit(exitCode); } }}

注, 注意 MultipleOutputs.write() 寫的key, value.
我想目標(biāo)輸出創(chuàng)建任意的子目錄>? 沒問題, 看這段代碼吧
@Override protected void reduce(Text key, Iterable<Text>values, Context context) throws IOException, InterruptedException { for (Text value : values) { parser.parse(value); String basePath = String.format("%s/%s/part", parser.getStationId(), parser.getYear()); multipleOutputs.write(NullWritable.get(), value, basePath); }}

注: LazeOutput
FileOutputFormat 創(chuàng)建的 part-r-nnnnn files 即使這個(gè)文件是空的(大小為0) , 也會(huì)同樣創(chuàng)建. 如果不希望這樣, 可使用 LazyOutputFormat. 使用 Streaming的話則是 -lazyOutput 參數(shù).
MapReduce Features
這一章, 討論 counter, joining, 和 sorting
Counter
在任務(wù)執(zhí)行過程中的一些任務(wù)信息的反饋, 有些內(nèi)容我們希望看到稍加統(tǒng)計(jì)的結(jié)果, 由于統(tǒng)計(jì)本身并不復(fù)雜, 也不會(huì)用到太多高深的技巧, 大多是對job, task等的計(jì)數(shù)類反饋, 所以這里先來簡單了解一下Counter -- 根據(jù)所有計(jì)數(shù)對象的類型, 分參悟了以下幾種:
MapReduce task counters -- org.apache.hadoop.mapreduce.TaskCounter
Filesystem counters -- org.apache.hadoop.mapreduce.FileSYstemCounter
FileInputFormat counters -- org.apahce.hadoop.mapreduce.lib.input.FileInputFormatCounter
FileOutputFormat counters -- org.apahce.hadoop.mapreduce.lib.output.FileOutputFormatCounter
Job counters -- org.apache.hadoop.mapreduce.JobCounter

Task counters
顯然, 這是一類針對task而收集信息的工具. 例如 MAP_INPUT_RECORDS 等, 有很多我們在MapReduce WebUI 上看到的信息, 其實(shí)都是出自于它們的返回結(jié)果. 下面來見表:
MAP_INPUT_RECORDS 統(tǒng)計(jì)每個(gè)map處理的records個(gè)數(shù). 最后聚合, 得到整個(gè)Job的輸入record個(gè)數(shù).
SPLIT_RAW_BYTES input-split 對象的bytes, 由于是在原來輸入數(shù)據(jù)又增加了分割的offset, 因此會(huì)大于真正的 total input size.
MAP_OUTPUT_RECORDS map output 產(chǎn)出的record個(gè)數(shù). 通過每個(gè)map的OutputCollector()調(diào)用其 collect()方法來完成.

MAO_OUTPUT_BYTES map output產(chǎn)出的非壓縮類bytes大小, 通過每個(gè)map的OutputCollector()調(diào)用其 collect()方法來完成.

MAP_OUTPUT_MATERIALIZE_BYTES map output 直接向Disk產(chǎn)出的bytes大小(包括壓縮類的文件的大小)

COMBINE_INPUT_RECORDS 被所有combiners處理過的 input records個(gè)數(shù).

COMBINE_OUTPUT_RECORDS 被所有combiners處理過的 output records個(gè)數(shù).

REDUCE_INPUT_GROUPS 所有reducer處理的key種類數(shù), 對reducer執(zhí)行reduce() 方法累增得到.

REDUCE_INPUT_RECORDS 所有reducer處理的 input records 個(gè)數(shù).

REDUCE_OUTPUT_RECORDS 所有reducer處理的 output records 個(gè)數(shù).

REDUCE_SHUFFLE_BYTES map output 到 reducer過程中 shuffle用到的bytes大小

SPILLED_RECORDS 所有map/reduce過程中spill到磁盤中的records個(gè)數(shù)

CPU_MILLISECONDS CPU 對該任務(wù)的消耗 毫秒

PHYSICAL_MEMORY_BYTES 任務(wù)消耗的內(nèi)存大小

VIRTUAL_MEMORY_BYTES 任務(wù)消耗的虛擬內(nèi)存大小

COMMITTED_HEAP_BYTES JVM可用的內(nèi)存大小

GC_TIME_MILLIS GC消耗時(shí)間 毫秒

SHUFFLED_MAPS map output 產(chǎn)生的文件數(shù), (被shuffle之后再由reducer處理)

FAILED_SHUFFLE shuffle過程中 map output copy失敗的個(gè)數(shù)

MERGED_MAP_OUTPUTS map output 被合并的個(gè)數(shù) (在Shuffle端處理)

BYTES_READ filesystem task counter, map/reduce 讀入的bytes

BYTES_WRITTEN filesystem task counter, map/reduce 寫入的bytes

READ_OPS filesystem task counter, map/reduce 讀的操作個(gè)數(shù)

LARGE_READ_OPS filesystem task counter, map/reduce 讀的操作個(gè)數(shù), 限定于 large read(如對于大型目錄列表的讀入)

WRITE_OPS filesystem task counter, map/reduce 寫的操作個(gè)數(shù)

Job counters
job counter 與其它類的counter 有所不同, 全由application master操縱. 它們用來對Job進(jìn)行統(tǒng)計(jì)匯總, 如:
TOTAL_LAUNCHED_MAPS mapper 啟動(dòng)個(gè)數(shù)

TOTAL_LAUNCHED_REDUCES reducer 啟動(dòng)個(gè)數(shù)

TOTAL_LAUNCHED_UBERTASKS uber task的個(gè)數(shù)

NUM_FAILED_MAPS mapper失敗個(gè)數(shù)

NUM_FAILED_REDUCES reducer失敗個(gè)數(shù)

NUM_KILLED_MAPS mapper killed 個(gè)數(shù)

NUM_KILLED_REDUCES reducer killed 個(gè)數(shù)

其它內(nèi)容暫時(shí)略過.
sorting
The ability to sort data is at the heart of MapReduce.

這一部分將會(huì)接觸到MapReduce中如何使用Sort來重新組織數(shù)據(jù)流, 以及不同的sort 方式.
PREPARATION
以之前的溫度數(shù)據(jù)為例, 由于要求某些溫度數(shù)據(jù)的最大值, 而原始數(shù)據(jù)是TEXT結(jié)構(gòu), 顯然不能應(yīng)照數(shù)值型進(jìn)行排序, 那么 TEXT -> INT(或其它的FLOAT, DOUBLE) 轉(zhuǎn)化過程就要考慮是否有invalid data. 在map端要對不合理的數(shù)據(jù)過濾處理. 下面看一個(gè)實(shí)現(xiàn) 數(shù)值轉(zhuǎn)型的 程序
public class SortDataPreprocessor extends Configured implements Toll { static class CleannerMapper extends Mapper<LongWritable, Text, IntWritable, Text> { private NcdcRecordParser parser =new NcdcRecordParser(); @Override protected void map(LongWritable key, Text value, Context context) { parser.parse(value); if (parser.isValidTemprature)) { context.write(new IntWritable(parser.getAirTemperature()), value); } } } @Override public int run(String[] args) throws Exception{ Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) { return -1 } job.setMapperClass(CleannerMapper.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueKeyClass(Text.class); job.setNumReduceTasks(0); job.setOutputFormatClass(SequenceFileOutputFormat.class); SequenceFileOutputFormat.setCompressOutput(job, true); SequenceFileOutputFormat.setCompressorClass(job.GzipCodec.class); SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new SortDataPreprocessor(), args); System.exit(exitCode); }}

PARITIAL SORT
默認(rèn)情況下, MapReduce會(huì)根據(jù)input records的key進(jìn)行排序.
Example 9-4. A MapReduce program for sorting a SequenceFile with IntWritable keys using the default HashPartitioner

public class SortByTemperatureUsingHashPartitioner extends Configured implements Tool{ @Override public int run (String[] args) throws Exception{ Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) return -1; job.setInputFormatClass(SequenceFileInputFormat.class); job.setOutputKeyClass(IntWrtable.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); SequenceFileOutputFormat.setCompressOutput(job, true); SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.Block); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new SortByTemperatureUsingHashPartitioner(), args); System.exit(exitCode); } }

w假定使用30個(gè)reducer來執(zhí)行程序:
$ hadoop jar hadoop-examples.jar SortByTemperatureUsingHashPartitioner \ -D mapreduce.job.reduces=30 input/folder output/folder

TOTAL SORT
How can you produce a globally sorted file using Hadoop? 如果只有一個(gè)partition 可能答案就解決了, 但我們面臨問題是多個(gè). 要如何做到全局性排序呢. -- 把構(gòu)造partitioner時(shí)與要排序的值結(jié)合到一起. 比如我們有4個(gè)partition, 然后把 < -10度的放在第一個(gè), [-10, 0) 放在第2個(gè), [0, 10) 放在第3個(gè), 其余是第4個(gè). 然后在每個(gè)partition中對溫度進(jìn)行排序.

MapReduce program for sorting a SequenceFile with IntWritable keys using the TotalOrderPartitioner to globally sort the day
public class SortByTemperatureUsingTotalOrderPartitioner extends Configured implements Tool {@Override public int run(String[] args) throws Exception{ Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) return -1 ; }job.setInputFormatClass(SequenceFileInputFormat.class);job.setOutputKeyClass(IntWritable.class);job.setOutputFormatClass(SequenceFileOutputFormat.class);SequenceFileOutputFormat.setCompressOutput(job, true); SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.classs); SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK); job.setPartitionerClass(TotalORderPartitioner.class); InputSampler.Sampler<IntWrtable, Text> sampler = new InputSampler.RandomSample<IntWritable, Text>(0.1, 10000, 10); InputSampler.writePartitionFile(job, sampler); // Add to DistributedCacheConfiguration conf = job.getConfiguration(); String partitionFile = TotalOrderPartitioner.getPartitionFile(conf); URI partitionUri = new URI(partitionFile); job.addCacheFile(partitionUri); return job.waitForCompletion(true) ? 0 : 1; }public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new SortByTemperatureUsingTotalOrderPartitioner(), args); System.exit(exitCode); }}

Secondary Sort
Application to find the maximum temperature by sorting temperatures in the key

public class MaxTemperatureUsingSecondarySort extends Tool{ static class MaxTemperatureMapper extends Mapper<LongWritable, Text, IntPair, NullWritable> { private NcdcRecordParser parser = new NcdcRecordParser(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { parser.parse(value); if (parser.isValidTemperature()){ context.write(new IntPair(parser.getYearInt(), parser.getAirTemperture()), NullWritable.get()); } } } static class MaxTemperatureReducer extends Reducer<IntPair, NullWritalbe, IntPair, NullWritalbe> { @Override protected void reduce(IntPair key, Iterable<NullWritable> values, Context context throws IOException, InterruptedException{ context.write(key, NullWritable.get()); } } public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> { @Override public int getPartition(IntPair key, NullWritable value, int numPartitions){ // multiply by 127 to perform some mixing return Math.abs(key.getFirst() * 127 ) % numPartitions; } } public static class KeyComparator extends WritableComparator { protected KeyComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { IntPair ip1 = (IntPair) w1; IntPair ip2 = (IntPair) w2; int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst()); if (cmp != 0) return cpm; return - IntPair.compare(ip1.getSecond(), ip2.getSecond()); } } public static class GroupComparator extends WritableComparator { protected GroupComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { IntPair ip1 = (IntPair) w1; IntPair ip2 = (IntPair) w2; int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst()); return cmp; } @Override public int run(String[] args) throws Exception { Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job==null) return -1; job.setMapperClass(MaxTemperatureMapper.class); job.setSortComparatorClass(KeyComparator.class); job.setGroupingComparatorClass(GroupComparator.class); job.setReducerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(IntPair.class); job.setOutputValueClass(NullWritable.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args ) throws Exception { int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args); System.exit(exitCode); }}

STREAMING
to do a secondary sort in Streaming, we can take advantage of a couple of library classes that Hadoop provides.
$ hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \ -D stream.num.map.output.key.fields=2 \ -D mapreduce.partition.keypartitioner.options=-k1, 1 \ -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \ -D mapreduce.partition.keycomparator.options="-k1n k2nr" \ -input input/ncdc/all \ -output output-secondarysport-streaming \ -mapper ... -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \ -reducer ch...

joining datasets


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市存璃,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖牙躺,帶你破解...
    沈念sama閱讀 219,427評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異腕扶,居然都是意外死亡孽拷,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,551評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門半抱,熙熙樓的掌柜王于貴愁眉苦臉地迎上來脓恕,“玉大人,你說我怎么就攤上這事窿侈×夺#” “怎么了?”我有些...
    開封第一講書人閱讀 165,747評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵史简,是天一觀的道長乃秀。 經(jīng)常有香客問我,道長厨钻,這世上最難降的妖魔是什么铅忿? 我笑而不...
    開封第一講書人閱讀 58,939評(píng)論 1 295
  • 正文 為了忘掉前任匆赃,我火速辦了婚禮逗扒,結(jié)果婚禮上茎匠,老公的妹妹穿的比我還像新娘归薛。我一直安慰自己轰坊,他們只是感情好冀宴,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,955評(píng)論 6 392
  • 文/花漫 我一把揭開白布火本。 她就那樣靜靜地躺著危队,像睡著了一般。 火紅的嫁衣襯著肌膚如雪钙畔。 梳的紋絲不亂的頭發(fā)上茫陆,一...
    開封第一講書人閱讀 51,737評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音擎析,去河邊找鬼簿盅。 笑死,一個(gè)胖子當(dāng)著我的面吹牛揍魂,可吹牛的內(nèi)容都是我干的桨醋。 我是一名探鬼主播,決...
    沈念sama閱讀 40,448評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼现斋,長吁一口氣:“原來是場噩夢啊……” “哼喜最!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起庄蹋,我...
    開封第一講書人閱讀 39,352評(píng)論 0 276
  • 序言:老撾萬榮一對情侶失蹤瞬内,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后限书,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體虫蝶,經(jīng)...
    沈念sama閱讀 45,834評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,992評(píng)論 3 338
  • 正文 我和宋清朗相戀三年倦西,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了能真。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,133評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡扰柠,死狀恐怖粉铐,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情耻矮,我是刑警寧澤秦躯,帶...
    沈念sama閱讀 35,815評(píng)論 5 346
  • 正文 年R本政府宣布忆谓,位于F島的核電站裆装,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜哨免,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,477評(píng)論 3 331
  • 文/蒙蒙 一茎活、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧琢唾,春花似錦载荔、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,022評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至普办,卻和暖如春工扎,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背衔蹲。 一陣腳步聲響...
    開封第一講書人閱讀 33,147評(píng)論 1 272
  • 我被黑心中介騙來泰國打工肢娘, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人舆驶。 一個(gè)月前我還...
    沈念sama閱讀 48,398評(píng)論 3 373
  • 正文 我出身青樓橱健,卻偏偏與公主長得像,于是被迫代替她去往敵國和親沙廉。 傳聞我的和親對象是個(gè)殘疾皇子拘荡,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,077評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)撬陵,斷路器俱病,智...
    卡卡羅2017閱讀 134,671評(píng)論 18 139
  • MapReduce編程重點(diǎn)把握 MapReduce核心概念 思考幾個(gè)問題 詞頻統(tǒng)計(jì)wordcount的具體執(zhí)行過程...
    胖胖的大羅閱讀 720評(píng)論 0 1
  • github鏈接 針對Hive的優(yōu)化主要有以下幾個(gè)方面: map reduce file format shuff...
    zoyanhui閱讀 6,174評(píng)論 2 33
  • Hadoop是一個(gè)十分流行的分布式存儲(chǔ)和計(jì)算框架,也是業(yè)內(nèi)大數(shù)據(jù)處理和分析最通用的框架之一袱结。 Hadoop2.0 ...
    一只小哈閱讀 1,782評(píng)論 1 10
  • 一直有理財(cái)?shù)男牧料叮瑓s缺少行動(dòng),上一次記賬可能還是5年前垢夹,上班兩年多也沒存下多少溢吻。女生都比較能買買買,有了娃之后買買買...
    7128閱讀 161評(píng)論 0 0