DDIA Ch10

MapReduce

MapReduce is the distributed version of Unix tools; if it ran on a single machine, it would just be a Unix pipe.

In Hadoop's implementation of MapReduce, the filesystem is called HDFS (Hadoop Distributed File System), an open source reimplementation of the Google File System (GFS).

Various other distributed filesystems besides HDFS exist, such as GlusterFS and the Quantcast File System (QFS) [20]. Object storage services such as Amazon S3, Azure Blob Storage, and OpenStack Swift [21] are similar in many ways.

Job Execution

MapReduce has two callback functions: the Mapper and the Reducer.

Mapper

  • The mapper is called once for every input record, and its job is to extract the key and value from the input record. For each input, it may generate any number of key-value pairs (including none). It does not keep any state from one input record to the next, so each record is handled independently.

Reducer

  • The MapReduce framework takes the key-value pairs produced by the mappers, collects all the values belonging to the same key, and calls the reducer with an iterator over that collection of values. The reducer can produce output records (such as the number of occurrences of the same URL).
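A minimal local sketch of these two callbacks (the URL-count example from the quote; the record format and the grouping loop that stands in for the framework are my own assumptions):

```python
from collections import defaultdict

def mapper(record):
    # Called once per input record; emits any number of key-value pairs.
    # Here: extract the URL as the key (hypothetical "METHOD /path" records).
    yield (record.split()[1], 1)

def reducer(key, values):
    # Called once per key with an iterator over all values for that key.
    yield (key, sum(values))

records = ["GET /home", "GET /about", "GET /home"]

# What the framework does in between: collect all values belonging to a key.
groups = defaultdict(list)
for record in records:
    for key, value in mapper(record):
        groups[key].append(value)

for key in sorted(groups):
    print(list(reducer(key, iter(groups[key]))))  # [('/about', 1)], [('/home', 2)]
```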

Distributed execution of MapReduce

![[DDIA-Figure-10-1.png]]

To increase locality, each mapper is copied to the machine that stores the data it needs to map. If the mapper is written in Java, the jar file is copied onto that machine and the mapper runs there, which saves the time of copying the input file over the network.

The whole process amounts to partitioning the mapper function across different machines.
The same goes for reducers: the MapReduce framework uses a hash of the key to decide which reducer each key-value pair is assigned to.

Every key-value pair has to be sorted, but the dataset is too large to sort directly on one machine, so the sorting runs in stages: each machine sorts its own output first and writes it to local disk. Once a mapper has read all of its input and written its sorted output files, the MapReduce scheduler notifies the reducers that they can start fetching data from it. Each reducer fetches the sorted key-value pairs belonging to its partition and merges them (right half of the figure), which yields fully sorted input.

Finally, the reducer runs whatever logic it needs (analyzing the data: counting, sorting from largest to smallest, and so on) and writes the result to the distributed storage system.

The key-value pairs must be sorted, but the dataset is likely too large to be sorted with a conventional sorting algorithm on a single machine. Instead, the sorting is performed in stages. First, each map task partitions its output by reducer, based on the hash of the key. Each of these partitions is written to a sorted file on the mapper’s local disk, using a technique similar to what we discussed in “SSTables and LSM-Trees” on page 76.

Whenever a mapper finishes reading its input file and writing its sorted output files, the MapReduce scheduler notifies the reducers that they can start fetching the output files from that mapper. The reducers connect to each of the mappers and download the files of sorted key-value pairs for their partition. The process of partitioning by reducer, sorting, and copying data partitions from mappers to reducers is known as the shuffle [26] (a confusing term—unlike shuffling a deck of cards, there is no randomness in MapReduce).

The reduce task takes the files from the mappers and merges them together, preserving the sort order. Thus, if different mappers produced records with the same key, they will be adjacent in the merged reducer input.

The reducer is called with a key and an iterator that incrementally scans over all records with the same key (which may in some cases not all fit in memory). The reducer can use arbitrary logic to process these records, and can generate any number of output records. These output records are written to a file on the distributed filesystem (usually, one copy on the local disk of the machine running the reducer, with replicas on other machines).
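As a minimal in-process sketch of this staged sort (in-memory lists stand in for the sorted files on each mapper's local disk; the datasets and reducer count are made up):

```python
import heapq

NUM_REDUCERS = 2
mapper_outputs = [                       # one (key, value) list per map task
    [("b", 1), ("a", 1), ("c", 1)],
    [("a", 1), ("c", 1), ("c", 1)],
]

# 1. Each map task partitions its output by reducer, based on a hash of the
#    key, and writes each partition as a sorted run.
runs = [[[] for _ in mapper_outputs] for _ in range(NUM_REDUCERS)]
for m, output in enumerate(mapper_outputs):
    for key, value in sorted(output):
        runs[hash(key) % NUM_REDUCERS][m].append((key, value))

# 2. Each reducer fetches its run from every mapper and merges the runs,
#    preserving the sort order, so records with the same key are adjacent.
for r in range(NUM_REDUCERS):
    print(f"reducer {r}:", list(heapq.merge(*runs[r])))
```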

A single MapReduce job can only do so much; complex analyses require multiple jobs, which together form a workflow. Moreover, a MapReduce job must wait for the previous job to finish before it can start, because each job writes its output to files.
Hadoop itself has no workflow scheduler, so several schedulers have been developed for it.

A batch job’s output is only considered valid when the job has completed successfully (MapReduce discards the partial output of a failed job). Therefore, one job in a workflow can only start when the prior jobs—that is, the jobs that produce its input directories—have completed successfully. To handle these dependencies between job executions, various workflow schedulers for Hadoop have been developed, including Oozie, Azkaban, Luigi, Airflow, and Pinball [28].
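As a rough illustration (not from the book), such a dependency chain might be expressed in Airflow as below, assuming Airflow 2.4+ and entirely hypothetical task names, jar names, and paths:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

# Three-stage pipeline: each job reads the directory the previous job wrote,
# so the scheduler may only start it after that job has completed successfully.
with DAG(dag_id="batch_pipeline", start_date=datetime(2024, 1, 1),
         schedule=None) as dag:
    extract = BashOperator(task_id="extract",
                           bash_command="hadoop jar etl.jar /raw /stage1")
    join = BashOperator(task_id="join",
                        bash_command="hadoop jar join.jar /stage1 /stage2")
    report = BashOperator(task_id="report",
                          bash_command="hadoop jar report.jar /stage2 /out")

    extract >> join >> report   # declare the dependency edges
```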

Workflow schedulers also help with managing jobs, so that different teams in a company can consume the output of each other's MapReduce jobs.

These schedulers also have management features that are useful when maintaining a large collection of batch jobs. Workflows consisting of 50 to 100 MapReduce jobs are common when building recommendation systems [29], and in a large organization, many different teams may be running different jobs that read each other’s output. Tool support is important for managing such complex dataflows.

I should check which open source tool Ian uses, read some guides on it, or find hands-on tutorials for these tools and start using them. That should save a lot of the time I need for class.

Various higher-level tools for Hadoop, such as Pig [30], Hive [31], Cascading [32], Crunch [33], and FlumeJava [34], also set up workflows of multiple MapReduce stages that are automatically wired together appropriately.

In the context of batch processing, a join means resolving all occurrences of some association within a dataset.
![[DDIA-Figure-10-2.png]]

This is the same idea as a database join: bringing together all the user profile data. But having every user activity record query the database directly would overwhelm the DB, so to achieve good throughput in a batch process, the computation must be (as much as possible) local to one machine.

In order to achieve good throughput in a batch process, the computation must be (as much as possible) local to one machine. Making random-access requests over the network for every record you want to process is too slow. Moreover, querying a remote database would mean that the batch job becomes nondeterministic, because the data in the remote database might change.

So the better solution is to use an ETL process to take a copy of the user database, then place each user's profile on the relevant machine according to how the user activity events are distributed; that way you get locality.

Thus, a better approach would be to take a copy of the user database (for example, extracted from a database backup using an ETL process—see “Data Warehousing” on page 91) and to put it in the same distributed filesystem as the log of user activity events. You would then have the user database in one set of files in HDFS and the user activity records in another set of files, and could use MapReduce to bring together all of the relevant records in the same place and process them efficiently.

So the mappers ensure locality, which in turn raises the reducers' throughput.

In a sort-merge join, the mappers and the sorting process make sure that all the necessary data to perform the join operation for a particular user ID is brought together in the same place: a single call to the reducer.
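A minimal local sketch of such a reduce-side join on user ID (toy datasets; the real shuffle, secondary sort, and on-disk files are elided):

```python
from collections import defaultdict

profiles = [(1, "Alice, 1985"), (2, "Bob, 1991")]          # user database extract
events   = [(1, "/home"), (2, "/cart"), (1, "/checkout")]  # activity log

# Mappers tag each record and emit it under the user ID as the key.
groups = defaultdict(list)
for uid, row in profiles:
    groups[uid].append(("profile", row))
for uid, url in events:
    groups[uid].append(("event", url))

def reduce_join(uid, values):
    # One reducer call receives everything known about one user ID.
    profile = next(v for tag, v in values if tag == "profile")
    for tag, v in values:
        if tag == "event":
            yield (uid, profile, v)

for uid in sorted(groups):
    print(list(reduce_join(uid, groups[uid])))
```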

This is a good way to look at it: the key is like an address, and the reducer is the destination that the key is routed to.

One way of looking at this architecture is that mappers “send messages” to the reducers. When a mapper emits a key-value pair, the key acts like the destination address to which the value should be delivered. Even though the key is just an arbitrary string (not an actual network address like an IP address and port number), it behaves like an address: all key-value pairs with the same key will be delivered to the same destination (a call to the reducer).

The mappers and the shuffle decouple network communication from computation; the reducer just does the computation.

What makes this different from a normal database setup is that it separates fetching data from processing data. A typical application first fetches data, then processes it (my movie project works this way). MapReduce handles all the networking itself, so the application code does not need to worry about partial failures.

Using the MapReduce programming model has separated the physical network communication aspects of the computation (getting the data to the right machine) from the application logic (processing the data once you have it). This separation contrasts with the typical use of databases, where a request to fetch data from a database often occurs somewhere deep inside a piece of application code [36]. Since MapReduce handles all network communication, it also shields the application code from having to worry about partial failures, such as the crash of another node: MapReduce transparently retries failed tasks without affecting the application logic.

Locality is a recurring theme: whether in memory, on disk, or here in distributed processing, locality is always the first optimization to reach for (network caches exist for the same reason: to increase locality).
Regional datacenters are also about locality (a nearby datacenter means lower latency).

GROUP BY

Besides joins, another common use of the “bringing related data to the same place” pattern is grouping records by some key (as in the GROUP BY clause in SQL).

Map-Side joins

If a dataset is very large, every mapper has to ship its output to the relevant reducers, and the shuffle (partitioning, sorting, and copying to reducers) can become the bottleneck; map-side joins were introduced to optimize this case.
In a map-side join, the mapper performs the join itself.

Broadcast hash joins

The idea is to load the small dataset entirely into memory as a hash table; the large dataset is partitioned, so each mapper handles one partition of the large input and probes the in-memory table, which is fast. It is called broadcast because the whole small dataset is "broadcast" to every mapper partition.

This simple but effective algorithm is called a broadcast hash join: the word broadcast reflects the fact that each mapper for a partition of the large input reads the entirety of the small input (so the small input is effectively “broadcast” to all partitions of the large input), and the word hash reflects its use of a hash table. This join method is supported by Pig (under the name “replicated join”), Hive (“MapJoin”), Cascading, and Crunch. It is also used in data warehouse query engines such as Impala [41].
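A toy sketch of the idea (names and data are made up): the small input is loaded whole into a hash table that every mapper gets a copy of, while the large input stays partitioned:

```python
user_db = {1: "Alice", 2: "Bob"}       # small input, "broadcast" to all mappers

event_partitions = [                   # large input, one partition per mapper
    [(1, "/home"), (2, "/cart")],
    [(1, "/checkout")],
]

def mapper(partition, lookup):
    for uid, url in partition:
        if uid in lookup:              # pure in-memory hash lookup, no shuffle
            yield (uid, lookup[uid], url)

for p in event_partitions:
    print(list(mapper(p, user_db)))
```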

Partitioned hash joins

If both inputs are partitioned in the same way (same number of partitions, same key, same hash function), the join work can be assigned per partition; this is a partitioned hash join.

For example, if the last digit of a user ID has only 10 possible values (0-9), you can split the work into 10 mapper partitions, each processing only one of the digits, so each mapper reads far less data.

If the partitioning is done correctly, you can be sure that all the records you might want to join are located in the same numbered partition, and so it is sufficient for each mapper to only read one partition from each of the input datasets. This has the advantage that each mapper can load a smaller amount of data into its hash table.
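A sketch using the last-digit example from above (toy data):

```python
NUM_PARTITIONS = 10

def part(uid):
    # Both datasets are partitioned the same way: by the user ID's last digit.
    return uid % NUM_PARTITIONS

users  = [(13, "Alice"), (27, "Bob"), (33, "Carol")]
events = [(13, "/home"), (33, "/cart"), (27, "/pay")]

for p in range(NUM_PARTITIONS):
    # Each mapper loads only its own, much smaller partition into a hash table
    table = {uid: name for uid, name in users if part(uid) == p}
    # ...and joins it against the matching partition of the other input.
    for uid, url in events:
        if part(uid) == p and uid in table:
            print((uid, table[uid], url))
```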

When I read this I immediately thought: isn't that just the output of a previous job? Exactly; so invoking the mapper separately like this can serve other reducers later. Essentially you are preparing datasets that can be queried directly.

If a map-side merge join is possible, it probably means that prior MapReduce jobs brought the input datasets into this partitioned and sorted form in the first place. In principle, this join could have been performed in the reduce stage of the prior job. However, it may still be appropriate to perform the merge join in a separate map-only job, for example if the partitioned and sorted datasets are also needed for other purposes besides this particular join.

The purpose of batch processing

Build search indexes

MapReduce was originally created at Google precisely to build its search index [[DDIA要讀的paper#^6bad9b]], although Google later stopped using it for that [[DDIA要讀的paper#^721545]]

Lucene/Solr apparently still uses MapReduce to build indexes [[DDIA要讀的paper#^5a0230]]

Build machine learning systems

Search indexes are just one example of the possible outputs of a batch processing workflow. Another common use for batch processing is to build machine learning systems such as classifiers (e.g., spam filters, anomaly detection, image recognition) and recommendation systems (e.g., people you may know, products you may be interested in, or related searches [29]).

So Hadoop is part of the infrastructure of machine learning.

MapReduce's pattern of never modifying its input data sets it apart from databases, because it can tolerate buggy code; this is what is meant by human fault tolerance.

  • Databases with read-write transactions do not have this property: if you deploy buggy code that writes bad data to the database, then rolling back the code will do nothing to fix the data in the database. (The idea of being able to recover from buggy code has been called human fault tolerance [50].)

This passage in the book is very useful; worth rereading (page 414).

  • Like Unix tools, MapReduce jobs separate logic from wiring (configuring the input and output directories), which provides a separation of concerns and enables potential reuse of code: one team can focus on implementing a job that does one thing well, while other teams can decide where and when to run that job.

Hadoop actually uses Avro?! Then I really need to learn how to use Avro, or Thrift; y總 seems to have a Thrift tutorial.

On Hadoop, some of those low-value syntactic conversions are eliminated by using more structured file formats: Avro (see “Avro” on page 122) and Parquet (see “Column-Oriented Storage” on page 95) are often used, as they provide efficient schema-based encoding and allow evolution of their schemas over time (see Chapter 4).
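To make "schema-based encoding" concrete, here is a small Avro round trip; the fastavro library and the User schema are my assumptions, not anything the book prescribes:

```python
from io import BytesIO
from fastavro import writer, reader, parse_schema   # pip install fastavro

schema = parse_schema({
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id",    "type": "long"},
        {"name": "name",  "type": "string"},
        # Fields added later with a default keep old readers working:
        # this is the schema evolution the text refers to.
        {"name": "email", "type": ["null", "string"], "default": None},
    ],
})

buf = BytesIO()
writer(buf, schema, [{"id": 1, "name": "Alice", "email": None}])
buf.seek(0)
for record in reader(buf):
    print(record)   # {'id': 1, 'name': 'Alice', 'email': None}
```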

MapReduce's distinguishing trait is that it is general-purpose: it can run arbitrary programs.

When the MapReduce paper [1] was published, it was—in some sense—not at all new. All of the processing and parallel join algorithms that we discussed in the last few sections had already been implemented in so-called massively parallel processing (MPP) databases more than a decade previously [3, 40]. For example, the Gamma database machine, Teradata, and Tandem NonStop SQL were pioneers in this area [52].

The biggest difference is that MPP databases focus on parallel execution of analytic SQL queries on a cluster of machines, while the combination of MapReduce and a distributed filesystem [19] provides something much more like a general-purpose operating system that can run arbitrary programs.

So HDFS is a very good fit for storage, especially when the workload varies (when you need both OLTP and OLAP).

Overall, MapReduce is a good fit for very large datasets.

The Hadoop ecosystem includes both random-access OLTP databases such as HBase (see “SSTables and LSM-Trees” on page 76) and MPP-style analytic databases such as Impala [41]. Neither HBase nor Impala uses MapReduce, but both use HDFS for storage. They are very different approaches to accessing and processing data, but they can nevertheless coexist and be integrated in the same system.

Materialization

Writing MapReduce's intermediate state out to files is called materialization.

The process of writing out this intermediate state to files is called materialization.

But this fully materialized intermediate state is sometimes unnecessary, which is why dataflow engines were created.

Dataflow engines

In order to fix these problems with MapReduce, several new execution engines for distributed batch computations were developed, the most well known of which are Spark [61, 62], Tez [63, 64], and Flink [65, 66].

Since they explicitly model the flow of data through several processing stages, these systems are known as dataflow engines.

This sentence neatly captures how dataflow engines differ from MapReduce:

Returning to the Unix analogy, we saw that MapReduce is like writing the output of each command to a temporary file, whereas dataflow engines look much more like Unix pipes.
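For contrast, a minimal PySpark sketch (hypothetical input path, local master): the stages below are chained operators inside a single job rather than separate jobs materializing files in between:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pipe-like").master("local[*]").getOrCreate()
sc = spark.sparkContext

counts = (sc.textFile("access.log")               # hypothetical input file
            .map(lambda line: line.split()[1])    # extract the URL field
            .map(lambda url: (url, 1))
            .reduceByKey(lambda a, b: a + b))     # shuffle, but no HDFS write

counts.saveAsTextFile("url_counts")   # only the final output is materialized
spark.stop()
```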

Graph and iterative processing

Graph models suit many machine learning problems. That makes sense: PageRank itself is about ranking web pages.

Dataflow engines like Spark, Flink, and Tez (see “Materialization of Intermediate State” on page 419) typically arrange the operators in a job as a directed acyclic graph (DAG).
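Graph algorithms are usually iterative: the same transformation is repeated until the result converges, which a dataflow engine runs as one DAG of stages per iteration. A tiny plain-Python PageRank loop over a made-up three-page link graph shows that shape:

```python
links = {"a": ["b", "c"], "b": ["c"], "c": ["a"]}   # hypothetical web graph
rank = {page: 1.0 / len(links) for page in links}
damping = 0.85

for _ in range(20):                     # iterate until the ranks converge
    incoming = {page: 0.0 for page in links}
    for page, outs in links.items():
        for dest in outs:               # each page shares its rank among links
            incoming[dest] += rank[page] / len(outs)
    rank = {p: (1 - damping) / len(links) + damping * r
            for p, r in incoming.items()}

print(rank)   # "c" ends up ranked highest: it has the most incoming weight
```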

Summary

This chapter is mainly about batch processing, starting from Unix tools (and their design philosophy) and moving on to MapReduce and its successors, the dataflow engines.

Some of those design principles are that inputs are immutable, outputs are intended to become the input to another (as yet unknown) program, and complex problems are solved by composing small tools that “do one thing well.”

In the Unix world, the uniform interface that allows one program to be composed with another is files and pipes; in MapReduce, that interface is a distributed filesystem. We saw that dataflow engines add their own pipe-like data transport mechanisms to avoid materializing intermediate state to the distributed filesystem, but the initial input and final output of a job is still usually HDFS.

Distributed batch processing relies mainly on two techniques:

  1. Partitioning
  2. Fault tolerance

The two main problems that distributed batch processing frameworks need to solve are:

Partitioning

In MapReduce, mappers are partitioned according to input file blocks. The output of mappers is repartitioned, sorted, and merged into a configurable number of reducer partitions. The purpose of this process is to bring all the related data—e.g., all the records with the same key—together in the same place.

Post-MapReduce dataflow engines try to avoid sorting unless it is required, but they otherwise take a broadly similar approach to partitioning.

Fault tolerance

MapReduce frequently writes to disk, which makes it easy to recover from an individual failed task without restarting the entire job but slows down execution in the failure-free case. Dataflow engines perform less materialization of intermediate state and keep more in memory, which means that they need to recompute more data if a node fails. Deterministic operators reduce the amount of data that needs to be recomputed.
