昨天做了一個(gè)小調(diào)查论熙,說(shuō)看看想看些啥。大概的分布是這樣的摄狱,一個(gè)1代表一個(gè)投票脓诡。看來(lái)還是2媒役、3比較多祝谚。
11111 希望看到"算法"回復(fù)1。
111111111111 希望看到"技術(shù)細(xì)節(jié)"回復(fù)2酣衷。
111111111 希望看到"成長(zhǎng)和讀書(shū)"分享回復(fù)3交惯。
還好多人說(shuō)想看我長(zhǎng)啥樣,嘛穿仪,在我比較正經(jīng)的時(shí)候席爽,就長(zhǎng)下面這樣。
大圖預(yù)警0∑V欢汀!钠龙!
日常呢炬藤,就長(zhǎng)這樣御铃。
長(zhǎng)這樣。
好了切入正題沈矿,今天開(kāi)始挖一個(gè)新坑上真,就是實(shí)現(xiàn)一些基于MapReduce的一些圖算法,比如Pregel啊羹膳,PageRank啊睡互,LPA啊,SLPA啊等等陵像,坑很大就珠,非常大,慢慢寫(xiě)吧醒颖,都不會(huì)講非常難的理論問(wèn)題妻怎,以代碼細(xì)節(jié)為主。泞歉。
先上一個(gè)我思維拓展的時(shí)候?qū)懙胘ava實(shí)現(xiàn)的MapReduce的基礎(chǔ)版本吧逼侦,寫(xiě)得不是很好,我也在慢慢完善腰耙,Go語(yǔ)言版本的還在寫(xiě)榛丢,真是慚愧感覺(jué)一直在吃老本。
今天實(shí)現(xiàn)的一個(gè)內(nèi)容是挺庞,將一個(gè)List<Integer>進(jìn)行map操作變成另外一個(gè)List晰赞,然后通過(guò)reduce進(jìn)行加和。
靈感來(lái)源來(lái)自于《MapReduce: Simplified Data Processing on Large Clusters 》這篇論文选侨,大家可以看看我之前的文章掖鱼,在了解完什么是Mapreduce。然后先去看看這篇論文侵俗,啟發(fā)很多锨用。
首先我們從兩個(gè)接口入手丰刊,MapFunction和ReduceFunction隘谣,這是MapReduce的兩個(gè)靈魂接口,由使用者去定義啄巧,這里我定義的都是最最簡(jiǎn)單的版本寻歧,暫時(shí)并沒(méi)有進(jìn)行泛化的能力。
MapFunction定義了一個(gè)接口秩仆,類型為V码泛,然后通過(guò)一個(gè)叫map的方法,輸出一個(gè)類型為V的值澄耍。
public interface MapFunction<V> {V map(V target);}
ReduceFunction定義了一個(gè)接口噪珊,類型為V晌缘,然后通過(guò)一個(gè)叫reduce的方法,通過(guò)聚合兩個(gè)V類型的值痢站,輸出一個(gè)類型為V的值磷箕。
public interface ReduceFunction<V> {V reduce(V A,V B);}
上面兩個(gè)方法定義了MapReduce的核心內(nèi)容,就是任務(wù)切分和任務(wù)聚合阵难。有小伙伴不理解這里為什么使用泛型岳枷,因?yàn)樽鳛橐粋€(gè)框架來(lái)說(shuō),我是不知道使用者想使用什么樣的類型進(jìn)行計(jì)算的(雖然這里我知道我接下來(lái)就要用Integer進(jìn)行計(jì)算了)呜叫,所以必須不能指定類型空繁,否則這個(gè)框架就永遠(yuǎn)只能用Integer類型了。
那我們的map和reduce任務(wù)要跑在哪里呢朱庆?有小伙伴說(shuō)跑在分布式環(huán)境里盛泡。對(duì)沒(méi)錯(cuò),最終目的是跑在分布式環(huán)境里娱颊。但是在這里饭于,咱就偷個(gè)懶,先用多線程來(lái)模擬這個(gè)過(guò)程维蒙,并且使用內(nèi)存來(lái)作為消息機(jī)制掰吕。
我是i5雙核的CPU,經(jīng)驗(yàn)值下面颅痊,只有兩個(gè)cpu的話殖熟,創(chuàng)建4個(gè)線程對(duì)于性能來(lái)說(shuō)比單線程好。(畢竟線程切換存在開(kāi)銷(xiāo)斑响,控制得不好多線程肯定是比單線程慢的菱属,不服來(lái)辯)
<pre>
public class CPUs
{public static final int threads = 4;
private static final java.util.concurrent.ExecutorService pool = Executors.newFixedThreadPool(threads);
public static Future submit(Callable task){return pool.submit(task);}
public static void execute(Runnable task){pool.execute(task);}
public static void shutdown(){pool.shutdown();}}
</pre>
好了,MapFunction有了舰罚,CPUs也有了纽门,接下來(lái)可以開(kāi)始寫(xiě)提交器了。任務(wù)提交器是什么東西呢营罢,就是把一個(gè)map任務(wù)進(jìn)行切分赏陵,并且交給多個(gè)線程去異步執(zhí)行,然后最終把結(jié)果匯總還給客戶端的一個(gè)類饲漾。下面的類都比較大蝙搔,建議在電腦端看。
這個(gè)類做了什么事呢考传?就是把List封裝起來(lái)吃型,然后把任務(wù)分發(fā)給多個(gè)線程去執(zhí)行,使用CountDownLatch來(lái)保證所有的線程都已經(jīng)完成計(jì)算僚楞,然后再把結(jié)果返回給客戶端勤晚。
<pre>
public class MapSubmitter<V> {private List<V> target ;private int length;public MapSubmitter(List<V> target){this.target = target;this.length = target.size();}public List<V> map(final MapFunction<V> mapFunction){final CountDownLatch countDownLatch = new CountDownLatch(length);final List<V> result = new ArrayList<V>();for(int i = 0 ; i < length ; i++) {final V current = target.get(i);final int currentIndex = i;try {Future<V> future = CPUs.submit(new Callable<V>() {public V call() throws Exception {V result = mapFunction.map(current);//Printer.println(currentIndex); return result;}});result.add(i,future.get());countDownLatch.countDown();}catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}try{countDownLatch.await();} catch (InterruptedException e) {}finally {return result;}}}
</pre>
這個(gè)類又做了什么事呢枉层?List封裝起來(lái),交給很多線程去執(zhí)行赐写,然后維護(hù)一個(gè)最終的結(jié)果類V返干,并為這個(gè)結(jié)果提供線程安全的保護(hù),避免因?yàn)槎嗑€程操作同一個(gè)結(jié)果造成結(jié)果錯(cuò)誤血淌。
<pre>
public class ReduceSubmitter<V> {private List<V> target ;private int length;private V result ;Lock lock = new ReentrantLock();public ReduceSubmitter(List<V> target){this.target = target;this.length = target.size();this.result = target.get(0);}public V reduce(final ReduceFunction<V> reduceFunction){final CountDownLatch countDownLatch = new CountDownLatch(length);countDownLatch.countDown();for(int i = 1 ; i < length ; i ++) {final V current = target.get(i);CPUs.execute(new Runnable() {public void run() {lock.lock();V next = reduceFunction.reduce(ReduceSubmitter.this.result,current);ReduceSubmitter.this.result = next;lock.unlock();countDownLatch.countDown();}});}try{countDownLatch.await();} catch (InterruptedException e) {}finally {return this.result;}}}
</pre>
好咯矩欠,寫(xiě)完了就開(kāi)始測(cè)試了,主要就創(chuàng)建一個(gè)長(zhǎng)度為10的數(shù)組,然后進(jìn)行map操作把每一個(gè)值都進(jìn)行平方悠夯,然后通過(guò)reduce操作進(jìn)行求和癌淮,代碼比較簡(jiǎn)單就不一一細(xì)說(shuō)了,有啥問(wèn)題后臺(tái)留言交流沦补。
<pre>
public class TestMapReduce {public static void main(String[] args){//僅僅是為了耗時(shí)而模擬的一個(gè)好像很復(fù)雜的操作乳蓄,不然太快了。final int junkTime = 1000000;//初始化一個(gè)想進(jìn)行操作的數(shù)組List<Integer> integerList = new ArrayList<Integer>();for(int i = 0 ; i < 10 ; i++){integerList.add(i);}int length = integerList.size();// printer.printList(integerList); Long start = System.currentTimeMillis();//進(jìn)行map操作并返回結(jié)果MapSubmitter<Integer> mapSubmitter = new MapSubmitter<Integer>(integerList);integerList = mapSubmitter.map(new MapFunction<Integer>() {public Integer map(Integer target) {Double b = 0D;for(int i = 0 ; i <junkTime;i++){b += Math.exp(i);}return target * target;}});Printer.println("mapreduce cost time:" + (System.currentTimeMillis() - start)); start = System.currentTimeMillis();
//進(jìn)行reduce操作并返回結(jié)果
ReduceSubmitter<Integer> reduceSubmitter = new ReduceSubmitter<Integer>(integerList);Integer resultInteger = reduceSubmitter.reduce(new ReduceFunction<Integer>() {public Integer reduce(Integer A, Integer B) {Double b = 0D;for(int i = 0 ; i <junkTime;i++){b += Math.exp(i);}return A+B;}});Printer.println("reduce cost time:" + (System.currentTimeMillis() - start)); CPUs.shutdown();}}
</pre>
好啦夕膀,今天的MapReduce就說(shuō)到這里虚倒。經(jīng)過(guò)我的實(shí)驗(yàn),無(wú)論多少次實(shí)驗(yàn)产舞,都是比單線程快那么一丟丟的魂奥,這都要得益于那個(gè)耗時(shí)的操作,模糊了線程切換帶來(lái)的時(shí)間損耗易猫,畢竟不怎么耗時(shí)的操作來(lái)說(shuō)耻煤,單線程其實(shí)是絕對(duì)比多線程快的。
細(xì)心的同學(xué)會(huì)發(fā)現(xiàn)准颓,好像這個(gè)并不符合論文里面的標(biāo)準(zhǔn)吖哈蝇。嗯吶是的,這個(gè)只是我心血來(lái)潮寫(xiě)的簡(jiǎn)單版本攘已。問(wèn)題有諸如炮赦,我們上面的map操作好像不能變成其他類型吖,怎么實(shí)現(xiàn)WordCount呢样勃?以及Driver好像沒(méi)有進(jìn)行任務(wù)切分和分發(fā)吖吠勘?好像也沒(méi)有suffle操作啊彤灶?好像整個(gè)過(guò)程也不是嚴(yán)格多線程的吖看幼,怎么辦呢批旺?下一次給大家分享一個(gè)更加完整的MapReduce幌陕。
希望大家都能在自己的機(jī)器上跑成功。源碼都在上面了我就不放鏈接了汽煮。
好了搏熄,如果有任務(wù)問(wèn)題請(qǐng)后臺(tái)留言棚唆,我會(huì)看的。如果對(duì)您有一點(diǎn)點(diǎn)的幫助或者啟發(fā)的話心例,幫忙轉(zhuǎn)發(fā)或者點(diǎn)個(gè)贊都是對(duì)我很大的支持喔宵凌,么么噠。