1. 簡介
在2016年7月14號护赊,Elixir發(fā)布了GenStage。官方對GenStage的描述是:
GenStage is a new Elixir behaviour for exchanging events with back-pressure between Elixir processes.
GenStage是生產(chǎn)者和消費者之間交互事件(event)的規(guī)范机打。簡言之,Elixir希望用GenStage代替GenEvent并提供可組合的抽象層片迅,來從第三方系統(tǒng)獲取和處理數(shù)據(jù)残邀。
查看官方文檔,我們可以對Stage有個初步的理解:
Stages are computation steps that send and/or receive data from other stages.
Stage就是運算步驟障涯,每個Stage能發(fā)送數(shù)據(jù)或從其它Stage獲取數(shù)據(jù)罐旗。
本文主要基于Announcing GenStage和Elixir Conf 2016的Keynote內(nèi)容對于GenStage進行闡述。
2. 背景
當初José Valim創(chuàng)建Elixir的一大初衷是引入更好的抽象來處理集合唯蝶。所以Elixir才有List九秀,Enum,Stream粘我,Pipe |>
這么多好東西鼓蜒。當然,不僅這樣征字,Elixir也提供給開發(fā)者一條處理集合的路徑都弹,從激進到懶惰,再到并發(fā)匙姜,再到分布式(from eager to lazy, to concurrent and then distributed)畅厢。
下面我們從一個簡單的單詞計數(shù)程序開始,探討集合處理過程的演變氮昧。
Eager / Enum
File.read!("path/to/some/file")
|> String.split("\n")
|> Enum.flat_map(fn line ->
String.split(line, " ")
end)
|> Enum.reduce(%{}, fn word, acc ->
Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()
這個方案對于小文件很適用框杜,但是對于大文件浦楣,它需要把文本全部讀入內(nèi)存,并且Enum.flat_map/2
會創(chuàng)建一個巨大的List咪辱,這個List包含了文件中的所有單詞振劳,然后才能計數(shù)。這樣我們浪費了大量內(nèi)存油狂,并且浪費了很多構(gòu)建List的時間历恐,不用想,這段程序的效率也很低专筷。
Lazy / Stream
幸運的是弱贼,Elixir打一開始就提供了解決這個問題的方案,也就是大家耳熟能詳?shù)?code>streams仁堪。相比于Enum的eager哮洽,Stream則是lazy填渠。Stream會遍歷List的每個元素弦聂,在這個例子中,就是每一行氛什,而不是之前那樣把整個文件存到內(nèi)存中莺葫。
不大清楚eager和lazy的同學可以回想一下布爾表達式,exp1 && exp2枪眉,如果exp1求值是false捺檬,整個表達式肯定是false,我們不對exp2進行計算贸铜,偷一下懶堡纬,這就是lazy。假如我們還對exp2進行求值蒿秦,就是想知道它的值烤镐,這就是eager了。
我們來看一下采用Stream替代Enum后的版本:
File.stream!("path/to/some/file")
|> Stream.flat_map(fn line ->
String.split(line, " ")
end)
|> Enum.reduce(%{}, fn word, acc ->
Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()
通過使用File.stream!
和 Stream.flat_map
Stream折疊了(folds)了計算過程棍鳖,元素一個一個進入流炮叶,而不是加載一個巨大的文件,通過Stream可以處理大型文件渡处,或者是「無限的」數(shù)據(jù)流镜悉,就比如twitter上每天產(chǎn)生的新信息。
TODO
Concurrent / Flow
當然医瘫,這個版本還是有些小缺陷侣肄,它還是沒有用到并發(fā)。現(xiàn)代計算機一般都有多個核心醇份,能否合理利用多核是我們高效完成任務的關鍵稼锅。
在ElixirConf 2015 keynote中叮喳,José Valim給出了一個最直接的多核解決方案。這個方案將你pipeline的一部分給到了另外的processes缰贝。
File.stream!("path/to/some/file")
|> Stream.flat_map(fn line ->
String.split(line, " ")
end)
|> Stream.async() # NEW!
|> Enum.reduce(%{}, fn word, acc ->
Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()
Stream.async
將在另外的process中運行之前的計算然后流式的把結(jié)果給到Enum.reduce
這個process馍悟。不幸的是,這個方法仍不完善剩晴。
首先锣咒,不同process之間我們想盡量避免數(shù)據(jù)遷移。相反赞弥,我們想開啟多個processes來并發(fā)地做同一種計算任務毅整。其次,如果我們需要開發(fā)者手動放置Stream.async
的話绽左,會出現(xiàn)很多低效或錯誤用法悼嫉。
盡管這個解決方案存在缺陷,但是它幫助我們提出了正確的問題:
- 如果
Stream.async
建立新的process拼窥,如何保證這些process被監(jiān)控戏蔑? - 由于我們在進程間交換數(shù)據(jù),如何防止一個進程獲取太多數(shù)據(jù)鲁纠?我們需要一個back-pressure機制來讓接收進程規(guī)定來自發(fā)送進程的信息承載量总棵。
在2016的keynote中給出了Flow的解決方案。
在一個標準雙核電腦上改含,對于2GB的文本文件進行字數(shù)統(tǒng)計情龄,Enum的方式花時遠超10分鐘,等不下去了捍壤,Stream的方式花時60秒骤视,F(xiàn)low的方式花時36秒。
Flow
- We give up ordering and process locality for concurrency
- Tools for working with bounded and unbounded data
- It is not magic! There is an overhead when data flows through processes
- Requires volume and/or cpu/io bound work to see benefits
第三條是說鹃觉,假設我們要對一個文件的所有數(shù)字求和专酗,我們使用Flow并不會比Stream更快,因為我們要在不同process之前傳輸大量的數(shù)字帜慢。笼裳??粱玲?
Flow總共有1200行代碼躬柬,1300行的文檔。
3. GenStage
我們來寫一個簡單的pipeline抽减,它將產(chǎn)生events允青,增加數(shù)字,將數(shù)字乘二卵沉,打印到終端颠锉。
三個stages法牲,分別是:producer
, :producer_consumer
和 :consumer
。把它們簡稱為A, B, C
琼掠。
我們首先從producer A開始拒垃。A作為producer,它的主要職責是接收需求瓷蛙,consumer需要處理的事件的數(shù)量悼瓮,并且產(chǎn)生事件。這些事件存在于內(nèi)存中或者來自外部數(shù)據(jù)源〖桠現(xiàn)在實現(xiàn)一個簡單的計數(shù)器横堡,通過init/1
給計數(shù)器一個初始值。
注意:所有GenStage項目都有Experimental命名空間作為前綴冠桃。所以下方的代碼中你都將看到
Experimental.GenStage
歌径。
alias Experimental.GenStage
defmodule A do
use GenStage
def init(counter) do
{:producer, counter}
end
def handle_demand(demand, counter) when demand > 0 do
# If the counter is 3 and we ask for 2 items, we will
# emit the items 3 and 4, and set the state to 5.
events = Enum.to_list(counter..counter+demand-1)
# The events to emit is the second element of the tuple,
# the third being the state.
{:noreply, events, counter + demand}
end
end
B是producer-consumer等太。這意味著它并不顯式地處理需求赏枚,因為需求總是被轉(zhuǎn)發(fā)到它的producers狼纬。一旦A接收了B的需求捞附,它會給B發(fā)送事件巡揍,B會轉(zhuǎn)換這些事件然后發(fā)送給C源内。在本例中毯炮,B會接收事件,并且把它們乘以初始存儲在state中的數(shù)字肃弟。
alias Experimental.GenStage
defmodule B do
use GenStage
def init(number) do
{:producer_consumer, number}
end
def handle_events(events, _from, number) do
events = Enum.map(events, & &1 * number)
{:noreply, events, number}
end
end
C is the consumer which will finally receive those events and print them every second to the terminal:
C是一個consumer,最終接收這些事件并且每時每刻輸出到終端零蓉。
alias Experimental.GenStage
defmodule C do
use GenStage
def init(sleeping_time) do
{:consumer, sleeping_time}
end
def handle_events(events, _from, sleeping_time) do
# Print events to terminal.
IO.inspect(events)
# Sleep the configured time.
Process.sleep(sleeping_time)
# We are a consumer, so we never emit events.
{:noreply, [], sleeping_time}
end
end
{:ok, a} = GenStage.start_link(A, 0) # starting from zero
{:ok, b} = GenStage.start_link(B, 2) # multiply by 2
{:ok, c} = GenStage.start_link(C, 1000) # sleep for a second
GenStage.sync_subscribe(c, to: b)
GenStage.sync_subscribe(b, to: a)
# Sleep so we see events printed.
Process.sleep(:infinity)