【Lars教程目錄】
Lars源代碼
https://github.com/aceld/Lars
【Lars系統(tǒng)概述】
第1章-概述
第2章-項(xiàng)目目錄構(gòu)建
【Lars系統(tǒng)之Reactor模型服務(wù)器框架模塊】
第1章-項(xiàng)目結(jié)構(gòu)與V0.1雛形
第2章-內(nèi)存管理與Buffer封裝
第3章-事件觸發(fā)EventLoop
第4章-鏈接與消息封裝
第5章-Client客戶端模型
第6章-連接管理及限制
第7章-消息業(yè)務(wù)路由分發(fā)機(jī)制
第8章-鏈接創(chuàng)建/銷毀Hook機(jī)制
第9章-消息任務(wù)隊(duì)列與線程池
第10章-配置文件讀寫功能
第11章-udp服務(wù)與客戶端
第12章-數(shù)據(jù)傳輸協(xié)議protocol buffer
第13章-QPS性能測(cè)試
第14章-異步消息任務(wù)機(jī)制
第15章-鏈接屬性設(shè)置功能
【Lars系統(tǒng)之DNSService模塊】
第1章-Lars-dns簡(jiǎn)介
第2章-數(shù)據(jù)庫(kù)創(chuàng)建
第3章-項(xiàng)目目錄結(jié)構(gòu)及環(huán)境構(gòu)建
第4章-Route結(jié)構(gòu)的定義
第5章-獲取Route信息
第6章-Route訂閱模式
第7章-Backend Thread實(shí)時(shí)監(jiān)控
【Lars系統(tǒng)之Report Service模塊】
第1章-項(xiàng)目概述-數(shù)據(jù)表及proto3協(xié)議定義
第2章-獲取report上報(bào)數(shù)據(jù)
第3章-存儲(chǔ)線程池及消息隊(duì)列
【Lars系統(tǒng)之LoadBalance Agent模塊】
第1章-項(xiàng)目概述及構(gòu)建
第2章-主模塊業(yè)務(wù)結(jié)構(gòu)搭建
第3章-Report與Dns Client設(shè)計(jì)與實(shí)現(xiàn)
第4章-負(fù)載均衡模塊基礎(chǔ)設(shè)計(jì)
第5章-負(fù)載均衡獲取Host主機(jī)信息API
第6章-負(fù)載均衡上報(bào)Host主機(jī)信息API
第7章-過期窗口清理與過載超時(shí)(V0.5)
第8章-定期拉取最新路由信息(V0.6)
第9章-負(fù)載均衡獲取Route信息API(0.7)
第10章-API初始化接口(V0.8)
第11章-Lars Agent性能測(cè)試工具
第12章- Lars啟動(dòng)工具腳本
6) tcp客戶端觸發(fā)模型
? 我們可以給客戶端添加觸發(fā)模型缝裁。同時(shí)也提供一系列的接口供開發(fā)者寫客戶端應(yīng)用程序來使用。
6.1 tcp_client類設(shè)計(jì)
lars_reactor/include/tcp_client.h
#pragma once
#include "io_buf.h"
#include "event_loop.h"
#include "message.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
class tcp_client
{
public:
//初始化客戶端套接字
tcp_client(event_loop *loop, const char *ip, unsigned short port, const char *name);
//發(fā)送message方法
int send_message(const char *data, int msglen, int msgid);
//創(chuàng)建鏈接
void do_connect();
//處理讀業(yè)務(wù)
int do_read();
//處理寫業(yè)務(wù)
int do_write();
//釋放鏈接資源
void clean_conn();
~tcp_client();
//設(shè)置業(yè)務(wù)處理回調(diào)函數(shù)
void set_msg_callback(msg_callback *msg_cb)
{
this->_msg_callback = msg_cb;
}
bool connected; //鏈接是否創(chuàng)建成功
//server端地址
struct sockaddr_in _server_addr;
io_buf _obuf;
io_buf _ibuf;
private:
int _sockfd;
socklen_t _addrlen;
//客戶端的事件處理機(jī)制
event_loop* _loop;
//當(dāng)前客戶端的名稱 用戶記錄日志
const char *_name;
msg_callback *_msg_callback;
};
? 這里注意的是结序,tcp_client并不是tcp_server的一部分膏燃,而是單純?yōu)閷懣蛻舳颂峁┑慕涌谝瘛K赃@里也需要實(shí)現(xiàn)一套對(duì)讀寫事件處理的業(yè)務(wù)。 這里使用的讀寫緩沖是原始的io_buf
荔睹,并不是服務(wù)器封裝好的reactor_buf
原因是后者是轉(zhuǎn)為server做了一層封裝楷力,io_buf的基本方法比較全。
關(guān)鍵成員:
_sockfd
:當(dāng)前客戶端套接字锌奴。
_server_addr
: 鏈接的服務(wù)端的IP地址兽狭。
_loop
: 客戶端異步觸發(fā)事件機(jī)制event_loop句柄。
_msg_callback
: 當(dāng)前客戶端處理服務(wù)端的回調(diào)業(yè)務(wù)鹿蜀。
connected
:是否已經(jīng)成功connect服務(wù)端的標(biāo)致箕慧。
方法:
tcp_client()
:構(gòu)造函數(shù),主要是在里面完成基本的套接字初始化及connect操作.
do_connect()
:創(chuàng)建鏈接
do_read()
:處理鏈接的讀業(yè)務(wù)茴恰。
do_write()
:處理鏈接的寫業(yè)務(wù)颠焦。
clean_conn()
:清空鏈接資源。
6.2 創(chuàng)建鏈接
lars_reactor/src/tcp_client.cpp
tcp_client::tcp_client(event_loop *loop, const char *ip, unsigned short port, const char *name):
_ibuf(4194304),
_obuf(4194304)
{
_sockfd = -1;
_msg_callback = NULL;
_name = name;
_loop = loop;
bzero(&_server_addr, sizeof(_server_addr));
_server_addr.sin_family = AF_INET;
inet_aton(ip, &_server_addr.sin_addr);
_server_addr.sin_port = htons(port);
_addrlen = sizeof(_server_addr);
this->do_connect();
}
? 這里初始化tcp_client鏈接信息往枣,然后調(diào)用do_connect()
創(chuàng)建鏈接.
lars_reactor/src/tcp_client.cpp
//創(chuàng)建鏈接
void tcp_client::do_connect()
{
if (_sockfd != -1) {
close(_sockfd);
}
//創(chuàng)建套接字
_sockfd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, IPPROTO_TCP);
if (_sockfd == -1) {
fprintf(stderr, "create tcp client socket error\n");
exit(1);
}
int ret = connect(_sockfd, (const struct sockaddr*)&_server_addr, _addrlen);
if (ret == 0) {
//鏈接創(chuàng)建成功
connected = true;
//注冊(cè)讀回調(diào)
_loop->add_io_event(_sockfd, read_callback, EPOLLIN, this);
//如果寫緩沖去有數(shù)據(jù)伐庭,那么也需要觸發(fā)寫回調(diào)
if (this->_obuf.length != 0) {
_loop->add_io_event(_sockfd, write_callback, EPOLLOUT, this);
}
printf("connect %s:%d succ!\n", inet_ntoa(_server_addr.sin_addr), ntohs(_server_addr.sin_port));
}
else {
if(errno == EINPROGRESS) {
//fd是非阻塞的,可能會(huì)出現(xiàn)這個(gè)錯(cuò)誤,但是并不表示鏈接創(chuàng)建失敗
//如果fd是可寫狀態(tài)分冈,則為鏈接是創(chuàng)建成功的.
fprintf(stderr, "do_connect EINPROGRESS\n");
//讓event_loop去觸發(fā)一個(gè)創(chuàng)建判斷鏈接業(yè)務(wù) 用EPOLLOUT事件立刻觸發(fā)
_loop->add_io_event(_sockfd, connection_delay, EPOLLOUT, this);
}
else {
fprintf(stderr, "connection error\n");
exit(1);
}
}
}
6.3 有關(guān)非阻塞客戶端socket創(chuàng)建鏈接問題
? 這里轉(zhuǎn)載一篇文章圾另,是有關(guān)非阻塞套接字,connect返回-1雕沉,并且errno是EINPROGRESS
的情況集乔。因?yàn)槲覀兊腸lient是采用event_loop形式,socket需要被設(shè)置為非阻塞坡椒。所以需要針對(duì)這個(gè)情況做處理扰路。下面是說明。
? 客戶端測(cè)試程序時(shí)倔叼,由于出現(xiàn)很多客戶端汗唱,經(jīng)過connect成功后,代碼卡在recv系統(tǒng)調(diào)用中缀雳,后來發(fā)現(xiàn)可能是由于socket默認(rèn)是阻塞模式渡嚣,所以會(huì)令很多客戶端鏈接處于鏈接卻不能傳輸數(shù)據(jù)狀態(tài)梢睛。
? 后來修改socket為非阻塞模式肥印,但在connect的時(shí)候,發(fā)現(xiàn)返回值為-1绝葡,剛開始以為是connect出現(xiàn)錯(cuò)誤深碱,但在服務(wù)器上看到了鏈接是ESTABLISED狀態(tài)。證明鏈接是成功的
? 但為什么會(huì)出現(xiàn)返回值是-1呢藏畅? 經(jīng)過查詢資料敷硅,以及看stevens的APUE功咒,也發(fā)現(xiàn)有這么一說。
? 當(dāng)connect在非阻塞模式下绞蹦,會(huì)出現(xiàn)返回-1
值力奋,錯(cuò)誤碼是EINPROGRESS
,但如何判斷connect是聯(lián)通的呢幽七?stevens書中說明要在connect后景殷,繼續(xù)判斷該socket是否可寫?
? 若可寫澡屡,則證明鏈接成功猿挚。
? 如何判斷可寫,有2種方案驶鹉,一種是select判斷是否可寫绩蜻,二用poll模型。
select:
int CheckConnect(int iSocket)
{
fd_set rset;
FD_ZERO(&rset);
FD_SET(iSocket, &rset);
timeval tm;
tm. tv_sec = 0;
tm.tv_usec = 0;
if ( select(iSocket + 1, NULL, &rset, NULL, &tval) <= 0)
{
close(iSocket);
return -1;
}
if (FD_ISSET(iSocket, &rset))
{
int err = -1;
socklen_t len = sizeof(int);
if ( getsockopt(iSocket, SOL_SOCKET, SO_ERROR ,&err, &len) < 0 )
{
close(iSocket);
printf("errno:%d %s\n", errno, strerror(errno));
return -2;
}
if (err)
{
errno = err;
close(iSocket);
return -3;
}
}
return 0;
}
poll:
int CheckConnect(int iSocket) {
struct pollfd fd;
int ret = 0;
socklen_t len = 0;
fd.fd = iSocket;
fd.events = POLLOUT;
while ( poll (&fd, 1, -1) == -1 ) {
if( errno != EINTR ){
perror("poll");
return -1;
}
}
len = sizeof(ret);
if ( getsockopt (iSocket, SOL_SOCKET, SO_ERROR, &ret, &len) == -1 ) {
perror("getsockopt");
return -1;
}
if(ret != 0) {
fprintf (stderr, "socket %d connect failed: %s\n",
iSocket, strerror (ret));
return -1;
}
return 0;
}
6.3 針對(duì)EINPROGRESS的連接創(chuàng)建處理
? 看上面do_connect()
的代碼其中一部分:
if(errno == EINPROGRESS) {
//fd是非阻塞的室埋,可能會(huì)出現(xiàn)這個(gè)錯(cuò)誤,但是并不表示鏈接創(chuàng)建失敗
//如果fd是可寫狀態(tài)办绝,則為鏈接是創(chuàng)建成功的.
fprintf(stderr, "do_connect EINPROGRESS\n");
//讓event_loop去觸發(fā)一個(gè)創(chuàng)建判斷鏈接業(yè)務(wù) 用EPOLLOUT事件立刻觸發(fā)
_loop->add_io_event(_sockfd, connection_delay, EPOLLOUT, this);
}
這里是又觸發(fā)一個(gè)寫事件,直接讓程序流程跳轉(zhuǎn)到connection_delay()
方法.那么我們需要在里面判斷鏈接是否已經(jīng)判斷成功词顾,并且做出一定的創(chuàng)建成功之后的業(yè)務(wù)動(dòng)作八秃。
lars_reactor/src/tcp_client.cpp
//判斷鏈接是否是創(chuàng)建鏈接,主要是針對(duì)非阻塞socket 返回EINPROGRESS錯(cuò)誤
static void connection_delay(event_loop *loop, int fd, void *args)
{
tcp_client *cli = (tcp_client*)args;
loop->del_io_event(fd);
int result = 0;
socklen_t result_len = sizeof(result);
getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &result_len);
if (result == 0) {
//鏈接是建立成功的
cli->connected = true;
printf("connect %s:%d succ!\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));
//建立連接成功之后肉盹,主動(dòng)發(fā)送send_message
const char *msg = "hello lars!";
int msgid = 1;
cli->send_message(msg, strlen(msg), msgid);
loop->add_io_event(fd, read_callback, EPOLLIN, cli);
if (cli->_obuf.length != 0) {
//輸出緩沖有數(shù)據(jù)可寫
loop->add_io_event(fd, write_callback, EPOLLOUT, cli);
}
}
else {
//鏈接創(chuàng)建失敗
fprintf(stderr, "connection %s:%d error\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));
}
}
? 這是一個(gè)事件回調(diào)昔驱,所以用的是static方法而不是成員方法。首先是利用getsockopt
判斷鏈接是否創(chuàng)建成功上忍,如果成功翼闹,那么 我們當(dāng)前這個(gè)版本的客戶端是直接寫死主動(dòng)調(diào)用send_message()
方法發(fā)送給服務(wù)端一個(gè)hello lars!
字符串。然后直接交給我們的read_callback()
方法處理既绕,當(dāng)然如果寫緩沖有數(shù)據(jù)务热,我們也會(huì)觸發(fā)寫的write_callback()
方法。
? 接下來吓笙,看看這兩個(gè)callback以及send_message是怎么實(shí)現(xiàn)的淑玫。
callback
lars_reactor/src/tcp_client.cpp
static void write_callback(event_loop *loop, int fd, void *args)
{
tcp_client *cli = (tcp_client *)args;
cli->do_write();
}
static void read_callback(event_loop *loop, int fd, void *args)
{
tcp_client *cli = (tcp_client *)args;
cli->do_read();
}
//處理讀業(yè)務(wù)
int tcp_client::do_read()
{
//確定已經(jīng)成功建立連接
assert(connected == true);
// 1. 一次性全部讀取出來
//得到緩沖區(qū)里有多少字節(jié)要被讀取,然后將字節(jié)數(shù)放入b里面面睛。
int need_read = 0;
if (ioctl(_sockfd, FIONREAD, &need_read) == -1) {
fprintf(stderr, "ioctl FIONREAD error");
return -1;
}
//確保_buf可以容納可讀數(shù)據(jù)
assert(need_read <= _ibuf.capacity - _ibuf.length);
int ret;
do {
ret = read(_sockfd, _ibuf.data + _ibuf.length, need_read);
} while(ret == -1 && errno == EINTR);
if (ret == 0) {
//對(duì)端關(guān)閉
if (_name != NULL) {
printf("%s client: connection close by peer!\n", _name);
}
else {
printf("client: connection close by peer!\n");
}
clean_conn();
return -1;
}
else if (ret == -1) {
fprintf(stderr, "client: do_read() , error\n");
clean_conn();
return -1;
}
assert(ret == need_read);
_ibuf.length += ret;
//2. 解包
msg_head head;
int msgid, length;
while (_ibuf.length >= MESSAGE_HEAD_LEN) {
memcpy(&head, _ibuf.data + _ibuf.head, MESSAGE_HEAD_LEN);
msgid = head.msgid;
length = head.msglen;
/*
if (length + MESSAGE_HEAD_LEN < _ibuf.length) {
break;
}
*/
//頭部讀取完畢
_ibuf.pop(MESSAGE_HEAD_LEN);
//3. 交給業(yè)務(wù)函數(shù)處理
if (_msg_callback != NULL) {
this->_msg_callback(_ibuf.data + _ibuf.head, length, msgid, this, NULL);
}
//數(shù)據(jù)區(qū)域處理完畢
_ibuf.pop(length);
}
//重置head指針
_ibuf.adjust();
return 0;
}
//處理寫業(yè)務(wù)
int tcp_client::do_write()
{
//數(shù)據(jù)有長(zhǎng)度絮蒿,切頭部索引是起始位置
assert(_obuf.head == 0 && _obuf.length);
int ret;
while (_obuf.length) {
//寫數(shù)據(jù)
do {
ret = write(_sockfd, _obuf.data, _obuf.length);
} while(ret == -1 && errno == EINTR);//非阻塞異常繼續(xù)重寫
if (ret > 0) {
_obuf.pop(ret);
_obuf.adjust();
}
else if (ret == -1 && errno != EAGAIN) {
fprintf(stderr, "tcp client write \n");
this->clean_conn();
}
else {
//出錯(cuò),不能再繼續(xù)寫
break;
}
}
if (_obuf.length == 0) {
//已經(jīng)寫完,刪除寫事件
printf("do write over, del EPOLLOUT\n");
this->_loop->del_io_event(_sockfd, EPOLLOUT);
}
return 0;
}
//釋放鏈接資源,重置連接
void tcp_client::clean_conn()
{
if (_sockfd != -1) {
printf("clean conn, del socket!\n");
_loop->del_io_event(_sockfd);
close(_sockfd);
}
connected = false;
//重新連接
this->do_connect();
}
tcp_client::~tcp_client()
{
close(_sockfd);
}
? 這里是基本的讀數(shù)據(jù)和寫數(shù)據(jù)的處理業(yè)務(wù)實(shí)現(xiàn)叁鉴。我們重點(diǎn)看do_read()
方法土涝,里面有段代碼:
//3. 交給業(yè)務(wù)函數(shù)處理
if (_msg_callback != NULL) {
this->_msg_callback(_ibuf.data + _ibuf.head, length, msgid, this, NULL);
}
? 是將我們從服務(wù)端讀取到的代碼,交給了_msg_callback()
方法來處理的幌墓,這個(gè)實(shí)際上是用戶開發(fā)者自己在業(yè)務(wù)上注冊(cè)的回調(diào)業(yè)務(wù)函數(shù)但壮。在tcp_client.h中我們已經(jīng)提供了set_msg_callback
暴露給開發(fā)者注冊(cè)使用冀泻。
send_message
lars_reactor/src/tcp_client.cpp
//主動(dòng)發(fā)送message方法
int tcp_client::send_message(const char *data, int msglen, int msgid)
{
if (connected == false) {
fprintf(stderr, "no connected , send message stop!\n");
return -1;
}
//是否需要添加寫事件觸發(fā)
//如果obuf中有數(shù)據(jù),沒必要添加蜡饵,如果沒有數(shù)據(jù)弹渔,添加完數(shù)據(jù)需要觸發(fā)
bool need_add_event = (_obuf.length == 0) ? true:false;
if (msglen + MESSAGE_HEAD_LEN > this->_obuf.capacity - _obuf.length) {
fprintf(stderr, "No more space to Write socket!\n");
return -1;
}
//封裝消息頭
msg_head head;
head.msgid = msgid;
head.msglen = msglen;
memcpy(_obuf.data + _obuf.length, &head, MESSAGE_HEAD_LEN);
_obuf.length += MESSAGE_HEAD_LEN;
memcpy(_obuf.data + _obuf.length, data, msglen);
_obuf.length += msglen;
if (need_add_event) {
_loop->add_io_event(_sockfd, write_callback, EPOLLOUT, this);
}
return 0;
}
? 將發(fā)送的數(shù)據(jù)寫給obuf,然后出發(fā)write_callback將obuf的數(shù)據(jù)傳遞給對(duì)方服務(wù)端。
6.4 完成Lars Reactor V0.4開發(fā)
? 好了溯祸,現(xiàn)在我們框架部分已經(jīng)完成捞附,接下來我們就要實(shí)現(xiàn)一個(gè)serverapp 和 一個(gè)clientapp來進(jìn)行測(cè)試.
我們創(chuàng)建example/lars_reactor_0.4
文件夾。
Makefile
CXX=g++
CFLAGS=-g -O2 -Wall -fPIC -Wno-deprecated
INC=-I../../include
LIB=-L../../lib -llreactor -lpthread
OBJS = $(addsuffix .o, $(basename $(wildcard *.cc)))
all:
$(CXX) -o server $(CFLAGS) server.cpp $(INC) $(LIB)
$(CXX) -o client $(CFLAGS) client.cpp $(INC) $(LIB)
clean:
-rm -f *.o server client
服務(wù)端代碼:
server.cpp
#include "tcp_server.h"
int main()
{
event_loop loop;
tcp_server server(&loop, "127.0.0.1", 7777);
loop.event_process();
return 0;
}
客戶端代碼:
client.cpp
#include "tcp_client.h"
#include <stdio.h>
#include <string.h>
//客戶端業(yè)務(wù)
void busi(const char *data, uint32_t len, int msgid, tcp_client *conn, void *user_data)
{
//得到服務(wù)端回執(zhí)的數(shù)據(jù)
printf("recv server: [%s]\n", data);
printf("msgid: [%d]\n", msgid);
printf("len: [%d]\n", len);
}
int main()
{
event_loop loop;
//創(chuàng)建tcp客戶端
tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.4");
//注冊(cè)回調(diào)業(yè)務(wù)
client.set_msg_callback(busi);
//開啟事件監(jiān)聽
loop.event_process();
return 0;
}
編譯并分別啟動(dòng)server 和client
服務(wù)端輸出:
$ ./server
begin accept
get new connection succ!
read data: hello lars!
server send_message: hello lars!:11, msgid = 1
客戶端輸出:
$ ./client
do_connect EINPROGRESS
connect 127.0.0.1:7777 succ!
do write over, del EPOLLOUT
recv server: [hello lars!]
msgid: [1]
len: [11]
? 現(xiàn)在客戶端已經(jīng)成功的發(fā)送數(shù)據(jù)給服務(wù)端您没,并且回顯的數(shù)據(jù)也直接被客戶端解析鸟召,我們的框架到現(xiàn)在就可以做一個(gè)基本的客戶端和服務(wù)端的完成了,但是還差很多氨鹏,接下來我們繼續(xù)優(yōu)化欧募。
關(guān)于作者:
作者:Aceld(劉丹冰)
mail: danbing.at@gmail.com
github: https://github.com/aceld
原創(chuàng)書籍gitbook: http://legacy.gitbook.com/@aceld
原創(chuàng)聲明:未經(jīng)作者允許請(qǐng)勿轉(zhuǎn)載, 如果轉(zhuǎn)載請(qǐng)注明出處