【Lars教程目錄】
Lars源代碼
https://github.com/aceld/Lars
【Lars系統(tǒng)概述】
第1章-概述
第2章-項(xiàng)目目錄構(gòu)建
【Lars系統(tǒng)之Reactor模型服務(wù)器框架模塊】
第1章-項(xiàng)目結(jié)構(gòu)與V0.1雛形
第2章-內(nèi)存管理與Buffer封裝
第3章-事件觸發(fā)EventLoop
第4章-鏈接與消息封裝
第5章-Client客戶端模型
第6章-連接管理及限制
第7章-消息業(yè)務(wù)路由分發(fā)機(jī)制
第8章-鏈接創(chuàng)建/銷毀Hook機(jī)制
第9章-消息任務(wù)隊(duì)列與線程池
第10章-配置文件讀寫功能
第11章-udp服務(wù)與客戶端
第12章-數(shù)據(jù)傳輸協(xié)議protocol buffer
第13章-QPS性能測試
第14章-異步消息任務(wù)機(jī)制
第15章-鏈接屬性設(shè)置功能
【Lars系統(tǒng)之DNSService模塊】
第1章-Lars-dns簡介
第2章-數(shù)據(jù)庫創(chuàng)建
第3章-項(xiàng)目目錄結(jié)構(gòu)及環(huán)境構(gòu)建
第4章-Route結(jié)構(gòu)的定義
第5章-獲取Route信息
第6章-Route訂閱模式
第7章-Backend Thread實(shí)時(shí)監(jiān)控
【Lars系統(tǒng)之Report Service模塊】
第1章-項(xiàng)目概述-數(shù)據(jù)表及proto3協(xié)議定義
第2章-獲取report上報(bào)數(shù)據(jù)
第3章-存儲(chǔ)線程池及消息隊(duì)列
【Lars系統(tǒng)之LoadBalance Agent模塊】
第1章-項(xiàng)目概述及構(gòu)建
第2章-主模塊業(yè)務(wù)結(jié)構(gòu)搭建
第3章-Report與Dns Client設(shè)計(jì)與實(shí)現(xiàn)
第4章-負(fù)載均衡模塊基礎(chǔ)設(shè)計(jì)
第5章-負(fù)載均衡獲取Host主機(jī)信息API
第6章-負(fù)載均衡上報(bào)Host主機(jī)信息API
第7章-過期窗口清理與過載超時(shí)(V0.5)
第8章-定期拉取最新路由信息(V0.6)
第9章-負(fù)載均衡獲取Route信息API(0.7)
第10章-API初始化接口(V0.8)
第11章-Lars Agent性能測試工具
第12章- Lars啟動(dòng)工具腳本
8) 消息業(yè)務(wù)路由分發(fā)機(jī)制
? 現(xiàn)在我們發(fā)送的消息都是message結(jié)構(gòu)的鼠锈,有個(gè)message頭里面其中有兩個(gè)關(guān)鍵的字段父丰,msgid
和msglen
,其中加入msgid
的意義就是我們可以甄別是哪個(gè)消息冠桃,從而對(duì)這類消息做出不同的業(yè)務(wù)處理煞肾。但是現(xiàn)在我們無論是服務(wù)端還是客戶端都是寫死的兩個(gè)業(yè)務(wù)咧织,就是"回顯業(yè)務(wù)",顯然這并不滿足我們作為服務(wù)器框架的需求。我們需要開發(fā)者可以注冊(cè)自己的回調(diào)業(yè)務(wù)籍救。所以我們需要提供一個(gè)注冊(cè)業(yè)務(wù)的入口习绢,然后在后端根據(jù)不同的msgid
來激活不同的回調(diào)業(yè)務(wù)函數(shù)。
8.1 添加消息分發(fā)路由類msg_router
? 下面我們提供這樣一個(gè)中轉(zhuǎn)的router模塊,在include/message.h添加
lars_reactor/include/message.h
#pragma once
#include <ext/hash_map>
//解決tcp粘包問題的消息頭
struct msg_head
{
int msgid;
int msglen;
};
//消息頭的二進(jìn)制長度钧忽,固定數(shù)
#define MESSAGE_HEAD_LEN 8
//消息頭+消息體的最大長度限制
#define MESSAGE_LENGTH_LIMIT (65535 - MESSAGE_HEAD_LEN)
//msg 業(yè)務(wù)回調(diào)函數(shù)原型
//===================== 消息分發(fā)路由機(jī)制 ==================
class tcp_client;
typedef void msg_callback(const char *data, uint32_t len, int msgid, tcp_client *client, void *user_data);
//消息路由分發(fā)機(jī)制
class msg_router
{
public:
msg_router():_router(),_args() {}
//給一個(gè)消息ID注冊(cè)一個(gè)對(duì)應(yīng)的回調(diào)業(yè)務(wù)函數(shù)
int register_msg_router(int msgid, msg_callback *msg_cb, void *user_data)
{
if(_router.find(msgid) != _router.end()) {
//該msgID的回調(diào)業(yè)務(wù)已經(jīng)存在
return -1;
}
_router[msgid] = msg_cb;
_args[msgid] = user_data;
return 0;
}
//調(diào)用注冊(cè)的對(duì)應(yīng)的回調(diào)業(yè)務(wù)函數(shù)
void call(int msgid, uint32_t msglen, const char *data, tcp_client *client)
{
//判斷msgid對(duì)應(yīng)的回調(diào)是否存在
if (_router.find(msgid) == _router.end()) {
fprintf(stderr, "msgid %d is not register!\n", msgid);
return;
}
//直接取出回調(diào)函數(shù)毯炮,執(zhí)行
msg_callback *callback = _router[msgid];
void *user_data = _args[msgid];
callback(data, msglen, msgid, client, user_data);
}
private:
//針對(duì)消息的路由分發(fā)逼肯,key為msgID, value為注冊(cè)的回調(diào)業(yè)務(wù)函數(shù)
__gnu_cxx::hash_map<int, msg_callback *> _router;
//回調(diào)業(yè)務(wù)函數(shù)對(duì)應(yīng)的參數(shù),key為msgID, value為對(duì)應(yīng)的參數(shù)
__gnu_cxx::hash_map<int, void *> _args;
};
//===================== 消息分發(fā)路由機(jī)制 ==================
? 開發(fā)者需要注冊(cè)一個(gè)msg_callback
類型的函數(shù)桃煎,通過msg_router
類的register_msg_router()
方法來注冊(cè)篮幢,同時(shí)通過call()
方法來調(diào)用。
? 全部回調(diào)業(yè)務(wù)函數(shù)和msgid的對(duì)應(yīng)關(guān)系保存在一個(gè)hash_map類型的_router
map中为迈,_args
保存對(duì)應(yīng)的參數(shù)三椿。
? 但是這里有個(gè)小細(xì)節(jié)需要注意一下,msg_callback
的函數(shù)類型聲明是這樣的葫辐。
typedef void msg_callback(const char *data, uint32_t len, int msgid, tcp_client *client, void *user_data);
? 其中這里面第4個(gè)參數(shù)搜锰,只能是tcp_client類型的參數(shù),也就是我們之前的設(shè)計(jì)的msg_callback只支持tcp_client的消息回調(diào)機(jī)制耿战,但是很明顯我們的需求是不僅是tcp_client
要用蛋叼,tcp_server中的tcp_conn
也要用到這個(gè)機(jī)制,那么很顯然這個(gè)參數(shù)在這就不是很合適剂陡,那么如果設(shè)定一個(gè)形參既能指向tcp_client
又能能指向tcp_conn
兩個(gè)類型呢狈涮,當(dāng)然答案就只能是將這兩個(gè)類抽象出來一層,用父類指針指向子類然后通過多態(tài)特性來調(diào)用就可以了鸭栖,所以我們需要先定義一個(gè)抽象類歌馍。
8.2 鏈接抽象類創(chuàng)建
? 經(jīng)過分析,我們定義如下的抽象類晕鹊,并提供一些接口松却。
lars_reactor/include/net_connection.h
#pragma once
/*
*
* 網(wǎng)絡(luò)通信的抽象類,任何需要進(jìn)行收發(fā)消息的模塊溅话,都可以實(shí)現(xiàn)該類
*
* */
class net_connection
{
public:
//發(fā)送消息的接口
virtual int send_message(const char *data, int datalen, int msgid) = 0;
};
? 然后讓我們tcp_server端的tcp_conn
類繼承net_connecton
, 客戶端的tcp_client
繼承net_connection
lars_reactor/include/tcp_conn.h
class tcp_conn : public net_connection
{
//...
};
lars_reactor/include/tcp_client.h
class tcp_client : public net_connection
{
//...
}
這樣晓锻,我們就可以用一個(gè)net_connection指針指向這兩種不同的對(duì)象實(shí)例了。
? 接下來我們將msg_callback
回調(diào)業(yè)務(wù)函數(shù)類型改成
typedef void msg_callback(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data);
? 這樣這個(gè)業(yè)務(wù)函數(shù)就可以支持tcp_conn和tcp_client了公荧。
所以修改之后带射,我們的msg_router
類定義如下:
lars_reactor/include/message.h
//消息路由分發(fā)機(jī)制
class msg_router
{
public:
msg_router(): {
printf("msg router init ...\n");
}
//給一個(gè)消息ID注冊(cè)一個(gè)對(duì)應(yīng)的回調(diào)業(yè)務(wù)函數(shù)
int register_msg_router(int msgid, msg_callback *msg_cb, void *user_data)
{
if(_router.find(msgid) != _router.end()) {
//該msgID的回調(diào)業(yè)務(wù)已經(jīng)存在
return -1;
}
printf("add msg cb msgid = %d\n", msgid);
_router[msgid] = msg_cb;
_args[msgid] = user_data;
return 0;
}
//調(diào)用注冊(cè)的對(duì)應(yīng)的回調(diào)業(yè)務(wù)函數(shù)
void call(int msgid, uint32_t msglen, const char *data, net_connection *net_conn)
{
printf("call msgid = %d\n", msgid);
//判斷msgid對(duì)應(yīng)的回調(diào)是否存在
if (_router.find(msgid) == _router.end()) {
fprintf(stderr, "msgid %d is not register!\n", msgid);
return;
}
//直接取出回調(diào)函數(shù),執(zhí)行
msg_callback *callback = _router[msgid];
void *user_data = _args[msgid];
callback(data, msglen, msgid, net_conn, user_data);
printf("=======\n");
}
private:
//針對(duì)消息的路由分發(fā)循狰,key為msgID, value為注冊(cè)的回調(diào)業(yè)務(wù)函數(shù)
__gnu_cxx::hash_map<int, msg_callback*> _router;
//回調(diào)業(yè)務(wù)函數(shù)對(duì)應(yīng)的參數(shù)窟社,key為msgID, value為對(duì)應(yīng)的參數(shù)
__gnu_cxx::hash_map<int, void*> _args;
};
8.3 msg_router集成到tcp_server中
A. tcp_server添加msg_router靜態(tài)成員變量
lars_reactor/include/tcp_server.h
class tcp_server
{
public:
// ...
//---- 消息分發(fā)路由 ----
static msg_router router;
// ...
};
同時(shí)定義及初始化
lars_reactor/src/tcp_server.cpp
//...
// ==== 消息分發(fā)路由 ===
msg_router tcp_server::router;
//...
B. tcp_server提供注冊(cè)路由方法
lars_reactor/include/tcp_server.c
class tcp_server
{
public:
//...
//注冊(cè)消息路由回調(diào)函數(shù)
void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL) {
router.register_msg_router(msgid, cb, user_data);
}
//...
public:
//全部已經(jīng)在線的連接信息
//---- 消息分發(fā)路由 ----
static msg_router router;
//...
};
C. 修正tcp_conn的do_read改成消息分發(fā)
lars_reactor/src/tcp_conn.cpp
//...
//處理讀業(yè)務(wù)
void tcp_conn::do_read()
{
//1. 從套接字讀取數(shù)據(jù)
int ret = ibuf.read_data(_connfd);
if (ret == -1) {
fprintf(stderr, "read data from socket\n");
this->clean_conn();
return ;
}
else if ( ret == 0) {
//對(duì)端正常關(guān)閉
printf("connection closed by peer\n");
clean_conn();
return ;
}
//2. 解析msg_head數(shù)據(jù)
msg_head head;
//[這里用while,可能一次性讀取多個(gè)完整包過來]
while (ibuf.length() >= MESSAGE_HEAD_LEN) {
//2.1 讀取msg_head頭部绪钥,固定長度MESSAGE_HEAD_LEN
memcpy(&head, ibuf.data(), MESSAGE_HEAD_LEN);
if(head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0) {
fprintf(stderr, "data format error, need close, msglen = %d\n", head.msglen);
this->clean_conn();
break;
}
if (ibuf.length() < MESSAGE_HEAD_LEN + head.msglen) {
//緩存buf中剩余的數(shù)據(jù)灿里,小于實(shí)際上應(yīng)該接受的數(shù)據(jù)
//說明是一個(gè)不完整的包,應(yīng)該拋棄
break;
}
//2.2 再根據(jù)頭長度讀取數(shù)據(jù)體程腹,然后針對(duì)數(shù)據(jù)體處理 業(yè)務(wù)
//頭部處理完了匣吊,往后偏移MESSAGE_HEAD_LEN長度
ibuf.pop(MESSAGE_HEAD_LEN);
//處理ibuf.data()業(yè)務(wù)數(shù)據(jù)
printf("read data: %s\n", ibuf.data());
//消息包路由模式
tcp_server::router.call(head.msgid, head.msglen, ibuf.data(), this);
////回顯業(yè)務(wù)
//callback_busi(ibuf.data(), head.msglen, head.msgid, NULL, this);
//消息體處理完了,往后便宜msglen長度
ibuf.pop(head.msglen);
}
ibuf.adjust();
return ;
}
//...
8.4 msg_router集成到tcp_client中
lars_reactor/include/tcp_client.h
class tcp_client : public net_connection
{
public:
// ...
//設(shè)置業(yè)務(wù)處理回調(diào)函數(shù)
//void set_msg_callback(msg_callback *msg_cb)
//{
//this->_msg_callback = msg_cb;
//}
//注冊(cè)消息路由回調(diào)函數(shù)
void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL) {
_router.register_msg_router(msgid, cb, user_data);
}
private:
//處理消息的分發(fā)路由
msg_router _router;
//msg_callback *_msg_callback; //單路由模式去掉
// ...
// ...
};
? 然后在修正tcp_client
的do_read()
方法。
lars_reactor/src/tcp_client.cpp
//處理讀業(yè)務(wù)
int tcp_client::do_read()
{
//確定已經(jīng)成功建立連接
assert(connected == true);
// 1. 一次性全部讀取出來
//得到緩沖區(qū)里有多少字節(jié)要被讀取,然后將字節(jié)數(shù)放入b里面色鸳。
int need_read = 0;
if (ioctl(_sockfd, FIONREAD, &need_read) == -1) {
fprintf(stderr, "ioctl FIONREAD error");
return -1;
}
//確保_buf可以容納可讀數(shù)據(jù)
assert(need_read <= _ibuf.capacity - _ibuf.length);
int ret;
do {
ret = read(_sockfd, _ibuf.data + _ibuf.length, need_read);
} while(ret == -1 && errno == EINTR);
if (ret == 0) {
//對(duì)端關(guān)閉
if (_name != NULL) {
printf("%s client: connection close by peer!\n", _name);
}
else {
printf("client: connection close by peer!\n");
}
clean_conn();
return -1;
}
else if (ret == -1) {
fprintf(stderr, "client: do_read() , error\n");
clean_conn();
return -1;
}
assert(ret == need_read);
_ibuf.length += ret;
//2. 解包
msg_head head;
int msgid, length;
while (_ibuf.length >= MESSAGE_HEAD_LEN) {
memcpy(&head, _ibuf.data + _ibuf.head, MESSAGE_HEAD_LEN);
msgid = head.msgid;
length = head.msglen;
/*
if (length + MESSAGE_HEAD_LEN < _ibuf.length) {
break;
}
*/
//頭部讀取完畢
_ibuf.pop(MESSAGE_HEAD_LEN);
// ===================================
//3. 交給業(yè)務(wù)函數(shù)處理
//if (_msg_callback != NULL) {
//this->_msg_callback(_ibuf.data + _ibuf.head, length, msgid, this, NULL);
//}
// 消息路由分發(fā)
this->_router.call(msgid, length, _ibuf.data + _ibuf.head, this);
// ===================================
//數(shù)據(jù)區(qū)域處理完畢
_ibuf.pop(length);
}
//重置head指針
_ibuf.adjust();
return 0;
}
8.5 完成Lars Reactor V0.6開發(fā)
我們現(xiàn)在重新寫一下 server.cpp 和client.cpp的兩個(gè)應(yīng)用程序
lars_reacor/example/lars_reactor_0.6/server.cpp
#include "tcp_server.h"
//回顯業(yè)務(wù)的回調(diào)函數(shù)
void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
printf("callback_busi ...\n");
//直接回顯
conn->send_message(data, len, msgid);
}
//打印信息回調(diào)函數(shù)
void print_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
printf("recv client: [%s]\n", data);
printf("msgid: [%d]\n", msgid);
printf("len: [%d]\n", len);
}
int main()
{
event_loop loop;
tcp_server server(&loop, "127.0.0.1", 7777);
//注冊(cè)消息業(yè)務(wù)路由
server.add_msg_router(1, callback_busi);
server.add_msg_router(2, print_busi);
loop.event_process();
return 0;
}
lars_reacor/example/lars_reactor_0.6/client.cpp
#include "tcp_client.h"
#include <stdio.h>
#include <string.h>
//客戶端業(yè)務(wù)
void busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
//得到服務(wù)端回執(zhí)的數(shù)據(jù)
printf("recv server: [%s]\n", data);
printf("msgid: [%d]\n", msgid);
printf("len: [%d]\n", len);
}
int main()
{
event_loop loop;
//創(chuàng)建tcp客戶端
tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.6");
//注冊(cè)消息路由業(yè)務(wù)
client.add_msg_router(1, busi);
//開啟事件監(jiān)聽
loop.event_process();
return 0;
}
lars_reactor/src/tcp_client.cpp
//判斷鏈接是否是創(chuàng)建鏈接社痛,主要是針對(duì)非阻塞socket 返回EINPROGRESS錯(cuò)誤
static void connection_delay(event_loop *loop, int fd, void *args)
{
tcp_client *cli = (tcp_client*)args;
loop->del_io_event(fd);
int result = 0;
socklen_t result_len = sizeof(result);
getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &result_len);
if (result == 0) {
//鏈接是建立成功的
cli->connected = true;
printf("connect %s:%d succ!\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));
// ================ 發(fā)送msgid:1 =====
//建立連接成功之后,主動(dòng)發(fā)送send_message
const char *msg = "hello lars!";
int msgid = 1;
cli->send_message(msg, strlen(msg), msgid);
// ================ 發(fā)送msgid:2 =====
const char *msg2 = "hello Aceld!";
msgid = 2;
cli->send_message(msg2, strlen(msg2), msgid);
// ================
loop->add_io_event(fd, read_callback, EPOLLIN, cli);
if (cli->_obuf.length != 0) {
//輸出緩沖有數(shù)據(jù)可寫
loop->add_io_event(fd, write_callback, EPOLLOUT, cli);
}
}
else {
//鏈接創(chuàng)建失敗
fprintf(stderr, "connection %s:%d error\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));
}
}
運(yùn)行結(jié)果:
服務(wù)端
$ ./server
msg_router init...
add msg cb msgid = 1
add msg cb msgid = 2
begin accept
get new connection succ!
read data: hello lars!?
call msgid = 1
callback_busi ...
server send_message: hello lars!?:11, msgid = 1
=======
read data: hello Aceld!
call msgid = 2
recv client: [hello Aceld!]
msgid: [2]
len: [12]
客戶端
$ ./client
msg_router init...
do_connect EINPROGRESS
add msg cb msgid = 1
connect 127.0.0.1:7777 succ!
do write over, del EPOLLOUT
call msgid = 1
recv server: [hello lars!]
msgid: [1]
len: [11]
=======
關(guān)于作者:
作者:Aceld(劉丹冰)
mail: danbing.at@gmail.com
github: https://github.com/aceld
原創(chuàng)書籍gitbook: http://legacy.gitbook.com/@aceld
原創(chuàng)聲明:未經(jīng)作者允許請(qǐng)勿轉(zhuǎn)載, 如果轉(zhuǎn)載請(qǐng)注明出處