【Akka】在并發(fā)程序中使用Future

引言

在Akka中, 一個(gè)Future是用來(lái)獲取某個(gè)并發(fā)操作的結(jié)果的數(shù)據(jù)結(jié)構(gòu)德澈。這個(gè)操作通常是由Actor執(zhí)行或由Dispatcher直接執(zhí)行的. 這個(gè)結(jié)果可以以同步(阻塞)或異步(非阻塞)的方式訪問(wèn)到涂。
Future提供了一種簡(jiǎn)單的方式來(lái)執(zhí)行并行算法坠陈。

Future直接使用

Future中的一個(gè)常見(jiàn)用例是在不需要使用Actor的情況下并發(fā)地執(zhí)行計(jì)算趾疚。
Future有兩種使用方式:

  1. 阻塞方式(Blocking):該方式下遭商,父actor或主程序停止執(zhí)行知道所有future完成各自任務(wù)屁擅。通過(guò)scala.concurrent.Await使用讨惩。
  1. 非阻塞方式(Non-Blocking),也稱(chēng)為回調(diào)方式(Callback):父actor或主程序在執(zhí)行期間啟動(dòng)future背苦,future任務(wù)和父actor并行執(zhí)行,當(dāng)每個(gè)future完成任務(wù)潘明,將通知父actor行剂。通過(guò)onCompleteonSuccess钉疫、onFailure方式使用硼讽。

執(zhí)行上下文(ExecutionContext)

為了運(yùn)行回調(diào)和操作,F(xiàn)utures需要有一個(gè)ExecutionContext牲阁。
如果你在作用域內(nèi)有一個(gè)ActorSystem固阁,它會(huì)它自己派發(fā)器用作ExecutionContext,你也可以用ExecutionContext伴生對(duì)象提供的工廠方法來(lái)將Executors和ExecutorServices進(jìn)行包裹城菊,或者甚至創(chuàng)建自己的實(shí)例备燃。
通過(guò)導(dǎo)入ExecutionContext.Implicits.global來(lái)導(dǎo)入默認(rèn)的全局執(zhí)行上下文。你可以把該執(zhí)行上下文看做是一個(gè)線程池凌唬,ExecutionContext是在某個(gè)線程池執(zhí)行任務(wù)的抽象并齐。
如果在代碼中沒(méi)有導(dǎo)入該執(zhí)行上下文,代碼將無(wú)法編譯客税。

阻塞方式

第一個(gè)例子展示如何創(chuàng)建一個(gè)future况褪,然后通過(guò)阻塞方式等待其計(jì)算結(jié)果。雖然阻塞方式不是一個(gè)很好的用法更耻,但是可以說(shuō)明問(wèn)題测垛。
這個(gè)例子中,通過(guò)在未來(lái)某個(gè)時(shí)間計(jì)算1+1秧均,當(dāng)計(jì)算結(jié)果后再返回食侮。

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object FutureBlockDemo extends App{
  implicit val baseTime = System.currentTimeMillis

  // create a Future
  val f = Future {
    Thread.sleep(500)
    1+1
  }
  // this is blocking(blocking is bad)
  val result = Await.result(f, 1 second)
  // 如果Future沒(méi)有在Await規(guī)定的時(shí)間里返回,
  // 將拋出java.util.concurrent.TimeoutException
  println(result)
  Thread.sleep(1000)
}

代碼解釋?zhuān)?/p>

  1. 在上面的代碼中,被傳遞給Future的代碼塊會(huì)被缺省的Dispatcher所執(zhí)行目胡,代碼塊的返回結(jié)果會(huì)被用來(lái)完成Future锯七。 與從Actor返回的Future不同,這個(gè)Future擁有正確的類(lèi)型, 我們還避免了管理Actor的開(kāi)銷(xiāo)誉己。
  1. Await.result方法將阻塞1秒時(shí)間來(lái)等待Future結(jié)果返回眉尸,如果Future在規(guī)定時(shí)間內(nèi)沒(méi)有返回,將拋出java.util.concurrent.TimeoutException異常。
  2. 通過(guò)導(dǎo)入scala.concurrent.duration._效五,可以用一種方便的方式來(lái)聲明時(shí)間間隔地消,如100 nanos500 millis畏妖,5 seconds脉执、1 minute1 hour戒劫,3 days半夷。還可以通過(guò)Duration(100, MILLISECONDS)Duration(200, "millis")來(lái)創(chuàng)建時(shí)間間隔迅细。

非阻塞方式(回調(diào)方式)

有時(shí)你只需要監(jiān)聽(tīng)Future的完成事件巫橄,對(duì)其進(jìn)行響應(yīng),不是創(chuàng)建新的Future茵典,而僅僅是產(chǎn)生副作用湘换。
通過(guò)onComplete,onSuccess,onFailure三個(gè)回調(diào)函數(shù)來(lái)異步執(zhí)行Future任務(wù),而后兩者僅僅是第一項(xiàng)的特例统阿。

使用onComplete的代碼示例:

import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import scala.util.Random

object FutureNotBlock extends App{
  println("starting calculation ...")
  val f = Future {
    Thread.sleep(Random.nextInt(500))
    42
  }

  println("before onComplete")
  f.onComplete{
    case Success(value) => println(s"Got the callback, meaning = $value")
    case Failure(e) => e.printStackTrace
  }

  // do the rest of your work
  println("A ...")
  Thread.sleep(100)
  println("B ....")
  Thread.sleep(100)
  println("C ....")
  Thread.sleep(100)
  println("D ....")
  Thread.sleep(100)
  println("E ....")
  Thread.sleep(100)

  Thread.sleep(2000)
}

使用onSuccess彩倚、onFailure的代碼示例:

import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import scala.util.Random

object Test12_FutureOnSuccessAndFailure extends App{
  val f = Future {
    Thread.sleep(Random.nextInt(500))
    if (Random.nextInt(500) > 250) throw new Exception("Tikes!") else 42
  }

  f onSuccess {
    case result => println(s"Success: $result")
  }

  f onFailure {
    case t => println(s"Exception: ${t.getMessage}")
  }

  // do the rest of your work
  println("A ...")
  Thread.sleep(100)
  println("B ....")
  Thread.sleep(100)
  println("C ....")
  Thread.sleep(100)
  println("D ....")
  Thread.sleep(100)
  println("E ....")
  Thread.sleep(100)

  Thread.sleep(1000)
}

代碼解釋?zhuān)?br> 上面兩段例子中,F(xiàn)uture結(jié)構(gòu)中隨機(jī)延遲一段時(shí)間扶平,然后返回結(jié)果或者拋出異常帆离。
然后在回調(diào)函數(shù)中進(jìn)行相關(guān)處理。

創(chuàng)建返回Future[T]的方法

先看一下示例:

import scala.concurrent.{Await, Future, future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object ReturnFuture extends App{
  implicit val baseTime = System.currentTimeMillis

  // `future` method is another way to create a future
  // It starts the computation asynchronously and retures a Future[Int] that
  // will hold the result of the computation.
  def longRunningComputation(i: Int): Future[Int] = future {
    Thread.sleep(100)
    i + 1
  }

  // this does not block
  longRunningComputation(11).onComplete {
    case Success(result) => println(s"result = $result")
    case Failure(e) => e.printStackTrace
  }

  // keep the jvm from shutting down
  Thread.sleep(1000)
}

代碼解釋?zhuān)?br> 上面代碼中的longRunningComputation返回一個(gè)Future[Int]结澄,然后進(jìn)行相關(guān)的異步操作哥谷。
其中future方法是創(chuàng)建一個(gè)future的另一種方法。它將啟動(dòng)一個(gè)異步計(jì)算并且返回包含計(jì)算結(jié)果的Future[T]麻献。

Future用于Actor

通常有兩種方法來(lái)從一個(gè)Actor獲取回應(yīng): 第一種是發(fā)送一個(gè)消息actor ! msg们妥,這種方法只在發(fā)送者是一個(gè)Actor時(shí)有效;第二種是通過(guò)一個(gè)Future勉吻。
使用Actor的?方法來(lái)發(fā)送消息會(huì)返回一個(gè)Future监婶。 要等待并獲取結(jié)果的最簡(jiǎn)單方法是:

import scala.concurrent.Await
import akka.pattern.ask
import scala.concurrent.duration._
import akka.util.Timeout

implicit val timeout = Timeout(5 seconds)
val future = actor ? msg
val result = Await.result(future, timeout.duration).asInstanceOf[String]

下面是使用?發(fā)送消息給actor,并等待回應(yīng)的代碼示例:

import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.language.postfixOps
import scala.concurrent.duration._

case object AskNameMessage

class TestActor extends Actor {
  def receive = {
    case AskNameMessage => // respond to the 'ask' request
      sender ! "Fred"
    case _ => println("that was unexpected")
  }
}
object AskDemo extends App{
  //create the system and actor
  val system = ActorSystem("AskDemoSystem")
  val myActor = system.actorOf(Props[TestActor], name="myActor")

  // (1) this is one way to "ask" another actor for information
  implicit val timeout = Timeout(5 seconds)
  val future = myActor ? AskNameMessage
  val result = Await.result(future, timeout.duration).asInstanceOf[String]
  println(result)

  // (2) a slightly different way to ask another actor for information
  val future2: Future[String] = ask(myActor, AskNameMessage).mapTo[String]
  val result2 = Await.result(future2, 1 second)
  println(result2)

  system.shutdown
}

代碼解釋?zhuān)?/p>

  1. Await.result(future, timeout.duration).asInstanceOf[String]會(huì)導(dǎo)致當(dāng)前線程被阻塞餐曼,并等待actor通過(guò)它的應(yīng)答來(lái)完成Future。但是阻塞會(huì)導(dǎo)致性能問(wèn)題鲜漩,所以是不推薦的源譬。致阻塞的操作位于Await.resultAwait.ready中,這樣就方便定位阻塞的位置孕似。
  1. 還要注意actor返回的Future的類(lèi)型是Future[Any]踩娘,這是因?yàn)閍ctor是動(dòng)態(tài)的。 這也是為什么上例中注釋(1)使用了asInstanceOf
  2. 在使用非阻塞方式時(shí)养渴,最好使用mapTo方法來(lái)將Future轉(zhuǎn)換到期望的類(lèi)型雷绢。如果轉(zhuǎn)換成功,mapTo方法會(huì)返回一個(gè)包含結(jié)果的新的 Future理卑,如果不成功翘紊,則返回ClassCastException異常。

轉(zhuǎn)載請(qǐng)注明作者Jason Ding及其出處
Github博客主頁(yè)(http://jasonding1354.github.io/)
GitCafe博客主頁(yè)(http://jasonding1354.gitcafe.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
簡(jiǎn)書(shū)主頁(yè)(http://www.reibang.com/users/2bd9b48f6ea8/latest_articles)
Google搜索jasonding1354進(jìn)入我的博客主頁(yè)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末藐唠,一起剝皮案震驚了整個(gè)濱河市帆疟,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌宇立,老刑警劉巖踪宠,帶你破解...
    沈念sama閱讀 212,080評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異妈嘹,居然都是意外死亡柳琢,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,422評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)润脸,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)柬脸,“玉大人,你說(shuō)我怎么就攤上這事津函⌒ち福” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,630評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵尔苦,是天一觀的道長(zhǎng)涩馆。 經(jīng)常有香客問(wèn)我,道長(zhǎng)允坚,這世上最難降的妖魔是什么魂那? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,554評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮稠项,結(jié)果婚禮上涯雅,老公的妹妹穿的比我還像新娘。我一直安慰自己展运,他們只是感情好活逆,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,662評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著拗胜,像睡著了一般蔗候。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上埂软,一...
    開(kāi)封第一講書(shū)人閱讀 49,856評(píng)論 1 290
  • 那天锈遥,我揣著相機(jī)與錄音,去河邊找鬼。 笑死所灸,一個(gè)胖子當(dāng)著我的面吹牛丽惶,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播爬立,決...
    沈念sama閱讀 39,014評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼钾唬,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了懦尝?” 一聲冷哼從身側(cè)響起知纷,我...
    開(kāi)封第一講書(shū)人閱讀 37,752評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎陵霉,沒(méi)想到半個(gè)月后琅轧,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,212評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡踊挠,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,541評(píng)論 2 327
  • 正文 我和宋清朗相戀三年乍桂,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片效床。...
    茶點(diǎn)故事閱讀 38,687評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡睹酌,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出剩檀,到底是詐尸還是另有隱情憋沿,我是刑警寧澤,帶...
    沈念sama閱讀 34,347評(píng)論 4 331
  • 正文 年R本政府宣布沪猴,位于F島的核電站辐啄,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏运嗜。R本人自食惡果不足惜壶辜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,973評(píng)論 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望担租。 院中可真熱鬧砸民,春花似錦、人聲如沸奋救。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,777評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)尝艘。三九已至演侯,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間利耍,已是汗流浹背蚌本。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,006評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留隘梨,地道東北人程癌。 一個(gè)月前我還...
    沈念sama閱讀 46,406評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像轴猎,于是被迫代替她去往敵國(guó)和親嵌莉。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,576評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容