AKKA中的Event Stream

在《企業(yè)應(yīng)用集成模式》一書中疟羹,定義了許多與消息處理有關(guān)的模式,其中運用最為廣泛的模式為Publisher-Subscriber模式,尤其是在異步處理場景下。

基于Publisher-Subscriber模式冶忱,還可以根據(jù)不同的場景衍生出特殊的模式,例如針對一個Publisher和多個Subscriber境析,演化為Broadcast模式和Message Router模式囚枪。前者會將消息同時發(fā)送給所有的Subscriber,實現(xiàn)分布式的并行處理劳淆。例如針對訂單處理的場景链沼,當(dāng)顧客下訂單后,既需要生成訂單沛鸵,又需要通知庫存準(zhǔn)備發(fā)貨括勺,還需要通知賣方和買方。這些任務(wù)雖然存在事務(wù)的一致性谒臼,但基于BASE原則朝刊,可以通過補(bǔ)償機(jī)制實現(xiàn)事務(wù)的最終一致性。于是蜈缤,設(shè)計時可以將這些任務(wù)交給不同的Subscriber,當(dāng)接收到消息后冯挎,同時對訂單進(jìn)行處理底哥。至于Message Router,則需要引入的Router對傳入的消息作出智能判斷房官,從而將消息傳遞給真正感興趣的Subscriber趾徽。這就好像發(fā)布者同時發(fā)布了不同的刊物,訂閱者只訂閱自己喜歡的刊物翰守。

而消息總線(message bus)則通過引入總線來徹底解除Publisher與Subscriber之間的耦合孵奶,類似設(shè)計模式中的Mediator模式±澹總線就是Mediator了袁,用以協(xié)調(diào)Publisher與Subscriber之間的關(guān)系朗恳。或者载绿,我們也可以認(rèn)為是兩個Publisher-Subscriber的組合粥诫。對于Publisher而言,總線就是Subscriber崭庸;對于Subscriber而言怀浆,總線則成了Publisher。

AKKA提供的事件總線(Event Bus)可以看做是一種運用于特殊場景的消息總線怕享,此時事件即為消息执赡。它可以看做是Message Router模式的實現(xiàn),提供了向多個Actor發(fā)送消息的基礎(chǔ)設(shè)施函筋,內(nèi)含的Classifier作為分類器沙合,用于分發(fā)消息時選擇Subscriber,扮演了Message Router的角色驻呐。

在AKKA中灌诅,Event Bus被定義為trait,定義了基本的訂閱含末、取消訂閱猜拾、發(fā)布等對應(yīng)的方法,代碼如下所示:

trait EventBus {
  type Event
  type Classifier
  type Subscriber

  def subscribe(subscriber: Subscriber, to: Classifier): Boolean

  def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean

  def unsubscribe(subscriber: Subscriber): Unit

  def publish(event: Event): Unit
}

根據(jù)AKKA官方文檔的描述佣盒,Event為所有發(fā)布到該總線上的事件類型挎袜,Classifier是選擇訂閱者的分類器,Subscriber就是注冊到該總線上的訂閱者肥惭。它們均被定義為抽象的type盯仪,使得EventBus擁有最大的開放性。我們視情況而定去具體實現(xiàn)特定類型蜜葱。例如針對Actor的EventBus全景,訂閱者被指定為ActorRef類型:

trait ActorEventBus extends EventBus {
  type Subscriber = ActorRef
  protected def compareSubscribers(a: ActorRef, b: ActorRef) = a compareTo b
}

AKKA運用Cake Pattern,以trait類型定義了一些特化的分類器牵囤。例如:

trait ActorClassifier { this: EventBus ?
  type Classifier = ActorRef
}

trait PredicateClassifier { this: EventBus ?
  type Classifier = Event ? Boolean
}

trait LookupClassification { this: EventBus ?

  protected final val subscribers = new Index[Classifier, Subscriber](mapSize(), new Comparator[Subscriber] {
    def compare(a: Subscriber, b: Subscriber): Int = compareSubscribers(a, b)
  })

  protected def mapSize(): Int

  protected def compareSubscribers(a: Subscriber, b: Subscriber): Int

  protected def classify(event: Event): Classifier

  protected def publish(event: Event, subscriber: Subscriber): Unit

  def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscribers.put(to, subscriber)

  def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscribers.remove(from, subscriber)

  def unsubscribe(subscriber: Subscriber): Unit = subscribers.removeValue(subscriber)

  def publish(event: Event): Unit = {
    val i = subscribers.valueIterator(classify(event))
    while (i.hasNext) publish(event, i.next())
  }
}

除此之外爸黄,還有諸如SubchannelClassification、ScanningClassification揭鳞、ActorClassification等分類器炕贵,都采用了同樣的方式定義為trait。這樣就便于繼承EventBus的類進(jìn)行trait的混入野崇,例如EventStream的定義:

class EventStream(private val debug: Boolean = false) extends LoggingBus with SubchannelClassification {
  type Event = AnyRef
  type Classifier = Class[_]
}

由于LoggingBus繼承自ActorEventBus称开,故而EventStream中的Subscriber類型被定義為ActorRef。然后在EventStream中又重寫了Event和Classfier類型乓梨,分別為AnyRef和Class鳖轰,這說明任何Java引用對象都可以作為事件清酥,而分類的依據(jù)則為Event的類型。

type Subscriber = ActorRef
type Event = AnyRef
type Classifier = Class[_]

EventStream繼承了SubchannelClassification脆霎。在其中維持了訂閱者列表总处,雖然該訂閱列表類型為SubclassifiedIndex,不過我們可以將其簡單地視為一個Map(實際情況更復(fù)雜睛蛛,因為它實際上維護(hù)了分類的層級):

trait SubchannelClassification {
  this: EventBus ?

  private lazy val subscriptions = new SubclassifiedIndex[Classifier, Subscriber]()
}

由于此時的Classifier在EventStream中被定義為Class[_]鹦马,Subscriber為ActorRef,因此subscripions相當(dāng)于被定義為:

private lazy val subscriptions = new SubclassifiedIndex[Class[_], ActorRef]

當(dāng)Actor出現(xiàn)故障忆肾,從而使得消息被轉(zhuǎn)發(fā)給dead letter時荸频,我們可能需要偵聽這些死信,并對它們進(jìn)行處理客冈。則AKKA的做法就是通過EventStream來進(jìn)行訂閱:

class DeadLetterListener extends Actor {
  def receive = {
    case DeadLetter(msg, from, to) =>
      println(s"${System.currentTimeMillis()}: from $from to $to with message $msg.")
  }
}

val listener = system.actorOf(Props[DeadLetterListener], "listener")
system.eventStream.subscribe(listener, classOf[DeadLetter])

結(jié)合SubchannelClassification旭从,我們來分析其執(zhí)行過程。首先场仲,它通過subscribe方法將DeadLetterListener的actor引用對象以及事件類型DeadLetter注冊到SubchannelClassification中的subscriptions和悦。當(dāng)ActorSystem的任意actor發(fā)出DeadLetter時,就會觸發(fā)EventStream的publish()方法(實際上是調(diào)用了SubchannelClassification的publish()方法渠缕,進(jìn)而調(diào)用EventStream的publish()方法):

class EventStream(private val debug: Boolean = false) extends LoggingBus with SubchannelClassification {
  protected def publish(event: AnyRef, subscriber: ActorRef) = {
    if (subscriber.isTerminated) unsubscribe(subscriber)
    else subscriber ! event
  }
}

此時就會通過subscriber(此時為前面定義的DeadLetterListener)發(fā)送event(此時為DeadLetter對象)鸽素,從而進(jìn)入到DeadLetterListener的receive方法中,打印出我想要的消息亦鳞。

通過EventStream還可以處理日志消息馍忽。AKKA自身也提供了默認(rèn)的處理器,可以配置在application.conf文件中:

akka {
  event-handlers = ["akka.event.Logging$DefaultLogger"]
}

這個默認(rèn)的日志處理器會訂閱高于配置級別的日志事件類燕差,例如將日志級別配置為Debug:

system.eventStream.setLogLevel(Logging.DebugLevel)

通過這樣的配置遭笋,所有低于Debug級別的日志事件發(fā)生時,都不會被EventStream分發(fā)徒探。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末瓦呼,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子测暗,更是在濱河造成了極大的恐慌吵血,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,826評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件偷溺,死亡現(xiàn)場離奇詭異,居然都是意外死亡钱贯,警方通過查閱死者的電腦和手機(jī)挫掏,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,968評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來秩命,“玉大人尉共,你說我怎么就攤上這事褒傅。” “怎么了袄友?”我有些...
    開封第一講書人閱讀 164,234評論 0 354
  • 文/不壞的土叔 我叫張陵殿托,是天一觀的道長。 經(jīng)常有香客問我剧蚣,道長支竹,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,562評論 1 293
  • 正文 為了忘掉前任鸠按,我火速辦了婚禮礼搁,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘目尖。我一直安慰自己馒吴,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,611評論 6 392
  • 文/花漫 我一把揭開白布瑟曲。 她就那樣靜靜地躺著饮戳,像睡著了一般。 火紅的嫁衣襯著肌膚如雪洞拨。 梳的紋絲不亂的頭發(fā)上扯罐,一...
    開封第一講書人閱讀 51,482評論 1 302
  • 那天,我揣著相機(jī)與錄音扣甲,去河邊找鬼篮赢。 笑死,一個胖子當(dāng)著我的面吹牛琉挖,可吹牛的內(nèi)容都是我干的启泣。 我是一名探鬼主播,決...
    沈念sama閱讀 40,271評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼示辈,長吁一口氣:“原來是場噩夢啊……” “哼寥茫!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起矾麻,我...
    開封第一講書人閱讀 39,166評論 0 276
  • 序言:老撾萬榮一對情侶失蹤纱耻,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后险耀,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體弄喘,經(jīng)...
    沈念sama閱讀 45,608評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,814評論 3 336
  • 正文 我和宋清朗相戀三年甩牺,在試婚紗的時候發(fā)現(xiàn)自己被綠了蘑志。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,926評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖急但,靈堂內(nèi)的尸體忽然破棺而出澎媒,到底是詐尸還是另有隱情,我是刑警寧澤波桩,帶...
    沈念sama閱讀 35,644評論 5 346
  • 正文 年R本政府宣布戒努,位于F島的核電站,受9級特大地震影響镐躲,放射性物質(zhì)發(fā)生泄漏储玫。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,249評論 3 329
  • 文/蒙蒙 一匀油、第九天 我趴在偏房一處隱蔽的房頂上張望缘缚。 院中可真熱鬧,春花似錦敌蚜、人聲如沸桥滨。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,866評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽齐媒。三九已至,卻和暖如春纷跛,著一層夾襖步出監(jiān)牢的瞬間喻括,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,991評論 1 269
  • 我被黑心中介騙來泰國打工贫奠, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留唬血,地道東北人。 一個月前我還...
    沈念sama閱讀 48,063評論 3 370
  • 正文 我出身青樓唤崭,卻偏偏與公主長得像拷恨,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子谢肾,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,871評論 2 354

推薦閱讀更多精彩內(nèi)容

  • 1 基本流處理 讓我們首先看看使用akka-stream處理流的真正含義腕侄。圖1展示了在某個處理節(jié)點上,元素是一個個...
    樂言筆記閱讀 2,656評論 1 1
  • 與Actor集成 為了將流的元素作為消息傳遞給一個普通的actor芦疏,你可以在mapAsync里使用ask或者使用S...
    樂言筆記閱讀 3,864評論 0 1
  • Akka幫助您構(gòu)建可靠的應(yīng)用程序在一臺機(jī)器上使用多個處理器核心(“擴(kuò)大”)或分布在計算機(jī)網(wǎng)絡(luò)(“擴(kuò)張”)冕杠。關(guān)鍵的抽...
    兒哥欠三百首閱讀 2,717評論 0 0
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)酸茴,斷路器分预,智...
    卡卡羅2017閱讀 134,656評論 18 139
  • 項目到了一定階段會出現(xiàn)一種甜蜜的負(fù)擔(dān):業(yè)務(wù)的不斷發(fā)展與人員的流動性越來越大,代碼維護(hù)與測試回歸流程越來越繁瑣薪捍。這個...
    fdacc6a1e764閱讀 3,181評論 0 6