Actor的創(chuàng)建&引用&聲明周期
1.創(chuàng)建actor
- 定義一個Actor類
要定義自己的Actor類盏求,需要繼承Actor并實現(xiàn)receive方法充活。receive方法需要定義一系列case語句(類型為PartialFunction[Any, Unit])來描述你的Actor能夠處理哪些消息(使用標準的Scala模式匹配),以及消息如何被處理且蓬。
如下例:
import akka.actor.Actor
import akka.actor.Props
import akka.event.Logging
class MyActor extends Actor {
val log = Logging(context.system, this)
def receive = {
case "test" => log.info("received test")
case _ => log.info("received unknown message")
}
}
- Props
Props是一個用來在創(chuàng)建actor時指定選項的配置類欣硼,可以把它看作是不可變的,因此在創(chuàng)建包含相關(guān)部署信息的actor時(例如使用哪一個調(diào)度器(dispatcher)恶阴,詳見下文)诈胜,是可以自由共享的。以下是如何創(chuàng)建Props實例的示例.
import akka.actor.Props
val props1 = Props[MyActor]
val props2 = Props(new ActorWithArgs("arg")) // careful, see below
val props3 = Props(classOf[ActorWithArgs], "arg")
警告
在另一個actor中聲明一個actor是非常危險的冯事,會打破actor的封裝焦匈。永遠不要將一個actor的this引用傳進Props!
推薦做法
在每一個Actor的伴生對象中提供工廠方法是一個好主意昵仅,這有助于保持創(chuàng)建合適的Props缓熟,盡可能接近actor的定義。這也避免了使用Props.apply(...)方法將采用一個“按名”(by-name)參數(shù)的缺陷摔笤,因為伴生對象的給定代碼塊中將不會保留包含作用域的引用:
object DemoActor {
/**
* Create Props for an actor of this type.
* @param magciNumber The magic number to be passed to this actor’s constructor.
* @return a Props for creating this actor, which can then be further configured
* (e.g. calling `.withDispatcher()` on it)
*/
def props(magicNumber: Int): Props = Props(new DemoActor(magicNumber))
}
class DemoActor(magicNumber: Int) extends Actor {
def receive = {
case x: Int => sender() ! (x + magicNumber)
}
}
class SomeOtherActor extends Actor {
// Props(new DemoActor(42)) would not be safe
context.actorOf(DemoActor.props(42), "demo")
// ...
}
- 使用Props創(chuàng)建Actor
Actor可以通過將Props實例傳入actorOf工廠方法來創(chuàng)建够滑,ActorSystem和ActorContext中都有該方法。
import akka.actor.ActorSystem
// ActorSystem is a heavy object: create only one per application
val system = ActorSystem("mySystem")
val myActor = system.actorOf(Props[MyActor], "myactor2")
使用ActorSystem將創(chuàng)建頂級actor吕世,由actor系統(tǒng)提供的守護actor監(jiān)管彰触;如果使用的是actor的上下文,則創(chuàng)建一個該actor的子actor命辖。
class FirstActor extends Actor {
val child = context.actorOf(Props[MyActor], name = "myChild")
// plus some behavior ...
}
推薦創(chuàng)建一個樹形結(jié)構(gòu)况毅,包含子actor、孫子等等尔艇,使之符合應(yīng)用的邏輯錯誤處理結(jié)構(gòu)
- 依賴注入
如果你的actor有帶參數(shù)的構(gòu)造函數(shù)尔许,則這些參數(shù)也需要成為Props的一部分,如上文所述漓帚。但有些情況下必須使用工廠方法母债,例如,當實際構(gòu)造函數(shù)的參數(shù)由依賴注入框架決定。
import akka.actor.IndirectActorProducer
class DependencyInjector(applicationContext: AnyRef, beanName: String)
extends IndirectActorProducer {
override def actorClass = classOf[Actor]
override def produce =
// obtain fresh Actor instance from DI framework ...
}
val actorRef = system.actorOf(
Props(classOf[DependencyInjector], applicationContext, "hello"),
"helloBean")
2.Actor API
Actor trait只定義了一個抽象方法毡们,就是上面提到的receive迅皇,用來實現(xiàn)actor的行為。
如果當前actor的行為與收到的消息不匹配衙熔,則會調(diào)用 unhandled登颓,其缺省實現(xiàn)是向actor系統(tǒng)的事件流中發(fā)布一條akka.actor.UnhandledMessage(message, sender, recipient)(將配置項akka.actor.debug.unhandled設(shè)置為on來將它們轉(zhuǎn)換為實際的調(diào)試消息)。
另外红氯,它還包括:
- self引用代表本actor的ActorRef
- sender引用代表最近收到消息的發(fā)送actor框咙,通常用于下面將講到的消息回應(yīng)中
- supervisorStrategy 用戶可重寫它來定義對子actor的監(jiān)管策略
該策略通常在actor內(nèi)聲明,這樣決定函數(shù)就可以訪問actor的內(nèi)部狀態(tài):因為失敗通知作為消息發(fā)送給監(jiān)管者痢甘,并像普通消息一樣被處理(盡管不是正常行為)喇嘱,所有的值和actor變量都是可用的,以及sender引用 (報告失敗的將是直接子actor塞栅;如果原始失敗發(fā)生在遙遠的后裔者铜,它仍然是一次向上報告一層)。
- context暴露actor和當前消息的上下文信息放椰,如:
- 用于創(chuàng)建子actor的工廠方法(actorOf)
- actor所屬的系統(tǒng)
- 父監(jiān)管者
- 所監(jiān)管的子actor
- 生命周期監(jiān)控
- hotswap行為棧作烟,見Become/Unbecome
3.Actor生命周期
actor系統(tǒng)中的路徑代表一個"地方",這里可能會被活著的actor占據(jù)砾医。最初(除了系統(tǒng)初始化actor)路徑都是空的拿撩。在調(diào)用actorOf()時它將為指定路徑分配根據(jù)傳入Props創(chuàng)建的一個actor化身。actor化身是由路徑和一個UID標識的如蚜。重新啟動只會替換有Props定義的Actor實例压恒,但不會替換化身,因此UID保持不變怖亭。
當actor停止時涎显,其化身的生命周期結(jié)束。在這一時間點上相關(guān)的生命周期事件被調(diào)用兴猩,監(jiān)視該actor的actor都會獲得終止通知期吓。當化身停止后,路徑可以重復(fù)使用倾芝,通過actorOf()創(chuàng)建一個actor讨勤。在這種情況下,除了UID不同外晨另,新化身與老化身是相同的潭千。
ActorRef始終表示化身(路徑和UID)而不只是一個給定的路徑。因此如果actor停止借尿,并且創(chuàng)建一個新的具有相同名稱的actor刨晴,則指向老化身的ActorRef將不會指向新的化身屉来。
相對地,ActorSelection指向路徑(或多個路徑狈癞,如果使用了通配符)茄靠,且完全不關(guān)注有沒有化身占據(jù)它。因此ActorSelection 不能被監(jiān)視蝶桶。獲取某路徑下的當前化身ActorRef是可能的慨绳,只要向該ActorSelection發(fā)送Identify,如果收到ActorIdentity回應(yīng)真竖,則正確的引用就包含其中(詳見通過Actor Selection確定Actor)脐雪。也可以使用ActorSelection的resolveOne方法,它會返回一個包含匹配ActorRef的Future恢共。
- 使用DeathWatch進行生命周期監(jiān)控
為了在其它actor終止時 (即永久停止战秋,而不是臨時的失敗和重啟)收到通知,actor可以將自己注冊為其它actor在終止時所發(fā)布的Terminated消息的接收者(見停止 Actor)讨韭。這個服務(wù)是由actor系統(tǒng)的DeathWatch組件提供的获询。
注冊一個監(jiān)視器很簡單:
import akka.actor.{ Actor, Props, Terminated }
class WatchActor extends Actor {
val child = context.actorOf(Props.empty, "child")
context.watch(child) // <-- this is the only call needed for registration
var lastSender = system.deadLetters
def receive = {
case "kill" =>
context.stop(child); lastSender = sender()
case Terminated(`child`) => lastSender ! "finished"
}
}
要注意Terminated消息的產(chǎn)生與注冊和終止行為所發(fā)生的順序無關(guān)。特別地拐袜,即使在注冊時,被觀察的actor已經(jīng)終止了梢薪,監(jiān)視actor仍然會受到一個Terminated消息蹬铺。
多次注冊并不表示會有多個消息產(chǎn)生,也不保證有且只有一個這樣的消息被接收到:如果被監(jiān)控的actor已經(jīng)生成了消息并且已經(jīng)進入了隊列秉撇,在這個消息被處理之前又發(fā)生了另一次注冊甜攀,則會有第二個消息進入隊列,因為對一個已經(jīng)終止的actor的監(jiān)控注冊操作會立刻導(dǎo)致Terminated消息的產(chǎn)生琐馆。
可以使用context.unwatch(target)來停止對另一個actor生存狀態(tài)的監(jiān)控规阀。即使Terminated已經(jīng)加入郵箱,該操作仍有效瘦麸;一旦調(diào)用unwatch谁撼,則被觀察的actor的Terminated消息就都不會再被處理。
- 啟動Hook
actor啟動后滋饲,它的preStart方法會被立即執(zhí)行厉碟。
override def preStart() {
// registering with other actors
someService ! Register(self)
}
在actor第一次創(chuàng)建時,將調(diào)用此方法屠缭。在重新啟動期間箍鼓,它被postRestart的默認實現(xiàn)調(diào)用,這意味著通過重寫該方法呵曹,你可以選擇是僅僅在初始化該actor時調(diào)用一次款咖,還是為每次重新啟動都調(diào)用袜瞬。actor構(gòu)造函數(shù)中的初始化代碼將在每個actor實例創(chuàng)建的時候被調(diào)用,這也發(fā)生在每次重啟時泻肯。
- 重啟Hook
所有的actor都是被監(jiān)管的佳励,即與另一個使用某種失敗處理策略的actor綁定在一起。如果在處理一個消息的時候拋出了異常背稼,Actor將被重啟(詳見監(jiān)管與監(jiān)控)贰军。這個重啟過程包括上面提到的Hook:
要被重啟的actor被通知是通過調(diào)用preRestart,包含著導(dǎo)致重啟的異常以及觸發(fā)異常的消息蟹肘;如果重啟并不是因為消息處理而發(fā)生的词疼,則所攜帶的消息為None,例如帘腹,當一個監(jiān)管者沒有處理某個異常繼而被其監(jiān)管者重啟時贰盗,或者因其兄弟節(jié)點的失敗導(dǎo)致的重啟。如果消息可用阳欲,則消息的發(fā)送者通常也可用(即通過調(diào)用sender)舵盈。
這個方法是用來完成清理、準備移交給新actor實例等操作的最佳位置球化。其缺省實現(xiàn)是終止所有子actor并調(diào)用postStop秽晚。
最初調(diào)用actorOf的工廠將被用來創(chuàng)建新的實例。
新的actor的postRestart方法被調(diào)用時筒愚,將攜帶著導(dǎo)致重啟的異常信息赴蝇。默認實現(xiàn)中,preStart被調(diào)用時巢掺,就像一個正常的啟動一樣句伶。
actor的重啟只會替換掉原來的actor對象;重啟不影響郵箱的內(nèi)容陆淀,所以對消息的處理將在postRestart hook返回后繼續(xù)考余。觸發(fā)異常的消息不會被重新接收。在actor重啟過程中轧苫,所有發(fā)送到該actor的消息將象平常一樣被放進郵箱隊列中楚堤。
警告
要知道失敗通知與用戶消息的相關(guān)順序不是決定性的。尤其是含懊,在失敗以前收到的最后一條消息被處理之前钾军,父節(jié)點可能已經(jīng)重啟其子節(jié)點了。詳細信息請參見“討論:消息順序”绢要。
- 終止 Hook
一個Actor終止后吏恭,其postStop hook將被調(diào)用,它可以用來重罪,例如取消該actor在其它服務(wù)中的注冊樱哼。這個hook保證在該actor的消息隊列被禁止后才運行哀九,即之后發(fā)給該actor的消息將被重定向到ActorSystem的deadLetters中。
3. 通過Actor Selection定位Actor
如Actor引用, 路徑與地址中所述搅幅,每個actor都擁有一個唯一的邏輯路徑阅束,此路徑是由從actor系統(tǒng)的根開始的父子鏈構(gòu)成;它還擁有一個物理路徑茄唐,如果監(jiān)管鏈包含有遠程監(jiān)管者息裸,此路徑可能會與邏輯路徑不同。這些路徑用來在系統(tǒng)中查找actor沪编,例如呼盆,當收到一個遠程消息時查找收件者,但是它們更直接的用處在于:actor可以通過指定絕對或相對路徑(邏輯的或物理的)來查找其它的actor蚁廓,并隨結(jié)果獲取一個ActorSelection:
// will look up this absolute path
context.actorSelection("/user/serviceA/aggregator")
// will look up sibling beneath same supervisor
context.actorSelection("../joe")
其中指定的路徑被解析為一個java.net.URI访圃,它以/分隔成路徑段。如果路徑以/開始相嵌,表示一個絕對路徑腿时,且從根監(jiān)管者("/user"的父親)開始查找;否則是從當前actor開始饭宾。如果某一個路徑段為..批糟,會找到當前所遍歷到的actor的上一級,否則則會向下一級尋找具有該名字的子actor看铆。 必須注意的是actor路徑中的..總是表示邏輯結(jié)構(gòu)跃赚,即其監(jiān)管者。
一個actor selection的路徑元素中可能包含通配符性湿,從而允許向匹配模式的集合廣播該條消息:
// will look all children to serviceB with names starting with worker
context.actorSelection("/user/serviceB/worker*")
// will look up all siblings beneath same supervisor
context.actorSelection("../*")
消息可以通過ActorSelection發(fā)送,并且在投遞每條消息時 ActorSelection的路徑都會被查找满败。如果selection不匹配任何actor肤频,則消息將被丟棄。
要獲得ActorSelection的ActorRef算墨,你需要發(fā)送一條消息到selection宵荒,然后使用答復(fù)消息的sender()引用即可。有一個內(nèi)置的Identify消息净嘀,所有actor會理解它并自動返回一個包含ActorRef的ActorIdentity消息报咳。此消息被遍歷到的actor特殊處理為,如果一個具體的名稱查找失斖诓亍(即一個不含通配符的路徑?jīng)]有對應(yīng)的活動actor)暑刃,則會生成一個否定結(jié)果。請注意這并不意味著應(yīng)答消息有到達保證膜眠,它仍然是一個普通的消息岩臣。
import akka.actor.{ Actor, Props, Identify, ActorIdentity, Terminated }
class Follower extends Actor {
val identifyId = 1
context.actorSelection("/user/another") ! Identify(identifyId)
def receive = {
case ActorIdentity(`identifyId`, Some(ref)) =>
context.watch(ref)
context.become(active(ref))
case ActorIdentity(`identifyId`, None) => context.stop(self)
}
def active(another: ActorRef): Actor.Receive = {
case Terminated(`another`) => context.stop(self)
}
}
你也可以通過ActorSelection的resolveOne方法獲取ActorSelection的一個ActorRef溜嗜。如果存在這樣的actor,它將返回一個包含匹配的ActorRef的Future架谎。如果沒有這樣的actor 存在或識別沒有在指定的時間內(nèi)完成炸宵,它將以失敗告終——akka.actor.ActorNotFound。
如果開啟了遠程調(diào)用谷扣,則遠程actor地址也可以被查找:
context.actorSelection("akka.tcp://app@otherhost:1234/user/serviceB")
4.發(fā)送消息
向actor發(fā)送消息需使用下列方法之一土全。
- !意思是“fire-and-forget”,即異步發(fā)送一個消息并立即返回会涎。也稱為tell裹匙。
- ?異步發(fā)送一條消息并返回一個Future代表一個可能的回應(yīng)。也稱為ask在塔。
對每一個消息發(fā)送者幻件,分別有消息順序保證。
注意
使用ask有一些性能內(nèi)涵蛔溃,因為需要跟蹤超時绰沥,需要有橋梁將Promise轉(zhuǎn)為ActorRef,并且需要在遠程情況下可訪問贺待。所以為了性能應(yīng)該總選擇tell徽曲,除非只能選擇ask。
Tell: Fire-forget
這是發(fā)送消息的推薦方式麸塞。 不會阻塞地等待消息秃臣。它擁有最好的并發(fā)性和可擴展性。
actorRef ! message
如果是在一個Actor中調(diào)用 哪工,那么發(fā)送方的actor引用會被隱式地作為消息的sender(): ActorRef成員一起發(fā)送奥此。目的actor可以使用它來向源actor發(fā)送回應(yīng), 使用sender() ! replyMsg雁比。
如果不是從Actor實例發(fā)送的稚虎,sender成員缺省為 deadLetters actor引用。
Ask: Send-And-Receive-Future
ask模式既包含actor也包含future偎捎,所以它是一種使用模式蠢终,而不是ActorRef的方法:
import akka.pattern.{ ask, pipe }
import system.dispatcher // The ExecutionContext that will be used
case class Result(x: Int, s: String, d: Double)
case object Request
implicit val timeout = Timeout(5 seconds) // needed for `?` below
val f: Future[Result] =
for {
x <- ask(actorA, Request).mapTo[Int] // call pattern directly
s <- (actorB ask Request).mapTo[String] // call by implicit conversion
d <- (actorC ? Request).mapTo[Double] // call by symbolic name
} yield Result(x, s, d)
f pipeTo actorD // .. or ..
pipe(f) to actorD
5.接收消息
Actor必須實現(xiàn)receive方法來接收消息:
protected def receive: PartialFunction[Any, Unit]
這個方法應(yīng)返回一個PartialFunction,例如一個“match/case”子句茴她,消息可以與其中的不同分支進行scala模式匹配寻拂。如下例:
import akka.actor.Actor
import akka.actor.Props
import akka.event.Logging
class MyActor extends Actor {
val log = Logging(context.system, this)
def receive = {
case "test" => log.info("received test")
case _ => log.info("received unknown message")
}
}
6.終止Actor
通過調(diào)用ActorRefFactory(即ActorContext或ActorSystem)的stop方法來終止一個actor。通常context用來終止子actor丈牢,而 system用來終止頂級actor祭钉。實際的終止操作是異步執(zhí)行的,即stop可能在actor被終止之前返回己沛。
actor的終止分兩步: 第一步actor將掛起對郵箱的處理朴皆,并向所有子actor發(fā)送終止命令帕识,然后處理來自子actor的終止消息直到所有的子actor都完成終止,最后終止自己(調(diào)用postStop遂铡,清空郵箱肮疗,向DeathWatch發(fā)布Terminated,通知其監(jiān)管者)扒接。這個過程保證actor系統(tǒng)中的子樹以一種有序的方式終止伪货,將終止命令傳播到葉子結(jié)點并收集它們回送的確認消息給被終止的監(jiān)管者。如果其中某個actor沒有響應(yīng)(即由于處理消息用了太長時間以至于沒有收到終止命令)钾怔,整個過程將會被阻塞碱呼。
在ActorSystem.shutdown()被調(diào)用時,系統(tǒng)根監(jiān)管actor會被終止宗侦,以上的過程將保證整個系統(tǒng)的正確終止愚臀。
postStop() hook 是在actor被完全終止以后調(diào)用的。這是為了清理資源:
override def postStop() {
// clean up some resources ...
}
注意
由于actor的終止是異步的矾利,你不能馬上使用你剛剛終止的子actor的名字姑裂;這會導(dǎo)致InvalidActorNameException。你應(yīng)該 監(jiān)視watch()正在終止的actor男旗,并在Terminated最終到達后作為回應(yīng)創(chuàng)建它的替代者舶斧。
優(yōu)雅地終止
如果你需要等待終止過程的結(jié)束,或者組合若干actor的終止次序察皇,可以使用gracefulStop:
import akka.pattern.gracefulStop
import scala.concurrent.Await
try {
val stopped: Future[Boolean] = gracefulStop(actorRef, 5 seconds, Manager.Shutdown)
Await.result(stopped, 6 seconds)
// the actor has been stopped
} catch {
// the actor wasn't stopped within 5 seconds
case e: akka.pattern.AskTimeoutException =>
}
object Manager {
case object Shutdown
}
class Manager extends Actor {
import Manager._
val worker = context.watch(context.actorOf(Props[Cruncher], "worker"))
def receive = {
case "job" => worker ! "crunch"
case Shutdown =>
worker ! PoisonPill
context become shuttingDown
}
def shuttingDown: Receive = {
case "job" => sender() ! "service unavailable, shutting down"
case Terminated(`worker`) =>
context stop self
}
}
當gracefulStop()成功返回時茴厉,actor的postStop() hook將會被執(zhí)行:在postStop()結(jié)束和gracefulStop()返回之間存在happens-before邊界。
在上面的示例中自定義的Manager.Shutdown消息是發(fā)送到目標actor來啟動actor的終止過程什荣。你可以使用PoisonPill矾缓,但之后在停止目標actor之前,你與其他actor的互動的機會有限稻爬。在postStop中嗜闻,可以處理簡單的清理任務(wù)。
警告
請記住因篇,actor停止和其名稱被注銷是彼此異步發(fā)生的獨立事件。因此竞滓,在gracefulStop()返回后吹缔。你會發(fā)現(xiàn)其名稱仍可能在使用中。為了保證正確注銷厢塘,只在你控制的監(jiān)管者內(nèi)茶没,并且只在響應(yīng)Terminated消息時重用名稱肌幽,即不是用于頂級actor。
7.Become/Unbecome
升級
Akka支持在運行時對Actor消息循環(huán)(即其實現(xiàn))進行實時替換:在actor中調(diào)用context.become方法抓半。become要求一個PartialFunction[Any, Unit]參數(shù)作為新的消息處理實現(xiàn)喂急。 被替換的代碼被保存在一個棧中,可以被push和pop笛求。
警告
請注意actor被其監(jiān)管者重啟后將恢復(fù)其最初的行為廊移。
8.使用PartialFunction鏈來擴展actor
有時在一些actor中分享共同的行為,或通過若干小的函數(shù)構(gòu)成一個actor的行為是很有用的探入。這由于actor的receive方法返回一個Actor.Receive(PartialFunction[Any,Unit]的類型別名)而使之成為可能狡孔,多個偏函數(shù)可以使用PartialFunction#orElse鏈接在一起。你可以根據(jù)需要鏈接盡可能多的功能蜂嗽,但是你要牢記"第一個匹配"獲勝——這在組合可以處理同一類型的消息的功能時會很重要苗膝。
例如,假設(shè)你有一組actor是生產(chǎn)者Producers或消費者Consumers植旧,然而有時候需要actor分享這兩種行為辱揭。這可以很容易實現(xiàn)而無需重復(fù)代碼,通過提取行為的特質(zhì)和并將actor的receive實現(xiàn)為這些偏函數(shù)的組合隆嗅。
trait ProducerBehavior {
this: Actor =>
val producerBehavior: Receive = {
case GiveMeThings =>
sender() ! Give("thing")
}
}
trait ConsumerBehavior {
this: Actor with ActorLogging =>
val consumerBehavior: Receive = {
case ref: ActorRef =>
ref ! GiveMeThings
case Give(thing) =>
log.info("Got a thing! It's {}", thing)
}
}
class Producer extends Actor with ProducerBehavior {
def receive = producerBehavior
}
class Consumer extends Actor with ActorLogging with ConsumerBehavior {
def receive = consumerBehavior
}
class ProducerConsumer extends Actor with ActorLogging
with ProducerBehavior with ConsumerBehavior {
def receive = producerBehavior orElse consumerBehavior
}
// protocol
case object GiveMeThings
case class Give(thing: Any)
不同于繼承界阁,相同的模式可以通過組合實現(xiàn)——可以簡單地通過委托的偏函數(shù)組合成receive方法。