本次主要走一下客戶端創(chuàng)建連接和接收數(shù)據(jù)的流程鸣哀。
接上篇霍转,創(chuàng)建Socket成功后,回調函數(shù):emqx_connection, start_link, [Options -- SockOpts]琉历。
emqx_connection.erl:
本模塊為一個gen_server模塊恩沽,所以它會給每一個客戶端啟動一個進程亚兄,并在初始化時渐白,從acceptor接管Socket套接字。
init callback:
init(Parent, Transport, RawSocket, Options) ->
case Transport:wait(RawSocket) of
{ok, Socket} ->
run_loop(Parent, init_state(Transport, Socket, Options));
{error, Reason} ->
ok = Transport:fast_close(RawSocket),
exit_on_sock_error(Reason)
end.
這里重點函數(shù)有兩個:init_state/3和run_loop/2
init_state片吊,顧名思義佩谣,是將進程state中的數(shù)據(jù)或對象初始化把还,其中主要有套接字信息、Frame、Parse吊履、Channel等相關幫助模塊的初始化安皱、GC初始化等等。
run_loop艇炎,處理Socket數(shù)據(jù)的輪詢函數(shù):
- 通過activate_socket酌伊,設置允許接收數(shù)據(jù)包的個數(shù),有效的控制接收流量
- 調用hibernate/2
hibernate(Parent, State) ->
proc_lib:hibernate(?MODULE, wakeup_from_hib, [Parent, State]).
這里重點查了一下proc_lib:hibeernate的用法缀踪,類似erlang:hibernate居砖,意思是,使調用進程處于等待狀態(tài)驴娃,當有數(shù)據(jù)接收時奏候,喚醒并調用MFA。
- wakeup_from_hib/2
其中是一個receive的流程唇敞,處理收到的數(shù)據(jù):
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State);
{'EXIT', Parent, Reason} ->
terminate(Reason, State);
Msg ->
process_msg([Msg], Parent, ensure_stats_timer(IdleTimeout, State))
after
IdleTimeout ->
hibernate(Parent, cancel_stats_timer(State))
end.
- process_msg:
主要處理分包蔗草,解MQTT協(xié)議包。并將完整的解析后數(shù)據(jù)疆柔,交給channel處理咒精。
%%--------------------------------------------------------------------
%% Handle incoming packet
handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) ->
ok = inc_incoming_stats(Packet),
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
with_channel(handle_in, [Packet], State);
handle_incoming(FrameError, State) ->
with_channel(handle_in, [FrameError], State).
%%--------------------------------------------------------------------
%% With Channel
with_channel(Fun, Args, State = #state{channel = Channel}) ->
case erlang:apply(emqx_channel, Fun, Args ++ [Channel]) of
ok -> {ok, State};
{ok, NChannel} ->
{ok, State#state{channel = NChannel}};
{ok, Replies, NChannel} ->
{ok, next_msgs(Replies), State#state{channel = NChannel}};
{shutdown, Reason, NChannel} ->
shutdown(Reason, State#state{channel = NChannel});
{shutdown, Reason, Packet, NChannel} ->
NState = State#state{channel = NChannel},
ok = handle_outgoing(Packet, NState),
shutdown(Reason, NState)
end.
emqx_channel.erl & emqx_session.erl:
為什么將這兩個放一起說呢,是因為他們倆是配合做事的旷档。
主要是處理MQTT的各種協(xié)議包了:CONNECT模叙,SUBSCRIBE,PUBLISH鞋屈,UNSUB向楼,DISCONN等等。有興趣的同學可以深入進去谐区,看看每一個協(xié)議包的處理流程,本次就不再贅述了逻卖。
截取connect的流程性代碼片段:
handle_in:
handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
case pipeline([fun enrich_conninfo/2,
fun check_connect/2,
fun enrich_client/2,
fun set_logger_meta/2,
fun check_banned/2,
fun auth_connect/2
], ConnPkt, Channel#channel{conn_state = connecting}) of
{ok, NConnPkt, NChannel} ->
process_connect(NConnPkt, NChannel);
{error, ReasonCode, NChannel} ->
handle_out(connack, {ReasonCode, ConnPkt}, NChannel)
end;
process_connect:
%%--------------------------------------------------------------------
%% Process Connect
%%--------------------------------------------------------------------
process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart},
Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) ->
case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of
{ok, #{session := Session, present := false}} ->
NChannel = Channel#channel{session = Session},
handle_out(connack, {?RC_SUCCESS, sp(false), ConnPkt}, NChannel);
{ok, #{session := Session, present := true, pendings := Pendings}} ->
%%TODO: improve later.
Pendings1 = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())),
NChannel = Channel#channel{session = Session,
resuming = true,
pendings = Pendings1
},
handle_out(connack, {?RC_SUCCESS, sp(true), ConnPkt}, NChannel);
{error, client_id_unavailable} ->
handle_out(connack, {?RC_CLIENT_IDENTIFIER_NOT_VALID, ConnPkt}, Channel);
{error, Reason} ->
?LOG(error, "Failed to open session due to ~p", [Reason]),
handle_out(connack, {?RC_UNSPECIFIED_ERROR, ConnPkt}, Channel)
end.
可以簡單看出來:
- 收到connect請求后宋列,會嘗試建立session數(shù)據(jù)
- connect的返回結果,是調用handle_out
-spec(handle_out(atom(), term(), channel())
-> {ok, channel()}
| {ok, replies(), channel()}
| {shutdown, Reason :: term(), channel()}
| {shutdown, Reason :: term(), replies(), channel()}).
handle_out(connack, {?RC_SUCCESS, SP, ConnPkt}, Channel) ->
AckProps = run_fold([fun enrich_connack_caps/2,
fun enrich_server_keepalive/2,
fun enrich_assigned_clientid/2
], #{}, Channel),
AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps),
return_connack(AckPacket,
ensure_keepalive(AckProps,
ensure_connected(ConnPkt, Channel)));
handle_out(connack, {ReasonCode, _ConnPkt},
Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
ok = emqx_hooks:run('client.connected', [ClientInfo, ReasonCode, ConnInfo]),
AckPacket = ?CONNACK_PACKET(
case maps:get(proto_ver, ConnInfo) of
?MQTT_PROTO_V5 -> ReasonCode;
_Other -> emqx_reason_codes:compat(connack, ReasonCode)
end),
shutdown(emqx_reason_codes:name(ReasonCode), AckPacket, Channel);