(13)udp服務(wù)與客戶端(Reactor部分)-【Lars-基于C++負載均衡遠程服務(wù)器調(diào)度系統(tǒng)教程】

【Lars教程目錄】

Lars源代碼
https://github.com/aceld/Lars


【Lars系統(tǒng)概述】
第1章-概述
第2章-項目目錄構(gòu)建


【Lars系統(tǒng)之Reactor模型服務(wù)器框架模塊】
第1章-項目結(jié)構(gòu)與V0.1雛形
第2章-內(nèi)存管理與Buffer封裝
第3章-事件觸發(fā)EventLoop
第4章-鏈接與消息封裝
第5章-Client客戶端模型
第6章-連接管理及限制
第7章-消息業(yè)務(wù)路由分發(fā)機制
第8章-鏈接創(chuàng)建/銷毀Hook機制
第9章-消息任務(wù)隊列與線程池
第10章-配置文件讀寫功能
第11章-udp服務(wù)與客戶端
第12章-數(shù)據(jù)傳輸協(xié)議protocol buffer
第13章-QPS性能測試
第14章-異步消息任務(wù)機制
第15章-鏈接屬性設(shè)置功能


【Lars系統(tǒng)之DNSService模塊】
第1章-Lars-dns簡介
第2章-數(shù)據(jù)庫創(chuàng)建
第3章-項目目錄結(jié)構(gòu)及環(huán)境構(gòu)建
第4章-Route結(jié)構(gòu)的定義
第5章-獲取Route信息
第6章-Route訂閱模式
第7章-Backend Thread實時監(jiān)控


【Lars系統(tǒng)之Report Service模塊】
第1章-項目概述-數(shù)據(jù)表及proto3協(xié)議定義
第2章-獲取report上報數(shù)據(jù)
第3章-存儲線程池及消息隊列


【Lars系統(tǒng)之LoadBalance Agent模塊】
第1章-項目概述及構(gòu)建
第2章-主模塊業(yè)務(wù)結(jié)構(gòu)搭建
第3章-Report與Dns Client設(shè)計與實現(xiàn)
第4章-負載均衡模塊基礎(chǔ)設(shè)計
第5章-負載均衡獲取Host主機信息API
第6章-負載均衡上報Host主機信息API
第7章-過期窗口清理與過載超時(V0.5)
第8章-定期拉取最新路由信息(V0.6)
第9章-負載均衡獲取Route信息API(0.7)
第10章-API初始化接口(V0.8)
第11章-Lars Agent性能測試工具
第12章- Lars啟動工具腳本


12) udp服務(wù)與客戶端

? 接下來為了讓Reactor框架功能更加豐富殿雪,結(jié)合之前的功能胳泉,再加上udpserver的服務(wù)接口锤窑。udp我們暫時不考慮加線程池實現(xiàn)计福,只是單線程的處理方式涝动。

12.1 udp_server服務(wù)端功能實現(xiàn)

lars_reactor/include/udp_server.h

#pragma  once

#include <netinet/in.h>
#include "event_loop.h"
#include "net_connection.h"
#include "message.h"

class udp_server :public net_connection 
{
public:
    udp_server(event_loop *loop, const char *ip, uint16_t port);

    virtual int send_message(const char *data, int msglen, int msgid);

    //注冊消息路由回調(diào)函數(shù)
    void add_msg_router(int msgid, msg_callback* cb, void *user_data = NULL);

    ~udp_server();

    //處理消息業(yè)務(wù)
    void do_read();
    
private:
    int _sockfd;

    char _read_buf[MESSAGE_LENGTH_LIMIT];
    char _write_buf[MESSAGE_LENGTH_LIMIT];

    //事件觸發(fā)
    event_loop* _loop;

    //服務(wù)端ip
    struct sockaddr_in _client_addr;
    socklen_t _client_addrlen;
    
    //消息路由分發(fā)
    msg_router _router;
};

? 對應(yīng)的方法實現(xiàn)方式如下:

lars_reactor/src/udp_server.cpp

#include <signal.h>
#include <unistd.h>
#include <strings.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <string.h>
#include "udp_server.h"


void read_callback(event_loop *loop, int fd, void *args)
{
    udp_server *server = (udp_server*)args;

    //處理業(yè)務(wù)函數(shù)
    server->do_read();
}

void udp_server::do_read()
{
    while (true) {
        int pkg_len = recvfrom(_sockfd, _read_buf, sizeof(_read_buf), 0, (struct sockaddr *)&_client_addr, &_client_addrlen);

        if (pkg_len == -1) {
            if (errno == EINTR) {
                continue;
            }
            else if (errno == EAGAIN) {
                break;
            }
            else {
                perror("recvfrom\n");
                break;
            }
        }

        //處理數(shù)據(jù)
        msg_head head; 
        memcpy(&head, _read_buf, MESSAGE_HEAD_LEN);
        if (head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0 || head.msglen + MESSAGE_HEAD_LEN != pkg_len) {
            //報文格式有問題
            fprintf(stderr, "do_read, data error, msgid = %d, msglen = %d, pkg_len = %d\n", head.msgid, head.msglen, pkg_len);
            continue;
        }

        //調(diào)用注冊的路由業(yè)務(wù)
        _router.call(head.msgid, head.msglen, _read_buf+MESSAGE_HEAD_LEN, this);
    }
}


udp_server::udp_server(event_loop *loop, const char *ip, uint16_t port)
{
    //1 忽略一些信號
    if (signal(SIGHUP, SIG_IGN) == SIG_ERR) {
        perror("signal ignore SIGHUB");
        exit(1);
    }
    if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
        perror("signal ignore SIGPIPE");
        exit(1);
    }
    
    //2 創(chuàng)建套接字
    //SOCK_CLOEXEC在execl中使用該socket則關(guān)閉蘸嘶,在fork中使用該socket不關(guān)閉
    _sockfd = socket(AF_INET, SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_UDP);
    if (_sockfd == -1) {
        perror("create udp socket");
        exit(1);
    }

    //3 設(shè)置服務(wù)ip+port
    struct sockaddr_in servaddr;
    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    inet_aton(ip, &servaddr.sin_addr);//設(shè)置ip
    servaddr.sin_port = htons(port);//設(shè)置端口

    //4 綁定
    bind(_sockfd, (const struct sockaddr*)&servaddr, sizeof(servaddr));
    
    //3 添加讀業(yè)務(wù)事件
    _loop = loop;

    bzero(&_client_addr, sizeof(_client_addr));
    _client_addrlen = sizeof(_client_addr);
    

    printf("server on %s:%u is running...\n", ip, port);

    _loop->add_io_event(_sockfd, read_callback, EPOLLIN, this);
    
}

int udp_server::send_message(const char *data, int msglen, int msgid)
{
    if (msglen > MESSAGE_LENGTH_LIMIT) {
        fprintf(stderr, "too large message to send\n");
        return -1;
    }

    msg_head head;
    head.msglen = msglen;
    head.msgid = msgid;

    memcpy(_write_buf,  &head, MESSAGE_HEAD_LEN);
    memcpy(_write_buf + MESSAGE_HEAD_LEN, data, msglen);

    int ret = sendto(_sockfd, _write_buf, msglen + MESSAGE_HEAD_LEN, 0, (struct sockaddr*)&_client_addr, _client_addrlen);
    if (ret == -1) {
        perror("sendto()..");
        return -1;
    }

    return ret;
}

//注冊消息路由回調(diào)函數(shù)
void udp_server::add_msg_router(int msgid, msg_callback* cb, void *user_data)
{
    _router.register_msg_router(msgid, cb, user_data);
}

udp_server::~udp_server()
{
    _loop->del_io_event(_sockfd);
    close(_sockfd);
}

? 這里面實現(xiàn)的方式和tcp_server的實現(xiàn)方式幾乎一樣,需要注意的是瞬雹,udp的socket編程是不需要listen的,而且也不需要accept刽虹。所以recvfrom就能夠得知每個包的對應(yīng)客戶端是誰酗捌,然后回執(zhí)消息給對應(yīng)的客戶端就可以。因為沒有連接涌哲,所以都是以包為單位來處理的胖缤,一個包一個包處理》Щ可能相鄰的兩個包來自不同的客戶端哪廓。

12.2 udp_client客戶端功能實現(xiàn)

lars_reactor/include/udp_client.h

#pragma once

#include "net_connection.h"
#include "message.h"
#include "event_loop.h"

class udp_client: public net_connection
{
public:
    udp_client(event_loop *loop, const char *ip, uint16_t port);
    ~udp_client();

    void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL);

    virtual int send_message(const char *data, int msglen, int msgid);

    //處理消息
    void do_read();

private:

    int _sockfd;

    char _read_buf[MESSAGE_LENGTH_LIMIT];
    char _write_buf[MESSAGE_LENGTH_LIMIT];

    //事件觸發(fā)
    event_loop *_loop;

    //消息路由分發(fā)
    msg_router _router;
};

lars_reactor/src/udp_client.cpp

#include "udp_client.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <strings.h>
#include <string.h>
#include <stdio.h>


void read_callback(event_loop *loop, int fd, void *args)
{
    udp_client *client = (udp_client*)args;
    client->do_read();
}



udp_client::udp_client(event_loop *loop, const char *ip, uint16_t port)
{
    //1 創(chuàng)建套接字
    _sockfd = socket(AF_INET, SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_UDP);
    if (_sockfd == -1) {
        perror("create socket error");
        exit(1);
    }

    struct sockaddr_in servaddr;
    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    inet_aton(ip, &servaddr.sin_addr);
    servaddr.sin_port = htons(port);

    //2 鏈接
    int ret = connect(_sockfd, (const struct sockaddr*)&servaddr, sizeof(servaddr));
    if (ret  == -1) {
        perror("connect");
        exit(1);
    }


    //3 添加讀事件
    _loop = loop; 
    _loop->add_io_event(_sockfd, read_callback, EPOLLIN, this);
}

udp_client::~udp_client()
{
    _loop->del_io_event(_sockfd);
    close(_sockfd);
}

//處理消息
void udp_client::do_read()
{
    while (true) {
        int pkt_len = recvfrom(_sockfd, _read_buf, sizeof(_read_buf), 0, NULL, NULL);
        if (pkt_len == -1) {
            if (errno == EINTR) {
                continue;
            }
            else if (errno == EAGAIN) {
                break;
            }
            else {
                perror("recvfrom()");
                break;
            }
        }

        //處理客戶端包
        msg_head head; 
        memcpy(&head, _read_buf, MESSAGE_HEAD_LEN);
        if (head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0 || head.msglen + MESSAGE_HEAD_LEN != pkt_len) {
            //報文格式有問題
            fprintf(stderr, "do_read, data error, msgid = %d, msglen = %d, pkt_len = %d\n", head.msgid, head.msglen, pkt_len);
            continue;
        }

        //調(diào)用注冊的路由業(yè)務(wù)
        _router.call(head.msgid, head.msglen, _read_buf+MESSAGE_HEAD_LEN, this);
    }
}
    
void udp_client::add_msg_router(int msgid, msg_callback *cb, void *user_data)
{
    _router.register_msg_router(msgid, cb, user_data);
}

int udp_client::send_message(const char *data, int msglen, int msgid)
{
    if (msglen > MESSAGE_LENGTH_LIMIT) {
        fprintf(stderr, "too large message to send\n");
        return -1;
    }

    msg_head head;
    head.msglen = msglen;
    head.msgid = msgid;

    memcpy(_write_buf,  &head, MESSAGE_HEAD_LEN);
    memcpy(_write_buf + MESSAGE_HEAD_LEN, data, msglen);

    int ret = sendto(_sockfd, _write_buf, msglen + MESSAGE_HEAD_LEN, 0, NULL, 0);
    if (ret == -1) {
        perror("sendto()..");
        return -1;
    }

    return ret;
}

? 客戶端和服務(wù)端代碼除了構(gòu)造函數(shù)不同,其他基本差不多初烘。接下來我們可以測試一下udp的通信功能

12.3 完成Lars Reactor V0.10開發(fā)

服務(wù)端

server.cpp

#include <string>
#include <string.h>
#include "config_file.h"
#include "udp_server.h"

//回顯業(yè)務(wù)的回調(diào)函數(shù)
void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
    printf("callback_busi ...\n");
    //直接回顯
    conn->send_message(data, len, msgid);
}

int main() 
{
    event_loop loop;

    //加載配置文件
    config_file::setPath("./serv.conf");
    std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
    short port = config_file::instance()->GetNumber("reactor", "port", 8888);

    printf("ip = %s, port = %d\n", ip.c_str(), port);

    udp_server server(&loop, ip.c_str(), port);

    //注冊消息業(yè)務(wù)路由
    server.add_msg_router(1, callback_busi);

    loop.event_process();

    return 0;
}

客戶端

client.cpp

#include <stdio.h>
#include <string.h>

#include "udp_client.h"


//客戶端業(yè)務(wù)
void busi(const char *data, uint32_t len, int msgid, net_connection  *conn, void *user_data)
{
    //得到服務(wù)端回執(zhí)的數(shù)據(jù) 
    char *str = NULL;
    
    str = (char*)malloc(len+1);
    memset(str, 0, len+1);
    memcpy(str, data, len);
    printf("recv server: [%s]\n", str);
    printf("msgid: [%d]\n", msgid);
    printf("len: [%d]\n", len);
}


int main() 
{
    event_loop loop;

    //創(chuàng)建udp客戶端
    udp_client client(&loop, "127.0.0.1", 7777);


    //注冊消息路由業(yè)務(wù)
    client.add_msg_router(1, busi);

    //發(fā)消息
    int msgid = 1; 
    const char *msg = "Hello Lars!";

    client.send_message(msg, strlen(msg), msgid);

    //開啟事件監(jiān)聽
    loop.event_process();

    return 0;
}

啟動服務(wù)端和客戶端并允許涡真,結(jié)果如下:

server

$ ./server 
ip = 127.0.0.1, port = 7777
msg_router init...
server on 127.0.0.1:7777 is running...
add msg cb msgid = 1
call msgid = 1
call data = Hello Lars!
call msglen = 11
callback_busi ...
=======

client

$ ./client 
msg_router init...
add msg cb msgid = 1
call msgid = 1
call data = Hello Lars!
call msglen = 11
recv server: [Hello Lars!]
msgid: [1]
len: [11]
=======

關(guān)于作者:

作者:Aceld(劉丹冰)

mail: danbing.at@gmail.com
github: https://github.com/aceld
原創(chuàng)書籍gitbook: http://legacy.gitbook.com/@aceld

原創(chuàng)聲明:未經(jīng)作者允許請勿轉(zhuǎn)載, 如果轉(zhuǎn)載請注明出處

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市肾筐,隨后出現(xiàn)的幾起案子哆料,更是在濱河造成了極大的恐慌,老刑警劉巖吗铐,帶你破解...
    沈念sama閱讀 212,222評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件东亦,死亡現(xiàn)場離奇詭異,居然都是意外死亡唬渗,警方通過查閱死者的電腦和手機典阵,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,455評論 3 385
  • 文/潘曉璐 我一進店門奋渔,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人萄喳,你說我怎么就攤上這事卒稳。” “怎么了他巨?”我有些...
    開封第一講書人閱讀 157,720評論 0 348
  • 文/不壞的土叔 我叫張陵充坑,是天一觀的道長。 經(jīng)常有香客問我染突,道長捻爷,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,568評論 1 284
  • 正文 為了忘掉前任份企,我火速辦了婚禮也榄,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘司志。我一直安慰自己甜紫,他們只是感情好,可當我...
    茶點故事閱讀 65,696評論 6 386
  • 文/花漫 我一把揭開白布骂远。 她就那樣靜靜地躺著囚霸,像睡著了一般。 火紅的嫁衣襯著肌膚如雪激才。 梳的紋絲不亂的頭發(fā)上拓型,一...
    開封第一講書人閱讀 49,879評論 1 290
  • 那天,我揣著相機與錄音瘸恼,去河邊找鬼劣挫。 笑死,一個胖子當著我的面吹牛东帅,可吹牛的內(nèi)容都是我干的压固。 我是一名探鬼主播,決...
    沈念sama閱讀 39,028評論 3 409
  • 文/蒼蘭香墨 我猛地睜開眼靠闭,長吁一口氣:“原來是場噩夢啊……” “哼邓夕!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起阎毅,我...
    開封第一講書人閱讀 37,773評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎点弯,沒想到半個月后扇调,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,220評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡抢肛,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,550評論 2 327
  • 正文 我和宋清朗相戀三年狼钮,在試婚紗的時候發(fā)現(xiàn)自己被綠了碳柱。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,697評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡熬芜,死狀恐怖莲镣,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情涎拉,我是刑警寧澤瑞侮,帶...
    沈念sama閱讀 34,360評論 4 332
  • 正文 年R本政府宣布,位于F島的核電站鼓拧,受9級特大地震影響半火,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜季俩,卻給世界環(huán)境...
    茶點故事閱讀 40,002評論 3 315
  • 文/蒙蒙 一钮糖、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧酌住,春花似錦店归、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,782評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至祭示,卻和暖如春肄满,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背质涛。 一陣腳步聲響...
    開封第一講書人閱讀 32,010評論 1 266
  • 我被黑心中介騙來泰國打工稠歉, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人汇陆。 一個月前我還...
    沈念sama閱讀 46,433評論 2 360
  • 正文 我出身青樓怒炸,卻偏偏與公主長得像,于是被迫代替她去往敵國和親毡代。 傳聞我的和親對象是個殘疾皇子阅羹,可洞房花燭夜當晚...
    茶點故事閱讀 43,587評論 2 350

推薦閱讀更多精彩內(nèi)容