(1)apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed]
由Iterable創(chuàng)建Source试疙。
例如:Source(Seq(1,2,3))
這類似于從迭代器開始, 但與此流的發(fā)布者直接連接的每個訂閱者都將看到一個單獨的元素流 (總是從開始處開始), 而不管它們何時訂閱。
(2)fromIterator[T](f: () ? Iterator[T]): Source[T, NotUsed]
從一個產(chǎn)生迭代器的函數(shù)開始一個新的Source厚宰。生成的元素流將繼續(xù)猴仑,直到迭代器運行為空或在評估next()方法時失敗讯蒲。根據(jù)來自下游轉(zhuǎn)換步驟的需求,將元素從迭代器中拉出淋硝。
例如:
val iterator = Iterator.iterate(false)(!_)
//創(chuàng)建一個無限迭代器制恍,重復(fù)地將給定的函數(shù)應(yīng)用于先前的結(jié)果丑勤。
//第一個參數(shù)是初始值,第二個參數(shù)是將重復(fù)應(yīng)用的函數(shù)吧趣。
Source.fromIterator(() ? iterator)
.grouped(10)
.runWith(Sink.head)
.futureValue
結(jié)果是
immutable.Seq(false, true, false, true, false, true, false, true, false, true)
(3)cycle[T](f: () ? Iterator[T]): Source[T, NotUsed]
從給定的元素(由產(chǎn)生迭代器的函數(shù)得到)開始一個循環(huán)的Source法竞。元素的生產(chǎn)流將無限地重復(fù)由函數(shù)參數(shù)提供的元素序列。
例如:
Source.cycle(() ? List(1, 2, 3).iterator)
.grouped(9)
.runWith(Sink.head)
.futureValue
結(jié)果是
immutable.Seq(1, 2, 3, 1, 2, 3, 1, 2, 3)
(4)fromGraph[T, M](g: Graph[SourceShape[T], M]): Source[T, M]
由source形狀(即只有一個出口)的圖創(chuàng)建Source强挫。
例如:
val pairs = Source.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
// prepare graph elements
val zip = b.add(Zip[Int, Int]())
def ints = Source.fromIterator(() => Iterator.from(1))
// connect the graph
ints.filter(_ % 2 != 0) ~> zip.in0
ints.filter(_ % 2 == 0) ~> zip.in1
// expose port
SourceShape(zip.out)
})
(5)fromFuture[T](future: Future[T]): Source[T, NotUsed]
從給定的Future創(chuàng)建Source岔霸。當(dāng)Future以成功值完成時(可能在物化Flow之前或者之后發(fā)生),流由一個元素組成俯渤。當(dāng)Future以失敗完成時呆细,流將終止并帶有一個failure。
例如:
Source.fromFuture(Future.successful("Hello Streams!"))
(6)fromCompletionStage[T](future: CompletionStage[T]): Source[T, NotUsed]
類似于Scala的Future創(chuàng)建Source八匠,此處是由Java的CompletionStage創(chuàng)建Source絮爷。
(7)fromFutureSource[T, M](future: Future[Graph[SourceShape[T], M]]): Source[T, Future[M]]
由給定的future source形狀的圖創(chuàng)建Source。一旦給定的Future成功完成梨树,則元素從異步source流出坑夯。如果Future失敗,則流失敗抡四。
(8)fromSourceCompletionStage[T, M](completion: CompletionStage[_ <: Graph[SourceShape[T], M]]): Source[T, CompletionStage[M]]
類似于fromFutureSource
(9)tick[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable]
元素定期以指定的間隔發(fā)出柜蜈。
"滴答" 元素將被傳遞到請求任何元素的下游用戶。
如果使用者在生成滴答元素時沒有請求任何元素, 它以后將不會接收該滴答元素指巡。它將在請求更多元素時立即接收新的滴答元素淑履。
例如:
Source.tick(initialDelay = 2.second, interval = 1.second, "message!")
(10)single[T](element: T): Source[T, NotUsed]
由一個元素創(chuàng)建Source。
例如:Source.single("only one element")
(11)repeat[T](element: T): Source[T, NotUsed]
創(chuàng)建一個連續(xù)發(fā)送給定元素的Source藻雪。
例如:
Source.repeat(42)
.grouped(3)
.runWith(Sink.head)
.futureValue
結(jié)果是:
immutable.Seq(42,42,42)
(12)unfold[S, E](s: S)(f: S ? Option[(S, E)]): Source[E, NotUsed]
創(chuàng)建一個Source
秘噪,它會將S
類型的值展開成一對下一個狀態(tài)S
,'E`類型的輸出元素勉耀。
例如指煎,10M以下的所有斐波納契數(shù)字:
Source.unfold(0 → 1) {
case (a, _) if a > 10000000 ? None
case (a, b) ? Some((b → (a + b)) → a)
}
(13)unfoldAsync[S, E](s: S)(f: S ? Future[Option[(S, E)]]): Source[E, NotUsed]
與unfold相同,但是使用一個異步函數(shù)來產(chǎn)生下一個狀態(tài)元素元組瑰排。
Source.unfoldAsync(0 → 1) {
case (a, _) if a > 10000000 ? Future.successful(None)
case (a, b) ? Future{
Thread.sleep(1000)
Some((b → (a + b)) → a)
}
}
(14)empty[T]: Source[T, NotUsed]
創(chuàng)建一個沒有元素的Source贯要,即為每個連接的Sink立即完成的空流暖侨。
例如:Source.empty
(15)maybe[T]: Source[T, Promise[Option[T]]]
創(chuàng)建一個Source
椭住,它物化為一個scala.concurrent.Promise
,它控制什么元素從Source發(fā)出字逗。
如果物化的promise以Some完成京郑,那么該值將在下游生成宅广,然后是完成。
如果物化的promise以None完成些举,那么下游不會產(chǎn)生值跟狱,并立即發(fā)出完成信號。
如果物化的promise以failure完成户魏,那么返回的source將以那個錯誤終止驶臊。
如果在promise完成前,source的下游取消叼丑,那么promise將以None完成关翎。
(16)failed[T](cause: Throwable): Source[T, NotUsed]
創(chuàng)建一個Source,它立刻終止流鸠信,并將錯誤cause
給每一個連接的Sink纵寝。
例如:
val ex = new Exception("buh")
Source.failed(ex)
.flatMapMerge(1, identity)
.runWith(Sink.head)
.futureValue
(17)lazily[T, M](create: () ? Source[T, M]): Source[T, Future[M]]
創(chuàng)建一個Source,直到下游有需求才物化星立,當(dāng)source物化時爽茴,物化的future將以其值完成,如果下游取消或失敗沒有任何需求绰垂,則不會調(diào)用創(chuàng)建工廠室奏,物化的Future是失敗。
(18)asSubscriber[T]: Source[T, Subscriber[T]]
創(chuàng)建一個Source劲装,其物化為一個org.reactivestreams.Subscriber
(19)actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef]
如何先定義流窍奋,而后給流傳遞數(shù)據(jù)呢?答案就是Source.actorRef酱畅。說明:Source.actorRef沒有背壓策略琳袄。
創(chuàng)建一個Source,其物化為一個akka.actor.ActorRef
纺酸。
如果下游有需求, 發(fā)送到該actor的消息將被發(fā)送到流中, 否則它們將被緩沖, 直到收到需求請求為止窖逗。
根據(jù)定義的akka.stream.OverflowStrategy
,如果緩沖區(qū)中沒有可用空間, 則可能會丟棄元素餐蔬。
策略akka.stream.OverflowStrategy.backpressure
不受支持, 如果將其作為參數(shù)傳遞, 則會拋出異常 llegalArgument ( "Backpressure overflowStrategy not supported")碎紊。
可以使用0的bufferSize
禁用緩沖區(qū), 如果下游沒有需求, 則會丟棄接收到的消息。當(dāng) bufferSize
是 0, overflowStrategy
并不重要樊诺。在此源之后添加一個異步邊界;因此, 假定下游總是會產(chǎn)生需求是絕不會安全的仗考。
通過將akka.actor.Status.Failure
發(fā)送到actor引用, 從而失敗來完成流。在標(biāo)示完成前词爬,以防actor仍消耗其內(nèi)部緩沖區(qū)(在收到一個akka.actor.Status.Success
之后)秃嗜,它收到一個akka.actor.Status.Failure
,故障將為 立即向下游發(fā)信號(而不是完成信號)。
當(dāng)流完成锅锨、失敗或從下游取消時, actor將被停止, 即您可以在發(fā)生此情況時觀察它以獲得通知叽赊。
val stringSourceinFuture=Source.actorRef[String](100,OverflowStrategy.fail) // 緩存最大為100,超出的話必搞,將以失敗告終
val hahaStrSource=stringSourceinFuture.filter(str=>str.startsWith("haha")) //source數(shù)據(jù)流中把不是以"haha"開頭的字符串過濾掉
val actor=hahaStrSource.to(Sink.foreach(println)).run()
actor!"asdsadasd"
actor!"hahaasd"
actor!Success("ok")// 數(shù)據(jù)流成功完成并關(guān)閉
(20)combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(strategy: Int ? Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed]
像Merge
或Concat
按照扇入策略將多個Source合并必指,返回一個Source。
例如:
val sourceOne = Source(List(1))
val sourceTwo = Source(List(2))
val merged = Source.combine(sourceOne, sourceTwo)(Merge(_))
val mergedResult: Future[Int] = merged.runWith(Sink.fold(0)(_ + _))
(21)zipN[T](sources: immutable.Seq[Source[T, _]]): Source[immutable.Seq[T], NotUsed]
將多個流的元素合并到一個序列流中恕洲。
val sources = immutable.Seq(
Source(List(1, 2, 3)),
Source(List(10, 20, 30)),
Source(List(100, 200, 300)))
Source.zipN(sources)
.runWith(Sink.seq)
.futureValue
結(jié)果是:
immutable.Seq(
immutable.Seq(1, 10, 100),
immutable.Seq(2, 20, 200),
immutable.Seq(3, 30, 300))
(22)zipWithN[T, O](zipper: immutable.Seq[T] ? O)(sources: immutable.Seq[Source[T, _]]): Source[O, NotUsed]
使用組合函數(shù)將多個流的元素合并到一個序列流中塔橡。
val sources = immutable.Seq(
Source(List(1, 2, 3)),
Source(List(10, 20, 30)),
Source(List(100, 200, 300)))
Source.zipWithN[Int, Int](_.sum)(sources)
.runWith(Sink.seq)
.futureValue
結(jié)果是:
immutable.Seq(111, 222, 333)
(23)queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]]
創(chuàng)建一個Source,它物化為````akka.stream.scaladsl.SourceQueue```霜第。
您可以將元素推送到隊列中, 如果下游有需求, 則它們將被發(fā)送到流中, 否則它們將被緩沖, 直到收到需求請求為止谱邪。如果下游終止, 緩沖區(qū)中的元素將被丟棄。
根據(jù)定義的akka.stream.OverflowStrategy
庶诡,如果緩沖區(qū)中沒有可用空間, 則可能會丟棄元素惦银。
確認(rèn)機(jī)制可用。
akka.stream.scaladsl.SourceQueue.offer
返回Future [QueueOfferResult]
末誓,如果元素被添加到緩沖區(qū)或發(fā)送到下游扯俱,則它將以QueueOfferResult.Enqueued
完成。 如果元素被丟棄喇澡,它將以QueueOfferResult.Dropped
完成迅栅。當(dāng)流失敗時,以QueueOfferResult.Failure
完成 或者下游完成時晴玖,以
QueueOfferResult.QueueClosed
完成读存。
當(dāng)緩沖區(qū)已滿時,策略akka.stream.OverflowStrategy.backpressure
不會完成最后offer():Future
調(diào)用呕屎。
可以使用akka.stream.scaladsl.SourceQueue.watchCompletion
查看流的可訪問性让簿。當(dāng)流完成時,它返回一個以成功完成的future或者當(dāng)流失敗時秀睛,它返回一個以失敗完成的future尔当。
可以通過設(shè)置bufferSize
為0關(guān)閉緩沖區(qū),然后接收到的消息將等待下游的需求蹂安,如果有另一個消息等待下游需求椭迎,這種情況下結(jié)果將根據(jù)溢出策略完成。
(24)unfoldResource[T, S](create: () ? S, read: (S) ? Option[T], close: (S) ? Unit): Source[T, NotUsed]
從某個可以打開田盈、讀取畜号、關(guān)閉的資源,創(chuàng)建一個Source允瞧。
以阻塞的方式與資源交互简软。
可以使用監(jiān)管策略來處理read
函數(shù)的異常蛮拔。所有由create
或close
拋出的異常,都將使流失敗替饿。
Restart
監(jiān)管策略將關(guān)閉并再次創(chuàng)建阻塞IO语泽。默認(rèn)策略是Stop
贸典,意味著在read
函數(shù)出現(xiàn)錯誤流將終止视卢。
通過變更akka.stream.blocking-io-dispatcher
或者為提供的Source使用ActorAttributes
設(shè)置,來配置默認(rèn)的調(diào)度器廊驼。
遵守ActorAttributes. SupervisionStrategy
屬性据过。
(25)unfoldResourceAsync[T, S](create: () ? Future[S], read: (S) ? Future[Option[T]], close: (S) ? Future[Done]): Source[T, NotUsed]
類似于unfoldResource
,但是使用返回Futures
而不是純值的函數(shù)妒挎。