Dart - Stream

Core Stream types

There are two kinds of streams: "Single-subscription" streams and "broadcast" streams.

Single-subscription

  1. A single-subscription stream allows only a single listener during the whole lifetime of the stream.
  2. It doesn't start generating events until it has a listener, and it stops sending events when the listener is unsubscribed, even if the source of events could still provide more.
  3. Listening twice on a single-subscription stream is not allowed, even after the first subscription has been canceled.
  4. Single-subscription streams are generally used for streaming chunks of larger contiguous data, like file I/O.

A broadcast stream allows any number of listeners, and it fires its events when they are ready, whether there are listeners or not.

  1. Broadcast streams are used for independent events/observers.
  2. If several listeners want to listen to a single-subscription stream, use [asBroadcastStream] to create a broadcast stream on top of the non-broadcast stream.
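
The difference between the two kinds can be sketched as follows. This is a minimal illustration that assumes a plain `StreamController` as the event source; the helper names `secondListenThrows` and `fanOut` are made up for this example.

```dart
import 'dart:async';

/// Returns true if a second listen on a single-subscription stream throws.
bool secondListenThrows() {
  final controller = StreamController<int>(); // single-subscription by default
  controller.stream.listen((_) {});
  try {
    controller.stream.listen((_) {});
    return false;
  } on StateError {
    return true;
  }
}

/// Lets two listeners share one single-subscription source.
Future<List<List<int>>> fanOut() async {
  final controller = StreamController<int>();
  final shared = controller.stream.asBroadcastStream();
  final first = shared.toList(); // listener 1
  final second = shared.toList(); // listener 2
  controller
    ..add(1)
    ..add(2);
  await controller.close();
  return await Future.wait([first, second]);
}

Future<void> main() async {
  print(secondListenThrows()); // true
  print(await fanOut()); // [[1, 2], [1, 2]]
}
```

Both listeners are attached before any event is added, so neither of them misses data.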

Creates a new single-subscription stream from the future.

When the future completes, the stream will fire one event, either data or error, and then close with a done-event.

factory Stream.fromFuture(Future<T> future) {}
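
A small sketch of the "one event, then done" behavior. The helper `eventsOf` is a hypothetical name; it only records what the stream delivers.

```dart
import 'dart:async';

/// Records every event that Stream.fromFuture delivers for [future].
Future<List<String>> eventsOf(Future<int> future) {
  final log = <String>[];
  final done = Completer<List<String>>();
  Stream.fromFuture(future).listen(
    (value) => log.add('data: $value'),
    onError: (Object error) => log.add('error: $error'),
    onDone: () {
      log.add('done');
      done.complete(log);
    },
  );
  return done.future;
}

Future<void> main() async {
  print(await eventsOf(Future<int>.value(42))); // [data: 42, done]
  print(await eventsOf(Future<int>.error('boom'))); // [error: boom, done]
}
```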

Creates a single-subscription stream from a group of futures.

  1. The stream reports the results of the futures on the stream in the order in which the futures complete.
  2. Each future provides either a data event or an error event, depending on how the future completes.
  3. If some futures have already completed when "Stream.fromFutures" is called, their results will be emitted in some unspecified order.
  4. When all futures have completed, the stream is closed.
  5. If [futures] is empty, the stream closes as soon as possible.
factory Stream.fromFutures(Iterable<Future<T>> futures) {}
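
The completion-order rule can be seen with two delayed futures. This is only a sketch; the delays and the name `inCompletionOrder` are arbitrary choices for the example.

```dart
import 'dart:async';

/// The slower future is listed first, but its result arrives last.
Future<List<String>> inCompletionOrder() {
  return Stream<String>.fromFutures([
    Future<String>.delayed(const Duration(milliseconds: 30), () => 'slow'),
    Future<String>.delayed(const Duration(milliseconds: 10), () => 'fast'),
  ]).toList();
}

Future<void> main() async {
  print(await inCompletionOrder()); // [fast, slow]
}
```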

Creates a single-subscription stream that gets its data from [elements].

  1. The iterable is iterated when the stream receives a listener, and stops iterating if the listener cancels the subscription, or if the [Iterator.moveNext] method returns "false" or throws. Iteration is suspended while the stream subscription is paused.
  2. If calling [Iterator.moveNext] on "elements.iterator" throws, the stream emits that error and then it closes.
  3. If reading [Iterator.current] on "elements.iterator" throws, the stream emits that error, but keeps iterating.
factory Stream.fromIterable(Iterable<T> elements) {}

Creates a stream that repeatedly emits events at [period] intervals.

The event values are computed by invoking [computation]. The argument to this callback is an integer that starts with 0 and is incremented for every event.

  1. The [period] must be a non-negative [Duration].
  2. If [computation] is omitted, the event values will all be "null".
  3. The [computation] must not be omitted if the event type [T] does not allow "null" as a value.
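
As a rough illustration of the counter passed to [computation], the sketch below squares it and stops after a few events. `firstSquares` is a hypothetical helper, and the 10 ms period is arbitrary.

```dart
import 'dart:async';

/// Emits count * count every 10 ms and keeps the first [n] values.
Future<List<int>> firstSquares(int n) {
  return Stream<int>.periodic(
    const Duration(milliseconds: 10),
    (count) => count * count, // count starts at 0
  ).take(n).toList();
}

Future<void> main() async {
  print(await firstSquares(4)); // [0, 1, 4, 9]
}
```

Without `take`, a periodic stream never ends on its own, so a listener normally has to cancel it.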

Creates a stream where all events of an existing stream are piped through a sink-transformation.

factory Stream.eventTransformed(Stream<dynamic> source, EventSink<dynamic> mapSink(EventSink<T> sink)) {
  return new _BoundSinkStream(source, mapSink);
}
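
One way to read "sink-transformation" is as a wrapper around the output sink. The sketch below assumes a hand-written `DoublingSink` class (not part of the SDK) that doubles every data event and forwards errors and the done event unchanged.

```dart
import 'dart:async';

/// Hypothetical sink wrapper: doubles data, forwards everything else.
class DoublingSink implements EventSink<int> {
  final EventSink<int> _out;
  DoublingSink(this._out);

  @override
  void add(int event) => _out.add(event * 2);

  @override
  void addError(Object error, [StackTrace? stackTrace]) =>
      _out.addError(error, stackTrace);

  @override
  void close() => _out.close();
}

Future<List<int>> doubled() {
  return Stream<int>.eventTransformed(
    Stream.fromIterable([1, 2, 3]),
    (sink) => DoublingSink(sink),
  ).toList();
}

Future<void> main() async {
  print(await doubled()); // [2, 4, 6]
}
```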

Adapts [source] to be a "Stream<T>".

This allows [source] to be used at the new type, but at run-time it must satisfy the requirements of both the new type and its original type.
Data events created by the source stream must also be instances of [T].

static Stream<T> castFrom<S, T>(Stream<S> source) => new CastStream<S, T>(source);

Returns a multi-subscription stream that produces the same events as this stream.

The returned stream will subscribe to this stream when its first subscriber is added, and will stay subscribed until this stream ends, or a callback cancels the subscription.

Adds a subscription to this stream.

StreamSubscription<T> listen(void onData(T event)?, {Function? onError, void onDone()?, bool? cancelOnError});

Creates a new stream from this stream that discards some elements.

  1. The new stream sends the same error and done events as this stream, but it only sends the data events that satisfy the [test].
  2. If the [test] function throws, the data event is dropped and the error is emitted on the returned stream instead.
  3. The returned stream is a broadcast stream if this stream is.
  4. If a broadcast stream is listened to more than once, each subscription will individually perform the "test".
Stream<T> where(bool test(T event)) {
  return new _WhereStream<T>(this, test);
}

Transforms each element of this stream into a new stream event.

  1. Creates a new stream that converts each element of this stream to a new value using the [convert] function, and emits the result.
  2. If [convert] throws, the returned stream reports it as an error event instead.
  3. Error and done events are passed through unchanged to the returned stream.
  4. The returned stream is a broadcast stream if this stream is.
Stream<S> map<S>(S convert(T event)) {
  return new _MapStream<T, S>(this, convert);
}
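
`where` and `map` are usually chained. A minimal sketch, with `evenLabels` as a made-up helper name:

```dart
import 'dart:async';

/// Keeps the even numbers and converts each one to a label.
Future<List<String>> evenLabels(Stream<int> numbers) {
  return numbers
      .where((n) => n.isEven) // drop odd values
      .map((n) => 'even:$n') // convert the remaining ones
      .toList();
}

Future<void> main() async {
  print(await evenLabels(Stream.fromIterable([1, 2, 3, 4]))); // [even:2, even:4]
}
```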

Creates a new stream with each data event of this stream asynchronously mapped to a new event.

Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) {
}
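
A sketch of `asyncMap` with an asynchronous conversion. `fetchName` stands in for a hypothetical lookup; it finishes faster for the second id, yet the results keep the order of the source events because each event is converted before the next one is handled.

```dart
import 'dart:async';

/// Hypothetical async lookup; id 2 completes faster than id 1.
Future<String> fetchName(int id) async {
  await Future<void>.delayed(Duration(milliseconds: 30 - 10 * id));
  return 'user$id';
}

Future<List<String>> namesFor(List<int> ids) {
  return Stream.fromIterable(ids).asyncMap(fetchName).toList();
}

Future<void> main() async {
  print(await namesFor([1, 2])); // [user1, user2]
}
```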

Transforms each element into a sequence of asynchronous events.

Returns a new stream and, for each event of this stream, does the following:

  1. If the event is an error event or a done event, it is emitted directly by the returned stream.
  2. Otherwise it is an element. Then the [convert] function is called with the element as argument to produce a convert-stream for the element.
  3. If that call throws, the error is emitted on the returned stream.
  4. If the call returns "null", no further action is taken for the element. Otherwise, this stream is paused and the convert-stream is listened to.
  5. Every data and error event of the convert-stream is emitted on the returned stream in the order it is produced.
  6. When the convert-stream ends, this stream is resumed.
  7. The returned stream is a broadcast stream if this stream is.
Stream<E> asyncExpand<E>(Stream<E>? convert(T event)) {
}
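
The steps above can be sketched with a small generator as the convert-stream. `countdown` and `expandAll` are illustrative names only.

```dart
import 'dart:async';

/// Convert-stream for one element: counts down from [from] to 1.
Stream<String> countdown(int from) async* {
  for (var i = from; i > 0; i--) {
    yield '$from:$i';
  }
}

Future<List<String>> expandAll(List<int> starts) {
  // Each element is replaced by all events of its convert-stream, in order.
  return Stream.fromIterable(starts).asyncExpand(countdown).toList();
}

Future<void> main() async {
  print(await expandAll([2, 1])); // [2:2, 2:1, 1:1]
}
```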

Creates a wrapper Stream that intercepts some errors from this stream.

  1. The returned stream is a broadcast stream if this stream is.
  2. If a broadcast stream is listened to more than once, each subscription will individually perform the test and handle the error.
Stream<T> handleError(Function onError, {bool test(error)?}) {}
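
A minimal sketch of intercepting one kind of error while letting data through. The `flaky` source and the choice of `FormatException` are assumptions made for the example.

```dart
import 'dart:async';

/// Hypothetical source: data, then an error, then more data.
Stream<int> flaky() {
  final controller = StreamController<int>();
  controller
    ..add(1)
    ..addError(const FormatException('bad input'))
    ..add(2)
    ..close();
  return controller.stream;
}

Future<List<Object>> collect() async {
  final log = <Object>[];
  await flaky()
      .handleError(
        (Object e) => log.add('handled ${e.runtimeType}'),
        test: (e) => e is FormatException, // other errors are not intercepted
      )
      .forEach(log.add);
  return log;
}

Future<void> main() async {
  print(await collect()); // [1, handled FormatException, 2]
}
```

An error that fails `test` would still reach `forEach` and complete its future with that error.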

Transforms each element of this stream into a sequence of elements.

  1. Returns a new stream where each element of this stream is replaced by zero or more data events.
  2. Error events and the done event of this stream are forwarded directly to the returned stream.
  3. The returned stream is a broadcast stream if this stream is.
  4. If a broadcast stream is listened to more than once, each subscription will individually call convert and expand the events.
Stream<S> expand<S>(Iterable<S> convert(T element)) {
  return new _ExpandStream<T, S>(this, convert);
}
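
For instance, each number can be replaced by that many copies of itself. `repeatEach` is a hypothetical helper.

```dart
import 'dart:async';

/// Replaces each value n by n copies of n.
Future<List<int>> repeatEach(List<int> values) {
  return Stream.fromIterable(values)
      .expand((n) => List<int>.filled(n, n))
      .toList();
}

Future<void> main() async {
  print(await repeatEach([1, 2, 3])); // [1, 2, 2, 3, 3, 3]
}
```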

Pipes the events of this stream into [streamConsumer].

  1. All events of this stream are added to "streamConsumer" using [StreamConsumer.addStream].
  2. The "streamConsumer" is closed when this stream has been successfully added to it, that is, when the future returned by "addStream" completes without an error.
  3. Returns a future which completes when this stream has been consumed and the consumer has been closed.
  4. The returned future completes with the same result as the future returned by [StreamConsumer.close].
  5. If the call to [StreamConsumer.addStream] fails in some way, this method fails in the same way.
Future pipe(StreamConsumer<T> streamConsumer) {
  return streamConsumer.addStream(this).then((_) => streamConsumer.close());
}
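
A sketch of `pipe`, assuming a `StreamController` as the consumer (a controller implements `StreamSink`, and therefore `StreamConsumer`). The helper name `pipeThrough` is made up.

```dart
import 'dart:async';

Future<List<int>> pipeThrough() async {
  final consumer = StreamController<int>();
  // Listen first so that the piped events have somewhere to go.
  final received = consumer.stream.toList();
  // Completes after addStream has finished and the consumer is closed.
  await Stream.fromIterable([1, 2, 3]).pipe(consumer);
  return await received;
}

Future<void> main() async {
  print(await pipeThrough()); // [1, 2, 3]
}
```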

Applies [streamTransformer] to this stream.

This method should always be used for transformations which treat
the entire stream as representing a single value
which has perhaps been split into several parts for transport,
like a file being read from disk or being fetched over a network.
The transformation will then produce a new stream which
transforms the stream's value incrementally (perhaps using
[Converter.startChunkedConversion]). The resulting stream
may again be chunks of the result, but does not have to
correspond to specific events from the source string.
(Loading a stream in chunks.)

Stream<S> transform<S>(StreamTransformer<T,S> streamTransformer) {
return streamTransformer.bind(this);
}
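
The "single value split into parts" case can be illustrated with byte chunks that are decoded and split into lines. This sketch uses the `utf8.decoder` and `LineSplitter` transformers from `dart:convert`; the chunk boundaries are chosen arbitrarily so that they do not line up with the line breaks.

```dart
import 'dart:async';
import 'dart:convert';

/// Decodes byte chunks incrementally and regroups the text into lines.
Future<List<String>> linesOf(Stream<List<int>> chunks) {
  return chunks
      .transform(utf8.decoder)
      .transform(const LineSplitter())
      .toList();
}

Future<void> main() async {
  final chunks = Stream<List<int>>.fromIterable([
    utf8.encode('hel'),
    utf8.encode('lo\nwor'),
    utf8.encode('ld\n'),
  ]);
  print(await linesOf(chunks)); // [hello, world]
}
```

Three input events produce two output events, which is why the result does not have to correspond to specific source events.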

Combines a sequence of values by repeatedly applying [combine].

Similar to [Iterable.reduce], this function maintains a value, starting with the first element of this stream and updated for each further element.

  1. For each element after the first, the value is updated to the result of calling [combine] with the previous value and the element.
  2. When this stream is done, the returned future is completed with the value at that time.
  3. If this stream is empty, the returned future is completed with an error.
  4. If this stream emits an error, or the call to [combine] throws, the returned future is completed with that error, and processing is stopped.
Future<T> reduce(T combine(T previous, T element)) {
}

Combines a sequence of values by repeatedly applying [combine].

Similar to [Iterable.fold], this function maintains a value, starting with [initialValue] and updated for each element of this stream.

  1. For each element, the value is updated to the result of calling [combine] with the previous value and the element.
  2. When this stream is done, the returned future is completed with the value at that time.
  3. For an empty stream, the future is completed with [initialValue].
  4. If this stream emits an error, or the call to [combine] throws, the returned future is completed with that error, and processing is stopped.
Future<S> fold<S>(S initialValue, S combine(S previous, T element)) {
}
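
A short comparison of `reduce` and `fold`, including the empty-stream case where they differ. The helper names are illustrative.

```dart
import 'dart:async';

/// Sum via reduce: the first element is the starting value.
Future<int> sumReduce(Stream<int> s) => s.reduce((a, b) => a + b);

/// Total length via fold: starts from 0 and may change the type.
Future<int> totalLength(Stream<String> s) =>
    s.fold<int>(0, (sum, word) => sum + word.length);

/// Returns true if reduce fails on an empty stream.
Future<bool> reduceFailsOnEmpty() async {
  try {
    await sumReduce(Stream<int>.empty());
    return false;
  } on StateError {
    return true;
  }
}

Future<void> main() async {
  print(await sumReduce(Stream.fromIterable([1, 2, 3, 4]))); // 10
  print(await totalLength(Stream.fromIterable(['a', 'bb']))); // 3
  print(await totalLength(Stream<String>.empty())); // 0
  print(await reduceFailsOnEmpty()); // true
}
```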

Combines the string representation of elements into a single string.

  1. Each element is converted to a string using its [Object.toString] method.
  2. If [separator] is provided, it is inserted between element string representations.
  3. The returned future is completed with the combined string when this stream is done.
  4. If this stream emits an error, or the call to [Object.toString] throws, the returned future is completed with that error, and processing stops.
Future<String> join([String separator = ""]) {}

Returns whether [needle] occurs in the elements provided by this stream.

  1. Compares each element of this stream to [needle] using [Object.==]. If an equal element is found, the returned future is completed with "true". If this stream ends without finding a match, the future is completed with "false".
  2. If this stream emits an error, or the call to [Object.==] throws, the returned future is completed with that error, and processing stops.
Future<bool> contains(Object? needle) {}

Executes [action] on each element of this stream.

  1. Completes the returned [Future] when all elements of this stream have been processed.
  2. If this stream emits an error, or if the call to [action] throws, the returned future completes with that error, and processing stops.
Future forEach(void action(T element)) {}

Checks whether [test] accepts all elements provided by this stream.

  1. Calls [test] on each element of this stream.
  2. If the call returns "false", the returned future is completed with "false" and processing stops.
  3. If this stream ends without finding an element that [test] rejects, the returned future is completed with "true".
  4. If this stream emits an error, or if the call to [test] throws, the returned future is completed with that error, and processing stops.
Future<bool> every(bool test(T element)) {}

Checks whether [test] accepts any element provided by this stream.

  1. Calls [test] on each element of this stream. If the call returns "true", the returned future is completed with "true" and processing stops.
  2. If this stream ends without finding an element that [test] accepts, the returned future is completed with "false".
  3. If this stream emits an error, or if the call to [test] throws, the returned future is completed with that error, and processing stops.
Future<bool> any(bool test(T element)) {}
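
The query methods above each consume the stream and complete a future. A compact sketch, with `numbers` as a hypothetical source that is recreated for every query:

```dart
import 'dart:async';

/// A fresh stream for every query, so each call gets its own subscription.
Stream<int> numbers() => Stream.fromIterable([2, 4, 6]);

Future<void> main() async {
  print(await numbers().join(', ')); // 2, 4, 6
  print(await numbers().contains(4)); // true
  print(await numbers().every((n) => n.isEven)); // true
  print(await numbers().any((n) => n > 6)); // false

  var sum = 0;
  await numbers().forEach((n) => sum += n);
  print(sum); // 12
}
```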

The number of elements in this stream.

Future<int> get length {}

Whether this stream contains any elements.

Future<bool> get isEmpty {}

Adapts this stream to be a "Stream<R>".

This stream is wrapped as a "Stream<R>" which checks at run-time that each data event emitted by this stream is also an instance of [R].

Stream<R> cast<R>() => Stream.castFrom<T, R>(this);

Collects all elements of this stream in a [List].

  1. Creates a "List<T>" and adds all elements of this stream to the list in the order they arrive.
  2. When this stream ends, the returned future is completed with that list.
  3. If this stream emits an error, the returned future is completed with that error, and processing stops.
Future<List<T>> toList() {}

Collects the data of this stream in a [Set].

  1. Creates a "Set<T>" and adds all elements of this stream to the set in the order they arrive.
  2. When this stream ends, the returned future is completed with that set.
  3. The returned set is the same type as created by "<T>{}". If another type of set is needed, either use [forEach] to add each element to the set, or use "toList().then((list) => new SomeOtherSet.from(list))" to create the set.
  4. If this stream emits an error, the returned future is completed with that error, and processing stops.
Future<Set<T>> toSet() {}

Discards all data on this stream, but signals when it is done or an error occurred.

  1. When subscribing using [drain], cancelOnError will be true. This means that the future will complete with the first error on this stream and then cancel the subscription.
  2. If this stream emits an error, the returned future is completed with that error, and processing is stopped.
  3. In case of a "done" event, the future completes with the given [futureValue].
  4. The [futureValue] must not be omitted if "null" is not assignable to [E].
Future<E> drain<E>([E? futureValue]) {}
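
The collecting members can be compared side by side. This is a sketch over a made-up source, `withDuplicates`, that contains a repeated value.

```dart
import 'dart:async';

/// Hypothetical source with one duplicated element.
Stream<int> withDuplicates() => Stream.fromIterable([1, 2, 2, 3]);

Future<void> main() async {
  print(await withDuplicates().length); // 4
  print(await withDuplicates().isEmpty); // false
  print(await withDuplicates().toList()); // [1, 2, 2, 3]
  print(await withDuplicates().toSet()); // {1, 2, 3}
  // drain ignores the data and completes with the given value on done.
  print(await withDuplicates().drain<String>('finished')); // finished
}
```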

Provides at most the first [count] data events of this stream.

  1. If this is a single-subscription (non-broadcast) stream, it cannot be reused after the returned stream has been listened to.
  2. If this is a broadcast stream, the returned stream is a broadcast stream.
  3. In that case, the events are only counted from the time the returned stream is listened to.
Stream<T> take(int count) {
  return new _TakeStream<T>(this, count);
}

Forwards data events while [test] is successful.

  1. Returns a stream that provides the same events as this stream until [test] fails for a data event.
  2. The returned stream is done when either this stream is done, or when this stream first emits a data event that fails [test].
  3. The "test" call is considered failing if it returns a non-"true" value or if it throws. If the "test" call throws, the error is emitted as the last event on the returned stream.
  4. Stops listening to this stream after the accepted elements.
  5. Internally the method cancels its subscription after these elements. This means that single-subscription (non-broadcast) streams are closed and cannot be reused after a call to this method.
  6. The returned stream is a broadcast stream if this stream is.
  7. For a broadcast stream, the events are only tested from the time the returned stream is listened to.
Stream<T> takeWhile(bool test(T element)) {
  return new _TakeWhileStream<T>(this, test);
}

Skips the first [count] data events from this stream

Stream<T> skip(int count) {
return new _SkipStream<T>(this, count);
}

Skips data events from this stream while they are matched by [test].

  1. The test fails when called with a data event if it returns a non-"true" value or if the call to "test" throws.
  2. If the call throws, the error is emitted as an error event on the returned stream instead of the data event. Otherwise, the event that made "test" return non-true is emitted as the first data event.
  3. Error and done events are provided by the returned stream unmodified.
  4. The returned stream is a broadcast stream if this stream is.
  5. For a broadcast stream, the events are only tested from the time the returned stream is listened to.
Stream<T> skipWhile(bool test(T element)) {
  return new _SkipWhileStream<T>(this, test);
}
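
The four slicing methods, `take`, `takeWhile`, `skip`, and `skipWhile`, can be compared on the same input. `digits` is a hypothetical source; note that the final `1` is not affected by `skipWhile`, because the test is no longer applied once it has failed.

```dart
import 'dart:async';

/// A fresh source for each call.
Stream<int> digits() => Stream.fromIterable([1, 2, 3, 4, 1]);

Future<void> main() async {
  print(await digits().take(2).toList()); // [1, 2]
  print(await digits().takeWhile((n) => n < 3).toList()); // [1, 2]
  print(await digits().skip(3).toList()); // [4, 1]
  print(await digits().skipWhile((n) => n < 3).toList()); // [3, 4, 1]
}
```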

Skips data events if they are equal to the previous data event.

  1. The returned stream provides the same events as this stream, except that it never provides two consecutive data events that are equal. That is, errors are passed through to the returned stream, and data events are passed through if they are distinct from the most recently emitted data event.
  2. Equality is determined by the provided [equals] method. If that is omitted, the "==" operator on the last provided data element is used.
  3. If [equals] throws, the data event is replaced by an error event containing the thrown error. The behavior is equivalent to the original stream emitting the error event, and it doesn't change what the most recently emitted data event is.
  4. The returned stream is a broadcast stream if this stream is.
  5. If a broadcast stream is listened to more than once, each subscription will individually perform the equals test.
Stream<T> distinct([bool equals(T previous, T next)?]) {
  return new _DistinctStream<T>(this, equals);
}
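
Only consecutive duplicates are dropped, as the sketch below shows. The case-insensitive comparison is just one possible [equals] function, and both helper names are made up.

```dart
import 'dart:async';

/// Default equality: a value may reappear after a different one.
Future<List<int>> dedupe(List<int> values) =>
    Stream.fromIterable(values).distinct().toList();

/// Custom equality: compare letters without regard to case.
Future<List<String>> dedupeIgnoringCase(List<String> values) =>
    Stream.fromIterable(values)
        .distinct((a, b) => a.toLowerCase() == b.toLowerCase())
        .toList();

Future<void> main() async {
  print(await dedupe([1, 1, 2, 2, 1])); // [1, 2, 1]
  print(await dedupeIgnoringCase(['a', 'A', 'b', 'B', 'a'])); // [a, b, a]
}
```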

The first element of this stream.

Future<T> get first {}

The last element of this stream.

If this stream is empty (the done event is the first event), the returned future completes with an error.

Future<T> get last {}

The single element of this stream.

  1. If this stream emits an error event, the returned future is completed with that error and processing stops.
  2. If this stream is empty or has more than one element, the returned future completes with an error.
Future<T> get single {}

Finds the first element of this stream matching [test].

  1. Returns a future that is completed with the first element of this stream for which [test] returns "true".
  2. If no such element is found before this stream is done, and an [orElse] function is provided, the result of calling [orElse] becomes the value of the future. If [orElse] throws, the returned future is completed with that error.
Future<T> firstWhere(bool test(T element), {T orElse()?}) {}

Finds the last element in this stream matching [test].

Future<T> lastWhere(bool test(T element), {T orElse()?}) {}

Finds the single element in this stream matching [test].

Like [lastWhere], except that it is an error if more than one matching element occurs in this stream.

Future<T> singleWhere(bool test(T element), {T orElse()?}) {
}
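
A sketch of the three search methods on one hypothetical source, `values`, including the [orElse] fallback and the error case of `singleWhere`.

```dart
import 'dart:async';

/// A fresh source for each query.
Stream<int> values() => Stream.fromIterable([3, 8, 5, 10]);

/// Returns true if singleWhere rejects a test with two matches (8 and 10).
Future<bool> singleWhereFailsOnTwoMatches() async {
  try {
    await values().singleWhere((n) => n.isEven);
    return false;
  } on StateError {
    return true;
  }
}

Future<void> main() async {
  print(await values().firstWhere((n) => n > 4)); // 8
  print(await values().lastWhere((n) => n.isOdd)); // 5
  print(await values().firstWhere((n) => n > 100, orElse: () => -1)); // -1
  print(await singleWhereFailsOnTwoMatches()); // true
}
```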

Returns the value of the [index]th data event of this stream.

Future<T> elementAt(int index) {
}

Creates a new stream with the same events as this stream.

  1. Whenever more than [timeLimit] passes between two events from this stream, the [onTimeout] function is called, which can emit further events on the returned stream.
  2. The countdown doesn't start until the returned stream is listened to. The countdown is reset every time an event is forwarded from this stream, or when this stream is paused and resumed.
  3. If [onTimeout] is omitted, a timeout will just put a [TimeoutException] into the error channel of the returned stream.
  4. If the call to [onTimeout] throws, the error is emitted on the returned stream.
  5. The returned stream is a broadcast stream if this stream is.
  6. If a broadcast stream is listened to more than once, each subscription will have its individual timer that starts counting on listen, and the subscriptions' timers can be paused individually.
Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)?}) {}
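
A sketch of [onTimeout] emitting a marker event when the source goes quiet. The `slow` generator, the 200 ms gap, the 50 ms limit, and the `-1` marker are all arbitrary choices for this example.

```dart
import 'dart:async';

/// Hypothetical source with a long pause between its two events.
Stream<int> slow() async* {
  yield 1;
  await Future<void>.delayed(const Duration(milliseconds: 200));
  yield 2;
}

Future<List<int>> withTimeoutMarker() {
  return slow()
      .timeout(
        const Duration(milliseconds: 50),
        // Emit a marker instead of the default TimeoutException error.
        onTimeout: (sink) => sink.add(-1),
      )
      .toList();
}

Future<void> main() async {
  // The list starts with 1, contains the -1 marker, and ends with 2.
  print(await withTimeoutMarker());
}
```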

A subscription on events from a [Stream].

When you listen to a [Stream] using [Stream.listen], a [StreamSubscription] object is returned.

abstract class StreamSubscription<T> {}
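
The returned subscription is what lets a listener stop an otherwise endless stream. A minimal sketch, with `untilThree` as a made-up helper that cancels after seeing the value 3:

```dart
import 'dart:async';

/// Listens to a periodic counter and cancels once the value 3 arrives.
Future<List<int>> untilThree() {
  final seen = <int>[];
  final done = Completer<List<int>>();
  late final StreamSubscription<int> sub;
  sub = Stream<int>.periodic(const Duration(milliseconds: 5), (i) => i)
      .listen((value) {
    seen.add(value);
    if (value == 3) {
      // No further events are delivered after cancel.
      sub.cancel().then((_) => done.complete(seen));
    }
  });
  return done.future;
}

Future<void> main() async {
  print(await untilThree()); // [0, 1, 2, 3]
}
```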

A [Sink] that supports adding errors.

  1. This makes it suitable for capturing the results of asynchronous computations, which can complete with a value or an error.
  2. The [EventSink] has been designed to handle asynchronous events from [Stream]s. See, for example, [Stream.eventTransformed], which uses "EventSink" to transform events.

[Stream] wrapper that only exposes the [Stream] interface.

class StreamView<T> extends Stream<T> {}

Abstract interface for a "Sink" accepting multiple entire streams.

  1. A consumer can accept a number of consecutive streams using [addStream], and when no further data need to be added, the [close] method tells the consumer to complete its work and shut down.
  2. The [Stream.pipe] method accepts a "StreamConsumer" and will pass the stream to the consumer's [addStream] method. When that completes, it will call [close] and then complete its own returned future.
abstract class StreamConsumer<S> {}

An object that accepts stream events both synchronously and asynchronously.

  1. A [StreamSink] combines the methods from [StreamConsumer] and [EventSink].
  2. The [EventSink] methods can't be used while [addStream] is in progress. As soon as the [addStream]'s [Future] completes with a value, the [EventSink] methods can be used again.
  3. If [addStream] is called after any of the [EventSink] methods, it'll be delayed until the underlying system has consumed the data added by the [EventSink] methods.
  4. When [EventSink] methods are used, the [done] [Future] can be used to catch any errors.
  5. When [close] is called, it will return the [done] [Future].
abstract class StreamSink<S> implements EventSink<S>, StreamConsumer<S> {}

Transforms a Stream.

  1. When a stream's [Stream.transform] method is invoked with a [StreamTransformer], the stream calls the [bind] method on the provided transformer. The resulting stream is then returned from the [Stream.transform] method.
  2. Conceptually, a transformer is simply a function from [Stream] to [Stream] that is encapsulated into a class.
  3. It is good practice to write transformers that can be used multiple times.
  4. All other transforming methods on [Stream], such as [Stream.map], [Stream.where], and [Stream.expand], can be implemented using [Stream.transform].
  5. A [StreamTransformer] is thus very powerful but often also a bit more complicated to use.
abstract class StreamTransformer<S, T> {}
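
As an illustration of a reusable transformer, the sketch below rebuilds a `map`-like doubling step with `StreamTransformer.fromHandlers` and applies the same instance to two streams. The name `doubler` is hypothetical.

```dart
import 'dart:async';

/// A reusable transformer that behaves like map((v) => v * 2).
final doubler = StreamTransformer<int, int>.fromHandlers(
  handleData: (value, sink) => sink.add(value * 2),
);

Future<List<List<int>>> applyTwice() async {
  final a = await Stream.fromIterable([1, 2]).transform(doubler).toList();
  final b = await Stream.fromIterable([3]).transform(doubler).toList();
  return [a, b];
}

Future<void> main() async {
  print(await applyTwice()); // [[2, 4], [6]]
}
```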

An [Iterator]-like interface for the values of a [Stream].

  1. This wraps a [Stream] and a subscription on the stream. It listens on the stream, and completes the future returned by [moveNext] when the next value becomes available.
  2. The stream may be paused between calls to [moveNext].
  3. The [current] value must only be used after a future returned by [moveNext] has completed with "true", and only until [moveNext] is called again.
abstract class StreamIterator<T> {}
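
A sketch of pulling values one at a time with `StreamIterator`. `sumFirstTwo` is a made-up helper; it reads `current` only after `moveNext` has completed with true, and cancels the iterator when it stops early.

```dart
import 'dart:async';

/// Pulls at most two values from [source] and adds them up.
Future<int> sumFirstTwo(Stream<int> source) async {
  final iterator = StreamIterator<int>(source);
  var sum = 0;
  try {
    for (var i = 0; i < 2 && await iterator.moveNext(); i++) {
      sum += iterator.current;
    }
  } finally {
    await iterator.cancel(); // releases the underlying subscription
  }
  return sum;
}

Future<void> main() async {
  print(await sumFirstTwo(Stream.fromIterable([10, 20, 30]))); // 30
}
```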

An enhanced stream controller provided by [Stream.multi].

  1. Acts like a normal asynchronous controller, but also allows adding events synchronously.
  2. As with any synchronous event delivery, the sender should be very careful to not deliver events at times when a new listener might not be ready to receive them.
  3. That generally means only delivering events synchronously in response to other asynchronous events, because that is a time when an asynchronous event could happen.
abstract class MultiStreamController<T> implements StreamController<T> {}
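
A minimal sketch of `Stream.multi`, where the callback runs once per listener and receives its own `MultiStreamController`. The names `replayable` and `listenTwice` are illustrative, and the example uses the ordinary asynchronous `add` rather than synchronous delivery.

```dart
import 'dart:async';

/// Every listener gets its own controller and its own copy of the events.
Stream<int> replayable() => Stream<int>.multi((controller) {
      controller
        ..add(1)
        ..add(2)
        ..close();
    });

Future<List<List<int>>> listenTwice() async {
  final stream = replayable();
  return [await stream.toList(), await stream.toList()];
}

Future<void> main() async {
  print(await listenTwice()); // [[1, 2], [1, 2]]
}
```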