(1)viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ? Mat3): ReprMat[T, Mat3]
通過附加給定的步驟轉(zhuǎn)換此Flow呆细。
combine
函數(shù)用于組合此流和另外一個(gè)流的物化值,并放入Flow結(jié)果的物化值。
one of the values.建議使用內(nèi)部?jī)?yōu)化 Keep.left
和 Keep.right
保持, 而不是手動(dòng)編寫傳入其中一個(gè)值的函數(shù)墩划。
(2)toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) ? Mat3): Sink[In, Mat3]
將Flow
連接到Sink
藕甩,連接兩者的處理步驟碗啄。
combine
函數(shù)用于組合此flow和Sink的物化值风瘦,并放入Sink結(jié)果的物化值凹蜈。
+----------------------------+
| Resulting Sink[In, M2] |
| |
| +------+ +------+ |
| | | | | |
In ~~> | flow | ~Out~> | sink | |
| | Mat| | M| |
| +------+ +------+ |
+----------------------------+
(3)to[Mat2](sink: Graph[SinkShape[Out], Mat2]): Sink[In, Mat]
即toMat(sink)(Keep.left)
米苹。生成的Sink
的物化值將是現(xiàn)在Flow
的物化值(忽略提供的Sink
的物化值)糕伐;如果需要不同的策略,使用toMat
蘸嘶。
(4)mapMaterializedValue[Mat2](f: Mat ? Mat2): ReprMat[Out, Mat2]
轉(zhuǎn)換此Flow的物化值良瞧。
(5)joinMat[Mat2, Mat3](flow: Graph[FlowShape[Out, In], Mat2])(combine: (Mat, Mat2) ? Mat3): RunnableGraph[Mat3]
Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]]
通過連接輸入和輸出, 將此Flow加入到另一個(gè)Flow, 創(chuàng)建 RunnableGraph。
+------+ +-------+
| | ~Out~> | |
| this | | other |
| | <~In~ | |
+------+ +-------+
例如:
val connection = Tcp().outgoingConnection(localhost)
//#repl-client
val replParser =
Flow[String].takeWhile(_ != "q")
.concat(Source.single("BYE"))
.map(elem ? ByteString(s"$elem\n"))
val repl = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.map(text ? println("Server: " + text))
.map(_ ? readLine("> "))
.via(replParser)
connection.join(repl).run()
//#repl-client
(6)join[Mat2](flow: Graph[FlowShape[Out, In], Mat2]): RunnableGraph[Mat]
即joinMat(flow)(Keep.left)
(7)joinMat[I2, O2, Mat2, M](bidi: Graph[BidiShape[Out, O2, I2, In], Mat2])(combine: (Mat, Mat2) ? M): Flow[I2, O2, M]
將此 [[Flow]] 連接到 [[[BidiFlow]] 以關(guān)閉協(xié)議棧的 "頂部":
+---------------------------+
| Resulting Flow |
| |
| +------+ +------+ |
| | | ~Out~> | | ~~> O2
| | flow | | bidi | |
| | | <~In~ | | <~~ I2
| +------+ +------+ |
+---------------------------+
例如:
val bidi = BidiFlow.fromFlows(
Flow[Int].map(x ? x.toLong + 2).withAttributes(name("top")),
Flow[ByteString].map(_.decodeString("UTF-8")).withAttributes(name("bottom")))
val f = Flow[String].map(Integer.valueOf(_).toInt).join(bidi)
val result = Source(List(ByteString("1"), ByteString("2"))).via(f).limit(10).runWith(Sink.seq)
結(jié)果應(yīng)該是:Seq(3L, 4L)
(8)join[I2, O2, Mat2](bidi: Graph[BidiShape[Out, O2, I2, In], Mat2]): Flow[I2, O2, Mat]
即joinMat(bidi)(Keep.left)
(9)withAttributes(attr: Attributes): Repr[Out]
將此 [[流]] 的屬性更改為給定部分, 并密封屬性列表训唱。這意味著進(jìn)一步的調(diào)用將無法刪除這些屬性, 而是添加新的褥蚯。請(qǐng)注意, 此操作對(duì)空流沒有影響 (因?yàn)檫@些屬性只適用于已包含的處理階段)。
例如:
val flow = Flow[Int]
.filter(100 / _ < 50).map(elem ? 100 / (5 - elem))
.withAttributes(ActorAttributes.supervisionStrategy(decider))
(10)addAttributes(attr: Attributes): Repr[Out]
將給定的屬性添加到此流中况增。進(jìn)一步調(diào)用 "withAttributes" 不會(huì)刪除這些屬性赞庶。請(qǐng)注意, 此操作對(duì)空流沒有影響 (因?yàn)檫@些屬性只適用于已包含的處理階段)。
例如:
val section = Flow[Int].map(_ * 2).async
.addAttributes(Attributes.inputBuffer(initial = 1, max = 1)) // the buffer size of this map is 1
val flow = section.via(Flow[Int].map(_ / 2)).async // the buffer size of this map is the default
(11)named(name: String): Repr[Out]
即addAttributes(Attributes.name(name))
。給Flow增加一個(gè)name
屬性歧强。
(12)async: Repr[Out]
即澜薄,addAttributes(Attributes.asyncBoundary)。在這個(gè)Flow周圍放置一個(gè)異步邊界摊册。
(13)runWith[Mat1, Mat2](source: Graph[SourceShape[In], Mat1], sink: Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): (Mat1, Mat2)
將Source連接到Flow表悬,然后連接到Sink并運(yùn)行它。返回的元組包含了Source和Sink的物化值丧靡。
(14)toProcessor: RunnableGraph[Processor[In @uncheckedVariance, Out @uncheckedVariance]]
將此流轉(zhuǎn)換為一個(gè)RunnableGraph
(物化到一個(gè)響應(yīng)式流的org.reactivestreams.Processor
, 它實(shí)現(xiàn)了由此流程封裝的操作)蟆沫。每個(gè)物化都會(huì)產(chǎn)生一個(gè)新的Processor
實(shí)例, 即返回的 RunnableGraph
是可重用的。
(15)recover[T >: Out](pf: PartialFunction[Throwable, T]): Repr[T]
恢復(fù)允許在發(fā)生故障時(shí)發(fā)送最后一個(gè)元素温治,并優(yōu)雅地完成流饭庞。由于底層故障信號(hào)onError到達(dá)帶外(out-of-band),它可能跳過現(xiàn)有元素熬荆。 該階段可以恢復(fù)故障信號(hào)舟山,但不能恢復(fù)跳過的元素(它們將被丟棄)。
在recover
中拋出異常將自動(dòng)記錄日志在ERROR級(jí)別卤恳。
當(dāng)上游元素可用或者上游失敗偏函數(shù)pf返回一個(gè)元素時(shí)累盗,發(fā)送元素。
當(dāng)下游背壓時(shí)背壓突琳。
當(dāng)下游完成或者引起上游失敗的異常pf能夠處理若债,則完成。
當(dāng)下游取消時(shí)拆融,取消蠢琳。
例如:
Source(0 to 6).map(n ?
if (n < 5) n.toString
else throw new RuntimeException("Boom!")
).recover {
case _: RuntimeException ? "stream truncated"
}.runForeach(println)
輸出是:
0
1
2
3
4
stream truncated
(16)recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T]
RecoverWithRetries 允許Flow失敗時(shí),切換到備用Source镜豹。在故障嘗試多次并恢復(fù)后, 它將繼續(xù)有效, 這樣每次出現(xiàn)故障時(shí), 它就會(huì)被送入偏函數(shù)"pf", 而一個(gè)新的源可能會(huì)被物化傲须。請(qǐng)注意, 如果傳入0, 則根本不會(huì)嘗試恢復(fù)。
attempts
設(shè)為負(fù)數(shù)趟脂,被解釋為無限泰讽。
由于底層故障信號(hào)onError到達(dá)帶外(out-of-band),它可能跳過現(xiàn)有元素昔期。 該階段可以恢復(fù)故障信號(hào)已卸,但不能恢復(fù)跳過的元素(它們將被丟棄)。
在recover
中拋出異常將自動(dòng)記錄日志在ERROR級(jí)別镇眷。
當(dāng)上游元素可用或者上游失敗偏函數(shù)pf返回一個(gè)元素時(shí)咬最,發(fā)送元素翎嫡。
當(dāng)下游背壓時(shí)背壓欠动。
當(dāng)下游完成或者引起上游失敗的異常pf能夠處理,則完成。
當(dāng)下游取消時(shí)具伍,取消翅雏。
參數(shù)attempts 重試的最大值或者設(shè)為-1將無限次重試。
參數(shù)偏函數(shù)pf 接收失敗原因并返回要物化的新Source, 如果有的話人芽。
如果attempts
是一個(gè)負(fù)數(shù)但不是-1拋出IllegalArgumentException 異常望几。
例如:
val planB = Source(List("five", "six", "seven", "eight"))
Source(0 to 10).map(n ?
if (n < 5) n.toString
else throw new RuntimeException("Boom!")
).recoverWithRetries(attempts = 1, {
case _: RuntimeException ? planB
}).runForeach(println)
輸出結(jié)果是:
0
1
2
3
4
five
six
seven
eight
(17)mapError(pf: PartialFunction[Throwable, Throwable]): Repr[Out]
雖然類似 recover
,但此階段可用于將錯(cuò)誤信號(hào)轉(zhuǎn)換為另一種, 而不將其日志記錄為過程中的錯(cuò)誤萤厅。因此, 從這個(gè)意義上說, 它并不完全等同于 recover(t => throw t2)
橄抹,因?yàn)?recover
將日志記錄 t2
錯(cuò)誤。
由于底層故障信號(hào) onError 到達(dá)帶外, 它可能會(huì)跳過現(xiàn)有的元素惕味。此階段可以恢復(fù)故障信號(hào), 但不能收回將被丟棄的已跳過的元素楼誓。
與recover
類似,在mapError
中拋出異常將被記錄名挥。
例如:
val ex = new RuntimeException("ex") with NoStackTrace
val boom = new Exception("BOOM!") with NoStackTrace
Source(1 to 3).map { a ? if (a == 2) throw ex else a }
.mapError { case t: Exception ? throw boom }
(18)map[T](f: Out ? T): Repr[T]
通過將給定函數(shù)應(yīng)用于每個(gè)元素, 使其通過此處理步驟來轉(zhuǎn)換此流疟羹。
遵守ActorAttributes.SupervisionStrategy
屬性。
當(dāng)映射函數(shù)返回一個(gè)元素時(shí)發(fā)出禀倔。
當(dāng)下游背壓時(shí)背壓榄融。
當(dāng)下游完成時(shí)完成。
當(dāng)下游取消時(shí)取消救湖。
(19)statefulMapConcat[T](f: () ? Out ? immutable.Iterable[T]): Repr[T]
將每個(gè)輸入元素轉(zhuǎn)換為輸出元素的 Iterable, 然后將其平坦化到輸出流中愧杯。該轉(zhuǎn)換意味著是有狀態(tài)的, 這是通過為每個(gè)物化重新創(chuàng)建轉(zhuǎn)換函數(shù)來啟用的-返回的函數(shù)通常會(huì)掩蓋可變對(duì)象以在調(diào)用之間存儲(chǔ)狀態(tài)。對(duì)于無狀態(tài)變量, 請(qǐng)參見 [[FlowOps. mapConcat]]鞋既。
返回的Iterable
不能包含null
值, 因?yàn)樗鼈兪欠欠ǖ牧髟?根據(jù)反應(yīng)流規(guī)范民效。
遵守ActorAttributes.SupervisionStrategy
屬性。
當(dāng)映射函數(shù)返回一個(gè)元素或者先前計(jì)算的集合仍有剩余的元素時(shí)發(fā)出涛救。
當(dāng)下游背壓或者先前計(jì)算的集合仍有剩余的元素時(shí)背壓畏邢。
當(dāng)下游完成并且剩余的元素已經(jīng)發(fā)出時(shí)完成。
當(dāng)下游取消時(shí)检吆,取消舒萎。
例如:
"be able to restart" in {
Source(List(2, 1, 3, 4, 1)).statefulMapConcat(() ? {
var prev: Option[Int] = None
x ? {
if (x % 3 == 0) throw ex
prev match {
case Some(e) ?
prev = Some(x)
(1 to e) map (_ ? x)
case None ?
prev = Some(x)
List.empty[Int]
}
}
}).withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.runWith(TestSink.probe[Int])
.request(2).expectNext(1, 1)
.request(4).expectNext(1, 1, 1, 1)
.expectComplete()
}
"be able to resume" in {
Source(List(2, 1, 3, 4, 1)).statefulMapConcat(() ? {
var prev: Option[Int] = None
x ? {
if (x % 3 == 0) throw ex
prev match {
case Some(e) ?
prev = Some(x)
(1 to e) map (_ ? x)
case None ?
prev = Some(x)
List.empty[Int]
}
}
}).withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(TestSink.probe[Int])
.request(2).expectNext(1, 1)
.requestNext(4)
.request(4).expectNext(1, 1, 1, 1)
.expectComplete()
}
(20)mapConcat[T](f: Out ? immutable.Iterable[T]): Repr[T]
即statefulMapConcat(() ? f)
。這個(gè)轉(zhuǎn)換函數(shù)沒有內(nèi)部狀態(tài)蹭沛,即沒有掩蓋可變對(duì)象臂寝。
例如:
val someDataSource = Source(List(List("1"), List("2"), List("3", "4", "5"), List("6", "7")))
//#flattening-seqs
val myData: Source[List[Message], NotUsed] = someDataSource
val flattened: Source[Message, NotUsed] = myData.mapConcat(identity)
//#flattening-seqs
Await.result(flattened.limit(8).runWith(Sink.seq), 3.seconds) should be(List("1", "2", "3", "4", "5", "6", "7"))
(21)mapAsync[T](parallelism: Int)(f: Out ? Future[T]): Repr[T]
通過將給定函數(shù)應(yīng)用于每個(gè)元素, 使其通過此處理步驟來轉(zhuǎn)換此流。 函數(shù)返回一個(gè)Future
并將Future
的值發(fā)給下游摊灭。并行運(yùn)行的Future
數(shù)量作為mapAsync
的第一個(gè)參數(shù)給出咆贬。這些Future
可能以任何順序完成,但下游發(fā)出的元素與從上游接收的元素相同帚呼。
如果函數(shù)`f'引發(fā)異常掏缎,或者如果Future
完成失敗皱蹦,并且監(jiān)管(supervision)決定是akka.stream.Supervision.Stop
,流將完成失敗眷蜈。
如果函數(shù)`f'拋出異常沪哺,或者如果Future
完成失敗,監(jiān)管(supervision)決定是akka.stream.Supervision.Resume
或akka.stream.Supervision.Restart
酌儒,則元素被丟棄辜妓,流繼續(xù)運(yùn)行。
函數(shù)`f'總是按照元素到達(dá)的順序?qū)λ鼈冞M(jìn)行調(diào)用忌怎。
遵守ActorAttributes.SupervisionStrategy
屬性籍滴。
當(dāng)由提供的函數(shù)返回的Future
按順序完成下一個(gè)元素時(shí)發(fā)出。
當(dāng)future的數(shù)量到達(dá)配置的并行數(shù)并且下游背壓時(shí)榴啸,或者當(dāng)?shù)谝粋€(gè)future沒有完成時(shí)异逐,背壓。
當(dāng)下游完成并且所有的future完成并且所有元素已經(jīng)發(fā)出插掂,則完成灰瞻。
當(dāng)下游取消時(shí),取消辅甥。
例如:
//#mapAsync-ask
import akka.pattern.ask
implicit val askTimeout = Timeout(5.seconds)
val words: Source[String, NotUsed] =
Source(List("hello", "hi"))
words
.mapAsync(parallelism = 5)(elem ? (ref ? elem).mapTo[String])
// continue processing of the replies from the actor
.map(_.toLowerCase)
.runWith(Sink.ignore)
//#mapAsync-ask
(22)mapAsyncUnordered[T](parallelism: Int)(f: Out ? Future[T]): Repr[T]
通過將給定函數(shù)應(yīng)用于每個(gè)元素, 使其通過此處理步驟來轉(zhuǎn)換此流酝润。 函數(shù)返回一個(gè)Future
并將Future
的值發(fā)給下游。并行運(yùn)行的Future
數(shù)量作為mapAsyncUnordered
的第一個(gè)參數(shù)給出璃弄。每個(gè)已處理的元素將在準(zhǔn)備就緒后立即發(fā)送到下游, 也就是說, 元素可能不會(huì)按照從上游接收到的相同順序發(fā)送到下游要销。
如果函數(shù)`f'引發(fā)異常,或者如果Future
完成失敗夏块,并且監(jiān)管(supervision)決定是akka.stream.Supervision.Stop
疏咐,流將完成失敗。
如果函數(shù)`f'拋出異常脐供,或者如果Future
完成失敗浑塞,監(jiān)管(supervision)決定是akka.stream.Supervision.Resume
或akka.stream.Supervision.Restart
,則元素被丟棄政己,流繼續(xù)運(yùn)行酌壕。
函數(shù)`f'總是按照元素到達(dá)的順序?qū)λ鼈冞M(jìn)行調(diào)用(即使 "f" 返回的Future
的結(jié)果可能以不同的順序發(fā)出)。
遵守ActorAttributes.SupervisionStrategy
屬性歇由。
當(dāng)任一由給定函數(shù)返回的Future完成時(shí)卵牍,發(fā)送。
當(dāng)future的數(shù)量到達(dá)配置的并行數(shù)并且下游背壓時(shí)背壓沦泌。
當(dāng)下游完成并且所有的future已經(jīng)完成并且所有元素已經(jīng)發(fā)送時(shí)糊昙,完成。
當(dāng)下游取消時(shí)取消谢谦。
(23)filter(p: Out ? Boolean): Repr[Out]
只傳遞滿足給定謂詞的元素释牺。
遵守ActorAttributes.SupervisionStrategy
屬性萝衩。
當(dāng)對(duì)于某元素,給定的謂詞返回true船侧,則發(fā)送該元素。
當(dāng)對(duì)于某元素給定的謂詞返回true并且下游背壓時(shí)厅各,背壓镜撩。
當(dāng)下游完成時(shí)完成。
當(dāng)下游取消時(shí)取消队塘。
(24)filterNot(p: Out ? Boolean): Repr[Out]
只傳遞那些不滿足給定謂詞的元素袁梗。
(25)takeWhile(p: Out ? Boolean, inclusive: Boolean): Repr[Out]
在謂詞第一次返回 false 后終止處理 (并取消上游發(fā)布者), 包括第一個(gè)失敗的元素 (如果inclusive
是真的) 由于輸入緩沖, 某些元素可能已從上游發(fā)布者請(qǐng)求, 然后此步驟的下游不處理。
如果第一個(gè)流元素的謂詞為 false, 則該流將完成而不生成任何元素憔古。
遵守ActorAttributes.SupervisionStrategy
屬性遮怜。
當(dāng)謂詞為true時(shí),發(fā)送元素鸿市。
當(dāng)下游背壓時(shí)背壓锯梁。
如果inclusive
為false當(dāng)謂詞為false時(shí)或者當(dāng)如果inclusive
為true而謂詞為false后第一個(gè)元素已發(fā)出并且下游完成時(shí)或者下游完成時(shí),完成焰情。
例如:
Source(1 to 10).takeWhile(_ < 3, true).runWith(TestSink.probe[Int])
.request(4)
.expectNext(1, 2, 3)
.expectComplete()
(26)takeWhile(p: Out ? Boolean): Repr[Out]
即takeWhile(p, false)
在謂詞第一次返回false后陌凳,終止處理(并且取消上游發(fā)布者)。由于輸入緩沖内舟,一些元素可能已經(jīng)從上游發(fā)布者請(qǐng)求合敦,將不會(huì)被此步驟的下游處理。
如果第一個(gè)流元素的謂詞為 false, 則該流將完成而不生成任何元素验游。
當(dāng)謂詞為true時(shí)充岛,發(fā)送元素。
當(dāng)下游背壓時(shí)背壓耕蝉。
當(dāng)謂詞返回false或者下游取消時(shí)崔梗,則取消。
例如:
val flowUnderTest = Flow[Int].takeWhile(_ < 5)
val future = Source(1 to 10).via(flowUnderTest).runWith(Sink.fold(Seq.empty[Int])(_ :+ _))
val result = Await.result(future, 3.seconds)
assert(result == (1 to 4))
(27)dropWhile(p: Out ? Boolean): Repr[Out]
例如:
在謂詞為true時(shí)垒在,丟棄該元素炒俱。
謂詞第一次返回false后,所有元素都將被采用爪膊。
Source(1 to 4).dropWhile(_ < 3).runWith(TestSink.probe[Int])
.request(2)
.expectNext(3, 4)
.expectComplete()
Source(1 to 4).dropWhile(a ? if (a < 3) true else throw TE("")).withAttributes(supervisionStrategy(resumingDecider))
.runWith(TestSink.probe[Int])
.request(1)
.expectComplete()
Source(1 to 4).dropWhile {
case 1 | 3 ? true
case 4 ? false
case 2 ? throw TE("")
}.withAttributes(supervisionStrategy(restartingDecider))
.runWith(TestSink.probe[Int])
.request(1)
.expectNext(4)
.expectComplete()
(28)collect[T](pf: PartialFunction[Out, T]): Repr[T]
通過將給定的偏函數(shù)應(yīng)用于每個(gè)元素, 以便通過此處理步驟來轉(zhuǎn)換此流权悟。 不匹配的元素被過濾掉。
遵守ActorAttributes.SupervisionStrategy
屬性推盛。
如果某元素在偏函數(shù)中有定義峦阁,則發(fā)送該元素。
如果某元素在偏函數(shù)中有定義并且下游背壓耘成,則背壓榔昔。
如果下游完成驹闰,則完成。
如果下游取消撒会,則取消嘹朗。
例如:
val emailAddresses: Source[String, NotUsed] =
authors
.mapAsync(4)(author ? addressSystem.lookupEmail(author.handle))
.collect { case Some(emailAddress) ? emailAddress }
(29)grouped(n: Int): Repr[immutable.Seq[Out]]
將該流塊組合成給定大小的組,最后一組可能由于流的結(jié)束而小于請(qǐng)求诵肛。
n
必須是正數(shù), 否則拋出IllegalArgumentException
異常屹培。
已累積指定數(shù)量的元素或上游完成時(shí)發(fā)出元素。
已組裝出一組并且下游背壓時(shí)背壓怔檩。
下游完成時(shí)褪秀,完成。
下游取消時(shí)薛训,取消媒吗。
例如:
Source(1 to 4).grouped(2)
.runWith(Sink.seq)
結(jié)果應(yīng)該是:
Seq(Seq(1, 2), Seq(3, 4))
(30)limitWeighted[T](max: Long)(costFn: Out ? Long): Repr[Out]
通過使用成本函數(shù)來評(píng)估傳入元素的成本, 確保流有界性。到底有多少元素將被允許前往下游取決于每個(gè)元素的評(píng)估成本乙埃。如果累計(jì)成本超過最大值, 它將向上游發(fā)出故障 "StreamLimitException" 信號(hào)闸英。
由于輸入緩沖,可能已經(jīng)從上游發(fā)布者請(qǐng)求了一些元素介袜,然后將不會(huì)在該步驟的下游處理自阱。
遵守ActorAttributes.SupervisionStrategy
屬性。
上游發(fā)出且已累計(jì)的成本沒有達(dá)到最大值時(shí)發(fā)出米酬。
下游背壓時(shí)背壓沛豌。
累計(jì)的成本超過最大值時(shí)報(bào)錯(cuò)。
下游取消時(shí)取消赃额。
(31)limit(max: Long): Repr[Out]
即limitWeighted(max)(_ ? 1)
通過限制上游元素的數(shù)量來確保流的有界性加派。如果傳入元素的數(shù)量超過最大值, 它將向上游發(fā)出故障 "StreamLimitException" 信號(hào)。
由于輸入緩沖跳芳,可能已經(jīng)從上游發(fā)布者請(qǐng)求了一些元素芍锦,然后將不會(huì)在該步驟的下游處理。
上游發(fā)出且已發(fā)出的元素?cái)?shù)量沒有達(dá)到最大值時(shí)發(fā)出飞盆。
下游背壓時(shí)背壓娄琉。
傳入元素的總數(shù)超過最大值時(shí)報(bào)錯(cuò)。
下游取消時(shí)取消吓歇。
(32)sliding(n: Int, step: Int = 1): Repr[immutable.Seq[Out]]
在流上應(yīng)用滑動(dòng)窗口, 并將窗口作為元素組返回孽水;因?yàn)榱魑膊浚詈笠粋€(gè)組可能小于請(qǐng)求城看。
n
必須是正數(shù), 否則拋出IllegalArgumentException
異常女气。
step````必須是正數(shù), 否則拋出
IllegalArgumentException```異常。
當(dāng)窗口內(nèi)已收集足夠的元素或上游已完成测柠,發(fā)出元素炼鞠。
當(dāng)窗口已收集足夠的元素且下游背壓時(shí)背壓缘滥。
當(dāng)上游完成時(shí),完成谒主。
當(dāng)下游取消時(shí)朝扼,取消。
(33)scan[T](zero: T)(f: (T, Out) ? T): Repr[T]
Similar to fold
but is not a terminal operation,
- emits its current value which starts at
zero
and then - applies the current and next value to the given function
f
, - emitting the next current value.
類似于fold
, 但不是終端操作, 從zero
作為當(dāng)前值開始, 然后將當(dāng)前值和上游傳入的元素應(yīng)用于給定函數(shù)f
, 立即將結(jié)果(中間結(jié)果)發(fā)送到下游霎肯,并作為下次計(jì)算的當(dāng)前值擎颖。
如果函數(shù)```f````拋出異常,并且監(jiān)督?jīng)Q定是[[akka.stream.Supervision.Restart]]姿现,則當(dāng)前值將再次從零開始肠仪,流將繼續(xù)肖抱。
遵守ActorAttributes.SupervisionStrategy
屬性备典。
當(dāng)掃描元素的函數(shù)返回一個(gè)新元素時(shí)發(fā)出元素。
當(dāng)下游背壓時(shí)背壓意述。
當(dāng)上游完成時(shí)完成提佣。
當(dāng)下游取消時(shí)取消。
例如:
val scan = Flow[Int].scan(0) { (old, current) ?
require(current > 0)
old + current
}.withAttributes(supervisionStrategy(Supervision.restartingDecider))
Source(List(1, 3, -1, 5, 7)).via(scan)
注意荤崇,scan會(huì)將每一次計(jì)算結(jié)果都發(fā)給它的下游拌屏,此處下游會(huì)收到0, 1, 4, 0, 5, 12
共六個(gè)元素。
(34)scanAsync[T](zero: T)(f: (T, Out) ? Future[T]): Repr[T]
類似于scan
术荤,但使用異步函數(shù)倚喂,它發(fā)出其當(dāng)前值 (從zero
開始), 然后將當(dāng)前值和下一個(gè)數(shù)值應(yīng)用于給定函數(shù) f
, 從而發(fā)出解析為下一個(gè)當(dāng)前值的Future
。
如果函數(shù)f````拋出異常瓣戚,并且監(jiān)督?jīng)Q定是[[akka.stream.Supervision.Restart]]端圈,則當(dāng)前值將再次從
zero```開始,流將繼續(xù)運(yùn)行子库。
如果函數(shù) f
拋出異常, 并且監(jiān)視決策是akka.stream.Supervision.Resume
當(dāng)前值從上一個(gè)當(dāng)前值開始, 或者當(dāng)它沒有一個(gè)時(shí), 則為zero
, 流將繼續(xù)舱权。
遵守ActorAttributes.SupervisionStrategy
屬性。
當(dāng)f
返回future完成時(shí)仑嗅,發(fā)送元素鸳兽。
當(dāng)下游背壓時(shí)背壓接癌。
當(dāng)上游完成并且最后一個(gè)f
返回的future完成時(shí),,完成雨涛。
下游取消時(shí),取消弥搞。
(35)fold[T](zero: T)(f: (T, Out) ? T): Repr[T]
類似于scan
抹蚀,但它只在上游傳入元素完成,并計(jì)算出最終結(jié)果后郭变,才將最終結(jié)果發(fā)送到下游颜价。應(yīng)用給定的函數(shù)將當(dāng)前值和下一個(gè)值進(jìn)行計(jì)算涯保,其結(jié)果作為下一次計(jì)算的當(dāng)前值(注意此中間結(jié)果不會(huì)發(fā)給下游)。
如果函數(shù)f````拋出異常周伦,并且監(jiān)督?jīng)Q定是[[akka.stream.Supervision.Restart]]夕春,則當(dāng)前值將再次從
zero```開始,流將繼續(xù)運(yùn)行专挪。
遵守ActorAttributes.SupervisionStrategy
屬性及志。
當(dāng)上游完成時(shí),發(fā)送計(jì)算結(jié)果寨腔。
當(dāng)下游背壓時(shí)背壓速侈。
當(dāng)上游完成時(shí),完成迫卢。
下游取消時(shí)倚搬,取消。
例如:
val fold= Flow[Int].fold(0) { (old, current) ?
require(current > 0)
old + current
}.withAttributes(supervisionStrategy(Supervision.restartingDecider))
Source(List(1, 3, -1, 5, 7)).via(fold)
注意下游只會(huì)收到一個(gè)元素12乾蛤。
(36)foldAsync[T](zero: T)(f: (T, Out) ? Future[T]): Repr[T]
類似于fold
每界,但使用異步函數(shù),應(yīng)用給定的函數(shù)將當(dāng)前值和下一個(gè)值進(jìn)行計(jì)算家卖,其結(jié)果作為下一次計(jì)算的當(dāng)前值(注意此中間結(jié)果不會(huì)發(fā)給下游)眨层。
遵守ActorAttributes.SupervisionStrategy
屬性。
如果函數(shù)f````拋出異常上荡,并且監(jiān)督?jīng)Q定是[[akka.stream.Supervision.Restart]]趴樱,則當(dāng)前值將再次從
zero```開始,流將繼續(xù)運(yùn)行酪捡。
當(dāng)上游完成時(shí)叁征,發(fā)送計(jì)算結(jié)果。
當(dāng)下游背壓時(shí)背壓沛善。
當(dāng)上游完成時(shí)航揉,完成。
下游取消時(shí)金刁,取消帅涂。
(37)reduce[T >: Out](f: (T, T) ? T): Repr[T]
類似于fold
但使用收到的第一個(gè)元素作為zero
元素。應(yīng)用給定的函數(shù)將當(dāng)前值和下一個(gè)值進(jìn)行計(jì)算尤蛮,其結(jié)果作為下一次計(jì)算的當(dāng)前值(注意此中間結(jié)果不會(huì)發(fā)給下游)媳友。
如果流是空的 (即在發(fā)送任何元素之前完成), 則 reduce
階段將以NoSuchElementException
使下游失敗, 這在語義上與 Scala 的標(biāo)準(zhǔn)庫集合在這種情況下的狀態(tài)一致。
遵守ActorAttributes.SupervisionStrategy
屬性产捞。
當(dāng)上游完成時(shí)醇锚,發(fā)送計(jì)算結(jié)果。
當(dāng)下游背壓時(shí),背壓焊唬。
當(dāng)上游完成時(shí)恋昼,完成。
下游取消時(shí)赶促,取消液肌。
(38)intersperse[T >: Out](start: T, inject: T, end: T): Repr[T]
以及intersperse[T >: Out](inject: T): Repr[T]
使用所提供元素的散流, 類似于scala.collection.immutable.List.mkString
在List元素之間插入分隔符的方式。
此外, 還可以向流中注入起始和結(jié)束標(biāo)記元素鸥滨。
例如:
val nums = Source(List(1,2,3)).map(_.toString)
nums.intersperse(",") // 1 , 2 , 3
nums.intersperse("[", ",", "]") // [ 1 , 2 , 3 ]
如果您只想預(yù)置或僅追加元素 (但是仍然使用intercept
特性在元素之間插入一個(gè)分隔符嗦哆,你可能需要使用下面的模式而不是3個(gè)參數(shù)版本的intersperse
(參見Source .concat
用于語義細(xì)節(jié)):
Source.single(">> ") ++ Source(List("1", "2", "3")).intersperse(",")
Source(List("1", "2", "3")).intersperse(",") ++ Source.single("END")
上游發(fā)出時(shí)發(fā)出(或者如果有提供start
元素,先發(fā)送它)婿滓。
當(dāng)下游背壓時(shí)老速,背壓。
當(dāng)上游完成時(shí)凸主,完成橘券。
下游取消時(shí),取消秕铛。
(39)groupedWithin(n: Int, d: FiniteDuration): Repr[immutable.Seq[Out]]
將流進(jìn)行分組约郁,或按照一個(gè)時(shí)間窗口內(nèi)接收到的元素分組缩挑,或按照給定數(shù)量的限制進(jìn)行分組但两,無論哪一個(gè)先發(fā)生。 如果沒有從上游接收到元素供置,空組將不會(huì)被發(fā)出谨湘。 流結(jié)束之前的最后一個(gè)組將包含自先前發(fā)出的組以來的緩沖元素。
n
必須是正數(shù)芥丧,d
必須大于0秒紧阔,否則拋出IllegalArgumentException
異常。
從上一個(gè)組發(fā)射后經(jīng)過配置的時(shí)間或者緩沖n
個(gè)元素后發(fā)送续担。
當(dāng)下游背壓且已經(jīng)有n+1
個(gè)元素緩沖擅耽,則背壓。
當(dāng)上游完成并且最后一組已發(fā)送物遇,完成乖仇。
當(dāng)下游取消,取消询兴。
(40)groupedWeightedWithin(maxWeight: Long, d: FiniteDuration)(costFn: Out ? Long): Repr[immutable.Seq[Out]]
將流進(jìn)行分組乃沙,或按照一個(gè)時(shí)間窗口內(nèi)接收到的元素分組,或按照元素權(quán)重的限制進(jìn)行分組诗舰,無論哪一個(gè)先發(fā)生警儒。 如果沒有從上游接收到元素,空組將不會(huì)被發(fā)出眶根。 流結(jié)束之前的最后一個(gè)組將包含自先前發(fā)出的組以來的緩沖元素蜀铲。
maxWeight
必須是正數(shù)边琉,d
必須大于0秒,否則拋出IllegalArgumentException
異常记劝。
從上一個(gè)組發(fā)射后經(jīng)過配置的時(shí)間或者到達(dá)權(quán)重限制后發(fā)送艺骂。
當(dāng)下游背壓,并且緩沖組(+等待元素)權(quán)重大于maxWeight
時(shí)隆夯,背壓钳恕。
當(dāng)上游完成并且最后一組已發(fā)送,完成蹄衷。
當(dāng)下游取消忧额,取消。
(41)delay(of: FiniteDuration, strategy: DelayOverflowStrategy = DelayOverflowStrategy.dropTail): Repr[Out]
按一定時(shí)間間隔將元素的發(fā)射時(shí)間改變愧口。在等待下一個(gè)元素被發(fā)送時(shí)睦番,它允許存儲(chǔ)元素在內(nèi)部緩沖區(qū)。如果緩沖區(qū)內(nèi)沒有足夠的空間耍属,根據(jù)定義的akka.stream.DelayOverflowStrategy
托嚣,可以丟棄元素或者背壓上游。
延遲精度為 10ms, 以避免不必要的計(jì)時(shí)器調(diào)度周期厚骗。
內(nèi)部緩沖區(qū)的默認(rèn)容量為16示启。您可以通過調(diào)用addAttributes (inputBuffer)
來設(shè)置緩沖區(qū)大小。
當(dāng)緩沖區(qū)有等待的元素并且為這個(gè)元素配置的時(shí)間已到達(dá)领舰,則發(fā)送元素夫嗓。
OverflowStrategy策略介紹:
- emitEarly:如果緩沖區(qū)已滿,當(dāng)新的元素可用時(shí)冲秽,這個(gè)策略不等待直接發(fā)送下一個(gè)元素到下游舍咖。
- dropHead:如果緩沖區(qū)已滿,當(dāng)新的元素到達(dá)锉桑,丟棄緩沖區(qū)中最舊的元素排霉,從而為新元素留出空間。
- dropTail:如果緩沖區(qū)已滿民轴,當(dāng)新的元素到達(dá)攻柠,丟棄緩沖區(qū)中最新的元素,從而為新元素留出空間杉武。
- dropBuffer:如果緩沖區(qū)已滿辙诞,當(dāng)新的元素到達(dá),丟棄緩沖區(qū)中所有元素轻抱,從而為新元素留出空間飞涂。
- dropNew:如果緩沖區(qū)已滿,當(dāng)新的元素到達(dá),丟棄這個(gè)新元素较店。
- backpressure:如果緩沖區(qū)已滿士八,當(dāng)新的元素到達(dá),則背壓上游發(fā)布者直到緩沖區(qū)內(nèi)的空間可用梁呈。
- fail:如果緩沖區(qū)已滿婚度,當(dāng)新的元素到達(dá),則以失敗完成流官卡。
當(dāng)上游完成并且緩沖元素被耗盡時(shí)蝗茁,完成。
當(dāng)下游取消時(shí)寻咒,取消哮翘。
(42)drop(n: Long): Repr[Out]
在流的開始處丟棄給定數(shù)目的元素。
如果n
為零或?yàn)樨?fù)數(shù), 則不會(huì)丟棄任何元素毛秘。
當(dāng)已經(jīng)丟棄了指定數(shù)目的元素饭寺,則發(fā)送元素。
當(dāng)指定數(shù)目的元素已被丟棄并且下游背壓時(shí)叫挟,背壓艰匙。
當(dāng)上游完成時(shí),完成抹恳。
當(dāng)下游取消時(shí)员凝,取消。
(43)dropWithin(d: FiniteDuration): Repr[Out]
在流的開始處在給定的持續(xù)時(shí)間內(nèi)适秩,丟棄接收的元素绊序。
當(dāng)經(jīng)過指定的時(shí)間后并且新的上游元素到達(dá)硕舆,發(fā)送元素秽荞。
當(dāng)下游背壓時(shí),背壓抚官。
當(dāng)上游完成時(shí)扬跋,完成。
下游取消時(shí)凌节,取消钦听。
(44)take(n: Long): Repr[Out]
在給定數(shù)量的元素之后終止處理 (并取消上游發(fā)布者)。由于輸入緩沖, 有些元素可能已從上游發(fā)布者請(qǐng)求, 這些元素將不會(huì)在這一步的下游進(jìn)行處理倍奢。
如果n
是0或者負(fù)數(shù)朴上,流將不產(chǎn)生任何元素完成。
指定的數(shù)目還沒到達(dá)時(shí)卒煞,發(fā)送元素痪宰。
當(dāng)下游背壓時(shí),背壓。
當(dāng)指定數(shù)目的元素已經(jīng)處理或者上游完成時(shí)衣撬,完成乖订。
當(dāng)指定數(shù)目的元素已被處理或者下游取消時(shí),取消具练。
(45)takeWithin(d: FiniteDuration): Repr[Out]
在給定的持續(xù)時(shí)間后乍构,終止處理(并取消上游發(fā)布者)。由于輸入緩沖, 有些元素可能已從上游發(fā)布者請(qǐng)求, 這些元素將不會(huì)在這一步的下游進(jìn)行處理扛点。
請(qǐng)注意哥遮,這可以與take
結(jié)合來限制持續(xù)時(shí)間內(nèi)的元素?cái)?shù)量。
當(dāng)上游元素到達(dá)時(shí)陵究,發(fā)送元素昔善。
當(dāng)下游背壓時(shí),背壓畔乙。
在上游完成或計(jì)時(shí)器觸發(fā)時(shí)完成君仆。
當(dāng)下游取消或計(jì)時(shí)器觸發(fā)時(shí)取消。
(46)conflateWithSeed[S](seed: Out ? S)(aggregate: (S, Out) ? S): Repr[S]
通過將元素合并到一個(gè)摘要中牲距,允許更快的上游獨(dú)立于較慢的訂閱者返咱,直到訂閱者準(zhǔn)備好接受它們。例如牍鞠,如果上游的發(fā)布者速度更快咖摹,那么合并的步驟可能會(huì)勻化傳入的數(shù)目。
此版本的合并允許從第一個(gè)元素派生種子, 并將聚合類型更改為與輸入類型不同的類型难述。有關(guān)不更改類型的更簡(jiǎn)單版本, 請(qǐng)參見FlowOps.conflate
萤晴。
此元素僅在上游速度較快時(shí)聚合元素, 但如果下游速度較快, 則不會(huì)復(fù)制元素。
遵守ActorAttributes.SupervisionStrategy
屬性胁后。
當(dāng)下游停止背壓并且有可用的已合并元素店读,則發(fā)送元素。
沒有背壓的時(shí)候攀芯。
當(dāng)上游完成時(shí)屯断,完成。
當(dāng)下游取消時(shí)侣诺,取消殖演。
參數(shù)seed,使用第一個(gè)未消耗的元素作為開始年鸳,為合并值提供第一個(gè)狀態(tài)趴久。
參數(shù)aggregate, 獲取當(dāng)前聚合的值和當(dāng)前正在等待的元素搔确,以生成一個(gè)新的聚合值彼棍。
也請(qǐng)參見FlowOps.conflate
, FlowOps.limit
, FlowOps.limitWeighted
, FlowOps.batch
, FlowOps.batchWeighted
(47)conflate[O2 >: Out](aggregate: (O2, O2) ? O2): Repr[O2]
通過將元素合并到一個(gè)摘要中已添,允許更快的上游獨(dú)立于較慢的訂閱者,直到訂閱者準(zhǔn)備好接受它們滥酥。例如更舞,如果上游的發(fā)布者速度更快,那么合并的步驟可能會(huì)勻化傳入的數(shù)目坎吻。
此版本的合并不改變流的輸出類型缆蝉。請(qǐng)參見FlowOps.conflateWithSeed
,一個(gè)更為復(fù)雜的版本瘦真,可以在聚合時(shí)使用種子函數(shù)并轉(zhuǎn)換元素刊头。
此元素僅在上游速度較快時(shí)聚合元素, 但如果下游速度較快, 則不會(huì)復(fù)制元素。
遵守ActorAttributes.SupervisionStrategy
屬性诸尽。
當(dāng)下游停止背壓并且有可用的已合并元素原杂,則發(fā)送元素。
沒有背壓的時(shí)候您机。
當(dāng)上游完成時(shí)穿肄,完成。
當(dāng)下游取消時(shí)际看,取消咸产。
參數(shù)aggregate, 獲取當(dāng)前聚合的值和當(dāng)前正在等待的元素仲闽,以生成一個(gè)新的聚合值脑溢。
也請(qǐng)參見FlowOps.conflateWithSeed
, FlowOps.limit
, FlowOps.limitWeighted
, FlowOps.batch
, FlowOps.batchWeighted
(48)batch[S](max: Long, seed: Out ? S)(aggregate: (S, Out) ? S): Repr[S]
通過將元素聚合到批次中,允許更快的上游獨(dú)立于較慢的訂閱者赖欣,直到訂閱者準(zhǔn)備好接受它們屑彻。例如,如果上游發(fā)布者更快顶吮,一個(gè)batch步驟可以存儲(chǔ)接收到的元素于一個(gè)數(shù)組直到最大限制值社牲。
此元素僅在上游速度較快時(shí)聚合元素, 但如果下游速度較快, 則不會(huì)復(fù)制元素。
遵守ActorAttributes.SupervisionStrategy
屬性云矫。
當(dāng)下游停止背壓并且有可用的已聚合元素膳沽,則發(fā)送元素。
當(dāng)成批元素達(dá)到最大值而有一個(gè)待處理元素且下游背壓時(shí)让禀,背壓。
當(dāng)上游完成且沒有成批或待處理元素等待陨界,完成巡揍。
參數(shù)max 在背壓上游前,可成批的最大元素?cái)?shù)
(必須是非0正數(shù))菌瘪。
參數(shù)seed腮敌,使用第一個(gè)未消耗的元素作為開始阱当,為合并值提供第一個(gè)狀態(tài)。
參數(shù)aggregate糜工, 獲取當(dāng)前成批的值和當(dāng)前正在等待的元素弊添,以生成一個(gè)新的聚合值。
(49)batchWeighted[S](max: Long, costFn: Out ? Long, seed: Out ? S)(aggregate: (S, Out) ? S): Repr[S]
通過將元素聚合到批次中捌木,允許更快的上游獨(dú)立于較慢的訂閱者油坝,直到訂閱者準(zhǔn)備好接受它們。例如刨裆,如果上游發(fā)布者速度更快澈圈,則batch步驟可以將“ByteString”元素連接到允許的最大限制。
此元素僅在上游速度較快時(shí)聚合元素, 但如果下游速度較快, 則不會(huì)復(fù)制元素帆啃。
成批將應(yīng)用于所有元素瞬女,即使一個(gè)單一元素成本比允許的最大值還大。這種情況下努潘,先前成批的元素將被發(fā)送诽偷,然后這個(gè)"重"元素將被發(fā)送(在應(yīng)用了種子函數(shù)后)而不用它與其它元素成批處理,然后其余傳入的元素成批處理疯坤。
當(dāng)下游停止背壓并且有可用的已聚合元素渤刃,則發(fā)送元素。
當(dāng)成批元素權(quán)重值到達(dá)最大值而有一個(gè)待處理元素且下游背壓時(shí)贴膘,背壓卖子。
當(dāng)上游完成且沒有成批或待處理元素等待,完成刑峡。
當(dāng)下游取消時(shí)洋闽,取消。
參數(shù)max 在背壓上游前突梦,成批元素的最大權(quán)重值(必須時(shí)非0正數(shù))
參數(shù)costFn 用于計(jì)算單一元素權(quán)重值的函數(shù)
參數(shù)seed诫舅,使用第一個(gè)未消耗的元素作為開始,為成批值提供第一個(gè)狀態(tài)宫患。
參數(shù)aggregate刊懈, 獲取當(dāng)前成批的值和當(dāng)前正在等待的元素,以生成一個(gè)新的成批值娃闲。
(50)expand[U](extrapolate: Out ? Iterator[U]): Repr[U]
通過從較老的元素中外推元素虚汛,直到從上游來新元素,可以使更快地下游獨(dú)立于較慢的發(fā)布者皇帮。例如卷哩,擴(kuò)展步驟可能會(huì)重復(fù)給訂閱者的最后一個(gè)元素,直到它從上游接收到更新属拾。
這個(gè)元素永遠(yuǎn)不會(huì)“丟棄”上游元素将谊,因?yàn)樗性囟冀?jīng)過至少一個(gè)外推步驟冷溶。
這意味著如果上游實(shí)際上比上游更快,它將被下游用戶背壓尊浓。
Expand
不支持akka.stream.Supervision.Restart
和akka.stream.Supervision.Resume
逞频。
來自seed
或者extrapolate
函數(shù)的異常將使流以失敗完成。
當(dāng)下游停止背壓時(shí)栋齿,發(fā)送元素苗胀。
當(dāng)下游背壓或者迭代器運(yùn)行為空時(shí),背壓褒颈。
當(dāng)上游完成時(shí)柒巫,完成。
當(dāng)下游取消時(shí)谷丸,取消堡掏。
(51)buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[Out]
在流中添加一個(gè)固定大小的緩沖區(qū), 允許從較快的上游存儲(chǔ)元素, 直到它變?yōu)闈M的。如果沒有可用的空間, 根據(jù)已定義的 akka.stream.OverflowStrategy
, 它可能會(huì)丟棄元素或上背壓上游刨疼。
當(dāng)下游停止背壓泉唁,并且在緩沖區(qū)有待處理元素,則發(fā)送元素揩慕。
OverflowStrategy策略介紹:
- emitEarly:如果緩沖區(qū)已滿亭畜,當(dāng)新的元素可用時(shí),這個(gè)策略不等待直接發(fā)送下一個(gè)元素到下游迎卤。
- dropHead:如果緩沖區(qū)已滿拴鸵,當(dāng)新的元素到達(dá),丟棄緩沖區(qū)中最舊的元素蜗搔,從而為新元素留出空間劲藐。
- dropTail:如果緩沖區(qū)已滿,當(dāng)新的元素到達(dá)樟凄,丟棄緩沖區(qū)中最新的元素聘芜,從而為新元素留出空間。
- dropBuffer:如果緩沖區(qū)已滿缝龄,當(dāng)新的元素到達(dá)汰现,丟棄緩沖區(qū)中所有元素,從而為新元素留出空間叔壤。
- dropNew:如果緩沖區(qū)已滿瞎饲,當(dāng)新的元素到達(dá),丟棄這個(gè)新元素百新。
- backpressure:如果緩沖區(qū)已滿企软,當(dāng)新的元素到達(dá),則背壓上游發(fā)布者直到緩沖區(qū)內(nèi)的空間可用饭望。
- fail:如果緩沖區(qū)已滿仗哨,當(dāng)新的元素到達(dá),則以失敗完成流铅辞。
當(dāng)上游完成并且緩沖元素被耗盡時(shí)厌漂,完成。
當(dāng)下游取消時(shí)斟珊,取消苇倡。
(52)prefixAndTail[U >: Out](n: Int): Repr[(immutable.Seq[Out], Source[U, NotUsed])]
從流中獲取n個(gè)元素 (僅當(dāng)上游在發(fā)出n個(gè)元素之前完成比n小) 并返回一個(gè)包含所取元素的嚴(yán)格序列和一個(gè)表示剩余元素的流的pair
。如果 "n" 為零或負(fù), 則返回一個(gè)空集合和一個(gè)流(將包含整個(gè)上游的流保持不變)的pair
囤踩。
如果上游出現(xiàn)錯(cuò)誤, 則取決于當(dāng)前狀態(tài)
- 如果在獲取n個(gè)元素之前旨椒,主流標(biāo)識(shí)錯(cuò)誤,子流尚未發(fā)出堵漱。
- 如果在獲取n個(gè)元素之后综慎,主流標(biāo)識(shí)錯(cuò)誤,子流已經(jīng)發(fā)出(在那一刻勤庐,主流已經(jīng)完成)
當(dāng)達(dá)到配置的“前綴”元素?cái)?shù)目時(shí)示惊,發(fā)送“前綴”以及剩余部分組成的子流。
當(dāng)下游背壓或者子流背壓時(shí)愉镰,背壓米罚。
當(dāng)“前綴”元素和子流都已耗盡時(shí),完成丈探。
當(dāng)下游取消或者子流取消時(shí)录择,取消。
(53)groupBy[K](maxSubstreams: Int, f: Out ? K): SubFlow[Out, Mat, Repr, Closed]
此操作將輸入流解復(fù)用為單獨(dú)的輸出流碗降,每個(gè)元素鍵一個(gè)輸出流隘竭。 使用給定函數(shù)為每個(gè)元素計(jì)算鍵。 當(dāng)?shù)谝淮斡龅揭粋€(gè)新的鍵時(shí)遗锣,一個(gè)新的子流被打開货裹,并隨后所有屬于該鍵的元素輸入到該流。
從這個(gè)方法返回的對(duì)象不是一個(gè)普通的Source或Flow精偿,而是一個(gè)SubFlow弧圆。這意味著在此之后,所有的轉(zhuǎn)換都將以相同的方式應(yīng)用于所有遇到的子流笔咽。SubFlow模式通過關(guān)閉子流(即將其連接到一個(gè)Sink)或?qū)⒆恿骱喜⒃谝黄鸲顺?有關(guān)更多信息搔预,請(qǐng)參見SubFlow中的to和mergeBack方法。
需要注意的是子流也像任何其他流一樣傳播背壓叶组,這意味著阻塞一個(gè)子流將阻塞“groupBy”運(yùn)算符本身——從而阻塞所有子流——一旦所有的內(nèi)部或顯式緩沖區(qū)被填滿拯田。
如果 groupby 函數(shù) f
拋出一個(gè)異常, 并且監(jiān)管策略是 akka.stream.Supervision.Stop
, 則流和 substreams 將以失敗完成。
如果 groupby 函數(shù) f
拋出一個(gè)異常, 并且監(jiān)管策略是 akka.stream.Supervision.Resume
或者akka.stream.Supervision.Restart
, 該元素被丟棄甩十,流和 substreams 將繼續(xù)運(yùn)行船庇。
函數(shù)f
不可以返回null
吭产。這將拋出異常并觸發(fā)監(jiān)管決策機(jī)制。
遵守ActorAttributes.SupervisionStrategy
屬性鸭轮。
當(dāng)分組函數(shù)返回尚未創(chuàng)建的組的元素時(shí)發(fā)出臣淤。發(fā)出新組。
當(dāng)某個(gè)組有尚未處理的元素窃爷,而該組的子流背壓時(shí)邑蒋,背壓。
當(dāng)上游完成時(shí)按厘,完成医吊。
當(dāng)下游取消并且所有子流取消時(shí),取消逮京。
參數(shù)maxSubstreams 配置支持的最大子流數(shù)/鍵數(shù)卿堂。如果遇到更多不同的鍵, 則流將失敗。
(54)splitWhen(substreamCancelStrategy: SubstreamCancelStrategy)(p: Out ? Boolean): SubFlow[Out, Mat, Repr, Closed]
此操作將給定謂詞應(yīng)用于所有傳入元素, 并將它們發(fā)送到輸出流中的一個(gè)流, 如果給定謂詞返回 true, 則總是用當(dāng)前元素開始新的一個(gè)子流造虏。這意味著, 對(duì)于以下一系列謂詞值, 將產(chǎn)生三個(gè)子流, 長(zhǎng)度為1御吞、2和 3:
false, // 元素進(jìn)入第一個(gè)子流
true, false, // 元素進(jìn)入第二個(gè)子流
true, false, false // 元素進(jìn)入第三個(gè)子流
如果流的 * 第一個(gè) * 元素與謂詞匹配, 則 splitWhen 發(fā)出的第一個(gè)流將從該元素開始。例如:
true, false, false // 第一個(gè)流從拆分元素開始
true, false // 隨后的子流操作方式相同
從這個(gè)方法返回的對(duì)象不是一個(gè)普通的Source或Flow漓藕,而是一個(gè)SubFlow陶珠。這意味著在此之后,所有的轉(zhuǎn)換都將以相同的方式應(yīng)用于所有遇到的子流享钞。SubFlow模式通過關(guān)閉子流(即將其連接到一個(gè)Sink)或?qū)⒆恿骱喜⒃谝黄鸲顺?有關(guān)更多信息揍诽,請(qǐng)參見SubFlow中的to和mergeBack方法。
需要注意的是子流也像任何其他流一樣傳播背壓栗竖,這意味著阻塞一個(gè)子流將阻塞“splitWhen”運(yùn)算符本身——從而阻塞所有子流——一旦所有的內(nèi)部或顯式緩沖區(qū)被填滿暑脆。
如果拆分謂詞 p
拋出一個(gè)異常, 并且監(jiān)管策略是 akka.stream.Supervision.Stop
, 則流和 substreams 將以失敗完成。
如果拆分謂詞 p
拋出一個(gè)異常, 并且監(jiān)管策略是 akka.stream.Supervision.Resume
或者akka.stream.Supervision.Restart
, 該元素被丟棄狐肢,流和 substreams 將繼續(xù)運(yùn)行添吗。
當(dāng)元素對(duì)于謂詞為true時(shí),為隨后的元素打開并發(fā)出新的子流份名。
當(dāng)某個(gè)子流有待處理的元素而之前的元素尚未完全消耗時(shí)碟联,或子流背壓時(shí),背壓僵腺。
當(dāng)上游完成時(shí)鲤孵,完成。
當(dāng)下游取消并且子流以SubstreamCancelStrategy.drain
取消時(shí)辰如,或者下游取消或任何子流以SubstreamCancelStrategy.propagate
取消時(shí)普监,取消。
也請(qǐng)參見FlowOps.splitAfter
。
(55)splitWhen(p: Out ? Boolean): SubFlow[Out, Mat, Repr, Closed]
即splitWhen(SubstreamCancelStrategy.drain)(p)
(56)splitAfter(substreamCancelStrategy: SubstreamCancelStrategy)(p: Out ? Boolean): SubFlow[Out, Mat, Repr, Closed]
此操作將給定謂詞應(yīng)用于所有傳入元素, 并將它們發(fā)送到輸出流中的一個(gè)流, 如果給定謂詞返回 true, 則結(jié)束當(dāng)前子流凯正。這意味著, 對(duì)于以下一系列謂詞值, 將產(chǎn)生三個(gè)子流, 長(zhǎng)度為2毙玻、2和 3:
false, true, // 元素進(jìn)入第一個(gè)子流
false, true, // 元素進(jìn)入第二個(gè)子流
false, false, true // 元素進(jìn)入第三個(gè)子流
需要注意的是子流也像任何其他流一樣傳播背壓,這意味著阻塞一個(gè)子流將阻塞“splitAfter”運(yùn)算符本身——從而阻塞所有子流——一旦所有的內(nèi)部或顯式緩沖區(qū)被填滿漆际。
如果拆分謂詞 p
拋出一個(gè)異常, 并且監(jiān)管策略是 akka.stream.Supervision.Stop
, 則流和 substreams 將以失敗完成淆珊。
如果拆分謂詞 p
拋出一個(gè)異常, 并且監(jiān)管策略是 akka.stream.Supervision.Resume
或者akka.stream.Supervision.Restart
, 該元素被丟棄夺饲,流和 substreams 將繼續(xù)運(yùn)行奸汇。
當(dāng)元素經(jīng)過時(shí)發(fā)出。當(dāng)提供的謂詞為真時(shí), 它發(fā)出元素并為后續(xù)元素打開一個(gè)新的流往声。
當(dāng)某個(gè)子流有待處理的元素而之前的元素尚未完全消耗時(shí)擂找,或子流背壓時(shí),背壓浩销。
當(dāng)上游完成時(shí)完成贯涎。
當(dāng)下游取消并且子流以SubstreamCancelStrategy.drain
取消時(shí),或者下游取消或任何子流以SubstreamCancelStrategy.propagate
取消時(shí)慢洋,取消塘雳。
也請(qǐng)參見FlowOps.splitWhen
。
(57)splitAfter(p: Out ? Boolean): SubFlow[Out, Mat, Repr, Closed]
即splitAfter(SubstreamCancelStrategy.drain)(p)
(58)flatMapConcat[T, M](f: Out ? Graph[SourceShape[T], M]): Repr[T]
將每個(gè)輸入元素轉(zhuǎn)換為輸出元素的Source普筹,然后通過串聯(lián)將其平鋪為輸出流败明,從而一個(gè)接著一個(gè)的完全處理Source。
當(dāng)當(dāng)前消費(fèi)的子流有一個(gè)元素可用時(shí)發(fā)出太防。
當(dāng)下游背壓時(shí)妻顶,背壓。
當(dāng)上游完成且所有子流完成時(shí)蜒车,完成讳嘱。
當(dāng)下游取消時(shí),取消酿愧。
(59)flatMapMerge[T, M](breadth: Int, f: Out ? Graph[SourceShape[T], M]): Repr[T]
將每個(gè)輸入元素轉(zhuǎn)換為輸出元素的“Source”沥潭,然后通過合并將輸出元素展平成輸出流,其中在任何給定時(shí)間最大“廣度”子流被處理嬉挡。
當(dāng)當(dāng)前消費(fèi)的子流有一個(gè)元素可用時(shí)發(fā)出钝鸽。
當(dāng)下游背壓時(shí),背壓棘伴。
當(dāng)上游完成且所有子流完成時(shí)寞埠,完成。
當(dāng)下游取消時(shí)焊夸,取消仁连。
例如:
val async = Flow[Int].map(_ * 2).async
Source(0 to 9)
.map(_ * 10)
.flatMapMerge(5, i ? Source(i to (i + 9)).via(async))
.grouped(1000)
.runWith(Sink.head)
.futureValue
.sorted should ===(0 to 198 by 2)
(60)initialTimeout(timeout: FiniteDuration): Repr[Out]
如果在提供的超時(shí)之前,第一個(gè)元素還沒有經(jīng)過這個(gè)階段,則流以scala.concurrent.TimeoutException
失敗饭冬。
當(dāng)上游發(fā)出元素時(shí)使鹅,發(fā)出。
當(dāng)下游背壓時(shí)昌抠,背壓患朱。
當(dāng)上游完成時(shí)完成或者在第一個(gè)元素到達(dá)前超時(shí)已過則失敗。
當(dāng)下游取消時(shí)取消炊苫。
(61)completionTimeout(timeout: FiniteDuration): Repr[Out]
當(dāng)超時(shí)已過時(shí)裁厅,流尚未完成,則流以scala.concurrent.TimeoutException
失敗侨艾。
當(dāng)上游發(fā)出元素時(shí)执虹,發(fā)出。
當(dāng)下游背壓時(shí)唠梨,背壓袋励。
當(dāng)上游完成時(shí)完成或者在上游完全前超時(shí)已過則失敗。
當(dāng)下游取消時(shí)取消当叭。
(62)idleTimeout(timeout: FiniteDuration): Repr[Out]
如果兩個(gè)處理的元素之間的時(shí)間超過了提供的超時(shí)時(shí)間茬故,則流以scala.concurrent.TimeoutException
失敗。定期檢查超時(shí)蚁鳖,所以檢查的分辨率是一個(gè)周期(等于超時(shí)值)磺芭。
當(dāng)上游發(fā)出元素時(shí),發(fā)出才睹。
當(dāng)下游背壓時(shí)徘跪,背壓。
當(dāng)上游完成時(shí)完成或者如果在兩個(gè)發(fā)出的元素之間超時(shí)失敗琅攘。
當(dāng)下游取消時(shí)取消垮庐。
(63)backpressureTimeout(timeout: FiniteDuration): Repr[Out]
如果一個(gè)元素的發(fā)出和下一個(gè)下游需求之間的時(shí)間超過了提供的超時(shí)時(shí)間,則流以scala.concurrent.TimeoutException
失敗坞琴。定期檢查超時(shí)哨查,所以檢查的分辨率是一個(gè)周期(等于超時(shí)值)。
當(dāng)上游發(fā)出元素時(shí)剧辐,發(fā)出寒亥。
當(dāng)下游背壓時(shí),背壓荧关。
當(dāng)上游完成時(shí)完成或者如果一個(gè)元素的發(fā)出和下一個(gè)下游需求之間的時(shí)間超時(shí)失敗溉奕。
當(dāng)下游取消時(shí)取消。