引言
在Akka中, 一個(gè)Future
是用來(lái)獲取某個(gè)并發(fā)操作的結(jié)果的數(shù)據(jù)結(jié)構(gòu)德澈。這個(gè)操作通常是由Actor執(zhí)行或由Dispatcher直接執(zhí)行的. 這個(gè)結(jié)果可以以同步(阻塞)或異步(非阻塞)的方式訪問(wèn)到涂。
Future提供了一種簡(jiǎn)單的方式來(lái)執(zhí)行并行算法坠陈。
Future直接使用
Future中的一個(gè)常見(jiàn)用例是在不需要使用Actor的情況下并發(fā)地執(zhí)行計(jì)算趾疚。
Future有兩種使用方式:
- 阻塞方式(Blocking):該方式下遭商,父actor或主程序停止執(zhí)行知道所有future完成各自任務(wù)屁擅。通過(guò)
scala.concurrent.Await
使用讨惩。
- 非阻塞方式(Non-Blocking),也稱(chēng)為回調(diào)方式(Callback):父actor或主程序在執(zhí)行期間啟動(dòng)future背苦,future任務(wù)和父actor并行執(zhí)行,當(dāng)每個(gè)future完成任務(wù)潘明,將通知父actor行剂。通過(guò)
onComplete
、onSuccess
钉疫、onFailure
方式使用硼讽。
執(zhí)行上下文(ExecutionContext)
為了運(yùn)行回調(diào)和操作,F(xiàn)utures需要有一個(gè)ExecutionContext
牲阁。
如果你在作用域內(nèi)有一個(gè)ActorSystem
固阁,它會(huì)它自己派發(fā)器用作ExecutionContext,你也可以用ExecutionContext伴生對(duì)象提供的工廠方法來(lái)將Executors和ExecutorServices進(jìn)行包裹城菊,或者甚至創(chuàng)建自己的實(shí)例备燃。
通過(guò)導(dǎo)入ExecutionContext.Implicits.global
來(lái)導(dǎo)入默認(rèn)的全局執(zhí)行上下文。你可以把該執(zhí)行上下文看做是一個(gè)線程池凌唬,ExecutionContext是在某個(gè)線程池執(zhí)行任務(wù)的抽象并齐。
如果在代碼中沒(méi)有導(dǎo)入該執(zhí)行上下文,代碼將無(wú)法編譯客税。
阻塞方式
第一個(gè)例子展示如何創(chuàng)建一個(gè)future况褪,然后通過(guò)阻塞方式等待其計(jì)算結(jié)果。雖然阻塞方式不是一個(gè)很好的用法更耻,但是可以說(shuō)明問(wèn)題测垛。
這個(gè)例子中,通過(guò)在未來(lái)某個(gè)時(shí)間計(jì)算1+1秧均,當(dāng)計(jì)算結(jié)果后再返回食侮。
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
object FutureBlockDemo extends App{
implicit val baseTime = System.currentTimeMillis
// create a Future
val f = Future {
Thread.sleep(500)
1+1
}
// this is blocking(blocking is bad)
val result = Await.result(f, 1 second)
// 如果Future沒(méi)有在Await規(guī)定的時(shí)間里返回,
// 將拋出java.util.concurrent.TimeoutException
println(result)
Thread.sleep(1000)
}
代碼解釋?zhuān)?/p>
- 在上面的代碼中,被傳遞給Future的代碼塊會(huì)被缺省的
Dispatcher
所執(zhí)行目胡,代碼塊的返回結(jié)果會(huì)被用來(lái)完成Future
锯七。 與從Actor返回的Future不同,這個(gè)Future擁有正確的類(lèi)型, 我們還避免了管理Actor的開(kāi)銷(xiāo)誉己。
-
Await.result
方法將阻塞1秒時(shí)間來(lái)等待Future結(jié)果返回眉尸,如果Future在規(guī)定時(shí)間內(nèi)沒(méi)有返回,將拋出java.util.concurrent.TimeoutException
異常。 - 通過(guò)導(dǎo)入
scala.concurrent.duration._
效五,可以用一種方便的方式來(lái)聲明時(shí)間間隔地消,如100 nanos
,500 millis
畏妖,5 seconds
脉执、1 minute
、1 hour
戒劫,3 days
半夷。還可以通過(guò)Duration(100, MILLISECONDS)
,Duration(200, "millis")
來(lái)創(chuàng)建時(shí)間間隔迅细。
非阻塞方式(回調(diào)方式)
有時(shí)你只需要監(jiān)聽(tīng)Future
的完成事件巫橄,對(duì)其進(jìn)行響應(yīng),不是創(chuàng)建新的Future茵典,而僅僅是產(chǎn)生副作用湘换。
通過(guò)onComplete
,onSuccess
,onFailure
三個(gè)回調(diào)函數(shù)來(lái)異步執(zhí)行Future任務(wù),而后兩者僅僅是第一項(xiàng)的特例统阿。
使用onComplete的代碼示例:
import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import scala.util.Random
object FutureNotBlock extends App{
println("starting calculation ...")
val f = Future {
Thread.sleep(Random.nextInt(500))
42
}
println("before onComplete")
f.onComplete{
case Success(value) => println(s"Got the callback, meaning = $value")
case Failure(e) => e.printStackTrace
}
// do the rest of your work
println("A ...")
Thread.sleep(100)
println("B ....")
Thread.sleep(100)
println("C ....")
Thread.sleep(100)
println("D ....")
Thread.sleep(100)
println("E ....")
Thread.sleep(100)
Thread.sleep(2000)
}
使用onSuccess彩倚、onFailure的代碼示例:
import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import scala.util.Random
object Test12_FutureOnSuccessAndFailure extends App{
val f = Future {
Thread.sleep(Random.nextInt(500))
if (Random.nextInt(500) > 250) throw new Exception("Tikes!") else 42
}
f onSuccess {
case result => println(s"Success: $result")
}
f onFailure {
case t => println(s"Exception: ${t.getMessage}")
}
// do the rest of your work
println("A ...")
Thread.sleep(100)
println("B ....")
Thread.sleep(100)
println("C ....")
Thread.sleep(100)
println("D ....")
Thread.sleep(100)
println("E ....")
Thread.sleep(100)
Thread.sleep(1000)
}
代碼解釋?zhuān)?br>
上面兩段例子中,F(xiàn)uture結(jié)構(gòu)中隨機(jī)延遲一段時(shí)間扶平,然后返回結(jié)果或者拋出異常帆离。
然后在回調(diào)函數(shù)中進(jìn)行相關(guān)處理。
創(chuàng)建返回Future[T]的方法
先看一下示例:
import scala.concurrent.{Await, Future, future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
object ReturnFuture extends App{
implicit val baseTime = System.currentTimeMillis
// `future` method is another way to create a future
// It starts the computation asynchronously and retures a Future[Int] that
// will hold the result of the computation.
def longRunningComputation(i: Int): Future[Int] = future {
Thread.sleep(100)
i + 1
}
// this does not block
longRunningComputation(11).onComplete {
case Success(result) => println(s"result = $result")
case Failure(e) => e.printStackTrace
}
// keep the jvm from shutting down
Thread.sleep(1000)
}
代碼解釋?zhuān)?br>
上面代碼中的longRunningComputation返回一個(gè)Future[Int]
结澄,然后進(jìn)行相關(guān)的異步操作哥谷。
其中future
方法是創(chuàng)建一個(gè)future的另一種方法。它將啟動(dòng)一個(gè)異步計(jì)算并且返回包含計(jì)算結(jié)果的Future[T]
麻献。
Future用于Actor
通常有兩種方法來(lái)從一個(gè)Actor獲取回應(yīng): 第一種是發(fā)送一個(gè)消息actor ! msg
们妥,這種方法只在發(fā)送者是一個(gè)Actor時(shí)有效;第二種是通過(guò)一個(gè)Future勉吻。
使用Actor的?
方法來(lái)發(fā)送消息會(huì)返回一個(gè)Future监婶。 要等待并獲取結(jié)果的最簡(jiǎn)單方法是:
import scala.concurrent.Await
import akka.pattern.ask
import scala.concurrent.duration._
import akka.util.Timeout
implicit val timeout = Timeout(5 seconds)
val future = actor ? msg
val result = Await.result(future, timeout.duration).asInstanceOf[String]
下面是使用?
發(fā)送消息給actor,并等待回應(yīng)的代碼示例:
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.language.postfixOps
import scala.concurrent.duration._
case object AskNameMessage
class TestActor extends Actor {
def receive = {
case AskNameMessage => // respond to the 'ask' request
sender ! "Fred"
case _ => println("that was unexpected")
}
}
object AskDemo extends App{
//create the system and actor
val system = ActorSystem("AskDemoSystem")
val myActor = system.actorOf(Props[TestActor], name="myActor")
// (1) this is one way to "ask" another actor for information
implicit val timeout = Timeout(5 seconds)
val future = myActor ? AskNameMessage
val result = Await.result(future, timeout.duration).asInstanceOf[String]
println(result)
// (2) a slightly different way to ask another actor for information
val future2: Future[String] = ask(myActor, AskNameMessage).mapTo[String]
val result2 = Await.result(future2, 1 second)
println(result2)
system.shutdown
}
代碼解釋?zhuān)?/p>
Await.result(future, timeout.duration).asInstanceOf[String]
會(huì)導(dǎo)致當(dāng)前線程被阻塞餐曼,并等待actor通過(guò)它的應(yīng)答來(lái)完成Future
。但是阻塞會(huì)導(dǎo)致性能問(wèn)題鲜漩,所以是不推薦的源譬。致阻塞的操作位于Await.result
和Await.ready
中,這樣就方便定位阻塞的位置孕似。
- 還要注意actor返回的Future的類(lèi)型是
Future[Any]
踩娘,這是因?yàn)閍ctor是動(dòng)態(tài)的。 這也是為什么上例中注釋(1)使用了asInstanceOf
。 - 在使用非阻塞方式時(shí)养渴,最好使用
mapTo
方法來(lái)將Future轉(zhuǎn)換到期望的類(lèi)型雷绢。如果轉(zhuǎn)換成功,mapTo
方法會(huì)返回一個(gè)包含結(jié)果的新的 Future理卑,如果不成功翘紊,則返回ClassCastException
異常。
轉(zhuǎn)載請(qǐng)注明作者Jason Ding及其出處
Github博客主頁(yè)(http://jasonding1354.github.io/)
GitCafe博客主頁(yè)(http://jasonding1354.gitcafe.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
簡(jiǎn)書(shū)主頁(yè)(http://www.reibang.com/users/2bd9b48f6ea8/latest_articles)
Google搜索jasonding1354進(jìn)入我的博客主頁(yè)