簡(jiǎn)約的JAVA版本MapReduce和日常No.25

昨天做了一個(gè)小調(diào)查论熙,說(shuō)看看想看些啥。大概的分布是這樣的摄狱,一個(gè)1代表一個(gè)投票脓诡。看來(lái)還是2媒役、3比較多祝谚。

11111 希望看到"算法"回復(fù)1。
111111111111 希望看到"技術(shù)細(xì)節(jié)"回復(fù)2酣衷。
111111111 希望看到"成長(zhǎng)和讀書(shū)"分享回復(fù)3交惯。

還好多人說(shuō)想看我長(zhǎng)啥樣,嘛穿仪,在我比較正經(jīng)的時(shí)候席爽,就長(zhǎng)下面這樣。

大圖預(yù)警0∑V欢汀!钠龙!

日常呢炬藤,就長(zhǎng)這樣御铃。

長(zhǎng)這樣。

好了切入正題沈矿,今天開(kāi)始挖一個(gè)新坑上真,就是實(shí)現(xiàn)一些基于MapReduce的一些圖算法,比如Pregel啊羹膳,PageRank啊睡互,LPA啊,SLPA啊等等陵像,坑很大就珠,非常大,慢慢寫(xiě)吧醒颖,都不會(huì)講非常難的理論問(wèn)題妻怎,以代碼細(xì)節(jié)為主。泞歉。

先上一個(gè)我思維拓展的時(shí)候?qū)懙胘ava實(shí)現(xiàn)的MapReduce的基礎(chǔ)版本吧逼侦,寫(xiě)得不是很好,我也在慢慢完善腰耙,Go語(yǔ)言版本的還在寫(xiě)榛丢,真是慚愧感覺(jué)一直在吃老本。

今天實(shí)現(xiàn)的一個(gè)內(nèi)容是挺庞,將一個(gè)List<Integer>進(jìn)行map操作變成另外一個(gè)List晰赞,然后通過(guò)reduce進(jìn)行加和。

靈感來(lái)源來(lái)自于《MapReduce: Simplified Data Processing on Large Clusters 》這篇論文选侨,大家可以看看我之前的文章掖鱼,在了解完什么是Mapreduce。然后先去看看這篇論文侵俗,啟發(fā)很多锨用。

首先我們從兩個(gè)接口入手丰刊,MapFunction和ReduceFunction隘谣,這是MapReduce的兩個(gè)靈魂接口,由使用者去定義啄巧,這里我定義的都是最最簡(jiǎn)單的版本寻歧,暫時(shí)并沒(méi)有進(jìn)行泛化的能力。

MapFunction定義了一個(gè)接口秩仆,類型為V码泛,然后通過(guò)一個(gè)叫map的方法,輸出一個(gè)類型為V的值澄耍。
public interface MapFunction<V> {V map(V target);}

ReduceFunction定義了一個(gè)接口噪珊,類型為V晌缘,然后通過(guò)一個(gè)叫reduce的方法,通過(guò)聚合兩個(gè)V類型的值痢站,輸出一個(gè)類型為V的值磷箕。
public interface ReduceFunction<V> {V reduce(V A,V B);}

上面兩個(gè)方法定義了MapReduce的核心內(nèi)容,就是任務(wù)切分和任務(wù)聚合阵难。有小伙伴不理解這里為什么使用泛型岳枷,因?yàn)樽鳛橐粋€(gè)框架來(lái)說(shuō),我是不知道使用者想使用什么樣的類型進(jìn)行計(jì)算的(雖然這里我知道我接下來(lái)就要用Integer進(jìn)行計(jì)算了)呜叫,所以必須不能指定類型空繁,否則這個(gè)框架就永遠(yuǎn)只能用Integer類型了。

那我們的map和reduce任務(wù)要跑在哪里呢朱庆?有小伙伴說(shuō)跑在分布式環(huán)境里盛泡。對(duì)沒(méi)錯(cuò),最終目的是跑在分布式環(huán)境里娱颊。但是在這里饭于,咱就偷個(gè)懶,先用多線程來(lái)模擬這個(gè)過(guò)程维蒙,并且使用內(nèi)存來(lái)作為消息機(jī)制掰吕。

我是i5雙核的CPU,經(jīng)驗(yàn)值下面颅痊,只有兩個(gè)cpu的話殖熟,創(chuàng)建4個(gè)線程對(duì)于性能來(lái)說(shuō)比單線程好。(畢竟線程切換存在開(kāi)銷(xiāo)斑响,控制得不好多線程肯定是比單線程慢的菱属,不服來(lái)辯)
<pre>
public class CPUs
{public static final int threads = 4;
private static final java.util.concurrent.ExecutorService pool = Executors.newFixedThreadPool(threads);
public static Future submit(Callable task){return pool.submit(task);}
public static void execute(Runnable task){pool.execute(task);}
public static void shutdown(){pool.shutdown();}}
</pre>

好了,MapFunction有了舰罚,CPUs也有了纽门,接下來(lái)可以開(kāi)始寫(xiě)提交器了。任務(wù)提交器是什么東西呢营罢,就是把一個(gè)map任務(wù)進(jìn)行切分赏陵,并且交給多個(gè)線程去異步執(zhí)行,然后最終把結(jié)果匯總還給客戶端的一個(gè)類饲漾。下面的類都比較大蝙搔,建議在電腦端看。

這個(gè)類做了什么事呢考传?就是把List封裝起來(lái)吃型,然后把任務(wù)分發(fā)給多個(gè)線程去執(zhí)行,使用CountDownLatch來(lái)保證所有的線程都已經(jīng)完成計(jì)算僚楞,然后再把結(jié)果返回給客戶端勤晚。

<pre>
public class MapSubmitter<V> {private List<V> target ;private int length;public MapSubmitter(List<V> target){this.target = target;this.length = target.size();}public List<V> map(final MapFunction<V> mapFunction){final CountDownLatch countDownLatch = new CountDownLatch(length);final List<V> result = new ArrayList<V>();for(int i = 0 ; i < length ; i++) {final V current = target.get(i);final int currentIndex = i;try {Future<V> future = CPUs.submit(new Callable<V>() {public V call() throws Exception {V result = mapFunction.map(current);//Printer.println(currentIndex); return result;}});result.add(i,future.get());countDownLatch.countDown();}catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}try{countDownLatch.await();} catch (InterruptedException e) {}finally {return result;}}}
</pre>
這個(gè)類又做了什么事呢枉层?List封裝起來(lái),交給很多線程去執(zhí)行赐写,然后維護(hù)一個(gè)最終的結(jié)果類V返干,并為這個(gè)結(jié)果提供線程安全的保護(hù),避免因?yàn)槎嗑€程操作同一個(gè)結(jié)果造成結(jié)果錯(cuò)誤血淌。

<pre>
public class ReduceSubmitter<V> {private List<V> target ;private int length;private V result ;Lock lock = new ReentrantLock();public ReduceSubmitter(List<V> target){this.target = target;this.length = target.size();this.result = target.get(0);}public V reduce(final ReduceFunction<V> reduceFunction){final CountDownLatch countDownLatch = new CountDownLatch(length);countDownLatch.countDown();for(int i = 1 ; i < length ; i ++) {final V current = target.get(i);CPUs.execute(new Runnable() {public void run() {lock.lock();V next = reduceFunction.reduce(ReduceSubmitter.this.result,current);ReduceSubmitter.this.result = next;lock.unlock();countDownLatch.countDown();}});}try{countDownLatch.await();} catch (InterruptedException e) {}finally {return this.result;}}}
</pre>
好咯矩欠,寫(xiě)完了就開(kāi)始測(cè)試了,主要就創(chuàng)建一個(gè)長(zhǎng)度為10的數(shù)組,然后進(jìn)行map操作把每一個(gè)值都進(jìn)行平方悠夯,然后通過(guò)reduce操作進(jìn)行求和癌淮,代碼比較簡(jiǎn)單就不一一細(xì)說(shuō)了,有啥問(wèn)題后臺(tái)留言交流沦补。
<pre>
public class TestMapReduce {public static void main(String[] args){//僅僅是為了耗時(shí)而模擬的一個(gè)好像很復(fù)雜的操作乳蓄,不然太快了。final int junkTime = 1000000;//初始化一個(gè)想進(jìn)行操作的數(shù)組List<Integer> integerList = new ArrayList<Integer>();for(int i = 0 ; i < 10 ; i++){integerList.add(i);}int length = integerList.size();// printer.printList(integerList); Long start = System.currentTimeMillis();//進(jìn)行map操作并返回結(jié)果MapSubmitter<Integer> mapSubmitter = new MapSubmitter<Integer>(integerList);integerList = mapSubmitter.map(new MapFunction<Integer>() {public Integer map(Integer target) {Double b = 0D;for(int i = 0 ; i <junkTime;i++){b += Math.exp(i);}return target * target;}});Printer.println("mapreduce cost time:" + (System.currentTimeMillis() - start)); start = System.currentTimeMillis();
//進(jìn)行reduce操作并返回結(jié)果
ReduceSubmitter<Integer> reduceSubmitter = new ReduceSubmitter<Integer>(integerList);Integer resultInteger = reduceSubmitter.reduce(new ReduceFunction<Integer>() {public Integer reduce(Integer A, Integer B) {Double b = 0D;for(int i = 0 ; i <junkTime;i++){b += Math.exp(i);}return A+B;}});Printer.println("reduce cost time:" + (System.currentTimeMillis() - start)); CPUs.shutdown();}}

</pre>
好啦夕膀,今天的MapReduce就說(shuō)到這里虚倒。經(jīng)過(guò)我的實(shí)驗(yàn),無(wú)論多少次實(shí)驗(yàn)产舞,都是比單線程快那么一丟丟的魂奥,這都要得益于那個(gè)耗時(shí)的操作,模糊了線程切換帶來(lái)的時(shí)間損耗易猫,畢竟不怎么耗時(shí)的操作來(lái)說(shuō)耻煤,單線程其實(shí)是絕對(duì)比多線程快的。

細(xì)心的同學(xué)會(huì)發(fā)現(xiàn)准颓,好像這個(gè)并不符合論文里面的標(biāo)準(zhǔn)吖哈蝇。嗯吶是的,這個(gè)只是我心血來(lái)潮寫(xiě)的簡(jiǎn)單版本攘已。問(wèn)題有諸如炮赦,我們上面的map操作好像不能變成其他類型吖,怎么實(shí)現(xiàn)WordCount呢样勃?以及Driver好像沒(méi)有進(jìn)行任務(wù)切分和分發(fā)吖吠勘?好像也沒(méi)有suffle操作啊彤灶?好像整個(gè)過(guò)程也不是嚴(yán)格多線程的吖看幼,怎么辦呢批旺?下一次給大家分享一個(gè)更加完整的MapReduce幌陕。

希望大家都能在自己的機(jī)器上跑成功。源碼都在上面了我就不放鏈接了汽煮。

好了搏熄,如果有任務(wù)問(wèn)題請(qǐng)后臺(tái)留言棚唆,我會(huì)看的。如果對(duì)您有一點(diǎn)點(diǎn)的幫助或者啟發(fā)的話心例,幫忙轉(zhuǎn)發(fā)或者點(diǎn)個(gè)贊都是對(duì)我很大的支持喔宵凌,么么噠。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末止后,一起剝皮案震驚了整個(gè)濱河市瞎惫,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌译株,老刑警劉巖瓜喇,帶你破解...
    沈念sama閱讀 217,907評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異歉糜,居然都是意外死亡乘寒,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)匪补,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)伞辛,“玉大人,你說(shuō)我怎么就攤上這事夯缺≡槭希” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,298評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵踊兜,是天一觀的道長(zhǎng)瞧捌。 經(jīng)常有香客問(wèn)我,道長(zhǎng)润文,這世上最難降的妖魔是什么姐呐? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,586評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮典蝌,結(jié)果婚禮上曙砂,老公的妹妹穿的比我還像新娘。我一直安慰自己骏掀,他們只是感情好鸠澈,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,633評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著截驮,像睡著了一般笑陈。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上葵袭,一...
    開(kāi)封第一講書(shū)人閱讀 51,488評(píng)論 1 302
  • 那天涵妥,我揣著相機(jī)與錄音,去河邊找鬼坡锡。 笑死蓬网,一個(gè)胖子當(dāng)著我的面吹牛窒所,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播帆锋,決...
    沈念sama閱讀 40,275評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼吵取,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了锯厢?” 一聲冷哼從身側(cè)響起皮官,我...
    開(kāi)封第一講書(shū)人閱讀 39,176評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎实辑,沒(méi)想到半個(gè)月后臣疑,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,619評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡徙菠,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,819評(píng)論 3 336
  • 正文 我和宋清朗相戀三年讯沈,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片婿奔。...
    茶點(diǎn)故事閱讀 39,932評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡缺狠,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出萍摊,到底是詐尸還是另有隱情挤茄,我是刑警寧澤,帶...
    沈念sama閱讀 35,655評(píng)論 5 346
  • 正文 年R本政府宣布冰木,位于F島的核電站穷劈,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏踊沸。R本人自食惡果不足惜歇终,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,265評(píng)論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望逼龟。 院中可真熱鬧评凝,春花似錦、人聲如沸腺律。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,871評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)匀钧。三九已至翎碑,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間之斯,已是汗流浹背日杈。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,994評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人达椰。 一個(gè)月前我還...
    沈念sama閱讀 48,095評(píng)論 3 370
  • 正文 我出身青樓翰蠢,卻偏偏與公主長(zhǎng)得像项乒,于是被迫代替她去往敵國(guó)和親啰劲。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,884評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • 一檀何、多線程 說(shuō)明下線程的狀態(tài) java中的線程一共有 5 種狀態(tài)蝇裤。 NEW:這種情況指的是,通過(guò) New 關(guān)鍵字創(chuàng)...
    Java旅行者閱讀 4,680評(píng)論 0 44
  • 摘自:http://staticor.io/post/hadoop/2016-01-23hadoop-defini...
    wangliang938閱讀 593評(píng)論 0 1
  • MapReduce是一個(gè)數(shù)據(jù)處理的編程模型频鉴。這個(gè)模型很簡(jiǎn)單栓辜,但也不是簡(jiǎn)單到不能夠支持一些有用的語(yǔ)言。Hadoop能...
    單行線的旋律閱讀 1,518評(píng)論 0 2
  • MapReduce框架結(jié)構(gòu)## MapReduce是一個(gè)用于大規(guī)模數(shù)據(jù)處理的分布式計(jì)算模型MapReduce模型主...
    Bloo_m閱讀 3,751評(píng)論 0 4
  • 文/夏蓮 從前的馬策馬奔騰 淌得了湍急的流水 卻躍不過(guò)險(xiǎn)峻的山 騎馬的人總得上岸 從前的路總是被分割 多少人的相遇...
    周小錦閱讀 191評(píng)論 2 9