(6)鏈接與消息封裝(Reactor部分)【Lars-基于C++負(fù)載均衡遠(yuǎn)程服務(wù)器調(diào)度系統(tǒng)教程】

【Lars教程目錄】

Lars源代碼
https://github.com/aceld/Lars


【Lars系統(tǒng)概述】
第1章-概述
第2章-項(xiàng)目目錄構(gòu)建


【Lars系統(tǒng)之Reactor模型服務(wù)器框架模塊】
第1章-項(xiàng)目結(jié)構(gòu)與V0.1雛形
第2章-內(nèi)存管理與Buffer封裝
第3章-事件觸發(fā)EventLoop
第4章-鏈接與消息封裝
第5章-Client客戶端模型
第6章-連接管理及限制
第7章-消息業(yè)務(wù)路由分發(fā)機(jī)制
第8章-鏈接創(chuàng)建/銷毀Hook機(jī)制
第9章-消息任務(wù)隊(duì)列與線程池
第10章-配置文件讀寫功能
第11章-udp服務(wù)與客戶端
第12章-數(shù)據(jù)傳輸協(xié)議protocol buffer
第13章-QPS性能測試
第14章-異步消息任務(wù)機(jī)制
第15章-鏈接屬性設(shè)置功能


【Lars系統(tǒng)之DNSService模塊】
第1章-Lars-dns簡介
第2章-數(shù)據(jù)庫創(chuàng)建
第3章-項(xiàng)目目錄結(jié)構(gòu)及環(huán)境構(gòu)建
第4章-Route結(jié)構(gòu)的定義
第5章-獲取Route信息
第6章-Route訂閱模式
第7章-Backend Thread實(shí)時(shí)監(jiān)控


【Lars系統(tǒng)之Report Service模塊】
第1章-項(xiàng)目概述-數(shù)據(jù)表及proto3協(xié)議定義
第2章-獲取report上報(bào)數(shù)據(jù)
第3章-存儲(chǔ)線程池及消息隊(duì)列


【Lars系統(tǒng)之LoadBalance Agent模塊】
第1章-項(xiàng)目概述及構(gòu)建
第2章-主模塊業(yè)務(wù)結(jié)構(gòu)搭建
第3章-Report與Dns Client設(shè)計(jì)與實(shí)現(xiàn)
第4章-負(fù)載均衡模塊基礎(chǔ)設(shè)計(jì)
第5章-負(fù)載均衡獲取Host主機(jī)信息API
第6章-負(fù)載均衡上報(bào)Host主機(jī)信息API
第7章-過期窗口清理與過載超時(shí)(V0.5)
第8章-定期拉取最新路由信息(V0.6)
第9章-負(fù)載均衡獲取Route信息API(0.7)
第10章-API初始化接口(V0.8)
第11章-Lars Agent性能測試工具
第12章- Lars啟動(dòng)工具腳本


5) tcp鏈接與Message消息封裝

? 好了渗鬼,現(xiàn)在我們來將服務(wù)器的連接做一個(gè)簡單的封裝讯赏,在這之前励翼,我們要將我我們所發(fā)的數(shù)據(jù)做一個(gè)規(guī)定买决,采用TLV的格式睦柴,來進(jìn)行封裝。目的是解決TCP傳輸?shù)恼嘲鼏栴}扎筒。

5.1 Message消息封裝

7-TCP粘包問題-拆包封包過程.jpeg

? 先創(chuàng)建一個(gè)message.h頭文件

lars_reactor/include/message.h

#pragma once

//解決tcp粘包問題的消息頭
struct msg_head
{
    int msgid;
    int msglen;
};

//消息頭的二進(jìn)制長度艰山,固定數(shù)
#define MESSAGE_HEAD_LEN 8

//消息頭+消息體的最大長度限制
#define MESSAGE_LENGTH_LIMIT (65535 - MESSAGE_HEAD_LEN)

? 接下來我們每次在server和 client之間傳遞數(shù)據(jù)的時(shí)候窗宦,都發(fā)送這種數(shù)據(jù)格式的頭再加上后面的數(shù)據(jù)內(nèi)容即可赦颇。

5.2 創(chuàng)建一個(gè)tcp_conn連接類

lars_reactor/include/tcp_conn.h

#pragma once

#include "reactor_buf.h"
#include "event_loop.h"

//一個(gè)tcp的連接信息
class tcp_conn
{
public:
    //初始化tcp_conn
    tcp_conn(int connfd, event_loop *loop);

    //處理讀業(yè)務(wù)
    void do_read();

    //處理寫業(yè)務(wù)
    void do_write();

    //銷毀tcp_conn
    void clean_conn();

    //發(fā)送消息的方法
    int send_message(const char *data, int msglen, int msgid);

private:
    //當(dāng)前鏈接的fd
    int _connfd;
    //該連接歸屬的event_poll
    event_loop *_loop;
    //輸出buf
    output_buf obuf;     
    //輸入buf
    input_buf ibuf;
};

簡單說明一下里面的成員和方法:

成員:

_connfd:server剛剛accept成功的套接字

_loop:當(dāng)前鏈接所綁定的事件觸發(fā)句柄.

obuf:鏈接輸出緩沖,向?qū)Χ藢憯?shù)據(jù)

ibuf:鏈接輸入緩沖赴涵,從對端讀數(shù)據(jù)

方法

tcp_client():構(gòu)造媒怯,主要在里面實(shí)現(xiàn)初始化及創(chuàng)建鏈接鏈接的connect過程。

do_read():讀數(shù)據(jù)處理業(yè)務(wù)髓窜,主要是EPOLLIN事件觸發(fā)扇苞。

do_write():寫數(shù)據(jù)處理業(yè)務(wù),主要是EPOLLOUT事件觸發(fā)寄纵。

clean_conn():清空鏈接資源鳖敷。

send_message():將消息打包成TLV格式發(fā)送給對端。

? 接下來程拭,實(shí)現(xiàn)以下tcp_conn類.

lars_reactor/src/tcp_conn.cpp

#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <string.h>

#include "tcp_conn.h"
#include "message.h"


//回顯業(yè)務(wù)
void callback_busi(const char *data, uint32_t len, int msgid, void *args, tcp_conn *conn)
{
    conn->send_message(data, len, msgid);
}


//連接的讀事件回調(diào)
static void conn_rd_callback(event_loop *loop, int fd, void *args)
{
    tcp_conn *conn = (tcp_conn*)args;
    conn->do_read();
}
//連接的寫事件回調(diào)
static void conn_wt_callback(event_loop *loop, int fd, void *args)
{
    tcp_conn *conn = (tcp_conn*)args;
    conn->do_write();
}

//初始化tcp_conn
tcp_conn::tcp_conn(int connfd, event_loop *loop)
{
    _connfd = connfd;
    _loop = loop;
    //1. 將connfd設(shè)置成非阻塞狀態(tài)
    int flag = fcntl(_connfd, F_GETFL, 0);
    fcntl(_connfd, F_SETFL, O_NONBLOCK|flag);

    //2. 設(shè)置TCP_NODELAY禁止做讀寫緩存定踱,降低小包延遲
    int op = 1;
    setsockopt(_connfd, IPPROTO_TCP, TCP_NODELAY, &op, sizeof(op));//need netinet/in.h netinet/tcp.h

    //3. 將該鏈接的讀事件讓event_loop監(jiān)控 
    _loop->add_io_event(_connfd, conn_rd_callback, EPOLLIN, this);

    //4 將該鏈接集成到對應(yīng)的tcp_server中
    //TODO
}

//處理讀業(yè)務(wù)
void tcp_conn::do_read()
{
    //1. 從套接字讀取數(shù)據(jù)
    int ret = ibuf.read_data(_connfd);
    if (ret == -1) {
        fprintf(stderr, "read data from socket\n");
        this->clean_conn();
        return ;
    }
    else if ( ret == 0) {
        //對端正常關(guān)閉
        printf("connection closed by peer\n");
        clean_conn();
        return ;
    }

    //2. 解析msg_head數(shù)據(jù)    
    msg_head head;    
    
    //[這里用while,可能一次性讀取多個(gè)完整包過來]
    while (ibuf.length() >= MESSAGE_HEAD_LEN)  {
        //2.1 讀取msg_head頭部恃鞋,固定長度MESSAGE_HEAD_LEN    
        memcpy(&head, ibuf.data(), MESSAGE_HEAD_LEN);
        if(head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0) {
            fprintf(stderr, "data format error, need close, msglen = %d\n", head.msglen);
            this->clean_conn();
            break;
        }
        if (ibuf.length() < MESSAGE_HEAD_LEN + head.msglen) {
            //緩存buf中剩余的數(shù)據(jù)崖媚,小于實(shí)際上應(yīng)該接受的數(shù)據(jù)
            //說明是一個(gè)不完整的包亦歉,應(yīng)該拋棄
            break;
        }

        //2.2 再根據(jù)頭長度讀取數(shù)據(jù)體,然后針對數(shù)據(jù)體處理 業(yè)務(wù)
        //TODO 添加包路由模式
        
        //頭部處理完了畅哑,往后偏移MESSAGE_HEAD_LEN長度
        ibuf.pop(MESSAGE_HEAD_LEN);
        
        //處理ibuf.data()業(yè)務(wù)數(shù)據(jù)
        printf("read data: %s\n", ibuf.data());

        //回顯業(yè)務(wù)
        callback_busi(ibuf.data(), head.msglen, head.msgid, NULL, this);

        //消息體處理完了,往后便宜msglen長度
        ibuf.pop(head.msglen);
    }

    ibuf.adjust();
    
    return ;
}

//處理寫業(yè)務(wù)
void tcp_conn::do_write()
{
    //do_write是觸發(fā)玩event事件要處理的事情肴楷,
    //應(yīng)該是直接將out_buf力度數(shù)據(jù)io寫會(huì)對方客戶端 
    //而不是在這里組裝一個(gè)message再發(fā)
    //組裝message的過程應(yīng)該是主動(dòng)調(diào)用
    
    //只要obuf中有數(shù)據(jù)就寫
    while (obuf.length()) {
        int ret = obuf.write2fd(_connfd);
        if (ret == -1) {
            fprintf(stderr, "write2fd error, close conn!\n");
            this->clean_conn();
            return ;
        }
        if (ret == 0) {
            //不是錯(cuò)誤,僅返回0表示不可繼續(xù)寫
            break;
        }
    }

    if (obuf.length() == 0) {
        //數(shù)據(jù)已經(jīng)全部寫完荠呐,將_connfd的寫事件取消掉
        _loop->del_io_event(_connfd, EPOLLOUT);
    }

    return ;
}

//發(fā)送消息的方法
int tcp_conn::send_message(const char *data, int msglen, int msgid)
{
    printf("server send_message: %s:%d, msgid = %d\n", data, msglen, msgid);
    bool active_epollout = false; 
    if(obuf.length() == 0) {
        //如果現(xiàn)在已經(jīng)數(shù)據(jù)都發(fā)送完了阶祭,那么是一定要激活寫事件的
        //如果有數(shù)據(jù),說明數(shù)據(jù)還沒有完全寫完到對端直秆,那么沒必要再激活等寫完再激活
        active_epollout = true;
    }

    //1 先封裝message消息頭
    msg_head head;
    head.msgid = msgid;
    head.msglen = msglen;
     
    //1.1 寫消息頭
    int ret = obuf.send_data((const char *)&head, MESSAGE_HEAD_LEN);
    if (ret != 0) {
        fprintf(stderr, "send head error\n");
        return -1;
    }

    //1.2 寫消息體
    ret = obuf.send_data(data, msglen);
    if (ret != 0) {
        //如果寫消息體失敗,那就回滾將消息頭的發(fā)送也取消
        obuf.pop(MESSAGE_HEAD_LEN);
        return -1;
    }

    if (active_epollout == true) {
        //2. 激活EPOLLOUT寫事件
        _loop->add_io_event(_connfd, conn_wt_callback, EPOLLOUT, this);
    }


    return 0;
}

//銷毀tcp_conn
void tcp_conn::clean_conn()
{
    //鏈接清理工作
    //1 將該鏈接從tcp_server摘除掉    
    //TODO 
    //2 將該鏈接從event_loop中摘除
    _loop->del_io_event(_connfd);
    //3 buf清空
    ibuf.clear(); 
    obuf.clear();
    //4 關(guān)閉原始套接字
    int fd = _connfd;
    _connfd = -1;
    close(fd);
}

? 具體每個(gè)方法的實(shí)現(xiàn)鞭盟,都很清晰圾结。其中conn_rd_callback()conn_wt_callback()是注冊讀寫事件的回調(diào)函數(shù),設(shè)置為static是因?yàn)楹瘮?shù)類型沒有this指針齿诉。在里面分別再調(diào)用do_read()do_write()方法筝野。

5.3 修正tcp_server對accept之后的處理方法

lars_reactor/src/tcp_server.cpp

//...

//開始提供創(chuàng)建鏈接服務(wù)
void tcp_server::do_accept()
{
    int connfd;    
    while(true) {
        //accept與客戶端創(chuàng)建鏈接
        printf("begin accept\n");
        connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);
        if (connfd == -1) {
            if (errno == EINTR) {
                fprintf(stderr, "accept errno=EINTR\n");
                continue;
            }
            else if (errno == EMFILE) {
                //建立鏈接過多,資源不夠
                fprintf(stderr, "accept errno=EMFILE\n");
            }
            else if (errno == EAGAIN) {
                fprintf(stderr, "accept errno=EAGAIN\n");
                break;
            }
            else {
                fprintf(stderr, "accept error");
                exit(1);
            }
        }
        else {
            //accept succ!
            // ============= 將之前的觸發(fā)回調(diào)的刪掉粤剧,改成如下====
            tcp_conn *conn = new tcp_conn(connfd, _loop);
            if (conn == NULL) {
                fprintf(stderr, "new tcp_conn error\n");
                exit(1);
            }
            // ============================================
            printf("get new connection succ!\n");
            break;
        }
    }
}

//...

? 這樣歇竟,每次accept成功之后,創(chuàng)建一個(gè)與當(dāng)前客戶端套接字綁定的tcp_conn對象抵恋。在構(gòu)造里就完成了基本的對于EPOLLIN事件的監(jiān)聽和回調(diào)動(dòng)作.

? 現(xiàn)在可以先編譯一下焕议,保證沒有語法錯(cuò)誤,但是如果想測試弧关,就不能夠使用nc指令測試了盅安,因?yàn)楝F(xiàn)在服務(wù)端只能夠接收我們自定義的TLV格式的報(bào)文。那么我們需要自己寫一個(gè)客戶端來完成基本的測試世囊。


關(guān)于作者:

作者:Aceld(劉丹冰)

mail: danbing.at@gmail.com
github: https://github.com/aceld
原創(chuàng)書籍gitbook: http://legacy.gitbook.com/@aceld

原創(chuàng)聲明:未經(jīng)作者允許請勿轉(zhuǎn)載, 如果轉(zhuǎn)載請注明出處

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末别瞭,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子株憾,更是在濱河造成了極大的恐慌蝙寨,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,222評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件嗤瞎,死亡現(xiàn)場離奇詭異墙歪,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)猫胁,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,455評論 3 385
  • 文/潘曉璐 我一進(jìn)店門箱亿,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人弃秆,你說我怎么就攤上這事届惋∷杳保” “怎么了?”我有些...
    開封第一講書人閱讀 157,720評論 0 348
  • 文/不壞的土叔 我叫張陵脑豹,是天一觀的道長郑藏。 經(jīng)常有香客問我,道長瘩欺,這世上最難降的妖魔是什么必盖? 我笑而不...
    開封第一講書人閱讀 56,568評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮俱饿,結(jié)果婚禮上歌粥,老公的妹妹穿的比我還像新娘。我一直安慰自己拍埠,他們只是感情好失驶,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,696評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著枣购,像睡著了一般嬉探。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上棉圈,一...
    開封第一講書人閱讀 49,879評論 1 290
  • 那天涩堤,我揣著相機(jī)與錄音,去河邊找鬼分瘾。 笑死胎围,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的德召。 我是一名探鬼主播痊远,決...
    沈念sama閱讀 39,028評論 3 409
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼氏捞!你這毒婦竟也來了碧聪?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,773評論 0 268
  • 序言:老撾萬榮一對情侶失蹤液茎,失蹤者是張志新(化名)和其女友劉穎逞姿,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體捆等,經(jīng)...
    沈念sama閱讀 44,220評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡滞造,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,550評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了栋烤。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片谒养。...
    茶點(diǎn)故事閱讀 38,697評論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖明郭,靈堂內(nèi)的尸體忽然破棺而出买窟,到底是詐尸還是另有隱情丰泊,我是刑警寧澤,帶...
    沈念sama閱讀 34,360評論 4 332
  • 正文 年R本政府宣布始绍,位于F島的核電站瞳购,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏亏推。R本人自食惡果不足惜学赛,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,002評論 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望吞杭。 院中可真熱鬧盏浇,春花似錦、人聲如沸芽狗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,782評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽译蒂。三九已至,卻和暖如春谊却,著一層夾襖步出監(jiān)牢的瞬間柔昼,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,010評論 1 266
  • 我被黑心中介騙來泰國打工炎辨, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留捕透,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,433評論 2 360
  • 正文 我出身青樓碴萧,卻偏偏與公主長得像乙嘀,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子破喻,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,587評論 2 350

推薦閱讀更多精彩內(nèi)容