Stream的生成
1拾给、從零開始創(chuàng)建Stream
創(chuàng)建一個Stream可以通過異步生成器(async*)函數(shù)。當異步生成器函數(shù)被調用時會創(chuàng)建一個 Stream兔沃,而函數(shù)體則會在該 Stream 被監(jiān)聽時開始運行蒋得。當函數(shù)返回時,Stream 關閉乒疏。在函數(shù)返回前额衙,你可以使用 yield 或 yield 語句向該 Stream 提交事件。
下面是一個周期性發(fā)送整數(shù)的函數(shù)例子:
void main() {
var duration = Duration(seconds: 3);
var stream = timedCounter(duration, 10);
stream.listen((event) {
print(event);
});
}
Stream<int> timedCounter(Duration interval, [int maxCount]) async* {
var i = 0;
while(true) {
await Future.delayed(interval);
yield i++;
if(i == maxCount) break;
}
}
2怕吴、轉換現(xiàn)有的Stream
我們在創(chuàng)建 Stream 時常見的情形是根據(jù)現(xiàn)有 Stream 的事件創(chuàng)建一個新的 Stream窍侧。比如你已經有了一個可以提供字節(jié)事件的 Stream,然后你想將該 Stream 變?yōu)橐粋€可以提供字符串的 Stream转绷,并且該 Stream 中的字符串還經過 UTF-8 編碼伟件。對于這種情況,常用的辦法是創(chuàng)建一個新的 Stream 去等待獲取原 Stream 的事件暇咆,然后再將新 Stream 中的事件輸出锋爪。例如:
/// 將連續(xù)的字符串 Stream 拆分為行。
///
/// 輸入的字符串來自于"源" Stream 并以較小的 chunk 塊提供爸业。
Stream<String> lines(Stream<String> source) async* {
// 存儲從上一個數(shù)據(jù)塊中分離出的字符串行其骄。
var partial = '';
// 等到新的數(shù)據(jù)塊可用時開始處理。
await for (var chunk in source) {
var lines = chunk.split('\n');
lines[0] = partial + lines[0]; // 追加拼接行扯旷。
partial = lines.removeLast(); // 刪除剩余不完整的行拯爽。
for (var line in lines) {
yield line; // 將分離的每個字符串行添加至輸出 Stream。
}
}
// 最后如果最終的字符串行不為空則將其添加至輸出流钧忽。
if (partial.isNotEmpty) yield partial;
}
3毯炮、使用 StreamController
如果你 Stream 的事件不僅來自于異步函數(shù)可以遍歷的 Stream 和 Future,還來自于你程序的不同部分耸黑,這種情況使用上述兩種方式生成 Stream 就顯得比較困難桃煎。面對這種情況,我們可以使用一個 StreamController 來創(chuàng)建和填充 Stream大刊。
StreamController 可以為你生成一個 Stream为迈,并提供在任何時候、任何地方將事件添加到該 Stream 的方法缺菌。該 Stream 具有處理監(jiān)聽器和暫停所需的所有邏輯葫辐。控制器對象你可以自行處理而只需返回調用者所需的 Stream 即可伴郁。
void main() {
var duration = Duration(seconds: 3);
var stream = timedCounter(duration, 10);
stream.listen((event) {
print(event);
});
}
Stream<int> timedCounter(Duration interval, [int maxCount]) {
var controller = StreamController<int>();
var counter = 0;
Timer timer;
void tick(Timer timer) {
counter++;
controller.add(counter); // 請求 Stream 將計數(shù)器值作為事件發(fā)送耿战。
if (maxCount != null && counter >= maxCount) {
timer.cancel();
controller.close(); // 請求 Stream 關閉并告知監(jiān)聽器。
}
}
void startTimer() {
timer = Timer.periodic(interval, tick);
}
void stopTimer() {
if(timer != null) {
timer.cancel();
timer = null;
}
}
controller = StreamController<int> (
onListen: startTimer,
onPause: stopTimer,
onResume: startTimer,
onCancel: stopTimer,
);
return controller.stream;
}
不通過async*函數(shù)創(chuàng)建Stream時焊傅,請務必牢記以下幾點:
使用同步控制器時要小心剂陡。例如狈涮,使用
StreamController(sync: true)
構造方法創(chuàng)建控制器。當你發(fā)送一個事件到一個未暫停的同步控制器(例如:使用 EventSink 中定義的add()
鸭栖、addError()
或close()
方法)薯嗤,事件立即發(fā)送給所有 Stream 的監(jiān)聽器。在添加監(jiān)聽器的代碼返回之前纤泵,決不能調用Stream
監(jiān)聽器,而在錯誤的事件使用同步控制器會破壞該規(guī)則并導致其它正常代碼執(zhí)行失敗镜粤。因此捏题,你應該避免使用同步控制器。如果你使用 StreamController肉渴, onListen 回調會在 listen 方法調用返回 StreamSubscription 前返回公荧。不要讓 onListen 回調依賴于已經存在的訂閱。例如同规,在下面的代碼中循狰,onListen 回調有可能會在 subscription 變量被初始化為一個有效值之前被觸發(fā)(同時 處理器 被調用)
當 Stream 的監(jiān)聽器狀態(tài)改變時,由 StreamController 定義的 onListen券勺、onPause绪钥、onResume 和 onCancel 回調會被調用,該調用絕不會發(fā)生在事件生成時或在某個狀態(tài)變化處理回調的調用期間关炼。在這些情況出現(xiàn)時程腹,狀態(tài)變化的回調會被延遲,直到上一個回調執(zhí)行完成儒拂。
不要嘗試自己去實現(xiàn) Stream 接口寸潦。否則很容易在事件、回調以及添加和移除監(jiān)聽器這些操作交互時出現(xiàn)一些難以察覺的錯誤社痛。你應該總是使用一個現(xiàn)有的 Stream(比如由 StreamController 生成的)去實現(xiàn)新 Stream 中 listen 方法的調用见转。
盡管你可以通過擴展 Stream 類并實現(xiàn) listen 方法來實現(xiàn)更多額外的功能,但一般不建議這么做蒜哀,因為這樣會引入一個調用者必須考慮的新類型斩箫。相反,你可以創(chuàng)建一個(或多個)具有 Stream 的類而不是一個(或多個)Stream凡怎。