純函數(shù)式的并行計(jì)算(1)

選擇數(shù)據(jù)類型和函數(shù)
“創(chuàng)建并行計(jì)算”具體是指什么?我們可以從一個(gè)相對簡單的例子入手——求一組整數(shù)的和。例如下面就是利用左折疊的方法計(jì)算求和:

  def sum(ints: Seq[Int]): Int =
    ints.foldLeft(0)(_ + _)

除了疊加算法, 還有一個(gè)分治的算法,代碼如下:

  def sum1(ints: IndexedSeq[Int]): Int =
    if (ints.length < 1)
      ints.headOption.getOrElse(0)
    else {
      val (l, r) = ints.splitAt(ints.length)
      sum1(l) + sum1(r)
    }

我們使用splitAt函數(shù)將序列一分為二,并各自遞歸求和盲再,最后合并它們的結(jié)果。這種實(shí)現(xiàn)可以實(shí)現(xiàn)并行化瓣铣,即對兩部分的求和可以同時(shí)進(jìn)行答朋。
一種用于并行計(jì)算的數(shù)據(jù)類型
用于表示并行計(jì)算的任何數(shù)據(jù)類型都包含一個(gè)結(jié)果,這個(gè)結(jié)果是一個(gè)有意義的類型(這里是Int)棠笑,且能夠獲取绿映。為此我們設(shè)計(jì)一個(gè)這樣的數(shù)據(jù)類型Par[A](Par是Paralle的簡寫),它就像一個(gè)裝有結(jié)果的容器腐晾,并具備下面的方法:

trait Par[A] {

}

object Par {

  //接受一個(gè)未求值的A叉弦,返回結(jié)果將會在另一個(gè)線程中執(zhí)行
  def unit[A](a: => A): Par[A] = ???

  //從并行計(jì)算中抽取結(jié)果
  def get[A](pa: Par[A]): A = ???
}

現(xiàn)在我們用自定義的數(shù)據(jù)類型更新求和算法:

  import Par._

  def sum2(ints: IndexedSeq[Int]): Int =
    if (ints.length <= 1)
      ints.headOption.getOrElse(0)
    else {
      val (l, r) = ints.splitAt(ints.length)
      val pl = unit(sum2(l))
      val pr = unit(sum2(r))
      get(pl) + get(pr)
    }

現(xiàn)在我們面臨一個(gè)選擇, 是讓unit在一個(gè)獨(dú)立的邏輯線程中立即求值藻糖,還是等到get被調(diào)用的時(shí)候再求值淹冰。unit立即求值會導(dǎo)致程序無法并行計(jì)算,但是unit返回一個(gè)代表一步計(jì)算的Par[Int]巨柒,那在調(diào)用get的時(shí)候無法避免產(chǎn)生副作用樱拴。如何才能避免unit和get的缺陷呢?
組合并行計(jì)算
我們可以不調(diào)用get函數(shù)洋满, 那么函數(shù)的代碼將如下:

  def sum3(ints: IndexedSeq[Int]): Par[Int] =
    if (ints.length <= 1)
      unit(ints.headOption.getOrElse(0))
    else {
      val (l, r) = ints.splitAt(ints.length)
      map2(sum3(l), sum3(r))(_ + _)
    }

練習(xí) 7.1
Par.map2是一個(gè)新的高階函數(shù)晶乔,用于組合兩個(gè)并行計(jì)算的結(jié)果。實(shí)現(xiàn)map2函數(shù):

  def map2[A, B, C](pa: Par[A], pb: Par[B])(f: (A, B) => C): Par[C] = {
    val a = get(pa)
    val b = get(pb)
    unit(f(a, b))
  }

顯性分流
目前的API沒有明確的表明何時(shí)應(yīng)該將計(jì)算從主線程中分流出去牺勾,換句話說程序員也不知道在哪兒會發(fā)生并行計(jì)算正罢。如何讓分流更加明確呢?我們引入另一個(gè)函數(shù)來做:

  //將par[A]分配另一個(gè)獨(dú)立的線程中去運(yùn)行
  def folk[A](pa: => Par[A]): Par[A] = ???

讓我們來重寫sum函數(shù):

  def sum4(ints: IndexedSeq[Int]): Par[Int] =
    if (ints.length <= 1)
      unit(ints.headOption.getOrElse(0))
    else {
      val (l, r) = ints.splitAt(ints.length)
      map2(folk(sum3(l)), folk(sum3(r)))(_ + _)
    }

對于length <= 1情況我們并不需要folk到一個(gè)獨(dú)立線程中計(jì)算驻民。
現(xiàn)在回到unit是嚴(yán)格還是惰性的問題翻具,有了folk履怯,即便unit是嚴(yán)格也不會有什么損失。至于非嚴(yán)格版本我們叫它lazyUnit吧:

  //接受一個(gè)已求值的A裆泳,返回結(jié)果將會在另一個(gè)線程中執(zhí)行
  def unit[A](a: A): Par[A] = ???
  //接受一個(gè)未求值的A叹洲,返回結(jié)果將會在另一個(gè)線程中執(zhí)行
  def lazyUnit[A](a: => A): Par[A] = folk(unit(a))

到此,我們看出Par只是一個(gè)值的函數(shù)工禾,表明并行計(jì)算运提。而實(shí)際執(zhí)行并行計(jì)算的是get函數(shù),所以我們將get函數(shù)改名為run函數(shù)闻葵,表明這個(gè)并行計(jì)算實(shí)際執(zhí)行的地方糙捺。

  //從并行計(jì)算中抽取結(jié)果
  def run[A](pa: Par[A]): A = ???

確定表現(xiàn)形式
經(jīng)過各種思考和選擇之后,我們有了下面大致的API笙隙。

  //接受一個(gè)已求值的A,返回結(jié)果將會在另一個(gè)線程中執(zhí)行
  def unit[A](a: A): Par[A] = ???
  //接受一個(gè)未求值的A坎缭,返回結(jié)果將會在另一個(gè)線程中執(zhí)行
  def lazyUnit[A](a: => A): Par[A] = folk(unit(a))
  //從并行計(jì)算中抽取結(jié)果
  def run[A](pa: Par[A]): A = ???
  //將par[A]分配另一個(gè)獨(dú)立的線程中去運(yùn)行
  def folk[A](pa: => Par[A]): Par[A] = ???
  
  def map2[A, B, C](pa: Par[A], pb: Par[B])(f: (A, B) => C): Par[C] = {
    val a = run(pa)
    val b = run(pb)
    unit(f(a, b))
  }

練習(xí) 7.2
在繼續(xù)之前竟痰,我們盡可能實(shí)現(xiàn)API中的函數(shù)
如上代碼
讓我們根據(jù)run函數(shù)反推Par類型,讓我們試著假設(shè)run可以訪問一個(gè)ExecutorService掏呼,看能不能搞清Par的樣子:

  def run[A](s: ExecutorService)(pa: Par[A]): A = ???

最簡單的莫過于坏快,Par[A]是ExecutorService => A,當(dāng)然這也未免太簡單了憎夷,為此Par[A]莽鸿,應(yīng)該是ExecutorService => Future[A],而run直接返回Future:

  type Par[A] = ExecutorService => Future[A]
  def run[A](s: ExecutorService)(pa: Par[A]): Future[A] = pa(s)

完善API
既然有了Par的表現(xiàn)形式拾给,不妨就簡單直接點(diǎn)祥得,基于Par的表現(xiàn)類型最簡單的實(shí)現(xiàn):

object Par {

  import java.util.concurrent.{ExecutorService, Future, Callable}
  
  type Par[A] = ExecutorService => Future[A]

  private case class UnitFuture[A](a: A) extends Future[A] {
    override def isCancelled: Boolean = false

    override def get(): A = a

    override def get(timeout: Long, unit: TimeUnit): A = a

    override def cancel(mayInterruptIfRunning: Boolean): Boolean = false

    override def isDone: Boolean = true
  }
  
  //接受一個(gè)已求值的A,返回結(jié)果將會在另一個(gè)線程中執(zhí)行
  def unit[A](a: A): Par[A] = es => UnitFuture(a)
  
  //接受一個(gè)未求值的A蒋得,返回結(jié)果將會在另一個(gè)線程中執(zhí)行
  def lazyUnit[A](a: => A): Par[A] = folk(unit(a))
  
  //從并行計(jì)算中抽取結(jié)果
  def run[A](s: ExecutorService)(pa: Par[A]): Future[A] = pa(s)
  
  //將par[A]分配另一個(gè)獨(dú)立的線程中去運(yùn)行
  def folk[A](pa: => Par[A]): Par[A] = es => {
    es.submit(new Callable[A] {
      override def call(): A = pa(es).get()
    })
  }

  def map2[A, B, C](pa: Par[A], pb: Par[B])(f: (A, B) => C): Par[C] = 
    es => {
      val af = pa(es)
      val bf = pb(es)
      UnitFuture(f(af.get(), bf.get()))
    }
  
}

練習(xí)7.3
改進(jìn)Map2的實(shí)現(xiàn)级及,支持超時(shí)設(shè)置。

  def map2[A, B, C](pa: Par[A], pb: Par[B], 
                    timeout: Long, timeUnit: TimeUnit)(f: (A, B) => C): Par[C] =
    es => {
      val af = pa(es)
      val bf = pb(es)
      val a = af.get(timeout, timeUnit)
      val b = bf.get(timeout, timeUnit)
      UnitFuture(f(a, b))
    }

練習(xí) 7.4
使用lazyUnit寫一個(gè)函數(shù)將另一個(gè)函數(shù)A => B轉(zhuǎn)換為一個(gè)一步計(jì)算

  def asyncF[A, B](f: A => B): A => Par[B] =
    a => lazyUnit(f(a))

練習(xí) 7.5
實(shí)現(xiàn)一個(gè)叫做sequence的函數(shù)额衙。不能使用而外的基礎(chǔ)函數(shù)饮焦,不能調(diào)用run。

  def sequence[A](li: List[Par[A]]): Par[List[A]] = {
    def loop(n: Int, res: Par[List[A]]): Par[List[A]] = n match {
      case m if m < 0 => res
      case _ => loop(n - 1, map2(li(n), res)(_ :: _))
    }
    loop(li.length - 1, unit(Nil))
  }

練習(xí) 7.6
實(shí)現(xiàn)parFilter窍侧,并過濾列表元素

  def parFilter[A](li: List[A])(f: A => Boolean): Par[List[A]] = {
    def loop(n: Int, res: List[Par[A]]): List[Par[A]] = n match {
      case m if m < 0 => res
      case _ => 
        if (f(li(n))) loop(n - 1, lazyUnit(li(n)) :: res)
        else loop(n - 1, res)
    }
    sequence(loop(li.length - 1, Nil))
  }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末县踢,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子伟件,更是在濱河造成了極大的恐慌硼啤,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,968評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件斧账,死亡現(xiàn)場離奇詭異丙曙,居然都是意外死亡爸业,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評論 2 382
  • 文/潘曉璐 我一進(jìn)店門亏镰,熙熙樓的掌柜王于貴愁眉苦臉地迎上來扯旷,“玉大人,你說我怎么就攤上這事索抓【觯” “怎么了?”我有些...
    開封第一講書人閱讀 153,220評論 0 344
  • 文/不壞的土叔 我叫張陵逼肯,是天一觀的道長耸黑。 經(jīng)常有香客問我,道長篮幢,這世上最難降的妖魔是什么大刊? 我笑而不...
    開封第一講書人閱讀 55,416評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮三椿,結(jié)果婚禮上缺菌,老公的妹妹穿的比我還像新娘。我一直安慰自己搜锰,他們只是感情好敲霍,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,425評論 5 374
  • 文/花漫 我一把揭開白布域携。 她就那樣靜靜地躺著姨蝴,像睡著了一般部蛇。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上狈涮,一...
    開封第一講書人閱讀 49,144評論 1 285
  • 那天狐胎,我揣著相機(jī)與錄音,去河邊找鬼歌馍。 笑死顽爹,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的骆姐。 我是一名探鬼主播镜粤,決...
    沈念sama閱讀 38,432評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼玻褪!你這毒婦竟也來了肉渴?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,088評論 0 261
  • 序言:老撾萬榮一對情侶失蹤带射,失蹤者是張志新(化名)和其女友劉穎同规,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,586評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡券勺,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,028評論 2 325
  • 正文 我和宋清朗相戀三年绪钥,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片关炼。...
    茶點(diǎn)故事閱讀 38,137評論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡程腹,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出儒拂,到底是詐尸還是另有隱情寸潦,我是刑警寧澤,帶...
    沈念sama閱讀 33,783評論 4 324
  • 正文 年R本政府宣布社痛,位于F島的核電站见转,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏蒜哀。R本人自食惡果不足惜斩箫,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,343評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望撵儿。 院中可真熱鬧乘客,春花似錦、人聲如沸统倒。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽房匆。三九已至,卻和暖如春报亩,著一層夾襖步出監(jiān)牢的瞬間浴鸿,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評論 1 262
  • 我被黑心中介騙來泰國打工弦追, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留岳链,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,595評論 2 355
  • 正文 我出身青樓劲件,卻偏偏與公主長得像掸哑,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子零远,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,901評論 2 345

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理苗分,服務(wù)發(fā)現(xiàn),斷路器牵辣,智...
    卡卡羅2017閱讀 134,601評論 18 139
  • //Clojure入門教程: Clojure – Functional Programming for the J...
    葡萄喃喃囈語閱讀 3,622評論 0 7
  • 1. Java基礎(chǔ)部分 基礎(chǔ)部分的順序:基本語法摔癣,類相關(guān)的語法,內(nèi)部類的語法,繼承相關(guān)的語法择浊,異常的語法戴卜,線程的語...
    子非魚_t_閱讀 31,587評論 18 399
  • 我正需要一場鵝毛般的大雪, 讓它來遮蓋昨日的痕跡琢岩; 列車已經(jīng)離開了這個(gè)站點(diǎn) 落滿雪花的身影又該走向哪里呢投剥? 過去總...
    梁金欠閱讀 216評論 0 1
  • 家里有了蝸牛就是不一樣。 寶爸詢問兒子粘捎,“爸爸新下了《極速蝸呸泵澹》,看不看攒磨?”在房里東奔西跑的兒子立刻答應(yīng)了泳桦。 寶爸...
    鉛筆芒種閱讀 218評論 0 1