Flutter 的 Stream 探究

感覺文章還行喘落,請留下你的贊。

一蟹倾、Stream 是什么匣缘?

Stream 是異步數(shù)據(jù)事件的源。Stream 提供了一種接收事件序列的方法,可以通過 listen 進(jìn)行數(shù)據(jù)監(jiān)聽鲜棠;通過 error 接收失敗狀態(tài)肌厨;通過 done 接收事件的結(jié)束狀態(tài)。

注意: Stream 只有當(dāng)被監(jiān)聽后豁陆,才能使其產(chǎn)生事件柑爸。同時會產(chǎn)生一個 StreamSubscription 的對象,該對象是提供事件的活動對象盒音,可用于停止再次監(jiān)聽或者臨時暫停訂閱事件表鳍。

二、 Stream 的訂閱對象 StreamSubscription

  1. StreamSubscription 的種類

    Stream 的訂閱流有兩種祥诽,分別是:單訂閱流(Single-subscription)和多訂閱流(broadcast)譬圣。

  2. 單訂閱流(Single-subscription)
    • 單訂閱流只允許有一個監(jiān)聽器(listen);只有監(jiān)聽后才會產(chǎn)生事件雄坪;取消監(jiān)聽器時也停止事件發(fā)送厘熟,即是 Stream 還有更多事件。
    • 單訂閱流即是取消了第一次訂閱,也不允許再次訂閱绳姨。
    • 單訂閱流常用于流較大的連續(xù)數(shù)據(jù)事件颇玷,如文件 I/O,
  3. 多訂閱流(broadcast, 廣播)
    • 廣播流可以有多個監(jiān)聽器就缆,廣播事件就緒時就觸發(fā)其事件帖渠,無論有無監(jiān)聽器。

    • 廣播流常用于獨(dú)立的事件或者觀察者竭宰。

    • 廣播流可以取消監(jiān)聽空郊,然后再次監(jiān)聽。

      注意: 如果多個監(jiān)聽器想訂閱單個訂閱流切揭,請使用 asBroadcastStream 在單訂閱流頂部創(chuàng)建廣播流狞甚。

三舅桩、 Stream 的構(gòu)建

  • empty
    /// 空的廣播流
    void emptyStream() {
      var stream = Stream.empty();
      stream.listen((event) {
          print("empty -- listen");
      }, onDone: () {
          print("empty -- onDone"); // flutter: empty -- onDone
      }, onError: (e) {
          print("empty -- onError");
      }, cancelOnError: true);
    }
    

    這是一個流纵穿,它在監(jiān)聽后祝峻,只發(fā)出一個 onDone 事件汁展,其他什么都不做。

  • error
    // 創(chuàng)建一個錯誤的流
    void createErrorStream() {
        var stream = Stream.error("錯誤流");
        stream.listen((event) {
        print("error -- listen");
        }, onError: (e) {
        print("error -- onError"); // flutter: error -- onError
        }, onDone: () {
        print("error -- onDone");
        }, cancelOnError: true);
    }
    

    這是創(chuàng)建在事件完成前發(fā)出的單個錯誤事件流捧挺。

  • fromFuture
    // 從Future創(chuàng)建一個單事件流
    void createStreamFormFuture() {
        var stream = Stream.fromFuture(
        Future.delayed(Duration(seconds: 1), () {
            print("Future 延遲事件");
        }),
        );
        stream.listen((event) {
        print("fromFuture -- listen-- ${event.toString()}"); // flutter: fromFuture -- listen-- null
        }, onError: (e) {
        print("fromFuture -- onError");
        }, onDone: () {
        print("fromFuture -- onDone"); // flutter: fromFuture -- onDone
        }, cancelOnError: true);
    }
    

    這是有 Future 創(chuàng)建的一個單訂閱流州泊。

    注意:

    1.這是單訂閱流在未監(jiān)聽時捕捂,它的事件將有 _SyncStreamController 的緩存區(qū)保留励背,當(dāng)被監(jiān)聽后春霍,再次發(fā)出事件。
    2.對于一個單一的值來說叶眉,在 Future 做 then之前等待一個偵聽器是不值得的址儒。

  • fromFutures
    // 從一組 Future 中創(chuàng)建單訂閱流
    void createStreamFromMoreFuture() {
        var stream = Stream.fromFutures([
        Future(() {
            print("Future - 1");
        }),
        Future(() {
            print("Future - 2");
        }),
        ]);
        stream.listen((event) {
        print("fromFutures -- listen-- ${event.toString()}");
        }, onError: (e) {
        print("fromFutures -- onError");
        }, onDone: () {
        print("fromFutures -- onDone");
        }, cancelOnError: true);
    }
    

    日志輸出:

    flutter: Future - 1
    flutter: fromFutures -- listen-- null
    flutter: Future - 2
    flutter: fromFutures -- listen-- null
    flutter: fromFutures -- onDone
    

    從一組 Future 中創(chuàng)建一個單訂閱流。該流有幾個 Future 則 listen 方法將會被調(diào)用幾次衅疙,同時 onDone 方法最后調(diào)用莲趣。

  • fromIterable
    // 創(chuàng)建一個可以從改流中獲取數(shù)據(jù)的單訂閱流
    void createStreamFromIterable() {
        var stream = Stream.fromIterable([1, 2, 3, 4]);
        stream.listen((event) {
        print("fromIterable -- listen-- ${event.toString()}");
        }, onError: (e) {
        print("fromIterable -- onError");
        }, onDone: () {
        print("fromIterable -- onDone");
        }, cancelOnError: true);
    }
    

    日志輸出:

    flutter: fromIterable -- listen-- 1
    flutter: fromIterable -- listen-- 2
    flutter: fromIterable -- listen-- 3
    flutter: fromIterable -- listen-- 4
    flutter: fromIterable -- onDone
    

    創(chuàng)建一個可以從流中獲取值的單訂閱流。該流有幾個元素,則 listen 方法將會被調(diào)用幾次饱溢,同時 onDone 方法最后調(diào)用喧伞。

  • multi
    /// 創(chuàng)建一個多訂閱流
    void createMutilStream() {
        var stream = Stream.multi((control) {
        control.addSync("multi--1");
        control.addSync("multi--2");
        control.close();
        });
        stream.listen((event) {
        print("multi -- listen-- ${event.toString()}");
        }, onError: (e) {
        print("multi -- onError");
        }, onDone: () {
        print("multi -- onDone");
        }, cancelOnError: false);
    
        Future.delayed(Duration(seconds: 2), () {
        stream.listen((event) {
            print("multi -- listen-- 2-- ${event.toString()}");
        }, onError: (e) {
            print("multi -- onError -- 2");
        }, onDone: () {
            print("multi -- onDone-- 2");
        }, cancelOnError: false);
        });
    }
    

    日志輸出:

    flutter: multi -- listen-- multi--1
    flutter: multi -- listen-- multi--2
    flutter: multi -- onDone
    flutter: multi -- listen-- 2-- multi--1
    flutter: multi -- listen-- 2-- multi--2
    flutter: multi -- onDone-- 2
    

    創(chuàng)建一個廣播流,通過 control 發(fā)出響應(yīng)的事件理朋。注意在 control 不調(diào)用 close 方法絮识,則監(jiān)聽中的 onDone 方法也不會調(diào)用。

  • periodic
    // 創(chuàng)建一定間隔事件發(fā)出事件的流
    void createPeriodicStream() {
        var stream = Stream.periodic(Duration(seconds: 2), (value) {
        return value;
        });
        stream.listen((event) {
        print("periodic -- listen-- 2-- ${event.toString()}");
        }, onError: (e) {
        print("periodic -- onError -- 2");
        }, onDone: () {
        print("periodic -- onDone-- 2");
        }, cancelOnError: false);
    }
    

    日志輸出:

    flutter: periodic -- listen-- 2-- 0
    flutter: periodic -- listen-- 2-- 1
    flutter: periodic -- listen-- 2-- 2
    flutter: periodic -- listen-- 2-- 3
    flutter: periodic -- listen-- 2-- 4
    flutter: periodic -- listen-- 2-- 5
    ...
    

    這是創(chuàng)建一個指定間隔時間內(nèi)發(fā)出事件的單訂閱流嗽上。

  • value
    // 創(chuàng)建一個在完成前發(fā)出事件的單訂閱流
    void createSingleStream() {
        var stream = Stream.value("單值流");
        stream.listen((event) {
        print("value -- listen-- ${event.toString()}");
        }, onError: (e) {
        print("value -- onError");
        }, onDone: () {
        print("value -- onDone");
        }, cancelOnError: false);
    }
    

    日志輸出:

    flutter: value -- listen-- 單值流
    flutter: value -- onDone
    

    這是創(chuàng)建一個在完成前發(fā)出事件的單訂閱流。

  • eventTransformed
    /// 創(chuàng)建一個將現(xiàn)有流的所有事件通過接收器轉(zhuǎn)換通過管道發(fā)出
    void createevEntTransformed() {
        var stream = Stream.eventTransformed(Stream.fromIterable([1, 2]), (sink) {
        sink.add(2);
        return sink;
        });
    
        stream.listen((event) {
        print("eventTransformed -- listen-- ${event.toString()}");
        }, onError: (e) {
        print("eventTransformed -- onError");
        }, onDone: () {
        print("eventTransformed -- onDone");
        }, cancelOnError: false);
    }
    

    日志輸出:

    flutter: eventTransformed -- listen-- 2
    flutter: eventTransformed -- listen-- 1
    flutter: eventTransformed -- listen-- 2
    flutter: eventTransformed -- onDone
    

    這是創(chuàng)建一個將現(xiàn)有事件通過接收器轉(zhuǎn)換通過通道發(fā)出熄攘。

四兽愤、Stream 屬性

  • first

    first 是事件流的第一個元素,是 Futurt<T> 的對象。
    實(shí)現(xiàn)原理核心代碼是:

    Future<T> get first {
      _Future<T> future = new _Future<T>();
      StreamSubscription<T> subscription =
          this.listen(null, onError: future._completeError, onDone: () {
        try {
          throw IterableElementError.noElement();
        } catch (e, s) {
          _completeWithErrorCallback(future, e, s);
        }
      }, cancelOnError: true);
      subscription.onData((T value) {
        _cancelAndValue(subscription, future, value);
      });
      return future;
    }
    
    void _cancelAndValue(StreamSubscription subscription, _Future future, value) {
      var cancelFuture = subscription.cancel();
      if (cancelFuture != null && !identical(cancelFuture, Future._nullFuture)) {
          cancelFuture.whenComplete(() => future._complete(value));
      } else {
          future._complete(value);
      }
    }
    

    上面解釋: 實(shí)現(xiàn)原理就是事件流發(fā)出第一個事件時 subscription.onData((T value) { _cancelAndValue(subscription, future, value); })浅萧,然后取消訂閱流subscription.cancel()逐沙。

    實(shí)例:

    void first() {
        var stream1 = Stream.value("first 測試");
        stream1.first.then((value) => print(value));
        var stream2 = Stream.fromIterable([1, 2, 3]);
        stream2.first.then((value) => print(value));
    }
    

    日志輸出:

    flutter: first 測試
    flutter: 1
    

    注意: 當(dāng)我們的流是單訂閱流時,調(diào)用 frist 之后洼畅,就不能被再次監(jiān)聽吩案。因?yàn)?stream.first 包含一次監(jiān)聽(源碼)。

  • isEmpty

    isEmpty 是判斷訂閱流是否為空帝簇。核心代碼如下:

    Future<bool> get isEmpty {
        _Future<bool> future = new _Future<bool>();
        StreamSubscription<T> subscription =
            this.listen(null, onError: future._completeError, onDone: () {
            future._complete(true);
        }, cancelOnError: true);
        subscription.onData((_) {
            _cancelAndValue(subscription, future, false);
        });
        return future;
    }
    

    上面代碼可知如果訂閱流能回調(diào) onDone 方法徘郭,則判定其不為空,否則為空丧肴。
    實(shí)例如下:

    void isEmpty() {
        var stream = Stream.empty();
        stream.isEmpty.then((value) => print(value));
    
        var stream1 = Stream.fromIterable(["發(fā)生錯誤"]);
        stream1.isEmpty.then((value) => print(value));
    
        var stream2 = Stream.multi((control) {
        control.add(1);
        });
        stream2.isEmpty.then((value) => print(value));
    }
    

    日志輸出:

    flutter: true
    flutter: false
    flutter: false
    
  • last

    last 是獲取訂閱流發(fā)出事件的最后一個残揉,是 Future<T> 的類型。核心代碼:

    Future<T> get last {
      _Future<T> future = new _Future<T>();
      late T result;
      bool foundResult = false;
      listen(
          (T value) {
            foundResult = true;
            result = value;
          },
          onError: future._completeError,
          onDone: () {
            if (foundResult) {
              future._complete(result);
              return;
            }
            try {
              throw IterableElementError.noElement();
            } catch (e, s) {
              _completeWithErrorCallback(future, e, s);
            }
          },
          cancelOnError: true);
      return future;
    }
    

    從上面代碼的 onDone 方法看出芋浮,這是把訂閱流的最后一個事件抱环,通過 Future 返回。

    注意:

    1. 如果訂閱流事件發(fā)生錯誤纸巷,這 Future 完成錯誤并停止處理镇草。
    2. 如果訂閱流是空,則 onDone 函數(shù)調(diào)用瘤旨,并拋出無事件的異常陶夜。

    實(shí)例:

    void last() {
      var stream = Stream.fromIterable([1, 2, 3]);
      stream.last.then((value) => print(value)); // 3
    }
    
  • length

    length 是獲取訂閱流發(fā)出事件的個數(shù)。核心代碼:

    Future<int> get length {
        _Future<int> future = new _Future<int>();
        int count = 0;
        this.listen(
            (_) {
              count++;
            },
            onError: future._completeError,
            onDone: () {
              future._complete(count);
            },
            cancelOnError: true);
        return future;
    }
    

    從上面代碼裆站,可以看到是通過聲明 count 來記錄流發(fā)出的事件条辟。

    實(shí)例:

    void length() {
      var stream = Stream.fromIterable([1, 2, 3]);
      stream.length.then((value) => print(value)); // 3
    }
    
  • single

    single 是獲取只能發(fā)出一次事件的訂閱流的事件。核心代碼:

    Future<T> get single {
      _Future<T> future = new _Future<T>();
      late T result;
      bool foundResult = false;
      StreamSubscription<T> subscription =
          this.listen(null, onError: future._completeError, onDone: () {
        if (foundResult) {
          future._complete(result);
          return;
        }
        try {
          throw IterableElementError.noElement();
        } catch (e, s) {
          _completeWithErrorCallback(future, e, s);
        }
      }, cancelOnError: true);
      subscription.onData((T value) {
        if (foundResult) {
          // This is the second element we get.
          try {
            throw IterableElementError.tooMany();
          } catch (e, s) {
            _cancelAndErrorWithReplacement(subscription, future, e, s);
          }
          return;
        }
        foundResult = true;
        result = value;
      });
      return future;
    }
    

    從上面代碼知道: 首先使用 foundResultonData 方法中為 false 跳過檢查宏胯,然后 foundResult 變?yōu)?trueresult 獲取發(fā)送事件羽嫡。在 onDone 方法中 foundResulttrue 完成 Future_complete方法結(jié)束。

    注意: 使用single 的訂閱流必須是只能包含一個事件的訂閱流肩袍,否則會拋出 tooMany 的異常杭棵。

    實(shí)例:

    void single() {
      var stream = Stream.fromIterable([1]);
      stream.single.then((value) => print(value));// 1
    }
    

五、Stream 的方法

  • any

    any 是檢查訂閱流中是否有符合test 條件的事件氛赐。核心代碼:

    Future<bool> any(bool test(T element)) {
      _Future<bool> future = new _Future<bool>();
      StreamSubscription<T> subscription =
          this.listen(null, onError: future._completeError, onDone: () {
        future._complete(false);
      }, cancelOnError: true);
      subscription.onData((T element) {
        _runUserCode(() => test(element), (bool isMatch) {
          if (isMatch) {
            _cancelAndValue(subscription, future, true);
          }
        }, _cancelAndErrorClosure(subscription, future));
      });
      return future;
    }
    
    void _cancelAndValue(StreamSubscription subscription, _Future future, value) {
      var cancelFuture = subscription.cancel();
      if (cancelFuture != null && !identical(cancelFuture, Future._nullFuture)) {
        cancelFuture.whenComplete(() => future._complete(value));
      } else {
        future._complete(value);
      }
    }
    

    從上面代碼可以看到: 通過 onData 函數(shù)獲取 element 魂爪,然后經(jīng)過 _runUserCode 方法檢查并返回結(jié)果 isMatch ,如果 isMatchtrue 則調(diào)用 _cancelAndValue 方法取消訂閱 subscription.cancel() ,然后完成 Future艰管。

    注意: any 是檢查訂閱流中事件是否符合條件的事件滓侍,如果有則返回 true ,否則返回 false。any 只要檢查到有一個符合則檢查結(jié)束牲芋,后面的就不在檢查撩笆。

    實(shí)例:

    void any() {
      var stream = Stream.fromIterable([1, 2, 3]);
      stream.any((element) => element > 2).then((value) => print(value)); // true
    }
    
  • asBroadcastStream

    asBroadcastStream 是將一個單訂閱流轉(zhuǎn)換為一個廣播流捺球。核心代碼:

    Stream<T> asBroadcastStream(
        {void onListen(StreamSubscription<T> subscription)?,
        void onCancel(StreamSubscription<T> subscription)?}) {
      return new _AsBroadcastStream<T>(this, onListen, onCancel);
    }
    

    從上面代碼可知,原有的訂閱流經(jīng)過 asBroadcastStream 后夕冲,生成新的 _AsBroadcastStream 的訂閱流氮兵。

    實(shí)例代碼:

    void asBroadcastStream() {
      var stream = Stream.fromFuture(
        Future(() {
          return 110;
        }),
      );
    
      var stream1 = stream.asBroadcastStream();
      stream1
        ..listen((event) {
          print("event -- 1 -- $event");
        });
      stream1.listen((event) {
        print("event -- 2 -- $event");
      });
    }
    

    日志輸出:

    flutter: event -- 1 -- 110
    flutter: event -- 2 -- 110
    

    注意:上面的實(shí)例不能寫成下面形式:

    void asBroadcastStream() {
      var stream = Stream.fromFuture(
        Future(() {
          return 110;
        }),
      );
      stream.asBroadcastStream().listen((event) {
          print("event -- 1 -- $event");
        });
      stream.asBroadcastStream().listen((event) {
        print("event -- 2 -- $event");
      });
    }
    

    因?yàn)?asBroadcastStream 本身對 stream 就是訂閱,而stream 兩次 asBroadcastStream 就造成單訂閱多次被訂閱的錯誤歹鱼。

  • asyncExpand

    asyncExpand 是將一個訂閱流的事件全部轉(zhuǎn)化為一系列的異步事件的廣播流泣栈。核心代碼:

    Stream<E> asyncExpand<E>(Stream<E>? convert(T event)) {
      _StreamControllerBase<E> controller;
      if (isBroadcast) {
        controller = _SyncBroadcastStreamController<E>(null, null);
      } else {
        controller = _SyncStreamController<E>(null, null, null, null);
      }
    
      controller.onListen = () {
        StreamSubscription<T> subscription = this.listen(null,
            onError: controller._addError, // Avoid Zone error replacement.
            onDone: controller.close);
        subscription.onData((T event) {
          Stream<E>? newStream;
          try {
            newStream = convert(event);
          } catch (e, s) {
            controller.addError(e, s);
            return;
          }
          if (newStream != null) {
            subscription.pause();
            controller.addStream(newStream).whenComplete(subscription.resume);
          }
        });
        controller.onCancel = subscription.cancel;
        if (!isBroadcast) {
          controller
            ..onPause = subscription.pause
            ..onResume = subscription.resume;
        }
      };
      return controller.stream;
    }
    

    從上面代碼可知,是將訂閱流事件經(jīng)過 asyncExpand 生成 _StreamControllerBase 對象,將 onData 獲取的數(shù)據(jù)生成新的Stream ,然后在由 _StreamControllerBase 通過 addStream 對外提供新的 Stream弥姻。

    實(shí)例代碼:

    void asyncExpand() {
      var stream = Stream.fromIterable([1, 2, 3]);
      stream
          .asyncExpand((event) => Stream.fromFuture(
                Future(() {
                  return event * 2;
                }),
              ))
          .listen((event) {
        print("asyncExpand-- $event");
      });
    
      var stream1 = Stream.fromIterable([1, 2, 3]).asBroadcastStream();
      var stream2 = stream1.asyncExpand((event) => Stream.fromFuture(
            Future(() {
              return event;
            }),
          ));
      stream2.listen((event) {
        print("asyncExpand--1- $event");
      });
      stream2.listen((event) {
        print("asyncExpand--2- $event");
      });
    }
    

    日志輸出:

    flutter: asyncExpand-- 2
    flutter: asyncExpand--1- 1
    flutter: asyncExpand--2- 1
    flutter: asyncExpand-- 4
    flutter: asyncExpand--1- 2
    flutter: asyncExpand--2- 2
    flutter: asyncExpand-- 6
    flutter: asyncExpand--1- 3
    flutter: asyncExpand--2- 3
    

    注意: 什么類型的訂閱流經(jīng)過 asyncExpand 生成對應(yīng)類型的訂閱流南片。

  • asyncMap

    asyncMap 是創(chuàng)建一個新流,并將該流的每個事件都轉(zhuǎn)化為一個新的異步事件蚁阳。核心代碼:

    Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) {
      _StreamControllerBase<E> controller;
      if (isBroadcast) {
        controller = _SyncBroadcastStreamController<E>(null, null);
      } else {
        controller = _SyncStreamController<E>(null, null, null, null);
      }
    
      controller.onListen = () {
        StreamSubscription<T> subscription = this.listen(null,
            onError: controller._addError, // Avoid Zone error replacement.
            onDone: controller.close);
        FutureOr<Null> add(E value) {
          controller.add(value);
        }
    
        final addError = controller._addError;
        final resume = subscription.resume;
        subscription.onData((T event) {
          FutureOr<E> newValue;
          try {
            newValue = convert(event);
          } catch (e, s) {
            controller.addError(e, s);
            return;
          }
          if (newValue is Future<E>) {
            subscription.pause();
            newValue.then(add, onError: addError).whenComplete(resume);
          } else {
            // TODO(40014): Remove cast when type promotion works.
            controller.add(newValue as dynamic);
          }
        });
        controller.onCancel = subscription.cancel;
        if (!isBroadcast) {
          controller
            ..onPause = subscription.pause
            ..onResume = resume;
        }
      };
      return controller.stream;
    }
    

    從上面代碼知道铃绒,訂閱流經(jīng)過 ansyMap 后生成新的 _StreamControllerBase 并返回 _StreamControllerBaseStream, 然后通過 訂閱流的 onData 方法獲取事件螺捐,然后經(jīng)過 convert 獲得 newValue 對象颠悬。然后在判斷 newValue 的類型,分別執(zhí)行不同的方法定血,最后通過 _StreamControllerBaseadd 方法進(jìn)行發(fā)出事件赔癌。

    實(shí)例代碼:

    void asyncMap() {
      var stream = Stream.fromIterable([1, 2]);
      stream.asyncMap((event) => event > 1).listen((event) {
        print("asyncMap -1- $event");
      });
    
      var stream1 = Stream.fromIterable([1, 2]);
      stream1
          .asyncMap((event) => Future(() {
                return event > 1;
              }))
          .listen((event) {
        print("asyncMap -2- $event");
      });
    }
    

    日志輸出:

    flutter: asyncMap -1- false
    flutter: asyncMap -1- true
    flutter: asyncMap -2- false
    flutter: asyncMap -2- true
    
  • cast

    cast 是將訂閱流轉(zhuǎn)化 Stream<R> 的訂閱流,作用是檢查訂閱流發(fā)出是事件是否是 R 類型澜沟。核心代碼:

    Stream<R> cast<R>() => Stream.castFrom<T, R>(this);
    
    static Stream<T> castFrom<S, T>(Stream<S> source) =>
        new CastStream<S, T>(source);
    
    class CastStream<S, T> extends Stream<T> {
      final Stream<S> _source;
      CastStream(this._source);
      bool get isBroadcast => _source.isBroadcast;
    
      StreamSubscription<T> listen(void Function(T data)? onData,
          {Function? onError, void Function()? onDone, bool? cancelOnError}) {
        return new CastStreamSubscription<S, T>(
            _source.listen(null, onDone: onDone, cancelOnError: cancelOnError))
          ..onData(onData)
          ..onError(onError);
      }
    
      Stream<R> cast<R>() => new CastStream<S, R>(_source);
    }
    

    實(shí)例代碼:

    void cast() {
      var stream = Stream.fromIterable([11, 22]);
      stream.cast<int>().listen((event) {
        print("cast--$event");
      });
    }
    

    日志輸出:

    flutter: cast--11
    flutter: cast--22
    

    注意: Streamcast 方法能夠檢查 Stream 發(fā)出事件的類型是否是指定類型灾票。

  • contains

    contains 是判斷訂閱流中事件是否有指定的事件, 返回一個 Future<bool> 對象茫虽。 核心代碼:

    Future<bool> contains(Object? needle) {
      _Future<bool> future = new _Future<bool>();
      StreamSubscription<T> subscription =
          this.listen(null, onError: future._completeError, onDone: () {
        future._complete(false);
      }, cancelOnError: true);
      subscription.onData((T element) {
        _runUserCode(() => (element == needle), (bool isMatch) {
          if (isMatch) {
            _cancelAndValue(subscription, future, true);
          }
        }, _cancelAndErrorClosure(subscription, future));
      });
      return future;
    }
    

    從上面代碼知刊苍,訂閱流經(jīng)過 onData 方法獲取 element 事件,然后又 _runUserCode 判斷 element 事件是否和指定的事件相等濒析,返回 isMatch, 如果 isMatchtrue 則有 _cancelAndValue 方法以 Future 返回正什。

    實(shí)例代碼:

    void contains() {
      var stream = Stream.fromIterable([2, 3, 4]);
      stream.contains(4).then((value) => print(value)); // true
    }
    
  • distinct

    distinct 是去除訂閱流中相同的事件只發(fā)出一次。核心代碼:

    Stream<T> distinct([bool equals(T previous, T next)?]) {
      return new _DistinctStream<T>(this, equals);
    }
    
    class _DistinctStream<T> extends _ForwardingStream<T, T> {
      static final _SENTINEL = new Object();
    
      final bool Function(T, T)? _equals;
    
      _DistinctStream(Stream<T> source, bool equals(T a, T b)?)
          : _equals = equals,
            super(source);
    
      StreamSubscription<T> _createSubscription(void onData(T data)?,
          Function? onError, void onDone()?, bool cancelOnError) {
        return new _StateStreamSubscription<Object?, T>(
            this, onData, onError, onDone, cancelOnError, _SENTINEL);
      }
    
      void _handleData(T inputEvent, _EventSink<T> sink) {
        var subscription = sink as _StateStreamSubscription<Object?, T>;
        var previous = subscription._subState;
        if (identical(previous, _SENTINEL)) {
          // First event. Cannot use [_equals].
          subscription._subState = inputEvent;
          sink._add(inputEvent);
        } else {
          T previousEvent = previous as T;
          var equals = _equals;
          bool isEqual;
          try {
            if (equals == null) {
              isEqual = (previousEvent == inputEvent);
            } else {
              isEqual = equals(previousEvent, inputEvent);
            }
          } catch (e, s) {
            _addErrorWithReplacement(sink, e, s);
            return;
          }
          if (!isEqual) {
            sink._add(inputEvent);
            subscription._subState = inputEvent;
          }
        }
      }
    }
    

    從上面代碼知号杏,訂閱流調(diào)用 distinct 方法生 _DistinctStream 訂閱流婴氮,然后 _DistinctStream 中創(chuàng)建 _StateStreamSubscription 的訂閱者,然后再有 _handleData 方法中首先使用 identical 方法規(guī)避第一次調(diào)用的判斷盾致,并將事件賦值給 _StateStreamSubscription_subState 屬性主经。下次 _handleData 調(diào)用經(jīng)過 equals 判斷得到 isEqual 對象,然后 isEqual 是否為 true 決定是否發(fā)出事件庭惜。

    實(shí)例代碼:

    void distinct() {
      var stream = Stream.fromIterable([1, 3, 3, 6]);
      stream.distinct().listen((event) {
        print("distinct -- $event");
      });
    }
    

    日志輸出:

    flutter: distinct -- 1
    flutter: distinct -- 3
    flutter: distinct -- 6
    
  • drain

    drain 是清除訂閱流所有的事件罩驻,自定義自己的數(shù)據(jù)事件以 Future 返回。核心代碼:

    Future<E> drain<E>([E? futureValue]) {
      if (futureValue == null) {
        futureValue = futureValue as E;
      }
      return listen(null, cancelOnError: true).asFuture<E>(futureValue);
    }
    

    從上面代碼知道訂閱流經(jīng)過 drain 方法蜈块,將 StreamSubscription 通過 asFuture 轉(zhuǎn)化為 Future 對象并帶 futureValue 初始值鉴腻。

    實(shí)例代碼:

    void drain() {
      var stream = Stream.fromIterable([11, 33, 44]);
      stream.drain(2).then((value) => print(value)); // 2
    }
    
  • elementAt

    elementAt 是獲取訂閱流中第 index 事件迷扇, 返回一個 Future 對象百揭。核心代碼:

    Future<T> elementAt(int index) {
      RangeError.checkNotNegative(index, "index");
      _Future<T> result = new _Future<T>();
      int elementIndex = 0;
      StreamSubscription<T> subscription;
      subscription =
          this.listen(null, onError: result._completeError, onDone: () {
        result._completeError(
            new RangeError.index(index, this, "index", null, elementIndex),
            StackTrace.empty);
      }, cancelOnError: true);
      subscription.onData((T value) {
        if (index == elementIndex) {
          _cancelAndValue(subscription, result, value);
          return;
        }
        elementIndex += 1;
      });
    
      return result;
    }
    

    從上面代碼知:訂閱流經(jīng)過 elementAt(int index) 方法爽哎,首先調(diào)用 checkNotNegative 檢查 index 的值是否小于零;然后生成 subscription 訂閱對象和elementIndex 記錄廣播次數(shù) 器一,然后在 onData 方法中判斷 index 是否和 elementIndex 相等课锌。如果相等,再有 _cancelAndValue 取消訂閱對象祈秕,然后調(diào)用 onDone 方法渺贤,返回 Future 對象。

    實(shí)例代碼:

    void elementAt() {
      var stream = Stream.fromIterable([11, 33, 44]);
      stream.elementAt(2).then((value) => print(value)); // 44
    }
    
  • every

    every 是判斷訂閱流事件是否符合指定的條件请毛,結(jié)果以 Future<bool> 返回志鞍。核心代碼:

    Future<bool> every(bool test(T element)) {
      _Future<bool> future = new _Future<bool>();
      StreamSubscription<T> subscription =
          this.listen(null, onError: future._completeError, onDone: () {
        future._complete(true);
      }, cancelOnError: true);
      subscription.onData((T element) {
        _runUserCode(() => test(element), (bool isMatch) {
          if (!isMatch) {
            _cancelAndValue(subscription, future, false);
          }
        }, _cancelAndErrorClosure(subscription, future));
      });
      return future;
    }
    

    從上面代碼知:訂閱流調(diào)用every 函數(shù),生成 subscriptionfuture 對象方仿。然后 subscriptiononData 方法中調(diào)用 _runUserCode 方法檢查是否符合條件固棚,并返回isMatch,然后在根據(jù) isMatchfalse 調(diào)用 _cancelAndValue 方法取消 subscription 取消監(jiān)聽仙蚜,以 Future<bool> 返回結(jié)果此洲。

    實(shí)例代碼:

    void every() {
      var stream = Stream.fromIterable([110, 220, 330]);
      stream.every((element) => element > 100).then((value) => print(value)); // true
    }
    
  • expand

    expand 是將訂閱流事件擴(kuò)展為事件序列。核心代碼:

    Stream<S> expand<S>(Iterable<S> convert(T element)) {
      return new _ExpandStream<T, S>(this, convert);
    }
    
    class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
      final _Transformation<S, Iterable<T>> _expand;
    
      _ExpandStream(Stream<S> source, Iterable<T> expand(S event))
          : this._expand = expand,
            super(source);
    
      void _handleData(S inputEvent, _EventSink<T> sink) {
        try {
          for (T value in _expand(inputEvent)) {
            sink._add(value);
          }
        } catch (e, s) {
          // If either _expand or iterating the generated iterator throws,
          // we abort the iteration.
          _addErrorWithReplacement(sink, e, s);
        }
      }
    }
    

    從上面代碼知道委粉,訂閱流調(diào)用 expand 方法生成 _ExpandStream 訂閱流呜师,然后_ExpandStream_handleData 方法中通過 for...in... 通過 sink 將事件發(fā)出。

    實(shí)例代碼:

    void expand() {
      var stream = Stream.fromIterable([110, 220, 330]);
      stream.expand((element) => [element, element * 2]).listen((event) {
        print(event);
      });
    }
    

    日志輸出:

    flutter: 110
    flutter: 220
    flutter: 220
    flutter: 440
    flutter: 330
    flutter: 660
    
  • map

    map 是將訂閱流轉(zhuǎn)化為另一種訂閱流贾节。核心代碼:

    class _MapStream<S, T> extends _ForwardingStream<S, T> {
      final _Transformation<S, T> _transform;
    
      _MapStream(Stream<S> source, T transform(S event))
          : this._transform = transform,
            super(source);
    
      void _handleData(S inputEvent, _EventSink<T> sink) {
        T outputEvent;
        try {
          outputEvent = _transform(inputEvent);
        } catch (e, s) {
          _addErrorWithReplacement(sink, e, s);
          return;
        }
        sink._add(outputEvent);
      }
    }
    

    從上面代碼知: 訂閱流通過 map 生成 _MapStream 對象汁汗,然后有 _MapStream_handleData 方法中經(jīng)過 _Transformation 將結(jié)果給 outputEvent,最后通過 sink_add 方法發(fā)出 outputEvent 事件。

    實(shí)例代碼:

    void map() {
      var stream = Stream.fromIterable([110, 220, 330]);
      stream.map((event) => event > 200).listen((event) {
        print(event);
      });
    }
    

    日志輸出:

    flutter: false
    flutter: true
    flutter: true
    
  • skip(int count)

    skip 是跳過 count 次訂閱流中數(shù)據(jù)事件栗涂。核心代碼:

    class _SkipStream<T> extends _ForwardingStream<T, T> {
      final int _count;
    
      _SkipStream(Stream<T> source, int count)
          : this._count = count,
            super(source) {
        // This test is done early to avoid handling an async error
        // in the _handleData method.
        RangeError.checkNotNegative(count, "count");
      }
    
      StreamSubscription<T> _createSubscription(void onData(T data)?,
          Function? onError, void onDone()?, bool cancelOnError) {
        return new _StateStreamSubscription<int, T>(
            this, onData, onError, onDone, cancelOnError, _count);
      }
    
      void _handleData(T inputEvent, _EventSink<T> sink) {
        var subscription = sink as _StateStreamSubscription<int, T>;
        int count = subscription._subState;
        if (count > 0) {
          subscription._subState = count - 1;
          return;
        }
        sink._add(inputEvent);
      }
    }
    

    從上面代碼知知牌,事件源經(jīng)過 skip 函數(shù)生成 _SkipStream事件源; _SkipStream 中生成 StreamSubscription 訂閱對象;再有 _handleData 方法中通過 subscription._subState 獲取指定跳過的次數(shù)判斷是否大于零戴差,直至subscription._subState 小于零送爸,然后通過sink._add(inputEvent) 發(fā)出后續(xù)數(shù)據(jù)事件。

    實(shí)例代碼:

    void skip() {
      var stream = Stream.fromIterable([11, 22, 33]);
      stream.skip(2).listen((event) {
        print(event); // 33
      });
    }
    

    注意: 跳過次數(shù) count 暖释,只能大于等于零; count 可以大于事件源中發(fā)送事件的個數(shù)袭厂。

  • toList

    toList 是將事件源發(fā)出的所有數(shù)據(jù)事件存放到 List 中,并以 Future 形式返回球匕。核心代碼:

    Future<List<T>> toList() {
      List<T> result = <T>[];
      _Future<List<T>> future = new _Future<List<T>>();
      this.listen(
          (T data) {
            result.add(data);
          },
          onError: future._completeError,
          onDone: () {
            future._complete(result);
          },
          cancelOnError: true);
      return future;
    }
    

    從上面知道纹磺,事件源由 toList 創(chuàng)建 List<T> result 數(shù)組,然后事件源訂閱將發(fā)出事件存放到 List<T> result 中亮曹,然后在 onDone 方法中以 Future 返回橄杨。

    實(shí)例代碼:

    void toList() {
      var stream = Stream.fromIterable([11, 22, 33]);
      stream.toList().then((value) => print(value)); // [11,22,33]
    }
    
  • toSet

    toSet 是將事件源發(fā)出的事件排出重復(fù)的添加到 Set 中秘症。核心代碼:

    Future<Set<T>> toSet() {
      Set<T> result = new Set<T>();
      _Future<Set<T>> future = new _Future<Set<T>>();
      this.listen(
          (T data) {
            result.add(data);
          },
          onError: future._completeError,
          onDone: () {
            future._complete(result);
          },
          cancelOnError: true);
      return future;
    }
    

    從上面代碼知:發(fā)出事件的排重是有Set 完成。

    實(shí)例代碼:

    void toSet() {
      var stream = Stream.fromIterable([11, 22, 33, 33]);
      stream.toSet().then((value) => print(value)); // {11,22,33}
    }
    
  • take(int count)

    take 是獲取事件源中 count 次的事件并生成新的事件源式矫。核心代碼:

    class _TakeStream<T> extends _ForwardingStream<T, T> {
      final int _count;
    
      _TakeStream(Stream<T> source, int count)
          : this._count = count,
            super(source);
    
      StreamSubscription<T> _createSubscription(void onData(T data)?,
          Function? onError, void onDone()?, bool cancelOnError) {
        if (_count == 0) {
          _source.listen(null).cancel();
          return new _DoneStreamSubscription<T>(onDone);
        }
        return new _StateStreamSubscription<int, T>(
            this, onData, onError, onDone, cancelOnError, _count);
      }
    
      void _handleData(T inputEvent, _EventSink<T> sink) {
        var subscription = sink as _StateStreamSubscription<int, T>;
        int count = subscription._subState;
        if (count > 0) {
          sink._add(inputEvent);
          count -= 1;
          subscription._subState = count;
          if (count == 0) {
            // Closing also unsubscribes all subscribers, which unsubscribes
            // this from source.
            sink._close();
          }
        }
      }
    }
    

    從上面代碼可知乡摹,事件源經(jīng)過take生成 _TakeStream事件源,然后由 _handleData 方法獲取指定次數(shù)事件采转,經(jīng) sink._add(inputEvent) 發(fā)出聪廉。

    實(shí)例代碼:

    void take() {
      var stream = Stream.fromIterable([22, 22, 33, 33, 22, 11, 33, 11]);
      stream.take(2).listen((event) {
        print(event); //22 22
      });
    }
    
  • reduce

    reduce 是減少事件源發(fā)出事件,將事件組合將結(jié)果以Future 輸出故慈。核心代碼:

    Future<T> reduce(T combine(T previous, T element)) {
      _Future<T> result = new _Future<T>();
      bool seenFirst = false;
      late T value;
      StreamSubscription<T> subscription =
          this.listen(null, onError: result._completeError, onDone: () {
        if (!seenFirst) {
          try {
            // Throw and recatch, instead of just doing
            //  _completeWithErrorCallback, e, theError, StackTrace.current),
            // to ensure that the stackTrace is set on the error.
            throw IterableElementError.noElement();
          } catch (e, s) {
            _completeWithErrorCallback(result, e, s);
          }
        } else {
          result._complete(value);
        }
      }, cancelOnError: true);
      subscription.onData((T element) {
        if (seenFirst) {
          _runUserCode(() => combine(value, element), (T newValue) {
            value = newValue;
          }, _cancelAndErrorClosure(subscription, result));
        } else {
          value = element;
          seenFirst = true;
        }
      });
      return result;
    }
    

    從上面代碼知事件源有reduce 生成 StreamSubscription<T> subscription 對象板熊,在 onData 中有combine 將結(jié)果合并為 newValue 并賦值給 valueFuture 返回。

    實(shí)例代碼:

    void reduce() {
      var stream = Stream.fromIterable([11, 22, 33, 33]);
      stream
          .reduce((previous, element) => previous + element)
          .then((value) => print(value)); // 99
    }
    
  • join

    join 將事件源發(fā)出的事件以字符串的形式按指定字符竄拼接以 Futuren 輸出察绷。核心代碼:

    Future<String> join([String separator = ""]) {
      _Future<String> result = new _Future<String>();
      StringBuffer buffer = new StringBuffer();
      bool first = true;
      StreamSubscription<T> subscription =
          this.listen(null, onError: result._completeError, onDone: () {
        result._complete(buffer.toString());
      }, cancelOnError: true);
      subscription.onData(separator.isEmpty
          ? (T element) {
              try {
                buffer.write(element);
              } catch (e, s) {
                _cancelAndErrorWithReplacement(subscription, result, e, s);
              }
            }
          : (T element) {
              if (!first) {
                buffer.write(separator);
              }
              first = false;
              try {
                buffer.write(element);
              } catch (e, s) {
                _cancelAndErrorWithReplacement(subscription, result, e, s);
              }
            });
      return result;
    }
    

    從上面代碼知干签,事件源經(jīng)過 join 函數(shù)生成 resultbuffer拆撼、subscription 容劳。由 onData 方法將事件寫入到 buffer 中,然后在 onDone 中將 buffer.toString() 的結(jié)果以Future 返回情萤。

    實(shí)例代碼:

    void join() {
      var stream = Stream.fromIterable([11, 22, 33, 33]);
      stream.join("-").then((value) => print(value)); // 11-22-33-33
    }
    
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
禁止轉(zhuǎn)載鸭蛙,如需轉(zhuǎn)載請通過簡信或評論聯(lián)系作者。
  • 序言:七十年代末筋岛,一起剝皮案震驚了整個濱河市娶视,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌睁宰,老刑警劉巖肪获,帶你破解...
    沈念sama閱讀 218,640評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異柒傻,居然都是意外死亡孝赫,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,254評論 3 395
  • 文/潘曉璐 我一進(jìn)店門红符,熙熙樓的掌柜王于貴愁眉苦臉地迎上來青柄,“玉大人,你說我怎么就攤上這事预侯≈驴” “怎么了?”我有些...
    開封第一講書人閱讀 165,011評論 0 355
  • 文/不壞的土叔 我叫張陵萎馅,是天一觀的道長双戳。 經(jīng)常有香客問我,道長糜芳,這世上最難降的妖魔是什么飒货? 我笑而不...
    開封第一講書人閱讀 58,755評論 1 294
  • 正文 為了忘掉前任魄衅,我火速辦了婚禮,結(jié)果婚禮上塘辅,老公的妹妹穿的比我還像新娘晃虫。我一直安慰自己,他們只是感情好莫辨,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,774評論 6 392
  • 文/花漫 我一把揭開白布傲茄。 她就那樣靜靜地躺著毅访,像睡著了一般沮榜。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上喻粹,一...
    開封第一講書人閱讀 51,610評論 1 305
  • 那天蟆融,我揣著相機(jī)與錄音,去河邊找鬼守呜。 笑死型酥,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的查乒。 我是一名探鬼主播弥喉,決...
    沈念sama閱讀 40,352評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼玛迄!你這毒婦竟也來了由境?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,257評論 0 276
  • 序言:老撾萬榮一對情侶失蹤蓖议,失蹤者是張志新(化名)和其女友劉穎虏杰,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體勒虾,經(jīng)...
    沈念sama閱讀 45,717評論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡纺阔,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,894評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了修然。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片笛钝。...
    茶點(diǎn)故事閱讀 40,021評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖愕宋,靈堂內(nèi)的尸體忽然破棺而出玻靡,到底是詐尸還是另有隱情,我是刑警寧澤掏婶,帶...
    沈念sama閱讀 35,735評論 5 346
  • 正文 年R本政府宣布啃奴,位于F島的核電站,受9級特大地震影響雄妥,放射性物質(zhì)發(fā)生泄漏最蕾。R本人自食惡果不足惜依溯,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,354評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望瘟则。 院中可真熱鬧黎炉,春花似錦、人聲如沸醋拧。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,936評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽丹壕。三九已至庆械,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間菌赖,已是汗流浹背缭乘。 一陣腳步聲響...
    開封第一講書人閱讀 33,054評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留琉用,地道東北人堕绩。 一個月前我還...
    沈念sama閱讀 48,224評論 3 371
  • 正文 我出身青樓,卻偏偏與公主長得像邑时,于是被迫代替她去往敵國和親奴紧。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,974評論 2 355