【Lars教程目錄】
Lars源代碼
https://github.com/aceld/Lars
【Lars系統(tǒng)概述】
第1章-概述
第2章-項(xiàng)目目錄構(gòu)建
【Lars系統(tǒng)之Reactor模型服務(wù)器框架模塊】
第1章-項(xiàng)目結(jié)構(gòu)與V0.1雛形
第2章-內(nèi)存管理與Buffer封裝
第3章-事件觸發(fā)EventLoop
第4章-鏈接與消息封裝
第5章-Client客戶端模型
第6章-連接管理及限制
第7章-消息業(yè)務(wù)路由分發(fā)機(jī)制
第8章-鏈接創(chuàng)建/銷毀Hook機(jī)制
第9章-消息任務(wù)隊(duì)列與線程池
第10章-配置文件讀寫功能
第11章-udp服務(wù)與客戶端
第12章-數(shù)據(jù)傳輸協(xié)議protocol buffer
第13章-QPS性能測試
第14章-異步消息任務(wù)機(jī)制
第15章-鏈接屬性設(shè)置功能
【Lars系統(tǒng)之DNSService模塊】
第1章-Lars-dns簡介
第2章-數(shù)據(jù)庫創(chuàng)建
第3章-項(xiàng)目目錄結(jié)構(gòu)及環(huán)境構(gòu)建
第4章-Route結(jié)構(gòu)的定義
第5章-獲取Route信息
第6章-Route訂閱模式
第7章-Backend Thread實(shí)時(shí)監(jiān)控
【Lars系統(tǒng)之Report Service模塊】
第1章-項(xiàng)目概述-數(shù)據(jù)表及proto3協(xié)議定義
第2章-獲取report上報(bào)數(shù)據(jù)
第3章-存儲(chǔ)線程池及消息隊(duì)列
【Lars系統(tǒng)之LoadBalance Agent模塊】
第1章-項(xiàng)目概述及構(gòu)建
第2章-主模塊業(yè)務(wù)結(jié)構(gòu)搭建
第3章-Report與Dns Client設(shè)計(jì)與實(shí)現(xiàn)
第4章-負(fù)載均衡模塊基礎(chǔ)設(shè)計(jì)
第5章-負(fù)載均衡獲取Host主機(jī)信息API
第6章-負(fù)載均衡上報(bào)Host主機(jī)信息API
第7章-過期窗口清理與過載超時(shí)(V0.5)
第8章-定期拉取最新路由信息(V0.6)
第9章-負(fù)載均衡獲取Route信息API(0.7)
第10章-API初始化接口(V0.8)
第11章-Lars Agent性能測試工具
第12章- Lars啟動(dòng)工具腳本
我們接下來要設(shè)計(jì)線程池和與之對(duì)應(yīng)的消息隊(duì)列止喷。具體的總體形勢(shì)應(yīng)該是這樣的
這里面有幾個(gè)類型矮烹,
thread_pool
就是我們要?jiǎng)?chuàng)建的線程池美尸,這里面會(huì)有很多thread
其中每個(gè)thread
都會(huì)啟動(dòng)一個(gè)epoll
也就是我們封裝好的event_loop
來監(jiān)控各自創(chuàng)建好的tcp_conn
的讀寫事件继控。每個(gè)thread
都會(huì)有一個(gè)thread_queue
消息任務(wù)隊(duì)列與之綁定,每個(gè)thread_queue
里面會(huì)接收task_msg
任務(wù)類型嫉柴。
10.1 消息任務(wù)類型
lars_reactor/include/task_msg.h
#pragma once
#include "event_loop.h"
struct task_msg
{
enum TASK_TYPE
{
NEW_CONN, //新建鏈接的任務(wù)
NEW_TASK, //一般的任務(wù)
};
TASK_TYPE type; //任務(wù)類型
//任務(wù)的一些參數(shù)
union {
//針對(duì) NEW_CONN新建鏈接任務(wù)厌杜,需要傳遞connfd
int connfd;
/*==== 暫時(shí)用不上 ==== */
//針對(duì) NEW_TASK 新建任務(wù),
//那么可以給一個(gè)任務(wù)提供一個(gè)回調(diào)函數(shù)
struct {
void (*task_cb)(event_loop*, void *args);
void *args;
};
};
};
? 這里面task_msg一共有兩個(gè)類型的type,一個(gè)是新鏈接的任務(wù)计螺,一個(gè)是普通任務(wù)夯尽。兩個(gè)任務(wù)所攜帶的參數(shù)不同,所以用了一個(gè)union登馒。
?
10.2 消息任務(wù)隊(duì)列
lars_reactor/include/thread_queue.h
#pragma once
#include <queue>
#include <pthread.h>
#include <sys/eventfd.h>
#include <stdio.h>
#include <unistd.h>
#include "event_loop.h"
/*
*
* 每個(gè)thread對(duì)應(yīng)的 消息任務(wù)隊(duì)列
*
* */
template <typename T>
class thread_queue
{
public:
thread_queue()
{
_loop = NULL;
pthread_mutex_init(&_queue_mutex, NULL);
_evfd = eventfd(0, EFD_NONBLOCK);
if (_evfd == -1) {
perror("evenfd(0, EFD_NONBLOCK)");
exit(1);
}
}
~thread_queue()
{
pthread_mutex_destroy(&_queue_mutex);
close(_evfd);
}
//向隊(duì)列添加一個(gè)任務(wù)
void send(const T& task) {
//觸發(fā)消息事件的占位傳輸內(nèi)容
unsigned long long idle_num = 1;
pthread_mutex_lock(&_queue_mutex);
//將任務(wù)添加到隊(duì)列
_queue.push(task);
//向_evfd寫匙握,觸發(fā)對(duì)應(yīng)的EPOLLIN事件,來處理該任務(wù)
int ret = write(_evfd, &idle_num, sizeof(unsigned long long));
if (ret == -1) {
perror("_evfd write");
}
pthread_mutex_unlock(&_queue_mutex);
}
//獲取隊(duì)列,(當(dāng)前隊(duì)列已經(jīng)有任務(wù))
void recv(std::queue<T>& new_queue) {
unsigned int long long idle_num = 1;
pthread_mutex_lock(&_queue_mutex);
//把占位的數(shù)據(jù)讀出來陈轿,確保底層緩沖沒有數(shù)據(jù)存留
int ret = read(_evfd, &idle_num, sizeof(unsigned long long));
if (ret == -1) {
perror("_evfd read");
}
//將當(dāng)前的隊(duì)列拷貝出去,將一個(gè)空隊(duì)列換回當(dāng)前隊(duì)列,同時(shí)清空自身隊(duì)列圈纺,確保new_queue是空隊(duì)列
std::swap(new_queue, _queue);
pthread_mutex_unlock(&_queue_mutex);
}
//設(shè)置當(dāng)前thead_queue是被哪個(gè)事件觸發(fā)event_loop監(jiān)控
void set_loop(event_loop *loop) {
_loop = loop;
}
//設(shè)置當(dāng)前消息任務(wù)隊(duì)列的 每個(gè)任務(wù)觸發(fā)的回調(diào)業(yè)務(wù)
void set_callback(io_callback *cb, void *args = NULL)
{
if (_loop != NULL) {
_loop->add_io_event(_evfd, cb, EPOLLIN, args);
}
}
//得到當(dāng)前l(fā)oop
event_loop * get_loop() {
return _loop;
}
private:
int _evfd; //觸發(fā)消息任務(wù)隊(duì)列讀取的每個(gè)消息業(yè)務(wù)的fd
event_loop *_loop; //當(dāng)前消息任務(wù)隊(duì)列所綁定在哪個(gè)event_loop事件觸發(fā)機(jī)制中
std::queue<T> _queue; //隊(duì)列
pthread_mutex_t _queue_mutex; //進(jìn)行添加任務(wù)、讀取任務(wù)的保護(hù)鎖
};
? 一個(gè)模板類麦射,主要是消息任務(wù)隊(duì)列里的元素類型未必一定是task_msg
類型蛾娶。
thread_queue
需要綁定一個(gè)event_loop
。來觸發(fā)消息到達(dá)潜秋,捕獲消息并且觸發(fā)處理消息業(yè)務(wù)的動(dòng)作蛔琅。
? 這里面有個(gè)_evfd
是為了觸發(fā)消息隊(duì)列消息到達(dá),處理該消息作用的峻呛,將_evfd
加入到對(duì)應(yīng)線程的event_loop
中罗售,然后再通過set_callback
設(shè)置一個(gè)通用的該queue全部消息所觸發(fā)的處理業(yè)務(wù)call_back,在這個(gè)call_back里開發(fā)者可以自定義實(shí)現(xiàn)一些處理業(yè)務(wù)流程辜窑。
- 通過
send
將任務(wù)發(fā)送給消息隊(duì)列。 - 通過
event_loop
觸發(fā)注冊(cè)的io_callback得到消息隊(duì)列里的任務(wù)莽囤。 - 在io_callback中調(diào)用
recv
取得task
任務(wù)谬擦,根據(jù)任務(wù)的不同類型,處理自定義不同業(yè)務(wù)流程朽缎。
10.3 線程池
? 接下來,我們定義線程池谜悟,將thread_queue
和thread_pool
進(jìn)行關(guān)聯(lián)话肖。
lars_reactor/include/thread_pool.h
#pragma once
#include <pthread.h>
#include "task_msg.h"
#include "thread_queue.h"
class thread_pool
{
public:
//構(gòu)造,初始化線程池, 開辟thread_cnt個(gè)
thread_pool(int thread_cnt);
//獲取一個(gè)thead
thread_queue<task_msg>* get_thread();
private:
//_queues是當(dāng)前thread_pool全部的消息任務(wù)隊(duì)列頭指針
thread_queue<task_msg> ** _queues;
//當(dāng)前線程池中的線程個(gè)數(shù)
int _thread_cnt;
//已經(jīng)啟動(dòng)的全部therad編號(hào)
pthread_t * _tids;
//當(dāng)前選中的線程隊(duì)列下標(biāo)
int _index;
};
屬性:
_queues
:是thread_queue
集合葡幸,和當(dāng)前線程數(shù)量一一對(duì)應(yīng)最筒,每個(gè)線程對(duì)應(yīng)一個(gè)queue。里面存的元素是task_msg
蔚叨。
_tids
:保存線程池中每個(gè)線程的ID床蜘。
_thread_cnt
:當(dāng)前線程的個(gè)數(shù).
_index
:表示外層在選擇哪個(gè)thead處理任務(wù)時(shí)的一個(gè)下標(biāo),因?yàn)槭禽喸兲幚砻锼孕枰粋€(gè)下標(biāo)記錄邢锯。
方法:
thread_pool()
:構(gòu)造函數(shù),初始化線程池搀别。
get_thread()
:通過輪詢方式丹擎,獲取一個(gè)線程的thread_queue.
lars_reactor/src/thread_pool.cpp
#include "thread_pool.h"
#include "event_loop.h"
#include "tcp_conn.h"
#include <unistd.h>
#include <stdio.h>
/*
* 一旦有task消息過來,這個(gè)業(yè)務(wù)是處理task消息業(yè)務(wù)的主流程
*
* 只要有人調(diào)用 thread_queue:: send()方法就會(huì)觸發(fā)次函數(shù)
*/
void deal_task_message(event_loop *loop, int fd, void *args)
{
//得到是哪個(gè)消息隊(duì)列觸發(fā)的
thread_queue<task_msg>* queue = (thread_queue<task_msg>*)args;
//將queue中的全部任務(wù)取出來
std::queue<task_msg> tasks;
queue->recv(tasks);
while (tasks.empty() != true) {
task_msg task = tasks.front();
//彈出一個(gè)元素
tasks.pop();
if (task.type == task_msg::NEW_CONN) {
//是一個(gè)新建鏈接的任務(wù)
//并且將這個(gè)tcp_conn加入當(dāng)當(dāng)前線程的loop中去監(jiān)聽
tcp_conn *conn = new tcp_conn(task.connfd, loop);
if (conn == NULL) {
fprintf(stderr, "in thread new tcp_conn error\n");
exit(1);
}
printf("[thread]: get new connection succ!\n");
}
else if (task.type == task_msg::NEW_TASK) {
//是一個(gè)新的普通任務(wù)
//TODO
}
else {
//其他未識(shí)別任務(wù)
fprintf(stderr, "unknow task!\n");
}
}
}
//一個(gè)線程的主業(yè)務(wù)main函數(shù)
void *thread_main(void *args)
{
thread_queue<task_msg> *queue = (thread_queue<task_msg>*)args;
//每個(gè)線程都應(yīng)該有一個(gè)event_loop來監(jiān)控客戶端鏈接的讀寫事件
event_loop *loop = new event_loop();
if (loop == NULL) {
fprintf(stderr, "new event_loop error\n");
exit(1);
}
//注冊(cè)一個(gè)觸發(fā)消息任務(wù)讀寫的callback函數(shù)
queue->set_loop(loop);
queue->set_callback(deal_task_message, queue);
//啟動(dòng)阻塞監(jiān)聽
loop->event_process();
return NULL;
}
thread_pool::thread_pool(int thread_cnt)
{
_index = 0;
_queues = NULL;
_thread_cnt = thread_cnt;
if (_thread_cnt <= 0) {
fprintf(stderr, "_thread_cnt < 0\n");
exit(1);
}
//任務(wù)隊(duì)列的個(gè)數(shù)和線程個(gè)數(shù)一致
_queues = new thread_queue<task_msg>*[thread_cnt];
_tids = new pthread_t[thread_cnt];
int ret;
for (int i = 0; i < thread_cnt; ++i) {
//創(chuàng)建一個(gè)線程
printf("create %d thread\n", i);
//給當(dāng)前線程創(chuàng)建一個(gè)任務(wù)消息隊(duì)列
_queues[i] = new thread_queue<task_msg>();
ret = pthread_create(&_tids[i], NULL, thread_main, _queues[i]);
if (ret == -1) {
perror("thread_pool, create thread");
exit(1);
}
//將線程脫離
pthread_detach(_tids[i]);
}
}
thread_queue<task_msg>* thread_pool::get_thread()
{
if (_index == _thread_cnt) {
_index = 0;
}
return _queues[_index];
}
? 這里主要看deal_task_message()
方法歇父,是處理收到的task任務(wù)的蒂培。目前我們只對(duì)NEW_CONN
類型的任務(wù)進(jìn)行處理,一般任務(wù)先不做處理榜苫,因?yàn)闀簳r(shí)用不上护戳。
? NEW_CONN
的處理主要是讓當(dāng)前線程創(chuàng)建鏈接,并且將該鏈接由當(dāng)前線程的event_loop接管垂睬。
? 接下來我們就要將線程池添加到reactor框架中去媳荒。
10.4 reactor線程池關(guān)聯(lián)
? 將線程池添加到tcp_server
中。
lars_reactor/include/tcp_server.h
#pragma once
#include <netinet/in.h>
#include "event_loop.h"
#include "tcp_conn.h"
#include "message.h"
#include "thread_pool.h"
class tcp_server
{
public:
// ...
// ...
private:
// ...
//線程池
thread_pool *_thread_pool;
};
在構(gòu)造函數(shù)中羔飞,添加_thread_pool的初始化工作肺樟。并且在accept成功之后交給線程處理客戶端的讀寫事件。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>
#include <signal.h>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <arpa/inet.h>
#include <errno.h>
#include "tcp_server.h"
#include "tcp_conn.h"
#include "reactor_buf.h"
//server的構(gòu)造函數(shù)
tcp_server::tcp_server(event_loop *loop, const char *ip, uint16_t port)
{
// ...
//6 創(chuàng)建鏈接管理
_max_conns = MAX_CONNS;
//創(chuàng)建鏈接信息數(shù)組
conns = new tcp_conn*[_max_conns+3];//3是因?yàn)閟tdin,stdout,stderr 已經(jīng)被占用逻淌,再新開fd一定是從3開始,所以不加3就會(huì)棧溢出
if (conns == NULL) {
fprintf(stderr, "new conns[%d] error\n", _max_conns);
exit(1);
}
//7 =============創(chuàng)建線程池=================
int thread_cnt = 3;//TODO 從配置文件中讀取
if (thread_cnt > 0) {
_thread_pool = new thread_pool(thread_cnt);
if (_thread_pool == NULL) {
fprintf(stderr, "tcp_server new thread_pool error\n");
exit(1);
}
}
// ========================================
//8 注冊(cè)_socket讀事件-->accept處理
_loop->add_io_event(_sockfd, accept_callback, EPOLLIN, this);
}
//開始提供創(chuàng)建鏈接服務(wù)
void tcp_server::do_accept()
{
int connfd;
while(true) {
//accept與客戶端創(chuàng)建鏈接
printf("begin accept\n");
connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);
if (connfd == -1) {
if (errno == EINTR) {
fprintf(stderr, "accept errno=EINTR\n");
continue;
}
else if (errno == EMFILE) {
//建立鏈接過多么伯,資源不夠
fprintf(stderr, "accept errno=EMFILE\n");
}
else if (errno == EAGAIN) {
fprintf(stderr, "accept errno=EAGAIN\n");
break;
}
else {
fprintf(stderr, "accept error\n");
exit(1);
}
}
else {
//accept succ!
int cur_conns;
get_conn_num(&cur_conns);
//1 判斷鏈接數(shù)量
if (cur_conns >= _max_conns) {
fprintf(stderr, "so many connections, max = %d\n", _max_conns);
close(connfd);
}
else {
// ========= 將新連接由線程池處理 ==========
if (_thread_pool != NULL) {
//啟動(dòng)多線程模式 創(chuàng)建鏈接
//1 選擇一個(gè)線程來處理
thread_queue<task_msg>* queue = _thread_pool->get_thread();
//2 創(chuàng)建一個(gè)新建鏈接的消息任務(wù)
task_msg task;
task.type = task_msg::NEW_CONN;
task.connfd = connfd;
//3 添加到消息隊(duì)列中,讓對(duì)應(yīng)的thread進(jìn)程event_loop處理
queue->send(task);
// =====================================
}
else {
//啟動(dòng)單線程模式
tcp_conn *conn = new tcp_conn(connfd, _loop);
if (conn == NULL) {
fprintf(stderr, "new tcp_conn error\n");
exit(1);
}
printf("[tcp_server]: get new connection succ!\n");
break;
}
}
}
}
}
10.5 完成Lars ReactorV0.8開發(fā)
? 0.8版本的server.cpp和client.cpp是不用改變的卡儒。開啟服務(wù)端和客戶端觀察執(zhí)行結(jié)果即可田柔。
服務(wù)端:
$ ./server
msg_router init...
create 0 thread
create 1 thread
create 2 thread
add msg cb msgid = 1
add msg cb msgid = 2
begin accept
begin accept
[thread]: get new connection succ!
read data: Hello Lars!
call msgid = 1
call data = Hello Lars!
call msglen = 11
callback_busi ...
=======
客戶端
$ ./client
msg_router init...
do_connect EINPROGRESS
add msg cb msgid = 1
add msg cb msgid = 101
connect 127.0.0.1:7777 succ!
do write over, del EPOLLOUT
call msgid = 101
call data = welcome! you online..?
call msglen = 21
recv server: [welcome! you online..]
msgid: [101]
len: [21]
=======
call msgid = 1
call data = Hello Lars!
call msglen = 11
recv server: [Hello Lars!]
msgid: [1]
len: [11]
=======
? 我們會(huì)發(fā)現(xiàn)俐巴,鏈接已經(jīng)成功創(chuàng)建成功,并且是由于線程處理的讀寫任務(wù)硬爆。
關(guān)于作者:
作者:Aceld(劉丹冰)
mail: danbing.at@gmail.com
github: https://github.com/aceld
原創(chuàng)書籍gitbook: http://legacy.gitbook.com/@aceld
原創(chuàng)聲明:未經(jīng)作者允許請(qǐng)勿轉(zhuǎn)載, 如果轉(zhuǎn)載請(qǐng)注明出處