30 分鐘學 Erlang (二)

并發(fā)

創(chuàng)建進程

使用 erlang:spawn/1,2,3,4 用來創(chuàng)建一個 erlang 進程浅役。Erlang 進程不是操作系統(tǒng)的進程陈症,而是類似其他語言里“協(xié)程”的概念问潭,它由 Erlang 虛擬機調度等舔。本文以后說“進程”卫玖,就是指 Erlang 進程。

進程之間是互相獨立的原朝,一個進程要想與另外一個進程通信驯嘱,就必須通過消息傳遞。消息會被發(fā)送到對方進程的信箱存儲起來喳坠,對方進程可以在合適的時間鞠评,按照自定的順序讀取信箱里的消息。

Erlang 里進程非常輕量壕鹉,啟動速度很快剃幌,并且可以同時運行千千萬萬個,默認的進程個數(shù)上限是 262144 御板,但可以在啟動時使用 erl +P 修改這個配置锥忿。

1> HelloParallel = fun() -> io:format("hello parallel!~n") end.
#Fun<erl_eval.20.99386804>
2> spawn(HelloParallel).  %% spawn/1 BIF 接受一個函數(shù)做為參數(shù)。
hello parallel!
<0.63.0>
3> PID = pid(0,63,0).   %% 使用 pid 來生成一個 PID
4> is_pid(PID).  %% 檢查是否是 PID 類型
true
5> is_process_alive(PID). %%  檢查 Process 是否還活著怠肋。顯示 false 是因為它已經運行完成終止了敬鬓。
false

spawn 函數(shù)返回一個新進程的 pid,我們可以使用這個 pid 與其交互笙各。

erlang shell 也是有 pid 的钉答。前面說到一個運行時錯誤會使得當前的shell 進程崩潰,并重新啟動一個新的進程杈抢,我們驗證一下:

1> self().   %% self/1 返回當前進程的 pid
<0.60.0>
2> 1 = 2.
** exception error: no match of right hand side value 2
3> self().
<0.63.0>

消息發(fā)送和接收

使用消息發(fā)送運算符 ! 發(fā)送消息数尿。

4> self() ! "hello".    %% 向自己所在的進程發(fā)送一個 List 類型的 "hello". `!` 操作的返回值是消息內容, "hello".
"hello"
5> flush().  %% flush() 將當前 process 的信箱里的所有消息清空并打印。
Shell got "hello"
ok

receive ... end 語句使用 pattern matching 來從自己進程的信箱里讀取消息惶楼,可以使用 after 語句來設置等待超時時間:

1> self() ! "msg1".
"msg1"
2> self() ! "msg2".
"msg2"
3> self() ! "msg3".
"msg3"
4> receive Msg -> Msg after 3000 -> no_more end. %% 讀取任意消息并返回這條消息右蹦,如果信箱里沒有消息了诊杆,等待 3 秒后結束并返回 no_more.
"msg1"
5> receive Msg -> Msg after 3000 -> no_more end.  %% 后面這兩條為什么返回 no_more ? 不應該是 "msg2", "msg3" 嗎?
no_more
6> receive Msg -> Msg after 3000 -> no_more end.
no_more

上面的第 4 行 receive 語句里何陆,erlang shell 進程查看郵箱晨汹,查到第一個消息是 "msg1", Msg 被綁定為 "msg1"。再次運行 receive 語句的時候贷盲,由于 Msg 的值已經為 "msg1"淘这,與信箱里的 "msg2", "msg3" 都不匹配,所以后面兩條 receive 語句都沒有從信箱里讀取新消息巩剖,"msg2" 和 "msg3" 仍然存儲在信箱里:

16> flush().
Shell got "msg2"
Shell got "msg3"
ok

注意雖然后面兩個 receive 語句都沒有從信箱里讀取消息铝穷,但在 receive 語句的執(zhí)行過程中,它仍然是從頭到尾遍歷了整個郵箱佳魔,并嘗試拿郵箱里的各個消息跟代碼里的 Msg 進行匹配曙聂,這是消耗資源的,等后面消息堆積越多越麻煩吃引。這個叫 Selective Message Reception. 消息的讀取順序是接收方決定的筹陵。

所以一般情況下我們在讀取信箱消息時,讀到我們不感興趣的消息也取出來镊尺,打個 error log 然后扔掉它,不要讓它一直在信箱里耗費資源并思。

在 Erlang shell 已經伸展不開拳腳了庐氮。讓我們來寫個復雜點的程序:
我們的程序實現(xiàn)一個 消息緩存,具體需求是:

  • 我們需要一個消息棧宋彼,用于存儲用戶發(fā)來的消息弄砍。
  • 考慮到用戶發(fā)來的消息可能有很多,我們需要好幾個這樣的消息棧來分擔負載输涕。
  • 我們還想能夠給消息棧命名音婶,以便區(qū)分。
-module(msg_cache).

%% APIs
-export([start_one/1]).

%% for spawns
-export([loop/1]).

%% 定義進程的 state莱坎。
%% 我們一般說衣式,一個服務、或 “對象” 會維護自己內部的 '狀態(tài)'
%% 狀態(tài)可能是一個字符串緩存檐什,可能是某個資源的引用碴卧,這個跟業(yè)務相關。
%% 狀態(tài)存在于內存中乃正,跟外界隔離住册,通過 API 接口與外界交互。
%% 面向對象語言里用 類和對象來存儲狀態(tài)瓮具,Erlang 里我們用 process荧飞。
%% 所以我們又說 Erlang 是 “面向Process 編程的”
-record(state, {
            name,      %% 消息棧的名字
            length = 0,  %% 消息棧長度
            buff = []   %% 消息棧的存儲列表
         }).

loop(State = #state{name = Name, length = Len, buff = Buff}) ->
  receive
    {get_name, From}->
      From ! {ok, Name},
      loop(State);
    {get_length, From}->
      From ! {ok, Len},
      loop(State);
    {set_name, NewName, From} ->
      From ! ok,
      loop(State#state{name = NewName});
    {push, Msg, From} ->
      From ! ok,
      loop(State#state{buff = [Msg | Buff], length = Len + 1});
    {pop, [], From} ->
      From ! {error, empty},
      loop(State);
    {pop, [TopMsg | Msgs], From} ->
      From ! {ok, TopMsg},
      loop(State#state{buff = Msgs, length = Len - 1});
    _Unsupported ->
      erlang:error(io_libs:format("unsupported msg: ", [_Unsupported]) )
  end.

start_one(BuffName) ->
  %% 啟動一個消息棧凡人,并返回其 PID
  Pid = spawn(msg_cache, loop, [#state{name=BuffName}]),
  io:format("Buff ~s created! Pid = ~p~n", [BuffName, Pid]),
  Pid

其實除了 loop/1 長一點,其他的都挺容易理解的叹阔。
注意 loop/1 里的每個分支的最后一個語句都是尾遞歸划栓,意味著只要不出錯,loop/1 就一直循環(huán)下去条获,所以進程就不會停止忠荞。

思考:如果把上面代碼里 receive 語句的最后一個 _Unsupported -> 分支刪掉的話,會發(fā)生什么帅掘?

receive 語句里委煤,接受消息時,都要求消息發(fā)送方將自己的 Pid 帶過來修档,放到 From 變量里碧绞,以便我們回復消息給對方。

我們來試試:

1> PID = msg_cache:start_one("cache2").
Buff cache2 created! Pid = <0.62.0>
<0.62.0>
2> PID ! {get_length, self()}.
{get_length,<0.60.0>}
3> flush().
Shell got {ok,0}
ok

4> PID ! {pop, self()}.
{pop,<0.60.0>}
5> flush().
Shell got {error,empty}
ok

6> PID ! {push, "msg1", self()}.
{push,"msg1",<0.60.0>}
7> PID ! {push, "msg2", self()}.
{push,"msg2",<0.60.0>}
8> PID ! {push, "msg3", self()}.
{push,"msg3",<0.60.0>}
9> PID ! {get_length, self()}.
{get_length,<0.60.0>}
10> flush().
Shell got ok
Shell got ok
Shell got ok
Shell got {ok,3}
ok

11> PID ! {pop, self()}.
{pop,<0.60.0>}
12> flush().
Shell got {ok,"msg3"}
ok

13> PID ! {get_length, self()}.
{get_length,<0.60.0>}
14> flush().
Shell got {ok,2}
ok

繼續(xù)往下閱讀之前吱窝,仔細看一下這個例子讥邻,確保你完全理解了這段代碼。

挺厲害的吧院峡?但我們還有兩個問題沒有解決:

  • 沒有一個易用易維護的 API兴使。 PID ! {get_length, self()}. 這種調用方式實在有些反人類。
  • 沒有管理進程照激。我們調用一次 msg_cache:start_one/1 就啟動了一個msg_cache, 但是現(xiàn)在我不知道當前已經啟動了幾個 msg_cache.

我們來解決這第一個問題发魄,重新整理一下代碼:

-module(msg_cache).

%% APIs
-export([start_one/1,
         get_name/1,
         get_length/1,
         pop/1,
         set_name/2,
         push/2
        ]).

%% for spawns
-export([loop/1]).

-define(API_TIMEOUT, 3000).

-record(state, {
            name,
            length = 0,
            buff = []
         }).

start_one(BuffName) ->
  Pid = spawn(msg_cache, loop, [#state{name=BuffName}]),
  io:format("Buff ~s created! Pid = ~p~n", [BuffName, Pid]),
  Pid.

%% 加了這幾個 API
get_name(CacheID) ->
  call(CacheID, {get_name, self()}).
get_length(CacheID) ->
  call(CacheID, {get_length, self()}).
set_name(CacheID, NewName) ->
  call(CacheID, {set_name, NewName, self()}).
pop(CacheID) ->
  call(CacheID, {pop, self()}).
push(CacheID, Msg) ->
  call(CacheID, {push, Msg, self()}).

%% 由于發(fā)送和接受消息的處理方面,各個 API 都差不多俩垃,就提取出來專門寫個 call 函數(shù)励幼,提高代碼復用。
call(Pid, Request) ->
  Pid ! Request,
  receive
    Response -> Response
  after ?API_TIMEOUT ->
    {error, api_timeout}
  end.

%% loop 這一部分我們沒改動任何代碼
loop(State = #state{name = Name, length = Len, buff = Buff}) ->
  receive
    {get_name, From}->
      From ! {ok, Name},
      loop(State);
    {get_length, From}->
      From ! {ok, Len},
      loop(State);
    {set_name, NewName, From} ->
      From ! ok,
      loop(State#state{name = NewName});
    {push, Msg, From} ->
      From ! ok,
      loop(State#state{buff = [Msg | Buff], length = Len + 1});
    {pop, From} ->
      case Buff of
        [] ->
          From ! {error, empty},
          loop(State);
        [TopMsg | Msgs] ->
          From ! {ok, TopMsg},
          loop(State#state{buff = Msgs, length = Len - 1})
      end;
    _Unsupported ->
      erlang:error(io_libs:format("unsupported msg: ", [_Unsupported]) )
  end.

再試一下:

1> PID = msg_cache:start_one("cache_worker_1").
Buff cache_worker_1 created! Pid = <0.62.0>
<0.62.0>
2> msg_cache:get_name(PID).
{ok,"cache_worker_1"}
3> msg_cache:get_length(PID).
{ok,0}
4> msg_cache:pop(PID).
{error,empty}
5> msg_cache:push(PID, "msg1").
ok
6> msg_cache:push(PID, "msg2").
ok
7> msg_cache:get_length(PID).
{ok,2}
8> msg_cache:pop(PID).
{ok,"msg2"}
9> msg_cache:pop(PID).
{ok,"msg1"}
10> msg_cache:pop(PID).
{error,empty}
11> msg_cache:get_length(PID).
{ok,0}

還闊以吧口柳?

留個作業(yè)

上面那個 "管理進程" 我們沒有實現(xiàn)苹粟。你來實現(xiàn)它。

我想這么調用:

%% 啟動兩個 worker:
1> msg_cache:start_cache_workers(["c_worker_1", "c_worker_2"]).
[<0.62.0>, <0.65.0>]

%% 列出所有 workers, 返回值是個 worker 列表, 元素展示了每個 worker 的 name, pid, 和 length 跃闹。
2> CachePidList = msg_cache:list_cache_workers().
[{"c_worker_1", <0.62.0>, 0}, {"c_worker_2", <0.65.0>, 0}]

%% 負載均衡, 會往隨機的一個 cache worker 里 push.
%% 注意我這里調用 msg_cache:push 的時候嵌削,沒有提供某個 cache worker 的 PID
3> ok = msg_cache:push("msg1").
ok
4> ok = msg_cache:push("msg2").
ok
5> CachePidList = msg_cache:list_cache_workers().
[{"c_worker_1", <0.62.0>, 1}, {"c_worker_2", <0.65.0>, 1}]

%% 至于 pop 不用管順序了,有消息就隨便 pop 出一個來辣卒。
4> msg_cache:pop().
{ok, "msg1"}

提示:

  • erlang:register/2 可以給一個 PID 注冊一個名字掷贾,以后使用這個 PID 就可以使用這個名字代替。比如
register(msg_cache_manger, Pid).

msg_cache:list_cache_workers() ->
  msg_cache_manger ! get_all_workers.
課后必讀文章

Erlang 中的錯誤處理機制, Link荣茫、Monitor:
Errors and Processes

ETS

ETS (Erlang Term Storage) 是設計來存放大量的 Erlang 數(shù)據的想帅。跟 ETS 打交道不用消息格式轉換,可直接存放 Erlang 數(shù)據格式 (erlang 各種數(shù)據格式的統(tǒng)稱叫做 erlang terms)啡莉。
ETS 非掣圩迹快旨剥,訪問時間是常數(shù)級的檬果,自動幫你解決了多進程訪問的各種競態(tài)條件問題黍匾,讓我們在 Erlang 中做并發(fā)編程一身輕松。ETS 是非常優(yōu)秀的緩存系統(tǒng)鞋拟,是我們開發(fā)中不可或缺的利器之一衩椒。這比起用某種流行語言來說蚌父,舒服太多[1]
ETS 只將數(shù)據存儲在內存里毛萌,如果想保存到磁盤苟弛,或者要在多個 Erlang Node 之間共享數(shù)據,OTP 基于 ETS 和 DETS 實現(xiàn)了 mnesia.
NODE: mnesia 只適合用來做緩存阁将,在多個 Node 之間共享少量數(shù)據膏秫,非常快速做盅。但是并不適合當做數(shù)據庫存儲大量的數(shù)據缤削,因為 mnesia 在啟動時會加載所有數(shù)據到內存里,導致啟動緩慢吹榴、新節(jié)點加入緩慢亭敢。并且 mnesia 是強一致性的數(shù)據庫,其本身并不處理由于集群腦裂導致的不一致性腊尚,這可能不太符合你的預期吨拗。

ETS 支持幾種數(shù)據類型:

  • set: set 是普通的 key - value 存儲類型,一個 ETS table 里婿斥,兩個數(shù)據的 key 不能相同。重復插入 key 相同的兩條數(shù)據哨鸭,后面的那條會覆蓋前面的那條民宿。
  • ordered_set: 有序的 set 表。
  • bag: bag 允許多個 key 相同的數(shù)據的存在像鸡,但 key, value 都完全相同的數(shù)據只能留一個活鹰。
  • duplicate_bag: 允許多個 key, value 完全相同的數(shù)據的存在。

我們來試試 set 類型的 table只估,這也是最常用的類型志群。我們創(chuàng)建一個命名表,叫 users, 然后插入兩條數(shù)據:

1> ets:new(users, [set, named_table]).
users
2> ets:info(users).   %% 注意默認的權限是 protected
[{read_concurrency,false},
 {write_concurrency,false},
 {compressed,false},
 {memory,304},
 {owner,<0.57.0>},
 {heir,none},
 {name,users},
 {size,0},
 {node,nonode@nohost},
 {named_table,true},
 {type,set},
 {keypos,1},
 {protection,protected}]
3> ets:insert(users, {1, <<"Shawn">>, 27}).
true
4> ets:insert(users, {2, <<"Scarlett">>, 25}).
true
5> ets:lookup(users, 1).
[{1,<<"Shawn">>,27}]
6> ets:lookup(users, 2).
[{2,<<"Scarlett">>,25}]
7> ets:info(users).
[{read_concurrency,false},
 {write_concurrency,false},
 {compressed,false},
 {memory,332},
 {owner,<0.57.0>},
 {heir,none},
 {name,users},
 {size,2},
 {node,nonode@nohost},
 {named_table,true},
 {type,set},
 {keypos,1},
 {protection,protected}]
8>

注意上邊的示例里:

  • 創(chuàng)建 ETS table 時給了兩個 Options 參數(shù):[set, named_table]蛔钙。set 是指定創(chuàng)建 set 類型的表锌云,named_table 是創(chuàng)建命名表,命名為 users吁脱,后面可以用這個表名來引用桑涎。
  • 插入數(shù)據 {1, <<"Shawn">>, 27}{2, <<"Scarlett">>, 25} 時彬向,兩個 tuple 的第一項就是默認的 key,tuple 里其他項都是 values攻冷。如果你想用其他的項作為 key娃胆,可以在 ets:new 的時候,指定 {keypos, Pos} 參數(shù)等曼,設置 key 在 tuple 中的位置里烦。

ETS 表的其他類型你可以自己試驗一下。

需要注意的是:

  • ETS 表里的任何數(shù)據都不參加 GC
  • ETS 表有自己的 owner 進程禁谦,默認情況下胁黑,創(chuàng)建表的那個進程就是 ETS table 的 owner。如果 owner 進程掛了枷畏,ETS 表也就被釋放了别厘。我們上邊的例子里,erlang shell 進程就是 user table 的 owner拥诡。
  • ETS 表也是有訪問權限的触趴,默認是 protected:
    • public:任何人可以讀寫這張表。
    • protected: owner 可以讀寫渴肉,但其他進程只能讀冗懦。
    • private:只有 owner 可以讀寫。別的進程無法訪問仇祭。

由于 ETS 表非常高效披蕉,一般情況下我們都直接使用 public,然后設置 {read_concurrency, true}{write_concurrency,true} 選項來提高并發(fā)讀或寫的效率乌奇,在寫一個管理模塊來直接訪問 ets 表没讲,讓什么封裝什么設計模式都去 shi。

OTP

OTP 已經失去了字面意思礁苗,基本上指的就是 Erlang 生態(tài)環(huán)境的官方部分爬凑。Erlang 世界的組成是這樣的:

  • Erlang 以及 Elixir 等語言。
  • 工具和函數(shù)庫试伙,包括 erlang runtime嘁信,kernel,stdlib(像 lists 這種的官方庫), sasl, 還有像 ETS疏叨,dbg 之類的很多潘靖。
  • 系統(tǒng)設計原則, 包括本章要講的一眾 Behaviors。是一堆應用于并發(fā)世界的設計模式蚤蔓,他們包含了解決通用問題的通用代碼卦溢。
  • 開源社區(qū)生態(tài)環(huán)境,包括各種開源軟件和社區(qū)。

OTP 指的是前三個既绕,Elixir 的話還不大算啄刹。

Erlang 的邏輯是,架構的設計應該由有經驗的人負責凄贩,由專家做好基礎代碼框架誓军,解決好最困難的問題。而使用者只需要寫自己的邏輯代碼疲扎。這就是 OTP behaviors昵时,他們已經在通信、互聯(lián)網領域椒丧,經歷了幾十年的戰(zhàn)火考驗壹甥。

本文要講的有三個:

  • gen_server
  • application
  • supervisor

本章只講解 gen_server。 application 和 supervisor 放到后面 Hello World 工程里講解壶熏。

gen_server 要解決的問題句柠,就是我們上面那個 msg_cache 面臨的問題:怎樣做一個服務來響應用戶的請求。

我們之前寫的代碼很短棒假,可以工作溯职,但是很多東西都沒有考慮。比如請求者如果同時收到來自服務端的兩個 Response 的話帽哑,不知道是對應哪個請求的:

%% 服務端:
    {get_name, From}->
      From ! {ok, Name},
      loop(State);
    {get_length, From}->
      From ! {ok, Len},
      loop(State);

%% 客戶端:
    ServerPID ! {get_length, self()},   %% 客戶端連續(xù)調用了兩次
    ServerPID ! {get_length, self()},  
    receive
      {ok, Len} ->  %% 你知道這次匹配到的消息谜酒,是上面哪次調用的回復嗎?
         success;
      _ ->
         failed
    end.

上面代碼里連續(xù)調用了兩次 {get_length}, 但是由于發(fā)送消息是異步的妻枕,消息通過網絡回來僻族,你并不能確定第一次收到的回復就是第一次調用產生的。

這個問題可以加一個隨機生成的 RequestID 的字段來解決屡谐,客戶端發(fā)送請求消息的時候帶 RequestID 過去述么,服務端返回的時候再傳回來°堤停客戶端通過匹配 RequestID碉输,就能知道當前的回復是對應的哪個請求。

但這種需求其實是通用的亭珍,你現(xiàn)在寫 msg_cache 用得到,改天寫其他代碼也一樣用得到枝哄。另外我們也沒有過多考慮異常的情況:如果程序崩潰了怎么辦肄梨?發(fā)送消息怎么知道對方是不是還活著?

諸如此類的問題應該由專家來解決挠锥,所以我們有了 gen_server.
gen_server 的模板是這樣的:

-module(gen_server_demo).
-behaviour(gen_server).

%% API functions
-export([start_link/0]).

%% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3]).

-record(state, {}).
%%%% %%%% %%%% %%%% %%%% 
%%%% 這是給客戶端調用的接口部分
%%%% %%%% %%%% %%%% %%%% 
%% 啟動一個服務众羡,后臺會啟動一個 erlang process, 并進入 loop 函數(shù), 回想一下我們實現(xiàn) msg_cache 時寫的那個 loop/1.
%% 但是這個 loop 函數(shù)屬于通用部分的代碼,是由 OTP 官方實現(xiàn)的蓖租,所以代碼不在這里粱侣,在 OTP 代碼的 lib/stdlib/src/gen_server.erl 里羊壹。
start_link() ->
    %% gen_server:start_link 啟動 process, 然后將 process 注冊在當前
    %% node 上,注冊名字就是當前 Module 名:gen_server_demo
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%%%% %%%% %%%% %%%% %%%% 
%%%% 這是 gen_server 發(fā)生某事件時的回調函數(shù)部分
%%%% %%%% %%%% %%%% %%%%

%% gen_server:start_link 被調用齐婴,服務啟動時油猫,回調 init/1
init([]) ->
    {ok, #state{}}.

%% gen_server:call 被調用。gen_server:call 是“同步”調用柠偶,調用方可以設置一個超時時間情妖。
%% 返回值里的 Reply 是返回給調用者的內容。
handle_call(_Request, _From, State) ->
    Reply = ok,
    {reply, Reply, State}.

%% gen_server:cast 被調用诱担。gen_server:cast 是“異步”調用毡证。
%% 調用者一般是想發(fā)一個消息給我們的 gen_server,然后繼續(xù)做自己的事情蔫仙,他不想收到來自 gen_server 的回復料睛。
handle_cast(_Msg, State) ->
    {noreply, State}.

%% gen_server 進程收到一個普通 erlang 消息:一個不是通過 gen_server:call 和 gen_server:cast 發(fā)來的消息。
handle_info(_Info, State) ->
    {noreply, State}.

%% 上面的三個函數(shù) handle_call, handle_cast, handle_info
%%   都可以返回一個 {stop, Reason, State}摇邦,這樣的話 gen_server 會退出恤煞。
%%   但退出之前,可能會回調 terminate(_Reason, _State)涎嚼。


%% gen_server 將要退出時阱州,回調 terminate/2.
%% 注意
%% 1) 要想 terminate 在 gen_server 退出前被回調,gen_server 必須捕獲退出信號:
%%    需要在 init 回調里法梯,加這么一行:process_flag(trap_exit, true).
%% 2) 有幾個特定的 Reason 被認為是正常退出:normal, shutdown, or {shutdown,Term}苔货,
%%    其他的 Reason,sasl 是會報錯打日志的立哑。
terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

gen_server 真正的進程代碼在 OTP 庫里夜惭,運行 start_link(),gen_server 就在后臺跑起來了铛绰。你需要實現(xiàn)的只是這個模板里的各個回調函數(shù)诈茧,將你的業(yè)務邏輯放到這些回調里。

仔細看一下上面的 gen_server 模板和注釋捂掰,確保你能完全理解敢会。

我不想重新實現(xiàn)之前的 msg_cache,一點都不酷这嚣。我們重新寫個其他的鸥昏,讓你對 Erlang 程序的基本設計理念有更深的印象。

我們要實現(xiàn)一個多用戶聊天的程序:

  • 用戶能夠查詢在線的其他用戶姐帚。
  • 用戶之間能夠聊天吏垮。
  • 要容易擴展,因為后面我們的 Client 會通過TCP、WebSocket 等連接上來膳汪,不會是 Erlang 寫的 Client唯蝶。
  • 要容易伸縮,因為我們業(yè)務發(fā)展很快遗嗽,用戶量會越來越大粘我,我們希望程序能很容易的部署在多臺服務器上。

先來設計我們程序的架構:


chat_server-2.png
  • 每個 client 連接上來媳谁,都會啟動一個新的 Process涂滴,叫做 ChatServer.
  • ChatServer 負責維護這個 Client 的 TCP 連接。
  • Route 是一個Module晴音,它提供了數(shù)據庫的管理柔纵,數(shù)據庫里維護了從 User 到其 ChatServer 的 PID 的映射關系。

注意我們的設計思想:

  • 為每一個連接上來的請求啟動一個 Erlang 進程 "ChatServer"锤躁,不要擔心進程個數(shù)搁料,百萬也沒問題。
  • 兩個用戶之間的消息傳遞系羞,體現(xiàn)在服務端就是兩個 "ChatServer" 之間的 Erlang 消息傳遞郭计。
  • Route 部分只是一個 Module,不是進程椒振。每一個 ChatServer 調用 Route 里的代碼的時候昭伸,執(zhí)行過程其實是在每個 ChatServer 進程內部的。這樣我們就避免了集中向一個進程發(fā)送消息帶來的瓶頸澎迎。我們把這種瓶頸的處理留給了 ETS 來解決庐杨。
  • 如何伸縮?ChatServer 在不在同一個服務器上沒什么關系夹供。ChatServerPID 灵份!{send, Msg} 會將消息發(fā)送到ChatServerPID,即使 ChatServerPID 在遠端的服務器上哮洽。分布式部署的時候填渠,這行代碼根本不用改,你要做的僅僅是添加一個新的 Erlang Node鸟辅。分布式 Erlang 后面還要講氛什。
  • 如何擴展?ETS 使用 Route Module 管理匪凉,為的就是當以后換用其他的緩存數(shù)據庫的時候簡單一些屉更。我們設想后面為了做分布式集群,要用 mnesia 替代 ETS洒缀,只需要寫一個新的 Route Module,內部改用 mnesia 存儲,然后替換線上已經加載的老的 Route Module树绩。線上系統(tǒng)都不用停止萨脑,客戶端的連接一個都不會斷!

你現(xiàn)在能否體會到 Erlang 的實用主義呢饺饭?完全沒廢話渤早,就是解決問題!

Client 部分我們現(xiàn)在不做瘫俊,讓前端的同學幫我們實現(xiàn)鹊杖。但假設我們的前端程序員還沒到崗,所以我們可以先放著 WebSocket 部分后面再做扛芽。但有兩個過程必須現(xiàn)在實現(xiàn):

  • 當 Client 登錄時骂蓖,我們需要使用 Route 注冊 user 所在的 ChatServer 的 PID。
  • 當 Client 發(fā)消息時川尖,我們需要使用 Route 查找對方的 ChatServer 的 PID登下。

首先我們來定義我們的消息協(xié)議。我們的消息體內包含幾部分叮喳,發(fā)送者ID被芳,接收者ID,以及消息內容:

-record(msg, {
  from_userid,
  to_userid,
  payload
}).

接下來讓我們來實現(xiàn) Route 模塊馍悟,實現(xiàn)數(shù)據庫創(chuàng)建畔濒,注冊,查找與注銷功能:

-module(route).
-export([ensure_db/0,
         lookup_server/1,
         register_server/2,
         unregister_server/1]).

ensure_db() ->
  case ets:info(servers) of
    undefined ->
      %% 為了演示方便锣咒,我們啟動一個臨時進程來創(chuàng)建 ETS 表侵状,
      %% 如果直接在 erlang shell 里創(chuàng)建ETS的話,出錯時 shell 的崩潰連帶著我們的ETS也丟了宠哄。
      %% 當然線上系統(tǒng)不會這么做壹将。
      spawn(fun() -> ets:new(servers, [named_table, public]), receive after infinity->ok end end);
    _ -> ok
  end.

lookup_server(UserID) ->
  case ets:lookup(servers, UserID) of
    [{UserID, ServerID}] -> {ok, ServerID};
    _ -> {error, no_server}
  end.

register_server(UserID, ServerID) ->
  ets:insert(servers, {UserID, ServerID}).

unregister_server(UserID) ->
  ets:delete(servers, UserID).

接下來實現(xiàn)我們的 ChatServer:

-module(chat_server).
-behaviour(gen_server).
%% state 保存用戶的 userid,以及 client 端連上來的 socket.
-record(state, {
  userid,
  socket
}).

%% 后面當一個新連接連接到服務器的時候毛嫉,我們會調用 start_link 啟動一個新的 ChatServer 為之服務诽俯。
%% 注意這里使用的是 gen_server:start_link/3,沒有注冊進程名承粤,我們直接使用 PID. 因為我們要啟動很多個 ChatServer暴区。
start_link(UserID, Socket) ->
  {ok, ServerID}  = gen_server:start_link(?MODULE, [UserID, Socket], []),
  ServerID.

%% 在 init 回調里注冊用戶的 ChatServer。
%% 注意我們捕獲了 exit signal, 以便程序退出的時候回調 terminate/2. 
%% 我們在 terminate/2 里取消注冊辛臊。
init([UserID, Socket]) ->
    process_flag(trap_exit, true),
    route:register_server(UserID, self()),
    {ok, #state{userid=UserID, socket=Socket}}.

%% 如果我們的 ChatServer 收到一條來自 Socket 的消息仙粱,它會收到一條類似 {tcp, Sock, Data} 的普通消息。
%% 我們需要在 handle_info 里處理彻舰,轉發(fā)給對方的 ChatServer伐割。
handle_info({tcp, #msg{to_userid = ToUserID, payload = Payload} = Msg}, State) ->
  io:format("Chat Server(User: ~p) - received msg from tcp client, Msg: ~p~n",[State#state.userid, Msg]),
  case route:lookup_server(ToUserID) of
    {error, Reason} ->
      io:format("Chat Server(User: ~p) - cannot forward to Chat Server(User: ~p): ~p~n",
          [State#state.userid, ToUserID, Reason]);
    {ok, TargetServerID} ->
      io:format("Chat Server(User: ~p) - forward msg to Chat Server(User: ~p), Payload: ~p~n",
        [State#state.userid, ToUserID, Payload]),
      ok = gen_server:call(TargetServerID, {send, Msg})
  end,
  {noreply, State};

%% 我們的 ChatServer 收到一條來自對端 ChatServer 的轉發(fā)請求
handle_call({send, #msg{payload = Payload}}, _From, State) ->
  io:format("Chat Server(User: ~p) - deliver msg to tcp client, Payload: ~p~n",
    [State#state.userid, Payload]),
  send_to_client_via_tcp(State#state.socket, Payload),
  {reply, ok, State};

%% Socket 部分我們沒有實現(xiàn)候味,暫時就簡單打印一下
send_to_client_via_tcp(_Socket, Payload) ->
  %gen_tcp:send(_Socket, Payload),
  io:format("Sent To Client: ~p~n",[Payload]).

完工了!我們測試一下:

1> c(chat_server).
{ok,chat_server}
2> c(route).
{ok,route}

%% 現(xiàn)在模擬系統(tǒng)啟動時隔心,初始化 DB 的過程白群。
%% 后續(xù)這個會在啟動代碼里寫。
3> route:ensure_db().
<0.73.0>

%% 現(xiàn)在我們模擬一個新的用戶登錄上來硬霍,啟動新的 ChatServer 的過程帜慢。
%% 后續(xù)這個過程當然是由 WebSocket 模塊調用。
4> ServerIDUser1 = chat_server:start_link(<<"user1">>, fake_socket).
<0.75.0>
5> ServerIDUser2 = chat_server:start_link(<<"user2">>, fake_socket).
<0.77.0>

%% 我們來做一個 #msg{} 消息體唯卖。
%% 后續(xù)我們應該在收到 socket 上來的消息解析成功之后粱玲,打包一個 #msg{} 消息體。
6> rr("chat_protocol.hrl").
[msg]
7> Msg = #msg{from_userid= <<"user1">>, to_userid = <<"user2">>, payload = <<"hello?">>}.
#msg{from_userid = <<"user1">>,to_userid = <<"user2">>,
     payload = <<"hello?">>}


%% 模擬從 socket 收到消息的過程拜轨。
8> ServerIDUser1 ! {tcp, Msg}.
Chat Server(User: <<"user1">>) - received msg from tcp client, Msg: {msg,
                                                                     <<"user1">>,
                                                                     <<"user2">>,
                                                                     <<"hello?">>}
{tcp,#msg{from_userid = <<"user1">>,to_userid = <<"user2">>,
          payload = <<"hello?">>}}
Chat Server(User: <<"user1">>) - forward msg to Chat Server(User: <<"user2">>), Payload: <<"hello?">>
Chat Server(User: <<"user2">>) - deliver msg to tcp client, Payload: <<"hello?">>
Sent To Client: <<"hello?">>
9>

我們看到服務端的路由已經走通了抽减,接下來只要寫一個 web socket 模塊,listen 在某個端口撩轰,當有連接請求時胯甩,像上面第 4、第 5 行一樣調用 chat_server:start_link/2 就行了堪嫂。當然 send_to_client_via_tcp 也要改為真正往 socket 發(fā)送消息偎箫。

完整代碼:
https://github.com/terry-xiaoyu/learn-erlang-in-30-mins/tree/master/chat

一個完整的線上演示:
(即將上線)

書接下文30 分鐘學 Erlang (三)


  1. Golang 里你需要自己找多線程安全的 maps 庫,寫并發(fā)沒有安全感皆串。Golang 官方也沒有下文要說到的 OTP 里提供的各種 Behavior淹办,代碼寫起來天馬行空最后一團糟。然后又不能支持函數(shù)式的 pattern matching 等寫法... 總之用 golang 寫代碼從來不會給人愉快的感覺恶复。流行是流行的怜森,但那叫“普通”吧?第一次這么吐槽 golang谤牡,但這篇是 erlang 的教程副硅,應該不算過分吧。等到寫 go 的時候我再來吐槽 erlang 翅萤。我是不會寫 go 的 ... ?

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末恐疲,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子套么,更是在濱河造成了極大的恐慌培己,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,000評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件胚泌,死亡現(xiàn)場離奇詭異省咨,居然都是意外死亡,警方通過查閱死者的電腦和手機玷室,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,745評論 3 399
  • 文/潘曉璐 我一進店門零蓉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來笤受,“玉大人,你說我怎么就攤上這事壁公「新郏” “怎么了?”我有些...
    開封第一講書人閱讀 168,561評論 0 360
  • 文/不壞的土叔 我叫張陵紊册,是天一觀的道長。 經常有香客問我快耿,道長囊陡,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,782評論 1 298
  • 正文 為了忘掉前任掀亥,我火速辦了婚禮撞反,結果婚禮上,老公的妹妹穿的比我還像新娘搪花。我一直安慰自己遏片,他們只是感情好,可當我...
    茶點故事閱讀 68,798評論 6 397
  • 文/花漫 我一把揭開白布撮竿。 她就那樣靜靜地躺著吮便,像睡著了一般。 火紅的嫁衣襯著肌膚如雪幢踏。 梳的紋絲不亂的頭發(fā)上髓需,一...
    開封第一講書人閱讀 52,394評論 1 310
  • 那天,我揣著相機與錄音房蝉,去河邊找鬼僚匆。 笑死,一個胖子當著我的面吹牛搭幻,可吹牛的內容都是我干的咧擂。 我是一名探鬼主播,決...
    沈念sama閱讀 40,952評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼檀蹋,長吁一口氣:“原來是場噩夢啊……” “哼松申!你這毒婦竟也來了?” 一聲冷哼從身側響起续扔,我...
    開封第一講書人閱讀 39,852評論 0 276
  • 序言:老撾萬榮一對情侶失蹤攻臀,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后纱昧,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體刨啸,經...
    沈念sama閱讀 46,409評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,483評論 3 341
  • 正文 我和宋清朗相戀三年识脆,在試婚紗的時候發(fā)現(xiàn)自己被綠了设联。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片善已。...
    茶點故事閱讀 40,615評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖离例,靈堂內的尸體忽然破棺而出换团,到底是詐尸還是另有隱情,我是刑警寧澤宫蛆,帶...
    沈念sama閱讀 36,303評論 5 350
  • 正文 年R本政府宣布艘包,位于F島的核電站,受9級特大地震影響耀盗,放射性物質發(fā)生泄漏想虎。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,979評論 3 334
  • 文/蒙蒙 一叛拷、第九天 我趴在偏房一處隱蔽的房頂上張望舌厨。 院中可真熱鬧,春花似錦忿薇、人聲如沸裙椭。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,470評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽揉燃。三九已至,卻和暖如春瑰抵,著一層夾襖步出監(jiān)牢的瞬間你雌,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,571評論 1 272
  • 我被黑心中介騙來泰國打工二汛, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留婿崭,地道東北人。 一個月前我還...
    沈念sama閱讀 49,041評論 3 377
  • 正文 我出身青樓肴颊,卻偏偏與公主長得像氓栈,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子婿着,可洞房花燭夜當晚...
    茶點故事閱讀 45,630評論 2 359

推薦閱讀更多精彩內容

  • 本文寫給誰看的授瘦? 那些已經有過至少一門編程語言基礎,并且需要快速了解Erlang竟宋,掌握其基本要點提完,并馬上投入工作中...
    Shawn_xiaoyu閱讀 31,332評論 9 60
  • 世界是并行的,Erlang程序反應了我們思考和交流的方式丘侠,人作為個體通過發(fā)送消息進行交流徒欣,如果有人死亡,其他人會注...
    abel_cao閱讀 2,768評論 1 4
  • erlang常規(guī)面試題 基礎 消息發(fā)送 基礎相關 OTP相關 gen_server:cast和erlang:sen...
    randyjia閱讀 4,205評論 1 3
  • erlang應用腳本stop分析 其實這篇文章的名字應該是如何安全關閉erlang應用更加科學蜗字。 erlang應用...
    randyjiawenjie閱讀 1,408評論 0 1
  • erlang應用腳本stop分析 其實這篇文章的名字應該是如何安全關閉erlang應用更加科學打肝。 erlang應用...
    randyjia閱讀 1,231評論 0 1