epoll 是在2.5.44內(nèi)核中被引進(jìn)的(epoll(4) is a new API introduced in Linux kernel 2.5.44)惰帽,它幾乎具備眾多優(yōu)點(diǎn),被公認(rèn)為L(zhǎng)inux2.6下性能最好的多路I/O就緒通知方法父虑。
epoll的優(yōu)點(diǎn):
1.支持一個(gè)進(jìn)程打開大數(shù)目的socket描述符(FD)
select 最不能忍受的是一個(gè)進(jìn)程所打開的FD是有一定限制的该酗,由FD_SETSIZE設(shè)置,默認(rèn)值是2048士嚎。對(duì)于那些需要支持的上萬連接數(shù)目的IM服務(wù)器來說顯然太少了呜魄。這時(shí)候你一是可以選擇修改這個(gè)宏然后重新編譯內(nèi)核,不過資料也同時(shí)指出這樣會(huì)帶來網(wǎng)絡(luò)效率的下降莱衩,二是可以選擇多進(jìn)程的解決方案(傳統(tǒng)的 Apache方案)爵嗅,不過雖然linux上面創(chuàng)建進(jìn)程的代價(jià)比較小,但仍舊是不可忽視的笨蚁,加上進(jìn)程間數(shù)據(jù)同步遠(yuǎn)比不上線程間同步的高效睹晒,所以也不是一種完美的方案。不過 epoll則沒有這個(gè)限制括细,它所支持的FD上限是最大可以打開文件的數(shù)目伪很,這個(gè)數(shù)字一般遠(yuǎn)大于2048,舉個(gè)例子,在1GB內(nèi)存的機(jī)器上大約是10萬左右,具體數(shù)目可以cat /proc/sys/fs/file-max察看,一般來說這個(gè)數(shù)目和系統(tǒng)內(nèi)存關(guān)系很大奋单。
2.IO效率不隨FD數(shù)目增加而線性下降
傳統(tǒng)的select/poll另一個(gè)致命弱點(diǎn)就是當(dāng)你擁有一個(gè)很大的socket集合锉试,不過由于網(wǎng)絡(luò)延時(shí),任一時(shí)間只有部分的socket是"活躍"的辱匿,但是select/poll每次調(diào)用都會(huì)線性掃描全部的集合键痛,導(dǎo)致效率呈現(xiàn)線性下降炫彩。但是epoll不存在這個(gè)問題,它只會(huì)對(duì)"活躍"的socket進(jìn)行操作---這是因?yàn)樵趦?nèi)核實(shí)現(xiàn)中epoll是根據(jù)每個(gè)fd上面的callback函數(shù)實(shí)現(xiàn)的絮短。那么江兢,只有"活躍"的socket才會(huì)主動(dòng)的去調(diào)用 callback函數(shù),其他idle狀態(tài)socket則不會(huì)丁频,在這點(diǎn)上杉允,epoll實(shí)現(xiàn)了一個(gè)"偽"AIO,因?yàn)檫@時(shí)候推動(dòng)力在os內(nèi)核席里。在一些 benchmark中叔磷,如果所有的socket基本上都是活躍的---比如一個(gè)高速LAN環(huán)境,epoll并不比select/poll有什么效率奖磁,相反改基,如果過多使用epoll_ctl,效率相比還有稍微的下降。但是一旦使用idle connections模擬WAN環(huán)境,epoll的效率就遠(yuǎn)在select/poll之上了咖为。
3.使用mmap加速內(nèi)核與用戶空間的消息傳遞
這點(diǎn)實(shí)際上涉及到epoll的具體實(shí)現(xiàn)了秕狰。無論是select,poll還是epoll都需要內(nèi)核把FD消息通知給用戶空間,如何避免不必要的內(nèi)存拷貝就很重要躁染,在這點(diǎn)上鸣哀,epoll是通過內(nèi)核于用戶空間mmap同一塊內(nèi)存實(shí)現(xiàn)的。而如果你想我一樣從2.5內(nèi)核就關(guān)注epoll的話吞彤,一定不會(huì)忘記手工 mmap這一步的我衬。
4.內(nèi)核微調(diào)
這一點(diǎn)其實(shí)不算epoll的優(yōu)點(diǎn)了,而是整個(gè)linux平臺(tái)的優(yōu)點(diǎn)饰恕。也許你可以懷疑linux平臺(tái)挠羔,但是你無法回避linux平臺(tái)賦予你微調(diào)內(nèi)核的能力。比如懂盐,內(nèi)核TCP/IP協(xié)議棧使用內(nèi)存池管理sk_buff結(jié)構(gòu)褥赊,那么可以在運(yùn)行時(shí)期動(dòng)態(tài)調(diào)整這個(gè)內(nèi)存pool(skb_head_pool)的大小--- 通過echo XXXX>/proc/sys/net/core/hot_list_length完成。再比如listen函數(shù)的第2個(gè)參數(shù)(TCP完成3次握手的數(shù)據(jù)包隊(duì)列長(zhǎng)度)莉恼,也可以根據(jù)你平臺(tái)內(nèi)存大小動(dòng)態(tài)調(diào)整拌喉。更甚至在一個(gè)數(shù)據(jù)包面數(shù)目巨大但同時(shí)每個(gè)數(shù)據(jù)包本身大小卻很小的特殊系統(tǒng)上嘗試最新的NAPI網(wǎng)卡驅(qū)動(dòng)架構(gòu)。
linux下epoll如何實(shí)現(xiàn)高效處理百萬句柄的
開發(fā)高性能網(wǎng)絡(luò)程序時(shí)俐银,windows開發(fā)者們言必稱iocp尿背,linux開發(fā)者們則言必稱epoll。大家都明白epoll是一種IO多路復(fù)用技術(shù)捶惜,可以非常高效的處理數(shù)以百萬計(jì)的socket句柄田藐,比起以前的select和poll效率高大發(fā)了。我們用起epoll來都感覺挺爽,確實(shí)快汽久,那么鹤竭,它到底為什么可以高速處理這么多并發(fā)連接呢?
使用起來很清晰景醇,首先要調(diào)用epoll_create建立一個(gè)epoll對(duì)象臀稚。參數(shù)size是內(nèi)核保證能夠正確處理的最大句柄數(shù),多于這個(gè)最大數(shù)時(shí)內(nèi)核可不保證效果三痰。epoll_ctl可以操作上面建立的epoll吧寺,例如,將剛建立的socket加入到epoll中讓其監(jiān)控散劫,或者把 epoll正在監(jiān)控的某個(gè)socket句柄移出epoll稚机,不再監(jiān)控它等等。epoll_wait在調(diào)用時(shí)获搏,在給定的timeout時(shí)間內(nèi)赖条,當(dāng)在監(jiān)控的所有句柄中有事件發(fā)生時(shí),就返回用戶態(tài)的進(jìn)程常熙。從上面的調(diào)用方式就可以看到epoll比select/poll的優(yōu)越之處:因?yàn)楹笳呙看握{(diào)用時(shí)都要傳遞你所要監(jiān)控的所有socket給select/poll系統(tǒng)調(diào)用谋币,這意味著需要將用戶態(tài)的socket列表copy到內(nèi)核態(tài),如果以萬計(jì)的句柄會(huì)導(dǎo)致每次都要copy幾十幾百KB的內(nèi)存到內(nèi)核態(tài)症概,非常低效。而我們調(diào)用epoll_wait時(shí)就相當(dāng)于以往調(diào)用select/poll早芭,但是這時(shí)卻不用傳遞socket句柄給內(nèi)核彼城,因?yàn)閮?nèi)核已經(jīng)在epoll_ctl中拿到了要監(jiān)控的句柄列表。所以退个,實(shí)際上在你調(diào)用epoll_create后募壕,內(nèi)核就已經(jīng)在內(nèi)核態(tài)開始準(zhǔn)備幫你存儲(chǔ)要監(jiān)控的句柄了,每次調(diào)用epoll_ctl只是在往內(nèi)核的數(shù)據(jù)結(jié)構(gòu)里塞入新的socket句柄语盈。當(dāng)一個(gè)進(jìn)程調(diào)用epoll_creaqte方法時(shí)舱馅,Linux內(nèi)核會(huì)創(chuàng)建一個(gè)eventpoll結(jié)構(gòu)體,這個(gè)結(jié)構(gòu)體中有兩個(gè)成員與epoll的使用方式密切相關(guān):每一個(gè)epoll對(duì)象都有一個(gè)獨(dú)立的eventpoll結(jié)構(gòu)體刀荒,這個(gè)結(jié)構(gòu)體會(huì)在內(nèi)核空間中創(chuàng)造獨(dú)立的內(nèi)存代嗤,用于存儲(chǔ)使用epoll_ctl方法向epoll對(duì)象中添加進(jìn)來的事件。這樣缠借,重復(fù)的事件就可以通過紅黑樹而高效的識(shí)別出來干毅。
此外,epoll還維護(hù)了一個(gè)雙鏈表泼返,用戶存儲(chǔ)發(fā)生的事件硝逢。當(dāng)epoll_wait調(diào)用時(shí),僅僅觀察這個(gè)list鏈表里有沒有數(shù)據(jù)即eptime項(xiàng)即可。有數(shù)據(jù)就返回渠鸽,沒有數(shù)據(jù)就sleep叫乌,等到timeout時(shí)間到后即使鏈表沒數(shù)據(jù)也返回。所以徽缚,epoll_wait非常高效憨奸。而且,通常情況下即使我們要監(jiān)控百萬計(jì)的句柄猎拨,大多一次也只返回很少量的準(zhǔn)備就緒句柄而已膀藐,所以,epoll_wait僅需要從內(nèi)核態(tài)copy少量的句柄到用戶態(tài)而已红省,如何能不高效 !那么额各,這個(gè)準(zhǔn)備就緒list鏈表是怎么維護(hù)的呢?當(dāng)我們執(zhí)行epoll_ctl時(shí)吧恃,除了把socket放到epoll文件系統(tǒng)里file對(duì)象對(duì)應(yīng)的紅黑樹上之外虾啦,還會(huì)給內(nèi)核中斷處理程序注冊(cè)一個(gè)回調(diào)函數(shù),告訴內(nèi)核痕寓,如果這個(gè)句柄的中斷到了傲醉,就把它放到準(zhǔn)備就緒list鏈表里。所以呻率,當(dāng)一個(gè)socket上有數(shù)據(jù)到了硬毕,內(nèi)核在把網(wǎng)卡上的數(shù)據(jù)copy到內(nèi)核中后就來把socket插入到準(zhǔn)備就緒鏈表里了。如此礼仗,一顆紅黑樹吐咳,一張準(zhǔn)備就緒句柄鏈表,少量的內(nèi)核cache元践,就幫我們解決了大并發(fā)下的socket處理問題韭脊。執(zhí)行epoll_create時(shí),創(chuàng)建了紅黑樹和就緒鏈表单旁,執(zhí)行epoll_ctl時(shí)沪羔,如果增加socket句柄,則檢查在紅黑樹中是否存在象浑,存在立即返回蔫饰,不存在則添加到樹干上,然后向內(nèi)核注冊(cè)回調(diào)函數(shù)融柬,用于當(dāng)中斷事件來臨時(shí)向準(zhǔn)備就緒鏈表中插入數(shù)據(jù)死嗦。執(zhí)行epoll_wait時(shí)立刻返回準(zhǔn)備就緒鏈表里的數(shù)據(jù)即可。
Python Tcp長(zhǎng)連接 Server端代碼:
#coding:utf-8
import socket
import select
class emsc_server:
def __init__(self):
self.serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.serversocket.bind(('0.0.0.0', 5000))
self.serversocket.listen(1000)
self.serversocket.setblocking(0)
def run(self):
response = "接收成功粒氧,返回?cái)?shù)據(jù): connecting status: 200 \n"
response += "haody,client !"
epoll = select.epoll()
epoll.register(self.serversocket.fileno(), select.EPOLLIN)
try:
connections = {}
requests = {}
responses = {}
endflag = '\n\r\n'
while True:
events = epoll.poll(1)
for fid, event in events:
if fid == self.serversocket.fileno():
connection, address = self.serversocket.accept()
connection.setblocking(0)
epoll.register(connection.fileno(), select.EPOLLIN)
connections[connection.fileno()] = connection
requests[connection.fileno()] = ''
responses[connection.fileno()] = response.encode()
elif event & select.EPOLLIN:
try:
requests[fid] = connections[fid].recv(1024)
if len(str(requests[fid].decode())) == 0:
connections[fid].shutdown(socket.SHUT_RDWR)
break
else:
print("2 | ------ : " + str(requests[fid].decode()) + "\n")
byteswritten = connections[fid].send(responses[fid])
if endflag in requests[fid]:
epoll.modify(fid, select.EPOLLOUT)
connections[fid].setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 1)
print('-' * 40 + '\n' + requests[fid].decode()[:-2])
except:
continue
elif event & select.EPOLLOUT:
byteswritten = connections[fid].send(responses[fid])
responses[fid] = responses[fid][byteswritten:]
if len(responses[fid]) == 0:
connections[fid].setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 0)
epoll.modify(fid, 0)
connections[fid].shutdown(socket.SHUT_RDWR)
elif event & select.EPOLLHUP:
epoll.unregister(fid)
connections[fid].close()
del connections[fid]
except:
print("server excepted ...")
epoll.unregister(self.serversocket.fileno())
self.run()
finally:
print("server closed ...")
if __name__=="__main__":
emsc = emsc_server()
emsc.run()
Python Tcp長(zhǎng)連接 Client端代碼:
#coding:utf-8
import socket
import time
class emsc_client:
def __init__(self):
self.host = "127.0.0.1"
self.port = 5000
self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def run(self):
try:
self.conn.connect((self.host, self.port))
while True:
self.conn.send(("來自客戶端發(fā)送的數(shù)據(jù) : " + str(time.time())).encode())
data = self.conn.recv(1024).decode()
print("來自服務(wù)端數(shù)據(jù) :" + data + "|" + str(time.time()))
time.sleep(0.1)
except:
print("服務(wù)器連接異常,嘗試重新連接 (10s) ...")
self.conn.close()
time.sleep(10) # 斷開連接后,每10s重新連接一次
emsc_client().run()
finally:
print("客戶端已關(guān)閉 ...")
if __name__=="__main__":
emsc = emsc_client()
emsc.run()
實(shí)現(xiàn)epoll越除,I/O復(fù)用
服務(wù)端斷開后,客戶端每10s重新連接
在windows 上
Python 采用Select 模式 實(shí)現(xiàn)Tcp 長(zhǎng)連接
#coding:utf-8
import select
import socket
import queue
import time
import os
class emsc_select_server:
def __init__(self):
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.setblocking(False)
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_address = ('0.0.0.0', 5000)
self.server.bind(self.server_address)
self.server.listen(1000)
self.inputs = [self.server]
self.outputs = []
self.message_queues = {}
self.timeout = 20
def run(self):
response = "接收成功,返回?cái)?shù)據(jù): connecting status: 200 \n"
response += "haody,client ! | "
while self.inputs:
print("waiting for next event")
# timeout是超時(shí)摘盆,當(dāng)前連接要是超過這個(gè)時(shí)間的話翼雀,就會(huì)kill
readable, writable, exceptional = select.select(self.inputs, self.outputs, self.inputs, self.timeout)
if not (readable or writable or exceptional):
print("Time out ! ")
break
for ser in readable:
if ser is self.server:
# 通過inputs查看是否有客戶端來
connection, client_address = ser.accept()
print("connection from ", client_address)
connection.setblocking(0)
self.inputs.append(connection)
self.message_queues[connection] = queue.Queue()
else:
data = ser.recv(1024)
if data:
print("收到數(shù)據(jù) ", data.decode(), "\n來自:", ser.getpeername())
self.message_queues[ser].put(data)
# 添加通道
if ser not in self.outputs:
self.outputs.append(ser)
else:
print("closing", client_address)
if ser in self.outputs:
self.outputs.remove(ser)
self.inputs.remove(ser)
ser.close()
# 清除隊(duì)列信息
del self.message_queues[ser]
for ser in writable:
try:
next_msg = self.message_queues[ser].get_nowait()
except queue.Empty:
print(ser.getpeername(), 'queue empty')
self.outputs.remove(ser)
else:
print("發(fā)送數(shù)據(jù) ", str(response + next_msg.decode()), " to ", ser.getpeername(),"\n")
ser.send(response.encode()+next_msg)
for ser in exceptional:
print(" exception condition on ", ser.getpeername())
# stop listening for input on the connection
self.inputs.remove(ser)
if ser in self.outputs:
self.outputs.remove(ser)
ser.close()
# 清除隊(duì)列信息
del self.message_queues[ser]
if __name__=="__main__":
select_server = emsc_select_server()
select_server.run()
客戶端:同上