Akka之Flow相關(guān)API總結(jié)

(1)viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ? Mat3): ReprMat[T, Mat3]
通過附加給定的步驟轉(zhuǎn)換此Flow呆细。

combine函數(shù)用于組合此流和另外一個(gè)流的物化值,并放入Flow結(jié)果的物化值。

one of the values.建議使用內(nèi)部?jī)?yōu)化 Keep.leftKeep.right 保持, 而不是手動(dòng)編寫傳入其中一個(gè)值的函數(shù)墩划。

(2)toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) ? Mat3): Sink[In, Mat3]
Flow連接到Sink藕甩,連接兩者的處理步驟碗啄。

combine函數(shù)用于組合此flow和Sink的物化值风瘦,并放入Sink結(jié)果的物化值凹蜈。

     +----------------------------+
     | Resulting Sink[In, M2]     |
     |                            |
     |  +------+        +------+  |
     |  |      |        |      |  |
 In ~~> | flow | ~Out~> | sink |  |
     |  |   Mat|        |     M|  |
     |  +------+        +------+  |
     +----------------------------+

(3)to[Mat2](sink: Graph[SinkShape[Out], Mat2]): Sink[In, Mat]
toMat(sink)(Keep.left)米苹。生成的Sink的物化值將是現(xiàn)在Flow的物化值(忽略提供的Sink的物化值)糕伐;如果需要不同的策略,使用toMat蘸嘶。

(4)mapMaterializedValue[Mat2](f: Mat ? Mat2): ReprMat[Out, Mat2]
轉(zhuǎn)換此Flow的物化值良瞧。

(5)joinMat[Mat2, Mat3](flow: Graph[FlowShape[Out, In], Mat2])(combine: (Mat, Mat2) ? Mat3): RunnableGraph[Mat3]
Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]]
通過連接輸入和輸出, 將此Flow加入到另一個(gè)Flow, 創(chuàng)建 RunnableGraph。

 +------+        +-------+
 |      | ~Out~> |       |
 | this |        | other |
 |      | <~In~  |       |
 +------+        +-------+

例如:

val connection = Tcp().outgoingConnection(localhost)
      //#repl-client

      val replParser =
        Flow[String].takeWhile(_ != "q")
          .concat(Source.single("BYE"))
          .map(elem ? ByteString(s"$elem\n"))

      val repl = Flow[ByteString]
        .via(Framing.delimiter(
          ByteString("\n"),
          maximumFrameLength = 256,
          allowTruncation = true))
        .map(_.utf8String)
        .map(text ? println("Server: " + text))
        .map(_ ? readLine("> "))
        .via(replParser)

      connection.join(repl).run()
      //#repl-client

(6)join[Mat2](flow: Graph[FlowShape[Out, In], Mat2]): RunnableGraph[Mat]
joinMat(flow)(Keep.left)

(7)joinMat[I2, O2, Mat2, M](bidi: Graph[BidiShape[Out, O2, I2, In], Mat2])(combine: (Mat, Mat2) ? M): Flow[I2, O2, M]
將此 [[Flow]] 連接到 [[[BidiFlow]] 以關(guān)閉協(xié)議棧的 "頂部":

 +---------------------------+
 | Resulting Flow            |
 |                           |
 | +------+        +------+  |
 | |      | ~Out~> |      | ~~> O2
 | | flow |        | bidi |  |
 | |      | <~In~  |      | <~~ I2
 | +------+        +------+  |
 +---------------------------+

例如:

val bidi = BidiFlow.fromFlows(
    Flow[Int].map(x ? x.toLong + 2).withAttributes(name("top")),
    Flow[ByteString].map(_.decodeString("UTF-8")).withAttributes(name("bottom")))
val f = Flow[String].map(Integer.valueOf(_).toInt).join(bidi)
val result = Source(List(ByteString("1"), ByteString("2"))).via(f).limit(10).runWith(Sink.seq)

結(jié)果應(yīng)該是:Seq(3L, 4L)

(8)join[I2, O2, Mat2](bidi: Graph[BidiShape[Out, O2, I2, In], Mat2]): Flow[I2, O2, Mat]
joinMat(bidi)(Keep.left)

(9)withAttributes(attr: Attributes): Repr[Out]
將此 [[流]] 的屬性更改為給定部分, 并密封屬性列表训唱。這意味著進(jìn)一步的調(diào)用將無法刪除這些屬性, 而是添加新的褥蚯。請(qǐng)注意, 此操作對(duì)空流沒有影響 (因?yàn)檫@些屬性只適用于已包含的處理階段)。
例如:

val flow = Flow[Int]
      .filter(100 / _ < 50).map(elem ? 100 / (5 - elem))
      .withAttributes(ActorAttributes.supervisionStrategy(decider))

(10)addAttributes(attr: Attributes): Repr[Out]
將給定的屬性添加到此流中况增。進(jìn)一步調(diào)用 "withAttributes" 不會(huì)刪除這些屬性赞庶。請(qǐng)注意, 此操作對(duì)空流沒有影響 (因?yàn)檫@些屬性只適用于已包含的處理階段)。
例如:

val section = Flow[Int].map(_ * 2).async
      .addAttributes(Attributes.inputBuffer(initial = 1, max = 1)) // the buffer size of this map is 1
val flow = section.via(Flow[Int].map(_ / 2)).async // the buffer size of this map is the default

(11)named(name: String): Repr[Out]
addAttributes(Attributes.name(name))。給Flow增加一個(gè)name屬性歧强。

(12)async: Repr[Out]
即澜薄,addAttributes(Attributes.asyncBoundary)。在這個(gè)Flow周圍放置一個(gè)異步邊界摊册。

(13)runWith[Mat1, Mat2](source: Graph[SourceShape[In], Mat1], sink: Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): (Mat1, Mat2)
將Source連接到Flow表悬,然后連接到Sink并運(yùn)行它。返回的元組包含了Source和Sink的物化值丧靡。

(14)toProcessor: RunnableGraph[Processor[In @uncheckedVariance, Out @uncheckedVariance]]
將此流轉(zhuǎn)換為一個(gè)RunnableGraph(物化到一個(gè)響應(yīng)式流的org.reactivestreams.Processor, 它實(shí)現(xiàn)了由此流程封裝的操作)蟆沫。每個(gè)物化都會(huì)產(chǎn)生一個(gè)新的Processor實(shí)例, 即返回的 RunnableGraph是可重用的。

(15)recover[T >: Out](pf: PartialFunction[Throwable, T]): Repr[T]
恢復(fù)允許在發(fā)生故障時(shí)發(fā)送最后一個(gè)元素温治,并優(yōu)雅地完成流饭庞。由于底層故障信號(hào)onError到達(dá)帶外(out-of-band),它可能跳過現(xiàn)有元素熬荆。 該階段可以恢復(fù)故障信號(hào)舟山,但不能恢復(fù)跳過的元素(它們將被丟棄)。

recover中拋出異常將自動(dòng)記錄日志在ERROR級(jí)別卤恳。

當(dāng)上游元素可用或者上游失敗偏函數(shù)pf返回一個(gè)元素時(shí)累盗,發(fā)送元素。

當(dāng)下游背壓時(shí)背壓突琳。

當(dāng)下游完成或者引起上游失敗的異常pf能夠處理若债,則完成。

當(dāng)下游取消時(shí)拆融,取消蠢琳。

例如:

Source(0 to 6).map(n ?
      if (n < 5) n.toString
      else throw new RuntimeException("Boom!")
    ).recover {
      case _: RuntimeException ? "stream truncated"
    }.runForeach(println)

輸出是:

0
1
2
3
4
stream truncated

(16)recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T]
RecoverWithRetries 允許Flow失敗時(shí),切換到備用Source镜豹。在故障嘗試多次并恢復(fù)后, 它將繼續(xù)有效, 這樣每次出現(xiàn)故障時(shí), 它就會(huì)被送入偏函數(shù)"pf", 而一個(gè)新的源可能會(huì)被物化傲须。請(qǐng)注意, 如果傳入0, 則根本不會(huì)嘗試恢復(fù)。

attempts設(shè)為負(fù)數(shù)趟脂,被解釋為無限泰讽。

由于底層故障信號(hào)onError到達(dá)帶外(out-of-band),它可能跳過現(xiàn)有元素昔期。 該階段可以恢復(fù)故障信號(hào)已卸,但不能恢復(fù)跳過的元素(它們將被丟棄)。

recover中拋出異常將自動(dòng)記錄日志在ERROR級(jí)別镇眷。

當(dāng)上游元素可用或者上游失敗偏函數(shù)pf返回一個(gè)元素時(shí)咬最,發(fā)送元素翎嫡。

當(dāng)下游背壓時(shí)背壓欠动。

當(dāng)下游完成或者引起上游失敗的異常pf能夠處理,則完成。

當(dāng)下游取消時(shí)具伍,取消翅雏。

參數(shù)attempts 重試的最大值或者設(shè)為-1將無限次重試。

參數(shù)偏函數(shù)pf 接收失敗原因并返回要物化的新Source, 如果有的話人芽。

如果attempts是一個(gè)負(fù)數(shù)但不是-1拋出IllegalArgumentException 異常望几。
例如:

val planB = Source(List("five", "six", "seven", "eight"))

    Source(0 to 10).map(n ?
      if (n < 5) n.toString
      else throw new RuntimeException("Boom!")
    ).recoverWithRetries(attempts = 1, {
      case _: RuntimeException ? planB
    }).runForeach(println)

輸出結(jié)果是:

0
1
2
3
4
five
six
seven
eight

(17)mapError(pf: PartialFunction[Throwable, Throwable]): Repr[Out]
雖然類似 recover,但此階段可用于將錯(cuò)誤信號(hào)轉(zhuǎn)換為另一種, 而不將其日志記錄為過程中的錯(cuò)誤萤厅。因此, 從這個(gè)意義上說, 它并不完全等同于 recover(t => throw t2)橄抹,因?yàn)?recover將日志記錄 t2錯(cuò)誤。

由于底層故障信號(hào) onError 到達(dá)帶外, 它可能會(huì)跳過現(xiàn)有的元素惕味。此階段可以恢復(fù)故障信號(hào), 但不能收回將被丟棄的已跳過的元素楼誓。

recover類似,在mapError中拋出異常將被記錄名挥。
例如:

val ex = new RuntimeException("ex") with NoStackTrace
val boom = new Exception("BOOM!") with NoStackTrace
Source(1 to 3).map { a ? if (a == 2) throw ex else a }
        .mapError { case t: Exception ? throw boom }

(18)map[T](f: Out ? T): Repr[T]
通過將給定函數(shù)應(yīng)用于每個(gè)元素, 使其通過此處理步驟來轉(zhuǎn)換此流疟羹。

遵守ActorAttributes.SupervisionStrategy屬性。

當(dāng)映射函數(shù)返回一個(gè)元素時(shí)發(fā)出禀倔。

當(dāng)下游背壓時(shí)背壓榄融。

當(dāng)下游完成時(shí)完成。

當(dāng)下游取消時(shí)取消救湖。

(19)statefulMapConcat[T](f: () ? Out ? immutable.Iterable[T]): Repr[T]
將每個(gè)輸入元素轉(zhuǎn)換為輸出元素的 Iterable, 然后將其平坦化到輸出流中愧杯。該轉(zhuǎn)換意味著是有狀態(tài)的, 這是通過為每個(gè)物化重新創(chuàng)建轉(zhuǎn)換函數(shù)來啟用的-返回的函數(shù)通常會(huì)掩蓋可變對(duì)象以在調(diào)用之間存儲(chǔ)狀態(tài)。對(duì)于無狀態(tài)變量, 請(qǐng)參見 [[FlowOps. mapConcat]]鞋既。

返回的Iterable不能包含null值, 因?yàn)樗鼈兪欠欠ǖ牧髟?根據(jù)反應(yīng)流規(guī)范民效。

遵守ActorAttributes.SupervisionStrategy屬性。

當(dāng)映射函數(shù)返回一個(gè)元素或者先前計(jì)算的集合仍有剩余的元素時(shí)發(fā)出涛救。

當(dāng)下游背壓或者先前計(jì)算的集合仍有剩余的元素時(shí)背壓畏邢。

當(dāng)下游完成并且剩余的元素已經(jīng)發(fā)出時(shí)完成。

當(dāng)下游取消時(shí)检吆,取消舒萎。

例如:

"be able to restart" in {
      Source(List(2, 1, 3, 4, 1)).statefulMapConcat(() ? {
        var prev: Option[Int] = None
        x ? {
          if (x % 3 == 0) throw ex
          prev match {
            case Some(e) ?
              prev = Some(x)
              (1 to e) map (_ ? x)
            case None ?
              prev = Some(x)
              List.empty[Int]
          }
        }
      }).withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
        .runWith(TestSink.probe[Int])
        .request(2).expectNext(1, 1)
        .request(4).expectNext(1, 1, 1, 1)
        .expectComplete()
    }

    "be able to resume" in {
      Source(List(2, 1, 3, 4, 1)).statefulMapConcat(() ? {
        var prev: Option[Int] = None
        x ? {
          if (x % 3 == 0) throw ex
          prev match {
            case Some(e) ?
              prev = Some(x)
              (1 to e) map (_ ? x)
            case None ?
              prev = Some(x)
              List.empty[Int]
          }
        }
      }).withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
        .runWith(TestSink.probe[Int])
        .request(2).expectNext(1, 1)
        .requestNext(4)
        .request(4).expectNext(1, 1, 1, 1)
        .expectComplete()
    }

(20)mapConcat[T](f: Out ? immutable.Iterable[T]): Repr[T]
statefulMapConcat(() ? f)。這個(gè)轉(zhuǎn)換函數(shù)沒有內(nèi)部狀態(tài)蹭沛,即沒有掩蓋可變對(duì)象臂寝。
例如:

val someDataSource = Source(List(List("1"), List("2"), List("3", "4", "5"), List("6", "7")))

//#flattening-seqs
val myData: Source[List[Message], NotUsed] = someDataSource
val flattened: Source[Message, NotUsed] = myData.mapConcat(identity)
//#flattening-seqs

Await.result(flattened.limit(8).runWith(Sink.seq), 3.seconds) should be(List("1", "2", "3", "4", "5", "6", "7"))

(21)mapAsync[T](parallelism: Int)(f: Out ? Future[T]): Repr[T]
通過將給定函數(shù)應(yīng)用于每個(gè)元素, 使其通過此處理步驟來轉(zhuǎn)換此流。 函數(shù)返回一個(gè)Future并將Future的值發(fā)給下游摊灭。并行運(yùn)行的Future數(shù)量作為mapAsync的第一個(gè)參數(shù)給出咆贬。這些Future可能以任何順序完成,但下游發(fā)出的元素與從上游接收的元素相同帚呼。

如果函數(shù)`f'引發(fā)異常掏缎,或者如果Future完成失敗皱蹦,并且監(jiān)管(supervision)決定是akka.stream.Supervision.Stop,流將完成失敗眷蜈。

如果函數(shù)`f'拋出異常沪哺,或者如果Future完成失敗,監(jiān)管(supervision)決定是akka.stream.Supervision.Resumeakka.stream.Supervision.Restart酌儒,則元素被丟棄辜妓,流繼續(xù)運(yùn)行。

函數(shù)`f'總是按照元素到達(dá)的順序?qū)λ鼈冞M(jìn)行調(diào)用忌怎。

遵守ActorAttributes.SupervisionStrategy屬性籍滴。

當(dāng)由提供的函數(shù)返回的Future按順序完成下一個(gè)元素時(shí)發(fā)出。

當(dāng)future的數(shù)量到達(dá)配置的并行數(shù)并且下游背壓時(shí)榴啸,或者當(dāng)?shù)谝粋€(gè)future沒有完成時(shí)异逐,背壓。

當(dāng)下游完成并且所有的future完成并且所有元素已經(jīng)發(fā)出插掂,則完成灰瞻。

當(dāng)下游取消時(shí),取消辅甥。

例如:

//#mapAsync-ask
import akka.pattern.ask
implicit val askTimeout = Timeout(5.seconds)
val words: Source[String, NotUsed] =
      Source(List("hello", "hi"))

words
      .mapAsync(parallelism = 5)(elem ? (ref ? elem).mapTo[String])
      // continue processing of the replies from the actor
      .map(_.toLowerCase)
      .runWith(Sink.ignore)
//#mapAsync-ask

(22)mapAsyncUnordered[T](parallelism: Int)(f: Out ? Future[T]): Repr[T]
通過將給定函數(shù)應(yīng)用于每個(gè)元素, 使其通過此處理步驟來轉(zhuǎn)換此流酝润。 函數(shù)返回一個(gè)Future并將Future的值發(fā)給下游。并行運(yùn)行的Future數(shù)量作為mapAsyncUnordered的第一個(gè)參數(shù)給出璃弄。每個(gè)已處理的元素將在準(zhǔn)備就緒后立即發(fā)送到下游, 也就是說, 元素可能不會(huì)按照從上游接收到的相同順序發(fā)送到下游要销。

如果函數(shù)`f'引發(fā)異常,或者如果Future完成失敗夏块,并且監(jiān)管(supervision)決定是akka.stream.Supervision.Stop疏咐,流將完成失敗。

如果函數(shù)`f'拋出異常脐供,或者如果Future完成失敗浑塞,監(jiān)管(supervision)決定是akka.stream.Supervision.Resumeakka.stream.Supervision.Restart,則元素被丟棄政己,流繼續(xù)運(yùn)行酌壕。

函數(shù)`f'總是按照元素到達(dá)的順序?qū)λ鼈冞M(jìn)行調(diào)用(即使 "f" 返回的Future的結(jié)果可能以不同的順序發(fā)出)。

遵守ActorAttributes.SupervisionStrategy屬性歇由。

當(dāng)任一由給定函數(shù)返回的Future完成時(shí)卵牍,發(fā)送。

當(dāng)future的數(shù)量到達(dá)配置的并行數(shù)并且下游背壓時(shí)背壓沦泌。

當(dāng)下游完成并且所有的future已經(jīng)完成并且所有元素已經(jīng)發(fā)送時(shí)糊昙,完成。

當(dāng)下游取消時(shí)取消谢谦。

(23)filter(p: Out ? Boolean): Repr[Out]
只傳遞滿足給定謂詞的元素释牺。

遵守ActorAttributes.SupervisionStrategy屬性萝衩。

當(dāng)對(duì)于某元素,給定的謂詞返回true船侧,則發(fā)送該元素。

當(dāng)對(duì)于某元素給定的謂詞返回true并且下游背壓時(shí)厅各,背壓镜撩。

當(dāng)下游完成時(shí)完成。

當(dāng)下游取消時(shí)取消队塘。

(24)filterNot(p: Out ? Boolean): Repr[Out]
只傳遞那些不滿足給定謂詞的元素袁梗。

(25)takeWhile(p: Out ? Boolean, inclusive: Boolean): Repr[Out]
在謂詞第一次返回 false 后終止處理 (并取消上游發(fā)布者), 包括第一個(gè)失敗的元素 (如果inclusive是真的) 由于輸入緩沖, 某些元素可能已從上游發(fā)布者請(qǐng)求, 然后此步驟的下游不處理。

如果第一個(gè)流元素的謂詞為 false, 則該流將完成而不生成任何元素憔古。

遵守ActorAttributes.SupervisionStrategy屬性遮怜。

當(dāng)謂詞為true時(shí),發(fā)送元素鸿市。

當(dāng)下游背壓時(shí)背壓锯梁。

如果inclusive為false當(dāng)謂詞為false時(shí)或者當(dāng)如果inclusive為true而謂詞為false后第一個(gè)元素已發(fā)出并且下游完成時(shí)或者下游完成時(shí),完成焰情。
例如:

Source(1 to 10).takeWhile(_ < 3, true).runWith(TestSink.probe[Int])
        .request(4)
        .expectNext(1, 2, 3)
        .expectComplete()

(26)takeWhile(p: Out ? Boolean): Repr[Out]
即takeWhile(p, false)
在謂詞第一次返回false后陌凳,終止處理(并且取消上游發(fā)布者)。由于輸入緩沖内舟,一些元素可能已經(jīng)從上游發(fā)布者請(qǐng)求合敦,將不會(huì)被此步驟的下游處理。

如果第一個(gè)流元素的謂詞為 false, 則該流將完成而不生成任何元素验游。

當(dāng)謂詞為true時(shí)充岛,發(fā)送元素。

當(dāng)下游背壓時(shí)背壓耕蝉。

當(dāng)謂詞返回false或者下游取消時(shí)崔梗,則取消。

例如:

val flowUnderTest = Flow[Int].takeWhile(_ < 5)

val future = Source(1 to 10).via(flowUnderTest).runWith(Sink.fold(Seq.empty[Int])(_ :+ _))
val result = Await.result(future, 3.seconds)
assert(result == (1 to 4))

(27)dropWhile(p: Out ? Boolean): Repr[Out]
例如:
在謂詞為true時(shí)垒在,丟棄該元素炒俱。
謂詞第一次返回false后,所有元素都將被采用爪膊。

Source(1 to 4).dropWhile(_ < 3).runWith(TestSink.probe[Int])
        .request(2)
        .expectNext(3, 4)
        .expectComplete()

Source(1 to 4).dropWhile(a ? if (a < 3) true else throw TE("")).withAttributes(supervisionStrategy(resumingDecider))
        .runWith(TestSink.probe[Int])
        .request(1)
        .expectComplete()

Source(1 to 4).dropWhile {
        case 1 | 3 ? true
        case 4     ? false
        case 2     ? throw TE("")
 }.withAttributes(supervisionStrategy(restartingDecider))
        .runWith(TestSink.probe[Int])
        .request(1)
        .expectNext(4)
        .expectComplete()

(28)collect[T](pf: PartialFunction[Out, T]): Repr[T]
通過將給定的偏函數(shù)應(yīng)用于每個(gè)元素, 以便通過此處理步驟來轉(zhuǎn)換此流权悟。 不匹配的元素被過濾掉。

遵守ActorAttributes.SupervisionStrategy屬性推盛。

如果某元素在偏函數(shù)中有定義峦阁,則發(fā)送該元素。

如果某元素在偏函數(shù)中有定義并且下游背壓耘成,則背壓榔昔。

如果下游完成驹闰,則完成。

如果下游取消撒会,則取消嘹朗。

例如:

 val emailAddresses: Source[String, NotUsed] =
      authors
        .mapAsync(4)(author ? addressSystem.lookupEmail(author.handle))
        .collect { case Some(emailAddress) ? emailAddress }

(29)grouped(n: Int): Repr[immutable.Seq[Out]]
將該流塊組合成給定大小的組,最后一組可能由于流的結(jié)束而小于請(qǐng)求诵肛。

n必須是正數(shù), 否則拋出IllegalArgumentException異常屹培。

已累積指定數(shù)量的元素或上游完成時(shí)發(fā)出元素。

已組裝出一組并且下游背壓時(shí)背壓怔檩。

下游完成時(shí)褪秀,完成。

下游取消時(shí)薛训,取消媒吗。

例如:

Source(1 to 4).grouped(2)
      .runWith(Sink.seq)

結(jié)果應(yīng)該是:

Seq(Seq(1, 2), Seq(3, 4))

(30)limitWeighted[T](max: Long)(costFn: Out ? Long): Repr[Out]
通過使用成本函數(shù)來評(píng)估傳入元素的成本, 確保流有界性。到底有多少元素將被允許前往下游取決于每個(gè)元素的評(píng)估成本乙埃。如果累計(jì)成本超過最大值, 它將向上游發(fā)出故障 "StreamLimitException" 信號(hào)闸英。

由于輸入緩沖,可能已經(jīng)從上游發(fā)布者請(qǐng)求了一些元素介袜,然后將不會(huì)在該步驟的下游處理自阱。

遵守ActorAttributes.SupervisionStrategy屬性。

上游發(fā)出且已累計(jì)的成本沒有達(dá)到最大值時(shí)發(fā)出米酬。

下游背壓時(shí)背壓沛豌。

累計(jì)的成本超過最大值時(shí)報(bào)錯(cuò)。

下游取消時(shí)取消赃额。

(31)limit(max: Long): Repr[Out]
limitWeighted(max)(_ ? 1)
通過限制上游元素的數(shù)量來確保流的有界性加派。如果傳入元素的數(shù)量超過最大值, 它將向上游發(fā)出故障 "StreamLimitException" 信號(hào)。

由于輸入緩沖跳芳,可能已經(jīng)從上游發(fā)布者請(qǐng)求了一些元素芍锦,然后將不會(huì)在該步驟的下游處理。

上游發(fā)出且已發(fā)出的元素?cái)?shù)量沒有達(dá)到最大值時(shí)發(fā)出飞盆。

下游背壓時(shí)背壓娄琉。

傳入元素的總數(shù)超過最大值時(shí)報(bào)錯(cuò)。

下游取消時(shí)取消吓歇。

(32)sliding(n: Int, step: Int = 1): Repr[immutable.Seq[Out]]
在流上應(yīng)用滑動(dòng)窗口, 并將窗口作為元素組返回孽水;因?yàn)榱魑膊浚詈笠粋€(gè)組可能小于請(qǐng)求城看。

n必須是正數(shù), 否則拋出IllegalArgumentException異常女气。

step````必須是正數(shù), 否則拋出IllegalArgumentException```異常。

當(dāng)窗口內(nèi)已收集足夠的元素或上游已完成测柠,發(fā)出元素炼鞠。

當(dāng)窗口已收集足夠的元素且下游背壓時(shí)背壓缘滥。

當(dāng)上游完成時(shí),完成谒主。

當(dāng)下游取消時(shí)朝扼,取消。

(33)scan[T](zero: T)(f: (T, Out) ? T): Repr[T]
Similar to fold but is not a terminal operation,

  • emits its current value which starts at zero and then
  • applies the current and next value to the given function f,
  • emitting the next current value.
    類似于fold, 但不是終端操作, 從zero 作為當(dāng)前值開始, 然后將當(dāng)前值和上游傳入的元素應(yīng)用于給定函數(shù) f, 立即將結(jié)果(中間結(jié)果)發(fā)送到下游霎肯,并作為下次計(jì)算的當(dāng)前值擎颖。

如果函數(shù)```f````拋出異常,并且監(jiān)督?jīng)Q定是[[akka.stream.Supervision.Restart]]姿现,則當(dāng)前值將再次從零開始肠仪,流將繼續(xù)肖抱。

遵守ActorAttributes.SupervisionStrategy屬性备典。

當(dāng)掃描元素的函數(shù)返回一個(gè)新元素時(shí)發(fā)出元素。

當(dāng)下游背壓時(shí)背壓意述。

當(dāng)上游完成時(shí)完成提佣。

當(dāng)下游取消時(shí)取消。

例如:

val scan = Flow[Int].scan(0) { (old, current) ?
        require(current > 0)
        old + current
      }.withAttributes(supervisionStrategy(Supervision.restartingDecider))
Source(List(1, 3, -1, 5, 7)).via(scan)

注意荤崇,scan會(huì)將每一次計(jì)算結(jié)果都發(fā)給它的下游拌屏,此處下游會(huì)收到0, 1, 4, 0, 5, 12共六個(gè)元素。

(34)scanAsync[T](zero: T)(f: (T, Out) ? Future[T]): Repr[T]
類似于scan术荤,但使用異步函數(shù)倚喂,它發(fā)出其當(dāng)前值 (從zero 開始), 然后將當(dāng)前值和下一個(gè)數(shù)值應(yīng)用于給定函數(shù) f, 從而發(fā)出解析為下一個(gè)當(dāng)前值的Future

如果函數(shù)f````拋出異常瓣戚,并且監(jiān)督?jīng)Q定是[[akka.stream.Supervision.Restart]]端圈,則當(dāng)前值將再次從zero```開始,流將繼續(xù)運(yùn)行子库。

如果函數(shù) f 拋出異常, 并且監(jiān)視決策是akka.stream.Supervision.Resume當(dāng)前值從上一個(gè)當(dāng)前值開始, 或者當(dāng)它沒有一個(gè)時(shí), 則為zero, 流將繼續(xù)舱权。

遵守ActorAttributes.SupervisionStrategy屬性。

當(dāng)f返回future完成時(shí)仑嗅,發(fā)送元素鸳兽。

當(dāng)下游背壓時(shí)背壓接癌。

當(dāng)上游完成并且最后一個(gè)f返回的future完成時(shí),,完成雨涛。

下游取消時(shí),取消弥搞。

(35)fold[T](zero: T)(f: (T, Out) ? T): Repr[T]
類似于scan抹蚀,但它只在上游傳入元素完成,并計(jì)算出最終結(jié)果后郭变,才將最終結(jié)果發(fā)送到下游颜价。應(yīng)用給定的函數(shù)將當(dāng)前值和下一個(gè)值進(jìn)行計(jì)算涯保,其結(jié)果作為下一次計(jì)算的當(dāng)前值(注意此中間結(jié)果不會(huì)發(fā)給下游)。

如果函數(shù)f````拋出異常周伦,并且監(jiān)督?jīng)Q定是[[akka.stream.Supervision.Restart]]夕春,則當(dāng)前值將再次從zero```開始,流將繼續(xù)運(yùn)行专挪。

遵守ActorAttributes.SupervisionStrategy屬性及志。

當(dāng)上游完成時(shí),發(fā)送計(jì)算結(jié)果寨腔。

當(dāng)下游背壓時(shí)背壓速侈。

當(dāng)上游完成時(shí),完成迫卢。

下游取消時(shí)倚搬,取消。

例如:

val fold= Flow[Int].fold(0) { (old, current) ?
        require(current > 0)
        old + current
      }.withAttributes(supervisionStrategy(Supervision.restartingDecider))
Source(List(1, 3, -1, 5, 7)).via(fold)

注意下游只會(huì)收到一個(gè)元素12乾蛤。

(36)foldAsync[T](zero: T)(f: (T, Out) ? Future[T]): Repr[T]
類似于fold每界,但使用異步函數(shù),應(yīng)用給定的函數(shù)將當(dāng)前值和下一個(gè)值進(jìn)行計(jì)算家卖,其結(jié)果作為下一次計(jì)算的當(dāng)前值(注意此中間結(jié)果不會(huì)發(fā)給下游)眨层。

遵守ActorAttributes.SupervisionStrategy屬性。

如果函數(shù)f````拋出異常上荡,并且監(jiān)督?jīng)Q定是[[akka.stream.Supervision.Restart]]趴樱,則當(dāng)前值將再次從zero```開始,流將繼續(xù)運(yùn)行酪捡。

當(dāng)上游完成時(shí)叁征,發(fā)送計(jì)算結(jié)果。

當(dāng)下游背壓時(shí)背壓沛善。

當(dāng)上游完成時(shí)航揉,完成。

下游取消時(shí)金刁,取消帅涂。

(37)reduce[T >: Out](f: (T, T) ? T): Repr[T]
類似于fold但使用收到的第一個(gè)元素作為zero元素。應(yīng)用給定的函數(shù)將當(dāng)前值和下一個(gè)值進(jìn)行計(jì)算尤蛮,其結(jié)果作為下一次計(jì)算的當(dāng)前值(注意此中間結(jié)果不會(huì)發(fā)給下游)媳友。

如果流是空的 (即在發(fā)送任何元素之前完成), 則 reduce 階段將以NoSuchElementException使下游失敗, 這在語義上與 Scala 的標(biāo)準(zhǔn)庫集合在這種情況下的狀態(tài)一致。

遵守ActorAttributes.SupervisionStrategy屬性产捞。

當(dāng)上游完成時(shí)醇锚,發(fā)送計(jì)算結(jié)果。

當(dāng)下游背壓時(shí),背壓焊唬。

當(dāng)上游完成時(shí)恋昼,完成。

下游取消時(shí)赶促,取消液肌。

(38)intersperse[T >: Out](start: T, inject: T, end: T): Repr[T]以及intersperse[T >: Out](inject: T): Repr[T]
使用所提供元素的散流, 類似于scala.collection.immutable.List.mkString在List元素之間插入分隔符的方式。

此外, 還可以向流中注入起始和結(jié)束標(biāo)記元素鸥滨。
例如:

  val nums = Source(List(1,2,3)).map(_.toString)
  nums.intersperse(",")            //   1 , 2 , 3
  nums.intersperse("[", ",", "]")  // [ 1 , 2 , 3 ]

如果您只想預(yù)置或僅追加元素 (但是仍然使用intercept特性在元素之間插入一個(gè)分隔符嗦哆,你可能需要使用下面的模式而不是3個(gè)參數(shù)版本的intersperse(參見Source .concat用于語義細(xì)節(jié)):

 Source.single(">> ") ++ Source(List("1", "2", "3")).intersperse(",")

 Source(List("1", "2", "3")).intersperse(",") ++ Source.single("END")

上游發(fā)出時(shí)發(fā)出(或者如果有提供start元素,先發(fā)送它)婿滓。

當(dāng)下游背壓時(shí)老速,背壓。

當(dāng)上游完成時(shí)凸主,完成橘券。

下游取消時(shí),取消秕铛。

(39)groupedWithin(n: Int, d: FiniteDuration): Repr[immutable.Seq[Out]]
將流進(jìn)行分組约郁,或按照一個(gè)時(shí)間窗口內(nèi)接收到的元素分組缩挑,或按照給定數(shù)量的限制進(jìn)行分組但两,無論哪一個(gè)先發(fā)生。 如果沒有從上游接收到元素供置,空組將不會(huì)被發(fā)出谨湘。 流結(jié)束之前的最后一個(gè)組將包含自先前發(fā)出的組以來的緩沖元素。

n必須是正數(shù)芥丧,d必須大于0秒紧阔,否則拋出IllegalArgumentException異常。

從上一個(gè)組發(fā)射后經(jīng)過配置的時(shí)間或者緩沖n個(gè)元素后發(fā)送续担。

當(dāng)下游背壓且已經(jīng)有n+1個(gè)元素緩沖擅耽,則背壓。

當(dāng)上游完成并且最后一組已發(fā)送物遇,完成乖仇。

當(dāng)下游取消,取消询兴。

(40)groupedWeightedWithin(maxWeight: Long, d: FiniteDuration)(costFn: Out ? Long): Repr[immutable.Seq[Out]]
將流進(jìn)行分組乃沙,或按照一個(gè)時(shí)間窗口內(nèi)接收到的元素分組,或按照元素權(quán)重的限制進(jìn)行分組诗舰,無論哪一個(gè)先發(fā)生警儒。 如果沒有從上游接收到元素,空組將不會(huì)被發(fā)出眶根。 流結(jié)束之前的最后一個(gè)組將包含自先前發(fā)出的組以來的緩沖元素蜀铲。

maxWeight必須是正數(shù)边琉,d必須大于0秒,否則拋出IllegalArgumentException異常记劝。

從上一個(gè)組發(fā)射后經(jīng)過配置的時(shí)間或者到達(dá)權(quán)重限制后發(fā)送艺骂。

當(dāng)下游背壓,并且緩沖組(+等待元素)權(quán)重大于maxWeight時(shí)隆夯,背壓钳恕。

當(dāng)上游完成并且最后一組已發(fā)送,完成蹄衷。

當(dāng)下游取消忧额,取消。

(41)delay(of: FiniteDuration, strategy: DelayOverflowStrategy = DelayOverflowStrategy.dropTail): Repr[Out]
按一定時(shí)間間隔將元素的發(fā)射時(shí)間改變愧口。在等待下一個(gè)元素被發(fā)送時(shí)睦番,它允許存儲(chǔ)元素在內(nèi)部緩沖區(qū)。如果緩沖區(qū)內(nèi)沒有足夠的空間耍属,根據(jù)定義的akka.stream.DelayOverflowStrategy托嚣,可以丟棄元素或者背壓上游。

延遲精度為 10ms, 以避免不必要的計(jì)時(shí)器調(diào)度周期厚骗。

內(nèi)部緩沖區(qū)的默認(rèn)容量為16示启。您可以通過調(diào)用addAttributes (inputBuffer)來設(shè)置緩沖區(qū)大小。

當(dāng)緩沖區(qū)有等待的元素并且為這個(gè)元素配置的時(shí)間已到達(dá)领舰,則發(fā)送元素夫嗓。

OverflowStrategy策略介紹:

  • emitEarly:如果緩沖區(qū)已滿,當(dāng)新的元素可用時(shí)冲秽,這個(gè)策略不等待直接發(fā)送下一個(gè)元素到下游舍咖。
  • dropHead:如果緩沖區(qū)已滿,當(dāng)新的元素到達(dá)锉桑,丟棄緩沖區(qū)中最舊的元素排霉,從而為新元素留出空間。
  • dropTail:如果緩沖區(qū)已滿民轴,當(dāng)新的元素到達(dá)攻柠,丟棄緩沖區(qū)中最新的元素,從而為新元素留出空間杉武。
  • dropBuffer:如果緩沖區(qū)已滿辙诞,當(dāng)新的元素到達(dá),丟棄緩沖區(qū)中所有元素轻抱,從而為新元素留出空間飞涂。
  • dropNew:如果緩沖區(qū)已滿,當(dāng)新的元素到達(dá),丟棄這個(gè)新元素较店。
  • backpressure:如果緩沖區(qū)已滿士八,當(dāng)新的元素到達(dá),則背壓上游發(fā)布者直到緩沖區(qū)內(nèi)的空間可用梁呈。
  • fail:如果緩沖區(qū)已滿婚度,當(dāng)新的元素到達(dá),則以失敗完成流官卡。

當(dāng)上游完成并且緩沖元素被耗盡時(shí)蝗茁,完成。

當(dāng)下游取消時(shí)寻咒,取消哮翘。

(42)drop(n: Long): Repr[Out]
在流的開始處丟棄給定數(shù)目的元素。
如果n為零或?yàn)樨?fù)數(shù), 則不會(huì)丟棄任何元素毛秘。

當(dāng)已經(jīng)丟棄了指定數(shù)目的元素饭寺,則發(fā)送元素。

當(dāng)指定數(shù)目的元素已被丟棄并且下游背壓時(shí)叫挟,背壓艰匙。

當(dāng)上游完成時(shí),完成抹恳。

當(dāng)下游取消時(shí)员凝,取消。

(43)dropWithin(d: FiniteDuration): Repr[Out]
在流的開始處在給定的持續(xù)時(shí)間內(nèi)适秩,丟棄接收的元素绊序。

當(dāng)經(jīng)過指定的時(shí)間后并且新的上游元素到達(dá)硕舆,發(fā)送元素秽荞。

當(dāng)下游背壓時(shí),背壓抚官。

當(dāng)上游完成時(shí)扬跋,完成。

下游取消時(shí)凌节,取消钦听。

(44)take(n: Long): Repr[Out]
在給定數(shù)量的元素之后終止處理 (并取消上游發(fā)布者)。由于輸入緩沖, 有些元素可能已從上游發(fā)布者請(qǐng)求, 這些元素將不會(huì)在這一步的下游進(jìn)行處理倍奢。

如果n是0或者負(fù)數(shù)朴上,流將不產(chǎn)生任何元素完成。

指定的數(shù)目還沒到達(dá)時(shí)卒煞,發(fā)送元素痪宰。

當(dāng)下游背壓時(shí),背壓。

當(dāng)指定數(shù)目的元素已經(jīng)處理或者上游完成時(shí)衣撬,完成乖订。

當(dāng)指定數(shù)目的元素已被處理或者下游取消時(shí),取消具练。

(45)takeWithin(d: FiniteDuration): Repr[Out]
在給定的持續(xù)時(shí)間后乍构,終止處理(并取消上游發(fā)布者)。由于輸入緩沖, 有些元素可能已從上游發(fā)布者請(qǐng)求, 這些元素將不會(huì)在這一步的下游進(jìn)行處理扛点。

請(qǐng)注意哥遮,這可以與take結(jié)合來限制持續(xù)時(shí)間內(nèi)的元素?cái)?shù)量。

當(dāng)上游元素到達(dá)時(shí)陵究,發(fā)送元素昔善。

當(dāng)下游背壓時(shí),背壓畔乙。

在上游完成或計(jì)時(shí)器觸發(fā)時(shí)完成君仆。

當(dāng)下游取消或計(jì)時(shí)器觸發(fā)時(shí)取消。

(46)conflateWithSeed[S](seed: Out ? S)(aggregate: (S, Out) ? S): Repr[S]
通過將元素合并到一個(gè)摘要中牲距,允許更快的上游獨(dú)立于較慢的訂閱者返咱,直到訂閱者準(zhǔn)備好接受它們。例如牍鞠,如果上游的發(fā)布者速度更快咖摹,那么合并的步驟可能會(huì)勻化傳入的數(shù)目。

此版本的合并允許從第一個(gè)元素派生種子, 并將聚合類型更改為與輸入類型不同的類型难述。有關(guān)不更改類型的更簡(jiǎn)單版本, 請(qǐng)參見FlowOps.conflate萤晴。

此元素僅在上游速度較快時(shí)聚合元素, 但如果下游速度較快, 則不會(huì)復(fù)制元素。

遵守ActorAttributes.SupervisionStrategy屬性胁后。

當(dāng)下游停止背壓并且有可用的已合并元素店读,則發(fā)送元素。

沒有背壓的時(shí)候攀芯。

當(dāng)上游完成時(shí)屯断,完成。

當(dāng)下游取消時(shí)侣诺,取消殖演。

參數(shù)seed,使用第一個(gè)未消耗的元素作為開始年鸳,為合并值提供第一個(gè)狀態(tài)趴久。
參數(shù)aggregate, 獲取當(dāng)前聚合的值和當(dāng)前正在等待的元素搔确,以生成一個(gè)新的聚合值彼棍。

也請(qǐng)參見FlowOps.conflate, FlowOps.limit, FlowOps.limitWeighted, FlowOps.batch, FlowOps.batchWeighted

(47)conflate[O2 >: Out](aggregate: (O2, O2) ? O2): Repr[O2]
通過將元素合并到一個(gè)摘要中已添,允許更快的上游獨(dú)立于較慢的訂閱者,直到訂閱者準(zhǔn)備好接受它們滥酥。例如更舞,如果上游的發(fā)布者速度更快,那么合并的步驟可能會(huì)勻化傳入的數(shù)目坎吻。

此版本的合并不改變流的輸出類型缆蝉。請(qǐng)參見FlowOps.conflateWithSeed,一個(gè)更為復(fù)雜的版本瘦真,可以在聚合時(shí)使用種子函數(shù)并轉(zhuǎn)換元素刊头。

此元素僅在上游速度較快時(shí)聚合元素, 但如果下游速度較快, 則不會(huì)復(fù)制元素。

遵守ActorAttributes.SupervisionStrategy屬性诸尽。

當(dāng)下游停止背壓并且有可用的已合并元素原杂,則發(fā)送元素。

沒有背壓的時(shí)候您机。

當(dāng)上游完成時(shí)穿肄,完成。

當(dāng)下游取消時(shí)际看,取消咸产。

參數(shù)aggregate, 獲取當(dāng)前聚合的值和當(dāng)前正在等待的元素仲闽,以生成一個(gè)新的聚合值脑溢。

也請(qǐng)參見FlowOps.conflateWithSeed, FlowOps.limit, FlowOps.limitWeighted, FlowOps.batch, FlowOps.batchWeighted

(48)batch[S](max: Long, seed: Out ? S)(aggregate: (S, Out) ? S): Repr[S]
通過將元素聚合到批次中,允許更快的上游獨(dú)立于較慢的訂閱者赖欣,直到訂閱者準(zhǔn)備好接受它們屑彻。例如,如果上游發(fā)布者更快顶吮,一個(gè)batch步驟可以存儲(chǔ)接收到的元素于一個(gè)數(shù)組直到最大限制值社牲。

此元素僅在上游速度較快時(shí)聚合元素, 但如果下游速度較快, 則不會(huì)復(fù)制元素。

遵守ActorAttributes.SupervisionStrategy屬性云矫。

當(dāng)下游停止背壓并且有可用的已聚合元素膳沽,則發(fā)送元素。

當(dāng)成批元素達(dá)到最大值而有一個(gè)待處理元素且下游背壓時(shí)让禀,背壓。

當(dāng)上游完成且沒有成批或待處理元素等待陨界,完成巡揍。

參數(shù)max 在背壓上游前,可成批的最大元素?cái)?shù)
(必須是非0正數(shù))菌瘪。
參數(shù)seed腮敌,使用第一個(gè)未消耗的元素作為開始阱当,為合并值提供第一個(gè)狀態(tài)。
參數(shù)aggregate糜工, 獲取當(dāng)前成批的值和當(dāng)前正在等待的元素弊添,以生成一個(gè)新的聚合值。

(49)batchWeighted[S](max: Long, costFn: Out ? Long, seed: Out ? S)(aggregate: (S, Out) ? S): Repr[S]
通過將元素聚合到批次中捌木,允許更快的上游獨(dú)立于較慢的訂閱者油坝,直到訂閱者準(zhǔn)備好接受它們。例如刨裆,如果上游發(fā)布者速度更快澈圈,則batch步驟可以將“ByteString”元素連接到允許的最大限制。

此元素僅在上游速度較快時(shí)聚合元素, 但如果下游速度較快, 則不會(huì)復(fù)制元素帆啃。

成批將應(yīng)用于所有元素瞬女,即使一個(gè)單一元素成本比允許的最大值還大。這種情況下努潘,先前成批的元素將被發(fā)送诽偷,然后這個(gè)"重"元素將被發(fā)送(在應(yīng)用了種子函數(shù)后)而不用它與其它元素成批處理,然后其余傳入的元素成批處理疯坤。

當(dāng)下游停止背壓并且有可用的已聚合元素渤刃,則發(fā)送元素。

當(dāng)成批元素權(quán)重值到達(dá)最大值而有一個(gè)待處理元素且下游背壓時(shí)贴膘,背壓卖子。

當(dāng)上游完成且沒有成批或待處理元素等待,完成刑峡。

當(dāng)下游取消時(shí)洋闽,取消。

參數(shù)max 在背壓上游前突梦,成批元素的最大權(quán)重值(必須時(shí)非0正數(shù))
參數(shù)costFn 用于計(jì)算單一元素權(quán)重值的函數(shù)
參數(shù)seed诫舅,使用第一個(gè)未消耗的元素作為開始,為成批值提供第一個(gè)狀態(tài)宫患。
參數(shù)aggregate刊懈, 獲取當(dāng)前成批的值和當(dāng)前正在等待的元素,以生成一個(gè)新的成批值娃闲。

(50)expand[U](extrapolate: Out ? Iterator[U]): Repr[U]
通過從較老的元素中外推元素虚汛,直到從上游來新元素,可以使更快地下游獨(dú)立于較慢的發(fā)布者皇帮。例如卷哩,擴(kuò)展步驟可能會(huì)重復(fù)給訂閱者的最后一個(gè)元素,直到它從上游接收到更新属拾。

這個(gè)元素永遠(yuǎn)不會(huì)“丟棄”上游元素将谊,因?yàn)樗性囟冀?jīng)過至少一個(gè)外推步驟冷溶。

這意味著如果上游實(shí)際上比上游更快,它將被下游用戶背壓尊浓。

Expand不支持akka.stream.Supervision.Restartakka.stream.Supervision.Resume 逞频。

來自seed或者extrapolate函數(shù)的異常將使流以失敗完成。

當(dāng)下游停止背壓時(shí)栋齿,發(fā)送元素苗胀。

當(dāng)下游背壓或者迭代器運(yùn)行為空時(shí),背壓褒颈。

當(dāng)上游完成時(shí)柒巫,完成。

當(dāng)下游取消時(shí)谷丸,取消堡掏。

(51)buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[Out]
在流中添加一個(gè)固定大小的緩沖區(qū), 允許從較快的上游存儲(chǔ)元素, 直到它變?yōu)闈M的。如果沒有可用的空間, 根據(jù)已定義的 akka.stream.OverflowStrategy, 它可能會(huì)丟棄元素或上背壓上游刨疼。

當(dāng)下游停止背壓泉唁,并且在緩沖區(qū)有待處理元素,則發(fā)送元素揩慕。

OverflowStrategy策略介紹:

  • emitEarly:如果緩沖區(qū)已滿亭畜,當(dāng)新的元素可用時(shí),這個(gè)策略不等待直接發(fā)送下一個(gè)元素到下游迎卤。
  • dropHead:如果緩沖區(qū)已滿拴鸵,當(dāng)新的元素到達(dá),丟棄緩沖區(qū)中最舊的元素蜗搔,從而為新元素留出空間劲藐。
  • dropTail:如果緩沖區(qū)已滿,當(dāng)新的元素到達(dá)樟凄,丟棄緩沖區(qū)中最新的元素聘芜,從而為新元素留出空間。
  • dropBuffer:如果緩沖區(qū)已滿缝龄,當(dāng)新的元素到達(dá)汰现,丟棄緩沖區(qū)中所有元素,從而為新元素留出空間叔壤。
  • dropNew:如果緩沖區(qū)已滿瞎饲,當(dāng)新的元素到達(dá),丟棄這個(gè)新元素百新。
  • backpressure:如果緩沖區(qū)已滿企软,當(dāng)新的元素到達(dá),則背壓上游發(fā)布者直到緩沖區(qū)內(nèi)的空間可用饭望。
  • fail:如果緩沖區(qū)已滿仗哨,當(dāng)新的元素到達(dá),則以失敗完成流铅辞。

當(dāng)上游完成并且緩沖元素被耗盡時(shí)厌漂,完成。

當(dāng)下游取消時(shí)斟珊,取消苇倡。

(52)prefixAndTail[U >: Out](n: Int): Repr[(immutable.Seq[Out], Source[U, NotUsed])]
從流中獲取n個(gè)元素 (僅當(dāng)上游在發(fā)出n個(gè)元素之前完成比n小) 并返回一個(gè)包含所取元素的嚴(yán)格序列和一個(gè)表示剩余元素的流的pair。如果 "n" 為零或負(fù), 則返回一個(gè)空集合和一個(gè)流(將包含整個(gè)上游的流保持不變)的pair囤踩。

如果上游出現(xiàn)錯(cuò)誤, 則取決于當(dāng)前狀態(tài)

  • 如果在獲取n個(gè)元素之前旨椒,主流標(biāo)識(shí)錯(cuò)誤,子流尚未發(fā)出堵漱。
  • 如果在獲取n個(gè)元素之后综慎,主流標(biāo)識(shí)錯(cuò)誤,子流已經(jīng)發(fā)出(在那一刻勤庐,主流已經(jīng)完成)

當(dāng)達(dá)到配置的“前綴”元素?cái)?shù)目時(shí)示惊,發(fā)送“前綴”以及剩余部分組成的子流。

當(dāng)下游背壓或者子流背壓時(shí)愉镰,背壓米罚。

當(dāng)“前綴”元素和子流都已耗盡時(shí),完成丈探。

當(dāng)下游取消或者子流取消時(shí)录择,取消。

(53)groupBy[K](maxSubstreams: Int, f: Out ? K): SubFlow[Out, Mat, Repr, Closed]
此操作將輸入流解復(fù)用為單獨(dú)的輸出流碗降,每個(gè)元素鍵一個(gè)輸出流隘竭。 使用給定函數(shù)為每個(gè)元素計(jì)算鍵。 當(dāng)?shù)谝淮斡龅揭粋€(gè)新的鍵時(shí)遗锣,一個(gè)新的子流被打開货裹,并隨后所有屬于該鍵的元素輸入到該流。

從這個(gè)方法返回的對(duì)象不是一個(gè)普通的Source或Flow精偿,而是一個(gè)SubFlow弧圆。這意味著在此之后,所有的轉(zhuǎn)換都將以相同的方式應(yīng)用于所有遇到的子流笔咽。SubFlow模式通過關(guān)閉子流(即將其連接到一個(gè)Sink)或?qū)⒆恿骱喜⒃谝黄鸲顺?有關(guān)更多信息搔预,請(qǐng)參見SubFlow中的to和mergeBack方法。

需要注意的是子流也像任何其他流一樣傳播背壓叶组,這意味著阻塞一個(gè)子流將阻塞“groupBy”運(yùn)算符本身——從而阻塞所有子流——一旦所有的內(nèi)部或顯式緩沖區(qū)被填滿拯田。

如果 groupby 函數(shù) f拋出一個(gè)異常, 并且監(jiān)管策略是 akka.stream.Supervision.Stop, 則流和 substreams 將以失敗完成。

如果 groupby 函數(shù) f拋出一個(gè)異常, 并且監(jiān)管策略是 akka.stream.Supervision.Resume或者akka.stream.Supervision.Restart, 該元素被丟棄甩十,流和 substreams 將繼續(xù)運(yùn)行船庇。

函數(shù)f不可以返回null吭产。這將拋出異常并觸發(fā)監(jiān)管決策機(jī)制。

遵守ActorAttributes.SupervisionStrategy屬性鸭轮。

當(dāng)分組函數(shù)返回尚未創(chuàng)建的組的元素時(shí)發(fā)出臣淤。發(fā)出新組。

當(dāng)某個(gè)組有尚未處理的元素窃爷,而該組的子流背壓時(shí)邑蒋,背壓。

當(dāng)上游完成時(shí)按厘,完成医吊。

當(dāng)下游取消并且所有子流取消時(shí),取消逮京。

參數(shù)maxSubstreams 配置支持的最大子流數(shù)/鍵數(shù)卿堂。如果遇到更多不同的鍵, 則流將失敗。

(54)splitWhen(substreamCancelStrategy: SubstreamCancelStrategy)(p: Out ? Boolean): SubFlow[Out, Mat, Repr, Closed]
此操作將給定謂詞應(yīng)用于所有傳入元素, 并將它們發(fā)送到輸出流中的一個(gè)流, 如果給定謂詞返回 true, 則總是用當(dāng)前元素開始新的一個(gè)子流造虏。這意味著, 對(duì)于以下一系列謂詞值, 將產(chǎn)生三個(gè)子流, 長(zhǎng)度為1御吞、2和 3:

    false,             // 元素進(jìn)入第一個(gè)子流
    true, false,       // 元素進(jìn)入第二個(gè)子流
    true, false, false // 元素進(jìn)入第三個(gè)子流

如果流的 * 第一個(gè) * 元素與謂詞匹配, 則 splitWhen 發(fā)出的第一個(gè)流將從該元素開始。例如:

     true, false, false // 第一個(gè)流從拆分元素開始
     true, false        // 隨后的子流操作方式相同

從這個(gè)方法返回的對(duì)象不是一個(gè)普通的Source或Flow漓藕,而是一個(gè)SubFlow陶珠。這意味著在此之后,所有的轉(zhuǎn)換都將以相同的方式應(yīng)用于所有遇到的子流享钞。SubFlow模式通過關(guān)閉子流(即將其連接到一個(gè)Sink)或?qū)⒆恿骱喜⒃谝黄鸲顺?有關(guān)更多信息揍诽,請(qǐng)參見SubFlow中的to和mergeBack方法。

需要注意的是子流也像任何其他流一樣傳播背壓栗竖,這意味著阻塞一個(gè)子流將阻塞“splitWhen”運(yùn)算符本身——從而阻塞所有子流——一旦所有的內(nèi)部或顯式緩沖區(qū)被填滿暑脆。

如果拆分謂詞 p拋出一個(gè)異常, 并且監(jiān)管策略是 akka.stream.Supervision.Stop, 則流和 substreams 將以失敗完成。

如果拆分謂詞 p拋出一個(gè)異常, 并且監(jiān)管策略是 akka.stream.Supervision.Resume或者akka.stream.Supervision.Restart, 該元素被丟棄狐肢,流和 substreams 將繼續(xù)運(yùn)行添吗。

當(dāng)元素對(duì)于謂詞為true時(shí),為隨后的元素打開并發(fā)出新的子流份名。

當(dāng)某個(gè)子流有待處理的元素而之前的元素尚未完全消耗時(shí)碟联,或子流背壓時(shí),背壓僵腺。

當(dāng)上游完成時(shí)鲤孵,完成。

當(dāng)下游取消并且子流以SubstreamCancelStrategy.drain取消時(shí)辰如,或者下游取消或任何子流以SubstreamCancelStrategy.propagate取消時(shí)普监,取消。

也請(qǐng)參見FlowOps.splitAfter

(55)splitWhen(p: Out ? Boolean): SubFlow[Out, Mat, Repr, Closed]
splitWhen(SubstreamCancelStrategy.drain)(p)

(56)splitAfter(substreamCancelStrategy: SubstreamCancelStrategy)(p: Out ? Boolean): SubFlow[Out, Mat, Repr, Closed]
此操作將給定謂詞應(yīng)用于所有傳入元素, 并將它們發(fā)送到輸出流中的一個(gè)流, 如果給定謂詞返回 true, 則結(jié)束當(dāng)前子流凯正。這意味著, 對(duì)于以下一系列謂詞值, 將產(chǎn)生三個(gè)子流, 長(zhǎng)度為2毙玻、2和 3:

     false, true,        // 元素進(jìn)入第一個(gè)子流
     false, true,        // 元素進(jìn)入第二個(gè)子流
     false, false, true  // 元素進(jìn)入第三個(gè)子流

需要注意的是子流也像任何其他流一樣傳播背壓,這意味著阻塞一個(gè)子流將阻塞“splitAfter”運(yùn)算符本身——從而阻塞所有子流——一旦所有的內(nèi)部或顯式緩沖區(qū)被填滿漆际。

如果拆分謂詞 p拋出一個(gè)異常, 并且監(jiān)管策略是 akka.stream.Supervision.Stop, 則流和 substreams 將以失敗完成淆珊。

如果拆分謂詞 p拋出一個(gè)異常, 并且監(jiān)管策略是 akka.stream.Supervision.Resume或者akka.stream.Supervision.Restart, 該元素被丟棄夺饲,流和 substreams 將繼續(xù)運(yùn)行奸汇。

當(dāng)元素經(jīng)過時(shí)發(fā)出。當(dāng)提供的謂詞為真時(shí), 它發(fā)出元素并為后續(xù)元素打開一個(gè)新的流往声。

當(dāng)某個(gè)子流有待處理的元素而之前的元素尚未完全消耗時(shí)擂找,或子流背壓時(shí),背壓浩销。

當(dāng)上游完成時(shí)完成贯涎。

當(dāng)下游取消并且子流以SubstreamCancelStrategy.drain取消時(shí),或者下游取消或任何子流以SubstreamCancelStrategy.propagate取消時(shí)慢洋,取消塘雳。

也請(qǐng)參見FlowOps.splitWhen

(57)splitAfter(p: Out ? Boolean): SubFlow[Out, Mat, Repr, Closed]
splitAfter(SubstreamCancelStrategy.drain)(p)

(58)flatMapConcat[T, M](f: Out ? Graph[SourceShape[T], M]): Repr[T]
將每個(gè)輸入元素轉(zhuǎn)換為輸出元素的Source普筹,然后通過串聯(lián)將其平鋪為輸出流败明,從而一個(gè)接著一個(gè)的完全處理Source。

當(dāng)當(dāng)前消費(fèi)的子流有一個(gè)元素可用時(shí)發(fā)出太防。

當(dāng)下游背壓時(shí)妻顶,背壓。

當(dāng)上游完成且所有子流完成時(shí)蜒车,完成讳嘱。

當(dāng)下游取消時(shí),取消酿愧。

(59)flatMapMerge[T, M](breadth: Int, f: Out ? Graph[SourceShape[T], M]): Repr[T]
將每個(gè)輸入元素轉(zhuǎn)換為輸出元素的“Source”沥潭,然后通過合并將輸出元素展平成輸出流,其中在任何給定時(shí)間最大“廣度”子流被處理嬉挡。

當(dāng)當(dāng)前消費(fèi)的子流有一個(gè)元素可用時(shí)發(fā)出钝鸽。

當(dāng)下游背壓時(shí),背壓棘伴。

當(dāng)上游完成且所有子流完成時(shí)寞埠,完成。

當(dāng)下游取消時(shí)焊夸,取消仁连。

例如:

val async = Flow[Int].map(_ * 2).async

Source(0 to 9)
        .map(_ * 10)
        .flatMapMerge(5, i ? Source(i to (i + 9)).via(async))
        .grouped(1000)
        .runWith(Sink.head)
        .futureValue
        .sorted should ===(0 to 198 by 2)

(60)initialTimeout(timeout: FiniteDuration): Repr[Out]
如果在提供的超時(shí)之前,第一個(gè)元素還沒有經(jīng)過這個(gè)階段,則流以scala.concurrent.TimeoutException失敗饭冬。

當(dāng)上游發(fā)出元素時(shí)使鹅,發(fā)出。

當(dāng)下游背壓時(shí)昌抠,背壓患朱。

當(dāng)上游完成時(shí)完成或者在第一個(gè)元素到達(dá)前超時(shí)已過則失敗。

當(dāng)下游取消時(shí)取消炊苫。

(61)completionTimeout(timeout: FiniteDuration): Repr[Out]
當(dāng)超時(shí)已過時(shí)裁厅,流尚未完成,則流以scala.concurrent.TimeoutException失敗侨艾。

當(dāng)上游發(fā)出元素時(shí)执虹,發(fā)出。

當(dāng)下游背壓時(shí)唠梨,背壓袋励。

當(dāng)上游完成時(shí)完成或者在上游完全前超時(shí)已過則失敗。

當(dāng)下游取消時(shí)取消当叭。

(62)idleTimeout(timeout: FiniteDuration): Repr[Out]
如果兩個(gè)處理的元素之間的時(shí)間超過了提供的超時(shí)時(shí)間茬故,則流以scala.concurrent.TimeoutException失敗。定期檢查超時(shí)蚁鳖,所以檢查的分辨率是一個(gè)周期(等于超時(shí)值)磺芭。

當(dāng)上游發(fā)出元素時(shí),發(fā)出才睹。

當(dāng)下游背壓時(shí)徘跪,背壓。

當(dāng)上游完成時(shí)完成或者如果在兩個(gè)發(fā)出的元素之間超時(shí)失敗琅攘。

當(dāng)下游取消時(shí)取消垮庐。

(63)backpressureTimeout(timeout: FiniteDuration): Repr[Out]
如果一個(gè)元素的發(fā)出和下一個(gè)下游需求之間的時(shí)間超過了提供的超時(shí)時(shí)間,則流以scala.concurrent.TimeoutException失敗坞琴。定期檢查超時(shí)哨查,所以檢查的分辨率是一個(gè)周期(等于超時(shí)值)。

當(dāng)上游發(fā)出元素時(shí)剧辐,發(fā)出寒亥。

當(dāng)下游背壓時(shí),背壓荧关。

當(dāng)上游完成時(shí)完成或者如果一個(gè)元素的發(fā)出和下一個(gè)下游需求之間的時(shí)間超時(shí)失敗溉奕。

當(dāng)下游取消時(shí)取消。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末忍啤,一起剝皮案震驚了整個(gè)濱河市加勤,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖鳄梅,帶你破解...
    沈念sama閱讀 218,682評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件叠国,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡戴尸,警方通過查閱死者的電腦和手機(jī)粟焊,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來孙蒙,“玉大人卢佣,你說我怎么就攤上這事泪电〕宀荆” “怎么了拦焚?”我有些...
    開封第一講書人閱讀 165,083評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)浑测。 經(jīng)常有香客問我,道長(zhǎng)歪玲,這世上最難降的妖魔是什么迁央? 我笑而不...
    開封第一講書人閱讀 58,763評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮滥崩,結(jié)果婚禮上岖圈,老公的妹妹穿的比我還像新娘。我一直安慰自己钙皮,他們只是感情好蜂科,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,785評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著短条,像睡著了一般导匣。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上茸时,一...
    開封第一講書人閱讀 51,624評(píng)論 1 305
  • 那天贡定,我揣著相機(jī)與錄音,去河邊找鬼可都。 笑死缓待,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的渠牲。 我是一名探鬼主播旋炒,決...
    沈念sama閱讀 40,358評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼签杈!你這毒婦竟也來了瘫镇?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,261評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎汇四,沒想到半個(gè)月后接奈,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,722評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡通孽,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年序宦,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片背苦。...
    茶點(diǎn)故事閱讀 40,030評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡互捌,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出行剂,到底是詐尸還是另有隱情秕噪,我是刑警寧澤,帶...
    沈念sama閱讀 35,737評(píng)論 5 346
  • 正文 年R本政府宣布厚宰,位于F島的核電站腌巾,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏铲觉。R本人自食惡果不足惜澈蝙,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,360評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望撵幽。 院中可真熱鬧灯荧,春花似錦、人聲如沸盐杂。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽链烈。三九已至厉斟,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間测垛,已是汗流浹背捏膨。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評(píng)論 1 270
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留食侮,地道東北人号涯。 一個(gè)月前我還...
    沈念sama閱讀 48,237評(píng)論 3 371
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像锯七,于是被迫代替她去往敵國(guó)和親链快。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,976評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容