Stream 介紹
Stream
和Future
類似濒生,但是Future
對應(yīng)的是一個item
的狀態(tài)的變化,而Stream
則是類似于iterator
罪治,在結(jié)束之前能夠得到多個值【跻澹或者我們可以簡單的理解為雁社,Stream
是由一系列的Future
組成晒骇,我們可以從Stream
讀取各個Future
的結(jié)果,直到Stream
結(jié)束洪囤。
Stream trait定義
定義如下:
trait Stream {
type Item;
fn poll_next(self: Pin<&mut Self>, lw: &LocalWaker)
-> Poll<Option<Self::Item>>;
}
poll_next
函數(shù)有三種可能的返回值,分別如下:
-
Poll::Pending
說明下一個值還沒有就緒瘤缩,仍然需要等待。 -
Poll::Ready(Some(val))
已經(jīng)就緒剥啤,成功返回一個值,程序可以通過調(diào)用poll_next
再獲取下一個值府怯。 -
Poll::Ready(None)
表示Stream
已經(jīng)結(jié)束,不應(yīng)該在調(diào)用poll_next
富腊。
迭代
和同步的Iterator
類似,Stream
可以迭代處理其中的值赘被,如使用map, filter, fold, try_map, try_filter, and try_fold
等。但是Stream
不支持使用for
民假,而while let
和 next/try_next
則是允許的。 例子如下:
async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
use futures::stream::StreamExt; // for `next`
let mut sum = 0;
while let Some(item) = stream.next().await {
sum += item;
}
sum
}
async fn sum_with_try_next(
mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
use futures::stream::TryStreamExt; // for `try_next`
let mut sum = 0;
while let Some(item) = stream.try_next().await? {
sum += item;
}
Ok(sum)
}
并發(fā)
上面的使用的迭代處理羊异,如果我們要并發(fā)的處理流彤断,則應(yīng)該使用for_each_concurrent
和 try_for_each_concurrent
,示例如下:
async fn jump_around(
mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
use futures::stream::TryStreamExt; // for `try_for_each_concurrent`
const MAX_CONCURRENT_JUMPERS: usize = 100;
stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
jump_n_times(num).await?;
report_n_jumps(num).await?;
Ok(())
}).await?;
Ok(())
}
參考資料
Rust異步編程