【Lars教程目錄】
Lars源代碼
https://github.com/aceld/Lars
【Lars系統(tǒng)概述】
第1章-概述
第2章-項(xiàng)目目錄構(gòu)建
【Lars系統(tǒng)之Reactor模型服務(wù)器框架模塊】
第1章-項(xiàng)目結(jié)構(gòu)與V0.1雛形
第2章-內(nèi)存管理與Buffer封裝
第3章-事件觸發(fā)EventLoop
第4章-鏈接與消息封裝
第5章-Client客戶端模型
第6章-連接管理及限制
第7章-消息業(yè)務(wù)路由分發(fā)機(jī)制
第8章-鏈接創(chuàng)建/銷毀Hook機(jī)制
第9章-消息任務(wù)隊(duì)列與線程池
第10章-配置文件讀寫功能
第11章-udp服務(wù)與客戶端
第12章-數(shù)據(jù)傳輸協(xié)議protocol buffer
第13章-QPS性能測試
第14章-異步消息任務(wù)機(jī)制
第15章-鏈接屬性設(shè)置功能
【Lars系統(tǒng)之DNSService模塊】
第1章-Lars-dns簡介
第2章-數(shù)據(jù)庫創(chuàng)建
第3章-項(xiàng)目目錄結(jié)構(gòu)及環(huán)境構(gòu)建
第4章-Route結(jié)構(gòu)的定義
第5章-獲取Route信息
第6章-Route訂閱模式
第7章-Backend Thread實(shí)時(shí)監(jiān)控
【Lars系統(tǒng)之Report Service模塊】
第1章-項(xiàng)目概述-數(shù)據(jù)表及proto3協(xié)議定義
第2章-獲取report上報(bào)數(shù)據(jù)
第3章-存儲線程池及消息隊(duì)列
【Lars系統(tǒng)之LoadBalance Agent模塊】
第1章-項(xiàng)目概述及構(gòu)建
第2章-主模塊業(yè)務(wù)結(jié)構(gòu)搭建
第3章-Report與Dns Client設(shè)計(jì)與實(shí)現(xiàn)
第4章-負(fù)載均衡模塊基礎(chǔ)設(shè)計(jì)
第5章-負(fù)載均衡獲取Host主機(jī)信息API
第6章-負(fù)載均衡上報(bào)Host主機(jī)信息API
第7章-過期窗口清理與過載超時(shí)(V0.5)
第8章-定期拉取最新路由信息(V0.6)
第9章-負(fù)載均衡獲取Route信息API(0.7)
第10章-API初始化接口(V0.8)
第11章-Lars Agent性能測試工具
第12章- Lars啟動工具腳本
9) 鏈接創(chuàng)建/銷毀Hook機(jī)制
? 下面我們來給鏈接注冊兩個(gè)hook節(jié)點(diǎn)的函數(shù)馋嗜,即服務(wù)端有新的客戶端鏈接創(chuàng)建之后用戶可以注冊一個(gè)回調(diào),有客戶端斷開鏈接的回調(diào)卿叽。還有客戶端在成功與服務(wù)端創(chuàng)建鏈接之后創(chuàng)建的回調(diào)赋铝,和客戶端與服務(wù)端斷開鏈接之前的回調(diào)铲咨。
9.1 tcp_server服務(wù)端添加鏈接Hook函數(shù)
A. 定義Hook函數(shù)原型
lars_reactor/include/net_connection.h
#pragma once
/*
*
* 網(wǎng)絡(luò)通信的抽象類,任何需要進(jìn)行收發(fā)消息的模塊鸟妙,都可以實(shí)現(xiàn)該類
*
* */
class net_connection
{
public:
net_connection() {}
//發(fā)送消息的接口
virtual int send_message(const char *data, int datalen, int msgid) = 0;
};
//創(chuàng)建鏈接/銷毀鏈接 要觸發(fā)的 回調(diào)函數(shù)類型
typedef void (*conn_callback)(net_connection *conn, void *args);
B. tcp_server定義相關(guān)hook函數(shù)的屬性
lars_reactor/include/tcp_server.h
#pragma once
#include <netinet/in.h>
#include "event_loop.h"
#include "tcp_conn.h"
#include "message.h"
class tcp_server
{
public:
//server的構(gòu)造函數(shù)
tcp_server(event_loop* loop, const char *ip, uint16_t port);
//開始提供創(chuàng)建鏈接服務(wù)
void do_accept();
//鏈接對象釋放的析構(gòu)
~tcp_server();
//注冊消息路由回調(diào)函數(shù)
void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL) {
router.register_msg_router(msgid, cb, user_data);
}
private:
//基礎(chǔ)信息
int _sockfd; //套接字
struct sockaddr_in _connaddr; //客戶端鏈接地址
socklen_t _addrlen; //客戶端鏈接地址長度
//event_loop epoll事件機(jī)制
event_loop* _loop;
public:
//---- 消息分發(fā)路由 ----
static msg_router router;
//---- 客戶端鏈接管理部分-----
public:
static void increase_conn(int connfd, tcp_conn *conn); //新增一個(gè)新建的連接
static void decrease_conn(int connfd); //減少一個(gè)斷開的連接
static void get_conn_num(int *curr_conn); //得到當(dāng)前鏈接的刻度
static tcp_conn **conns; //全部已經(jīng)在線的連接信息
// ------- 創(chuàng)建鏈接/銷毀鏈接 Hook 部分 -----
//設(shè)置鏈接的創(chuàng)建hook函數(shù)
static void set_conn_start(conn_callback cb, void *args = NULL) {
conn_start_cb = cb;
conn_start_cb_args = args;
}
//設(shè)置鏈接的銷毀hook函數(shù)
static void set_conn_close(conn_callback cb, void *args = NULL) {
conn_close_cb = cb;
conn_close_cb_args = args;
}
//創(chuàng)建鏈接之后要觸發(fā)的 回調(diào)函數(shù)
static conn_callback conn_start_cb;
static void *conn_start_cb_args;
//銷毀鏈接之前要觸發(fā)的 回調(diào)函數(shù)
static conn_callback conn_close_cb;
static void *conn_close_cb_args;
private:
//TODO
//從配置文件中讀取
#define MAX_CONNS 10000
static int _max_conns; //最大client鏈接個(gè)數(shù)
static int _curr_conns; //當(dāng)前鏈接刻度
static pthread_mutex_t _conns_mutex; //保護(hù)_curr_conns刻度修改的鎖
};
C. tcp_conn在連接創(chuàng)建/銷毀調(diào)用Hook函數(shù)
//初始化tcp_conn
tcp_conn::tcp_conn(int connfd, event_loop *loop)
{
_connfd = connfd;
_loop = loop;
//1. 將connfd設(shè)置成非阻塞狀態(tài)
int flag = fcntl(_connfd, F_GETFL, 0);
fcntl(_connfd, F_SETFL, O_NONBLOCK|flag);
//2. 設(shè)置TCP_NODELAY禁止做讀寫緩存扬绪,降低小包延遲
int op = 1;
setsockopt(_connfd, IPPROTO_TCP, TCP_NODELAY, &op, sizeof(op));//need netinet/in.h netinet/tcp.h
//2.5 如果用戶注冊了鏈接建立Hook 則調(diào)用
if (tcp_server::conn_start_cb) {
tcp_server::conn_start_cb(this, tcp_server::conn_start_cb_args);
}
//3. 將該鏈接的讀事件讓event_loop監(jiān)控
_loop->add_io_event(_connfd, conn_rd_callback, EPOLLIN, this);
//4 將該鏈接集成到對應(yīng)的tcp_server中
tcp_server::increase_conn(_connfd, this);
}
//...
//...
//銷毀tcp_conn
void tcp_conn::clean_conn()
{
// 如果注冊了鏈接銷毀Hook函數(shù)荠呐,則調(diào)用
if (tcp_server::conn_close_cb) {
tcp_server::conn_close_cb(this, tcp_server::conn_close_cb_args);
}
//鏈接清理工作
//1 將該鏈接從tcp_server摘除掉
tcp_server::decrease_conn(_connfd);
//2 將該鏈接從event_loop中摘除
_loop->del_io_event(_connfd);
//3 buf清空
ibuf.clear();
obuf.clear();
//4 關(guān)閉原始套接字
int fd = _connfd;
_connfd = -1;
close(fd);
}
9.2 tcp_client客戶端添加鏈接Hook函數(shù)
A. tcp_client添加Hook屬性
lars_reactor/include/tcp_client.h
#pragma once
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "io_buf.h"
#include "event_loop.h"
#include "message.h"
#include "net_connection.h"
class tcp_client : public net_connection
{
public:
//初始化客戶端套接字
tcp_client(event_loop *loop, const char *ip, unsigned short port, const char *name);
//發(fā)送message方法
int send_message(const char *data, int msglen, int msgid);
//創(chuàng)建鏈接
void do_connect();
//處理讀業(yè)務(wù)
int do_read();
//處理寫業(yè)務(wù)
int do_write();
//釋放鏈接資源
void clean_conn();
~tcp_client();
//設(shè)置業(yè)務(wù)處理回調(diào)函數(shù)
//void set_msg_callback(msg_callback *msg_cb)
//{
//this->_msg_callback = msg_cb;
//}
//注冊消息路由回調(diào)函數(shù)
void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL) {
_router.register_msg_router(msgid, cb, user_data);
}
//----- 鏈接創(chuàng)建/銷毀回調(diào)Hook ----
//設(shè)置鏈接的創(chuàng)建hook函數(shù)
void set_conn_start(conn_callback cb, void *args = NULL)
{
_conn_start_cb = cb;
_conn_start_cb_args = args;
}
//設(shè)置鏈接的銷毀hook函數(shù)
void set_conn_close(conn_callback cb, void *args = NULL) {
_conn_close_cb = cb;
_conn_close_cb_args = args;
}
//創(chuàng)建鏈接之后要觸發(fā)的 回調(diào)函數(shù)
conn_callback _conn_start_cb;
void * _conn_start_cb_args;
//銷毀鏈接之前要觸發(fā)的 回調(diào)函數(shù)
conn_callback _conn_close_cb;
void * _conn_close_cb_args;
// ---------------------------------
bool connected; //鏈接是否創(chuàng)建成功
//server端地址
struct sockaddr_in _server_addr;
io_buf _obuf;
io_buf _ibuf;
private:
int _sockfd;
socklen_t _addrlen;
//處理消息的分發(fā)路由
msg_router _router;
//msg_callback *_msg_callback; //單路由模式去掉
//客戶端的事件處理機(jī)制
event_loop* _loop;
//當(dāng)前客戶端的名稱 用戶記錄日志
const char *_name;
};
B. tcp_client在創(chuàng)建/銷毀調(diào)用Hook
lars_reactor/src/tcp_client.c
//創(chuàng)建鏈接
void tcp_client::do_connect()
{
// ...
// ...
int ret = connect(_sockfd, (const struct sockaddr*)&_server_addr, _addrlen);
if (ret == 0) {
//鏈接創(chuàng)建成功
connected = true;
//調(diào)用開發(fā)者客戶端注冊的創(chuàng)建鏈接之后的hook函數(shù)
if (_conn_start_cb != NULL) {
_conn_start_cb(this, _conn_start_cb_args);
}
// ...
// ...
}
}
//判斷鏈接是否是創(chuàng)建鏈接赛蔫,主要是針對非阻塞socket 返回EINPROGRESS錯誤
static void connection_delay(event_loop *loop, int fd, void *args)
{
tcp_client *cli = (tcp_client*)args;
loop->del_io_event(fd);
int result = 0;
socklen_t result_len = sizeof(result);
getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &result_len);
if (result == 0) {
//鏈接是建立成功的
cli->connected = true;
printf("connect %s:%d succ!\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));
//調(diào)用開發(fā)者注冊的創(chuàng)建鏈接Hook函數(shù)
if (cli->_conn_start_cb != NULL) {
cli->_conn_start_cb(cli, cli->_conn_start_cb_args);
}
// ....
// ...
}
}
//釋放鏈接資源,重置連接
void tcp_client::clean_conn()
{
if (_sockfd != -1) {
printf("clean conn, del socket!\n");
_loop->del_io_event(_sockfd);
close(_sockfd);
}
connected = false;
//調(diào)用開發(fā)者注冊的銷毀鏈接之前觸發(fā)的Hook
if (_conn_close_cb != NULL) {
_conn_close_cb(this, _conn_close_cb_args);
}
//重新連接
this->do_connect();
}
9.3 完成Lars Reactor V0.7開發(fā)
server.cpp
#include "tcp_server.h"
#include <string.h>
//回顯業(yè)務(wù)的回調(diào)函數(shù)
void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
printf("callback_busi ...\n");
//直接回顯
conn->send_message(data, len, msgid);
}
//打印信息回調(diào)函數(shù)
void print_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
printf("recv client: [%s]\n", data);
printf("msgid: [%d]\n", msgid);
printf("len: [%d]\n", len);
}
//新客戶端創(chuàng)建的回調(diào)
void on_client_build(net_connection *conn, void *args)
{
int msgid = 101;
const char *msg = "welcome! you online..";
conn->send_message(msg, strlen(msg), msgid);
}
//客戶端銷毀的回調(diào)
void on_client_lost(net_connection *conn, void *args)
{
printf("connection is lost !\n");
}
int main()
{
event_loop loop;
tcp_server server(&loop, "127.0.0.1", 7777);
//注冊消息業(yè)務(wù)路由
server.add_msg_router(1, callback_busi);
server.add_msg_router(2, print_busi);
//注冊鏈接hook回調(diào)
server.set_conn_start(on_client_build);
server.set_conn_close(on_client_lost);
loop.event_process();
return 0;
}
client.cpp
#include "tcp_client.h"
#include <stdio.h>
#include <string.h>
//客戶端業(yè)務(wù)
void busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
//得到服務(wù)端回執(zhí)的數(shù)據(jù)
printf("recv server: [%s]\n", data);
printf("msgid: [%d]\n", msgid);
printf("len: [%d]\n", len);
}
//客戶端銷毀的回調(diào)
void on_client_build(net_connection *conn, void *args)
{
int msgid = 1;
const char *msg = "Hello Lars!";
conn->send_message(msg, strlen(msg), msgid);
}
//客戶端銷毀的回調(diào)
void on_client_lost(net_connection *conn, void *args)
{
printf("on_client_lost...\n");
printf("Client is lost!\n");
}
int main()
{
event_loop loop;
//創(chuàng)建tcp客戶端
tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.6");
//注冊消息路由業(yè)務(wù)
client.add_msg_router(1, busi);
client.add_msg_router(101, busi);
//設(shè)置hook函數(shù)
client.set_conn_start(on_client_build);
client.set_conn_close(on_client_lost);
//開啟事件監(jiān)聽
loop.event_process();
return 0;
}
運(yùn)行結(jié)果
服務(wù)端
$ ./server
msg_router init...
add msg cb msgid = 1
add msg cb msgid = 2
begin accept
get new connection succ!
read data: Hello Lars!
call msgid = 1
callback_busi ...
=======
connection closed by peer
connection is lost !
客戶端:
$ ./client
msg_router init...
do_connect EINPROGRESS
add msg cb msgid = 1
add msg cb msgid = 101
connect 127.0.0.1:7777 succ!
do write over, del EPOLLOUT
call msgid = 101
recv server: [welcome! you online..]
msgid: [101]
len: [21]
=======
call msgid = 1
recv server: [Hello Lars!]
msgid: [1]
len: [11]
=======
^C
? 這樣我們的成功的將hook機(jī)制加入進(jìn)去了。
關(guān)于作者:
作者:Aceld(劉丹冰)
mail: danbing.at@gmail.com
github: https://github.com/aceld
原創(chuàng)書籍gitbook: http://legacy.gitbook.com/@aceld
原創(chuàng)聲明:未經(jīng)作者允許請勿轉(zhuǎn)載, 如果轉(zhuǎn)載請注明出處