Python 實(shí)現(xiàn)TCP長(zhǎng)連接朴爬,epoll扣癣、select通信模式

epoll 是在2.5.44內(nèi)核中被引進(jìn)的(epoll(4) is a new API introduced in Linux kernel 2.5.44)惰帽,它幾乎具備眾多優(yōu)點(diǎn),被公認(rèn)為L(zhǎng)inux2.6下性能最好的多路I/O就緒通知方法父虑。

epoll的優(yōu)點(diǎn):

1.支持一個(gè)進(jìn)程打開大數(shù)目的socket描述符(FD)
select 最不能忍受的是一個(gè)進(jìn)程所打開的FD是有一定限制的该酗,由FD_SETSIZE設(shè)置,默認(rèn)值是2048士嚎。對(duì)于那些需要支持的上萬連接數(shù)目的IM服務(wù)器來說顯然太少了呜魄。這時(shí)候你一是可以選擇修改這個(gè)宏然后重新編譯內(nèi)核,不過資料也同時(shí)指出這樣會(huì)帶來網(wǎng)絡(luò)效率的下降莱衩,二是可以選擇多進(jìn)程的解決方案(傳統(tǒng)的 Apache方案)爵嗅,不過雖然linux上面創(chuàng)建進(jìn)程的代價(jià)比較小,但仍舊是不可忽視的笨蚁,加上進(jìn)程間數(shù)據(jù)同步遠(yuǎn)比不上線程間同步的高效睹晒,所以也不是一種完美的方案。不過 epoll則沒有這個(gè)限制括细,它所支持的FD上限是最大可以打開文件的數(shù)目伪很,這個(gè)數(shù)字一般遠(yuǎn)大于2048,舉個(gè)例子,在1GB內(nèi)存的機(jī)器上大約是10萬左右,具體數(shù)目可以cat /proc/sys/fs/file-max察看,一般來說這個(gè)數(shù)目和系統(tǒng)內(nèi)存關(guān)系很大奋单。

2.IO效率不隨FD數(shù)目增加而線性下降
傳統(tǒng)的select/poll另一個(gè)致命弱點(diǎn)就是當(dāng)你擁有一個(gè)很大的socket集合锉试,不過由于網(wǎng)絡(luò)延時(shí),任一時(shí)間只有部分的socket是"活躍"的辱匿,但是select/poll每次調(diào)用都會(huì)線性掃描全部的集合键痛,導(dǎo)致效率呈現(xiàn)線性下降炫彩。但是epoll不存在這個(gè)問題,它只會(huì)對(duì)"活躍"的socket進(jìn)行操作---這是因?yàn)樵趦?nèi)核實(shí)現(xiàn)中epoll是根據(jù)每個(gè)fd上面的callback函數(shù)實(shí)現(xiàn)的絮短。那么江兢,只有"活躍"的socket才會(huì)主動(dòng)的去調(diào)用 callback函數(shù),其他idle狀態(tài)socket則不會(huì)丁频,在這點(diǎn)上杉允,epoll實(shí)現(xiàn)了一個(gè)"偽"AIO,因?yàn)檫@時(shí)候推動(dòng)力在os內(nèi)核席里。在一些 benchmark中叔磷,如果所有的socket基本上都是活躍的---比如一個(gè)高速LAN環(huán)境,epoll并不比select/poll有什么效率奖磁,相反改基,如果過多使用epoll_ctl,效率相比還有稍微的下降。但是一旦使用idle connections模擬WAN環(huán)境,epoll的效率就遠(yuǎn)在select/poll之上了咖为。

3.使用mmap加速內(nèi)核與用戶空間的消息傳遞
這點(diǎn)實(shí)際上涉及到epoll的具體實(shí)現(xiàn)了秕狰。無論是select,poll還是epoll都需要內(nèi)核把FD消息通知給用戶空間,如何避免不必要的內(nèi)存拷貝就很重要躁染,在這點(diǎn)上鸣哀,epoll是通過內(nèi)核于用戶空間mmap同一塊內(nèi)存實(shí)現(xiàn)的。而如果你想我一樣從2.5內(nèi)核就關(guān)注epoll的話吞彤,一定不會(huì)忘記手工 mmap這一步的我衬。

4.內(nèi)核微調(diào)

這一點(diǎn)其實(shí)不算epoll的優(yōu)點(diǎn)了,而是整個(gè)linux平臺(tái)的優(yōu)點(diǎn)饰恕。也許你可以懷疑linux平臺(tái)挠羔,但是你無法回避linux平臺(tái)賦予你微調(diào)內(nèi)核的能力。比如懂盐,內(nèi)核TCP/IP協(xié)議棧使用內(nèi)存池管理sk_buff結(jié)構(gòu)褥赊,那么可以在運(yùn)行時(shí)期動(dòng)態(tài)調(diào)整這個(gè)內(nèi)存pool(skb_head_pool)的大小--- 通過echo XXXX>/proc/sys/net/core/hot_list_length完成。再比如listen函數(shù)的第2個(gè)參數(shù)(TCP完成3次握手的數(shù)據(jù)包隊(duì)列長(zhǎng)度)莉恼,也可以根據(jù)你平臺(tái)內(nèi)存大小動(dòng)態(tài)調(diào)整拌喉。更甚至在一個(gè)數(shù)據(jù)包面數(shù)目巨大但同時(shí)每個(gè)數(shù)據(jù)包本身大小卻很小的特殊系統(tǒng)上嘗試最新的NAPI網(wǎng)卡驅(qū)動(dòng)架構(gòu)。

linux下epoll如何實(shí)現(xiàn)高效處理百萬句柄的

開發(fā)高性能網(wǎng)絡(luò)程序時(shí)俐银,windows開發(fā)者們言必稱iocp尿背,linux開發(fā)者們則言必稱epoll。大家都明白epoll是一種IO多路復(fù)用技術(shù)捶惜,可以非常高效的處理數(shù)以百萬計(jì)的socket句柄田藐,比起以前的select和poll效率高大發(fā)了。我們用起epoll來都感覺挺爽,確實(shí)快汽久,那么鹤竭,它到底為什么可以高速處理這么多并發(fā)連接呢?

使用起來很清晰景醇,首先要調(diào)用epoll_create建立一個(gè)epoll對(duì)象臀稚。參數(shù)size是內(nèi)核保證能夠正確處理的最大句柄數(shù),多于這個(gè)最大數(shù)時(shí)內(nèi)核可不保證效果三痰。epoll_ctl可以操作上面建立的epoll吧寺,例如,將剛建立的socket加入到epoll中讓其監(jiān)控散劫,或者把 epoll正在監(jiān)控的某個(gè)socket句柄移出epoll稚机,不再監(jiān)控它等等。epoll_wait在調(diào)用時(shí)获搏,在給定的timeout時(shí)間內(nèi)赖条,當(dāng)在監(jiān)控的所有句柄中有事件發(fā)生時(shí),就返回用戶態(tài)的進(jìn)程常熙。從上面的調(diào)用方式就可以看到epoll比select/poll的優(yōu)越之處:因?yàn)楹笳呙看握{(diào)用時(shí)都要傳遞你所要監(jiān)控的所有socket給select/poll系統(tǒng)調(diào)用谋币,這意味著需要將用戶態(tài)的socket列表copy到內(nèi)核態(tài),如果以萬計(jì)的句柄會(huì)導(dǎo)致每次都要copy幾十幾百KB的內(nèi)存到內(nèi)核態(tài)症概,非常低效。而我們調(diào)用epoll_wait時(shí)就相當(dāng)于以往調(diào)用select/poll早芭,但是這時(shí)卻不用傳遞socket句柄給內(nèi)核彼城,因?yàn)閮?nèi)核已經(jīng)在epoll_ctl中拿到了要監(jiān)控的句柄列表。所以退个,實(shí)際上在你調(diào)用epoll_create后募壕,內(nèi)核就已經(jīng)在內(nèi)核態(tài)開始準(zhǔn)備幫你存儲(chǔ)要監(jiān)控的句柄了,每次調(diào)用epoll_ctl只是在往內(nèi)核的數(shù)據(jù)結(jié)構(gòu)里塞入新的socket句柄语盈。當(dāng)一個(gè)進(jìn)程調(diào)用epoll_creaqte方法時(shí)舱馅,Linux內(nèi)核會(huì)創(chuàng)建一個(gè)eventpoll結(jié)構(gòu)體,這個(gè)結(jié)構(gòu)體中有兩個(gè)成員與epoll的使用方式密切相關(guān):每一個(gè)epoll對(duì)象都有一個(gè)獨(dú)立的eventpoll結(jié)構(gòu)體刀荒,這個(gè)結(jié)構(gòu)體會(huì)在內(nèi)核空間中創(chuàng)造獨(dú)立的內(nèi)存代嗤,用于存儲(chǔ)使用epoll_ctl方法向epoll對(duì)象中添加進(jìn)來的事件。這樣缠借,重復(fù)的事件就可以通過紅黑樹而高效的識(shí)別出來干毅。

此外,epoll還維護(hù)了一個(gè)雙鏈表泼返,用戶存儲(chǔ)發(fā)生的事件硝逢。當(dāng)epoll_wait調(diào)用時(shí),僅僅觀察這個(gè)list鏈表里有沒有數(shù)據(jù)即eptime項(xiàng)即可。有數(shù)據(jù)就返回渠鸽,沒有數(shù)據(jù)就sleep叫乌,等到timeout時(shí)間到后即使鏈表沒數(shù)據(jù)也返回。所以徽缚,epoll_wait非常高效憨奸。而且,通常情況下即使我們要監(jiān)控百萬計(jì)的句柄猎拨,大多一次也只返回很少量的準(zhǔn)備就緒句柄而已膀藐,所以,epoll_wait僅需要從內(nèi)核態(tài)copy少量的句柄到用戶態(tài)而已红省,如何能不高效 !那么额各,這個(gè)準(zhǔn)備就緒list鏈表是怎么維護(hù)的呢?當(dāng)我們執(zhí)行epoll_ctl時(shí)吧恃,除了把socket放到epoll文件系統(tǒng)里file對(duì)象對(duì)應(yīng)的紅黑樹上之外虾啦,還會(huì)給內(nèi)核中斷處理程序注冊(cè)一個(gè)回調(diào)函數(shù),告訴內(nèi)核痕寓,如果這個(gè)句柄的中斷到了傲醉,就把它放到準(zhǔn)備就緒list鏈表里。所以呻率,當(dāng)一個(gè)socket上有數(shù)據(jù)到了硬毕,內(nèi)核在把網(wǎng)卡上的數(shù)據(jù)copy到內(nèi)核中后就來把socket插入到準(zhǔn)備就緒鏈表里了。如此礼仗,一顆紅黑樹吐咳,一張準(zhǔn)備就緒句柄鏈表,少量的內(nèi)核cache元践,就幫我們解決了大并發(fā)下的socket處理問題韭脊。執(zhí)行epoll_create時(shí),創(chuàng)建了紅黑樹和就緒鏈表单旁,執(zhí)行epoll_ctl時(shí)沪羔,如果增加socket句柄,則檢查在紅黑樹中是否存在象浑,存在立即返回蔫饰,不存在則添加到樹干上,然后向內(nèi)核注冊(cè)回調(diào)函數(shù)融柬,用于當(dāng)中斷事件來臨時(shí)向準(zhǔn)備就緒鏈表中插入數(shù)據(jù)死嗦。執(zhí)行epoll_wait時(shí)立刻返回準(zhǔn)備就緒鏈表里的數(shù)據(jù)即可。

Python Tcp長(zhǎng)連接 Server端代碼:

#coding:utf-8
import socket
import select

class emsc_server:
    def __init__(self):
        self.serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.serversocket.bind(('0.0.0.0', 5000))
        self.serversocket.listen(1000)
        self.serversocket.setblocking(0)

    def run(self):
        response = "接收成功粒氧,返回?cái)?shù)據(jù): connecting status: 200 \n"
        response += "haody,client !"
        epoll = select.epoll()
        epoll.register(self.serversocket.fileno(), select.EPOLLIN)

        try:
            connections = {}
            requests = {}
            responses = {}
            endflag = '\n\r\n'

            while True:
                events = epoll.poll(1)
                for fid, event in events:
                    if fid == self.serversocket.fileno():
                        connection, address = self.serversocket.accept()
                        connection.setblocking(0)
                        epoll.register(connection.fileno(), select.EPOLLIN)
                        connections[connection.fileno()] = connection
                        requests[connection.fileno()] = ''
                        responses[connection.fileno()] = response.encode()

                    elif event & select.EPOLLIN:
                        try:
                            requests[fid] = connections[fid].recv(1024)
                            if len(str(requests[fid].decode())) == 0:
                                connections[fid].shutdown(socket.SHUT_RDWR)
                                break
                            else:
                                print("2 | ------ : " + str(requests[fid].decode()) + "\n")
                                byteswritten = connections[fid].send(responses[fid])

                            if endflag in requests[fid]:
                                epoll.modify(fid, select.EPOLLOUT)
                                connections[fid].setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 1)
                                print('-' * 40 + '\n' + requests[fid].decode()[:-2])
                        except:
                            continue

                    elif event & select.EPOLLOUT:
                        byteswritten = connections[fid].send(responses[fid])
                        responses[fid] = responses[fid][byteswritten:]
                        if len(responses[fid]) == 0:
                            connections[fid].setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 0)
                            epoll.modify(fid, 0)
                            connections[fid].shutdown(socket.SHUT_RDWR)

                    elif event & select.EPOLLHUP:
                        epoll.unregister(fid)
                        connections[fid].close()
                        del connections[fid]

        except:
            print("server excepted ...")
            epoll.unregister(self.serversocket.fileno())
            self.run()

        finally:
            print("server closed ...")

if __name__=="__main__":
    emsc = emsc_server()
    emsc.run()

Python Tcp長(zhǎng)連接 Client端代碼:

#coding:utf-8
import socket
import time

class emsc_client:

    def __init__(self):
        self.host = "127.0.0.1"
        self.port = 5000
        self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def run(self):
        try:
            self.conn.connect((self.host, self.port))
            while True:
                self.conn.send(("來自客戶端發(fā)送的數(shù)據(jù) : " + str(time.time())).encode())
                data = self.conn.recv(1024).decode()
                print("來自服務(wù)端數(shù)據(jù) :" + data + "|" + str(time.time()))
                time.sleep(0.1)
        except:
            print("服務(wù)器連接異常,嘗試重新連接 (10s) ...")
            self.conn.close()
            time.sleep(10) # 斷開連接后,每10s重新連接一次
            emsc_client().run()

        finally:
            print("客戶端已關(guān)閉 ...")

if __name__=="__main__":
    emsc = emsc_client()
    emsc.run()

實(shí)現(xiàn)epoll越除,I/O復(fù)用

服務(wù)端斷開后,客戶端每10s重新連接

在windows 上

Python 采用Select 模式 實(shí)現(xiàn)Tcp 長(zhǎng)連接

#coding:utf-8
import select
import socket
import queue
import time
import os

class emsc_select_server:
    def __init__(self):
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.setblocking(False)
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_address = ('0.0.0.0', 5000)
        self.server.bind(self.server_address)
        self.server.listen(1000)
        self.inputs = [self.server]
        self.outputs = []
        self.message_queues = {}
        self.timeout = 20

    def run(self):
        response = "接收成功,返回?cái)?shù)據(jù): connecting status: 200 \n"
        response += "haody,client ! | "

        while self.inputs:
            print("waiting for next event")
            # timeout是超時(shí)摘盆,當(dāng)前連接要是超過這個(gè)時(shí)間的話翼雀,就會(huì)kill
            readable, writable, exceptional = select.select(self.inputs, self.outputs, self.inputs, self.timeout)

            if not (readable or writable or exceptional):
                print("Time out ! ")
                break
            for ser in readable:
                if ser is self.server:
                    # 通過inputs查看是否有客戶端來
                    connection, client_address = ser.accept()
                    print("connection from ", client_address)
                    connection.setblocking(0)
                    self.inputs.append(connection)
                    self.message_queues[connection] = queue.Queue()
                else:
                    data = ser.recv(1024)
                    if data:
                        print("收到數(shù)據(jù) ", data.decode(), "\n來自:", ser.getpeername())
                        self.message_queues[ser].put(data)
                        # 添加通道
                        if ser not in self.outputs:
                            self.outputs.append(ser)
                    else:
                        print("closing", client_address)
                        if ser in self.outputs:
                            self.outputs.remove(ser)
                        self.inputs.remove(ser)
                        ser.close()
                        # 清除隊(duì)列信息
                        del self.message_queues[ser]

            for ser in writable:
                try:
                    next_msg = self.message_queues[ser].get_nowait()
                except queue.Empty:
                    print(ser.getpeername(), 'queue empty')
                    self.outputs.remove(ser)
                else:
                    print("發(fā)送數(shù)據(jù) ", str(response + next_msg.decode()), " to ", ser.getpeername(),"\n")
                    ser.send(response.encode()+next_msg)

            for ser in exceptional:
                print(" exception condition on ", ser.getpeername())
                # stop listening for input on the connection
                self.inputs.remove(ser)
                if ser in self.outputs:
                    self.outputs.remove(ser)
                ser.close()
                # 清除隊(duì)列信息
                del self.message_queues[ser]

if __name__=="__main__":
    select_server = emsc_select_server()
    select_server.run()

客戶端:同上

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市孩擂,隨后出現(xiàn)的幾起案子狼渊,更是在濱河造成了極大的恐慌,老刑警劉巖类垦,帶你破解...
    沈念sama閱讀 211,423評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件狈邑,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡蚤认,警方通過查閱死者的電腦和手機(jī)米苹,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,147評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來砰琢,“玉大人蘸嘶,你說我怎么就攤上這事∨闫” “怎么了训唱?”我有些...
    開封第一講書人閱讀 157,019評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)挚冤。 經(jīng)常有香客問我况增,道長(zhǎng),這世上最難降的妖魔是什么训挡? 我笑而不...
    開封第一講書人閱讀 56,443評(píng)論 1 283
  • 正文 為了忘掉前任巡通,我火速辦了婚禮,結(jié)果婚禮上舍哄,老公的妹妹穿的比我還像新娘。我一直安慰自己誊锭,他們只是感情好表悬,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,535評(píng)論 6 385
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著丧靡,像睡著了一般蟆沫。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上温治,一...
    開封第一講書人閱讀 49,798評(píng)論 1 290
  • 那天饭庞,我揣著相機(jī)與錄音,去河邊找鬼熬荆。 笑死舟山,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播累盗,決...
    沈念sama閱讀 38,941評(píng)論 3 407
  • 文/蒼蘭香墨 我猛地睜開眼寒矿,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了若债?” 一聲冷哼從身側(cè)響起符相,我...
    開封第一講書人閱讀 37,704評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎蠢琳,沒想到半個(gè)月后啊终,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,152評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡傲须,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,494評(píng)論 2 327
  • 正文 我和宋清朗相戀三年蓝牲,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片躏碳。...
    茶點(diǎn)故事閱讀 38,629評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡搞旭,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出菇绵,到底是詐尸還是另有隱情肄渗,我是刑警寧澤,帶...
    沈念sama閱讀 34,295評(píng)論 4 329
  • 正文 年R本政府宣布咬最,位于F島的核電站翎嫡,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏永乌。R本人自食惡果不足惜惑申,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,901評(píng)論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望翅雏。 院中可真熱鬧圈驼,春花似錦、人聲如沸望几。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,742評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽橄抹。三九已至靴迫,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間楼誓,已是汗流浹背玉锌。 一陣腳步聲響...
    開封第一講書人閱讀 31,978評(píng)論 1 266
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留疟羹,地道東北人主守。 一個(gè)月前我還...
    沈念sama閱讀 46,333評(píng)論 2 360
  • 正文 我出身青樓禀倔,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親丸逸。 傳聞我的和親對(duì)象是個(gè)殘疾皇子蹋艺,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,499評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容