Akka Stream之流中的錯(cuò)誤處理

當(dāng)流中的某個(gè)階段失敗時(shí), 通常會(huì)導(dǎo)致整個(gè)流被拆掉梯码。此時(shí)醉鳖,每個(gè)階段的下游得到關(guān)于失敗通知和上游得到關(guān)于取消通知舆驶。

在許多情況下, 您可能希望避免完全的流失敗, 這可以通過(guò)幾種不同的方法完成:

  • recover發(fā)出最終的元素, 然后在上游故障上正常完成流
  • recoverWithRetries創(chuàng)建一個(gè)新的上游并從失敗開(kāi)始處理
  • 在后退后重新啟動(dòng)流的部分
  • 對(duì)支持監(jiān)督策略的階段使用監(jiān)督策略

除了這些內(nèi)置的用于錯(cuò)誤處理的工具之外, 一個(gè)常見(jiàn)的模式是將流包裝到一個(gè)actor中, 并讓actor在失敗時(shí)重新啟動(dòng)整個(gè)流。

Recover

recover允許你注入一個(gè)最終元素,然后在上游失敗時(shí)完成流溶弟。通過(guò)一個(gè)偏函數(shù),決定哪些異常這樣恢復(fù)瞭郑。如果有個(gè)異常不匹配辜御,流將失敗。

如果您希望在失敗時(shí)優(yōu)雅地完成流, 而讓下游知道出現(xiàn)了故障, 則recover可能很有用屈张。

Source(0 to 6).map(n =>
  if (n < 5) n.toString
  else throw new RuntimeException("Boom!")).recover {
  case _: RuntimeException => "stream truncated"
}.runForeach(println)

則輸出可能是:

0
1
2
3
4
stream truncated

recoverWithRetries

recoverWithRetries 允許你在失敗的地方放入一個(gè)新的上游擒权,在失敗到指定的最大次數(shù)后恢復(fù)流。

通過(guò)一個(gè)偏函數(shù)阁谆,決定哪些異常這樣恢復(fù)碳抄。如果有個(gè)異常不匹配,流將失敗场绿。

val planB = Source(List("five", "six", "seven", "eight"))

Source(0 to 10).map(n =>
  if (n < 5) n.toString
  else throw new RuntimeException("Boom!")).recoverWithRetries(attempts = 1, {
  case _: RuntimeException => planB
}).runForeach(println)

輸出將是

0
1
2
3
4
five
six
seven
eight

正如Akka為actor提供回退監(jiān)督模式一樣, Akka stream也提供了一個(gè)RestartSource纳鼎、RestartSinkRestartFlow, 用于實(shí)施所謂指數(shù)回退監(jiān)控策略, 在某個(gè)階段失敗時(shí)再次啟動(dòng)它, 每次重新啟動(dòng)的延遲時(shí)間越來(lái)越長(zhǎng)。

當(dāng)某個(gè)階段因?yàn)橥獠抠Y源是否可用而失敗或完成時(shí)裳凸,而且需要一些時(shí)間重新啟動(dòng)贱鄙,這種模式有用。一個(gè)主要的例子是當(dāng)一個(gè)WebSocket連接因?yàn)镠TTP服務(wù)器運(yùn)行正在下降(可能因?yàn)槌?fù)荷)而失敗時(shí)姨谷。通過(guò)使用指數(shù)回退逗宁,避免進(jìn)行緊密的重新連接,這樣既可以讓HTTP服務(wù)器恢復(fù)一段時(shí)間梦湘,又避免在客戶(hù)端使用不必要的資源瞎颗。

以下代碼段顯示了如何使用akka.stream.scaladsl.RestartSource創(chuàng)建一個(gè)回退監(jiān)管,它將監(jiān)督給定的Source捌议。本例中哼拔,Source是一個(gè)服務(wù)器發(fā)送事件(SSE),由akka-http提供瓣颅。如果此處流失敗倦逐,將再次發(fā)送請(qǐng)求,以3,6,12,24和最終30秒的間隔增加(此處宫补,由于 maxBackoff 參數(shù)檬姥,它將保持上限)。

val restartSource = RestartSource.withBackoff(
  minBackoff = 3.seconds,
  maxBackoff = 30.seconds,
  randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
) { () =>
  // Create a source from a future of a source
  Source.fromFutureSource {
    // Make a single request with akka-http
    Http().singleRequest(HttpRequest(
      uri = "http://example.com/eventstream"))
      // Unmarshall it as a source of server sent events
      .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
  }
}

強(qiáng)烈建議使用 randomFactor 為回退間隔添加一點(diǎn)額外的方差, 以避免在完全相同的時(shí)間點(diǎn)重新啟動(dòng)多個(gè)流, 例如, 因?yàn)樗鼈冇捎诠蚕碣Y源 (如相同的服務(wù)器下線粉怕,并在相同間隔后重啟) 而停止健民。通過(guò)在重新啟動(dòng)間隔中增加額外的隨機(jī)性, 這些流將在時(shí)間上稍有不同的點(diǎn)開(kāi)始, 從而避免大量的通信量沖擊恢復(fù)的服務(wù)器或他們都需要聯(lián)系的其他資源。

上述 RestartSource 將永遠(yuǎn)不會(huì)終止, 除非Sink被送入取消贫贝。將它與 KillSwitch 結(jié)合使用通常會(huì)很方便, 以便在需要時(shí)可以終止它:

val killSwitch = restartSource
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(Sink.foreach(event => println(s"Got event: $event")))(Keep.left)
  .run()

doSomethingElse()

killSwitch.shutdown()

Sink和flow也可以被監(jiān)管秉犹,使用akka.stream.scaladsl.RestartSinkakka.stream.scaladsl.RestartFlow蛉谜。RestartSink 在取消時(shí)重新啟動(dòng), 而在輸入端口取消、輸出端口完成或輸出端口發(fā)送錯(cuò)誤時(shí)重新啟動(dòng) RestartFlow崇堵。

監(jiān)管策略

注意
支持監(jiān)管策略的各個(gè)階段都有明文規(guī)定, 如果一個(gè)階段的文檔中沒(méi)有說(shuō)明它遵守監(jiān)管策略, 就意味著它失敗, 而不是采用監(jiān)管型诚。

錯(cuò)誤處理策略受actor監(jiān)管策略的啟發(fā), 但語(yǔ)義已經(jīng)適應(yīng)了流處理的領(lǐng)域。最重要的區(qū)別是, 監(jiān)管不是自動(dòng)應(yīng)用到流階段, 而是每個(gè)階段必須顯式實(shí)現(xiàn)的東西筑辨。

在許多階段, 實(shí)現(xiàn)對(duì)監(jiān)管策略的支持可能甚至沒(méi)有意義, 對(duì)于連接到外部技術(shù)的階段尤其如此, 例如, 失敗的連接如果立即嘗試新連接, 可能仍然會(huì)失敗俺驶。

對(duì)于實(shí)現(xiàn)監(jiān)管的階段, 在通過(guò)使用屬性物化流時(shí), 可以選擇處理流元素的異常處理策略。

有三種方法可以處理應(yīng)用程序代碼中的異常:

  • Stop - 流以失敗完成棍辕。
  • Resume - 元素被丟棄暮现,流繼續(xù)執(zhí)行
  • Restart - 元素被丟棄,且流在重啟該階段后繼續(xù)執(zhí)行楚昭。重新啟動(dòng)階段意味著任何累積狀態(tài)都被清除栖袋。 這通常通過(guò)創(chuàng)建階段的新實(shí)例來(lái)執(zhí)行。

默認(rèn)情況下, 停止策略用于所有異常, 即在拋出異常時(shí), 流將以失敗完成抚太。

implicit val materializer = ActorMaterializer()
val source = Source(0 to 5).map(100 / _)
val result = source.runWith(Sink.fold(0)(_ + _))
// division by zero will fail the stream and the
// result here will be a Future completed with Failure(ArithmeticException)

可以在materializer的設(shè)置中定義流的默認(rèn)監(jiān)管策略塘幅。

val decider: Supervision.Decider = {
  case _: ArithmeticException => Supervision.Resume
  case _ => Supervision.Stop
}
implicit val materializer = ActorMaterializer(
  ActorMaterializerSettings(system).withSupervisionStrategy(decider))
val source = Source(0 to 5).map(100 / _)
val result = source.runWith(Sink.fold(0)(_ + _))
// the element causing division by zero will be dropped
// result here will be a Future completed with Success(228)

在這里你可以看到, 所有的 ArithmeticException 將恢復(fù)處理, 即導(dǎo)致除以零的元素被丟棄了。

注意
請(qǐng)注意, 丟棄元素可能會(huì)導(dǎo)致具有循環(huán)的圖中出現(xiàn)死鎖尿贫。

還可以為flow的所有操作定義監(jiān)管策略电媳。

implicit val materializer = ActorMaterializer()
val decider: Supervision.Decider = {
  case _: ArithmeticException => Supervision.Resume
  case _ => Supervision.Stop
}
val flow = Flow[Int]
  .filter(100 / _ < 50).map(elem => 100 / (5 - elem))
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
val source = Source(0 to 5).via(flow)

val result = source.runWith(Sink.fold(0)(_ + _))
// the elements causing division by zero will be dropped
// result here will be a Future completed with Success(150)

重新啟動(dòng)的工作方式與恢復(fù)類(lèi)似,除了故障處理階段的累加狀態(tài)(如果有的話)將被重置庆亡。

implicit val materializer = ActorMaterializer()
val decider: Supervision.Decider = {
  case _: IllegalArgumentException => Supervision.Restart
  case _ => Supervision.Stop
}
val flow = Flow[Int]
  .scan(0) { (acc, elem) =>
    if (elem < 0) throw new IllegalArgumentException("negative not allowed")
    else acc + elem
  }
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
val source = Source(List(1, 3, -1, 5, 7)).via(flow)
val result = source.limit(1000).runWith(Sink.seq)
// 負(fù)數(shù)元素導(dǎo)致scan階段重啟
// 即再次從0開(kāi)始
// 結(jié)果將是以Success(Vector(0, 1, 4, 0, 5, 12))完成的Future 

來(lái)自mapAsync錯(cuò)誤

流監(jiān)管也可以應(yīng)用于mapAsyncmapAsyncUnordered的future匾乓,即使這些錯(cuò)誤發(fā)生于future而不是在階段自身。

假設(shè)我們使用外部服務(wù)來(lái)查找電子郵件地址又谋,我們希望丟棄那些無(wú)法找到的地址拼缝。

我們開(kāi)始于推文的作者流:

val authors: Source[Author, NotUsed] =
  tweets
    .filter(_.hashtags.contains(akkaTag))
    .map(_.author)

假設(shè)我們可以使用以下方式查找其電子郵件地址:

def lookupEmail(handle: String): Future[String] =

當(dāng)電子郵件沒(méi)有找到時(shí),FutureFailure完成彰亥。

通過(guò)使用lookupEmail服務(wù)以及使用mapAsync, 可以將作者流轉(zhuǎn)換為電子郵件地址流, 并使用Supervision.resumingDecider丟棄未知電子郵件地址:

import ActorAttributes.supervisionStrategy
import Supervision.resumingDecider

val emailAddresses: Source[String, NotUsed] =
  authors.via(
    Flow[Author].mapAsync(4)(author => addressSystem.lookupEmail(author.handle))
      .withAttributes(supervisionStrategy(resumingDecider)))

如果不使用Resume而是默認(rèn)的停止策略咧七,那么流將在第一個(gè)帶有Failure完成的Future時(shí),以失敗完成流任斋。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末继阻,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子仁卷,更是在濱河造成了極大的恐慌穴翩,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,941評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件锦积,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡歉嗓,警方通過(guò)查閱死者的電腦和手機(jī)丰介,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人哮幢,你說(shuō)我怎么就攤上這事带膀。” “怎么了橙垢?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,345評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵垛叨,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我柜某,道長(zhǎng)嗽元,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,851評(píng)論 1 295
  • 正文 為了忘掉前任喂击,我火速辦了婚禮剂癌,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘翰绊。我一直安慰自己佩谷,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,868評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布监嗜。 她就那樣靜靜地躺著谐檀,像睡著了一般。 火紅的嫁衣襯著肌膚如雪裁奇。 梳的紋絲不亂的頭發(fā)上桐猬,一...
    開(kāi)封第一講書(shū)人閱讀 51,688評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音框喳,去河邊找鬼课幕。 笑死,一個(gè)胖子當(dāng)著我的面吹牛五垮,可吹牛的內(nèi)容都是我干的乍惊。 我是一名探鬼主播,決...
    沈念sama閱讀 40,414評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼放仗,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼润绎!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起诞挨,我...
    開(kāi)封第一講書(shū)人閱讀 39,319評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤莉撇,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后惶傻,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體棍郎,經(jīng)...
    沈念sama閱讀 45,775評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評(píng)論 3 336
  • 正文 我和宋清朗相戀三年银室,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了涂佃。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片励翼。...
    茶點(diǎn)故事閱讀 40,096評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖辜荠,靈堂內(nèi)的尸體忽然破棺而出汽抚,到底是詐尸還是另有隱情,我是刑警寧澤伯病,帶...
    沈念sama閱讀 35,789評(píng)論 5 346
  • 正文 年R本政府宣布造烁,位于F島的核電站,受9級(jí)特大地震影響午笛,放射性物質(zhì)發(fā)生泄漏惭蟋。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,437評(píng)論 3 331
  • 文/蒙蒙 一季研、第九天 我趴在偏房一處隱蔽的房頂上張望敞葛。 院中可真熱鬧,春花似錦与涡、人聲如沸惹谐。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,993評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)氨肌。三九已至,卻和暖如春酌畜,著一層夾襖步出監(jiān)牢的瞬間怎囚,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,107評(píng)論 1 271
  • 我被黑心中介騙來(lái)泰國(guó)打工桥胞, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留恳守,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,308評(píng)論 3 372
  • 正文 我出身青樓贩虾,卻偏偏與公主長(zhǎng)得像催烘,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子缎罢,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,037評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 1 基本流處理 讓我們首先看看使用akka-stream處理流的真正含義伊群。圖1展示了在某個(gè)處理節(jié)點(diǎn)上,元素是一個(gè)個(gè)...
    樂(lè)言筆記閱讀 2,656評(píng)論 1 1
  • (1)viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], M...
    樂(lè)言筆記閱讀 2,400評(píng)論 0 0
  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理策精,服務(wù)發(fā)現(xiàn)舰始,斷路器,智...
    卡卡羅2017閱讀 134,659評(píng)論 18 139
  • Android 自定義View的各種姿勢(shì)1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 172,167評(píng)論 25 707
  • 《道德經(jīng)》有云咽袜,上善若水丸卷,水善利萬(wàn)物而不爭(zhēng),處眾人之所惡询刹,故幾于道也及老。 道為何物抽莱,老子又說(shuō)道可道范抓,非常道骄恶,名可名,...
    lin秀閱讀 1,026評(píng)論 0 0