打破法則:一個微妙的bug
在上篇博文的實現(xiàn)中篷店,實際上會有一個相當(dāng)微妙的問題出現(xiàn)在大多數(shù)folk的實現(xiàn)上臭家,當(dāng)使用固定大小的線程池作為ExecutorService時方淤, 是很容易出現(xiàn)死鎖的。
那我們可以修復(fù)這個folk實現(xiàn)使其能夠在固定大小的線程池中工作嗎携茂?我們來看看不同的實現(xiàn):
def folk1[A](pa: => Par[A]): Par[A] = es => pa(es)
這無疑是避免的死鎖,唯一的問題是带膜,我們實際上并沒有派生一個單獨的邏輯線程來對pa進行求值鸳谜。不過這仍然是一個有用的組合子钱慢,因為它讓我們做到延時計算的實例卿堂。讓我們來給它一個更加適合的名字懒棉,delay:
def delay[A](pa: => Par[A]): Par[A] = es => pa(es)
但是我們很像能夠在固定大小的線程池中運行任意的計算。為了做到這一點穗慕,我們需要為Par挑選另一個表示類型妻导。
用Actor來實現(xiàn)一個完全無阻塞的Par
怎樣才可以實現(xiàn)一個非阻塞的Par表示類型呢?我們可以嘗試自己寫一個Future類型替換java.until.concurrent.Future倔韭。
sealed trait Future[A] {
private[chp7] def apply(f: A => Unit): Unit
}
type Par[+A] = ExecutorService => Future[A]
基于新的Par類型,讓我們重新實現(xiàn)API:
sealed trait Future[A] {
private[chp7] def apply(f: A => Unit): Unit
}
type Par[+A] = ExecutorService => Future[A]
//接受一個已求值的A胰苏,返回結(jié)果將會在另一個線程中執(zhí)行
def unit[A](a: A): Par[A] = es => new Future[A] {
def apply(f: (A) => Unit): Unit = f(a)
}
//接受一個未求值的A醇疼,返回結(jié)果將會在另一個線程中執(zhí)行
def lazyUnit[A](a: => A): Par[A] = folk(unit(a))
//從并行計算中抽取結(jié)果
def run[A](es: ExecutorService)(pa: Par[A]): A = {
val ref = new AtomicReference[A]
val latch = new CountDownLatch(1)
pa(es) {a => ref.set(a); latch.countDown()}
latch.await()
ref.get()
}
//將par[A]分配另一個獨立的線程中去運行
def folk[A](pa: => Par[A]): Par[A] = es => new Future[A] {
def apply(f: (A) => Unit): Unit = eval(es)(pa(es)(f))
}
def eval(es: ExecutorService)(f: => Unit): Unit =
es.submit(new Callable[Unit] {
override def call(): Unit = f
})
def map2[A, B, C](pa: Par[A], pb: Par[B])(f: (A, B) => C): Par[C] =
es => new Future[C] {
def apply(cb: (C) => Unit): Unit = {
var ar: Option[A] = None
var br: Option[B] = None
val combiner = Actor[Either[A, B]](es) {
case Left(a) => br match {
case None => ar = Some(a)
case Some(b) => eval(es)(cb(f(a, b)))
}
case Right(b) => ar match {
case None => br = Some(b)
case Some(a) => eval(es)(cb(f(a, b)))
}
}
pa(es)(a => combiner ! Left(a))
pb(es)(b => combiner ! Right(b))
}
}
完善組合子為更加通用的形式
練習(xí) 7.11
實現(xiàn)choiceN秧荆,根據(jù)一個結(jié)果多多個計算進行選擇。
def choiceN[A](n: Par[Int])(choice: List[Par[A]]): Par[A] =
es => {
val i = n(es).get()
choice(i)(es)
}
練習(xí) 7.13
實現(xiàn)一個更加通用的chooser
def chooser[A, B](pa: Par[A])(choices: A => Par[B]): Par[B] =
es => {
val a = pa(es).get()
choices(a)(es)
}