學(xué)習(xí) Rust Futures - Future 和 Stream

為什么要用 Futures

很多語言都提供了 promises屯换,futures 的支持驻仅。他們能讓開發(fā)者在面對并發(fā),異步等問題的時候摆舟,能直接寫出更加簡單優(yōu)雅的同步邏輯代碼亥曹,而不用在處理復(fù)雜的 callback 嵌套以及充斥在各地的被 callback 拆散的代碼邏輯。

在 Rust 里面恨诱,應(yīng)該很多人用 mio 編寫過網(wǎng)絡(luò)程序媳瞪。雖然 mio 是一個非常棒的庫,并且 TiKV 也使用 mio 處理網(wǎng)絡(luò)照宝,事件邏輯等蛇受,但 mio 太底層,我們?nèi)匀恍枰P(guān)注 async I/O厕鹃,需要寫很多 callback兢仰,這直接導(dǎo)致了代碼的復(fù)雜,邏輯的分散剂碴。

幸運的是把将,Rust 提供了 futures 庫來提供對 promises 和 futures 的支持,它的核心在于抽象一個 Future trait汗茄,提供 zero-cost 的抽象秸弛,讓上層邏輯自由的去實現(xiàn)組合。同時,Rust 也提供 tokio 組件递览,來簡化 mio 的編程工作叼屠,為 async I/O 提供了一站式的解決方案。

所以绞铃,后續(xù)對 futures 的說明镜雨,我們都會基于 tokio 組件來舉例。

Hello World

通常儿捧,第一個例子都是 Hello World荚坞。這里,我們會用 tokio-core 來實現(xiàn)一個簡單的 client菲盾,先跟遠端的 server 建立連接颓影,給 Server 發(fā)送 Hello World,并接受 Server 的返回懒鉴。

 extern crate futures;
 extern crate tokio_core; 
 
 use std::net::ToSocketAddrs;
 
 use futures::Future;
 use tokio_core::reactor::Core;
 use tokio_core::net::TcpStream;
 
 fn main() {
     let mut core = Core::new().unwrap();
     let addr = "127.0.0.1:8080".to_socket_addrs().unwrap().next().unwrap();
 
     let socket = TcpStream::connect(&addr, &core.handle());
 
     let request = socket.and_then(|socket| {
         tokio_core::io::write_all(socket, "Hello World".as_bytes())
     });
     let response = request.and_then(|(socket, _)| {
         tokio_core::io::read_to_end(socket, Vec::new())
     });
 
     let (_, data) = core.run(response).unwrap();
     println!("{}", String::from_utf8_lossy(&data));
 }

這里詳細說明一下:

  • 我們使用 Core::new() 創(chuàng)建了一個 EventLoop
  • 得到 EventLoop 的 handle诡挂,方便讓外面的對象綁定到這個 EventLoop 上面
  • 使用 TcpStream::connect() 連接 server,這里需要注意临谱,雖然我們調(diào)用了 connect璃俗,但是這里只是返回了一個 future,實際的 socket 建立會在后面進行悉默。
  • 然后我們通過 and_then來組合后續(xù)的操作城豁,and_then 表明如果當(dāng)前的 future 執(zhí)行成功,就會開始執(zhí)行下一個 future抄课。在上面唱星,我們通過 and_then 來分別生成一個 request future,以及更后面的 response future剖膳。
  • 最后魏颓,我們在 run 里面開始執(zhí)行 response future岭辣,它會從開始的 future 執(zhí)行吱晒,直到最后的 future 完成。

當(dāng) connect 成功之后沦童,就會執(zhí)行write_all仑濒,當(dāng) write_all 執(zhí)行成功之后,就會執(zhí)行 read_to_end偷遗,最后墩瞳,得到結(jié)果并輸出∈贤悖可以看到喉酌,上面雖然各個邏輯是異步的,但我們通過 future,將其轉(zhuǎn)成了同步的代碼泪电,并且通過 and_then 將各個 future 給串聯(lián)起來般妙。

Future

上面只是一個簡單的例子,實際 future 能做的更多相速。在繼續(xù)之前碟渺,我們來看看 futures 的核心 trait,F(xiàn)uture突诬。

Future 的關(guān)鍵定義如下:

 trait Future {
     type Item;
     type Error;
 
     fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
 
     // ...
 }

Future 內(nèi)部定義了兩個關(guān)聯(lián)類型苫拍,Item 和 Error,極大的方便了用戶自定義 Future旺隙。

Future 最關(guān)鍵的函數(shù)是 poll绒极,它會檢查當(dāng)前 future 的狀態(tài),看時候已經(jīng) ready蔬捷,能對外提供服務(wù)集峦,或者出現(xiàn)了錯誤。

poll 返回 Poll<Self::Item, Self::Error>抠刺,Poll 是一個 typedef塔淤,定義如下:

 pub type Poll<T, E> = Result<Async<T>, E>;
 
 pub enum Async<T> {
     Ready(T),
     NotReady,
 }

對于 Async,我們知道:

  • Ready(T) 表明這個 Future 已經(jīng)完成速妖,T 就是該 Future 的返回值
  • NotReady 表明這個 Future 并沒有 ready高蜂,我們需要在后續(xù)再次調(diào)用 poll

在實現(xiàn)自己的 future 的時候,我們需要注意罕容,因為 future 多數(shù)都會跟 event loop 一起使用备恤,所以 poll 一定不能 block 整個 event loop。如果 poll 有耗時的操作锦秒,我們需要將這些操作放在其他的線程去執(zhí)行露泊,然后在后續(xù)返回結(jié)果。

如果 poll 返回 NotReady 旅择,表明這個 Future 并沒有完成惭笑,我們需要知道何時再次調(diào)用這個 future 的 poll。所以生真, 一個 future 需要給當(dāng)前的 task 注冊一個通知沉噩,當(dāng)這個 future 的值已經(jīng) ready,task 會收到這個通知然后讓 future 繼續(xù)執(zhí)行柱蟀。關(guān)于 task川蒙,我們后面在繼續(xù)討論。

Stream

上面我們說了 Future长已,在 futures 庫里面另一個重要的 trait 就是 Stream畜眨。在 Future 里面昼牛,關(guān)鍵的 poll 函數(shù)其實處理的是一個值的情況,但有些時候康聂,我們需要處理連續(xù)流式的值匾嘱,譬如對于一個 TCP Listener 來說,它會持續(xù)的通過 accept 產(chǎn)生新的客戶端連接早抠。對于流式的處理霎烙,我們使用 Stream trait。

 trait Stream {
     type Item;
     type Error;
 
     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
 }

可以看到蕊连,Stream trait 跟 Future 差不多悬垃,最大的區(qū)別在于 poll 返回的是一個 Option<Self::Item>,而不是 Self::Item甘苍。

如果一個 Stream 結(jié)束了尝蠕,poll 會返回 Ready(None),后續(xù)對于該 Stream 的錯誤調(diào)用都會 panic载庭。

Stream 也是一個特殊的 Future看彼,我們可以使用 into_future 函數(shù)將 Stream 轉(zhuǎn)成一個 Future,這樣外面就能使用 Future 的 combinator(譬如 and_then囚聚,combinator 會在后續(xù)討論)將 Stream 與其他的 Future 連接起來靖榕。

Stream Example

前面我們舉了 Future 的一個例子,這里我們對 Stream 舉例顽铸,實現(xiàn)一個簡單的 echo 服務(wù)茁计。

 let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
 let addr = addr.parse::<SocketAddr>().unwrap();
 
 let mut core = Core::new().unwrap();
 let handle = core.handle();
 
 let socket = TcpListener::bind(&addr, &handle).unwrap();
 let done = socket.incoming().for_each(move |(socket, _addr)| {
     let (reader, writer) = socket.split();
     let amt = copy(reader, writer);
 
     handle.spawn(amt.then(move |_| {
         Ok(())
     }));
 
     Ok(())
 });
 
 core.run(done).unwrap();

這里詳細說明一下:

  • 通過 TcpListener::bind 創(chuàng)建一個 listener。
  • 通過 incoming 得到一個 Stream谓松,該 Stream 的 Item 是一個 tuple (TcpStream, SocketAddr) 星压。當(dāng)我們處理 incoming 的 Stream 的時候,其實就類似循環(huán)調(diào)用這個 listener 的 accept 函數(shù)鬼譬。
  • 然后我們使用 for_each 用來處理上面的 Stream娜膘。for_each 會返回一個 Future 對象,然后讓外面的的 Core 去執(zhí)行优质。因為上面的例子中竣贪,TCPListener 會持續(xù)接受連接,所以 run 不會結(jié)束盆赤。

這里我們在說明一下 for_each 接受一個 Stream 的 Item 之后贾富,如何處理的歉眷。

  • 使用 split 函數(shù)將得到 socket 的 reader 和 writer牺六。也就是我們能通過 reader 讀 socket 的數(shù)據(jù),通過 writer 往 socket 里面寫數(shù)據(jù)汗捡。
  • 使用 copy 生成一個 Future淑际,copy 會將 reader 的里面讀出來的數(shù)據(jù)畏纲,直接寫到 writer 里面。
  • 使用 handle.spawncopy 生成的 Future 綁定到 event loop 上面春缕。這樣我們才能同時處理多個客戶端的請求盗胀,而不會阻塞 event loop。

這里重點注意下 handle.spawn锄贼,編寫過 async I/O 的同學(xué)應(yīng)該都知道 socket 都是非阻塞的票灰,也就是,我們不可能通過一次 read 或者 write 就將這個 socket 的數(shù)據(jù)全部處理完成宅荤。所以屑迂,我們后續(xù)還會重新將 socket 給 register 到 event loop,這樣當(dāng)有新的事件的時候冯键,event loop 會重新調(diào)用對應(yīng)的回調(diào)函數(shù)惹盼。

如果我們直接用 mio,代碼是比較難寫的惫确,因為要處理很多異步邏輯情況手报,但現(xiàn)在我們僅僅只需要 spawn,傳入一個 Future改化,當(dāng)這個 Future 完成的時候就會調(diào)用對應(yīng)的閉包函數(shù)掩蛤。我們現(xiàn)在僅僅需要關(guān)注的是 Future 如何編寫,傳遞陈肛,而不再需要關(guān)注回調(diào)怎么寫了盏档。上面的例子,當(dāng) copy 這個 Future 完成燥爷,我們什么都不干返回(當(dāng)然也可以打印一點東西…)蜈亩,寫起來容易了太多。

小結(jié)

這里僅僅是簡單了介紹了 Future 和 Stream前翎,后面筆者還是深入研究相關(guān)的東西稚配,畢竟我們也會在 TiKV 項目里面使用。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末港华,一起剝皮案震驚了整個濱河市道川,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌立宜,老刑警劉巖冒萄,帶你破解...
    沈念sama閱讀 216,591評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異橙数,居然都是意外死亡尊流,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,448評論 3 392
  • 文/潘曉璐 我一進店門灯帮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來崖技,“玉大人逻住,你說我怎么就攤上這事∮祝” “怎么了瞎访?”我有些...
    開封第一講書人閱讀 162,823評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長吁恍。 經(jīng)常有香客問我扒秸,道長,這世上最難降的妖魔是什么冀瓦? 我笑而不...
    開封第一講書人閱讀 58,204評論 1 292
  • 正文 為了忘掉前任鸦采,我火速辦了婚禮,結(jié)果婚禮上咕幻,老公的妹妹穿的比我還像新娘渔伯。我一直安慰自己,他們只是感情好肄程,可當(dāng)我...
    茶點故事閱讀 67,228評論 6 388
  • 文/花漫 我一把揭開白布锣吼。 她就那樣靜靜地躺著,像睡著了一般蓝厌。 火紅的嫁衣襯著肌膚如雪玄叠。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,190評論 1 299
  • 那天拓提,我揣著相機與錄音读恃,去河邊找鬼。 笑死代态,一個胖子當(dāng)著我的面吹牛寺惫,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播蹦疑,決...
    沈念sama閱讀 40,078評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼西雀,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了歉摧?” 一聲冷哼從身側(cè)響起艇肴,我...
    開封第一講書人閱讀 38,923評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎叁温,沒想到半個月后再悼,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,334評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡膝但,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,550評論 2 333
  • 正文 我和宋清朗相戀三年冲九,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片锰镀。...
    茶點故事閱讀 39,727評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡娘侍,死狀恐怖咖刃,靈堂內(nèi)的尸體忽然破棺而出泳炉,到底是詐尸還是另有隱情憾筏,我是刑警寧澤,帶...
    沈念sama閱讀 35,428評論 5 343
  • 正文 年R本政府宣布胸竞,位于F島的核電站肌括,受9級特大地震影響樊零,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜古拴,卻給世界環(huán)境...
    茶點故事閱讀 41,022評論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望真友。 院中可真熱鬧黄痪,春花似錦、人聲如沸盔然。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,672評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽愈案。三九已至挺尾,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間站绪,已是汗流浹背遭铺。 一陣腳步聲響...
    開封第一講書人閱讀 32,826評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留恢准,地道東北人魂挂。 一個月前我還...
    沈念sama閱讀 47,734評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像馁筐,于是被迫代替她去往敵國和親锰蓬。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,619評論 2 354

推薦閱讀更多精彩內(nèi)容