并發(fā)
創(chuàng)建進程
使用 erlang:spawn/1,2,3,4 用來創(chuàng)建一個 erlang 進程浅役。Erlang 進程不是操作系統(tǒng)的進程陈症,而是類似其他語言里“協(xié)程”的概念问潭,它由 Erlang 虛擬機調度等舔。本文以后說“進程”卫玖,就是指 Erlang 進程。
進程之間是互相獨立的原朝,一個進程要想與另外一個進程通信驯嘱,就必須通過消息傳遞。消息會被發(fā)送到對方進程的信箱存儲起來喳坠,對方進程可以在合適的時間鞠评,按照自定的順序讀取信箱里的消息。
Erlang 里進程非常輕量壕鹉,啟動速度很快剃幌,并且可以同時運行千千萬萬個,默認的進程個數(shù)上限是 262144 御板,但可以在啟動時使用 erl +P 修改這個配置锥忿。
1> HelloParallel = fun() -> io:format("hello parallel!~n") end.
#Fun<erl_eval.20.99386804>
2> spawn(HelloParallel). %% spawn/1 BIF 接受一個函數(shù)做為參數(shù)。
hello parallel!
<0.63.0>
3> PID = pid(0,63,0). %% 使用 pid 來生成一個 PID
4> is_pid(PID). %% 檢查是否是 PID 類型
true
5> is_process_alive(PID). %% 檢查 Process 是否還活著怠肋。顯示 false 是因為它已經運行完成終止了敬鬓。
false
spawn 函數(shù)返回一個新進程的 pid,我們可以使用這個 pid 與其交互笙各。
erlang shell 也是有 pid 的钉答。前面說到一個運行時錯誤會使得當前的shell 進程崩潰,并重新啟動一個新的進程杈抢,我們驗證一下:
1> self(). %% self/1 返回當前進程的 pid
<0.60.0>
2> 1 = 2.
** exception error: no match of right hand side value 2
3> self().
<0.63.0>
消息發(fā)送和接收
使用消息發(fā)送運算符 !
發(fā)送消息数尿。
4> self() ! "hello". %% 向自己所在的進程發(fā)送一個 List 類型的 "hello". `!` 操作的返回值是消息內容, "hello".
"hello"
5> flush(). %% flush() 將當前 process 的信箱里的所有消息清空并打印。
Shell got "hello"
ok
receive ... end 語句使用 pattern matching 來從自己進程的信箱里讀取消息惶楼,可以使用 after 語句來設置等待超時時間:
1> self() ! "msg1".
"msg1"
2> self() ! "msg2".
"msg2"
3> self() ! "msg3".
"msg3"
4> receive Msg -> Msg after 3000 -> no_more end. %% 讀取任意消息并返回這條消息右蹦,如果信箱里沒有消息了诊杆,等待 3 秒后結束并返回 no_more.
"msg1"
5> receive Msg -> Msg after 3000 -> no_more end. %% 后面這兩條為什么返回 no_more ? 不應該是 "msg2", "msg3" 嗎?
no_more
6> receive Msg -> Msg after 3000 -> no_more end.
no_more
上面的第 4 行 receive
語句里何陆,erlang shell 進程查看郵箱晨汹,查到第一個消息是 "msg1", Msg 被綁定為 "msg1"。再次運行 receive 語句的時候贷盲,由于 Msg 的值已經為 "msg1"淘这,與信箱里的 "msg2", "msg3" 都不匹配,所以后面兩條 receive
語句都沒有從信箱里讀取新消息巩剖,"msg2" 和 "msg3" 仍然存儲在信箱里:
16> flush().
Shell got "msg2"
Shell got "msg3"
ok
注意雖然后面兩個 receive
語句都沒有從信箱里讀取消息铝穷,但在 receive 語句的執(zhí)行過程中,它仍然是從頭到尾遍歷了整個郵箱佳魔,并嘗試拿郵箱里的各個消息跟代碼里的 Msg
進行匹配曙聂,這是消耗資源的,等后面消息堆積越多越麻煩吃引。這個叫 Selective Message Reception
. 消息的讀取順序是接收方決定的筹陵。
所以一般情況下我們在讀取信箱消息時,讀到我們不感興趣的消息也取出來镊尺,打個 error log 然后扔掉它,不要讓它一直在信箱里耗費資源并思。
在 Erlang shell 已經伸展不開拳腳了庐氮。讓我們來寫個復雜點的程序:
我們的程序實現(xiàn)一個 消息緩存,具體需求是:
- 我們需要一個消息棧宋彼,用于存儲用戶發(fā)來的消息弄砍。
- 考慮到用戶發(fā)來的消息可能有很多,我們需要好幾個這樣的消息棧來分擔負載输涕。
- 我們還想能夠給消息棧命名音婶,以便區(qū)分。
-module(msg_cache).
%% APIs
-export([start_one/1]).
%% for spawns
-export([loop/1]).
%% 定義進程的 state莱坎。
%% 我們一般說衣式,一個服務、或 “對象” 會維護自己內部的 '狀態(tài)'
%% 狀態(tài)可能是一個字符串緩存檐什,可能是某個資源的引用碴卧,這個跟業(yè)務相關。
%% 狀態(tài)存在于內存中乃正,跟外界隔離住册,通過 API 接口與外界交互。
%% 面向對象語言里用 類和對象來存儲狀態(tài)瓮具,Erlang 里我們用 process荧飞。
%% 所以我們又說 Erlang 是 “面向Process 編程的”
-record(state, {
name, %% 消息棧的名字
length = 0, %% 消息棧長度
buff = [] %% 消息棧的存儲列表
}).
loop(State = #state{name = Name, length = Len, buff = Buff}) ->
receive
{get_name, From}->
From ! {ok, Name},
loop(State);
{get_length, From}->
From ! {ok, Len},
loop(State);
{set_name, NewName, From} ->
From ! ok,
loop(State#state{name = NewName});
{push, Msg, From} ->
From ! ok,
loop(State#state{buff = [Msg | Buff], length = Len + 1});
{pop, [], From} ->
From ! {error, empty},
loop(State);
{pop, [TopMsg | Msgs], From} ->
From ! {ok, TopMsg},
loop(State#state{buff = Msgs, length = Len - 1});
_Unsupported ->
erlang:error(io_libs:format("unsupported msg: ", [_Unsupported]) )
end.
start_one(BuffName) ->
%% 啟動一個消息棧凡人,并返回其 PID
Pid = spawn(msg_cache, loop, [#state{name=BuffName}]),
io:format("Buff ~s created! Pid = ~p~n", [BuffName, Pid]),
Pid
其實除了 loop/1 長一點,其他的都挺容易理解的叹阔。
注意 loop/1 里的每個分支的最后一個語句都是尾遞歸划栓,意味著只要不出錯,loop/1 就一直循環(huán)下去条获,所以進程就不會停止忠荞。
思考:如果把上面代碼里 receive 語句的最后一個 _Unsupported -> 分支刪掉的話,會發(fā)生什么帅掘?
receive 語句里委煤,接受消息時,都要求消息發(fā)送方將自己的 Pid 帶過來修档,放到 From
變量里碧绞,以便我們回復消息給對方。
我們來試試:
1> PID = msg_cache:start_one("cache2").
Buff cache2 created! Pid = <0.62.0>
<0.62.0>
2> PID ! {get_length, self()}.
{get_length,<0.60.0>}
3> flush().
Shell got {ok,0}
ok
4> PID ! {pop, self()}.
{pop,<0.60.0>}
5> flush().
Shell got {error,empty}
ok
6> PID ! {push, "msg1", self()}.
{push,"msg1",<0.60.0>}
7> PID ! {push, "msg2", self()}.
{push,"msg2",<0.60.0>}
8> PID ! {push, "msg3", self()}.
{push,"msg3",<0.60.0>}
9> PID ! {get_length, self()}.
{get_length,<0.60.0>}
10> flush().
Shell got ok
Shell got ok
Shell got ok
Shell got {ok,3}
ok
11> PID ! {pop, self()}.
{pop,<0.60.0>}
12> flush().
Shell got {ok,"msg3"}
ok
13> PID ! {get_length, self()}.
{get_length,<0.60.0>}
14> flush().
Shell got {ok,2}
ok
繼續(xù)往下閱讀之前吱窝,仔細看一下這個例子讥邻,確保你完全理解了這段代碼。
挺厲害的吧院峡?但我們還有兩個問題沒有解決:
- 沒有一個易用易維護的 API兴使。 PID ! {get_length, self()}. 這種調用方式實在有些反人類。
- 沒有管理進程照激。我們調用一次 msg_cache:start_one/1 就啟動了一個msg_cache, 但是現(xiàn)在我不知道當前已經啟動了幾個 msg_cache.
我們來解決這第一個問題发魄,重新整理一下代碼:
-module(msg_cache).
%% APIs
-export([start_one/1,
get_name/1,
get_length/1,
pop/1,
set_name/2,
push/2
]).
%% for spawns
-export([loop/1]).
-define(API_TIMEOUT, 3000).
-record(state, {
name,
length = 0,
buff = []
}).
start_one(BuffName) ->
Pid = spawn(msg_cache, loop, [#state{name=BuffName}]),
io:format("Buff ~s created! Pid = ~p~n", [BuffName, Pid]),
Pid.
%% 加了這幾個 API
get_name(CacheID) ->
call(CacheID, {get_name, self()}).
get_length(CacheID) ->
call(CacheID, {get_length, self()}).
set_name(CacheID, NewName) ->
call(CacheID, {set_name, NewName, self()}).
pop(CacheID) ->
call(CacheID, {pop, self()}).
push(CacheID, Msg) ->
call(CacheID, {push, Msg, self()}).
%% 由于發(fā)送和接受消息的處理方面,各個 API 都差不多俩垃,就提取出來專門寫個 call 函數(shù)励幼,提高代碼復用。
call(Pid, Request) ->
Pid ! Request,
receive
Response -> Response
after ?API_TIMEOUT ->
{error, api_timeout}
end.
%% loop 這一部分我們沒改動任何代碼
loop(State = #state{name = Name, length = Len, buff = Buff}) ->
receive
{get_name, From}->
From ! {ok, Name},
loop(State);
{get_length, From}->
From ! {ok, Len},
loop(State);
{set_name, NewName, From} ->
From ! ok,
loop(State#state{name = NewName});
{push, Msg, From} ->
From ! ok,
loop(State#state{buff = [Msg | Buff], length = Len + 1});
{pop, From} ->
case Buff of
[] ->
From ! {error, empty},
loop(State);
[TopMsg | Msgs] ->
From ! {ok, TopMsg},
loop(State#state{buff = Msgs, length = Len - 1})
end;
_Unsupported ->
erlang:error(io_libs:format("unsupported msg: ", [_Unsupported]) )
end.
再試一下:
1> PID = msg_cache:start_one("cache_worker_1").
Buff cache_worker_1 created! Pid = <0.62.0>
<0.62.0>
2> msg_cache:get_name(PID).
{ok,"cache_worker_1"}
3> msg_cache:get_length(PID).
{ok,0}
4> msg_cache:pop(PID).
{error,empty}
5> msg_cache:push(PID, "msg1").
ok
6> msg_cache:push(PID, "msg2").
ok
7> msg_cache:get_length(PID).
{ok,2}
8> msg_cache:pop(PID).
{ok,"msg2"}
9> msg_cache:pop(PID).
{ok,"msg1"}
10> msg_cache:pop(PID).
{error,empty}
11> msg_cache:get_length(PID).
{ok,0}
還闊以吧口柳?
留個作業(yè)
上面那個 "管理進程" 我們沒有實現(xiàn)苹粟。你來實現(xiàn)它。
我想這么調用:
%% 啟動兩個 worker:
1> msg_cache:start_cache_workers(["c_worker_1", "c_worker_2"]).
[<0.62.0>, <0.65.0>]
%% 列出所有 workers, 返回值是個 worker 列表, 元素展示了每個 worker 的 name, pid, 和 length 跃闹。
2> CachePidList = msg_cache:list_cache_workers().
[{"c_worker_1", <0.62.0>, 0}, {"c_worker_2", <0.65.0>, 0}]
%% 負載均衡, 會往隨機的一個 cache worker 里 push.
%% 注意我這里調用 msg_cache:push 的時候嵌削,沒有提供某個 cache worker 的 PID
3> ok = msg_cache:push("msg1").
ok
4> ok = msg_cache:push("msg2").
ok
5> CachePidList = msg_cache:list_cache_workers().
[{"c_worker_1", <0.62.0>, 1}, {"c_worker_2", <0.65.0>, 1}]
%% 至于 pop 不用管順序了,有消息就隨便 pop 出一個來辣卒。
4> msg_cache:pop().
{ok, "msg1"}
提示:
- erlang:register/2 可以給一個 PID 注冊一個名字掷贾,以后使用這個 PID 就可以使用這個名字代替。比如
register(msg_cache_manger, Pid).
msg_cache:list_cache_workers() ->
msg_cache_manger ! get_all_workers.
課后必讀文章
Erlang 中的錯誤處理機制, Link荣茫、Monitor:
Errors and Processes
ETS
ETS (Erlang Term Storage) 是設計來存放大量的 Erlang 數(shù)據的想帅。跟 ETS 打交道不用消息格式轉換,可直接存放 Erlang 數(shù)據格式 (erlang 各種數(shù)據格式的統(tǒng)稱叫做 erlang terms)啡莉。
ETS 非掣圩迹快旨剥,訪問時間是常數(shù)級的檬果,自動幫你解決了多進程訪問的各種競態(tài)條件問題黍匾,讓我們在 Erlang 中做并發(fā)編程一身輕松。ETS 是非常優(yōu)秀的緩存系統(tǒng)鞋拟,是我們開發(fā)中不可或缺的利器之一衩椒。這比起用某種流行語言來說蚌父,舒服太多[1]。
ETS 只將數(shù)據存儲在內存里毛萌,如果想保存到磁盤苟弛,或者要在多個 Erlang Node 之間共享數(shù)據,OTP 基于 ETS 和 DETS 實現(xiàn)了 mnesia.
NODE: mnesia 只適合用來做緩存阁将,在多個 Node 之間共享少量數(shù)據膏秫,非常快速做盅。但是并不適合當做數(shù)據庫存儲大量的數(shù)據缤削,因為 mnesia 在啟動時會加載所有數(shù)據到內存里,導致啟動緩慢吹榴、新節(jié)點加入緩慢亭敢。并且 mnesia 是強一致性的數(shù)據庫,其本身并不處理由于集群腦裂導致的不一致性腊尚,這可能不太符合你的預期吨拗。
ETS 支持幾種數(shù)據類型:
- set: set 是普通的 key - value 存儲類型,一個 ETS table 里婿斥,兩個數(shù)據的 key 不能相同。重復插入 key 相同的兩條數(shù)據哨鸭,后面的那條會覆蓋前面的那條民宿。
- ordered_set: 有序的 set 表。
- bag: bag 允許多個 key 相同的數(shù)據的存在像鸡,但 key, value 都完全相同的數(shù)據只能留一個活鹰。
- duplicate_bag: 允許多個 key, value 完全相同的數(shù)據的存在。
我們來試試 set 類型的 table只估,這也是最常用的類型志群。我們創(chuàng)建一個命名表,叫 users
, 然后插入兩條數(shù)據:
1> ets:new(users, [set, named_table]).
users
2> ets:info(users). %% 注意默認的權限是 protected
[{read_concurrency,false},
{write_concurrency,false},
{compressed,false},
{memory,304},
{owner,<0.57.0>},
{heir,none},
{name,users},
{size,0},
{node,nonode@nohost},
{named_table,true},
{type,set},
{keypos,1},
{protection,protected}]
3> ets:insert(users, {1, <<"Shawn">>, 27}).
true
4> ets:insert(users, {2, <<"Scarlett">>, 25}).
true
5> ets:lookup(users, 1).
[{1,<<"Shawn">>,27}]
6> ets:lookup(users, 2).
[{2,<<"Scarlett">>,25}]
7> ets:info(users).
[{read_concurrency,false},
{write_concurrency,false},
{compressed,false},
{memory,332},
{owner,<0.57.0>},
{heir,none},
{name,users},
{size,2},
{node,nonode@nohost},
{named_table,true},
{type,set},
{keypos,1},
{protection,protected}]
8>
注意上邊的示例里:
- 創(chuàng)建 ETS table 時給了兩個 Options 參數(shù):[set, named_table]蛔钙。set 是指定創(chuàng)建 set 類型的表锌云,named_table 是創(chuàng)建命名表,命名為
users
吁脱,后面可以用這個表名來引用桑涎。 - 插入數(shù)據
{1, <<"Shawn">>, 27}
和{2, <<"Scarlett">>, 25}
時彬向,兩個 tuple 的第一項就是默認的 key,tuple 里其他項都是 values攻冷。如果你想用其他的項作為 key娃胆,可以在 ets:new 的時候,指定{keypos, Pos}
參數(shù)等曼,設置 key 在 tuple 中的位置里烦。
ETS 表的其他類型你可以自己試驗一下。
需要注意的是:
- ETS 表里的任何數(shù)據都不參加 GC
- ETS 表有自己的
owner
進程禁谦,默認情況下胁黑,創(chuàng)建表的那個進程就是 ETS table 的 owner。如果 owner 進程掛了枷畏,ETS 表也就被釋放了别厘。我們上邊的例子里,erlang shell 進程就是user
table 的 owner拥诡。 - ETS 表也是有訪問權限的触趴,默認是
protected
:- public:任何人可以讀寫這張表。
- protected: owner 可以讀寫渴肉,但其他進程只能讀冗懦。
- private:只有 owner 可以讀寫。別的進程無法訪問仇祭。
由于 ETS 表非常高效披蕉,一般情況下我們都直接使用 public
,然后設置 {read_concurrency, true}
或 {write_concurrency,true}
選項來提高并發(fā)讀或寫的效率乌奇,在寫一個管理模塊來直接訪問 ets 表没讲,讓什么封裝什么設計模式都去 shi。
OTP
OTP 已經失去了字面意思礁苗,基本上指的就是 Erlang 生態(tài)環(huán)境的官方部分爬凑。Erlang 世界的組成是這樣的:
- Erlang 以及 Elixir 等語言。
- 工具和函數(shù)庫试伙,包括 erlang runtime嘁信,kernel,stdlib(像 lists 這種的官方庫), sasl, 還有像 ETS疏叨,dbg 之類的很多潘靖。
- 系統(tǒng)設計原則, 包括本章要講的一眾 Behaviors。是一堆應用于并發(fā)世界的設計模式蚤蔓,他們包含了解決通用問題的通用代碼卦溢。
- 開源社區(qū)生態(tài)環(huán)境,包括各種開源軟件和社區(qū)。
OTP 指的是前三個既绕,Elixir 的話還不大算啄刹。
Erlang 的邏輯是,架構的設計應該由有經驗的人負責凄贩,由專家做好基礎代碼框架誓军,解決好最困難的問題。而使用者只需要寫自己的邏輯代碼疲扎。這就是 OTP behaviors昵时,他們已經在通信、互聯(lián)網領域椒丧,經歷了幾十年的戰(zhàn)火考驗壹甥。
本文要講的有三個:
- gen_server
- application
- supervisor
本章只講解 gen_server。 application 和 supervisor 放到后面 Hello World 工程里講解壶熏。
gen_server 要解決的問題句柠,就是我們上面那個 msg_cache 面臨的問題:怎樣做一個服務來響應用戶的請求。
我們之前寫的代碼很短棒假,可以工作溯职,但是很多東西都沒有考慮。比如請求者如果同時收到來自服務端的兩個 Response 的話帽哑,不知道是對應哪個請求的:
%% 服務端:
{get_name, From}->
From ! {ok, Name},
loop(State);
{get_length, From}->
From ! {ok, Len},
loop(State);
%% 客戶端:
ServerPID ! {get_length, self()}, %% 客戶端連續(xù)調用了兩次
ServerPID ! {get_length, self()},
receive
{ok, Len} -> %% 你知道這次匹配到的消息谜酒,是上面哪次調用的回復嗎?
success;
_ ->
failed
end.
上面代碼里連續(xù)調用了兩次 {get_length}, 但是由于發(fā)送消息是異步的妻枕,消息通過網絡回來僻族,你并不能確定第一次收到的回復就是第一次調用產生的。
這個問題可以加一個隨機生成的 RequestID 的字段來解決屡谐,客戶端發(fā)送請求消息的時候帶 RequestID 過去述么,服務端返回的時候再傳回來°堤停客戶端通過匹配 RequestID碉输,就能知道當前的回復是對應的哪個請求。
但這種需求其實是通用的亭珍,你現(xiàn)在寫 msg_cache 用得到,改天寫其他代碼也一樣用得到枝哄。另外我們也沒有過多考慮異常的情況:如果程序崩潰了怎么辦肄梨?發(fā)送消息怎么知道對方是不是還活著?
諸如此類的問題應該由專家來解決挠锥,所以我們有了 gen_server
.
gen_server 的模板是這樣的:
-module(gen_server_demo).
-behaviour(gen_server).
%% API functions
-export([start_link/0]).
%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-record(state, {}).
%%%% %%%% %%%% %%%% %%%%
%%%% 這是給客戶端調用的接口部分
%%%% %%%% %%%% %%%% %%%%
%% 啟動一個服務众羡,后臺會啟動一個 erlang process, 并進入 loop 函數(shù), 回想一下我們實現(xiàn) msg_cache 時寫的那個 loop/1.
%% 但是這個 loop 函數(shù)屬于通用部分的代碼,是由 OTP 官方實現(xiàn)的蓖租,所以代碼不在這里粱侣,在 OTP 代碼的 lib/stdlib/src/gen_server.erl 里羊壹。
start_link() ->
%% gen_server:start_link 啟動 process, 然后將 process 注冊在當前
%% node 上,注冊名字就是當前 Module 名:gen_server_demo
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
%%%% %%%% %%%% %%%% %%%%
%%%% 這是 gen_server 發(fā)生某事件時的回調函數(shù)部分
%%%% %%%% %%%% %%%% %%%%
%% gen_server:start_link 被調用齐婴,服務啟動時油猫,回調 init/1
init([]) ->
{ok, #state{}}.
%% gen_server:call 被調用。gen_server:call 是“同步”調用柠偶,調用方可以設置一個超時時間情妖。
%% 返回值里的 Reply 是返回給調用者的內容。
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
%% gen_server:cast 被調用诱担。gen_server:cast 是“異步”調用毡证。
%% 調用者一般是想發(fā)一個消息給我們的 gen_server,然后繼續(xù)做自己的事情蔫仙,他不想收到來自 gen_server 的回復料睛。
handle_cast(_Msg, State) ->
{noreply, State}.
%% gen_server 進程收到一個普通 erlang 消息:一個不是通過 gen_server:call 和 gen_server:cast 發(fā)來的消息。
handle_info(_Info, State) ->
{noreply, State}.
%% 上面的三個函數(shù) handle_call, handle_cast, handle_info
%% 都可以返回一個 {stop, Reason, State}摇邦,這樣的話 gen_server 會退出恤煞。
%% 但退出之前,可能會回調 terminate(_Reason, _State)涎嚼。
%% gen_server 將要退出時阱州,回調 terminate/2.
%% 注意
%% 1) 要想 terminate 在 gen_server 退出前被回調,gen_server 必須捕獲退出信號:
%% 需要在 init 回調里法梯,加這么一行:process_flag(trap_exit, true).
%% 2) 有幾個特定的 Reason 被認為是正常退出:normal, shutdown, or {shutdown,Term}苔货,
%% 其他的 Reason,sasl 是會報錯打日志的立哑。
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
gen_server 真正的進程代碼在 OTP 庫里夜惭,運行 start_link(),gen_server 就在后臺跑起來了铛绰。你需要實現(xiàn)的只是這個模板里的各個回調函數(shù)诈茧,將你的業(yè)務邏輯放到這些回調里。
仔細看一下上面的 gen_server 模板和注釋捂掰,確保你能完全理解敢会。
我不想重新實現(xiàn)之前的 msg_cache,一點都不酷这嚣。我們重新寫個其他的鸥昏,讓你對 Erlang 程序的基本設計理念有更深的印象。
我們要實現(xiàn)一個多用戶聊天的程序:
- 用戶能夠查詢在線的其他用戶姐帚。
- 用戶之間能夠聊天吏垮。
- 要容易擴展,因為后面我們的 Client 會通過TCP、WebSocket 等連接上來膳汪,不會是 Erlang 寫的 Client唯蝶。
- 要容易伸縮,因為我們業(yè)務發(fā)展很快遗嗽,用戶量會越來越大粘我,我們希望程序能很容易的部署在多臺服務器上。
先來設計我們程序的架構:
- 每個 client 連接上來媳谁,都會啟動一個新的 Process涂滴,叫做 ChatServer.
- ChatServer 負責維護這個 Client 的 TCP 連接。
- Route 是一個Module晴音,它提供了數(shù)據庫的管理柔纵,數(shù)據庫里維護了從 User 到其 ChatServer 的 PID 的映射關系。
注意我們的設計思想:
- 為每一個連接上來的請求啟動一個 Erlang 進程 "ChatServer"锤躁,不要擔心進程個數(shù)搁料,百萬也沒問題。
- 兩個用戶之間的消息傳遞系羞,體現(xiàn)在服務端就是兩個 "ChatServer" 之間的 Erlang 消息傳遞郭计。
- Route 部分只是一個 Module,不是進程椒振。每一個 ChatServer 調用 Route 里的代碼的時候昭伸,執(zhí)行過程其實是在每個 ChatServer 進程內部的。這樣我們就避免了集中向一個進程發(fā)送消息帶來的瓶頸澎迎。我們把這種瓶頸的處理留給了 ETS 來解決庐杨。
- 如何伸縮?ChatServer 在不在同一個服務器上沒什么關系夹供。
ChatServerPID 灵份!{send, Msg}
會將消息發(fā)送到ChatServerPID,即使 ChatServerPID 在遠端的服務器上哮洽。分布式部署的時候填渠,這行代碼根本不用改,你要做的僅僅是添加一個新的 Erlang Node鸟辅。分布式 Erlang 后面還要講氛什。 - 如何擴展?ETS 使用 Route Module 管理匪凉,為的就是當以后換用其他的緩存數(shù)據庫的時候簡單一些屉更。我們設想后面為了做分布式集群,要用 mnesia 替代 ETS洒缀,只需要寫一個新的 Route Module,內部改用 mnesia 存儲,然后替換線上已經加載的老的 Route Module树绩。線上系統(tǒng)都不用停止萨脑,客戶端的連接一個都不會斷!
你現(xiàn)在能否體會到 Erlang 的實用主義呢饺饭?完全沒廢話渤早,就是解決問題!
Client 部分我們現(xiàn)在不做瘫俊,讓前端的同學幫我們實現(xiàn)鹊杖。但假設我們的前端程序員還沒到崗,所以我們可以先放著 WebSocket 部分后面再做扛芽。但有兩個過程必須現(xiàn)在實現(xiàn):
- 當 Client 登錄時骂蓖,我們需要使用 Route 注冊 user 所在的 ChatServer 的 PID。
- 當 Client 發(fā)消息時川尖,我們需要使用 Route 查找對方的 ChatServer 的 PID登下。
首先我們來定義我們的消息協(xié)議。我們的消息體內包含幾部分叮喳,發(fā)送者ID被芳,接收者ID,以及消息內容:
-record(msg, {
from_userid,
to_userid,
payload
}).
接下來讓我們來實現(xiàn) Route 模塊馍悟,實現(xiàn)數(shù)據庫創(chuàng)建畔濒,注冊,查找與注銷功能:
-module(route).
-export([ensure_db/0,
lookup_server/1,
register_server/2,
unregister_server/1]).
ensure_db() ->
case ets:info(servers) of
undefined ->
%% 為了演示方便锣咒,我們啟動一個臨時進程來創(chuàng)建 ETS 表侵状,
%% 如果直接在 erlang shell 里創(chuàng)建ETS的話,出錯時 shell 的崩潰連帶著我們的ETS也丟了宠哄。
%% 當然線上系統(tǒng)不會這么做壹将。
spawn(fun() -> ets:new(servers, [named_table, public]), receive after infinity->ok end end);
_ -> ok
end.
lookup_server(UserID) ->
case ets:lookup(servers, UserID) of
[{UserID, ServerID}] -> {ok, ServerID};
_ -> {error, no_server}
end.
register_server(UserID, ServerID) ->
ets:insert(servers, {UserID, ServerID}).
unregister_server(UserID) ->
ets:delete(servers, UserID).
接下來實現(xiàn)我們的 ChatServer:
-module(chat_server).
-behaviour(gen_server).
%% state 保存用戶的 userid,以及 client 端連上來的 socket.
-record(state, {
userid,
socket
}).
%% 后面當一個新連接連接到服務器的時候毛嫉,我們會調用 start_link 啟動一個新的 ChatServer 為之服務诽俯。
%% 注意這里使用的是 gen_server:start_link/3,沒有注冊進程名承粤,我們直接使用 PID. 因為我們要啟動很多個 ChatServer暴区。
start_link(UserID, Socket) ->
{ok, ServerID} = gen_server:start_link(?MODULE, [UserID, Socket], []),
ServerID.
%% 在 init 回調里注冊用戶的 ChatServer。
%% 注意我們捕獲了 exit signal, 以便程序退出的時候回調 terminate/2.
%% 我們在 terminate/2 里取消注冊辛臊。
init([UserID, Socket]) ->
process_flag(trap_exit, true),
route:register_server(UserID, self()),
{ok, #state{userid=UserID, socket=Socket}}.
%% 如果我們的 ChatServer 收到一條來自 Socket 的消息仙粱,它會收到一條類似 {tcp, Sock, Data} 的普通消息。
%% 我們需要在 handle_info 里處理彻舰,轉發(fā)給對方的 ChatServer伐割。
handle_info({tcp, #msg{to_userid = ToUserID, payload = Payload} = Msg}, State) ->
io:format("Chat Server(User: ~p) - received msg from tcp client, Msg: ~p~n",[State#state.userid, Msg]),
case route:lookup_server(ToUserID) of
{error, Reason} ->
io:format("Chat Server(User: ~p) - cannot forward to Chat Server(User: ~p): ~p~n",
[State#state.userid, ToUserID, Reason]);
{ok, TargetServerID} ->
io:format("Chat Server(User: ~p) - forward msg to Chat Server(User: ~p), Payload: ~p~n",
[State#state.userid, ToUserID, Payload]),
ok = gen_server:call(TargetServerID, {send, Msg})
end,
{noreply, State};
%% 我們的 ChatServer 收到一條來自對端 ChatServer 的轉發(fā)請求
handle_call({send, #msg{payload = Payload}}, _From, State) ->
io:format("Chat Server(User: ~p) - deliver msg to tcp client, Payload: ~p~n",
[State#state.userid, Payload]),
send_to_client_via_tcp(State#state.socket, Payload),
{reply, ok, State};
%% Socket 部分我們沒有實現(xiàn)候味,暫時就簡單打印一下
send_to_client_via_tcp(_Socket, Payload) ->
%gen_tcp:send(_Socket, Payload),
io:format("Sent To Client: ~p~n",[Payload]).
完工了!我們測試一下:
1> c(chat_server).
{ok,chat_server}
2> c(route).
{ok,route}
%% 現(xiàn)在模擬系統(tǒng)啟動時隔心,初始化 DB 的過程白群。
%% 后續(xù)這個會在啟動代碼里寫。
3> route:ensure_db().
<0.73.0>
%% 現(xiàn)在我們模擬一個新的用戶登錄上來硬霍,啟動新的 ChatServer 的過程帜慢。
%% 后續(xù)這個過程當然是由 WebSocket 模塊調用。
4> ServerIDUser1 = chat_server:start_link(<<"user1">>, fake_socket).
<0.75.0>
5> ServerIDUser2 = chat_server:start_link(<<"user2">>, fake_socket).
<0.77.0>
%% 我們來做一個 #msg{} 消息體唯卖。
%% 后續(xù)我們應該在收到 socket 上來的消息解析成功之后粱玲,打包一個 #msg{} 消息體。
6> rr("chat_protocol.hrl").
[msg]
7> Msg = #msg{from_userid= <<"user1">>, to_userid = <<"user2">>, payload = <<"hello?">>}.
#msg{from_userid = <<"user1">>,to_userid = <<"user2">>,
payload = <<"hello?">>}
%% 模擬從 socket 收到消息的過程拜轨。
8> ServerIDUser1 ! {tcp, Msg}.
Chat Server(User: <<"user1">>) - received msg from tcp client, Msg: {msg,
<<"user1">>,
<<"user2">>,
<<"hello?">>}
{tcp,#msg{from_userid = <<"user1">>,to_userid = <<"user2">>,
payload = <<"hello?">>}}
Chat Server(User: <<"user1">>) - forward msg to Chat Server(User: <<"user2">>), Payload: <<"hello?">>
Chat Server(User: <<"user2">>) - deliver msg to tcp client, Payload: <<"hello?">>
Sent To Client: <<"hello?">>
9>
我們看到服務端的路由已經走通了抽减,接下來只要寫一個 web socket 模塊,listen 在某個端口撩轰,當有連接請求時胯甩,像上面第 4、第 5 行一樣調用 chat_server:start_link/2 就行了堪嫂。當然 send_to_client_via_tcp 也要改為真正往 socket 發(fā)送消息偎箫。
完整代碼:
https://github.com/terry-xiaoyu/learn-erlang-in-30-mins/tree/master/chat
一個完整的線上演示:
(即將上線)
書接下文:30 分鐘學 Erlang (三)
-
Golang 里你需要自己找多線程安全的 maps 庫,寫并發(fā)沒有安全感皆串。Golang 官方也沒有下文要說到的 OTP 里提供的各種 Behavior淹办,代碼寫起來天馬行空最后一團糟。然后又不能支持函數(shù)式的 pattern matching 等寫法... 總之用 golang 寫代碼從來不會給人愉快的感覺恶复。流行是流行的怜森,但那叫“普通”吧?第一次這么吐槽 golang谤牡,但這篇是 erlang 的教程副硅,應該不算過分吧。等到寫 go 的時候我再來吐槽 erlang 翅萤。我是不會寫 go 的 ... ?