squbs-20. 流的生命周期

原文地址:Streams Lifecycle
Akka Streams/Reactive流需要和服務(wù)的Runtime Lifecycle 集成每强。為了這個(gè),一個(gè)自動(dòng)化的或半自動(dòng)話的集成通過 PerpetualStream實(shí)現(xiàn)百揭。為了直接對流源的細(xì)粒度控制, LifecycleManaged提供一個(gè)包裝阁簸,可以控制任何源組件的可能發(fā)生的停止或關(guān)閉畔柔,以便流可以優(yōu)雅的啟動(dòng)/關(guān)閉。

永久流(PerpetualStream)

PerpetualStream是一個(gè)特性(triat)捍掺,允許聲明一個(gè)可以啟動(dòng)的流撼短,當(dāng)服務(wù)優(yōu)雅的啟動(dòng)和關(guān)閉,不會(huì)在服務(wù)關(guān)閉的時(shí)候丟失消息乡小。

使用 PerpetualStream的流將符合以下要求阔加,將允許PerpetualStream 中的鉤子通過最小自定義重寫數(shù)無縫的工作:

  1. killSwitch.flow作為在source之后的流的第一個(gè)階段。killSwitch 是一個(gè)標(biāo)準(zhǔn)的Akka SharedKillSwitch满钟,通過PerpetualStream特性(trait)提供胜榔。
  2. stream實(shí)現(xiàn) FutureProduct通過它們最后的元素。 ProductTuple, List和其他的超類湃番。Sink物化Future是很自然的夭织。如果更多的物化值需要,它通常來自某種形式的 Product吠撮。Sink成為流最后的元素尊惰,也通常物化Product的最后一個(gè)元素。
  3. Future呈現(xiàn)流的完結(jié)(物化值或最后的物化值)泥兰。換句話說弄屡,流結(jié)束時(shí)future完成。

如果滿足以上所有要求鞋诗,沒有其他自定義重寫用于PerpetualStream函數(shù)膀捷。下面的代碼符合PerpetualStream

class WellBehavedStream extends PerpetualStream[Future[Done]] {

  def generator = Iterator.iterate(0) { p => 
    if (p == Int.MaxValue) 0 else p + 1 
  }

  val source = Source.fromIterator(generator _)

  val ignoreSink = Sink.ignore
  
  override def streamGraph = RunnableGraph.fromGraph(GraphDSL.create(ignoreSink) {
    implicit builder =>
      sink =>
        import GraphDSL.Implicits._
        source ~> killSwitch.flow[Int] ~> sink
        ClosedShape
  })
}

這就是。這個(gè)流行為良好削彬,因?yàn)樗锘?sink物化值全庸,即Future[Done]秀仲。

關(guān)閉重寫

有時(shí)不可能定義良好的流。舉個(gè)例子壶笼,Sink可能不會(huì)物化 Future或你需要在關(guān)閉時(shí)做更多的清理神僵。因?yàn)檫@個(gè)原因,可以通過重寫shutdown如下:

  override def shutdown(): Future[Done] = {
    // Do all your cleanup
    // For safety, call super
    super.shutdown()
    // The Future from super.shutdown may not mean anything.
    // Feel free to create your own future that identifies the
    // stream being done. Return your Future instead.
  }

shutdown需要使用如下:

  1. 初始化流的關(guān)閉
  2. 做其他清理
  3. 當(dāng)流結(jié)束處理覆劈,返回future

注意:建議任何情況下調(diào)用 super.shutdown保礼。調(diào)用是無害的或有其他副作用。

備用關(guān)閉機(jī)制

相比于使用killSwitch墩崩, source 可以提供一個(gè)更好方式來正確的關(guān)閉氓英。在這種情況下,僅使用source的關(guān)閉機(jī)制和重寫 shutdown來發(fā)起source的關(guān)閉鹦筹。killSwitch 依然未使用铝阐。

Kill Switch 重寫

如果killSwitch需要跨多流共享,你可以重寫 killSwitch來反射共享實(shí)例

  override lazy val killSwitch = mySharedKillSwitch

接收消息并將其轉(zhuǎn)發(fā)到流

一些流從actor消息中獲取輸入铐拐。雖然可能一些流配置可以物化source中的ActorRef徘键,然而很難調(diào)用這個(gè)actor。因?yàn)?code>PerpetualStream自身是個(gè)actor遍蟋,他可以擁有一個(gè)公開的地址/路徑并且轉(zhuǎn)發(fā)消息至流source吹害。這樣做,我們需要重寫receive 如下:

  override def receive = {
    case msg: MyStreamMessage =>
      val (sourceActorRef, _) = matValue
      sourceActorRef forward msg
  }

處理流錯(cuò)誤

PerpetualStream默認(rèn)從錯(cuò)誤中恢復(fù)不會(huì)被流stage捕獲虚青。這個(gè)消息引起忽略異常它呀。如果需要一個(gè)不同的行為的話,重寫 decider棒厘。

  override def decider: Supervision.Decider = { t => 
    log.error("Uncaught error {} from stream", t)
    t.printStackTrace()
    Restart
  }

Restart將重啟有錯(cuò)誤的stage纵穿,而不會(huì)重啟stream。查看Supervision Strategies獲得可能的策略奢人。

結(jié)合一下

下面的例子盡可能重寫上面討論的內(nèi)容:

class MsgReceivingStream extends PerpetualStream[(ActorRef, Future[Done])] {

  val actorSource = Source.actorPublisher[MyStreamMsg](Props[MyPublisher])
  val ignoreSink = Sink.ignore[MyStreamMsg]
  
  override def streamGraph = RunnableGraph.fromGraph(GraphDSL.create(actorSource, ignoreSink)((_, _)) {
    implicit builder =>
      (source, sink) =>
        import GraphDSL.Implicits._
        source ~> sink
        ClosedShape
  })
  
  // Just forward the message to the stream source
  override def receive = {
    case msg: MyStreamMsg =>
      val (sourceActorRef, _) = matValue
      sourceActorRef forward msg
  }
  
  override def shutdown() = {
    val (sourceActorRef, _) = matValue
    sourceActorRef ! cancelStream
    // Sink materialization conforms
    // so super.shutdown() will give the right future
    super.shutdown()
  }
}

制作一個(gè)Lifecycle-Sensitive source

如果你期望擁有一個(gè)source谓媒,完全鏈接squbs的生命周期的動(dòng)作,你可以將source包裹 LifecycleManaged:

Scala

val inSource = <your-original-source>
val aggregatedSource = LifecycleManaged().source(inSource)

Java

final Source inSource = <your-original-source>
final Source aggregatedSource = new LifecycleManaged(system).source(inSource);

這個(gè)結(jié)果source將集成source實(shí)例化成一個(gè)(T, ActorRef) 何乎, TinSource 的實(shí)例化類型句惯, ActorRef 是 trigger actor的實(shí)例化類型(從Unicomplex接收事件,squbs的容器)

這個(gè)集成source直到生命周期狀態(tài)變成Active才開始從源source中發(fā)出支救,并且在生命周期狀態(tài)成為 Stopping之后停止發(fā)出元素和關(guān)閉流抢野。

個(gè)性化集成Triggered Source

如果你想要你的flow啟動(dòng)/停用個(gè)性化的事件,你可以整合一個(gè)個(gè)性化的trigger source各墨,元素true將會(huì)啟用指孤,元素false將會(huì)停用。

注意 Trigger有一個(gè)參數(shù)eagerComplete默認(rèn)為false在scala中欲主,而在JAVA中需要傳遞邓厕。如果eagerComplete設(shè)置為false,trigger source 的結(jié)束或終止將脫離這個(gè)觸發(fā)扁瓢。如果設(shè)置為true详恼,這個(gè)終止會(huì)完成這個(gè)流。

Scala

import org.squbs.stream.TriggerEvent._

val inSource = <your-original-source>
val trigger = <your-custom-trigger-source>.collect {
  case 0 => DISABLE
  case 1 => ENABLE
}

val aggregatedSource = new Trigger().source(inSource, trigger)

Java

import static org.squbs.stream.TriggerEvent.DISABLE;
import static org.squbs.stream.TriggerEvent.ENABLE;

final Source<?, ?> inSource = <your-original-source>;
final Source<?, ?> trigger = <your-custom-trigger-source>.collect(new PFBuilder<Integer, TriggerEvent>()
    .match(Integer.class, p -> p == 1, p -> ENABLE)
    .match(Integer.class, p -> p == 0, p -> DISABLE)
    .build());

final Source aggregatedSource = new Trigger(false).source(inSource, trigger);

個(gè)性化的生命周期事件觸發(fā)

如果你想要在ActiveStopping之外響應(yīng)更多生命周期引几,舉個(gè)例子昧互,你想要Failed來同時(shí)關(guān)閉flow,你可以修改生命周期事件映射伟桅。

import org.squbs.stream.TriggerEvent._

val inSource = <your-original-source>
val trigger = Source.actorPublisher[LifecycleState](Props.create(classOf[UnicomplexActorPublisher]))
  .collect {
    case Active => ENABLE
    case Stopping | Failed => DISABLE
  }

val aggregatedSource = new Trigger().source(inSource, trigger)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末敞掘,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子楣铁,更是在濱河造成了極大的恐慌玖雁,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,718評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件盖腕,死亡現(xiàn)場離奇詭異赫冬,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)溃列,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,683評論 3 385
  • 文/潘曉璐 我一進(jìn)店門劲厌,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人听隐,你說我怎么就攤上這事补鼻。” “怎么了雅任?”我有些...
    開封第一講書人閱讀 158,207評論 0 348
  • 文/不壞的土叔 我叫張陵风范,是天一觀的道長。 經(jīng)常有香客問我椿访,道長乌企,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,755評論 1 284
  • 正文 為了忘掉前任成玫,我火速辦了婚禮加酵,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘哭当。我一直安慰自己猪腕,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,862評論 6 386
  • 文/花漫 我一把揭開白布钦勘。 她就那樣靜靜地躺著陋葡,像睡著了一般。 火紅的嫁衣襯著肌膚如雪彻采。 梳的紋絲不亂的頭發(fā)上腐缤,一...
    開封第一講書人閱讀 50,050評論 1 291
  • 那天捌归,我揣著相機(jī)與錄音,去河邊找鬼岭粤。 笑死惜索,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的剃浇。 我是一名探鬼主播巾兆,決...
    沈念sama閱讀 39,136評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼虎囚!你這毒婦竟也來了角塑?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,882評論 0 268
  • 序言:老撾萬榮一對情侶失蹤淘讥,失蹤者是張志新(化名)和其女友劉穎圃伶,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體适揉,經(jīng)...
    沈念sama閱讀 44,330評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡留攒,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,651評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了嫉嘀。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片炼邀。...
    茶點(diǎn)故事閱讀 38,789評論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖剪侮,靈堂內(nèi)的尸體忽然破棺而出拭宁,到底是詐尸還是另有隱情,我是刑警寧澤瓣俯,帶...
    沈念sama閱讀 34,477評論 4 333
  • 正文 年R本政府宣布杰标,位于F島的核電站,受9級特大地震影響彩匕,放射性物質(zhì)發(fā)生泄漏腔剂。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,135評論 3 317
  • 文/蒙蒙 一驼仪、第九天 我趴在偏房一處隱蔽的房頂上張望掸犬。 院中可真熱鬧,春花似錦绪爸、人聲如沸湾碎。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,864評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽介褥。三九已至,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間柔滔,已是汗流浹背溢陪。 一陣腳步聲響...
    開封第一講書人閱讀 32,099評論 1 267
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留睛廊,地道東北人嬉愧。 一個(gè)月前我還...
    沈念sama閱讀 46,598評論 2 362
  • 正文 我出身青樓,卻偏偏與公主長得像喉前,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子王财,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,697評論 2 351

推薦閱讀更多精彩內(nèi)容