Actor 是 Scala 基于消息傳遞的并發(fā)模型,雖然自 Scala-2.10 其默認(rèn)并發(fā)模型的地位已被 Akka 取代亩冬,但這種與傳統(tǒng) Java硼身、C++完全不一樣的并發(fā)模型依舊值得學(xué)習(xí)。
如何使用 Actor
擴(kuò)展 Actor
先來(lái)看看第一種用法营袜,下面是一個(gè)簡(jiǎn)單例子及部分說(shuō)明
//< 擴(kuò)展超類 Actor
class ActorItem extends Actor {
//< 重載 act 方法
def act(): Unit = {
//< receive 從消息隊(duì)列 mailbox 中去一條消息并處理
receive { case msg => println(msg) }
}
}
object Test {
def main (args: Array[String]): Unit = {
val actorItem = new ActorItem
//< 啟動(dòng)
actorItem.start()
//< 向 item 發(fā)送消息
actorItem ! "actor test1"
actorItem ! "actor test2"
}
}
輸出:
actor test1
這種用法在實(shí)際中并不常用丑罪,需要:
- 擴(kuò)展超類 Actor
- 重載 act 方法
- 調(diào)用擴(kuò)展類對(duì)象 start 方法
使用 scala.actors.Actor.actor 方法
第二種方式是實(shí)際中常用并且是 Scala 社區(qū)推薦的吩屹,例子如下:
object Test {
def main (args: Array[String]): Unit = {
val actorItem = actor {
receive { case msg => println(msg) }
}
//< 向 item 發(fā)送消息
actorItem ! "actor test1"
actorItem ! "actor test2"
}
}
輸出:
actor test1
這里需要特別注意的是,actor 其實(shí)是scala.actors.Actor的 actor 方法免绿,并不是 scala 語(yǔ)言內(nèi)建的擦盾。
這種使用方法更加方便,與第一種擴(kuò)展超類 Actor 有以下幾點(diǎn)不同:
- 使用 Actor.actor 方法(返回類型為Actor)而不是擴(kuò)展 Actor 并重載 act 方法
- 構(gòu)造完成即啟動(dòng)厌衙,不需要調(diào)用 start方法(當(dāng)然你調(diào)用了也不會(huì)有什么問(wèn)題)
使用 react
除了可以使用 receive 從消息隊(duì)列 mailbox 中取出消息并處理婶希,react 同樣可以蓬衡。receive 和 react 的區(qū)別與聯(lián)系將在下文中說(shuō)明。先來(lái)看看怎么用筒饰,其實(shí)只要把上面兩段代碼的 receive 替換成 react 即可:
class ActorItem extends Actor {
def act(): Unit = {
react { case msg => println(msg) }
}
}
object Test {
def main (args: Array[String]): Unit = {
val actorItem = new ActorItem
actorItem.start()
actorItem ! "actor test1"
actorItem ! "actor test2"
}
}
輸出:
actor test1
object Test {
def main (args: Array[String]): Unit = {
val actorItem = actor {
react { case msg => println(msg) }
}
actorItem ! "actor test1"
actorItem ! "actor test2"
}
}
輸出:
actor test1
持續(xù)處理消息
如果你仔細(xì)觀察壁晒,就會(huì)發(fā)現(xiàn)上面的每個(gè)示例中,都向 actor 發(fā)送了"actor test1"和"actor test2"兩條消息谬晕,但最終只打印了"actor test1"這一條消息。這是因?yàn)榘锟祝还苁?receive 還是 react不撑,都只從 mailbox 中取一條消息進(jìn)行處理,處理完之后不會(huì)再取一條處理姆坚。如果想要持續(xù)從 maibox 中取消息并處理揩页,也有兩種方式。
方式一:使用 loop萍程。適用于擴(kuò)展 Actor 和 actor 方法兩種方式
class ActorItem extends Actor {
def act(): Unit = {
loop {
react { case msg => println(msg) }
}
}
}
object Test {
def main (args: Array[String]): Unit = {
val actorItem = new ActorItem
actorItem.start()
actorItem ! "actor test1"
actorItem ! "actor test2"
}
}
輸出:
actor test1
actor test2
方式二:在 receive 處理中調(diào)用receive兔仰;在 react 處理中調(diào)用 react。僅適用于 actor 方法這種方法
class ActorItem extends Actor {
def act(): Unit = {
react {
case msg => {
println(msg)
act()
}
}
}
}
object Test {
def main (args: Array[String]): Unit = {
val actorItem = new ActorItem
actorItem.start()
actorItem ! "actor test1"
actorItem ! "actor test2"
}
}
Actor是如何工作的
每個(gè)actor對(duì)象都有一個(gè) mailbox忍法,可以簡(jiǎn)單的認(rèn)為是一個(gè)隊(duì)列饿序,用來(lái)存放發(fā)送給這個(gè)actor的消息羹蚣。
當(dāng) actor 發(fā)送消息時(shí),它并不會(huì)阻塞咽弦,而當(dāng) actor 接收消息時(shí)胁出,它也不會(huì)被打斷。發(fā)送的消息在接收 actor 的 mailbox 中等待處理全蝶,直到 actor 調(diào)用 receive 方法。
receive 具體是怎么工作的呢嫂用?來(lái)看看它的源碼:
def receive[R](f: PartialFunction[Any, R]): R = {
var done = false
while (!done) {
//< 從 mailbox 中取出一條消息
val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {
senders = replyTo :: senders
//< 與偏函數(shù)進(jìn)行匹配嘱函,匹配失敗返回 null
val matches = f.isDefinedAt(m)
senders = senders.tail
matches
})
if (null eq qel) {
//< 如果當(dāng)前mailbox里面沒(méi)有可以處理的消息,調(diào)用suspendActor往弓,該方法會(huì)調(diào)用wait
waitingFor = f.isDefinedAt
isSuspended = true
suspendActor()
} else {
//< 執(zhí)行到這里就說(shuō)明成功從 mailbox 中獲得匹配的消息
received = Some(qel.msg)
senders = qel.session :: senders
done = true
}
}
//< 成功獲得消息后函似,調(diào)用 f.apply 來(lái)執(zhí)行對(duì)應(yīng)的操作
val result = f(received.get)
received = None
senders = senders.tail
result
}
一圖勝千言,下圖為 receive 模型工作流程
與線程的關(guān)系
Actor 的線程模型可以這樣理解:在一個(gè)進(jìn)程中顿天,所有的 actor 共享一個(gè)線程池蔑担,總的線程個(gè)數(shù)可以配置,也可以根據(jù) CPU 個(gè)數(shù)決定鸟缕。
當(dāng)一個(gè) actor 啟動(dòng)后排抬,Scala 分配一個(gè)線程給它使用,如果使用 receive 模型番甩,這個(gè)線程就一直為該 Actor 所有届搁。
如果使用 react 模型,react 找到并處理消息后并不返回,它的返回類型為 Nothing么翰,Scala 執(zhí)行完 react 方法后辽旋,拋出異常檐迟,調(diào)用 act 也就是間接調(diào)用 react 的線程會(huì)捕獲這個(gè)異常码耐,忘掉這個(gè) actor,該線程就可以被其他actor 使用敦间。
所以束铭,如果能用 react 就盡量使用 react,可以節(jié)省線程带猴。
良好的 Actor 風(fēng)格
只通過(guò)消息與 actor 通信
舉個(gè)例子懈万,一個(gè) GoodActor可能會(huì)在發(fā)往 BadActor 的消息中包含一個(gè)指向自己的引用,來(lái)表明作為消息源的自己口予。如果 BadActor 調(diào)用了 GoodActor 的某個(gè)任意的方法而不是通過(guò) "!" 發(fā)送消息的話渴语,問(wèn)題就來(lái)了。被調(diào)用的方法可能讀到 GoodActor 的私有實(shí)例數(shù)據(jù)牙甫,而這些數(shù)據(jù)可能是由另一個(gè)線程寫(xiě)進(jìn)去调违。結(jié)果是,你需要確保 BadActor 線程對(duì)這些實(shí)例數(shù)據(jù)的讀取和 GoodActor 線程對(duì)這些數(shù)據(jù)的寫(xiě)入是同步在一個(gè)鎖上的且轨。一旦繞開(kāi)了 actor 之間的消息傳遞機(jī)制虚婿,就回到了共享數(shù)據(jù)和鎖模型中。
優(yōu)選不可變的消息
由于 Scala 的 actor 模型提供了在每個(gè) actor 的 act 方法中的單線程環(huán)境至朗,不需要擔(dān)心在這個(gè)方法的實(shí)現(xiàn)中使用的對(duì)象是否是線程安全的剧浸。
確保消息對(duì)象是線程安全的最佳途徑是在消息中使用不可變對(duì)象。任何只有 val 字段且這些字段只引用到不可變對(duì)象的類的實(shí)例都是不可變的嫌变。
如果你發(fā)現(xiàn)自己有一個(gè)可變的對(duì)象腾啥,想繼續(xù)使用它,同時(shí)也想用消息發(fā)送給另一個(gè) actor碑宴,此時(shí)應(yīng)該考慮制作并發(fā)送它的一個(gè)副本,比如利用 clone 方法祸挪。
讓消息自包含
向某個(gè) actor 發(fā)送消息贞间,如果你想得到這個(gè) actor 的回復(fù)增热,可以在消息中包含自身。示例如下:
class ActorItem extends Actor {
def act(): Unit = {
react {
case (name: String, actor: Actor) => {
println( name )
actor ! "Done"
}
}
}
}
object Test {
def main (args: Array[String]): Unit = {
val actorItem = new ActorItem
actorItem.start()
actorItem ! ("scala", self)
receive {
case msg => println( msg )
}
}
}
輸出:
scala
Done
使用樣本類
在上例中公黑,若把(name: String, actor: Actor)
定義成類摄咆,代碼可讀性會(huì)大大提高
case class Info(name: String, actor: Actor)
class ActorItem extends Actor {
def act(): Unit = {
react {
case Info => {
println( Info.name )
actor ! "Done"
}
}
}
}
**傳送門: **Scala 在簡(jiǎn)書(shū)目錄