Akka之Source相關(guān)API總結(jié)

(1)apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed]
由Iterable創(chuàng)建Source试疙。
例如:Source(Seq(1,2,3))
這類似于從迭代器開始, 但與此流的發(fā)布者直接連接的每個訂閱者都將看到一個單獨的元素流 (總是從開始處開始), 而不管它們何時訂閱。

(2)fromIterator[T](f: () ? Iterator[T]): Source[T, NotUsed]
從一個產(chǎn)生迭代器的函數(shù)開始一個新的Source厚宰。生成的元素流將繼續(xù)猴仑,直到迭代器運行為空或在評估next()方法時失敗讯蒲。根據(jù)來自下游轉(zhuǎn)換步驟的需求,將元素從迭代器中拉出淋硝。
例如:

val iterator = Iterator.iterate(false)(!_)
//創(chuàng)建一個無限迭代器制恍,重復(fù)地將給定的函數(shù)應(yīng)用于先前的結(jié)果丑勤。
//第一個參數(shù)是初始值,第二個參數(shù)是將重復(fù)應(yīng)用的函數(shù)吧趣。
Source.fromIterator(() ? iterator)
        .grouped(10)
        .runWith(Sink.head)
        .futureValue

結(jié)果是
immutable.Seq(false, true, false, true, false, true, false, true, false, true)

(3)cycle[T](f: () ? Iterator[T]): Source[T, NotUsed]
從給定的元素(由產(chǎn)生迭代器的函數(shù)得到)開始一個循環(huán)的Source法竞。元素的生產(chǎn)流將無限地重復(fù)由函數(shù)參數(shù)提供的元素序列。
例如:

Source.cycle(() ? List(1, 2, 3).iterator)
         .grouped(9)
         .runWith(Sink.head)
         .futureValue

結(jié)果是
immutable.Seq(1, 2, 3, 1, 2, 3, 1, 2, 3)

(4)fromGraph[T, M](g: Graph[SourceShape[T], M]): Source[T, M]
由source形狀(即只有一個出口)的圖創(chuàng)建Source强挫。
例如:

val pairs = Source.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._
 
  // prepare graph elements
  val zip = b.add(Zip[Int, Int]())
  def ints = Source.fromIterator(() => Iterator.from(1))
 
  // connect the graph
  ints.filter(_ % 2 != 0) ~> zip.in0
  ints.filter(_ % 2 == 0) ~> zip.in1
 
  // expose port
  SourceShape(zip.out)
})

(5)fromFuture[T](future: Future[T]): Source[T, NotUsed]
從給定的Future創(chuàng)建Source岔霸。當(dāng)Future以成功值完成時(可能在物化Flow之前或者之后發(fā)生),流由一個元素組成俯渤。當(dāng)Future以失敗完成時呆细,流將終止并帶有一個failure。
例如:
Source.fromFuture(Future.successful("Hello Streams!"))

(6)fromCompletionStage[T](future: CompletionStage[T]): Source[T, NotUsed]
類似于Scala的Future創(chuàng)建Source八匠,此處是由Java的CompletionStage創(chuàng)建Source絮爷。

(7)fromFutureSource[T, M](future: Future[Graph[SourceShape[T], M]]): Source[T, Future[M]]
由給定的future source形狀的圖創(chuàng)建Source。一旦給定的Future成功完成梨树,則元素從異步source流出坑夯。如果Future失敗,則流失敗抡四。

(8)fromSourceCompletionStage[T, M](completion: CompletionStage[_ <: Graph[SourceShape[T], M]]): Source[T, CompletionStage[M]]
類似于fromFutureSource

(9)tick[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable]
元素定期以指定的間隔發(fā)出柜蜈。
"滴答" 元素將被傳遞到請求任何元素的下游用戶。
如果使用者在生成滴答元素時沒有請求任何元素, 它以后將不會接收該滴答元素指巡。它將在請求更多元素時立即接收新的滴答元素淑履。
例如:
Source.tick(initialDelay = 2.second, interval = 1.second, "message!")

(10)single[T](element: T): Source[T, NotUsed]
由一個元素創(chuàng)建Source。
例如:Source.single("only one element")

(11)repeat[T](element: T): Source[T, NotUsed]
創(chuàng)建一個連續(xù)發(fā)送給定元素的Source藻雪。
例如:

Source.repeat(42)
        .grouped(3)
        .runWith(Sink.head) 
        .futureValue

結(jié)果是:
immutable.Seq(42,42,42)

(12)unfold[S, E](s: S)(f: S ? Option[(S, E)]): Source[E, NotUsed]
創(chuàng)建一個Source秘噪,它會將S類型的值展開成一對下一個狀態(tài)S,'E`類型的輸出元素勉耀。
例如指煎,10M以下的所有斐波納契數(shù)字:

Source.unfold(0 → 1) {
    case (a, _) if a > 10000000 ? None
    case (a, b) ? Some((b → (a + b)) → a)
 }

(13)unfoldAsync[S, E](s: S)(f: S ? Future[Option[(S, E)]]): Source[E, NotUsed]
與unfold相同,但是使用一個異步函數(shù)來產(chǎn)生下一個狀態(tài)元素元組瑰排。

Source.unfoldAsync(0 → 1) {
     case (a, _) if a > 10000000 ? Future.successful(None)
     case (a, b) ? Future{
       Thread.sleep(1000)
       Some((b → (a + b)) → a)
     }
 }

(14)empty[T]: Source[T, NotUsed]
創(chuàng)建一個沒有元素的Source贯要,即為每個連接的Sink立即完成的空流暖侨。
例如:Source.empty

(15)maybe[T]: Source[T, Promise[Option[T]]]
創(chuàng)建一個Source椭住,它物化為一個scala.concurrent.Promise,它控制什么元素從Source發(fā)出字逗。

如果物化的promise以Some完成京郑,那么該值將在下游生成宅广,然后是完成。
如果物化的promise以None完成些举,那么下游不會產(chǎn)生值跟狱,并立即發(fā)出完成信號。
如果物化的promise以failure完成户魏,那么返回的source將以那個錯誤終止驶臊。
如果在promise完成前,source的下游取消叼丑,那么promise將以None完成关翎。

(16)failed[T](cause: Throwable): Source[T, NotUsed]
創(chuàng)建一個Source,它立刻終止流鸠信,并將錯誤cause給每一個連接的Sink纵寝。
例如:

val ex = new Exception("buh")
Source.failed(ex)
     .flatMapMerge(1, identity)
     .runWith(Sink.head)
      .futureValue

(17)lazily[T, M](create: () ? Source[T, M]): Source[T, Future[M]]
創(chuàng)建一個Source,直到下游有需求才物化星立,當(dāng)source物化時爽茴,物化的future將以其值完成,如果下游取消或失敗沒有任何需求绰垂,則不會調(diào)用創(chuàng)建工廠室奏,物化的Future是失敗。

(18)asSubscriber[T]: Source[T, Subscriber[T]]
創(chuàng)建一個Source劲装,其物化為一個org.reactivestreams.Subscriber

(19)actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef]
如何先定義流窍奋,而后給流傳遞數(shù)據(jù)呢?答案就是Source.actorRef酱畅。說明:Source.actorRef沒有背壓策略琳袄。
創(chuàng)建一個Source,其物化為一個akka.actor.ActorRef纺酸。

如果下游有需求, 發(fā)送到該actor的消息將被發(fā)送到流中, 否則它們將被緩沖, 直到收到需求請求為止窖逗。

根據(jù)定義的akka.stream.OverflowStrategy,如果緩沖區(qū)中沒有可用空間, 則可能會丟棄元素餐蔬。

策略akka.stream.OverflowStrategy.backpressure不受支持, 如果將其作為參數(shù)傳遞, 則會拋出異常 llegalArgument ( "Backpressure overflowStrategy not supported")碎紊。

可以使用0的bufferSize禁用緩沖區(qū), 如果下游沒有需求, 則會丟棄接收到的消息。當(dāng) bufferSize是 0, overflowStrategy并不重要樊诺。在此源之后添加一個異步邊界;因此, 假定下游總是會產(chǎn)生需求是絕不會安全的仗考。

通過將akka.actor.Status.Failure發(fā)送到actor引用, 從而失敗來完成流。在標(biāo)示完成前词爬,以防actor仍消耗其內(nèi)部緩沖區(qū)(在收到一個akka.actor.Status.Success之后)秃嗜,它收到一個akka.actor.Status.Failure,故障將為 立即向下游發(fā)信號(而不是完成信號)。

當(dāng)流完成锅锨、失敗或從下游取消時, actor將被停止, 即您可以在發(fā)生此情況時觀察它以獲得通知叽赊。

val stringSourceinFuture=Source.actorRef[String](100,OverflowStrategy.fail) // 緩存最大為100,超出的話必搞,將以失敗告終
  val hahaStrSource=stringSourceinFuture.filter(str=>str.startsWith("haha")) //source數(shù)據(jù)流中把不是以"haha"開頭的字符串過濾掉
  val actor=hahaStrSource.to(Sink.foreach(println)).run()
  actor!"asdsadasd"
  actor!"hahaasd"
  actor!Success("ok")// 數(shù)據(jù)流成功完成并關(guān)閉

(20)combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(strategy: Int ? Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed]
MergeConcat按照扇入策略將多個Source合并必指,返回一個Source。
例如:

val sourceOne = Source(List(1))
val sourceTwo = Source(List(2))
val merged = Source.combine(sourceOne, sourceTwo)(Merge(_))

val mergedResult: Future[Int] = merged.runWith(Sink.fold(0)(_ + _))

(21)zipN[T](sources: immutable.Seq[Source[T, _]]): Source[immutable.Seq[T], NotUsed]
將多個流的元素合并到一個序列流中恕洲。

val sources = immutable.Seq(
        Source(List(1, 2, 3)),
        Source(List(10, 20, 30)),
        Source(List(100, 200, 300)))

Source.zipN(sources)
        .runWith(Sink.seq)
        .futureValue

結(jié)果是:

immutable.Seq(
          immutable.Seq(1, 10, 100),
          immutable.Seq(2, 20, 200),
          immutable.Seq(3, 30, 300))

(22)zipWithN[T, O](zipper: immutable.Seq[T] ? O)(sources: immutable.Seq[Source[T, _]]): Source[O, NotUsed]
使用組合函數(shù)將多個流的元素合并到一個序列流中塔橡。

val sources = immutable.Seq(
        Source(List(1, 2, 3)),
        Source(List(10, 20, 30)),
        Source(List(100, 200, 300)))

Source.zipWithN[Int, Int](_.sum)(sources)
        .runWith(Sink.seq)
        .futureValue

結(jié)果是:

immutable.Seq(111, 222, 333)

(23)queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]]
創(chuàng)建一個Source,它物化為````akka.stream.scaladsl.SourceQueue```霜第。

您可以將元素推送到隊列中, 如果下游有需求, 則它們將被發(fā)送到流中, 否則它們將被緩沖, 直到收到需求請求為止谱邪。如果下游終止, 緩沖區(qū)中的元素將被丟棄。

根據(jù)定義的akka.stream.OverflowStrategy庶诡,如果緩沖區(qū)中沒有可用空間, 則可能會丟棄元素惦银。

確認(rèn)機(jī)制可用。
akka.stream.scaladsl.SourceQueue.offer返回Future [QueueOfferResult]末誓,如果元素被添加到緩沖區(qū)或發(fā)送到下游扯俱,則它將以QueueOfferResult.Enqueued完成。 如果元素被丟棄喇澡,它將以QueueOfferResult.Dropped完成迅栅。當(dāng)流失敗時,以QueueOfferResult.Failure完成 或者下游完成時晴玖,以
QueueOfferResult.QueueClosed完成读存。

當(dāng)緩沖區(qū)已滿時,策略akka.stream.OverflowStrategy.backpressure不會完成最后offer():Future調(diào)用呕屎。

可以使用akka.stream.scaladsl.SourceQueue.watchCompletion查看流的可訪問性让簿。當(dāng)流完成時,它返回一個以成功完成的future或者當(dāng)流失敗時秀睛,它返回一個以失敗完成的future尔当。

可以通過設(shè)置bufferSize為0關(guān)閉緩沖區(qū),然后接收到的消息將等待下游的需求蹂安,如果有另一個消息等待下游需求椭迎,這種情況下結(jié)果將根據(jù)溢出策略完成。

(24)unfoldResource[T, S](create: () ? S, read: (S) ? Option[T], close: (S) ? Unit): Source[T, NotUsed]
從某個可以打開田盈、讀取畜号、關(guān)閉的資源,創(chuàng)建一個Source允瞧。
以阻塞的方式與資源交互简软。

可以使用監(jiān)管策略來處理read函數(shù)的異常蛮拔。所有由createclose拋出的異常,都將使流失敗替饿。

Restart監(jiān)管策略將關(guān)閉并再次創(chuàng)建阻塞IO语泽。默認(rèn)策略是Stop贸典,意味著在read函數(shù)出現(xiàn)錯誤流將終止视卢。

通過變更akka.stream.blocking-io-dispatcher或者為提供的Source使用ActorAttributes設(shè)置,來配置默認(rèn)的調(diào)度器廊驼。

遵守ActorAttributes. SupervisionStrategy屬性据过。

(25)unfoldResourceAsync[T, S](create: () ? Future[S], read: (S) ? Future[Option[T]], close: (S) ? Future[Done]): Source[T, NotUsed]
類似于unfoldResource,但是使用返回Futures而不是純值的函數(shù)妒挎。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末绳锅,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子酝掩,更是在濱河造成了極大的恐慌鳞芙,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,941評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件期虾,死亡現(xiàn)場離奇詭異原朝,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)镶苞,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評論 3 395
  • 文/潘曉璐 我一進(jìn)店門喳坠,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人茂蚓,你說我怎么就攤上這事壕鹉。” “怎么了聋涨?”我有些...
    開封第一講書人閱讀 165,345評論 0 356
  • 文/不壞的土叔 我叫張陵晾浴,是天一觀的道長。 經(jīng)常有香客問我牍白,道長怠肋,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,851評論 1 295
  • 正文 為了忘掉前任淹朋,我火速辦了婚禮笙各,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘础芍。我一直安慰自己杈抢,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,868評論 6 392
  • 文/花漫 我一把揭開白布仑性。 她就那樣靜靜地躺著惶楼,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上歼捐,一...
    開封第一講書人閱讀 51,688評論 1 305
  • 那天何陆,我揣著相機(jī)與錄音,去河邊找鬼豹储。 笑死贷盲,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的剥扣。 我是一名探鬼主播巩剖,決...
    沈念sama閱讀 40,414評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼钠怯!你這毒婦竟也來了佳魔?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,319評論 0 276
  • 序言:老撾萬榮一對情侶失蹤晦炊,失蹤者是張志新(化名)和其女友劉穎鞠鲜,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體断国,經(jīng)...
    沈念sama閱讀 45,775評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡贤姆,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了并思。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片庐氮。...
    茶點故事閱讀 40,096評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖宋彼,靈堂內(nèi)的尸體忽然破棺而出弄砍,到底是詐尸還是另有隱情,我是刑警寧澤输涕,帶...
    沈念sama閱讀 35,789評論 5 346
  • 正文 年R本政府宣布音婶,位于F島的核電站,受9級特大地震影響莱坎,放射性物質(zhì)發(fā)生泄漏衣式。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,437評論 3 331
  • 文/蒙蒙 一檐什、第九天 我趴在偏房一處隱蔽的房頂上張望碴卧。 院中可真熱鬧,春花似錦乃正、人聲如沸住册。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽荧飞。三九已至凡人,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間叹阔,已是汗流浹背挠轴。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評論 1 271
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留耳幢,地道東北人岸晦。 一個月前我還...
    沈念sama閱讀 48,308評論 3 372
  • 正文 我出身青樓,卻偏偏與公主長得像帅掘,于是被迫代替她去往敵國和親委煤。 傳聞我的和親對象是個殘疾皇子堂油,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,037評論 2 355

推薦閱讀更多精彩內(nèi)容