感覺文章還行喘落,請留下你的贊。
一蟹倾、Stream 是什么匣缘?
Stream 是異步數(shù)據(jù)事件的源。Stream 提供了一種接收事件序列的方法,可以通過 listen 進(jìn)行數(shù)據(jù)監(jiān)聽鲜棠;通過 error 接收失敗狀態(tài)肌厨;通過 done 接收事件的結(jié)束狀態(tài)。
注意: Stream 只有當(dāng)被監(jiān)聽后豁陆,才能使其產(chǎn)生事件柑爸。同時會產(chǎn)生一個 StreamSubscription 的對象,該對象是提供事件的活動對象盒音,可用于停止再次監(jiān)聽或者臨時暫停訂閱事件表鳍。
二、 Stream 的訂閱對象 StreamSubscription
-
StreamSubscription 的種類
Stream 的訂閱流有兩種祥诽,分別是:單訂閱流(Single-subscription)和多訂閱流(broadcast)譬圣。
-
單訂閱流(Single-subscription)
- 單訂閱流只允許有一個監(jiān)聽器(listen);只有監(jiān)聽后才會產(chǎn)生事件雄坪;取消監(jiān)聽器時也停止事件發(fā)送厘熟,即是 Stream 還有更多事件。
- 單訂閱流即是取消了第一次訂閱,也不允許再次訂閱绳姨。
- 單訂閱流常用于流較大的連續(xù)數(shù)據(jù)事件颇玷,如文件 I/O,
-
多訂閱流(broadcast, 廣播)
廣播流可以有多個監(jiān)聽器就缆,廣播事件就緒時就觸發(fā)其事件帖渠,無論有無監(jiān)聽器。
廣播流常用于獨(dú)立的事件或者觀察者竭宰。
-
廣播流可以取消監(jiān)聽空郊,然后再次監(jiān)聽。
注意: 如果多個監(jiān)聽器想訂閱單個訂閱流切揭,請使用 asBroadcastStream 在單訂閱流頂部創(chuàng)建廣播流狞甚。
三舅桩、 Stream 的構(gòu)建
-
empty
/// 空的廣播流 void emptyStream() { var stream = Stream.empty(); stream.listen((event) { print("empty -- listen"); }, onDone: () { print("empty -- onDone"); // flutter: empty -- onDone }, onError: (e) { print("empty -- onError"); }, cancelOnError: true); }
這是一個流纵穿,它在監(jiān)聽后祝峻,只發(fā)出一個 onDone 事件汁展,其他什么都不做。
-
error
// 創(chuàng)建一個錯誤的流 void createErrorStream() { var stream = Stream.error("錯誤流"); stream.listen((event) { print("error -- listen"); }, onError: (e) { print("error -- onError"); // flutter: error -- onError }, onDone: () { print("error -- onDone"); }, cancelOnError: true); }
這是創(chuàng)建在事件完成前發(fā)出的單個錯誤事件流捧挺。
-
fromFuture
// 從Future創(chuàng)建一個單事件流 void createStreamFormFuture() { var stream = Stream.fromFuture( Future.delayed(Duration(seconds: 1), () { print("Future 延遲事件"); }), ); stream.listen((event) { print("fromFuture -- listen-- ${event.toString()}"); // flutter: fromFuture -- listen-- null }, onError: (e) { print("fromFuture -- onError"); }, onDone: () { print("fromFuture -- onDone"); // flutter: fromFuture -- onDone }, cancelOnError: true); }
這是有 Future 創(chuàng)建的一個單訂閱流州泊。
注意:
1.這是單訂閱流在未監(jiān)聽時捕捂,它的事件將有 _SyncStreamController 的緩存區(qū)保留励背,當(dāng)被監(jiān)聽后春霍,再次發(fā)出事件。
2.對于一個單一的值來說叶眉,在 Future 做 then之前等待一個偵聽器是不值得的址儒。 -
fromFutures
// 從一組 Future 中創(chuàng)建單訂閱流 void createStreamFromMoreFuture() { var stream = Stream.fromFutures([ Future(() { print("Future - 1"); }), Future(() { print("Future - 2"); }), ]); stream.listen((event) { print("fromFutures -- listen-- ${event.toString()}"); }, onError: (e) { print("fromFutures -- onError"); }, onDone: () { print("fromFutures -- onDone"); }, cancelOnError: true); }
日志輸出:
flutter: Future - 1 flutter: fromFutures -- listen-- null flutter: Future - 2 flutter: fromFutures -- listen-- null flutter: fromFutures -- onDone
從一組 Future 中創(chuàng)建一個單訂閱流。該流有幾個 Future 則 listen 方法將會被調(diào)用幾次衅疙,同時 onDone 方法最后調(diào)用莲趣。
-
fromIterable
// 創(chuàng)建一個可以從改流中獲取數(shù)據(jù)的單訂閱流 void createStreamFromIterable() { var stream = Stream.fromIterable([1, 2, 3, 4]); stream.listen((event) { print("fromIterable -- listen-- ${event.toString()}"); }, onError: (e) { print("fromIterable -- onError"); }, onDone: () { print("fromIterable -- onDone"); }, cancelOnError: true); }
日志輸出:
flutter: fromIterable -- listen-- 1 flutter: fromIterable -- listen-- 2 flutter: fromIterable -- listen-- 3 flutter: fromIterable -- listen-- 4 flutter: fromIterable -- onDone
創(chuàng)建一個可以從流中獲取值的單訂閱流。該流有幾個元素,則 listen 方法將會被調(diào)用幾次饱溢,同時 onDone 方法最后調(diào)用喧伞。
-
multi
/// 創(chuàng)建一個多訂閱流 void createMutilStream() { var stream = Stream.multi((control) { control.addSync("multi--1"); control.addSync("multi--2"); control.close(); }); stream.listen((event) { print("multi -- listen-- ${event.toString()}"); }, onError: (e) { print("multi -- onError"); }, onDone: () { print("multi -- onDone"); }, cancelOnError: false); Future.delayed(Duration(seconds: 2), () { stream.listen((event) { print("multi -- listen-- 2-- ${event.toString()}"); }, onError: (e) { print("multi -- onError -- 2"); }, onDone: () { print("multi -- onDone-- 2"); }, cancelOnError: false); }); }
日志輸出:
flutter: multi -- listen-- multi--1 flutter: multi -- listen-- multi--2 flutter: multi -- onDone flutter: multi -- listen-- 2-- multi--1 flutter: multi -- listen-- 2-- multi--2 flutter: multi -- onDone-- 2
創(chuàng)建一個廣播流,通過 control 發(fā)出響應(yīng)的事件理朋。注意在 control 不調(diào)用 close 方法絮识,則監(jiān)聽中的 onDone 方法也不會調(diào)用。
-
periodic
// 創(chuàng)建一定間隔事件發(fā)出事件的流 void createPeriodicStream() { var stream = Stream.periodic(Duration(seconds: 2), (value) { return value; }); stream.listen((event) { print("periodic -- listen-- 2-- ${event.toString()}"); }, onError: (e) { print("periodic -- onError -- 2"); }, onDone: () { print("periodic -- onDone-- 2"); }, cancelOnError: false); }
日志輸出:
flutter: periodic -- listen-- 2-- 0 flutter: periodic -- listen-- 2-- 1 flutter: periodic -- listen-- 2-- 2 flutter: periodic -- listen-- 2-- 3 flutter: periodic -- listen-- 2-- 4 flutter: periodic -- listen-- 2-- 5 ...
這是創(chuàng)建一個指定間隔時間內(nèi)發(fā)出事件的單訂閱流嗽上。
-
value
// 創(chuàng)建一個在完成前發(fā)出事件的單訂閱流 void createSingleStream() { var stream = Stream.value("單值流"); stream.listen((event) { print("value -- listen-- ${event.toString()}"); }, onError: (e) { print("value -- onError"); }, onDone: () { print("value -- onDone"); }, cancelOnError: false); }
日志輸出:
flutter: value -- listen-- 單值流 flutter: value -- onDone
這是創(chuàng)建一個在完成前發(fā)出事件的單訂閱流。
-
eventTransformed
/// 創(chuàng)建一個將現(xiàn)有流的所有事件通過接收器轉(zhuǎn)換通過管道發(fā)出 void createevEntTransformed() { var stream = Stream.eventTransformed(Stream.fromIterable([1, 2]), (sink) { sink.add(2); return sink; }); stream.listen((event) { print("eventTransformed -- listen-- ${event.toString()}"); }, onError: (e) { print("eventTransformed -- onError"); }, onDone: () { print("eventTransformed -- onDone"); }, cancelOnError: false); }
日志輸出:
flutter: eventTransformed -- listen-- 2 flutter: eventTransformed -- listen-- 1 flutter: eventTransformed -- listen-- 2 flutter: eventTransformed -- onDone
這是創(chuàng)建一個將現(xiàn)有事件通過接收器轉(zhuǎn)換通過通道發(fā)出熄攘。
四兽愤、Stream 屬性
-
first
first 是事件流的第一個元素,是 Futurt<T> 的對象。
實(shí)現(xiàn)原理核心代碼是:Future<T> get first { _Future<T> future = new _Future<T>(); StreamSubscription<T> subscription = this.listen(null, onError: future._completeError, onDone: () { try { throw IterableElementError.noElement(); } catch (e, s) { _completeWithErrorCallback(future, e, s); } }, cancelOnError: true); subscription.onData((T value) { _cancelAndValue(subscription, future, value); }); return future; } void _cancelAndValue(StreamSubscription subscription, _Future future, value) { var cancelFuture = subscription.cancel(); if (cancelFuture != null && !identical(cancelFuture, Future._nullFuture)) { cancelFuture.whenComplete(() => future._complete(value)); } else { future._complete(value); } }
上面解釋: 實(shí)現(xiàn)原理就是事件流發(fā)出第一個事件時
subscription.onData((T value) { _cancelAndValue(subscription, future, value); })
浅萧,然后取消訂閱流subscription.cancel()
逐沙。實(shí)例:
void first() { var stream1 = Stream.value("first 測試"); stream1.first.then((value) => print(value)); var stream2 = Stream.fromIterable([1, 2, 3]); stream2.first.then((value) => print(value)); }
日志輸出:
flutter: first 測試 flutter: 1
注意: 當(dāng)我們的流是單訂閱流時,調(diào)用 frist 之后洼畅,就不能被再次監(jiān)聽吩案。因?yàn)?
stream.first
包含一次監(jiān)聽(源碼)。 -
isEmpty
isEmpty 是判斷訂閱流是否為空帝簇。核心代碼如下:
Future<bool> get isEmpty { _Future<bool> future = new _Future<bool>(); StreamSubscription<T> subscription = this.listen(null, onError: future._completeError, onDone: () { future._complete(true); }, cancelOnError: true); subscription.onData((_) { _cancelAndValue(subscription, future, false); }); return future; }
上面代碼可知如果訂閱流能回調(diào) onDone 方法徘郭,則判定其不為空,否則為空丧肴。
實(shí)例如下:void isEmpty() { var stream = Stream.empty(); stream.isEmpty.then((value) => print(value)); var stream1 = Stream.fromIterable(["發(fā)生錯誤"]); stream1.isEmpty.then((value) => print(value)); var stream2 = Stream.multi((control) { control.add(1); }); stream2.isEmpty.then((value) => print(value)); }
日志輸出:
flutter: true flutter: false flutter: false
-
last
last 是獲取訂閱流發(fā)出事件的最后一個残揉,是 Future<T> 的類型。核心代碼:
Future<T> get last { _Future<T> future = new _Future<T>(); late T result; bool foundResult = false; listen( (T value) { foundResult = true; result = value; }, onError: future._completeError, onDone: () { if (foundResult) { future._complete(result); return; } try { throw IterableElementError.noElement(); } catch (e, s) { _completeWithErrorCallback(future, e, s); } }, cancelOnError: true); return future; }
從上面代碼的 onDone 方法看出芋浮,這是把訂閱流的最后一個事件抱环,通過 Future 返回。
注意:
- 如果訂閱流事件發(fā)生錯誤纸巷,這 Future 完成錯誤并停止處理镇草。
- 如果訂閱流是空,則 onDone 函數(shù)調(diào)用瘤旨,并拋出無事件的異常陶夜。
實(shí)例:
void last() { var stream = Stream.fromIterable([1, 2, 3]); stream.last.then((value) => print(value)); // 3 }
-
length
length 是獲取訂閱流發(fā)出事件的個數(shù)。核心代碼:
Future<int> get length { _Future<int> future = new _Future<int>(); int count = 0; this.listen( (_) { count++; }, onError: future._completeError, onDone: () { future._complete(count); }, cancelOnError: true); return future; }
從上面代碼裆站,可以看到是通過聲明 count 來記錄流發(fā)出的事件条辟。
實(shí)例:
void length() { var stream = Stream.fromIterable([1, 2, 3]); stream.length.then((value) => print(value)); // 3 }
-
single
single 是獲取只能發(fā)出一次事件的訂閱流的事件。核心代碼:
Future<T> get single { _Future<T> future = new _Future<T>(); late T result; bool foundResult = false; StreamSubscription<T> subscription = this.listen(null, onError: future._completeError, onDone: () { if (foundResult) { future._complete(result); return; } try { throw IterableElementError.noElement(); } catch (e, s) { _completeWithErrorCallback(future, e, s); } }, cancelOnError: true); subscription.onData((T value) { if (foundResult) { // This is the second element we get. try { throw IterableElementError.tooMany(); } catch (e, s) { _cancelAndErrorWithReplacement(subscription, future, e, s); } return; } foundResult = true; result = value; }); return future; }
從上面代碼知道: 首先使用 foundResult 在 onData 方法中為 false 跳過檢查宏胯,然后 foundResult 變?yōu)?true 和 result 獲取發(fā)送事件羽嫡。在 onDone 方法中 foundResult 為 true 完成 Future 的
_complete
方法結(jié)束。注意: 使用single 的訂閱流必須是只能包含一個事件的訂閱流肩袍,否則會拋出 tooMany 的異常杭棵。
實(shí)例:
void single() { var stream = Stream.fromIterable([1]); stream.single.then((value) => print(value));// 1 }
五、Stream 的方法
-
any
any 是檢查訂閱流中是否有符合test 條件的事件氛赐。核心代碼:
Future<bool> any(bool test(T element)) { _Future<bool> future = new _Future<bool>(); StreamSubscription<T> subscription = this.listen(null, onError: future._completeError, onDone: () { future._complete(false); }, cancelOnError: true); subscription.onData((T element) { _runUserCode(() => test(element), (bool isMatch) { if (isMatch) { _cancelAndValue(subscription, future, true); } }, _cancelAndErrorClosure(subscription, future)); }); return future; } void _cancelAndValue(StreamSubscription subscription, _Future future, value) { var cancelFuture = subscription.cancel(); if (cancelFuture != null && !identical(cancelFuture, Future._nullFuture)) { cancelFuture.whenComplete(() => future._complete(value)); } else { future._complete(value); } }
從上面代碼可以看到: 通過 onData 函數(shù)獲取 element 魂爪,然后經(jīng)過
_runUserCode
方法檢查并返回結(jié)果 isMatch ,如果 isMatch 為 true 則調(diào)用_cancelAndValue
方法取消訂閱subscription.cancel()
,然后完成 Future艰管。注意: any 是檢查訂閱流中事件是否符合條件的事件滓侍,如果有則返回 true ,否則返回 false。any 只要檢查到有一個符合則檢查結(jié)束牲芋,后面的就不在檢查撩笆。
實(shí)例:
void any() { var stream = Stream.fromIterable([1, 2, 3]); stream.any((element) => element > 2).then((value) => print(value)); // true }
-
asBroadcastStream
asBroadcastStream 是將一個單訂閱流轉(zhuǎn)換為一個廣播流捺球。核心代碼:
Stream<T> asBroadcastStream( {void onListen(StreamSubscription<T> subscription)?, void onCancel(StreamSubscription<T> subscription)?}) { return new _AsBroadcastStream<T>(this, onListen, onCancel); }
從上面代碼可知,原有的訂閱流經(jīng)過 asBroadcastStream 后夕冲,生成新的 _AsBroadcastStream 的訂閱流氮兵。
實(shí)例代碼:
void asBroadcastStream() { var stream = Stream.fromFuture( Future(() { return 110; }), ); var stream1 = stream.asBroadcastStream(); stream1 ..listen((event) { print("event -- 1 -- $event"); }); stream1.listen((event) { print("event -- 2 -- $event"); }); }
日志輸出:
flutter: event -- 1 -- 110 flutter: event -- 2 -- 110
注意:上面的實(shí)例不能寫成下面形式:
void asBroadcastStream() { var stream = Stream.fromFuture( Future(() { return 110; }), ); stream.asBroadcastStream().listen((event) { print("event -- 1 -- $event"); }); stream.asBroadcastStream().listen((event) { print("event -- 2 -- $event"); }); }
因?yàn)?asBroadcastStream 本身對 stream 就是訂閱,而stream 兩次 asBroadcastStream 就造成單訂閱多次被訂閱的錯誤歹鱼。
-
asyncExpand
asyncExpand 是將一個訂閱流的事件全部轉(zhuǎn)化為一系列的異步事件的廣播流泣栈。核心代碼:
Stream<E> asyncExpand<E>(Stream<E>? convert(T event)) { _StreamControllerBase<E> controller; if (isBroadcast) { controller = _SyncBroadcastStreamController<E>(null, null); } else { controller = _SyncStreamController<E>(null, null, null, null); } controller.onListen = () { StreamSubscription<T> subscription = this.listen(null, onError: controller._addError, // Avoid Zone error replacement. onDone: controller.close); subscription.onData((T event) { Stream<E>? newStream; try { newStream = convert(event); } catch (e, s) { controller.addError(e, s); return; } if (newStream != null) { subscription.pause(); controller.addStream(newStream).whenComplete(subscription.resume); } }); controller.onCancel = subscription.cancel; if (!isBroadcast) { controller ..onPause = subscription.pause ..onResume = subscription.resume; } }; return controller.stream; }
從上面代碼可知,是將訂閱流事件經(jīng)過
asyncExpand
生成 _StreamControllerBase 對象,將onData
獲取的數(shù)據(jù)生成新的Stream ,然后在由 _StreamControllerBase 通過addStream
對外提供新的 Stream弥姻。實(shí)例代碼:
void asyncExpand() { var stream = Stream.fromIterable([1, 2, 3]); stream .asyncExpand((event) => Stream.fromFuture( Future(() { return event * 2; }), )) .listen((event) { print("asyncExpand-- $event"); }); var stream1 = Stream.fromIterable([1, 2, 3]).asBroadcastStream(); var stream2 = stream1.asyncExpand((event) => Stream.fromFuture( Future(() { return event; }), )); stream2.listen((event) { print("asyncExpand--1- $event"); }); stream2.listen((event) { print("asyncExpand--2- $event"); }); }
日志輸出:
flutter: asyncExpand-- 2 flutter: asyncExpand--1- 1 flutter: asyncExpand--2- 1 flutter: asyncExpand-- 4 flutter: asyncExpand--1- 2 flutter: asyncExpand--2- 2 flutter: asyncExpand-- 6 flutter: asyncExpand--1- 3 flutter: asyncExpand--2- 3
注意: 什么類型的訂閱流經(jīng)過
asyncExpand
生成對應(yīng)類型的訂閱流南片。 -
asyncMap
asyncMap 是創(chuàng)建一個新流,并將該流的每個事件都轉(zhuǎn)化為一個新的異步事件蚁阳。核心代碼:
Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) { _StreamControllerBase<E> controller; if (isBroadcast) { controller = _SyncBroadcastStreamController<E>(null, null); } else { controller = _SyncStreamController<E>(null, null, null, null); } controller.onListen = () { StreamSubscription<T> subscription = this.listen(null, onError: controller._addError, // Avoid Zone error replacement. onDone: controller.close); FutureOr<Null> add(E value) { controller.add(value); } final addError = controller._addError; final resume = subscription.resume; subscription.onData((T event) { FutureOr<E> newValue; try { newValue = convert(event); } catch (e, s) { controller.addError(e, s); return; } if (newValue is Future<E>) { subscription.pause(); newValue.then(add, onError: addError).whenComplete(resume); } else { // TODO(40014): Remove cast when type promotion works. controller.add(newValue as dynamic); } }); controller.onCancel = subscription.cancel; if (!isBroadcast) { controller ..onPause = subscription.pause ..onResume = resume; } }; return controller.stream; }
從上面代碼知道铃绒,訂閱流經(jīng)過
ansyMap
后生成新的_StreamControllerBase
并返回 _StreamControllerBase 的 Stream, 然后通過 訂閱流的onData
方法獲取事件螺捐,然后經(jīng)過convert
獲得newValue
對象颠悬。然后在判斷 newValue 的類型,分別執(zhí)行不同的方法定血,最后通過 _StreamControllerBase 的add
方法進(jìn)行發(fā)出事件赔癌。實(shí)例代碼:
void asyncMap() { var stream = Stream.fromIterable([1, 2]); stream.asyncMap((event) => event > 1).listen((event) { print("asyncMap -1- $event"); }); var stream1 = Stream.fromIterable([1, 2]); stream1 .asyncMap((event) => Future(() { return event > 1; })) .listen((event) { print("asyncMap -2- $event"); }); }
日志輸出:
flutter: asyncMap -1- false flutter: asyncMap -1- true flutter: asyncMap -2- false flutter: asyncMap -2- true
-
cast
cast 是將訂閱流轉(zhuǎn)化 Stream<R> 的訂閱流,作用是檢查訂閱流發(fā)出是事件是否是 R 類型澜沟。核心代碼:
Stream<R> cast<R>() => Stream.castFrom<T, R>(this); static Stream<T> castFrom<S, T>(Stream<S> source) => new CastStream<S, T>(source); class CastStream<S, T> extends Stream<T> { final Stream<S> _source; CastStream(this._source); bool get isBroadcast => _source.isBroadcast; StreamSubscription<T> listen(void Function(T data)? onData, {Function? onError, void Function()? onDone, bool? cancelOnError}) { return new CastStreamSubscription<S, T>( _source.listen(null, onDone: onDone, cancelOnError: cancelOnError)) ..onData(onData) ..onError(onError); } Stream<R> cast<R>() => new CastStream<S, R>(_source); }
實(shí)例代碼:
void cast() { var stream = Stream.fromIterable([11, 22]); stream.cast<int>().listen((event) { print("cast--$event"); }); }
日志輸出:
flutter: cast--11 flutter: cast--22
注意: Stream 的
cast
方法能夠檢查 Stream 發(fā)出事件的類型是否是指定類型灾票。 -
contains
contains 是判斷訂閱流中事件是否有指定的事件, 返回一個 Future<bool> 對象茫虽。 核心代碼:
Future<bool> contains(Object? needle) { _Future<bool> future = new _Future<bool>(); StreamSubscription<T> subscription = this.listen(null, onError: future._completeError, onDone: () { future._complete(false); }, cancelOnError: true); subscription.onData((T element) { _runUserCode(() => (element == needle), (bool isMatch) { if (isMatch) { _cancelAndValue(subscription, future, true); } }, _cancelAndErrorClosure(subscription, future)); }); return future; }
從上面代碼知刊苍,訂閱流經(jīng)過
onData
方法獲取 element 事件,然后又_runUserCode
判斷 element 事件是否和指定的事件相等濒析,返回 isMatch, 如果 isMatch 為 true 則有_cancelAndValue
方法以 Future 返回正什。實(shí)例代碼:
void contains() { var stream = Stream.fromIterable([2, 3, 4]); stream.contains(4).then((value) => print(value)); // true }
-
distinct
distinct 是去除訂閱流中相同的事件只發(fā)出一次。核心代碼:
Stream<T> distinct([bool equals(T previous, T next)?]) { return new _DistinctStream<T>(this, equals); } class _DistinctStream<T> extends _ForwardingStream<T, T> { static final _SENTINEL = new Object(); final bool Function(T, T)? _equals; _DistinctStream(Stream<T> source, bool equals(T a, T b)?) : _equals = equals, super(source); StreamSubscription<T> _createSubscription(void onData(T data)?, Function? onError, void onDone()?, bool cancelOnError) { return new _StateStreamSubscription<Object?, T>( this, onData, onError, onDone, cancelOnError, _SENTINEL); } void _handleData(T inputEvent, _EventSink<T> sink) { var subscription = sink as _StateStreamSubscription<Object?, T>; var previous = subscription._subState; if (identical(previous, _SENTINEL)) { // First event. Cannot use [_equals]. subscription._subState = inputEvent; sink._add(inputEvent); } else { T previousEvent = previous as T; var equals = _equals; bool isEqual; try { if (equals == null) { isEqual = (previousEvent == inputEvent); } else { isEqual = equals(previousEvent, inputEvent); } } catch (e, s) { _addErrorWithReplacement(sink, e, s); return; } if (!isEqual) { sink._add(inputEvent); subscription._subState = inputEvent; } } } }
從上面代碼知号杏,訂閱流調(diào)用
distinct
方法生 _DistinctStream 訂閱流婴氮,然后 _DistinctStream 中創(chuàng)建 _StateStreamSubscription 的訂閱者,然后再有_handleData
方法中首先使用identical
方法規(guī)避第一次調(diào)用的判斷盾致,并將事件賦值給 _StateStreamSubscription的_subState 屬性主经。下次_handleData
調(diào)用經(jīng)過equals
判斷得到 isEqual 對象,然后 isEqual 是否為 true 決定是否發(fā)出事件庭惜。實(shí)例代碼:
void distinct() { var stream = Stream.fromIterable([1, 3, 3, 6]); stream.distinct().listen((event) { print("distinct -- $event"); }); }
日志輸出:
flutter: distinct -- 1 flutter: distinct -- 3 flutter: distinct -- 6
-
drain
drain 是清除訂閱流所有的事件罩驻,自定義自己的數(shù)據(jù)事件以 Future 返回。核心代碼:
Future<E> drain<E>([E? futureValue]) { if (futureValue == null) { futureValue = futureValue as E; } return listen(null, cancelOnError: true).asFuture<E>(futureValue); }
從上面代碼知道訂閱流經(jīng)過
drain
方法蜈块,將 StreamSubscription 通過asFuture
轉(zhuǎn)化為 Future 對象并帶 futureValue 初始值鉴腻。實(shí)例代碼:
void drain() { var stream = Stream.fromIterable([11, 33, 44]); stream.drain(2).then((value) => print(value)); // 2 }
-
elementAt
elementAt 是獲取訂閱流中第 index 事件迷扇, 返回一個 Future 對象百揭。核心代碼:
Future<T> elementAt(int index) { RangeError.checkNotNegative(index, "index"); _Future<T> result = new _Future<T>(); int elementIndex = 0; StreamSubscription<T> subscription; subscription = this.listen(null, onError: result._completeError, onDone: () { result._completeError( new RangeError.index(index, this, "index", null, elementIndex), StackTrace.empty); }, cancelOnError: true); subscription.onData((T value) { if (index == elementIndex) { _cancelAndValue(subscription, result, value); return; } elementIndex += 1; }); return result; }
從上面代碼知:訂閱流經(jīng)過
elementAt(int index)
方法爽哎,首先調(diào)用checkNotNegative
檢查 index 的值是否小于零;然后生成 subscription 訂閱對象和elementIndex 記錄廣播次數(shù) 器一,然后在onData
方法中判斷 index 是否和 elementIndex 相等课锌。如果相等,再有_cancelAndValue
取消訂閱對象祈秕,然后調(diào)用onDone
方法渺贤,返回 Future 對象。實(shí)例代碼:
void elementAt() { var stream = Stream.fromIterable([11, 33, 44]); stream.elementAt(2).then((value) => print(value)); // 44 }
-
every
every 是判斷訂閱流事件是否符合指定的條件请毛,結(jié)果以 Future<bool> 返回志鞍。核心代碼:
Future<bool> every(bool test(T element)) { _Future<bool> future = new _Future<bool>(); StreamSubscription<T> subscription = this.listen(null, onError: future._completeError, onDone: () { future._complete(true); }, cancelOnError: true); subscription.onData((T element) { _runUserCode(() => test(element), (bool isMatch) { if (!isMatch) { _cancelAndValue(subscription, future, false); } }, _cancelAndErrorClosure(subscription, future)); }); return future; }
從上面代碼知:訂閱流調(diào)用
every
函數(shù),生成 subscription 和 future 對象方仿。然后 subscription的onData
方法中調(diào)用_runUserCode
方法檢查是否符合條件固棚,并返回isMatch,然后在根據(jù) isMatch 為 false 調(diào)用_cancelAndValue
方法取消 subscription 取消監(jiān)聽仙蚜,以 Future<bool> 返回結(jié)果此洲。實(shí)例代碼:
void every() { var stream = Stream.fromIterable([110, 220, 330]); stream.every((element) => element > 100).then((value) => print(value)); // true }
-
expand
expand 是將訂閱流事件擴(kuò)展為事件序列。核心代碼:
Stream<S> expand<S>(Iterable<S> convert(T element)) { return new _ExpandStream<T, S>(this, convert); } class _ExpandStream<S, T> extends _ForwardingStream<S, T> { final _Transformation<S, Iterable<T>> _expand; _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) : this._expand = expand, super(source); void _handleData(S inputEvent, _EventSink<T> sink) { try { for (T value in _expand(inputEvent)) { sink._add(value); } } catch (e, s) { // If either _expand or iterating the generated iterator throws, // we abort the iteration. _addErrorWithReplacement(sink, e, s); } } }
從上面代碼知道委粉,訂閱流調(diào)用
expand
方法生成 _ExpandStream 訂閱流呜师,然后_ExpandStream 的_handleData
方法中通過for...in...
通過 sink 將事件發(fā)出。實(shí)例代碼:
void expand() { var stream = Stream.fromIterable([110, 220, 330]); stream.expand((element) => [element, element * 2]).listen((event) { print(event); }); }
日志輸出:
flutter: 110 flutter: 220 flutter: 220 flutter: 440 flutter: 330 flutter: 660
-
map
map 是將訂閱流轉(zhuǎn)化為另一種訂閱流贾节。核心代碼:
class _MapStream<S, T> extends _ForwardingStream<S, T> { final _Transformation<S, T> _transform; _MapStream(Stream<S> source, T transform(S event)) : this._transform = transform, super(source); void _handleData(S inputEvent, _EventSink<T> sink) { T outputEvent; try { outputEvent = _transform(inputEvent); } catch (e, s) { _addErrorWithReplacement(sink, e, s); return; } sink._add(outputEvent); } }
從上面代碼知: 訂閱流通過
map
生成 _MapStream 對象汁汗,然后有 _MapStream 的_handleData
方法中經(jīng)過_Transformation
將結(jié)果給 outputEvent,最后通過 sink 的_add
方法發(fā)出 outputEvent 事件。實(shí)例代碼:
void map() { var stream = Stream.fromIterable([110, 220, 330]); stream.map((event) => event > 200).listen((event) { print(event); }); }
日志輸出:
flutter: false flutter: true flutter: true
-
skip(int count)
skip 是跳過 count 次訂閱流中數(shù)據(jù)事件栗涂。核心代碼:
class _SkipStream<T> extends _ForwardingStream<T, T> { final int _count; _SkipStream(Stream<T> source, int count) : this._count = count, super(source) { // This test is done early to avoid handling an async error // in the _handleData method. RangeError.checkNotNegative(count, "count"); } StreamSubscription<T> _createSubscription(void onData(T data)?, Function? onError, void onDone()?, bool cancelOnError) { return new _StateStreamSubscription<int, T>( this, onData, onError, onDone, cancelOnError, _count); } void _handleData(T inputEvent, _EventSink<T> sink) { var subscription = sink as _StateStreamSubscription<int, T>; int count = subscription._subState; if (count > 0) { subscription._subState = count - 1; return; } sink._add(inputEvent); } }
從上面代碼知知牌,事件源經(jīng)過
skip
函數(shù)生成 _SkipStream事件源; _SkipStream 中生成 StreamSubscription 訂閱對象;再有_handleData
方法中通過 subscription._subState 獲取指定跳過的次數(shù)判斷是否大于零戴差,直至subscription._subState 小于零送爸,然后通過sink._add(inputEvent) 發(fā)出后續(xù)數(shù)據(jù)事件。實(shí)例代碼:
void skip() { var stream = Stream.fromIterable([11, 22, 33]); stream.skip(2).listen((event) { print(event); // 33 }); }
注意: 跳過次數(shù) count 暖释,只能大于等于零; count 可以大于事件源中發(fā)送事件的個數(shù)袭厂。
-
toList
toList 是將事件源發(fā)出的所有數(shù)據(jù)事件存放到 List 中,并以 Future 形式返回球匕。核心代碼:
Future<List<T>> toList() { List<T> result = <T>[]; _Future<List<T>> future = new _Future<List<T>>(); this.listen( (T data) { result.add(data); }, onError: future._completeError, onDone: () { future._complete(result); }, cancelOnError: true); return future; }
從上面知道纹磺,事件源由
toList
創(chuàng)建List<T> result
數(shù)組,然后事件源訂閱將發(fā)出事件存放到 List<T> result 中亮曹,然后在onDone
方法中以 Future 返回橄杨。實(shí)例代碼:
void toList() { var stream = Stream.fromIterable([11, 22, 33]); stream.toList().then((value) => print(value)); // [11,22,33] }
-
toSet
toSet 是將事件源發(fā)出的事件排出重復(fù)的添加到 Set 中秘症。核心代碼:
Future<Set<T>> toSet() { Set<T> result = new Set<T>(); _Future<Set<T>> future = new _Future<Set<T>>(); this.listen( (T data) { result.add(data); }, onError: future._completeError, onDone: () { future._complete(result); }, cancelOnError: true); return future; }
從上面代碼知:發(fā)出事件的排重是有Set 完成。
實(shí)例代碼:
void toSet() { var stream = Stream.fromIterable([11, 22, 33, 33]); stream.toSet().then((value) => print(value)); // {11,22,33} }
-
take(int count)
take 是獲取事件源中 count 次的事件并生成新的事件源式矫。核心代碼:
class _TakeStream<T> extends _ForwardingStream<T, T> { final int _count; _TakeStream(Stream<T> source, int count) : this._count = count, super(source); StreamSubscription<T> _createSubscription(void onData(T data)?, Function? onError, void onDone()?, bool cancelOnError) { if (_count == 0) { _source.listen(null).cancel(); return new _DoneStreamSubscription<T>(onDone); } return new _StateStreamSubscription<int, T>( this, onData, onError, onDone, cancelOnError, _count); } void _handleData(T inputEvent, _EventSink<T> sink) { var subscription = sink as _StateStreamSubscription<int, T>; int count = subscription._subState; if (count > 0) { sink._add(inputEvent); count -= 1; subscription._subState = count; if (count == 0) { // Closing also unsubscribes all subscribers, which unsubscribes // this from source. sink._close(); } } } }
從上面代碼可知乡摹,事件源經(jīng)過
take
生成_TakeStream
事件源,然后由_handleData
方法獲取指定次數(shù)事件采转,經(jīng)sink._add(inputEvent)
發(fā)出聪廉。實(shí)例代碼:
void take() { var stream = Stream.fromIterable([22, 22, 33, 33, 22, 11, 33, 11]); stream.take(2).listen((event) { print(event); //22 22 }); }
-
reduce
reduce 是減少事件源發(fā)出事件,將事件組合將結(jié)果以Future 輸出故慈。核心代碼:
Future<T> reduce(T combine(T previous, T element)) { _Future<T> result = new _Future<T>(); bool seenFirst = false; late T value; StreamSubscription<T> subscription = this.listen(null, onError: result._completeError, onDone: () { if (!seenFirst) { try { // Throw and recatch, instead of just doing // _completeWithErrorCallback, e, theError, StackTrace.current), // to ensure that the stackTrace is set on the error. throw IterableElementError.noElement(); } catch (e, s) { _completeWithErrorCallback(result, e, s); } } else { result._complete(value); } }, cancelOnError: true); subscription.onData((T element) { if (seenFirst) { _runUserCode(() => combine(value, element), (T newValue) { value = newValue; }, _cancelAndErrorClosure(subscription, result)); } else { value = element; seenFirst = true; } }); return result; }
從上面代碼知事件源有
reduce
生成 StreamSubscription<T> subscription 對象板熊,在onData
中有combine
將結(jié)果合并為 newValue 并賦值給 value 以 Future 返回。實(shí)例代碼:
void reduce() { var stream = Stream.fromIterable([11, 22, 33, 33]); stream .reduce((previous, element) => previous + element) .then((value) => print(value)); // 99 }
-
join
join 將事件源發(fā)出的事件以字符串的形式按指定字符竄拼接以 Futuren 輸出察绷。核心代碼:
Future<String> join([String separator = ""]) { _Future<String> result = new _Future<String>(); StringBuffer buffer = new StringBuffer(); bool first = true; StreamSubscription<T> subscription = this.listen(null, onError: result._completeError, onDone: () { result._complete(buffer.toString()); }, cancelOnError: true); subscription.onData(separator.isEmpty ? (T element) { try { buffer.write(element); } catch (e, s) { _cancelAndErrorWithReplacement(subscription, result, e, s); } } : (T element) { if (!first) { buffer.write(separator); } first = false; try { buffer.write(element); } catch (e, s) { _cancelAndErrorWithReplacement(subscription, result, e, s); } }); return result; }
從上面代碼知干签,事件源經(jīng)過
join
函數(shù)生成 result、buffer拆撼、subscription 容劳。由onData
方法將事件寫入到 buffer 中,然后在onDone
中將 buffer.toString() 的結(jié)果以Future 返回情萤。實(shí)例代碼:
void join() { var stream = Stream.fromIterable([11, 22, 33, 33]); stream.join("-").then((value) => print(value)); // 11-22-33-33 }