注:本文為閱讀了muduo網(wǎng)絡(luò)庫源碼及作者著作之后對于網(wǎng)絡(luò)庫的復(fù)現(xiàn)和筆記
功能
我們定義一個(gè)class Acceptor
我擂,其功能是:讓服務(wù)器在指定的端口處進(jìn)行監(jiān)聽,如果在端口監(jiān)聽到連接缓艳,則執(zhí)行由class Acceptor
的類用戶注冊的回調(diào)函數(shù)校摩。
底層API
首先梳理一下與Acceptor相關(guān)的底層API調(diào)用。
int socket(int domain, int type, int protocol)
用于創(chuàng)建本地socket fd阶淘,domain
指示網(wǎng)絡(luò)的通信所在域衙吩,通常選擇AF_INET
即可,代表IPV4溪窒;type
指示socket fd類型坤塞,對于TCP協(xié)議冯勉,因?yàn)槭橇魇絽f(xié)議,加上我們的網(wǎng)絡(luò)庫的非阻塞特性摹芙,而通常還需指定CLOSE ON EXECVE灼狰,所以通常需要輸入SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC
;protocol
指示具體協(xié)議浮禾,這里我們選用IPPROTO_TCP
int bind(int sockfd, const struct sockaddr *addr, socklen_t addrLen)
用于將socket()
生成的socket fd與服務(wù)器的監(jiān)聽地址(IP加端口)進(jìn)行綁定伏嗜。int listen(sockfd, int backlog)
對生成的套接字進(jìn)行實(shí)際的監(jiān)聽,backlog
指定能夠同時(shí)容納的最多監(jiān)聽個(gè)數(shù)伐厌。int accept(int sockfd, struct sockaddr *peerAddr, socklen_t addrLen )
用于監(jiān)聽到了“來電”之后承绸,接受對應(yīng)的TCP連接,返回對端(客戶端)的socket fd挣轨,并將對端的地址填入到peerAddr
中军熏。
模塊拆分
之前的Reactor模式是一個(gè)經(jīng)典的IO多路復(fù)用模式,我們已經(jīng)用一個(gè)class EventLoop
抽象出了整個(gè)多路復(fù)用的網(wǎng)絡(luò)模型卷扮,接下來就是將這個(gè)模型用起來荡澎,去構(gòu)建實(shí)際的socket網(wǎng)絡(luò)程序了。
socket網(wǎng)絡(luò)編程設(shè)計(jì)到了繁多的底層API接口晤锹,在現(xiàn)代C++特性之中摩幔,自然也要對其進(jìn)行合理的封裝,才能發(fā)揮出語言的最大優(yōu)勢鞭铆。首先繪制一下服務(wù)端的對于TCP連接的接收過程或衡,如圖1所示。
對于底層API而言车遂,"socket() 到 bind() "是一條龍式的操作封断,因此可以定義一個(gè)class Socket
來封裝這個(gè)過程。服務(wù)器根據(jù)這一套流程建立本地的socket舶担。隨后當(dāng)開始決定監(jiān)聽之后坡疼,則開始進(jìn)行"listen()",當(dāng)探測到連接請求的時(shí)候衣陶, 開始" accept()"柄瑰,得到對端的socket以及網(wǎng)絡(luò)地址,然后就可以調(diào)用用戶注冊的回調(diào)函數(shù)了剪况。需要注意的是圖1中的handleRead()
子框教沾,這個(gè)子框才是Acceptor中的Channel的readable回調(diào)。原因其實(shí)很簡單拯欧,因?yàn)閷τ诜?wù)器而言详囤,當(dāng)探測到有客戶端發(fā)起連接請求之后,服務(wù)器的callback應(yīng)該是先建立連接镐作,再執(zhí)行用戶回調(diào)藏姐,那么“建立連接+執(zhí)行用戶回調(diào)”的整個(gè)過程才是“客戶端發(fā)起連接請求”這個(gè)readable事件的回調(diào)。
代碼實(shí)戰(zhàn)
/* Socket.h */
#ifndef SOCKET_H
#define SOCKET_H
#include "muduo/base/noncopyable.h"
#include <muduo/net/InetAddress.h>
#include <boost/noncopyable.hpp>
class Socket: boost::noncopyable
{
private:
const int sockfd_;
public:
int fd() const { return sockfd_; }
void bindAddress(const muduo::net::InetAddress& localaddr);
void listen();
// fill the peeraddr and return peer connection fd
// If failed, return -1;
int accept(muduo::net::InetAddress* peeraddr);
// SO_REUSEADDR
void setReuseAddr(bool on);
// SO_REUSEPORT
void setReusePort(bool on);
explicit Socket(const int &fd);
~Socket();
};
#endif /* SOCKET_H */
#include "Socket.h"
#include "muduo/base/Logging.h"
#include "muduo/net/InetAddress.h"
#include "SockOptions.h"
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <stdio.h> // snprintf
Socket::Socket(const int &fd):
sockfd_(fd)
{
}
Socket::~Socket(){
sockoptions::close(sockfd_);
}
void Socket::bindAddress(const muduo::net::InetAddress &localaddr){
sockoptions::bindOrDie(sockfd_, localaddr.getSockAddr());
}
void Socket::listen(){
sockoptions::listenOrDie(sockfd_);
}
int Socket::accept(muduo::net::InetAddress *peeraddr){
sockaddr_in6 tmpAddr;
bzero(&tmpAddr, sizeof tmpAddr);
int connfd = sockoptions::accept(sockfd_, &tmpAddr);
peeraddr->setSockAddrInet6(tmpAddr);
return connfd;
}
void Socket::setReuseAddr(bool on){
int reused = on;
int len = static_cast<socklen_t>(sizeof reused);
int ret = ::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &reused, len);
if(ret < 0){
LOG_ERROR << "setReuseAddr falied";
}
}
void Socket::setReusePort(bool on){
int reused = on;
int len = static_cast<socklen_t>(sizeof reused);
int ret = ::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEPORT, &reused, len);
if(ret < 0){
LOG_ERROR << "setReuseAddr falied";
}
}
/* Acceptor.h */
#ifndef ACCEPTOR_H
#define ACCEPTOR_H
#include <functional>
#include <muduo/net/InetAddress.h>
#include <memory>
#include <boost/noncopyable.hpp>
#include "../Reactor/Channel.h"
#include "Socket.h"
class EventLoop;
class Acceptor : boost::noncopyable
{
// sending fd handle is not an ideal solution, better solution is
// sending a Socket object which uses RAII
using ConnCallback = std::function<void (int, const muduo::net::InetAddress &)>;
// using ConnCallback = std::function<void (Socket, const muduo::net::InetAddress &)>;
private:
EventLoop *loop_;
ConnCallback cb_;
std::unique_ptr<Socket> socket_;
Channel socketChannel_;
bool listening_;
public:
Acceptor(EventLoop *loop, const muduo::net::InetAddress & localAddr);
~Acceptor();
bool isListening() const {
return listening_;
}
void listen();
void handleRead();
void setNewConnectionCallback(const ConnCallback &func);
};
#endif /* ACCEPTOR_H */
/* Accpetor.cc */
#include <muduo/base/Logging.h>
#include "../Reactor/EventLoop.h"
#include "../Reactor/Channel.h"
#include "Acceptor.h"
#include "SockOptions.h"
#include "Socket.h"
Acceptor::Acceptor(EventLoop *loop, const muduo::net::InetAddress &localAddr)
:loop_(loop),
socket_(new Socket (sockoptions::createNonblockingOrDie(AF_INET))),
socketChannel_(loop, socket_->fd()),
listening_(false)
{
socket_->setReuseAddr(true);
socket_->bindAddress(localAddr);
socketChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}
Acceptor::~Acceptor()
{
}
void Acceptor::listen(){
loop_->assertInLoopThread();
listening_ = true;
socket_->listen(); // call the socket API `listen()`
socketChannel_.enableRead(); // ready to call the callback
}
void Acceptor::handleRead(){
muduo::net::InetAddress addr; // this is the perr address
int connfd = socket_->accept(&addr);
if(connfd < 0){
LOG_FATAL << "Acceptor - socket accept failed";
return ;
}
if(cb_){
cb_(connfd, addr);
} else {
LOG_ERROR << "Acceptor - NewConnectionCallback unset";
}
}
void Acceptor::setNewConnectionCallback(const ConnCallback &func){
cb_ = func;
}
/* sockOptions.h */
#include <arpa/inet.h>
namespace sockoptions
{
int createNonblockingOrDie(sa_family_t family);
void bindOrDie(int sockfd, const struct sockaddr* addr);
void listenOrDie(int sockfd);
void close(int sockfd);
int accept(int sockfd, struct sockaddr_in6* addr);
const struct sockaddr* sockaddr_cast(const struct sockaddr_in* addr);
const struct sockaddr* sockaddr_cast(const struct sockaddr_in6* addr);
struct sockaddr* sockaddr_cast(struct sockaddr_in6* addr);
}
/* sockOptions.h */
#include "./SockOptions.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Types.h"
#include "muduo/net/Endian.h"
#include <errno.h>
#include <fcntl.h>
#include <stdio.h> // snprintf
#include <sys/socket.h>
#include <sys/uio.h> // readv
#include <unistd.h>
int sockoptions::createNonblockingOrDie(sa_family_t family){
// stream == tcp
int sockfd = ::socket(family,
SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC,
IPPROTO_TCP);
if(sockfd < 0){
LOG_FATAL << "Create socket failed!";
}
return sockfd;
}
void sockoptions::bindOrDie(int sockfd, const struct sockaddr *addr){
socklen_t len = static_cast<socklen_t>(sizeof(*addr));
int ret = ::bind(sockfd, addr, len);
if(ret < 0){
LOG_FATAL << "Bind address failed!";
}
}
void sockoptions::listenOrDie(int sockfd){
int ret = ::listen(sockfd, SOMAXCONN);
if(ret < 0){
LOG_FATAL << "Listen socket failed!";
}
}
void sockoptions::close(int sockfd){
int ret = ::close(sockfd);
if(ret < 0){
LOG_FATAL << "Close sockfd falied!";
}
}
int sockoptions::accept(int sockfd, struct sockaddr_in6 *addr){
struct sockaddr *sa = sockaddr_cast(addr);
socklen_t len = static_cast<socklen_t>(sizeof(*addr));
int connfd = ::accept(sockfd, sa, &len);
if(connfd < 0){
LOG_FATAL << "Accept socket failed!";
}
return connfd;
}
const struct sockaddr * sockoptions::sockaddr_cast(const struct sockaddr_in *addr){
return static_cast<const struct sockaddr *>(muduo::implicit_cast<const void*>(addr));
}
const struct sockaddr * sockoptions::sockaddr_cast(const struct sockaddr_in6 *addr){
return static_cast<const struct sockaddr *>(muduo::implicit_cast<const void*>(addr));
}
struct sockaddr * sockoptions::sockaddr_cast(struct sockaddr_in6 *addr){
return static_cast<struct sockaddr *>(muduo::implicit_cast<void *>(addr));
}
/* main_Accpetor.cc */
#include "../Reactor/EventLoop.h"
#include "Acceptor.h"
#include <muduo/net/InetAddress.h>
void callbackFunc(int connfd, const muduo::net::InetAddress &addr){
printf("A new connection comming from %s\n", addr.toIpPort().c_str());
char msg[] = "Hello, I can hear you calling me\n";
::write(connfd, msg, sizeof msg);
}
int main(int argc, char *argv[])
{
EventLoop loop;
muduo::net::InetAddress localAddr(2333);
muduo::net::InetAddress localAddr2(3332);
Acceptor acceptor(&loop, localAddr);
Acceptor acceptor2(&loop, localAddr2);
acceptor.setNewConnectionCallback(&callbackFunc);
acceptor2.setNewConnectionCallback(&callbackFunc);
acceptor.listen();
acceptor2.listen();
loop.loop();
return 0;
}
運(yùn)行結(jié)果
實(shí)現(xiàn)過程中的一些知識(shí)點(diǎn)總結(jié)
sockaddr
和sockaddr_in
兩者之間是相互補(bǔ)充的關(guān)系该贾。大多數(shù)諸如::bind()
的底層socket API使用struct sockaddr *
作為入?yún)㈩愋透嵫睿?code>sockaddr的小缺陷是,它將IP端口和IP地址混在了一個(gè)變量中杨蛋,故賦值時(shí)不太方便兜材。sockaddr_in
彌補(bǔ)了這一問題,它將端口和地址進(jìn)行了分離逞力,同時(shí)為了適配sockaddr
曙寡,又填充了一些不使用的變量,使得兩種結(jié)構(gòu)體的內(nèi)存分布完全一致寇荧,因此兩種類型的指針可以相互轉(zhuǎn)換举庶。總結(jié)一下:sockaddr_in
簡化了變量的賦值揩抡,sockaddr
用于函數(shù)的傳參户侥。implicit_cast
該類型轉(zhuǎn)換應(yīng)該是為了更安全的進(jìn)行精確類型轉(zhuǎn)換,當(dāng)使用implicit_cast
的時(shí)候峦嗤,編譯器會(huì)去檢查該轉(zhuǎn)換是否安全蕊唐。(不過該cast暫時(shí)還沒有納入標(biāo)準(zhǔn)庫)SOMAXCONN
在調(diào)用::listen()
的時(shí)候需要指定最多可以支持多少連接請求,為此系統(tǒng)定義了一個(gè)專門的宏SOMAXCONN
用來表示系統(tǒng)所支持的最多請求個(gè)數(shù)烁设。setsockopt()
網(wǎng)絡(luò)通信會(huì)在不同的層級(jí)或者協(xié)議之中擁有不同的設(shè)置選項(xiàng)替梨,setsockopt()
的功能就是將設(shè)置這些選項(xiàng)都抽象到了頂層的socket這一層級(jí)。比如說要設(shè)置socket底層API層級(jí)的某些選項(xiàng)装黑,抑或是設(shè)置TCP協(xié)議中的某些選項(xiàng)耙替,都通過該函數(shù)來執(zhí)行。
-
FD_CLOEXEC
默認(rèn)情況下曹体,使用fork() + execve()
開啟新的子進(jìn)程時(shí)俗扇,父進(jìn)程打開的文件描述符fd
不會(huì)被關(guān)閉。為了讓用戶能夠自己控制是否在execve()
后關(guān)閉fd
箕别,系統(tǒng)設(shè)定了一個(gè)全局的變量铜幽,變量中的各個(gè)標(biāo)志位代表不同的fd
,如果設(shè)定了FD_CLOEXEC
串稀,則會(huì)在啟動(dòng)子進(jìn)程時(shí)關(guān)閉對應(yīng)的fd
除抛。相應(yīng)的SOCK_CLOEXEC
和TFD_CLOEXEC
可以理解是對FD_CLOEXEC
的繼承,它們分別為timer fd
和socket fd
服務(wù)
TcpServer
TcpServer是Acceptor的直接使用者母截,其實(shí)只需要將TcpServer的Callback注冊到Acceptor里面即可