【Lars教程目錄】
Lars源代碼
https://github.com/aceld/Lars
【Lars系統(tǒng)概述】
第1章-概述
第2章-項(xiàng)目目錄構(gòu)建
【Lars系統(tǒng)之Reactor模型服務(wù)器框架模塊】
第1章-項(xiàng)目結(jié)構(gòu)與V0.1雛形
第2章-內(nèi)存管理與Buffer封裝
第3章-事件觸發(fā)EventLoop
第4章-鏈接與消息封裝
第5章-Client客戶端模型
第6章-連接管理及限制
第7章-消息業(yè)務(wù)路由分發(fā)機(jī)制
第8章-鏈接創(chuàng)建/銷毀Hook機(jī)制
第9章-消息任務(wù)隊(duì)列與線程池
第10章-配置文件讀寫功能
第11章-udp服務(wù)與客戶端
第12章-數(shù)據(jù)傳輸協(xié)議protocol buffer
第13章-QPS性能測試
第14章-異步消息任務(wù)機(jī)制
第15章-鏈接屬性設(shè)置功能
【Lars系統(tǒng)之DNSService模塊】
第1章-Lars-dns簡介
第2章-數(shù)據(jù)庫創(chuàng)建
第3章-項(xiàng)目目錄結(jié)構(gòu)及環(huán)境構(gòu)建
第4章-Route結(jié)構(gòu)的定義
第5章-獲取Route信息
第6章-Route訂閱模式
第7章-Backend Thread實(shí)時(shí)監(jiān)控
【Lars系統(tǒng)之Report Service模塊】
第1章-項(xiàng)目概述-數(shù)據(jù)表及proto3協(xié)議定義
第2章-獲取report上報(bào)數(shù)據(jù)
第3章-存儲(chǔ)線程池及消息隊(duì)列
【Lars系統(tǒng)之LoadBalance Agent模塊】
第1章-項(xiàng)目概述及構(gòu)建
第2章-主模塊業(yè)務(wù)結(jié)構(gòu)搭建
第3章-Report與Dns Client設(shè)計(jì)與實(shí)現(xiàn)
第4章-負(fù)載均衡模塊基礎(chǔ)設(shè)計(jì)
第5章-負(fù)載均衡獲取Host主機(jī)信息API
第6章-負(fù)載均衡上報(bào)Host主機(jī)信息API
第7章-過期窗口清理與過載超時(shí)(V0.5)
第8章-定期拉取最新路由信息(V0.6)
第9章-負(fù)載均衡獲取Route信息API(0.7)
第10章-API初始化接口(V0.8)
第11章-Lars Agent性能測試工具
第12章- Lars啟動(dòng)工具腳本
5) tcp鏈接與Message消息封裝
? 好了渗鬼,現(xiàn)在我們來將服務(wù)器的連接做一個(gè)簡單的封裝讯赏,在這之前励翼,我們要將我我們所發(fā)的數(shù)據(jù)做一個(gè)規(guī)定买决,采用TLV的格式睦柴,來進(jìn)行封裝。目的是解決TCP傳輸?shù)恼嘲鼏栴}扎筒。
5.1 Message消息封裝
? 先創(chuàng)建一個(gè)message.h頭文件
lars_reactor/include/message.h
#pragma once
//解決tcp粘包問題的消息頭
struct msg_head
{
int msgid;
int msglen;
};
//消息頭的二進(jìn)制長度艰山,固定數(shù)
#define MESSAGE_HEAD_LEN 8
//消息頭+消息體的最大長度限制
#define MESSAGE_LENGTH_LIMIT (65535 - MESSAGE_HEAD_LEN)
? 接下來我們每次在server和 client之間傳遞數(shù)據(jù)的時(shí)候窗宦,都發(fā)送這種數(shù)據(jù)格式的頭再加上后面的數(shù)據(jù)內(nèi)容即可赦颇。
5.2 創(chuàng)建一個(gè)tcp_conn連接類
lars_reactor/include/tcp_conn.h
#pragma once
#include "reactor_buf.h"
#include "event_loop.h"
//一個(gè)tcp的連接信息
class tcp_conn
{
public:
//初始化tcp_conn
tcp_conn(int connfd, event_loop *loop);
//處理讀業(yè)務(wù)
void do_read();
//處理寫業(yè)務(wù)
void do_write();
//銷毀tcp_conn
void clean_conn();
//發(fā)送消息的方法
int send_message(const char *data, int msglen, int msgid);
private:
//當(dāng)前鏈接的fd
int _connfd;
//該連接歸屬的event_poll
event_loop *_loop;
//輸出buf
output_buf obuf;
//輸入buf
input_buf ibuf;
};
簡單說明一下里面的成員和方法:
成員:
_connfd
:server剛剛accept成功的套接字
_loop
:當(dāng)前鏈接所綁定的事件觸發(fā)句柄.
obuf
:鏈接輸出緩沖,向?qū)Χ藢憯?shù)據(jù)
ibuf
:鏈接輸入緩沖赴涵,從對端讀數(shù)據(jù)
方法:
tcp_client()
:構(gòu)造媒怯,主要在里面實(shí)現(xiàn)初始化及創(chuàng)建鏈接鏈接的connect過程。
do_read()
:讀數(shù)據(jù)處理業(yè)務(wù)髓窜,主要是EPOLLIN事件觸發(fā)扇苞。
do_write()
:寫數(shù)據(jù)處理業(yè)務(wù),主要是EPOLLOUT事件觸發(fā)寄纵。
clean_conn()
:清空鏈接資源鳖敷。
send_message()
:將消息打包成TLV格式發(fā)送給對端。
? 接下來程拭,實(shí)現(xiàn)以下tcp_conn
類.
lars_reactor/src/tcp_conn.cpp
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <string.h>
#include "tcp_conn.h"
#include "message.h"
//回顯業(yè)務(wù)
void callback_busi(const char *data, uint32_t len, int msgid, void *args, tcp_conn *conn)
{
conn->send_message(data, len, msgid);
}
//連接的讀事件回調(diào)
static void conn_rd_callback(event_loop *loop, int fd, void *args)
{
tcp_conn *conn = (tcp_conn*)args;
conn->do_read();
}
//連接的寫事件回調(diào)
static void conn_wt_callback(event_loop *loop, int fd, void *args)
{
tcp_conn *conn = (tcp_conn*)args;
conn->do_write();
}
//初始化tcp_conn
tcp_conn::tcp_conn(int connfd, event_loop *loop)
{
_connfd = connfd;
_loop = loop;
//1. 將connfd設(shè)置成非阻塞狀態(tài)
int flag = fcntl(_connfd, F_GETFL, 0);
fcntl(_connfd, F_SETFL, O_NONBLOCK|flag);
//2. 設(shè)置TCP_NODELAY禁止做讀寫緩存定踱,降低小包延遲
int op = 1;
setsockopt(_connfd, IPPROTO_TCP, TCP_NODELAY, &op, sizeof(op));//need netinet/in.h netinet/tcp.h
//3. 將該鏈接的讀事件讓event_loop監(jiān)控
_loop->add_io_event(_connfd, conn_rd_callback, EPOLLIN, this);
//4 將該鏈接集成到對應(yīng)的tcp_server中
//TODO
}
//處理讀業(yè)務(wù)
void tcp_conn::do_read()
{
//1. 從套接字讀取數(shù)據(jù)
int ret = ibuf.read_data(_connfd);
if (ret == -1) {
fprintf(stderr, "read data from socket\n");
this->clean_conn();
return ;
}
else if ( ret == 0) {
//對端正常關(guān)閉
printf("connection closed by peer\n");
clean_conn();
return ;
}
//2. 解析msg_head數(shù)據(jù)
msg_head head;
//[這里用while,可能一次性讀取多個(gè)完整包過來]
while (ibuf.length() >= MESSAGE_HEAD_LEN) {
//2.1 讀取msg_head頭部恃鞋,固定長度MESSAGE_HEAD_LEN
memcpy(&head, ibuf.data(), MESSAGE_HEAD_LEN);
if(head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0) {
fprintf(stderr, "data format error, need close, msglen = %d\n", head.msglen);
this->clean_conn();
break;
}
if (ibuf.length() < MESSAGE_HEAD_LEN + head.msglen) {
//緩存buf中剩余的數(shù)據(jù)崖媚,小于實(shí)際上應(yīng)該接受的數(shù)據(jù)
//說明是一個(gè)不完整的包亦歉,應(yīng)該拋棄
break;
}
//2.2 再根據(jù)頭長度讀取數(shù)據(jù)體,然后針對數(shù)據(jù)體處理 業(yè)務(wù)
//TODO 添加包路由模式
//頭部處理完了畅哑,往后偏移MESSAGE_HEAD_LEN長度
ibuf.pop(MESSAGE_HEAD_LEN);
//處理ibuf.data()業(yè)務(wù)數(shù)據(jù)
printf("read data: %s\n", ibuf.data());
//回顯業(yè)務(wù)
callback_busi(ibuf.data(), head.msglen, head.msgid, NULL, this);
//消息體處理完了,往后便宜msglen長度
ibuf.pop(head.msglen);
}
ibuf.adjust();
return ;
}
//處理寫業(yè)務(wù)
void tcp_conn::do_write()
{
//do_write是觸發(fā)玩event事件要處理的事情肴楷,
//應(yīng)該是直接將out_buf力度數(shù)據(jù)io寫會(huì)對方客戶端
//而不是在這里組裝一個(gè)message再發(fā)
//組裝message的過程應(yīng)該是主動(dòng)調(diào)用
//只要obuf中有數(shù)據(jù)就寫
while (obuf.length()) {
int ret = obuf.write2fd(_connfd);
if (ret == -1) {
fprintf(stderr, "write2fd error, close conn!\n");
this->clean_conn();
return ;
}
if (ret == 0) {
//不是錯(cuò)誤,僅返回0表示不可繼續(xù)寫
break;
}
}
if (obuf.length() == 0) {
//數(shù)據(jù)已經(jīng)全部寫完荠呐,將_connfd的寫事件取消掉
_loop->del_io_event(_connfd, EPOLLOUT);
}
return ;
}
//發(fā)送消息的方法
int tcp_conn::send_message(const char *data, int msglen, int msgid)
{
printf("server send_message: %s:%d, msgid = %d\n", data, msglen, msgid);
bool active_epollout = false;
if(obuf.length() == 0) {
//如果現(xiàn)在已經(jīng)數(shù)據(jù)都發(fā)送完了阶祭,那么是一定要激活寫事件的
//如果有數(shù)據(jù),說明數(shù)據(jù)還沒有完全寫完到對端直秆,那么沒必要再激活等寫完再激活
active_epollout = true;
}
//1 先封裝message消息頭
msg_head head;
head.msgid = msgid;
head.msglen = msglen;
//1.1 寫消息頭
int ret = obuf.send_data((const char *)&head, MESSAGE_HEAD_LEN);
if (ret != 0) {
fprintf(stderr, "send head error\n");
return -1;
}
//1.2 寫消息體
ret = obuf.send_data(data, msglen);
if (ret != 0) {
//如果寫消息體失敗,那就回滾將消息頭的發(fā)送也取消
obuf.pop(MESSAGE_HEAD_LEN);
return -1;
}
if (active_epollout == true) {
//2. 激活EPOLLOUT寫事件
_loop->add_io_event(_connfd, conn_wt_callback, EPOLLOUT, this);
}
return 0;
}
//銷毀tcp_conn
void tcp_conn::clean_conn()
{
//鏈接清理工作
//1 將該鏈接從tcp_server摘除掉
//TODO
//2 將該鏈接從event_loop中摘除
_loop->del_io_event(_connfd);
//3 buf清空
ibuf.clear();
obuf.clear();
//4 關(guān)閉原始套接字
int fd = _connfd;
_connfd = -1;
close(fd);
}
? 具體每個(gè)方法的實(shí)現(xiàn)鞭盟,都很清晰圾结。其中conn_rd_callback()
和conn_wt_callback()
是注冊讀寫事件的回調(diào)函數(shù),設(shè)置為static是因?yàn)楹瘮?shù)類型沒有this指針齿诉。在里面分別再調(diào)用do_read()
和do_write()
方法筝野。
5.3 修正tcp_server對accept之后的處理方法
lars_reactor/src/tcp_server.cpp
//...
//開始提供創(chuàng)建鏈接服務(wù)
void tcp_server::do_accept()
{
int connfd;
while(true) {
//accept與客戶端創(chuàng)建鏈接
printf("begin accept\n");
connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);
if (connfd == -1) {
if (errno == EINTR) {
fprintf(stderr, "accept errno=EINTR\n");
continue;
}
else if (errno == EMFILE) {
//建立鏈接過多,資源不夠
fprintf(stderr, "accept errno=EMFILE\n");
}
else if (errno == EAGAIN) {
fprintf(stderr, "accept errno=EAGAIN\n");
break;
}
else {
fprintf(stderr, "accept error");
exit(1);
}
}
else {
//accept succ!
// ============= 將之前的觸發(fā)回調(diào)的刪掉粤剧,改成如下====
tcp_conn *conn = new tcp_conn(connfd, _loop);
if (conn == NULL) {
fprintf(stderr, "new tcp_conn error\n");
exit(1);
}
// ============================================
printf("get new connection succ!\n");
break;
}
}
}
//...
? 這樣歇竟,每次accept成功之后,創(chuàng)建一個(gè)與當(dāng)前客戶端套接字綁定的tcp_conn對象抵恋。在構(gòu)造里就完成了基本的對于EPOLLIN事件的監(jiān)聽和回調(diào)動(dòng)作.
? 現(xiàn)在可以先編譯一下焕议,保證沒有語法錯(cuò)誤,但是如果想測試弧关,就不能夠使用nc
指令測試了盅安,因?yàn)楝F(xiàn)在服務(wù)端只能夠接收我們自定義的TLV格式的報(bào)文。那么我們需要自己寫一個(gè)客戶端來完成基本的測試世囊。
關(guān)于作者:
作者:Aceld(劉丹冰)
mail: danbing.at@gmail.com
github: https://github.com/aceld
原創(chuàng)書籍gitbook: http://legacy.gitbook.com/@aceld
原創(chuàng)聲明:未經(jīng)作者允許請勿轉(zhuǎn)載, 如果轉(zhuǎn)載請注明出處