選擇數(shù)據(jù)類型和函數(shù)
“創(chuàng)建并行計(jì)算”具體是指什么?我們可以從一個(gè)相對簡單的例子入手——求一組整數(shù)的和。例如下面就是利用左折疊的方法計(jì)算求和:
def sum(ints: Seq[Int]): Int =
ints.foldLeft(0)(_ + _)
除了疊加算法, 還有一個(gè)分治的算法,代碼如下:
def sum1(ints: IndexedSeq[Int]): Int =
if (ints.length < 1)
ints.headOption.getOrElse(0)
else {
val (l, r) = ints.splitAt(ints.length)
sum1(l) + sum1(r)
}
我們使用splitAt函數(shù)將序列一分為二,并各自遞歸求和盲再,最后合并它們的結(jié)果。這種實(shí)現(xiàn)可以實(shí)現(xiàn)并行化瓣铣,即對兩部分的求和可以同時(shí)進(jìn)行答朋。
一種用于并行計(jì)算的數(shù)據(jù)類型
用于表示并行計(jì)算的任何數(shù)據(jù)類型都包含一個(gè)結(jié)果,這個(gè)結(jié)果是一個(gè)有意義的類型(這里是Int)棠笑,且能夠獲取绿映。為此我們設(shè)計(jì)一個(gè)這樣的數(shù)據(jù)類型Par[A](Par是Paralle的簡寫),它就像一個(gè)裝有結(jié)果的容器腐晾,并具備下面的方法:
trait Par[A] {
}
object Par {
//接受一個(gè)未求值的A叉弦,返回結(jié)果將會在另一個(gè)線程中執(zhí)行
def unit[A](a: => A): Par[A] = ???
//從并行計(jì)算中抽取結(jié)果
def get[A](pa: Par[A]): A = ???
}
現(xiàn)在我們用自定義的數(shù)據(jù)類型更新求和算法:
import Par._
def sum2(ints: IndexedSeq[Int]): Int =
if (ints.length <= 1)
ints.headOption.getOrElse(0)
else {
val (l, r) = ints.splitAt(ints.length)
val pl = unit(sum2(l))
val pr = unit(sum2(r))
get(pl) + get(pr)
}
現(xiàn)在我們面臨一個(gè)選擇, 是讓unit在一個(gè)獨(dú)立的邏輯線程中立即求值藻糖,還是等到get被調(diào)用的時(shí)候再求值淹冰。unit立即求值會導(dǎo)致程序無法并行計(jì)算,但是unit返回一個(gè)代表一步計(jì)算的Par[Int]巨柒,那在調(diào)用get的時(shí)候無法避免產(chǎn)生副作用樱拴。如何才能避免unit和get的缺陷呢?
組合并行計(jì)算
我們可以不調(diào)用get函數(shù)洋满, 那么函數(shù)的代碼將如下:
def sum3(ints: IndexedSeq[Int]): Par[Int] =
if (ints.length <= 1)
unit(ints.headOption.getOrElse(0))
else {
val (l, r) = ints.splitAt(ints.length)
map2(sum3(l), sum3(r))(_ + _)
}
練習(xí) 7.1
Par.map2是一個(gè)新的高階函數(shù)晶乔,用于組合兩個(gè)并行計(jì)算的結(jié)果。實(shí)現(xiàn)map2函數(shù):
def map2[A, B, C](pa: Par[A], pb: Par[B])(f: (A, B) => C): Par[C] = {
val a = get(pa)
val b = get(pb)
unit(f(a, b))
}
顯性分流
目前的API沒有明確的表明何時(shí)應(yīng)該將計(jì)算從主線程中分流出去牺勾,換句話說程序員也不知道在哪兒會發(fā)生并行計(jì)算正罢。如何讓分流更加明確呢?我們引入另一個(gè)函數(shù)來做:
//將par[A]分配另一個(gè)獨(dú)立的線程中去運(yùn)行
def folk[A](pa: => Par[A]): Par[A] = ???
讓我們來重寫sum函數(shù):
def sum4(ints: IndexedSeq[Int]): Par[Int] =
if (ints.length <= 1)
unit(ints.headOption.getOrElse(0))
else {
val (l, r) = ints.splitAt(ints.length)
map2(folk(sum3(l)), folk(sum3(r)))(_ + _)
}
對于length <= 1情況我們并不需要folk到一個(gè)獨(dú)立線程中計(jì)算驻民。
現(xiàn)在回到unit是嚴(yán)格還是惰性的問題翻具,有了folk履怯,即便unit是嚴(yán)格也不會有什么損失。至于非嚴(yán)格版本我們叫它lazyUnit吧:
//接受一個(gè)已求值的A裆泳,返回結(jié)果將會在另一個(gè)線程中執(zhí)行
def unit[A](a: A): Par[A] = ???
//接受一個(gè)未求值的A叹洲,返回結(jié)果將會在另一個(gè)線程中執(zhí)行
def lazyUnit[A](a: => A): Par[A] = folk(unit(a))
到此,我們看出Par只是一個(gè)值的函數(shù)工禾,表明并行計(jì)算运提。而實(shí)際執(zhí)行并行計(jì)算的是get函數(shù),所以我們將get函數(shù)改名為run函數(shù)闻葵,表明這個(gè)并行計(jì)算實(shí)際執(zhí)行的地方糙捺。
//從并行計(jì)算中抽取結(jié)果
def run[A](pa: Par[A]): A = ???
確定表現(xiàn)形式
經(jīng)過各種思考和選擇之后,我們有了下面大致的API笙隙。
//接受一個(gè)已求值的A,返回結(jié)果將會在另一個(gè)線程中執(zhí)行
def unit[A](a: A): Par[A] = ???
//接受一個(gè)未求值的A坎缭,返回結(jié)果將會在另一個(gè)線程中執(zhí)行
def lazyUnit[A](a: => A): Par[A] = folk(unit(a))
//從并行計(jì)算中抽取結(jié)果
def run[A](pa: Par[A]): A = ???
//將par[A]分配另一個(gè)獨(dú)立的線程中去運(yùn)行
def folk[A](pa: => Par[A]): Par[A] = ???
def map2[A, B, C](pa: Par[A], pb: Par[B])(f: (A, B) => C): Par[C] = {
val a = run(pa)
val b = run(pb)
unit(f(a, b))
}
練習(xí) 7.2
在繼續(xù)之前竟痰,我們盡可能實(shí)現(xiàn)API中的函數(shù)
如上代碼
讓我們根據(jù)run函數(shù)反推Par類型,讓我們試著假設(shè)run可以訪問一個(gè)ExecutorService掏呼,看能不能搞清Par的樣子:
def run[A](s: ExecutorService)(pa: Par[A]): A = ???
最簡單的莫過于坏快,Par[A]是ExecutorService => A,當(dāng)然這也未免太簡單了憎夷,為此Par[A]莽鸿,應(yīng)該是ExecutorService => Future[A],而run直接返回Future:
type Par[A] = ExecutorService => Future[A]
def run[A](s: ExecutorService)(pa: Par[A]): Future[A] = pa(s)
完善API
既然有了Par的表現(xiàn)形式拾给,不妨就簡單直接點(diǎn)祥得,基于Par的表現(xiàn)類型最簡單的實(shí)現(xiàn):
object Par {
import java.util.concurrent.{ExecutorService, Future, Callable}
type Par[A] = ExecutorService => Future[A]
private case class UnitFuture[A](a: A) extends Future[A] {
override def isCancelled: Boolean = false
override def get(): A = a
override def get(timeout: Long, unit: TimeUnit): A = a
override def cancel(mayInterruptIfRunning: Boolean): Boolean = false
override def isDone: Boolean = true
}
//接受一個(gè)已求值的A,返回結(jié)果將會在另一個(gè)線程中執(zhí)行
def unit[A](a: A): Par[A] = es => UnitFuture(a)
//接受一個(gè)未求值的A蒋得,返回結(jié)果將會在另一個(gè)線程中執(zhí)行
def lazyUnit[A](a: => A): Par[A] = folk(unit(a))
//從并行計(jì)算中抽取結(jié)果
def run[A](s: ExecutorService)(pa: Par[A]): Future[A] = pa(s)
//將par[A]分配另一個(gè)獨(dú)立的線程中去運(yùn)行
def folk[A](pa: => Par[A]): Par[A] = es => {
es.submit(new Callable[A] {
override def call(): A = pa(es).get()
})
}
def map2[A, B, C](pa: Par[A], pb: Par[B])(f: (A, B) => C): Par[C] =
es => {
val af = pa(es)
val bf = pb(es)
UnitFuture(f(af.get(), bf.get()))
}
}
練習(xí)7.3
改進(jìn)Map2的實(shí)現(xiàn)级及,支持超時(shí)設(shè)置。
def map2[A, B, C](pa: Par[A], pb: Par[B],
timeout: Long, timeUnit: TimeUnit)(f: (A, B) => C): Par[C] =
es => {
val af = pa(es)
val bf = pb(es)
val a = af.get(timeout, timeUnit)
val b = bf.get(timeout, timeUnit)
UnitFuture(f(a, b))
}
練習(xí) 7.4
使用lazyUnit寫一個(gè)函數(shù)將另一個(gè)函數(shù)A => B轉(zhuǎn)換為一個(gè)一步計(jì)算
def asyncF[A, B](f: A => B): A => Par[B] =
a => lazyUnit(f(a))
練習(xí) 7.5
實(shí)現(xiàn)一個(gè)叫做sequence的函數(shù)额衙。不能使用而外的基礎(chǔ)函數(shù)饮焦,不能調(diào)用run。
def sequence[A](li: List[Par[A]]): Par[List[A]] = {
def loop(n: Int, res: Par[List[A]]): Par[List[A]] = n match {
case m if m < 0 => res
case _ => loop(n - 1, map2(li(n), res)(_ :: _))
}
loop(li.length - 1, unit(Nil))
}
練習(xí) 7.6
實(shí)現(xiàn)parFilter窍侧,并過濾列表元素
def parFilter[A](li: List[A])(f: A => Boolean): Par[List[A]] = {
def loop(n: Int, res: List[Par[A]]): List[Par[A]] = n match {
case m if m < 0 => res
case _ =>
if (f(li(n))) loop(n - 1, lazyUnit(li(n)) :: res)
else loop(n - 1, res)
}
sequence(loop(li.length - 1, Nil))
}