Preface
Now that we have a first impression of how sockets are designed in the kernel, we can dig further into the whole socket flow. Here we start with the server side: how a socket is prepared, bound, and put into the listening state.
If you run into any problems, feel free to discuss them under the original post: http://www.reibang.com/p/62dd608667e2
Main Text
Let's look at the core of server-side initialization:
ServerSocket server = new ServerSocket(port);
Socket socket = server.accept();
First, the constructor:
public ServerSocket(int port) throws IOException {
this(port, 50, null);
}
public ServerSocket(int port, int backlog, InetAddress bindAddr) throws IOException {
setImpl();
if (port < 0 || port > 0xFFFF)
throw new IllegalArgumentException(
"Port value out of range: " + port);
if (backlog < 1)
backlog = 50;
try {
bind(new InetSocketAddress(bindAddr, port), backlog);
} catch(SecurityException e) {
close();
throw e;
} catch(IOException e) {
close();
throw e;
}
}
First, the listening port is validated: it must be between 0 and 65535, otherwise an IllegalArgumentException is thrown immediately. A backlog below 1 falls back to the default of 50.
Next, the bind method is called.
public void bind(SocketAddress endpoint, int backlog) throws IOException {
if (isClosed())
throw new SocketException("Socket is closed");
if (!oldImpl && isBound())
throw new SocketException("Already bound");
if (endpoint == null)
endpoint = new InetSocketAddress(0);
if (!(endpoint instanceof InetSocketAddress))
throw new IllegalArgumentException("Unsupported address type");
InetSocketAddress epoint = (InetSocketAddress) endpoint;
if (epoint.isUnresolved())
throw new SocketException("Unresolved address");
if (backlog < 1)
backlog = 50;
try {
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkListen(epoint.getPort());
getImpl().bind(epoint.getAddress(), epoint.getPort());
getImpl().listen(backlog);
bound = true;
} catch(SecurityException e) {
bound = false;
throw e;
} catch(IOException e) {
bound = false;
throw e;
}
}
On Android, getSecurityManager returns null, so we will not examine it here.
What follows boils down to two steps:
- SocksSocketImpl's bind, with the InetSocketAddress that was passed in
- SocksSocketImpl's listen, with the backlog of 50
1. SocksSocketImpl bind
/**
* Binds the socket to the specified address of the specified local port.
* @param address the address
* @param lport the port
*/
protected synchronized void bind(InetAddress address, int lport)
throws IOException
{
synchronized (fdLock) {
if (!closePending && (socket == null || !socket.isBound())) {
NetHooks.beforeTcpBind(fd, address, lport);
}
}
socketBind(address, lport);
if (socket != null)
socket.setBound();
if (serverSocket != null)
serverSocket.setBound();
}
The core calls are the following:
- 1. socketBind
- 2. setBound

socketBind
void socketBind(InetAddress address, int port) throws IOException {
if (fd == null || !fd.valid()) {
throw new SocketException("Socket closed");
}
IoBridge.bind(fd, address, port);
this.address = address;
if (port == 0) {
// Now that we're a connected socket, let's extract the port number that the system
// chose for us and store it in the Socket object.
localport = IoBridge.getLocalInetSocketAddress(fd).getPort();
} else {
localport = port;
}
}
public static void bind(FileDescriptor fd, InetAddress address, int port) throws SocketException {
...
try {
Libcore.os.bind(fd, address, port);
} catch (ErrnoException errnoException) {
if (errnoException.errno == EADDRINUSE || errnoException.errno == EADDRNOTAVAIL ||
errnoException.errno == EPERM || errnoException.errno == EACCES) {
throw new BindException(errnoException.getMessage(), errnoException);
} else {
throw new SocketException(errnoException.getMessage(), errnoException);
}
}
}
At its core this calls Libcore.os.bind, which leads to the native Linux bind.
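Before following the call into native code, here is a plain C sketch (an illustration, not AOSP code) of what the port == 0 branch in socketBind above amounts to: bind with port 0 so the kernel picks an ephemeral port, then read it back with getsockname, just as IoBridge.getLocalInetSocketAddress does.

#include <arpa/inet.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(0);                /* 0: let the kernel choose */

    if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        perror("bind");
        return 1;
    }

    struct sockaddr_in local;                /* read back the chosen port */
    socklen_t len = sizeof(local);
    getsockname(fd, (struct sockaddr *)&local, &len);
    printf("bound to port %d\n", ntohs(local.sin_port));

    close(fd);
    return 0;
}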
2. libcore_io_Linux Linux_bind
File: /libcore/luni/src/main/native/libcore_io_Linux.cpp
static void Linux_bind(JNIEnv* env, jobject, jobject javaFd, jobject javaAddress, jint port) {
// We don't need the return value because we'll already have thrown.
(void) NET_IPV4_FALLBACK(env, int, bind, javaFd, javaAddress, port, NULL_ADDR_FORBIDDEN);
}
The core lies in the following two macros:
#define NET_IPV4_FALLBACK(jni_env, return_type, syscall_name, java_fd, java_addr, port, null_addr_ok, args...) ({ \
return_type _rc = -1; \
do { \
sockaddr_storage _ss; \
socklen_t _salen; \
if ((java_addr) == NULL && (null_addr_ok)) { \
/* No IP address specified (e.g., sendto() on a connected socket). */ \
_salen = 0; \
} else if (!inetAddressToSockaddr(jni_env, java_addr, port, _ss, _salen)) { \
/* Invalid socket address, return -1. inetAddressToSockaddr has already thrown. */ \
break; \
} \
sockaddr* _sa = _salen ? reinterpret_cast<sockaddr*>(&_ss) : NULL; \
/* inetAddressToSockaddr always returns an IPv6 sockaddr. Assume that java_fd was created \
* by Java API calls, which always create IPv6 socket fds, and pass it in as is. */ \
_rc = NET_FAILURE_RETRY(jni_env, return_type, syscall_name, java_fd, ##args, _sa, _salen); \
if (_rc == -1 && errno == EAFNOSUPPORT && _salen && isIPv4MappedAddress(_sa)) { \
/* We passed in an IPv4 address in an IPv6 sockaddr and the kernel told us that we got \
* the address family wrong. Pass in the same address in an IPv4 sockaddr. */ \
(jni_env)->ExceptionClear(); \
if (!inetAddressToSockaddrVerbatim(jni_env, java_addr, port, _ss, _salen)) { \
break; \
} \
_sa = reinterpret_cast<sockaddr*>(&_ss); \
_rc = NET_FAILURE_RETRY(jni_env, return_type, syscall_name, java_fd, ##args, _sa, _salen); \
} \
} while (0); \
_rc; }) \
#define NET_FAILURE_RETRY(jni_env, return_type, syscall_name, java_fd, ...) ({ \
return_type _rc = -1; \
int _syscallErrno; \
do { \
bool _wasSignaled; \
{ \
int _fd = jniGetFDFromFileDescriptor(jni_env, java_fd); \
AsynchronousCloseMonitor _monitor(_fd); \
_rc = syscall_name(_fd, __VA_ARGS__); \
_syscallErrno = errno; \
_wasSignaled = _monitor.wasSignaled(); \
} \
if (_wasSignaled) { \
jniThrowException(jni_env, "java/net/SocketException", "Socket closed"); \
_rc = -1; \
break; \
} \
if (_rc == -1 && _syscallErrno != EINTR) { \
/* TODO: with a format string we could show the arguments too, like strace(1). */ \
throwErrnoException(jni_env, # syscall_name); \
break; \
} \
} while (_rc == -1); /* _syscallErrno == EINTR && !_wasSignaled */ \
if (_rc == -1) { \
/* If the syscall failed, re-set errno: throwing an exception might have modified it. */ \
errno = _syscallErrno; \
} \
_rc; })
Here you can see the essence: the macro extracts the raw fd from the Java FileDescriptor via JNI, then issues the bind system call.
The address argument of the bind call is produced by inetAddressToSockaddr, which converts the InetAddress, the address family, and the port into a sockaddr_in structure:
struct sockaddr_in {
    __kernel_sa_family_t sin_family; // address family
    __be16 sin_port;                 // port, in network byte order
    struct in_addr sin_addr;         // IP address
    unsigned char __pad[__SOCK_SIZE__ - sizeof(short int) - sizeof(unsigned short int) - sizeof(struct in_addr)];
};
One point worth learning: when a C #define macro expands to multiple statements, wrapping the body in do { ... } while (0) keeps the macro a single statement, so control-flow constructs such as an if without braces cannot cut the macro's logic in half.
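A small demonstration (illustrative code, not from AOSP) of the pitfall the do/while(0) idiom prevents:

#include <stdio.h>

#define LOG_TWICE_BAD(msg)  printf("%s\n", msg); printf("%s\n", msg)
#define LOG_TWICE_GOOD(msg) do { printf("%s\n", msg); printf("%s\n", msg); } while (0)

int main(void) {
    int err = 0;
    if (err)
        LOG_TWICE_BAD("oops");   /* only the first printf is guarded; the second always runs */
    if (err)
        LOG_TWICE_GOOD("ok");    /* the whole block is guarded, as intended */
    return 0;
}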
3. The kernel bind system call
SYSCALL_DEFINE3(bind, int, fd, struct sockaddr __user *, umyaddr, int, addrlen)
{
struct socket *sock;
struct sockaddr_storage address;
int err, fput_needed;
sock = sockfd_lookup_light(fd, &err, &fput_needed);
if (sock) {
err = move_addr_to_kernel(umyaddr, addrlen, &address);
if (err >= 0) {
err = security_socket_bind(sock,
(struct sockaddr *)&address,
addrlen);
if (!err)
err = sock->ops->bind(sock,
(struct sockaddr *)
&address, addrlen);
}
fput_light(sock->file, fput_needed);
}
return err;
}
1. sockfd_lookup_light finds the file structure's private_data through the fd; that private data is the socket structure.
2. move_addr_to_kernel copies the user-space sockaddr into a kernel-space sockaddr_storage.
3. security_socket_bind runs the SELinux check, deciding whether the caller is allowed to bind this socket file descriptor.
4. If all is well, the socket structure's bind method is called.
5. fput_light decides, based on fput_needed, whether to drop the file reference taken for the socket's fd.

Note that the ops here refers to the proto_ops structure inside the socket structure. For IPv4 this is inet_stream_ops, whose bind method pointer is the inet_bind function.
3.1. proto_ops inet_bind
int inet_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len)
{
struct sockaddr_in *addr = (struct sockaddr_in *)uaddr;
struct sock *sk = sock->sk;
struct inet_sock *inet = inet_sk(sk);
struct net *net = sock_net(sk);
unsigned short snum;
int chk_addr_ret;
int err;
snum = ntohs(addr->sin_port);
err = -EACCES;
if (snum && snum < PROT_SOCK &&
!ns_capable(net->user_ns, CAP_NET_BIND_SERVICE))
goto out;
lock_sock(sk);
/* Check these errors (active socket, double bind). */
err = -EINVAL;
if (sk->sk_state != TCP_CLOSE || inet->inet_num)
goto out_release_sock;
inet->inet_rcv_saddr = inet->inet_saddr = addr->sin_addr.s_addr;
chk_addr_ret = inet_addr_type(net, addr->sin_addr.s_addr); /* computed earlier in the full kernel source; elided in this excerpt */
if (chk_addr_ret == RTN_MULTICAST || chk_addr_ret == RTN_BROADCAST)
inet->inet_saddr = 0; /* Use device */
/* Make sure we are allowed to bind here. */
if (sk->sk_prot->get_port(sk, snum)) {
inet->inet_saddr = inet->inet_rcv_saddr = 0;
err = -EADDRINUSE;
goto out_release_sock;
}
if (inet->inet_rcv_saddr)
sk->sk_userlocks |= SOCK_BINDADDR_LOCK;
if (snum)
sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
inet->inet_sport = htons(inet->inet_num);
inet->inet_daddr = 0;
inet->inet_dport = 0;
sk_dst_reset(sk);
err = 0;
out_release_sock:
release_sock(sk);
out:
return err;
}
EXPORT_SYMBOL(inet_bind);
This flow is actually quite simple: it casts the sock structure back to an inet_sock, records the local address to bind in inet_rcv_saddr/inet_saddr, and resets inet_daddr and inet_dport, i.e. the IP and port of the communication peer, to zero.

Note the order of events. First, the port number that was set from JNI is read from the sockaddr's sin_port. If it is below 1024 (PROT_SOCK), the caller must be privileged, because only privileged users may bind ports below 1024; an illegal attempt fails with -EACCES. Then sk->sk_prot->get_port stores the port snum into inet_sock's inet_num and verifies the port is not already taken (failing with -EADDRINUSE). Only at the end is inet_num written, in network byte order, into inet_sport as the server's port.
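A hedged user-space C sketch (assumed to run as an unprivileged user on Linux) of the two failure paths inet_bind enforces: EACCES for a port below 1024 without CAP_NET_BIND_SERVICE, and EADDRINUSE for a port that is already bound.

#include <arpa/inet.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>

static int try_bind(int fd, unsigned short port) {
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(port);
    if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        printf("bind(%u): %s\n", port, strerror(errno));
        return -1;
    }
    printf("bind(%u): ok\n", port);
    return 0;
}

int main(void) {
    int a = socket(AF_INET, SOCK_STREAM, 0);
    int b = socket(AF_INET, SOCK_STREAM, 0);

    try_bind(a, 80);    /* below PROT_SOCK (1024): EACCES unless privileged */
    try_bind(a, 8080);  /* normally succeeds */
    try_bind(b, 8080);  /* same port from another socket: EADDRINUSE */
    return 0;
}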
4. ServerSocket socketListen
Listen follows the same pattern; the call chain ends up in PlainSocketImpl's socketListen:
void socketListen(int count) throws IOException {
if (fd == null || !fd.valid()) {
throw new SocketException("Socket closed");
}
try {
Libcore.os.listen(fd, count);
} catch (ErrnoException errnoException) {
throw errnoException.rethrowAsSocketException();
}
}
static void Linux_listen(JNIEnv* env, jobject, jobject javaFd, jint backlog) {
int fd = jniGetFDFromFileDescriptor(env, javaFd);
throwIfMinusOne(env, "listen", TEMP_FAILURE_RETRY(listen(fd, backlog)));
}
template <typename rc_t>
static rc_t throwIfMinusOne(JNIEnv* env, const char* name, rc_t rc) {
if (rc == rc_t(-1)) {
throwErrnoException(env, name);
}
return rc;
}
As you can see, this still issues the listen system call; if listen returns an error, the errno is rethrown to the Java layer as a SocketException.
5. The kernel listen system call
SYSCALL_DEFINE2(listen, int, fd, int, backlog)
{
struct socket *sock;
int err, fput_needed;
int somaxconn;
sock = sockfd_lookup_light(fd, &err, &fput_needed);
if (sock) {
somaxconn = sock_net(sock->sk)->core.sysctl_somaxconn;
if ((unsigned int)backlog > somaxconn)
backlog = somaxconn;
err = security_socket_listen(sock, backlog);
if (!err)
err = sock->ops->listen(sock, backlog);
fput_light(sock->file, fput_needed);
}
return err;
}
The logic here closely mirrors bind. The requested backlog is first clamped to the somaxconn sysctl, then the ops, i.e. the proto_ops structure, is fetched from the socket structure. For IPv4 that is inet_stream_ops, whose listen method pointer is inet_listen.
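A small C sketch illustrating the clamp: whatever backlog you pass to listen() is silently capped at net.core.somaxconn; reading the sysctl from /proc is only for display.

#include <arpa/inet.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>

int main(void) {
    int somaxconn = 0;
    FILE *f = fopen("/proc/sys/net/core/somaxconn", "r");
    if (f) {
        fscanf(f, "%d", &somaxconn);
        fclose(f);
    }
    printf("net.core.somaxconn = %d\n", somaxconn);

    int fd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    bind(fd, (struct sockaddr *)&addr, sizeof(addr));

    listen(fd, 1 << 20);  /* ask for a huge backlog; the kernel clamps it */
    return 0;
}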
5.1. inet_stream_ops inet_listen
int inet_listen(struct socket *sock, int backlog)
{
struct sock *sk = sock->sk;
unsigned char old_state;
int err;
lock_sock(sk);
err = -EINVAL;
if (sock->state != SS_UNCONNECTED || sock->type != SOCK_STREAM)
goto out;
old_state = sk->sk_state;
if (!((1 << old_state) & (TCPF_CLOSE | TCPF_LISTEN)))
goto out;
if (old_state != TCP_LISTEN) {
if ((sysctl_tcp_fastopen & TFO_SERVER_ENABLE) != 0 &&
inet_csk(sk)->icsk_accept_queue.fastopenq == NULL) {
if ((sysctl_tcp_fastopen & TFO_SERVER_WO_SOCKOPT1) != 0)
err = fastopen_init_queue(sk, backlog);
else if ((sysctl_tcp_fastopen &
TFO_SERVER_WO_SOCKOPT2) != 0)
err = fastopen_init_queue(sk,
((uint)sysctl_tcp_fastopen) >> 16);
else
err = 0;
if (err)
goto out;
}
err = inet_csk_listen_start(sk, backlog);
if (err)
goto out;
}
sk->sk_max_ack_backlog = backlog;
err = 0;
out:
release_sock(sk);
return err;
}
EXPORT_SYMBOL(inet_listen);
1. First it validates that the sock structure's type is SOCK_STREAM and its state is SS_UNCONNECTED (not yet connected); only then does the logic continue. Otherwise it returns an error and releases the sock lock.
2. It shifts 1 left by the sock's current state and masks the result against TCPF_CLOSE | TCPF_LISTEN. Only sockets currently in TCP_CLOSE or TCP_LISTEN may proceed; anything else bails out.
3. If the current state is not yet TCP_LISTEN, fastopen_init_queue may initialize the fastopen queue hanging off the sock's accept queue, and inet_csk_listen_start then switches the state to TCP_LISTEN. If the socket is already in the TCP_LISTEN state, only sk_max_ack_backlog is refreshed.

Note that sk_max_ack_backlog is the size of the accept backlog.
5.2. fastopen_init_queue
static inline int fastopen_init_queue(struct sock *sk, int backlog)
{
struct request_sock_queue *queue =
&inet_csk(sk)->icsk_accept_queue;
if (queue->fastopenq == NULL) {
queue->fastopenq = kzalloc(
sizeof(struct fastopen_queue),
sk->sk_allocation);
if (queue->fastopenq == NULL)
return -ENOMEM;
sk->sk_destruct = tcp_sock_destruct;
spin_lock_init(&queue->fastopenq->lock);
}
queue->fastopenq->max_qlen = backlog;
return 0;
}
The crucial field here is icsk_accept_queue: a linked-list structure that carries connection requests arriving from clients.
What this function actually initializes is the fastopen_queue, the core structure of TFO (TCP Fast Open), a technique that has been in the kernel for quite a while.
In essence, TFO lets client and server exchange some data during the three-way handshake once a cookie check succeeds: the first handshake generates the cookie, after which a SYN packet can already carry data.
The motivation: in 2011, Google found that reconnect scenarios were common and a source of latency, costing roughly one extra RTT, so this optimization was made and merged into the kernel (client support in Linux 3.6, server support in 3.7).
For details, see http://www.vants.org/?post=210
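For completeness, this is how a server opts into TFO from user space: a hedged sketch assuming a Linux kernel with server-side TFO support. The TCP_FASTOPEN option value is the maximum length of the fastopen queue, i.e. max_qlen above.

#include <netinet/in.h>
#include <netinet/tcp.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>

int main(void) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(8080);
    bind(fd, (struct sockaddr *)&addr, sizeof(addr));

    int qlen = 16;  /* becomes fastopenq->max_qlen */
    if (setsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, &qlen, sizeof(qlen)) == -1)
        perror("setsockopt(TCP_FASTOPEN)");

    listen(fd, 50);
    return 0;
}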
Core structures of the server's listening socket
Let's look at how these structures fit together:
struct request_sock {
struct sock_common __req_common;
struct request_sock *dl_next;
u16 mss;
u8 num_retrans; /* number of retransmits */
u8 cookie_ts:1; /* syncookie: encode tcpopts in timestamp */
u8 num_timeout:7; /* number of timeouts */
/* The following two fields can be easily recomputed I think -AK */
u32 window_clamp; /* window clamp at creation time */
u32 rcv_wnd; /* rcv_wnd offered first time */
u32 ts_recent;
unsigned long expires;
const struct request_sock_ops *rsk_ops;
struct sock *sk;
u32 secid;
u32 peer_secid;
};
/** struct listen_sock - listen state
*
* @max_qlen_log - log_2 of maximal queued SYNs/REQUESTs
*/
struct listen_sock {
u8 max_qlen_log;
u8 synflood_warned;
/* 2 bytes hole, try to use */
int qlen;
int qlen_young;
int clock_hand;
u32 hash_rnd;
u32 nr_table_entries;
struct request_sock *syn_table[0];
};
struct fastopen_queue {
struct request_sock *rskq_rst_head; /* Keep track of past TFO */
struct request_sock *rskq_rst_tail; /* requests that caused RST.
* This is part of the defense
* against spoofing attack.
*/
spinlock_t lock;
int qlen; /* # of pending (TCP_SYN_RECV) reqs */
int max_qlen; /* != 0 iff TFO is currently enabled */
};
struct request_sock_queue {
struct request_sock *rskq_accept_head;
struct request_sock *rskq_accept_tail;
rwlock_t syn_wait_lock;
u8 rskq_defer_accept;
/* 3 bytes hole, try to pack */
struct listen_sock *listen_opt;
struct fastopen_queue *fastopenq; /* This is non-NULL iff TFO has been
* enabled on this listener. Check
* max_qlen != 0 in fastopen_queue
* to determine if TFO is enabled
* right at this moment.
*/
};
request_sock_queue
First, the outermost core structure, request_sock_queue:
- rskq_accept_head: head of the linked-list queue of client requests accepted on the server side
- rskq_accept_tail: tail of that accept queue
- syn_wait_lock: a read-write lock protecting writes and changes to the head of the pending-request list
- listen_opt: records the listen state
- fastopenq: the TFO accept queue
request_sock
Every request a client sends, once it enters the kernel through the NIC, becomes a request_sock cached in the queue, waiting to be consumed by the server.
- __req_common: a sock_common, the most commonly used structure in socket code. sock_common is also embedded in the sock structure and stores the data every socket system call needs: the protocol in use, the receiver's address and port, the sender's address, and so on. We did not see it earlier only because #define macros provide shorthand accessors into sock_common. Here it identifies which client the request came from.
- dl_next: the next request_sock in the list
- mss: maximum segment size
- num_retrans: number of retransmits
- num_timeout: number of timeouts
- rsk_ops: the operations corresponding to the client request
- sk: the sock structure corresponding to the client request

The reason each request maps to a socket structure of its own is that the accept system call below creates an additional, brand-new socket.
fastopen_queue
As you can see, fastopen_queue looks a lot like request_sock_queue, because its role is to carry the data exchanged during the three-way handshake.
inet_csk_listen_start
File: /net/ipv4/inet_connection_sock.c
int inet_csk_listen_start(struct sock *sk, const int nr_table_entries)
{
struct inet_sock *inet = inet_sk(sk);
struct inet_connection_sock *icsk = inet_csk(sk);
int rc = reqsk_queue_alloc(&icsk->icsk_accept_queue, nr_table_entries);
if (rc != 0)
return rc;
sk->sk_max_ack_backlog = 0;
sk->sk_ack_backlog = 0;
inet_csk_delack_init(sk);
sk->sk_state = TCP_LISTEN;
if (!sk->sk_prot->get_port(sk, inet->inet_num)) {
inet->inet_sport = htons(inet->inet_num);
sk_dst_reset(sk);
sk->sk_prot->hash(sk);
return 0;
}
sk->sk_state = TCP_CLOSE;
__reqsk_queue_destroy(&icsk->icsk_accept_queue);
return -EADDRINUSE;
}
EXPORT_SYMBOL_GPL(inet_csk_listen_start);
This casts the sock structure to an inet_connection_sock and initializes icsk_accept_queue. Note that when reqsk_queue_alloc allocates memory, listen_sock is sized separately, because of its special trailing member struct request_sock *syn_table[0]: a zero-length array whose actual length is decided by the backlog. The function then sets the sock's state to TCP_LISTEN and records the source port the server configured.
ServerSocket accept
public Socket accept() throws IOException {
if (isClosed())
throw new SocketException("Socket is closed");
if (!isBound())
throw new SocketException("Socket is not bound yet");
Socket s = new Socket((SocketImpl) null);
implAccept(s);
return s;
}
protected final void implAccept(Socket s) throws IOException {
SocketImpl si = null;
try {
if (s.impl == null)
s.setImpl();
else {
s.impl.reset();
}
si = s.impl;
s.impl = null;
si.address = new InetAddress();
si.fd = new FileDescriptor();
getImpl().accept(si);
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkAccept(si.getInetAddress().getHostAddress(),
si.getPort());
}
} catch (IOException e) {
if (si != null)
si.reset();
s.impl = si;
throw e;
} catch (SecurityException e) {
if (si != null)
si.reset();
s.impl = si;
throw e;
}
s.impl = si;
s.postAccept();
}
The core is two calls:
- 1. SocketImpl's accept method
- 2. the accepted Socket's postAccept

The concrete SocketImpl here derives from AbstractPlainSocketImpl, which is where the accept method lives.
AbstractPlainSocketImpl accept
/libcore/ojluni/src/main/java/java/net/AbstractPlainSocketImpl.java
/**
* Accepts connections.
* @param s the connection
*/
protected void accept(SocketImpl s) throws IOException {
acquireFD();
try {
// Android-added: BlockGuard
BlockGuard.getThreadPolicy().onNetwork();
socketAccept(s);
} finally {
releaseFD();
}
}
The socketAccept method is implemented by PlainSocketImpl:
/libcore/ojluni/src/main/java/java/net/PlainSocketImpl.java
void socketAccept(SocketImpl s) throws IOException {
if (fd == null || !fd.valid()) {
throw new SocketException("Socket closed");
}
// poll() with a timeout of 0 means "poll for zero millis", but a Socket timeout == 0 means
// "wait forever". When timeout == 0 we pass -1 to poll.
if (timeout <= 0) {
IoBridge.poll(fd, POLLIN | POLLERR, -1);
} else {
IoBridge.poll(fd, POLLIN | POLLERR, timeout);
}
InetSocketAddress peerAddress = new InetSocketAddress();
try {
FileDescriptor newfd = Libcore.os.accept(fd, peerAddress);
s.fd.setInt$(newfd.getInt$());
s.address = peerAddress.getAddress();
s.port = peerAddress.getPort();
} catch (ErrnoException errnoException) {
if (errnoException.errno == EAGAIN) {
throw new SocketTimeoutException(errnoException);
} else if (errnoException.errno == EINVAL || errnoException.errno == EBADF) {
throw new SocketException("Socket closed");
}
errnoException.rethrowAsSocketException();
}
s.localport = IoBridge.getLocalInetSocketAddress(s.fd).getPort();
}
The implementation boils down to a few points (see the C sketch after this list):
- 1. IoBridge.poll issues the poll system call, blocking on the fd of the listening socket.
- 2. Once the block is released, Libcore.os.accept is called to obtain a brand-new FileDescriptor, filled in with the peer's address and port.
- 3. Incoming data can then be read through the socket's I/O streams.
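A plain C sketch of the same pattern socketAccept follows: poll() on the listening fd with a timeout, then accept() once POLLIN fires. listen_fd is assumed to be a bound, listening socket.

#include <arpa/inet.h>
#include <poll.h>
#include <stdio.h>
#include <sys/socket.h>

int accept_with_timeout(int listen_fd, int timeout_ms) {
    struct pollfd pfd = { .fd = listen_fd, .events = POLLIN | POLLERR };

    /* timeout_ms <= 0 maps to -1, "wait forever", mirroring the Java code */
    int n = poll(&pfd, 1, timeout_ms <= 0 ? -1 : timeout_ms);
    if (n == 0) {
        fprintf(stderr, "accept timed out\n");  /* Java throws SocketTimeoutException */
        return -1;
    }
    if (n < 0) {
        perror("poll");
        return -1;
    }

    struct sockaddr_in peer;
    socklen_t len = sizeof(peer);
    int client_fd = accept(listen_fd, (struct sockaddr *)&peer, &len);
    if (client_fd >= 0)
        printf("client %s:%d\n", inet_ntoa(peer.sin_addr), ntohs(peer.sin_port));
    return client_fd;
}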
IoBridge.poll
This method eventually calls Libcore.os.poll, which lands in a native method:
File: /libcore/luni/src/main/native/libcore_io_Linux.cpp
static jint Linux_poll(JNIEnv* env, jobject, jobjectArray javaStructs, jint timeoutMs) {
static jfieldID fdFid = env->GetFieldID(JniConstants::structPollfdClass, "fd", "Ljava/io/FileDescriptor;");
static jfieldID eventsFid = env->GetFieldID(JniConstants::structPollfdClass, "events", "S");
static jfieldID reventsFid = env->GetFieldID(JniConstants::structPollfdClass, "revents", "S");
// Turn the Java android.system.StructPollfd[] into a C++ struct pollfd[].
size_t arrayLength = env->GetArrayLength(javaStructs);
std::unique_ptr<struct pollfd[]> fds(new struct pollfd[arrayLength]);
memset(fds.get(), 0, sizeof(struct pollfd) * arrayLength);
size_t count = 0; // Some trailing array elements may be irrelevant. (See below.)
for (size_t i = 0; i < arrayLength; ++i) {
ScopedLocalRef<jobject> javaStruct(env, env->GetObjectArrayElement(javaStructs, i));
if (javaStruct.get() == NULL) {
break; // We allow trailing nulls in the array for caller convenience.
}
ScopedLocalRef<jobject> javaFd(env, env->GetObjectField(javaStruct.get(), fdFid));
if (javaFd.get() == NULL) {
break; // We also allow callers to just clear the fd field (this is what Selector does).
}
fds[count].fd = jniGetFDFromFileDescriptor(env, javaFd.get());
fds[count].events = env->GetShortField(javaStruct.get(), eventsFid);
++count;
}
std::vector<AsynchronousCloseMonitor*> monitors;
for (size_t i = 0; i < count; ++i) {
monitors.push_back(new AsynchronousCloseMonitor(fds[i].fd));
}
int rc;
while (true) {
timespec before;
clock_gettime(CLOCK_MONOTONIC, &before);
rc = poll(fds.get(), count, timeoutMs);
if (rc >= 0 || errno != EINTR) {
break;
}
// We got EINTR. Work out how much of the original timeout is still left.
if (timeoutMs > 0) {
timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
timespec diff;
diff.tv_sec = now.tv_sec - before.tv_sec;
diff.tv_nsec = now.tv_nsec - before.tv_nsec;
if (diff.tv_nsec < 0) {
--diff.tv_sec;
diff.tv_nsec += 1000000000;
}
jint diffMs = diff.tv_sec * 1000 + diff.tv_nsec / 1000000;
if (diffMs >= timeoutMs) {
rc = 0; // We have less than 1ms left anyway, so just time out.
break;
}
timeoutMs -= diffMs;
}
}
for (size_t i = 0; i < monitors.size(); ++i) {
delete monitors[i];
}
if (rc == -1) {
throwErrnoException(env, "poll");
return -1;
}
// Update the revents fields in the Java android.system.StructPollfd[].
for (size_t i = 0; i < count; ++i) {
ScopedLocalRef<jobject> javaStruct(env, env->GetObjectArrayElement(javaStructs, i));
if (javaStruct.get() == NULL) {
return -1;
}
env->SetShortField(javaStruct.get(), reventsFid, fds[i].revents);
}
return rc;
}
The logic here is straightforward:
The thread blocks inside the poll system call in a loop, for up to timeoutMs. If poll returns early but not for a real event (i.e. it was interrupted), the remaining timeout is recomputed and the loop blocks again.
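The retry logic above, distilled into a hedged standalone C sketch: on EINTR, subtract the time already waited and poll again; only a real event, a timeout, or a genuine error breaks out of the loop.

#include <errno.h>
#include <poll.h>
#include <time.h>

static long elapsed_ms(const struct timespec *a, const struct timespec *b) {
    return (b->tv_sec - a->tv_sec) * 1000 + (b->tv_nsec - a->tv_nsec) / 1000000;
}

int poll_restarting(struct pollfd *fds, nfds_t n, int timeout_ms) {
    for (;;) {
        struct timespec before, now;
        clock_gettime(CLOCK_MONOTONIC, &before);
        int rc = poll(fds, n, timeout_ms);
        if (rc >= 0 || errno != EINTR)
            return rc;                  /* event, timeout, or real error */
        if (timeout_ms > 0) {           /* EINTR: wait only what is left */
            clock_gettime(CLOCK_MONOTONIC, &now);
            long spent = elapsed_ms(&before, &now);
            if (spent >= timeout_ms)
                return 0;               /* effectively timed out */
            timeout_ms -= (int)spent;
        }
    }
}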
Libcore.os.accept
static jobject Linux_accept(JNIEnv* env, jobject, jobject javaFd, jobject javaSocketAddress) {
sockaddr_storage ss;
socklen_t sl = sizeof(ss);
memset(&ss, 0, sizeof(ss));
sockaddr* peer = (javaSocketAddress != NULL) ? reinterpret_cast<sockaddr*>(&ss) : NULL;
socklen_t* peerLength = (javaSocketAddress != NULL) ? &sl : 0;
jint clientFd = NET_FAILURE_RETRY(env, int, accept, javaFd, peer, peerLength);
if (clientFd == -1 || !fillSocketAddress(env, javaSocketAddress, ss, *peerLength)) {
close(clientFd);
return NULL;
}
return (clientFd != -1) ? jniCreateFileDescriptor(env, clientFd) : NULL;
}
The core is the NET_FAILURE_RETRY macro shown earlier, this time invoking the accept system call. Why the macro is wrapped in do/while was already covered above.
The accept system call
SYSCALL_DEFINE4(accept4, int, fd, struct sockaddr __user *, upeer_sockaddr,
int __user *, upeer_addrlen, int, flags)
{
struct socket *sock, *newsock;
struct file *newfile;
int err, len, newfd, fput_needed;
struct sockaddr_storage address;
if (flags & ~(SOCK_CLOEXEC | SOCK_NONBLOCK))
return -EINVAL;
if (SOCK_NONBLOCK != O_NONBLOCK && (flags & SOCK_NONBLOCK))
flags = (flags & ~SOCK_NONBLOCK) | O_NONBLOCK;
sock = sockfd_lookup_light(fd, &err, &fput_needed);
...
err = -ENFILE;
newsock = sock_alloc();
...
newsock->type = sock->type;
newsock->ops = sock->ops;
__module_get(newsock->ops->owner);
newfd = get_unused_fd_flags(flags);
...
newfile = sock_alloc_file(newsock, flags, sock->sk->sk_prot_creator->name);
if (unlikely(IS_ERR(newfile))) {
err = PTR_ERR(newfile);
put_unused_fd(newfd);
sock_release(newsock);
goto out_put;
}
err = security_socket_accept(sock, newsock);
if (err)
goto out_fd;
err = sock->ops->accept(sock, newsock, sock->file->f_flags);
if (err < 0)
goto out_fd;
if (upeer_sockaddr) {
if (newsock->ops->getname(newsock, (struct sockaddr *)&address,
&len, 2) < 0) {
err = -ECONNABORTED;
goto out_fd;
}
err = move_addr_to_user(&address,
len, upeer_sockaddr, upeer_addrlen);
if (err < 0)
goto out_fd;
}
/* File flags are not inherited via accept() unlike another OSes. */
fd_install(newfd, newfile);
err = newfd;
...
}
First, note that throughout the accept system call there can be two sockets, each with its own fd (see the accept4 sketch after this list):
1. The listening socket structure created at the start is looked up by fd and validated.
2. A brand-new socket structure and a brand-new fd are created, and the old socket's type and protocol operations are copied over; for IPv4 that means inet_stream_ops.
3. Both sockets then pass the security policy check associated with accept.
4. The socket's ops->accept, i.e. the method pointer inet_stream_ops's accept points to, is invoked with the freshly created socket structure as a parameter.
5. The new file handle is installed and tied to the new socket.
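The syscall entry above is accept4; a plain accept(fd, addr, len) is simply accept4 with flags == 0. A hedged sketch of using the flags directly:

#define _GNU_SOURCE
#include <stdio.h>
#include <sys/socket.h>

int accept_nonblocking_client(int listen_fd) {
    struct sockaddr_storage peer;
    socklen_t len = sizeof(peer);

    /* The new fd is created non-blocking and close-on-exec in one step.
     * As the kernel comment notes, file flags are not inherited from the
     * listening fd via accept(). */
    int client_fd = accept4(listen_fd, (struct sockaddr *)&peer, &len,
                            SOCK_NONBLOCK | SOCK_CLOEXEC);
    if (client_fd == -1)
        perror("accept4");
    return client_fd;
}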
inet_accept
int inet_accept(struct socket *sock, struct socket *newsock, int flags)
{
struct sock *sk1 = sock->sk;
int err = -EINVAL;
struct sock *sk2 = sk1->sk_prot->accept(sk1, flags, &err);
if (!sk2)
goto do_err;
lock_sock(sk2);
sock_rps_record_flow(sk2);
WARN_ON(!((1 << sk2->sk_state) &
(TCPF_ESTABLISHED | TCPF_SYN_RECV |
TCPF_CLOSE_WAIT | TCPF_CLOSE)));
sock_graft(sk2, newsock);
newsock->state = SS_CONNECTED;
err = 0;
release_sock(sk2);
do_err:
return err;
}
EXPORT_SYMBOL(inet_accept);
Note that sk1->sk_prot points to a proto structure; for TCP over IPv4 that is tcp_prot, so this accept method pointer refers to the inet_csk_accept function.
inet_csk_accept
File: /net/ipv4/inet_connection_sock.c
struct sock *inet_csk_accept(struct sock *sk, int flags, int *err)
{
struct inet_connection_sock *icsk = inet_csk(sk);
struct request_sock_queue *queue = &icsk->icsk_accept_queue;
struct sock *newsk;
struct request_sock *req;
int error;
lock_sock(sk);
/* We need to make sure that this socket is listening,
* and that it has something pending.
*/
error = -EINVAL;
if (sk->sk_state != TCP_LISTEN)
goto out_err;
/* Find already established connection */
if (reqsk_queue_empty(queue)) {
long timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
/* If this is a non blocking socket don't sleep */
error = -EAGAIN;
if (!timeo)
goto out_err;
error = inet_csk_wait_for_connect(sk, timeo);
if (error)
goto out_err;
}
req = reqsk_queue_remove(queue);
newsk = req->sk;
sk_acceptq_removed(sk);
if (sk->sk_protocol == IPPROTO_TCP && queue->fastopenq != NULL) {
spin_lock_bh(&queue->fastopenq->lock);
if (tcp_rsk(req)->listener) {
/* We are still waiting for the final ACK from 3WHS
* so can't free req now. Instead, we set req->sk to
* NULL to signify that the child socket is taken
* so reqsk_fastopen_remove() will free the req
* when 3WHS finishes (or is aborted).
*/
req->sk = NULL;
req = NULL;
}
spin_unlock_bh(&queue->fastopenq->lock);
}
out:
release_sock(sk);
if (req)
__reqsk_free(req);
return newsk;
out_err:
newsk = NULL;
req = NULL;
*err = error;
goto out;
}
EXPORT_SYMBOL(inet_csk_accept);
Before the core logic below runs, the sock structure's state must be TCP_LISTEN; otherwise the call fails.
reqsk_queue_empty checks whether icsk_accept_queue is empty (by testing whether the list head is NULL). If it is empty and O_NONBLOCK is set (so the timeout is 0), the call returns -EAGAIN right away; otherwise inet_csk_wait_for_connect blocks until a connection arrives or the timeout expires.
If the queue is not empty, reqsk_queue_remove takes the entry at rskq_accept_head and makes the next item in the list the new head, and sk_acceptq_removed decrements sk_ack_backlog.
If the protocol is TCP and fastopenq is non-NULL, and the request still has a listener, the socket is still waiting for the final ACK of the three-way handshake to collect all the data exchanged during it. In that case req->sk is set to NULL to signal that the child socket is taken, so reqsk_fastopen_remove will free the request when the handshake finishes (or is aborted).
The heart of the code is still the inet_csk_wait_for_connect method.
inet_csk_wait_for_connect
static int inet_csk_wait_for_connect(struct sock *sk, long timeo)
{
struct inet_connection_sock *icsk = inet_csk(sk);
DEFINE_WAIT(wait);
int err;
for (;;) {
prepare_to_wait_exclusive(sk_sleep(sk), &wait,
TASK_INTERRUPTIBLE);
release_sock(sk);
if (reqsk_queue_empty(&icsk->icsk_accept_queue))
timeo = schedule_timeout(timeo);
lock_sock(sk);
err = 0;
if (!reqsk_queue_empty(&icsk->icsk_accept_queue))
break;
err = -EINVAL;
if (sk->sk_state != TCP_LISTEN)
break;
err = sock_intr_errno(timeo);
if (signal_pending(current))
break;
err = -EAGAIN;
if (!timeo)
break;
}
finish_wait(sk_sleep(sk), &wait);
return err;
}
As you can see, this blocks the whole accept flow with a loop. Note that in this flow there is no further blocking via poll or the like; instead, prepare_to_wait_exclusive sets the current task to the TASK_INTERRUPTIBLE state and adds it to the socket's wait queue.
If icsk_accept_queue is empty, schedule_timeout keeps yielding the CPU to other tasks until the timeout expires.
Whenever the task wakes up, it re-checks whether icsk_accept_queue is empty and breaks out if it no longer is; it also returns directly if the sock is no longer in the TCP_LISTEN state. signal_pending checks whether a signal needs handling.
That completes the accept logic. But what about the three-way handshake? That part is actually implemented by the connect system call.
Summary
From here you can see how tidy the design of the socket API really is. Inside the socket structure there are two broad families of operation structures:
- The proto_ops structure, represented by inet_stream_ops and inet6_stream_ops. This operation table is obtained from the address family (family) as soon as the socket is initialized, and its entries correspond one-to-one with socket operations: whenever a socket call executes, it is bound to hit the matching method pointer in this structure first.
- The sk_prot structure, which corresponds to the concrete transport protocol: TCP over IPv4, UDP over IPv4, and so on each map to their own protocol structure; TCP maps to tcp_prot. Its methods run, if at all, only after the corresponding proto_ops operation has finished.

Once you are familiar with this pattern, you can jump straight to the right method when reading the source later.
For the server side, beyond creating a socket, the following steps run (a plain C sketch of the full sequence follows this list):
- bind
- listen
- accept
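The whole server-side sequence this article walks through, as a minimal user-space C sketch: socket -> bind -> listen -> accept. Port 8080 is an arbitrary example value.

#include <arpa/inet.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void) {
    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);        /* sys_socket */

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(8080);
    if (bind(listen_fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {  /* sys_bind */
        perror("bind");
        return 1;
    }

    if (listen(listen_fd, 50) == -1) {                      /* sys_listen, backlog 50 */
        perror("listen");
        return 1;
    }

    struct sockaddr_in peer;
    socklen_t len = sizeof(peer);
    int client_fd = accept(listen_fd, (struct sockaddr *)&peer, &len);    /* sys_accept */
    if (client_fd >= 0) {
        printf("connection from %s:%d\n",
               inet_ntoa(peer.sin_addr), ntohs(peer.sin_port));
        close(client_fd);
    }
    close(listen_fd);
    return 0;
}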
bind
Every time the server brings up a socket service it must bind a port. The port can even be set to 0 at the Java layer; the kernel then chooses one, and the chosen port is read back through the socket's fd and saved during the bind step, as we saw in socketBind.
This flow involves two core data structures:
- inet_sock
- sockaddr_in
struct inet_sock {
    /* sk and pinet6 has to be the first two members of inet_sock */
    struct sock sk;
#if IS_ENABLED(CONFIG_IPV6)
    struct ipv6_pinfo *pinet6; // IPv6 info
#endif
    /* Socket demultiplex comparisons on incoming packets. */
#define inet_daddr sk.__sk_common.skc_daddr // remote (destination) IP address
#define inet_rcv_saddr sk.__sk_common.skc_rcv_saddr // bound local IP address
#define inet_dport sk.__sk_common.skc_dport // destination port
#define inet_num sk.__sk_common.skc_num // locally bound port
    __be32 inet_saddr; // source address for sending
    __s16 uc_ttl; // unicast TTL
    __u16 cmsg_flags;
    __be16 inet_sport; // source port
    __u16 inet_id; // packet ID for DF packets
    struct ip_options_rcu __rcu *inet_opt;
    int rx_dst_ifindex;
    __u8 tos; // TOS; the 4 TOS bits mean: minimize delay, maximize throughput, maximize reliability, minimize cost
    __u8 min_ttl;
    __u8 mc_ttl; // multicast TTL
    __u8 pmtudisc;
    __u8 recverr:1,
        is_icsk:1, // whether this is an inet_connection_sock
        freebind:1,
        hdrincl:1,
        mc_loop:1,
        transparent:1,
        mc_all:1,
        nodefrag:1;
    __u8 rcv_tos;
    int uc_index; // unicast device index
    int mc_index; // multicast device index
    __be32 mc_addr;
    struct ip_mc_socklist __rcu *mc_list;
    struct inet_cork_full cork;
};
With this data structure in mind, the core events of bind are: the port number is read from the sockaddr's sin_port; a port below 1024 requires privilege, because only privileged users may use ports below 1024, and an illegal attempt makes the bind fail; sk->sk_prot->get_port then stores the port snum into inet_sock's inet_num; and finally it is written into inet_sport as the source port.
listen
This step does three things:
- initializes the fastopen_queue, the TFO message queue
- initializes the request_sock_queue, the queue that receives incoming requests
- sets the socket's state to LISTEN
accept
- Checks whether the socket is in the LISTEN state; only then can accept proceed.
- reqsk_queue_empty checks whether icsk_accept_queue is empty: if it is, accept blocks; if not, the head of the list is returned. At heart this is a producer-consumer model (a sketch of the idea follows).
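The icsk_accept_queue behaves like a classic producer-consumer queue: the handshake path produces request_socks, accept() consumes them and blocks while the queue is empty. A hedged pthread sketch of the idea; all names here are illustrative, not kernel APIs.

#include <pthread.h>
#include <stddef.h>

struct request {
    struct request *next;
};

struct accept_queue {
    struct request *head, *tail;
    pthread_mutex_t lock;
    pthread_cond_t nonempty;
};

/* Handshake completion (the producer): enqueue a request, wake one waiter. */
void queue_push(struct accept_queue *q, struct request *req) {
    pthread_mutex_lock(&q->lock);
    req->next = NULL;
    if (q->tail)
        q->tail->next = req;
    else
        q->head = req;
    q->tail = req;
    pthread_cond_signal(&q->nonempty);
    pthread_mutex_unlock(&q->lock);
}

/* accept() (the consumer): block until a request is available, pop the head. */
struct request *queue_pop(struct accept_queue *q) {
    pthread_mutex_lock(&q->lock);
    while (q->head == NULL)            /* like inet_csk_wait_for_connect */
        pthread_cond_wait(&q->nonempty, &q->lock);
    struct request *req = q->head;
    q->head = req->next;
    if (q->head == NULL)
        q->tail = NULL;
    pthread_mutex_unlock(&q->lock);
    return req;
}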
At this point the preparation of bind, listen, and accept is complete; all that remains is for a client connect to perform the three-way handshake and establish the connection.