EMQX源碼閱讀(三)

本次主要走一下客戶端創(chuàng)建連接和接收數(shù)據(jù)的流程鸣哀。

接上篇霍转,創(chuàng)建Socket成功后,回調函數(shù):emqx_connection, start_link, [Options -- SockOpts]琉历。

emqx_connection.erl:
本模塊為一個gen_server模塊恩沽,所以它會給每一個客戶端啟動一個進程亚兄,并在初始化時渐白,從acceptor接管Socket套接字。
init callback:

init(Parent, Transport, RawSocket, Options) ->
    case Transport:wait(RawSocket) of
        {ok, Socket} ->
            run_loop(Parent, init_state(Transport, Socket, Options));
        {error, Reason} ->
            ok = Transport:fast_close(RawSocket),
            exit_on_sock_error(Reason)
    end.

這里重點函數(shù)有兩個:init_state/3和run_loop/2
init_state片吊,顧名思義佩谣,是將進程state中的數(shù)據(jù)或對象初始化把还,其中主要有套接字信息、Frame、Parse吊履、Channel等相關幫助模塊的初始化安皱、GC初始化等等。
run_loop艇炎,處理Socket數(shù)據(jù)的輪詢函數(shù):

  1. 通過activate_socket酌伊,設置允許接收數(shù)據(jù)包的個數(shù),有效的控制接收流量
  2. 調用hibernate/2
hibernate(Parent, State) ->
    proc_lib:hibernate(?MODULE, wakeup_from_hib, [Parent, State]).

這里重點查了一下proc_lib:hibeernate的用法缀踪,類似erlang:hibernate居砖,意思是,使調用進程處于等待狀態(tài)驴娃,當有數(shù)據(jù)接收時奏候,喚醒并調用MFA。

  1. wakeup_from_hib/2
    其中是一個receive的流程唇敞,處理收到的數(shù)據(jù):
receive
        {system, From, Request} ->
            sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State);
        {'EXIT', Parent, Reason} ->
            terminate(Reason, State);
        Msg ->
            process_msg([Msg], Parent, ensure_stats_timer(IdleTimeout, State))
    after
        IdleTimeout ->
            hibernate(Parent, cancel_stats_timer(State))
    end.
  1. process_msg:
    主要處理分包蔗草,解MQTT協(xié)議包。并將完整的解析后數(shù)據(jù)疆柔,交給channel處理咒精。
%%--------------------------------------------------------------------
%% Handle incoming packet

handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) ->
    ok = inc_incoming_stats(Packet),
    ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
    with_channel(handle_in, [Packet], State);

handle_incoming(FrameError, State) ->
    with_channel(handle_in, [FrameError], State).

%%--------------------------------------------------------------------
%% With Channel

with_channel(Fun, Args, State = #state{channel = Channel}) ->
    case erlang:apply(emqx_channel, Fun, Args ++ [Channel]) of
        ok -> {ok, State};
        {ok, NChannel} ->
            {ok, State#state{channel = NChannel}};
        {ok, Replies, NChannel} ->
            {ok, next_msgs(Replies), State#state{channel = NChannel}};
        {shutdown, Reason, NChannel} ->
            shutdown(Reason, State#state{channel = NChannel});
        {shutdown, Reason, Packet, NChannel} ->
            NState = State#state{channel = NChannel},
            ok = handle_outgoing(Packet, NState),
            shutdown(Reason, NState)
    end.

emqx_channel.erl & emqx_session.erl:
為什么將這兩個放一起說呢,是因為他們倆是配合做事的旷档。
主要是處理MQTT的各種協(xié)議包了:CONNECT模叙,SUBSCRIBE,PUBLISH鞋屈,UNSUB向楼,DISCONN等等。有興趣的同學可以深入進去谐区,看看每一個協(xié)議包的處理流程,本次就不再贅述了逻卖。

截取connect的流程性代碼片段:
handle_in:

handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
    case pipeline([fun enrich_conninfo/2,
                   fun check_connect/2,
                   fun enrich_client/2,
                   fun set_logger_meta/2,
                   fun check_banned/2,
                   fun auth_connect/2
                  ], ConnPkt, Channel#channel{conn_state = connecting}) of
        {ok, NConnPkt, NChannel} ->
            process_connect(NConnPkt, NChannel);
        {error, ReasonCode, NChannel} ->
            handle_out(connack, {ReasonCode, ConnPkt}, NChannel)
    end;

process_connect:

%%--------------------------------------------------------------------
%% Process Connect
%%--------------------------------------------------------------------

process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart},
                Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) ->
    case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of
        {ok, #{session := Session, present := false}} ->
            NChannel = Channel#channel{session = Session},
            handle_out(connack, {?RC_SUCCESS, sp(false), ConnPkt}, NChannel);
        {ok, #{session := Session, present := true, pendings := Pendings}} ->
            %%TODO: improve later.
            Pendings1 = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())),
            NChannel = Channel#channel{session  = Session,
                                       resuming = true,
                                       pendings = Pendings1
                                      },
            handle_out(connack, {?RC_SUCCESS, sp(true), ConnPkt}, NChannel);
        {error, client_id_unavailable} ->
            handle_out(connack, {?RC_CLIENT_IDENTIFIER_NOT_VALID, ConnPkt}, Channel);
        {error, Reason} ->
            ?LOG(error, "Failed to open session due to ~p", [Reason]),
            handle_out(connack, {?RC_UNSPECIFIED_ERROR, ConnPkt}, Channel)
    end.

可以簡單看出來:

  1. 收到connect請求后宋列,會嘗試建立session數(shù)據(jù)
  2. connect的返回結果,是調用handle_out
-spec(handle_out(atom(), term(), channel())
      -> {ok, channel()}
       | {ok, replies(), channel()}
       | {shutdown, Reason :: term(), channel()}
       | {shutdown, Reason :: term(), replies(), channel()}).
handle_out(connack, {?RC_SUCCESS, SP, ConnPkt}, Channel) ->
    AckProps = run_fold([fun enrich_connack_caps/2,
                         fun enrich_server_keepalive/2,
                         fun enrich_assigned_clientid/2
                        ], #{}, Channel),
    AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps),
    return_connack(AckPacket,
                   ensure_keepalive(AckProps,
                                    ensure_connected(ConnPkt, Channel)));

handle_out(connack, {ReasonCode, _ConnPkt},
           Channel = #channel{conninfo   = ConnInfo,
                              clientinfo = ClientInfo}) ->
    ok = emqx_hooks:run('client.connected', [ClientInfo, ReasonCode, ConnInfo]),
    AckPacket = ?CONNACK_PACKET(
                   case maps:get(proto_ver, ConnInfo) of
                       ?MQTT_PROTO_V5 -> ReasonCode;
                       _Other -> emqx_reason_codes:compat(connack, ReasonCode)
                   end),
    shutdown(emqx_reason_codes:name(ReasonCode), AckPacket, Channel);
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末评也,一起剝皮案震驚了整個濱河市炼杖,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌盗迟,老刑警劉巖坤邪,帶你破解...
    沈念sama閱讀 219,270評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異罚缕,居然都是意外死亡艇纺,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,489評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來黔衡,“玉大人蚓聘,你說我怎么就攤上這事∶私伲” “怎么了夜牡?”我有些...
    開封第一講書人閱讀 165,630評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長侣签。 經(jīng)常有香客問我塘装,道長,這世上最難降的妖魔是什么影所? 我笑而不...
    開封第一講書人閱讀 58,906評論 1 295
  • 正文 為了忘掉前任蹦肴,我火速辦了婚禮,結果婚禮上型檀,老公的妹妹穿的比我還像新娘冗尤。我一直安慰自己,他們只是感情好胀溺,可當我...
    茶點故事閱讀 67,928評論 6 392
  • 文/花漫 我一把揭開白布裂七。 她就那樣靜靜地躺著,像睡著了一般仓坞。 火紅的嫁衣襯著肌膚如雪背零。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,718評論 1 305
  • 那天无埃,我揣著相機與錄音徙瓶,去河邊找鬼。 笑死嫉称,一個胖子當著我的面吹牛侦镇,可吹牛的內容都是我干的。 我是一名探鬼主播织阅,決...
    沈念sama閱讀 40,442評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼壳繁,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了荔棉?” 一聲冷哼從身側響起闹炉,我...
    開封第一講書人閱讀 39,345評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎润樱,沒想到半個月后渣触,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,802評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡壹若,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,984評論 3 337
  • 正文 我和宋清朗相戀三年嗅钻,在試婚紗的時候發(fā)現(xiàn)自己被綠了皂冰。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,117評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡啊犬,死狀恐怖灼擂,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情觉至,我是刑警寧澤剔应,帶...
    沈念sama閱讀 35,810評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站语御,受9級特大地震影響峻贮,放射性物質發(fā)生泄漏。R本人自食惡果不足惜应闯,卻給世界環(huán)境...
    茶點故事閱讀 41,462評論 3 331
  • 文/蒙蒙 一纤控、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧碉纺,春花似錦船万、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,011評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至态贤,卻和暖如春舱呻,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背悠汽。 一陣腳步聲響...
    開封第一講書人閱讀 33,139評論 1 272
  • 我被黑心中介騙來泰國打工箱吕, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人柿冲。 一個月前我還...
    沈念sama閱讀 48,377評論 3 373
  • 正文 我出身青樓茬高,卻偏偏與公主長得像,于是被迫代替她去往敵國和親假抄。 傳聞我的和親對象是個殘疾皇子怎栽,可洞房花燭夜當晚...
    茶點故事閱讀 45,060評論 2 355

推薦閱讀更多精彩內容