為什么要用 Futures
很多語言都提供了 promises屯换,futures 的支持驻仅。他們能讓開發(fā)者在面對并發(fā),異步等問題的時候摆舟,能直接寫出更加簡單優(yōu)雅的同步邏輯代碼亥曹,而不用在處理復(fù)雜的 callback 嵌套以及充斥在各地的被 callback 拆散的代碼邏輯。
在 Rust 里面恨诱,應(yīng)該很多人用 mio 編寫過網(wǎng)絡(luò)程序媳瞪。雖然 mio 是一個非常棒的庫,并且 TiKV 也使用 mio 處理網(wǎng)絡(luò)照宝,事件邏輯等蛇受,但 mio 太底層,我們?nèi)匀恍枰P(guān)注 async I/O厕鹃,需要寫很多 callback兢仰,這直接導(dǎo)致了代碼的復(fù)雜,邏輯的分散剂碴。
幸運的是把将,Rust 提供了 futures 庫來提供對 promises 和 futures 的支持,它的核心在于抽象一個 Future trait汗茄,提供 zero-cost 的抽象秸弛,讓上層邏輯自由的去實現(xiàn)組合。同時,Rust 也提供 tokio 組件递览,來簡化 mio 的編程工作叼屠,為 async I/O 提供了一站式的解決方案。
所以绞铃,后續(xù)對 futures 的說明镜雨,我們都會基于 tokio 組件來舉例。
Hello World
通常儿捧,第一個例子都是 Hello World荚坞。這里,我們會用 tokio-core 來實現(xiàn)一個簡單的 client菲盾,先跟遠端的 server 建立連接颓影,給 Server 發(fā)送 Hello World,并接受 Server 的返回懒鉴。
extern crate futures;
extern crate tokio_core;
use std::net::ToSocketAddrs;
use futures::Future;
use tokio_core::reactor::Core;
use tokio_core::net::TcpStream;
fn main() {
let mut core = Core::new().unwrap();
let addr = "127.0.0.1:8080".to_socket_addrs().unwrap().next().unwrap();
let socket = TcpStream::connect(&addr, &core.handle());
let request = socket.and_then(|socket| {
tokio_core::io::write_all(socket, "Hello World".as_bytes())
});
let response = request.and_then(|(socket, _)| {
tokio_core::io::read_to_end(socket, Vec::new())
});
let (_, data) = core.run(response).unwrap();
println!("{}", String::from_utf8_lossy(&data));
}
這里詳細說明一下:
- 我們使用
Core::new()
創(chuàng)建了一個 EventLoop - 得到 EventLoop 的 handle诡挂,方便讓外面的對象綁定到這個 EventLoop 上面
- 使用
TcpStream::connect()
連接 server,這里需要注意临谱,雖然我們調(diào)用了 connect璃俗,但是這里只是返回了一個 future,實際的 socket 建立會在后面進行悉默。 - 然后我們通過
and_then
來組合后續(xù)的操作城豁,and_then
表明如果當(dāng)前的 future 執(zhí)行成功,就會開始執(zhí)行下一個 future抄课。在上面唱星,我們通過and_then
來分別生成一個 request future,以及更后面的 response future剖膳。 - 最后魏颓,我們在
run
里面開始執(zhí)行 response future岭辣,它會從開始的 future 執(zhí)行吱晒,直到最后的 future 完成。
當(dāng) connect
成功之后沦童,就會執(zhí)行write_all
仑濒,當(dāng) write_all
執(zhí)行成功之后,就會執(zhí)行 read_to_end
偷遗,最后墩瞳,得到結(jié)果并輸出∈贤悖可以看到喉酌,上面雖然各個邏輯是異步的,但我們通過 future,將其轉(zhuǎn)成了同步的代碼泪电,并且通過 and_then
將各個 future 給串聯(lián)起來般妙。
Future
上面只是一個簡單的例子,實際 future 能做的更多相速。在繼續(xù)之前碟渺,我們來看看 futures 的核心 trait,F(xiàn)uture突诬。
Future 的關(guān)鍵定義如下:
trait Future {
type Item;
type Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
// ...
}
Future 內(nèi)部定義了兩個關(guān)聯(lián)類型苫拍,Item 和 Error,極大的方便了用戶自定義 Future旺隙。
Future 最關(guān)鍵的函數(shù)是 poll绒极,它會檢查當(dāng)前 future 的狀態(tài),看時候已經(jīng) ready蔬捷,能對外提供服務(wù)集峦,或者出現(xiàn)了錯誤。
poll
返回 Poll<Self::Item, Self::Error>
抠刺,Poll 是一個 typedef塔淤,定義如下:
pub type Poll<T, E> = Result<Async<T>, E>;
pub enum Async<T> {
Ready(T),
NotReady,
}
對于 Async,我們知道:
-
Ready(T)
表明這個 Future 已經(jīng)完成速妖,T 就是該 Future 的返回值 -
NotReady
表明這個 Future 并沒有 ready高蜂,我們需要在后續(xù)再次調(diào)用 poll
在實現(xiàn)自己的 future 的時候,我們需要注意罕容,因為 future 多數(shù)都會跟 event loop 一起使用备恤,所以 poll
一定不能 block 整個 event loop。如果 poll
有耗時的操作锦秒,我們需要將這些操作放在其他的線程去執(zhí)行露泊,然后在后續(xù)返回結(jié)果。
如果 poll
返回 NotReady
旅择,表明這個 Future 并沒有完成惭笑,我們需要知道何時再次調(diào)用這個 future 的 poll
。所以生真, 一個 future 需要給當(dāng)前的 task 注冊一個通知沉噩,當(dāng)這個 future 的值已經(jīng) ready,task 會收到這個通知然后讓 future 繼續(xù)執(zhí)行柱蟀。關(guān)于 task川蒙,我們后面在繼續(xù)討論。
Stream
上面我們說了 Future长已,在 futures 庫里面另一個重要的 trait 就是 Stream畜眨。在 Future 里面昼牛,關(guān)鍵的 poll
函數(shù)其實處理的是一個值的情況,但有些時候康聂,我們需要處理連續(xù)流式的值匾嘱,譬如對于一個 TCP Listener 來說,它會持續(xù)的通過 accept
產(chǎn)生新的客戶端連接早抠。對于流式的處理霎烙,我們使用 Stream trait。
trait Stream {
type Item;
type Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}
可以看到蕊连,Stream trait 跟 Future 差不多悬垃,最大的區(qū)別在于 poll
返回的是一個 Option<Self::Item>
,而不是 Self::Item
甘苍。
如果一個 Stream 結(jié)束了尝蠕,poll
會返回 Ready(None)
,后續(xù)對于該 Stream 的錯誤調(diào)用都會 panic载庭。
Stream 也是一個特殊的 Future看彼,我們可以使用 into_future
函數(shù)將 Stream 轉(zhuǎn)成一個 Future,這樣外面就能使用 Future 的 combinator(譬如 and_then
囚聚,combinator 會在后續(xù)討論)將 Stream 與其他的 Future 連接起來靖榕。
Stream Example
前面我們舉了 Future 的一個例子,這里我們對 Stream 舉例顽铸,實現(xiàn)一個簡單的 echo
服務(wù)茁计。
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
let mut core = Core::new().unwrap();
let handle = core.handle();
let socket = TcpListener::bind(&addr, &handle).unwrap();
let done = socket.incoming().for_each(move |(socket, _addr)| {
let (reader, writer) = socket.split();
let amt = copy(reader, writer);
handle.spawn(amt.then(move |_| {
Ok(())
}));
Ok(())
});
core.run(done).unwrap();
這里詳細說明一下:
- 通過
TcpListener::bind
創(chuàng)建一個 listener。 - 通過
incoming
得到一個 Stream谓松,該 Stream 的 Item 是一個 tuple(TcpStream, SocketAddr)
星压。當(dāng)我們處理incoming
的 Stream 的時候,其實就類似循環(huán)調(diào)用這個 listener 的accept
函數(shù)鬼譬。 - 然后我們使用
for_each
用來處理上面的 Stream娜膘。for_each
會返回一個 Future 對象,然后讓外面的的 Core 去執(zhí)行优质。因為上面的例子中竣贪,TCPListener 會持續(xù)接受連接,所以run
不會結(jié)束盆赤。
這里我們在說明一下 for_each
接受一個 Stream 的 Item 之后贾富,如何處理的歉眷。
- 使用
split
函數(shù)將得到 socket 的 reader 和 writer牺六。也就是我們能通過 reader 讀 socket 的數(shù)據(jù),通過 writer 往 socket 里面寫數(shù)據(jù)汗捡。 - 使用
copy
生成一個 Future淑际,copy
會將 reader 的里面讀出來的數(shù)據(jù)畏纲,直接寫到 writer 里面。 - 使用
handle.spawn
將copy
生成的 Future 綁定到 event loop 上面春缕。這樣我們才能同時處理多個客戶端的請求盗胀,而不會阻塞 event loop。
這里重點注意下 handle.spawn
锄贼,編寫過 async I/O 的同學(xué)應(yīng)該都知道 socket 都是非阻塞的票灰,也就是,我們不可能通過一次 read 或者 write 就將這個 socket 的數(shù)據(jù)全部處理完成宅荤。所以屑迂,我們后續(xù)還會重新將 socket 給 register 到 event loop,這樣當(dāng)有新的事件的時候冯键,event loop 會重新調(diào)用對應(yīng)的回調(diào)函數(shù)惹盼。
如果我們直接用 mio,代碼是比較難寫的惫确,因為要處理很多異步邏輯情況手报,但現(xiàn)在我們僅僅只需要 spawn
,傳入一個 Future改化,當(dāng)這個 Future 完成的時候就會調(diào)用對應(yīng)的閉包函數(shù)掩蛤。我們現(xiàn)在僅僅需要關(guān)注的是 Future 如何編寫,傳遞陈肛,而不再需要關(guān)注回調(diào)怎么寫了盏档。上面的例子,當(dāng) copy
這個 Future 完成燥爷,我們什么都不干返回(當(dāng)然也可以打印一點東西…)蜈亩,寫起來容易了太多。
小結(jié)
這里僅僅是簡單了介紹了 Future 和 Stream前翎,后面筆者還是深入研究相關(guān)的東西稚配,畢竟我們也會在 TiKV
項目里面使用。