MapReduce是更好地利用并行計(jì)算資源來提升數(shù)據(jù)處理能力的重要算法瘫析,如今已被主流的大數(shù)據(jù)分析平臺實(shí)現(xiàn),成為了大數(shù)據(jù)批量處理的主力軍。利用前面介紹的Actor特性熙兔,其實(shí)我們也可以實(shí)現(xiàn)一個(gè)簡易的MapReduce。
利用AKKA Actor來實(shí)現(xiàn)MapReduce艾恼,天生就支持并行計(jì)算(利用遠(yuǎn)程Actor)與異步操作住涉。為了簡便起見,本例使用了本地的Actor實(shí)現(xiàn)了大數(shù)據(jù)世界的Hello World钠绍,即WordCounter舆声。
在編寫字?jǐn)?shù)統(tǒng)計(jì)器的MapReduce之前,我們需要先分辨職責(zé)柳爽,包括:
- 給定網(wǎng)頁地址媳握,獲取指定網(wǎng)頁的內(nèi)容
- 對網(wǎng)頁內(nèi)容進(jìn)行分詞
- 為每個(gè)單詞統(tǒng)計(jì)字?jǐn)?shù)
考慮到本文的中心主題是介紹響應(yīng)式編程與Actor模型,所以我們降低了案例難度磷脯,讀取的網(wǎng)頁內(nèi)容均為英文蛾找,并簡單地以空格作為分詞的標(biāo)志。由于我們需要接受客戶端的字?jǐn)?shù)統(tǒng)計(jì)分析請求赵誓,那么要完成前面提到的職責(zé)打毛,至少需要四個(gè)Actor:
- WordCounterClient:發(fā)送數(shù)據(jù)分析請求
- WordCounterServer:模擬服務(wù)端,接收數(shù)據(jù)分析請求俩功,并最終將統(tǒng)計(jì)后的結(jié)果返回給WordCounterClient
- PageContentFetcher:獲取網(wǎng)頁內(nèi)容
- ContentWordCounter:網(wǎng)頁內(nèi)容的字?jǐn)?shù)統(tǒng)計(jì)器
為了盡可能地提升性能隘冲,對于獲取網(wǎng)頁內(nèi)容以及統(tǒng)計(jì)內(nèi)容字?jǐn)?shù)的統(tǒng)計(jì)工作,我們都需要多個(gè)Actor同時(shí)執(zhí)行绑雄。然而展辞,由于每個(gè)Actor處理消息都是以異步形式進(jìn)行,我們該怎樣才能知道并發(fā)處理的請求都得到了處理万牺?針對字?jǐn)?shù)統(tǒng)計(jì)器的案例而言罗珍,我們還需要將每個(gè)Actor統(tǒng)計(jì)獲得的字?jǐn)?shù)再進(jìn)行reduce,同樣也需要知道是否每條消息都已經(jīng)處理完畢脚粟,并獲得處理的結(jié)果覆旱。
AKKA通過Aggregator
特性實(shí)現(xiàn)了Aggregator模式,可以很好地解決剛才提到的問題核无。它通過引入一個(gè)單獨(dú)的聚合器Actor扣唱,用以聚合多個(gè)Actor產(chǎn)生的數(shù)據(jù),并根據(jù)這些Actor對消息的Response更新狀態(tài)。
假定ContentWordCounter分析后的結(jié)果如下代碼所示:
case class AnalysisResult(wordToCount: Seq[(String, Long)])
那么噪沙,Aggregator就可以通過在其內(nèi)部維持一個(gè)分析結(jié)果集(即前面所謂的狀態(tài)炼彪,代碼中的analysisResults
),每收到一個(gè)Actor的Response正歼,就將結(jié)果塞入到這個(gè)結(jié)果集(更新狀態(tài))中辐马,并判斷結(jié)果集的長度是否等于要處理的網(wǎng)頁數(shù),以此作為消息是否處理完畢的條件局义。整個(gè)Aggregator的實(shí)現(xiàn)如下:
class WordCounterAggregator extends Actor with Aggregator { expectOnce {
case StartAggregation(target, urls) =>
new Handler(target, urls, sender)
case _ =>
sender ! BadCommand
context stop self
}
class Handler(target: ActorRef, urls: Seq[String], originalSender: ActorRef) {
var analysisResults = Set.empty[AnalysisResult]
context.system.scheduler.scheduleOnce(10.seconds, self, Timeout)
expect {
case Timeout =>
respondIfDone(respondAnyway = true)
}
urls.foreach { uri =>
target ! FetchPageContent(uri)
expectOnce {
case result: AnalysisResult =>
analysisResults += result
respondIfDone()
}
}
def respondIfDone(respondAnyway: Boolean = false) = {
import MapSeqImplicits._
if (respondAnyway || analysisResults.size == urls.size) {
val wordToCounts = analysisResults.flatMap(_.wordToCount).reduceByKey(_ + _)
originalSender ! AggregatedAnalysisResult(wordToCounts)
context stop self
}
}
}
}
WordCounterAggregator繼承了Aggregator
特性喜爷,這個(gè)特性已經(jīng)對Actor的receive
進(jìn)行了處理,使得繼承該特性的Actor不需要重寫receive
方法萄唇。Aggregator
特性提供了expect
檩帐、expectOnce
與unexpect
,用以接收期待處理的消息另萤。
在Aggregator內(nèi)部轿塔,其實(shí)維持了一個(gè)expectList,用以存放expect等函數(shù)所接收的偏函數(shù)仲墨。expect
與expectOnce
都是將偏函數(shù)放入到這個(gè)列表中勾缭,只是后者只留存一次(通過permanent標(biāo)志來判定),一旦匹配了目养,就會將該偏函數(shù)移除俩由,而expect
則不會;至于unexpect
癌蚁,就是expect
的反操作幻梯,用于將偏函數(shù)從列表中移除。
自定義的respondIfDone
方法會在滿足聚合條件時(shí)努释,對分析結(jié)果進(jìn)行reduce運(yùn)算碘梢。Scala的集合庫自身并沒有提供reduceByKey()
函數(shù),是我模仿Spark的RDD自行編寫的隱式轉(zhuǎn)換方法:
object MapSeqImplicits {
implicit class MapSeqWrapper(wordToCount: Iterable[(String, Long)]) {
def reduceByKey(f: (Long, Long) => Long): Seq[(String, Long)] = {
wordToCount.groupBy(_._1).map {
case (word, counts) => (word, counts.map(_._2).foldLeft(0L)(f))
}.toSeq
}
}
}
因?yàn)橐肓艘粋€(gè)Aggregator伐蒂,消息的處理以及Actor之間的協(xié)作就變得相對復(fù)雜煞躬。要進(jìn)行響應(yīng)式編程,其中一個(gè)關(guān)鍵就是要理清楚數(shù)據(jù)(或消息)的流動方向逸邦,并分辨每個(gè)數(shù)據(jù)處理器的職責(zé)恩沛。我們可以借助類似狀態(tài)圖之類的可視化工具幫助我們分析數(shù)據(jù)流動模型。下圖是本例的一個(gè)消息處理模型缕减,它同時(shí)還表達(dá)了Actor之間的協(xié)作關(guān)系雷客。
執(zhí)行字?jǐn)?shù)統(tǒng)計(jì)的流程如下所示:
- 首先,WordCounterClient接收StartAnalysisWebPages消息桥狡,準(zhǔn)備分析網(wǎng)頁搅裙;
- 由于Client沒有這個(gè)“能力”完成分析任務(wù)皱卓,于是求助于WordCounterServer,并發(fā)起FetchWebPages消息部逮,要求獲取網(wǎng)頁內(nèi)容娜汁;
- WordCounterServer同樣是個(gè)憊懶貨色,什么都不干甥啄,轉(zhuǎn)手就將這件事情轉(zhuǎn)交給別的Actor了,所以他其實(shí)就是一個(gè)前臺接待員炬搭。如果不需要聚合蜈漓,它收到的FetchWebPages其實(shí)應(yīng)該交給PageContentFetcher,但現(xiàn)在須得經(jīng)由WordCounterAggregator來分配請求宫盔;所以從另外一個(gè)角度來看融虽,這個(gè)Aggregator相當(dāng)于是一個(gè)Mediator;
- 由于Aggregator是一個(gè)Mediator灼芭,因此它會協(xié)調(diào)多個(gè)PageContentFetcher與ContentWordCounter來并行完成任務(wù)有额;因而Aggregator和這兩個(gè)Actor之間是一對多關(guān)系,而PageContentFetcher與ContentWordCounter則屬于一對一關(guān)系彼绷。當(dāng)PageContentFetcher獲得了網(wǎng)頁內(nèi)容后巍佑,就通過CountPageContent消息,將統(tǒng)計(jì)字?jǐn)?shù)的職責(zé)交給了ContentWordCounter寄悯;
- ContentWordCounter在計(jì)算完當(dāng)前網(wǎng)頁的字?jǐn)?shù)后萤衰,會將分析結(jié)果AnalysisResult返回給Aggregator,并由其完成分析結(jié)果的reduce運(yùn)算猜旬,并返回AggregatedAnalysisResult結(jié)果給Server脆栋;
- 最后,Server再將Client需要的最終結(jié)果返回給Client洒擦。
由于Aggregator需要協(xié)調(diào)多個(gè)Fetcher與Counter的Actor椿争,以支持異步并行計(jì)算(本例實(shí)則是并發(fā)計(jì)算)的需要,我為其引入了AKKA提供的Router Actor熟嫩。通過Router可以創(chuàng)建一個(gè)容器Actor秦踪,內(nèi)部管理多個(gè)worker rootees,并提供了RoundRobin掸茅、Random洋侨、Boardcast等多種路由形式,用戶可以根據(jù)Actor的負(fù)載情況選擇不同的路由方式倦蚪。
這里希坚,我選擇使用RoundRobin以硬編碼的形式創(chuàng)建了Router Actor:
val analyst: ActorRef = context.actorOf(Props(new ContentWordCounter(aggregator)), "PageContentAnalyst")
val fetchers = context.actorOf(RoundRobinPool(4).props(Props(new PageContentFetcher(analyst))), "fetchers")
整體來看,PageContentFetcher與ContentWordCounter其實(shí)扮演的是map角色陵且,并通過Router Actor來實(shí)現(xiàn)map工作的異步并發(fā)處理裁僧;而WordCounterAggregator則扮演了reduce角色个束,它負(fù)責(zé)將收到的多個(gè)分析結(jié)果進(jìn)行reduce運(yùn)算。
由于缺乏對MapReduce算法必要的封裝聊疲,用AKKA Actor實(shí)現(xiàn)的MapReduce顯得比較復(fù)雜茬底,但卻較好地體現(xiàn)了響應(yīng)式編程的異步數(shù)據(jù)流本質(zhì)。
當(dāng)我們在使用Actor來處理異步消息傳遞時(shí)获洲,當(dāng)業(yè)務(wù)漸趨復(fù)雜后阱表,我們常常會迷失在復(fù)雜的消息傳遞網(wǎng)中而無法自拔。為了保持清醒的頭腦贡珊,需要時(shí)刻謹(jǐn)記Actor的職責(zé)最爬。以我的經(jīng)驗(yàn),我們應(yīng)該考慮:
- 從Actor扮演的角色來思考它應(yīng)該接收什么樣的消息门岔;
- Actor對消息的處理一定要滿足單一職責(zé)原則爱致,正確地履行職責(zé),也當(dāng)在合適時(shí)候正確地轉(zhuǎn)移職責(zé)寒随;
- 運(yùn)用狀態(tài)圖幫助思考Actor與其他Actor之間的協(xié)作關(guān)系糠悯;
- 正確理解AKKA Actor的消息發(fā)送機(jī)制,當(dāng)在Actor內(nèi)部再次發(fā)送消息時(shí)妻往,是由sender發(fā)送互艾,還是通過消息傳遞過來的actorRef對象發(fā)送消息。
要完成多個(gè)網(wǎng)頁的字?jǐn)?shù)統(tǒng)計(jì)功能讯泣,除了使用稍顯復(fù)雜的Actor模式之外忘朝,我們也可以直接使用scala提供的并行集合來完成,代碼更為精簡:
val words = for {
url <- urls.par
line <- scala.io.Source.fromURL(url).getLines()
word <- line.split(" ")
} yield (word)
val analysisResult = words.map(w => (w, 1L)).reduceByKey(_ + _)
在業(yè)務(wù)相對簡單判帮,并不需要非阻塞消息處理局嘁,也沒有可伸縮性需求的時(shí)候,若能恰當(dāng)運(yùn)用scala自身提供的par集合會是好的選擇晦墙。
事實(shí)上悦昵,為了實(shí)現(xiàn)字?jǐn)?shù)統(tǒng)計(jì)的功能,采用AKKA提供的Aggregator確乎有些過度晌畅。它更擅長于通過將職責(zé)分治與合理運(yùn)用基于消息的Actor模式來完成更為復(fù)雜的響應(yīng)式系統(tǒng)但指。WordCounter的例子不外乎是我為了更好地解釋Aggregator模式而給出的一個(gè)Demo罷了。
本文以及《利用Actor實(shí)現(xiàn)管道過濾器模式》兩篇文章給出的源代碼抗楔,可以在我的github上獲得:https://github.com/agiledon/reactiveprogramming.git棋凳。