【Lars教程目錄】
Lars源代碼
https://github.com/aceld/Lars
【Lars系統(tǒng)概述】
第1章-概述
第2章-項(xiàng)目目錄構(gòu)建
【Lars系統(tǒng)之Reactor模型服務(wù)器框架模塊】
第1章-項(xiàng)目結(jié)構(gòu)與V0.1雛形
第2章-內(nèi)存管理與Buffer封裝
第3章-事件觸發(fā)EventLoop
第4章-鏈接與消息封裝
第5章-Client客戶端模型
第6章-連接管理及限制
第7章-消息業(yè)務(wù)路由分發(fā)機(jī)制
第8章-鏈接創(chuàng)建/銷毀Hook機(jī)制
第9章-消息任務(wù)隊(duì)列與線程池
第10章-配置文件讀寫功能
第11章-udp服務(wù)與客戶端
第12章-數(shù)據(jù)傳輸協(xié)議protocol buffer
第13章-QPS性能測(cè)試
第14章-異步消息任務(wù)機(jī)制
第15章-鏈接屬性設(shè)置功能
【Lars系統(tǒng)之DNSService模塊】
第1章-Lars-dns簡(jiǎn)介
第2章-數(shù)據(jù)庫(kù)創(chuàng)建
第3章-項(xiàng)目目錄結(jié)構(gòu)及環(huán)境構(gòu)建
第4章-Route結(jié)構(gòu)的定義
第5章-獲取Route信息
第6章-Route訂閱模式
第7章-Backend Thread實(shí)時(shí)監(jiān)控
【Lars系統(tǒng)之Report Service模塊】
第1章-項(xiàng)目概述-數(shù)據(jù)表及proto3協(xié)議定義
第2章-獲取report上報(bào)數(shù)據(jù)
第3章-存儲(chǔ)線程池及消息隊(duì)列
【Lars系統(tǒng)之LoadBalance Agent模塊】
第1章-項(xiàng)目概述及構(gòu)建
第2章-主模塊業(yè)務(wù)結(jié)構(gòu)搭建
第3章-Report與Dns Client設(shè)計(jì)與實(shí)現(xiàn)
第4章-負(fù)載均衡模塊基礎(chǔ)設(shè)計(jì)
第5章-負(fù)載均衡獲取Host主機(jī)信息API
第6章-負(fù)載均衡上報(bào)Host主機(jī)信息API
第7章-過(guò)期窗口清理與過(guò)載超時(shí)(V0.5)
第8章-定期拉取最新路由信息(V0.6)
第9章-負(fù)載均衡獲取Route信息API(0.7)
第10章-API初始化接口(V0.8)
第11章-Lars Agent性能測(cè)試工具
第12章- Lars啟動(dòng)工具腳本
4) 事件觸發(fā)event_loop
? 接下來(lái)我們要嘗試添加多路IO的處理機(jī)制,當(dāng)然linux的平臺(tái)下隔箍, 最優(yōu)的選擇就是使用epoll來(lái)做谓娃,但是用原生的epoll實(shí)際上編程起來(lái)擴(kuò)展性不是很強(qiáng),那么我們就需要封裝一套IO事件處理機(jī)制鞍恢。
4.1 io_event基于IO事件封裝
? 我們首先定義一個(gè)IO事件類來(lái)包括一個(gè)時(shí)間需要擁有的基本成員信息.
lars_reactor/include/event_base.h
#pragma once
/*
* 定義一些IO復(fù)用機(jī)制或者其他異常觸發(fā)機(jī)制的事件封裝
*
* */
class event_loop;
//IO事件觸發(fā)的回調(diào)函數(shù)
typedef void io_callback(event_loop *loop, int fd, void *args);
/*
* 封裝一次IO觸發(fā)實(shí)現(xiàn)
* */
struct io_event
{
io_event():read_callback(NULL),write_callback(NULL),rcb_args(NULL),wcb_args(NULL) {}
int mask; //EPOLLIN EPOLLOUT
io_callback *read_callback; //EPOLLIN事件 觸發(fā)的回調(diào)
io_callback *write_callback;//EPOLLOUT事件 觸發(fā)的回調(diào)
void *rcb_args; //read_callback的回調(diào)函數(shù)參數(shù)
void *wcb_args; //write_callback的回調(diào)函數(shù)參數(shù)
};
? 一個(gè)io_event
對(duì)象應(yīng)該包含 一個(gè)epoll的事件標(biāo)識(shí)EPOLLIN/EPOLLOUT
,和對(duì)應(yīng)事件的處理函數(shù)read_callback
,write_callback
傻粘。他們都應(yīng)該是io_callback
類型。然后對(duì)應(yīng)的函數(shù)形參帮掉。
4.2 event_loop事件循環(huán)處理機(jī)制
? 接下來(lái)我們就要通過(guò)event_loop類來(lái)實(shí)現(xiàn)io_event的基本增刪操作弦悉,放在原生的epoll
堆中。
lars_reactor/include/event_loop.h
#pragma once
/*
*
* event_loop事件處理機(jī)制
*
* */
#include <sys/epoll.h>
#include <ext/hash_map>
#include <ext/hash_set>
#include "event_base.h"
#define MAXEVENTS 10
// map: fd->io_event
typedef __gnu_cxx::hash_map<int, io_event> io_event_map;
//定義指向上面map類型的迭代器
typedef __gnu_cxx::hash_map<int, io_event>::iterator io_event_map_it;
//全部正在監(jiān)聽的fd集合
typedef __gnu_cxx::hash_set<int> listen_fd_set;
class event_loop
{
public:
//構(gòu)造蟆炊,初始化epoll堆
event_loop();
//阻塞循環(huán)處理事件
void event_process();
//添加一個(gè)io事件到loop中
void add_io_event(int fd, io_callback *proc, int mask, void *args=NULL);
//刪除一個(gè)io事件從loop中
void del_io_event(int fd);
//刪除一個(gè)io事件的EPOLLIN/EPOLLOUT
void del_io_event(int fd, int mask);
private:
int _epfd; //epoll fd
//當(dāng)前event_loop 監(jiān)控的fd和對(duì)應(yīng)事件的關(guān)系
io_event_map _io_evs;
//當(dāng)前event_loop 一共哪些fd在監(jiān)聽
listen_fd_set listen_fds;
//一次性最大處理的事件
struct epoll_event _fired_evs[MAXEVENTS];
};
屬性:
_epfd
:是epoll原生堆的fd稽莉。
_io_evs
:是一個(gè)hash_map對(duì)象,主要是方便我們管理fd
<—>io_event
的對(duì)應(yīng)關(guān)系涩搓,方便我們來(lái)查找和處理污秆。
_listen_fds
:記錄目前一共有多少個(gè)fd正在本我們的event_loop
機(jī)制所監(jiān)控.
_fried_evs
:已經(jīng)通過(guò)epoll_wait返回的被激活需要上層處理的fd集合.
方法:
event_loop()
:構(gòu)造函數(shù),主要初始化epoll.
event_process()
:永久阻塞昧甘,等待觸發(fā)的事件良拼,去調(diào)用對(duì)應(yīng)的函數(shù)callback方法。
add_io_event()
:綁定一個(gè)fd和一個(gè)io_event
的關(guān)系充边,并添加對(duì)應(yīng)的事件到event_loop
中庸推。
del_io_event()
:從event_loop
刪除該事件。
? 具體實(shí)現(xiàn)方法如下:
lars_reactor/src/event_loop.cpp
#include "event_loop.h"
#include <assert.h>
//構(gòu)造浇冰,初始化epoll堆
event_loop::event_loop()
{
//flag=0 等價(jià)于epll_craete
_epfd = epoll_create1(0);
if (_epfd == -1) {
fprintf(stderr, "epoll_create error\n");
exit(1);
}
}
//阻塞循環(huán)處理事件
void event_loop::event_process()
{
while (true) {
io_event_map_it ev_it;
int nfds = epoll_wait(_epfd, _fired_evs, MAXEVENTS, 10);
for (int i = 0; i < nfds; i++) {
//通過(guò)觸發(fā)的fd找到對(duì)應(yīng)的綁定事件
ev_it = _io_evs.find(_fired_evs[i].data.fd);
assert(ev_it != _io_evs.end());
io_event *ev = &(ev_it->second);
if (_fired_evs[i].events & EPOLLIN) {
//讀事件贬媒,掉讀回調(diào)函數(shù)
void *args = ev->rcb_args;
ev->read_callback(this, _fired_evs[i].data.fd, args);
}
else if (_fired_evs[i].events & EPOLLOUT) {
//寫事件,掉寫回調(diào)函數(shù)
void *args = ev->wcb_args;
ev->write_callback(this, _fired_evs[i].data.fd, args);
}
else if (_fired_evs[i].events &(EPOLLHUP|EPOLLERR)) {
//水平觸發(fā)未處理肘习,可能會(huì)出現(xiàn)HUP事件际乘,正常處理讀寫,沒(méi)有則清空
if (ev->read_callback != NULL) {
void *args = ev->rcb_args;
ev->read_callback(this, _fired_evs[i].data.fd, args);
}
else if (ev->write_callback != NULL) {
void *args = ev->wcb_args;
ev->write_callback(this, _fired_evs[i].data.fd, args);
}
else {
//刪除
fprintf(stderr, "fd %d get error, delete it from epoll\n", _fired_evs[i].data.fd);
this->del_io_event(_fired_evs[i].data.fd);
}
}
}
}
}
/*
* 這里我們處理的事件機(jī)制是
* 如果EPOLLIN 在mask中漂佩, EPOLLOUT就不允許在mask中
* 如果EPOLLOUT 在mask中脖含, EPOLLIN就不允許在mask中
* 如果想注冊(cè)EPOLLIN|EPOLLOUT的事件罪塔, 那么就調(diào)用add_io_event() 方法兩次來(lái)注冊(cè)。
* */
//添加一個(gè)io事件到loop中
void event_loop::add_io_event(int fd, io_callback *proc, int mask, void *args)
{
int final_mask;
int op;
//1 找到當(dāng)前fd是否已經(jīng)有事件
io_event_map_it it = _io_evs.find(fd);
if (it == _io_evs.end()) {
//2 如果沒(méi)有操作動(dòng)作就是ADD
//沒(méi)有找到
final_mask = mask;
op = EPOLL_CTL_ADD;
}
else {
//3 如果有操作董酒是MOD
//添加事件標(biāo)識(shí)位
final_mask = it->second.mask | mask;
op = EPOLL_CTL_MOD;
}
//4 注冊(cè)回調(diào)函數(shù)
if (mask & EPOLLIN) {
//讀事件回調(diào)函數(shù)注冊(cè)
_io_evs[fd].read_callback = proc;
_io_evs[fd].rcb_args = args;
}
else if (mask & EPOLLOUT) {
_io_evs[fd].write_callback = proc;
_io_evs[fd].wcb_args = args;
}
//5 epoll_ctl添加到epoll堆里
_io_evs[fd].mask = final_mask;
//創(chuàng)建原生epoll事件
struct epoll_event event;
event.events = final_mask;
event.data.fd = fd;
if (epoll_ctl(_epfd, op, fd, &event) == -1) {
fprintf(stderr, "epoll ctl %d error\n", fd);
return;
}
//6 將fd添加到監(jiān)聽集合中
listen_fds.insert(fd);
}
//刪除一個(gè)io事件從loop中
void event_loop::del_io_event(int fd)
{
//將事件從_io_evs刪除
_io_evs.erase(fd);
//將fd從監(jiān)聽集合中刪除
listen_fds.erase(fd);
//將fd從epoll堆刪除
epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);
}
//刪除一個(gè)io事件的EPOLLIN/EPOLLOUT
void event_loop::del_io_event(int fd, int mask)
{
//如果沒(méi)有該事件器赞,直接返回
io_event_map_it it = _io_evs.find(fd);
if (it == _io_evs.end()) {
return ;
}
int &o_mask = it->second.mask;
//修正mask
o_mask = o_mask & (~mask);
if (o_mask == 0) {
//如果修正之后 mask為0垢袱,則刪除
this->del_io_event(fd);
}
else {
//如果修正之后,mask非0港柜,則修改
struct epoll_event event;
event.events = o_mask;
event.data.fd = fd;
epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &event);
}
}
? 這里del_io_event
提供兩個(gè)重載请契,一個(gè)是直接刪除事件,一個(gè)是修正事件夏醉。
4.3 Reactor集成event_loop機(jī)制
? 好了爽锥,那么接下來(lái),就讓讓Lars Reactor框架集成event_loop
機(jī)制畔柔。
首先簡(jiǎn)單修正一個(gè)tcp_server.cpp
文件氯夷,對(duì)之前的do_accept()
的調(diào)度時(shí)機(jī)做一下修正。
1. 在`tcp_server`成員新增`event_loop`成員靶擦。
lars_reactor/include/tcp_server.h
#pragma once
#include <netinet/in.h>
#include "event_loop.h"
class tcp_server
{
public:
//server的構(gòu)造函數(shù)
tcp_server(event_loop* loop, const char *ip, uint16_t port);
//開始提供創(chuàng)建鏈接服務(wù)
void do_accept();
//鏈接對(duì)象釋放的析構(gòu)
~tcp_server();
private:
int _sockfd; //套接字
struct sockaddr_in _connaddr; //客戶端鏈接地址
socklen_t _addrlen; //客戶端鏈接地址長(zhǎng)度
// ============= 新增 ======================
//event_loop epoll事件機(jī)制
event_loop* _loop;
// ============= 新增 ======================
};
- 構(gòu)造函數(shù)在創(chuàng)建完listen fd之后腮考,添加accept事件。
lars_reactor/src/tcp_server.cpp
//listen fd 客戶端有新鏈接請(qǐng)求過(guò)來(lái)的回調(diào)函數(shù)
void accept_callback(event_loop *loop, int fd, void *args)
{
tcp_server *server = (tcp_server*)args;
server->do_accept();
}
//server的構(gòu)造函數(shù)
tcp_server::tcp_server(event_loop *loop, const char *ip, uint16_t port)
{
bzero(&_connaddr, sizeof(_connaddr));
//忽略一些信號(hào) SIGHUP, SIGPIPE
//SIGPIPE:如果客戶端關(guān)閉玄捕,服務(wù)端再次write就會(huì)產(chǎn)生
//SIGHUP:如果terminal關(guān)閉踩蔚,會(huì)給當(dāng)前進(jìn)程發(fā)送該信號(hào)
if (signal(SIGHUP, SIG_IGN) == SIG_ERR) {
fprintf(stderr, "signal ignore SIGHUP\n");
}
if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
fprintf(stderr, "signal ignore SIGPIPE\n");
}
//1. 創(chuàng)建socket
_sockfd = socket(AF_INET, SOCK_STREAM /*| SOCK_NONBLOCK*/ | SOCK_CLOEXEC, IPPROTO_TCP);
if (_sockfd == -1) {
fprintf(stderr, "tcp_server::socket()\n");
exit(1);
}
//2 初始化地址
struct sockaddr_in server_addr;
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
inet_aton(ip, &server_addr.sin_addr);
server_addr.sin_port = htons(port);
//2-1可以多次監(jiān)聽,設(shè)置REUSE屬性
int op = 1;
if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, &op, sizeof(op)) < 0) {
fprintf(stderr, "setsocketopt SO_REUSEADDR\n");
}
//3 綁定端口
if (bind(_sockfd, (const struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
fprintf(stderr, "bind error\n");
exit(1);
}
//4 監(jiān)聽ip端口
if (listen(_sockfd, 500) == -1) {
fprintf(stderr, "listen error\n");
exit(1);
}
// ============= 新增 ======================
//5 將_sockfd添加到event_loop中
_loop = loop;
//6 注冊(cè)_socket讀事件-->accept處理
_loop->add_io_event(_sockfd, accept_callback, EPOLLIN, this);
// ============= 新增 ======================
}
- 修改do_accept()方法
lars_reactor/src/tcp_server.cpp
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>
#include <signal.h>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <arpa/inet.h>
#include <errno.h>
#include "tcp_server.h"
#include "reactor_buf.h"
//臨時(shí)的收發(fā)消息
struct message{
char data[m4K];
char len;
};
struct message msg;
void server_rd_callback(event_loop *loop, int fd, void *args);
void server_wt_callback(event_loop *loop, int fd, void *args);
//...省略其他代碼
//...省略其他代碼
//server read_callback
void server_rd_callback(event_loop *loop, int fd, void *args)
{
int ret = 0;
struct message *msg = (struct message*)args;
input_buf ibuf;
ret = ibuf.read_data(fd);
if (ret == -1) {
fprintf(stderr, "ibuf read_data error\n");
//刪除事件
loop->del_io_event(fd);
//對(duì)端關(guān)閉
close(fd);
return;
}
if (ret == 0) {
//刪除事件
loop->del_io_event(fd);
//對(duì)端關(guān)閉
close(fd);
return ;
}
printf("ibuf.length() = %d\n", ibuf.length());
//將讀到的數(shù)據(jù)放在msg中
msg->len = ibuf.length();
bzero(msg->data, msg->len);
memcpy(msg->data, ibuf.data(), msg->len);
ibuf.pop(msg->len);
ibuf.adjust();
printf("recv data = %s\n", msg->data);
//刪除讀事件枚粘,添加寫事件
loop->del_io_event(fd, EPOLLIN);
loop->add_io_event(fd, server_wt_callback, EPOLLOUT, msg);
}
//server write_callback
void server_wt_callback(event_loop *loop, int fd, void *args)
{
struct message *msg = (struct message*)args;
output_buf obuf;
//回顯數(shù)據(jù)
obuf.send_data(msg->data, msg->len);
while(obuf.length()) {
int write_ret = obuf.write2fd(fd);
if (write_ret == -1) {
fprintf(stderr, "write connfd error\n");
return;
}
else if(write_ret == 0) {
//不是錯(cuò)誤馅闽,表示此時(shí)不可寫
break;
}
}
//刪除寫事件,添加讀事件
loop->del_io_event(fd, EPOLLOUT);
loop->add_io_event(fd, server_rd_callback, EPOLLIN, msg);
}
//...省略其他代碼
//...省略其他代碼
//開始提供創(chuàng)建鏈接服務(wù)
void tcp_server::do_accept()
{
int connfd;
while(true) {
//accept與客戶端創(chuàng)建鏈接
printf("begin accept\n");
connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);
if (connfd == -1) {
if (errno == EINTR) {
fprintf(stderr, "accept errno=EINTR\n");
continue;
}
else if (errno == EMFILE) {
//建立鏈接過(guò)多馍迄,資源不夠
fprintf(stderr, "accept errno=EMFILE\n");
}
else if (errno == EAGAIN) {
fprintf(stderr, "accept errno=EAGAIN\n");
break;
}
else {
fprintf(stderr, "accept error");
exit(1);
}
}
else {
//accept succ!
// ============= 新增 ======================
this->_loop->add_io_event(connfd, server_rd_callback, EPOLLIN, &msg);
break;
// ============= 新增 ======================
}
}
}
//...省略其他代碼
//...省略其他代碼
4.4 完成Lars Reactor V0.3開發(fā)
? 我們將lars_reactor/example/lars_reactor_0.2的代碼復(fù)制一份到 lars_reactor/example/lars_reactor_0.3中福也。
lars_reactor/example/lars_reactor_0.3/lars_reactor.cpp
#include "tcp_server.h"
int main()
{
event_loop loop;
tcp_server server(&loop, "127.0.0.1", 7777);
loop.event_process();
return 0;
}
編譯。
啟動(dòng)服務(wù)器
$ ./lars_reactor
分別啟動(dòng)2個(gè)客戶端
client1
$ nc 127.0.0.1 7777
hello Iam client1
hello Iam client1 回顯
client2
$ nc 127.0.0.1 7777
hello Iam client2
hello Iam client2 回顯
服務(wù)端打印
$ ./lars_reactor
begin accept
ibuf.length() = 18
recv data = hello Iam client1
begin accept
ibuf.length() = 18
recv data = hello Iam client2
目前我們已經(jīng)成功將event_loop
機(jī)制加入到reactor中了攀圈,接下來(lái)繼續(xù)添加功能暴凑。
關(guān)于作者:
作者:Aceld(劉丹冰)
mail: danbing.at@gmail.com
github: https://github.com/aceld
原創(chuàng)書籍gitbook: http://legacy.gitbook.com/@aceld
原創(chuàng)聲明:未經(jīng)作者允許請(qǐng)勿轉(zhuǎn)載, 如果轉(zhuǎn)載請(qǐng)注明出處