當(dāng)流中的某個(gè)階段失敗時(shí), 通常會(huì)導(dǎo)致整個(gè)流被拆掉梯码。此時(shí)醉鳖,每個(gè)階段的下游得到關(guān)于失敗通知和上游得到關(guān)于取消通知舆驶。
在許多情況下, 您可能希望避免完全的流失敗, 這可以通過(guò)幾種不同的方法完成:
-
recover
發(fā)出最終的元素, 然后在上游故障上正常完成流 -
recoverWithRetries
創(chuàng)建一個(gè)新的上游并從失敗開(kāi)始處理 - 在后退后重新啟動(dòng)流的部分
- 對(duì)支持監(jiān)督策略的階段使用監(jiān)督策略
除了這些內(nèi)置的用于錯(cuò)誤處理的工具之外, 一個(gè)常見(jiàn)的模式是將流包裝到一個(gè)actor中, 并讓actor在失敗時(shí)重新啟動(dòng)整個(gè)流。
Recover
recover
允許你注入一個(gè)最終元素,然后在上游失敗時(shí)完成流溶弟。通過(guò)一個(gè)偏函數(shù),決定哪些異常這樣恢復(fù)瞭郑。如果有個(gè)異常不匹配辜御,流將失敗。
如果您希望在失敗時(shí)優(yōu)雅地完成流, 而讓下游知道出現(xiàn)了故障, 則recover
可能很有用屈张。
Source(0 to 6).map(n =>
if (n < 5) n.toString
else throw new RuntimeException("Boom!")).recover {
case _: RuntimeException => "stream truncated"
}.runForeach(println)
則輸出可能是:
0
1
2
3
4
stream truncated
recoverWithRetries
recoverWithRetries
允許你在失敗的地方放入一個(gè)新的上游擒权,在失敗到指定的最大次數(shù)后恢復(fù)流。
通過(guò)一個(gè)偏函數(shù)阁谆,決定哪些異常這樣恢復(fù)碳抄。如果有個(gè)異常不匹配,流將失敗场绿。
val planB = Source(List("five", "six", "seven", "eight"))
Source(0 to 10).map(n =>
if (n < 5) n.toString
else throw new RuntimeException("Boom!")).recoverWithRetries(attempts = 1, {
case _: RuntimeException => planB
}).runForeach(println)
輸出將是
0
1
2
3
4
five
six
seven
eight
正如Akka為actor提供回退監(jiān)督模式一樣, Akka stream也提供了一個(gè)RestartSource
纳鼎、RestartSink
和 RestartFlow
, 用于實(shí)施所謂指數(shù)回退監(jiān)控策略, 在某個(gè)階段失敗時(shí)再次啟動(dòng)它, 每次重新啟動(dòng)的延遲時(shí)間越來(lái)越長(zhǎng)。
當(dāng)某個(gè)階段因?yàn)橥獠抠Y源是否可用而失敗或完成時(shí)裳凸,而且需要一些時(shí)間重新啟動(dòng)贱鄙,這種模式有用。一個(gè)主要的例子是當(dāng)一個(gè)WebSocket連接因?yàn)镠TTP服務(wù)器運(yùn)行正在下降(可能因?yàn)槌?fù)荷)而失敗時(shí)姨谷。通過(guò)使用指數(shù)回退逗宁,避免進(jìn)行緊密的重新連接,這樣既可以讓HTTP服務(wù)器恢復(fù)一段時(shí)間梦湘,又避免在客戶(hù)端使用不必要的資源瞎颗。
以下代碼段顯示了如何使用akka.stream.scaladsl.RestartSource
創(chuàng)建一個(gè)回退監(jiān)管,它將監(jiān)督給定的Source捌议。本例中哼拔,Source是一個(gè)服務(wù)器發(fā)送事件(SSE),由akka-http提供瓣颅。如果此處流失敗倦逐,將再次發(fā)送請(qǐng)求,以3,6,12,24和最終30秒的間隔增加(此處宫补,由于 maxBackoff 參數(shù)檬姥,它將保持上限)。
val restartSource = RestartSource.withBackoff(
minBackoff = 3.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
) { () =>
// Create a source from a future of a source
Source.fromFutureSource {
// Make a single request with akka-http
Http().singleRequest(HttpRequest(
uri = "http://example.com/eventstream"))
// Unmarshall it as a source of server sent events
.flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
}
}
強(qiáng)烈建議使用 randomFactor 為回退間隔添加一點(diǎn)額外的方差, 以避免在完全相同的時(shí)間點(diǎn)重新啟動(dòng)多個(gè)流, 例如, 因?yàn)樗鼈冇捎诠蚕碣Y源 (如相同的服務(wù)器下線粉怕,并在相同間隔后重啟) 而停止健民。通過(guò)在重新啟動(dòng)間隔中增加額外的隨機(jī)性, 這些流將在時(shí)間上稍有不同的點(diǎn)開(kāi)始, 從而避免大量的通信量沖擊恢復(fù)的服務(wù)器或他們都需要聯(lián)系的其他資源。
上述 RestartSource 將永遠(yuǎn)不會(huì)終止, 除非Sink被送入取消贫贝。將它與 KillSwitch 結(jié)合使用通常會(huì)很方便, 以便在需要時(shí)可以終止它:
val killSwitch = restartSource
.viaMat(KillSwitches.single)(Keep.right)
.toMat(Sink.foreach(event => println(s"Got event: $event")))(Keep.left)
.run()
doSomethingElse()
killSwitch.shutdown()
Sink和flow也可以被監(jiān)管秉犹,使用akka.stream.scaladsl.RestartSink
和akka.stream.scaladsl.RestartFlow
蛉谜。RestartSink 在取消時(shí)重新啟動(dòng), 而在輸入端口取消、輸出端口完成或輸出端口發(fā)送錯(cuò)誤時(shí)重新啟動(dòng) RestartFlow崇堵。
監(jiān)管策略
注意
支持監(jiān)管策略的各個(gè)階段都有明文規(guī)定, 如果一個(gè)階段的文檔中沒(méi)有說(shuō)明它遵守監(jiān)管策略, 就意味著它失敗, 而不是采用監(jiān)管型诚。
錯(cuò)誤處理策略受actor監(jiān)管策略的啟發(fā), 但語(yǔ)義已經(jīng)適應(yīng)了流處理的領(lǐng)域。最重要的區(qū)別是, 監(jiān)管不是自動(dòng)應(yīng)用到流階段, 而是每個(gè)階段必須顯式實(shí)現(xiàn)的東西筑辨。
在許多階段, 實(shí)現(xiàn)對(duì)監(jiān)管策略的支持可能甚至沒(méi)有意義, 對(duì)于連接到外部技術(shù)的階段尤其如此, 例如, 失敗的連接如果立即嘗試新連接, 可能仍然會(huì)失敗俺驶。
對(duì)于實(shí)現(xiàn)監(jiān)管的階段, 在通過(guò)使用屬性物化流時(shí), 可以選擇處理流元素的異常處理策略。
有三種方法可以處理應(yīng)用程序代碼中的異常:
- Stop - 流以失敗完成棍辕。
- Resume - 元素被丟棄暮现,流繼續(xù)執(zhí)行
- Restart - 元素被丟棄,且流在重啟該階段后繼續(xù)執(zhí)行楚昭。重新啟動(dòng)階段意味著任何累積狀態(tài)都被清除栖袋。 這通常通過(guò)創(chuàng)建階段的新實(shí)例來(lái)執(zhí)行。
默認(rèn)情況下, 停止策略用于所有異常, 即在拋出異常時(shí), 流將以失敗完成抚太。
implicit val materializer = ActorMaterializer()
val source = Source(0 to 5).map(100 / _)
val result = source.runWith(Sink.fold(0)(_ + _))
// division by zero will fail the stream and the
// result here will be a Future completed with Failure(ArithmeticException)
可以在materializer的設(shè)置中定義流的默認(rèn)監(jiān)管策略塘幅。
val decider: Supervision.Decider = {
case _: ArithmeticException => Supervision.Resume
case _ => Supervision.Stop
}
implicit val materializer = ActorMaterializer(
ActorMaterializerSettings(system).withSupervisionStrategy(decider))
val source = Source(0 to 5).map(100 / _)
val result = source.runWith(Sink.fold(0)(_ + _))
// the element causing division by zero will be dropped
// result here will be a Future completed with Success(228)
在這里你可以看到, 所有的 ArithmeticException 將恢復(fù)處理, 即導(dǎo)致除以零的元素被丟棄了。
注意
請(qǐng)注意, 丟棄元素可能會(huì)導(dǎo)致具有循環(huán)的圖中出現(xiàn)死鎖尿贫。
還可以為flow的所有操作定義監(jiān)管策略电媳。
implicit val materializer = ActorMaterializer()
val decider: Supervision.Decider = {
case _: ArithmeticException => Supervision.Resume
case _ => Supervision.Stop
}
val flow = Flow[Int]
.filter(100 / _ < 50).map(elem => 100 / (5 - elem))
.withAttributes(ActorAttributes.supervisionStrategy(decider))
val source = Source(0 to 5).via(flow)
val result = source.runWith(Sink.fold(0)(_ + _))
// the elements causing division by zero will be dropped
// result here will be a Future completed with Success(150)
重新啟動(dòng)的工作方式與恢復(fù)類(lèi)似,除了故障處理階段的累加狀態(tài)(如果有的話)將被重置庆亡。
implicit val materializer = ActorMaterializer()
val decider: Supervision.Decider = {
case _: IllegalArgumentException => Supervision.Restart
case _ => Supervision.Stop
}
val flow = Flow[Int]
.scan(0) { (acc, elem) =>
if (elem < 0) throw new IllegalArgumentException("negative not allowed")
else acc + elem
}
.withAttributes(ActorAttributes.supervisionStrategy(decider))
val source = Source(List(1, 3, -1, 5, 7)).via(flow)
val result = source.limit(1000).runWith(Sink.seq)
// 負(fù)數(shù)元素導(dǎo)致scan階段重啟
// 即再次從0開(kāi)始
// 結(jié)果將是以Success(Vector(0, 1, 4, 0, 5, 12))完成的Future
來(lái)自mapAsync錯(cuò)誤
流監(jiān)管也可以應(yīng)用于mapAsync
和mapAsyncUnordered
的future匾乓,即使這些錯(cuò)誤發(fā)生于future而不是在階段自身。
假設(shè)我們使用外部服務(wù)來(lái)查找電子郵件地址又谋,我們希望丟棄那些無(wú)法找到的地址拼缝。
我們開(kāi)始于推文的作者流:
val authors: Source[Author, NotUsed] =
tweets
.filter(_.hashtags.contains(akkaTag))
.map(_.author)
假設(shè)我們可以使用以下方式查找其電子郵件地址:
def lookupEmail(handle: String): Future[String] =
當(dāng)電子郵件沒(méi)有找到時(shí),Future
以Failure
完成彰亥。
通過(guò)使用lookupEmail
服務(wù)以及使用mapAsync
, 可以將作者流轉(zhuǎn)換為電子郵件地址流, 并使用Supervision.resumingDecider
丟棄未知電子郵件地址:
import ActorAttributes.supervisionStrategy
import Supervision.resumingDecider
val emailAddresses: Source[String, NotUsed] =
authors.via(
Flow[Author].mapAsync(4)(author => addressSystem.lookupEmail(author.handle))
.withAttributes(supervisionStrategy(resumingDecider)))
如果不使用Resume
而是默認(rèn)的停止策略咧七,那么流將在第一個(gè)帶有Failure完成的Future時(shí),以失敗完成流任斋。