在《企業(yè)應(yīng)用集成模式》一書中疟羹,定義了許多與消息處理有關(guān)的模式,其中運用最為廣泛的模式為Publisher-Subscriber模式,尤其是在異步處理場景下。
基于Publisher-Subscriber模式冶忱,還可以根據(jù)不同的場景衍生出特殊的模式,例如針對一個Publisher和多個Subscriber境析,演化為Broadcast模式和Message Router模式囚枪。前者會將消息同時發(fā)送給所有的Subscriber,實現(xiàn)分布式的并行處理劳淆。例如針對訂單處理的場景链沼,當(dāng)顧客下訂單后,既需要生成訂單沛鸵,又需要通知庫存準(zhǔn)備發(fā)貨括勺,還需要通知賣方和買方。這些任務(wù)雖然存在事務(wù)的一致性谒臼,但基于BASE原則朝刊,可以通過補(bǔ)償機(jī)制實現(xiàn)事務(wù)的最終一致性。于是蜈缤,設(shè)計時可以將這些任務(wù)交給不同的Subscriber,當(dāng)接收到消息后冯挎,同時對訂單進(jìn)行處理底哥。至于Message Router,則需要引入的Router對傳入的消息作出智能判斷房官,從而將消息傳遞給真正感興趣的Subscriber趾徽。這就好像發(fā)布者同時發(fā)布了不同的刊物,訂閱者只訂閱自己喜歡的刊物翰守。
而消息總線(message bus)則通過引入總線來徹底解除Publisher與Subscriber之間的耦合孵奶,類似設(shè)計模式中的Mediator模式±澹總線就是Mediator了袁,用以協(xié)調(diào)Publisher與Subscriber之間的關(guān)系朗恳。或者载绿,我們也可以認(rèn)為是兩個Publisher-Subscriber的組合粥诫。對于Publisher而言,總線就是Subscriber崭庸;對于Subscriber而言怀浆,總線則成了Publisher。
AKKA提供的事件總線(Event Bus)可以看做是一種運用于特殊場景的消息總線怕享,此時事件即為消息执赡。它可以看做是Message Router模式的實現(xiàn),提供了向多個Actor發(fā)送消息的基礎(chǔ)設(shè)施函筋,內(nèi)含的Classifier作為分類器沙合,用于分發(fā)消息時選擇Subscriber,扮演了Message Router的角色驻呐。
在AKKA中灌诅,Event Bus被定義為trait,定義了基本的訂閱含末、取消訂閱猜拾、發(fā)布等對應(yīng)的方法,代碼如下所示:
trait EventBus {
type Event
type Classifier
type Subscriber
def subscribe(subscriber: Subscriber, to: Classifier): Boolean
def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean
def unsubscribe(subscriber: Subscriber): Unit
def publish(event: Event): Unit
}
根據(jù)AKKA官方文檔的描述佣盒,Event為所有發(fā)布到該總線上的事件類型挎袜,Classifier是選擇訂閱者的分類器,Subscriber就是注冊到該總線上的訂閱者肥惭。它們均被定義為抽象的type盯仪,使得EventBus擁有最大的開放性。我們視情況而定去具體實現(xiàn)特定類型蜜葱。例如針對Actor的EventBus全景,訂閱者被指定為ActorRef類型:
trait ActorEventBus extends EventBus {
type Subscriber = ActorRef
protected def compareSubscribers(a: ActorRef, b: ActorRef) = a compareTo b
}
AKKA運用Cake Pattern,以trait類型定義了一些特化的分類器牵囤。例如:
trait ActorClassifier { this: EventBus ?
type Classifier = ActorRef
}
trait PredicateClassifier { this: EventBus ?
type Classifier = Event ? Boolean
}
trait LookupClassification { this: EventBus ?
protected final val subscribers = new Index[Classifier, Subscriber](mapSize(), new Comparator[Subscriber] {
def compare(a: Subscriber, b: Subscriber): Int = compareSubscribers(a, b)
})
protected def mapSize(): Int
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int
protected def classify(event: Event): Classifier
protected def publish(event: Event, subscriber: Subscriber): Unit
def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscribers.put(to, subscriber)
def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscribers.remove(from, subscriber)
def unsubscribe(subscriber: Subscriber): Unit = subscribers.removeValue(subscriber)
def publish(event: Event): Unit = {
val i = subscribers.valueIterator(classify(event))
while (i.hasNext) publish(event, i.next())
}
}
除此之外爸黄,還有諸如SubchannelClassification、ScanningClassification揭鳞、ActorClassification等分類器炕贵,都采用了同樣的方式定義為trait。這樣就便于繼承EventBus的類進(jìn)行trait的混入野崇,例如EventStream的定義:
class EventStream(private val debug: Boolean = false) extends LoggingBus with SubchannelClassification {
type Event = AnyRef
type Classifier = Class[_]
}
由于LoggingBus繼承自ActorEventBus称开,故而EventStream中的Subscriber類型被定義為ActorRef。然后在EventStream中又重寫了Event和Classfier類型乓梨,分別為AnyRef和Class鳖轰,這說明任何Java引用對象都可以作為事件清酥,而分類的依據(jù)則為Event的類型。
type Subscriber = ActorRef
type Event = AnyRef
type Classifier = Class[_]
EventStream繼承了SubchannelClassification脆霎。在其中維持了訂閱者列表总处,雖然該訂閱列表類型為SubclassifiedIndex,不過我們可以將其簡單地視為一個Map(實際情況更復(fù)雜睛蛛,因為它實際上維護(hù)了分類的層級):
trait SubchannelClassification {
this: EventBus ?
private lazy val subscriptions = new SubclassifiedIndex[Classifier, Subscriber]()
}
由于此時的Classifier在EventStream中被定義為Class[_]鹦马,Subscriber為ActorRef,因此subscripions相當(dāng)于被定義為:
private lazy val subscriptions = new SubclassifiedIndex[Class[_], ActorRef]
當(dāng)Actor出現(xiàn)故障忆肾,從而使得消息被轉(zhuǎn)發(fā)給dead letter時荸频,我們可能需要偵聽這些死信,并對它們進(jìn)行處理客冈。則AKKA的做法就是通過EventStream來進(jìn)行訂閱:
class DeadLetterListener extends Actor {
def receive = {
case DeadLetter(msg, from, to) =>
println(s"${System.currentTimeMillis()}: from $from to $to with message $msg.")
}
}
val listener = system.actorOf(Props[DeadLetterListener], "listener")
system.eventStream.subscribe(listener, classOf[DeadLetter])
結(jié)合SubchannelClassification旭从,我們來分析其執(zhí)行過程。首先场仲,它通過subscribe方法將DeadLetterListener的actor引用對象以及事件類型DeadLetter注冊到SubchannelClassification中的subscriptions和悦。當(dāng)ActorSystem的任意actor發(fā)出DeadLetter時,就會觸發(fā)EventStream的publish()方法(實際上是調(diào)用了SubchannelClassification的publish()方法渠缕,進(jìn)而調(diào)用EventStream的publish()方法):
class EventStream(private val debug: Boolean = false) extends LoggingBus with SubchannelClassification {
protected def publish(event: AnyRef, subscriber: ActorRef) = {
if (subscriber.isTerminated) unsubscribe(subscriber)
else subscriber ! event
}
}
此時就會通過subscriber(此時為前面定義的DeadLetterListener)發(fā)送event(此時為DeadLetter對象)鸽素,從而進(jìn)入到DeadLetterListener的receive方法中,打印出我想要的消息亦鳞。
通過EventStream還可以處理日志消息馍忽。AKKA自身也提供了默認(rèn)的處理器,可以配置在application.conf文件中:
akka {
event-handlers = ["akka.event.Logging$DefaultLogger"]
}
這個默認(rèn)的日志處理器會訂閱高于配置級別的日志事件類燕差,例如將日志級別配置為Debug:
system.eventStream.setLogLevel(Logging.DebugLevel)
通過這樣的配置遭笋,所有低于Debug級別的日志事件發(fā)生時,都不會被EventStream分發(fā)徒探。