Elixir GenStage 簡介

1. 簡介

在2016年7月14號护赊,Elixir發(fā)布了GenStage。官方對GenStage的描述是:

GenStage is a new Elixir behaviour for exchanging events with back-pressure between Elixir processes.

GenStage是生產(chǎn)者和消費者之間交互事件(event)的規(guī)范机打。簡言之,Elixir希望用GenStage代替GenEvent并提供可組合的抽象層片迅,來從第三方系統(tǒng)獲取和處理數(shù)據(jù)残邀。

查看官方文檔,我們可以對Stage有個初步的理解:

Stages are computation steps that send and/or receive data from other stages.

Stage就是運算步驟障涯,每個Stage能發(fā)送數(shù)據(jù)或從其它Stage獲取數(shù)據(jù)罐旗。

本文主要基于Announcing GenStage和Elixir Conf 2016的Keynote內(nèi)容對于GenStage進行闡述。

2. 背景

當初José Valim創(chuàng)建Elixir的一大初衷是引入更好的抽象來處理集合唯蝶。所以Elixir才有List九秀,Enum,Stream粘我,Pipe |>這么多好東西鼓蜒。當然,不僅這樣征字,Elixir也提供給開發(fā)者一條處理集合的路徑都弹,從激進到懶惰,再到并發(fā)匙姜,再到分布式(from eager to lazy, to concurrent and then distributed)畅厢。

Elixir Collection

下面我們從一個簡單的單詞計數(shù)程序開始,探討集合處理過程的演變氮昧。

Word Counting

Eager / Enum

File.read!("path/to/some/file")
|> String.split("\n")
|> Enum.flat_map(fn line ->
    String.split(line, " ")
   end)
|> Enum.reduce(%{}, fn word, acc ->
    Map.update(acc, word, 1, & &1 + 1)
   end)
|> Enum.to_list()

這個方案對于小文件很適用框杜,但是對于大文件浦楣,它需要把文本全部讀入內(nèi)存,并且Enum.flat_map/2會創(chuàng)建一個巨大的List咪辱,這個List包含了文件中的所有單詞振劳,然后才能計數(shù)。這樣我們浪費了大量內(nèi)存油狂,并且浪費了很多構(gòu)建List的時間历恐,不用想,這段程序的效率也很低专筷。

Lazy / Stream

幸運的是弱贼,Elixir打一開始就提供了解決這個問題的方案,也就是大家耳熟能詳?shù)?code>streams仁堪。相比于Enum的eager哮洽,Stream則是lazy填渠。Stream會遍歷List的每個元素弦聂,在這個例子中,就是每一行氛什,而不是之前那樣把整個文件存到內(nèi)存中莺葫。

不大清楚eager和lazy的同學可以回想一下布爾表達式,exp1 && exp2枪眉,如果exp1求值是false捺檬,整個表達式肯定是false,我們不對exp2進行計算贸铜,偷一下懶堡纬,這就是lazy。假如我們還對exp2進行求值蒿秦,就是想知道它的值烤镐,這就是eager了。

我們來看一下采用Stream替代Enum后的版本:

File.stream!("path/to/some/file")
|> Stream.flat_map(fn line ->
    String.split(line, " ")
   end)
|> Enum.reduce(%{}, fn word, acc ->
    Map.update(acc, word, 1, & &1 + 1)
   end)
|> Enum.to_list()

通過使用File.stream!Stream.flat_map

Stream折疊了(folds)了計算過程棍鳖,元素一個一個進入流炮叶,而不是加載一個巨大的文件,通過Stream可以處理大型文件渡处,或者是「無限的」數(shù)據(jù)流镜悉,就比如twitter上每天產(chǎn)生的新信息。

TODO

Concurrent / Flow

當然医瘫,這個版本還是有些小缺陷侣肄,它還是沒有用到并發(fā)。現(xiàn)代計算機一般都有多個核心醇份,能否合理利用多核是我們高效完成任務的關鍵稼锅。

在ElixirConf 2015 keynote中叮喳,José Valim給出了一個最直接的多核解決方案。這個方案將你pipeline的一部分給到了另外的processes缰贝。

File.stream!("path/to/some/file")
|> Stream.flat_map(fn line ->
    String.split(line, " ")
   end)
|> Stream.async()  # NEW!
|> Enum.reduce(%{}, fn word, acc ->
    Map.update(acc, word, 1, & &1 + 1)
   end)
|> Enum.to_list()

Stream.async將在另外的process中運行之前的計算然后流式的把結(jié)果給到Enum.reduce這個process馍悟。不幸的是,這個方法仍不完善剩晴。

首先锣咒,不同process之間我們想盡量避免數(shù)據(jù)遷移。相反赞弥,我們想開啟多個processes來并發(fā)地做同一種計算任務毅整。其次,如果我們需要開發(fā)者手動放置Stream.async的話绽左,會出現(xiàn)很多低效或錯誤用法悼嫉。

盡管這個解決方案存在缺陷,但是它幫助我們提出了正確的問題:

  1. 如果Stream.async建立新的process拼窥,如何保證這些process被監(jiān)控戏蔑?
  2. 由于我們在進程間交換數(shù)據(jù),如何防止一個進程獲取太多數(shù)據(jù)鲁纠?我們需要一個back-pressure機制來讓接收進程規(guī)定來自發(fā)送進程的信息承載量总棵。

在2016的keynote中給出了Flow的解決方案。

Flow

在一個標準雙核電腦上改含,對于2GB的文本文件進行字數(shù)統(tǒng)計情龄,Enum的方式花時遠超10分鐘,等不下去了捍壤,Stream的方式花時60秒骤视,F(xiàn)low的方式花時36秒。

Flow

  1. We give up ordering and process locality for concurrency
  2. Tools for working with bounded and unbounded data
  3. It is not magic! There is an overhead when data flows through processes
  4. Requires volume and/or cpu/io bound work to see benefits

第三條是說鹃觉,假設我們要對一個文件的所有數(shù)字求和专酗,我們使用Flow并不會比Stream更快,因為我們要在不同process之前傳輸大量的數(shù)字帜慢。笼裳??粱玲?

Flow總共有1200行代碼躬柬,1300行的文檔。

3. GenStage

Paste_Image.png
Paste_Image.png
Paste_Image.png
Paste_Image.png

我們來寫一個簡單的pipeline抽减,它將產(chǎn)生events允青,增加數(shù)字,將數(shù)字乘二卵沉,打印到終端颠锉。

三個stages法牲,分別是:producer, :producer_consumer:consumer。把它們簡稱為A, B, C琼掠。

我們首先從producer A開始拒垃。A作為producer,它的主要職責是接收需求瓷蛙,consumer需要處理的事件的數(shù)量悼瓮,并且產(chǎn)生事件。這些事件存在于內(nèi)存中或者來自外部數(shù)據(jù)源〖桠現(xiàn)在實現(xiàn)一個簡單的計數(shù)器横堡,通過init/1給計數(shù)器一個初始值。

注意:所有GenStage項目都有Experimental命名空間作為前綴冠桃。所以下方的代碼中你都將看到Experimental.GenStage歌径。

alias Experimental.GenStage

defmodule A do
  use GenStage

  def init(counter) do
    {:producer, counter}
  end

  def handle_demand(demand, counter) when demand > 0 do
    # If the counter is 3 and we ask for 2 items, we will
    # emit the items 3 and 4, and set the state to 5.
    events = Enum.to_list(counter..counter+demand-1)

    # The events to emit is the second element of the tuple,
    # the third being the state.
    {:noreply, events, counter + demand}
  end
end

B是producer-consumer等太。這意味著它并不顯式地處理需求赏枚,因為需求總是被轉(zhuǎn)發(fā)到它的producers狼纬。一旦A接收了B的需求捞附,它會給B發(fā)送事件巡揍,B會轉(zhuǎn)換這些事件然后發(fā)送給C源内。在本例中毯炮,B會接收事件,并且把它們乘以初始存儲在state中的數(shù)字肃弟。

alias Experimental.GenStage

defmodule B do
  use GenStage

  def init(number) do
    {:producer_consumer, number}
  end

  def handle_events(events, _from, number) do
    events = Enum.map(events, & &1 * number)
    {:noreply, events, number}
  end
end

C is the consumer which will finally receive those events and print them every second to the terminal:
C是一個consumer,最終接收這些事件并且每時每刻輸出到終端零蓉。

alias Experimental.GenStage

defmodule C do
  use GenStage

  def init(sleeping_time) do
    {:consumer, sleeping_time}
  end

  def handle_events(events, _from, sleeping_time) do
    # Print events to terminal.
    IO.inspect(events)

    # Sleep the configured time.
    Process.sleep(sleeping_time)

    # We are a consumer, so we never emit events.
    {:noreply, [], sleeping_time}
  end
end
{:ok, a} = GenStage.start_link(A, 0)    # starting from zero
{:ok, b} = GenStage.start_link(B, 2)    # multiply by 2
{:ok, c} = GenStage.start_link(C, 1000) # sleep for a second

GenStage.sync_subscribe(c, to: b)
GenStage.sync_subscribe(b, to: a)

# Sleep so we see events printed.
Process.sleep(:infinity)

Refs

  1. http://elixir-lang.org/blog/2016/07/14/announcing-genstage/
  2. Elixir Conf 2016 GenStage
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末笤受,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子敌蜂,更是在濱河造成了極大的恐慌箩兽,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,657評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件章喉,死亡現(xiàn)場離奇詭異汗贫,居然都是意外死亡,警方通過查閱死者的電腦和手機秸脱,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評論 3 394
  • 文/潘曉璐 我一進店門落包,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人摊唇,你說我怎么就攤上這事咐蝇。” “怎么了巷查?”我有些...
    開封第一講書人閱讀 164,057評論 0 354
  • 文/不壞的土叔 我叫張陵有序,是天一觀的道長抹腿。 經(jīng)常有香客問我,道長旭寿,這世上最難降的妖魔是什么警绩? 我笑而不...
    開封第一講書人閱讀 58,509評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮盅称,結(jié)果婚禮上房蝉,老公的妹妹穿的比我還像新娘。我一直安慰自己微渠,他們只是感情好搭幻,可當我...
    茶點故事閱讀 67,562評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著逞盆,像睡著了一般檀蹋。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上云芦,一...
    開封第一講書人閱讀 51,443評論 1 302
  • 那天俯逾,我揣著相機與錄音,去河邊找鬼舅逸。 笑死桌肴,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的琉历。 我是一名探鬼主播坠七,決...
    沈念sama閱讀 40,251評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼旗笔!你這毒婦竟也來了彪置?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,129評論 0 276
  • 序言:老撾萬榮一對情侶失蹤蝇恶,失蹤者是張志新(化名)和其女友劉穎拳魁,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體撮弧,經(jīng)...
    沈念sama閱讀 45,561評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡潘懊,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,779評論 3 335
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了贿衍。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片授舟。...
    茶點故事閱讀 39,902評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖舌厨,靈堂內(nèi)的尸體忽然破棺而出岂却,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 35,621評論 5 345
  • 正文 年R本政府宣布躏哩,位于F島的核電站署浩,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏扫尺。R本人自食惡果不足惜筋栋,卻給世界環(huán)境...
    茶點故事閱讀 41,220評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望正驻。 院中可真熱鬧弊攘,春花似錦、人聲如沸姑曙。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,838評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽伤靠。三九已至捣域,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間宴合,已是汗流浹背焕梅。 一陣腳步聲響...
    開封第一講書人閱讀 32,971評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留卦洽,地道東北人贞言。 一個月前我還...
    沈念sama閱讀 48,025評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像阀蒂,于是被迫代替她去往敵國和親该窗。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,843評論 2 354

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理脂新,服務發(fā)現(xiàn)挪捕,斷路器,智...
    卡卡羅2017閱讀 134,656評論 18 139
  • /Library/Java/JavaVirtualMachines/jdk-9.jdk/Contents/Home...
    光劍書架上的書閱讀 3,880評論 2 8
  • 本篇文章是基于谷歌有關Graphic的一篇概覽文章的翻譯:http://source.android.com/de...
    lee_3do閱讀 7,123評論 2 21
  • 對象的創(chuàng)建與銷毀 Item 1: 使用static工廠方法争便,而不是構(gòu)造函數(shù)創(chuàng)建對象:僅僅是創(chuàng)建對象的方法,并非Fa...
    孫小磊閱讀 1,982評論 0 3
  • R I:一個沒有經(jīng)過訓練的心思断医,它的注意力是被外面的刺激因素觸發(fā)的滞乙,被碰到的事吸引,甚至別人的想法也會促動心思鉴嗤,總...
    DevaGopalKaur子涵閱讀 373評論 0 0