運(yùn)用Aggregator模式實(shí)現(xiàn)MapReduce

MapReduce是更好地利用并行計(jì)算資源來提升數(shù)據(jù)處理能力的重要算法瘫析,如今已被主流的大數(shù)據(jù)分析平臺實(shí)現(xiàn),成為了大數(shù)據(jù)批量處理的主力軍。利用前面介紹的Actor特性熙兔,其實(shí)我們也可以實(shí)現(xiàn)一個(gè)簡易的MapReduce。

利用AKKA Actor來實(shí)現(xiàn)MapReduce艾恼,天生就支持并行計(jì)算(利用遠(yuǎn)程Actor)與異步操作住涉。為了簡便起見,本例使用了本地的Actor實(shí)現(xiàn)了大數(shù)據(jù)世界的Hello World钠绍,即WordCounter舆声。

在編寫字?jǐn)?shù)統(tǒng)計(jì)器的MapReduce之前,我們需要先分辨職責(zé)柳爽,包括:

  • 給定網(wǎng)頁地址媳握,獲取指定網(wǎng)頁的內(nèi)容
  • 對網(wǎng)頁內(nèi)容進(jìn)行分詞
  • 為每個(gè)單詞統(tǒng)計(jì)字?jǐn)?shù)

考慮到本文的中心主題是介紹響應(yīng)式編程與Actor模型,所以我們降低了案例難度磷脯,讀取的網(wǎng)頁內(nèi)容均為英文蛾找,并簡單地以空格作為分詞的標(biāo)志。由于我們需要接受客戶端的字?jǐn)?shù)統(tǒng)計(jì)分析請求赵誓,那么要完成前面提到的職責(zé)打毛,至少需要四個(gè)Actor:

  • WordCounterClient:發(fā)送數(shù)據(jù)分析請求
  • WordCounterServer:模擬服務(wù)端,接收數(shù)據(jù)分析請求俩功,并最終將統(tǒng)計(jì)后的結(jié)果返回給WordCounterClient
  • PageContentFetcher:獲取網(wǎng)頁內(nèi)容
  • ContentWordCounter:網(wǎng)頁內(nèi)容的字?jǐn)?shù)統(tǒng)計(jì)器

為了盡可能地提升性能隘冲,對于獲取網(wǎng)頁內(nèi)容以及統(tǒng)計(jì)內(nèi)容字?jǐn)?shù)的統(tǒng)計(jì)工作,我們都需要多個(gè)Actor同時(shí)執(zhí)行绑雄。然而展辞,由于每個(gè)Actor處理消息都是以異步形式進(jìn)行,我們該怎樣才能知道并發(fā)處理的請求都得到了處理万牺?針對字?jǐn)?shù)統(tǒng)計(jì)器的案例而言罗珍,我們還需要將每個(gè)Actor統(tǒng)計(jì)獲得的字?jǐn)?shù)再進(jìn)行reduce,同樣也需要知道是否每條消息都已經(jīng)處理完畢脚粟,并獲得處理的結(jié)果覆旱。

AKKA通過Aggregator特性實(shí)現(xiàn)了Aggregator模式,可以很好地解決剛才提到的問題核无。它通過引入一個(gè)單獨(dú)的聚合器Actor扣唱,用以聚合多個(gè)Actor產(chǎn)生的數(shù)據(jù),并根據(jù)這些Actor對消息的Response更新狀態(tài)

假定ContentWordCounter分析后的結(jié)果如下代碼所示:

case class AnalysisResult(wordToCount: Seq[(String, Long)])

那么噪沙,Aggregator就可以通過在其內(nèi)部維持一個(gè)分析結(jié)果集(即前面所謂的狀態(tài)炼彪,代碼中的analysisResults),每收到一個(gè)Actor的Response正歼,就將結(jié)果塞入到這個(gè)結(jié)果集(更新狀態(tài))中辐马,并判斷結(jié)果集的長度是否等于要處理的網(wǎng)頁數(shù),以此作為消息是否處理完畢的條件局义。整個(gè)Aggregator的實(shí)現(xiàn)如下:

class WordCounterAggregator extends Actor with Aggregator {  expectOnce {
    case StartAggregation(target, urls) =>
      new Handler(target, urls, sender)
    case _ =>
      sender ! BadCommand
      context stop self
  }
  class Handler(target: ActorRef, urls: Seq[String], originalSender: ActorRef) {
    var analysisResults = Set.empty[AnalysisResult]
    context.system.scheduler.scheduleOnce(10.seconds, self, Timeout)
    expect {
      case Timeout =>
        respondIfDone(respondAnyway = true)
    }
    urls.foreach { uri =>
      target ! FetchPageContent(uri)
      expectOnce {
        case result: AnalysisResult =>
          analysisResults += result
          respondIfDone()
      }
    }
    def respondIfDone(respondAnyway: Boolean = false) = {
      import MapSeqImplicits._
      if (respondAnyway || analysisResults.size == urls.size) {
        val wordToCounts = analysisResults.flatMap(_.wordToCount).reduceByKey(_ + _)
        originalSender ! AggregatedAnalysisResult(wordToCounts)
        context stop self
      }
    }
  }
}

WordCounterAggregator繼承了Aggregator特性喜爷,這個(gè)特性已經(jīng)對Actor的receive進(jìn)行了處理,使得繼承該特性的Actor不需要重寫receive方法萄唇。Aggregator特性提供了expect檩帐、expectOnceunexpect,用以接收期待處理的消息另萤。

在Aggregator內(nèi)部轿塔,其實(shí)維持了一個(gè)expectList,用以存放expect等函數(shù)所接收的偏函數(shù)仲墨。expectexpectOnce都是將偏函數(shù)放入到這個(gè)列表中勾缭,只是后者只留存一次(通過permanent標(biāo)志來判定),一旦匹配了目养,就會將該偏函數(shù)移除俩由,而expect則不會;至于unexpect癌蚁,就是expect的反操作幻梯,用于將偏函數(shù)從列表中移除。

自定義的respondIfDone方法會在滿足聚合條件時(shí)努释,對分析結(jié)果進(jìn)行reduce運(yùn)算碘梢。Scala的集合庫自身并沒有提供reduceByKey()函數(shù),是我模仿Spark的RDD自行編寫的隱式轉(zhuǎn)換方法:

object MapSeqImplicits {
  implicit class MapSeqWrapper(wordToCount: Iterable[(String, Long)]) {
    def reduceByKey(f: (Long, Long) => Long): Seq[(String, Long)] = {
      wordToCount.groupBy(_._1).map {
        case (word, counts) => (word, counts.map(_._2).foldLeft(0L)(f))      
      }.toSeq
    }
 }
}

因?yàn)橐肓艘粋€(gè)Aggregator伐蒂,消息的處理以及Actor之間的協(xié)作就變得相對復(fù)雜煞躬。要進(jìn)行響應(yīng)式編程,其中一個(gè)關(guān)鍵就是要理清楚數(shù)據(jù)(或消息)的流動方向逸邦,并分辨每個(gè)數(shù)據(jù)處理器的職責(zé)恩沛。我們可以借助類似狀態(tài)圖之類的可視化工具幫助我們分析數(shù)據(jù)流動模型。下圖是本例的一個(gè)消息處理模型缕减,它同時(shí)還表達(dá)了Actor之間的協(xié)作關(guān)系雷客。

Actor之間的協(xié)作

執(zhí)行字?jǐn)?shù)統(tǒng)計(jì)的流程如下所示:

  • 首先,WordCounterClient接收StartAnalysisWebPages消息桥狡,準(zhǔn)備分析網(wǎng)頁搅裙;
  • 由于Client沒有這個(gè)“能力”完成分析任務(wù)皱卓,于是求助于WordCounterServer,并發(fā)起FetchWebPages消息部逮,要求獲取網(wǎng)頁內(nèi)容娜汁;
  • WordCounterServer同樣是個(gè)憊懶貨色,什么都不干甥啄,轉(zhuǎn)手就將這件事情轉(zhuǎn)交給別的Actor了,所以他其實(shí)就是一個(gè)前臺接待員炬搭。如果不需要聚合蜈漓,它收到的FetchWebPages其實(shí)應(yīng)該交給PageContentFetcher,但現(xiàn)在須得經(jīng)由WordCounterAggregator來分配請求宫盔;所以從另外一個(gè)角度來看融虽,這個(gè)Aggregator相當(dāng)于是一個(gè)Mediator;
  • 由于Aggregator是一個(gè)Mediator灼芭,因此它會協(xié)調(diào)多個(gè)PageContentFetcher與ContentWordCounter來并行完成任務(wù)有额;因而Aggregator和這兩個(gè)Actor之間是一對多關(guān)系,而PageContentFetcher與ContentWordCounter則屬于一對一關(guān)系彼绷。當(dāng)PageContentFetcher獲得了網(wǎng)頁內(nèi)容后巍佑,就通過CountPageContent消息,將統(tǒng)計(jì)字?jǐn)?shù)的職責(zé)交給了ContentWordCounter寄悯;
  • ContentWordCounter在計(jì)算完當(dāng)前網(wǎng)頁的字?jǐn)?shù)后萤衰,會將分析結(jié)果AnalysisResult返回給Aggregator,并由其完成分析結(jié)果的reduce運(yùn)算猜旬,并返回AggregatedAnalysisResult結(jié)果給Server脆栋;
  • 最后,Server再將Client需要的最終結(jié)果返回給Client洒擦。

由于Aggregator需要協(xié)調(diào)多個(gè)Fetcher與Counter的Actor椿争,以支持異步并行計(jì)算(本例實(shí)則是并發(fā)計(jì)算)的需要,我為其引入了AKKA提供的Router Actor熟嫩。通過Router可以創(chuàng)建一個(gè)容器Actor秦踪,內(nèi)部管理多個(gè)worker rootees,并提供了RoundRobin掸茅、Random洋侨、Boardcast等多種路由形式,用戶可以根據(jù)Actor的負(fù)載情況選擇不同的路由方式倦蚪。

這里希坚,我選擇使用RoundRobin以硬編碼的形式創(chuàng)建了Router Actor:

val analyst: ActorRef = context.actorOf(Props(new ContentWordCounter(aggregator)), "PageContentAnalyst") 
val fetchers = context.actorOf(RoundRobinPool(4).props(Props(new PageContentFetcher(analyst))), "fetchers")

整體來看,PageContentFetcher與ContentWordCounter其實(shí)扮演的是map角色陵且,并通過Router Actor來實(shí)現(xiàn)map工作的異步并發(fā)處理裁僧;而WordCounterAggregator則扮演了reduce角色个束,它負(fù)責(zé)將收到的多個(gè)分析結(jié)果進(jìn)行reduce運(yùn)算。

由于缺乏對MapReduce算法必要的封裝聊疲,用AKKA Actor實(shí)現(xiàn)的MapReduce顯得比較復(fù)雜茬底,但卻較好地體現(xiàn)了響應(yīng)式編程的異步數(shù)據(jù)流本質(zhì)。

當(dāng)我們在使用Actor來處理異步消息傳遞時(shí)获洲,當(dāng)業(yè)務(wù)漸趨復(fù)雜后阱表,我們常常會迷失在復(fù)雜的消息傳遞網(wǎng)中而無法自拔。為了保持清醒的頭腦贡珊,需要時(shí)刻謹(jǐn)記Actor的職責(zé)最爬。以我的經(jīng)驗(yàn),我們應(yīng)該考慮:

  • 從Actor扮演的角色來思考它應(yīng)該接收什么樣的消息门岔;
  • Actor對消息的處理一定要滿足單一職責(zé)原則爱致,正確地履行職責(zé),也當(dāng)在合適時(shí)候正確地轉(zhuǎn)移職責(zé)寒随;
  • 運(yùn)用狀態(tài)圖幫助思考Actor與其他Actor之間的協(xié)作關(guān)系糠悯;
  • 正確理解AKKA Actor的消息發(fā)送機(jī)制,當(dāng)在Actor內(nèi)部再次發(fā)送消息時(shí)妻往,是由sender發(fā)送互艾,還是通過消息傳遞過來的actorRef對象發(fā)送消息。

要完成多個(gè)網(wǎng)頁的字?jǐn)?shù)統(tǒng)計(jì)功能讯泣,除了使用稍顯復(fù)雜的Actor模式之外忘朝,我們也可以直接使用scala提供的并行集合來完成,代碼更為精簡:

val words = for {
 url <- urls.par
 line <- scala.io.Source.fromURL(url).getLines()
 word <- line.split(" ")
} yield (word)
val analysisResult = words.map(w => (w, 1L)).reduceByKey(_ + _)

在業(yè)務(wù)相對簡單判帮,并不需要非阻塞消息處理局嘁,也沒有可伸縮性需求的時(shí)候,若能恰當(dāng)運(yùn)用scala自身提供的par集合會是好的選擇晦墙。

事實(shí)上悦昵,為了實(shí)現(xiàn)字?jǐn)?shù)統(tǒng)計(jì)的功能,采用AKKA提供的Aggregator確乎有些過度晌畅。它更擅長于通過將職責(zé)分治與合理運(yùn)用基于消息的Actor模式來完成更為復(fù)雜的響應(yīng)式系統(tǒng)但指。WordCounter的例子不外乎是我為了更好地解釋Aggregator模式而給出的一個(gè)Demo罷了。


本文以及《利用Actor實(shí)現(xiàn)管道過濾器模式》兩篇文章給出的源代碼抗楔,可以在我的github上獲得:https://github.com/agiledon/reactiveprogramming.git棋凳。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市连躏,隨后出現(xiàn)的幾起案子剩岳,更是在濱河造成了極大的恐慌,老刑警劉巖入热,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件拍棕,死亡現(xiàn)場離奇詭異晓铆,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)绰播,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進(jìn)店門骄噪,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人蠢箩,你說我怎么就攤上這事链蕊。” “怎么了谬泌?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵滔韵,是天一觀的道長。 經(jīng)常有香客問我呵萨,道長奏属,這世上最難降的妖魔是什么跨跨? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任潮峦,我火速辦了婚禮,結(jié)果婚禮上勇婴,老公的妹妹穿的比我還像新娘忱嘹。我一直安慰自己,他們只是感情好耕渴,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布拘悦。 她就那樣靜靜地躺著,像睡著了一般橱脸。 火紅的嫁衣襯著肌膚如雪础米。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天添诉,我揣著相機(jī)與錄音屁桑,去河邊找鬼。 笑死栏赴,一個(gè)胖子當(dāng)著我的面吹牛蘑斧,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播须眷,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼为黎,長吁一口氣:“原來是場噩夢啊……” “哼嘴脾!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤泉沾,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后亏娜,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡求橄,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了葡公。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片罐农。...
    茶點(diǎn)故事閱讀 39,779評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖催什,靈堂內(nèi)的尸體忽然破棺而出涵亏,到底是詐尸還是另有隱情,我是刑警寧澤蒲凶,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布气筋,位于F島的核電站,受9級特大地震影響旋圆,放射性物質(zhì)發(fā)生泄漏宠默。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一灵巧、第九天 我趴在偏房一處隱蔽的房頂上張望搀矫。 院中可真熱鬧,春花似錦刻肄、人聲如沸瓤球。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽卦羡。三九已至,卻和暖如春麦到,著一層夾襖步出監(jiān)牢的瞬間绿饵,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工瓶颠, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留拟赊,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓步清,卻偏偏與公主長得像要门,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子廓啊,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,700評論 2 354

推薦閱讀更多精彩內(nèi)容

  • 《基于Actor的響應(yīng)式編程》計(jì)劃分為三部分欢搜,第一部分剖析響應(yīng)式編程的本質(zhì)思想,為大家介紹何謂響應(yīng)式編程(Reac...
    _張逸_閱讀 2,083評論 0 7
  • Actor系統(tǒng)的實(shí)體 在Actor系統(tǒng)中谴轮,actor之間具有樹形的監(jiān)管結(jié)構(gòu)炒瘟,并且actor可以跨多個(gè)網(wǎng)絡(luò)節(jié)點(diǎn)進(jìn)行透...
    JasonDing閱讀 3,341評論 2 6
  • Akka幫助您構(gòu)建可靠的應(yīng)用程序在一臺機(jī)器上使用多個(gè)處理器核心(“擴(kuò)大”)或分布在計(jì)算機(jī)網(wǎng)絡(luò)(“擴(kuò)張”)。關(guān)鍵的抽...
    兒哥欠三百首閱讀 2,715評論 0 0
  • Actor作為Akka中最核心的結(jié)構(gòu)第步,其在Akka中的中的組織結(jié)構(gòu)也至關(guān)重要疮装,今天我們就來講講Akka中Actor...
    三分青年閱讀 3,401評論 0 14
  • 我們并非離群索居缘琅,但無時(shí)不刻在感受孤獨(dú);我們一方面渴望和他人的交流廓推,另一方面卻又希望一個(gè)人獨(dú)處刷袍; 我們生活在時(shí)刻在...
    百順君閱讀 373評論 0 1