利用 Python 的協(xié)程進行快速端口掃描

原文鏈接:http://wyb0.com/posts/2019/python-coroutine-fast-port-scan/

0x00 協(xié)程的優(yōu)勢

協(xié)程擁有極高的執(zhí)行效率,因為子程序切換不是線程切換,而是由程序自身控制,因此沒有線程切換的開銷请契。和多線程比青责,線程數(shù)量越多麸塞,協(xié)程的性能優(yōu)勢就越明顯么夫。

不需要多線程的鎖機制,因為只有一個線程无宿,也不存在同時寫變量沖突,在協(xié)程中控制共享資源不加鎖枢里,只需要判斷狀態(tài)就好了孽鸡,所以執(zhí)行效率比多線程高很多

0x01 Python中的協(xié)程

協(xié)程也就是微線程,python 的 generator(生成器) 中的 yield 可以一定程度上實現(xiàn)協(xié)程

在 generator 中栏豺,我們不但可以通過 for 循環(huán)來迭代彬碱,還可以不斷調(diào)用 next() 函數(shù)獲取由 yield 語句返回的下一個值。

但是 Python 的 yield 不但可以返回一個值奥洼,它還可以接收調(diào)用者發(fā)出的參數(shù)巷疼。


coroutine_case.png

0x02 使用 gevent

python 中可以通過 generator 實現(xiàn)協(xié)程,但是不完全灵奖,第三方的 gevent 為 Python 提供了比較完善的協(xié)程支持嚼沿,gevent 可以通過 monkey patch 動態(tài)的修改 Python 自帶的一些標準庫

由于 IO 操作(比如訪問網(wǎng)絡)非常耗時估盘,經(jīng)常使程序處于等待狀態(tài),而 gevent 可以為我們自動切換協(xié)程骡尽,再在適當?shù)臅r候切換回來繼續(xù)執(zhí)行遣妥,這就保證總有 greenlet 在運行,而不是等待 IO

使用 gevent 可以獲得極高的并發(fā)性能攀细,但 gevent 只能在 Unix/Linux 下運行箫踩,在 Windows 下不保證正常安裝和運行
下面 3 個網(wǎng)絡操作是并發(fā)執(zhí)行的,且結(jié)束順序不同谭贪,但只有一個線程

from gevent import monkey; monkey.patch_all()
import requests
import gevent

def get_resp_size(url):
    print('GET: %s' % url)
    html = requests.get(url).content
    print('%d bytes received from %s.' % (len(html), url))

def gevent_test(urls):
    job_list = [gevent.spawn(get_resp_size, url) for url in urls]
    gevent.joinall(job_list)

urls = [
    'https://www.python.org/',
    'https://www.yahoo.com/',
    'https://github.com/',
]
gevent_test(urls)

0x03 asyncio

在 python 3.4 時引入了 asyncio 這個模塊班套,asyncio 專門被用來實現(xiàn)異步IO操作。

通過使用 <f>yield from</f> 和 asyncio 模塊中的 <f>@asyncio.coroutine</f> 可以實現(xiàn)協(xié)程

對于簡單的迭代器故河,yield from iterable 本質(zhì)上等于 for item in iterable: yield item 的縮寫版

  • hello world 示例
@asyncio.coroutine
def hello():
    print("Hello world!")
    r = yield from asyncio.sleep(1)
    print("Hello again!")
  • 請求web網(wǎng)頁
import asyncio

@asyncio.coroutine
def wget(host):
    print('wget %s...' % host)
    connect = asyncio.open_connection(host, 80)
    reader, writer = yield from connect
    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    yield from writer.drain()
    while True:
        line = yield from reader.readline()
        if line == b'\r\n':
            break
        print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    writer.close()

loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

0x04 async/await

在 python 3.5 時引入了 async/await

  • 關于asyncio的一些關鍵字的說明

    • event_loop
      事件循環(huán):程序開啟一個無限循環(huán)吱韭,把一些函數(shù)注冊到事件循環(huán)上,當滿足事件發(fā)生的時候鱼的,調(diào)用相應的協(xié)程函數(shù)
    • coroutine
      協(xié)程對象理盆,指一個使用async關鍵字定義的函數(shù),它的調(diào)用不會立即執(zhí)行函數(shù)凑阶,而是會返回一個協(xié)程對象猿规。協(xié)程對象需要注冊到事件循環(huán),由事件循環(huán)調(diào)用宙橱。
    • task
      一個協(xié)程對象就是一個原生可以掛起的函數(shù)姨俩,任務則是對協(xié)程進一步封裝,其中包含了任務的各種狀態(tài)
    • future
      代表將來執(zhí)行或沒有執(zhí)行的任務的結(jié)果师郑。它和task沒有本質(zhì)上的區(qū)別
    • async/await
      async定義一個協(xié)程环葵,await就像生成器里的yield一樣用于掛起阻塞的異步調(diào)用接口。
  • async和await是針對coroutine的新語法宝冕,要使用新的語法张遭,只需要做兩步簡單的替換:

    • 把@asyncio.coroutine替換為async;
    • 把yield from替換為await地梨。
  • hello world 示例
@asyncio.coroutine
def hello():
    print("Hello world!")
    r = yield from asyncio.sleep(1)
    print("Hello again!")

# 改為如下代碼:

async def hello():
    print("Hello world!")
    r = await asyncio.sleep(1)
    print("Hello again!")
  • 使用asyncio創(chuàng)建任務運行菊卷,并給task指定callback得到執(zhí)行結(jié)果
import asyncio

async def do_some_work(x):
    print("waiting:", x)
    return "Done after {}s".format(x)

def callback(future):
    result = future.result()
    print('callback:',result)

def run1():
    loop = asyncio.get_event_loop() # 定義一個事件loop
    coroutine = do_some_work(2) # 定義協(xié)程對象,它不能直接運行
    # run_unitl_complete() 需要傳入一個 Future 對象
    # 若傳入?yún)f(xié)程的話 run_unitl_complete 內(nèi)部會將協(xié)程包裝成一個任務(task)對象
    # task 對象是 Future 類的子類宝剖,保存了協(xié)程運行后的狀態(tài)洁闰,用于未來獲取協(xié)程的結(jié)果
    result = loop.run_until_complete(coroutine)
    print(result)
    loop.close()

def run2():
    loop = asyncio.get_event_loop() # 定義一個事件loop
    coroutine = do_some_work(1) # 定義協(xié)程對象,它不能直接運行
    task = loop.create_task(coroutine) # 創(chuàng)建協(xié)程 task
    # task = asyncio.ensure_future(coroutine) # 也可以使用 ensure_future() 直接創(chuàng)建 Future 對象
    task.add_done_callback(callback) # 回調(diào)函數(shù)万细,獲取task的返回值
    loop.run_until_complete(task) # 將task加入到事件循環(huán) loop
    loop.close()
  • 使用 asyncio 并發(fā)執(zhí)行協(xié)程

異步和并發(fā)與并行并沒有關系扑眉,異步用于表示并發(fā)或并行任務的印象

import asyncio

async def do_some_work(x):
    print("waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

def callback(future):
    print('callback',future.result())

def run1():
    # 4s 后結(jié)果同時返回
    tasks = [asyncio.ensure_future(do_some_work(x)) for x in [2, 1, 4]]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    for task in tasks:
        result = task.result()
        print(result)
    loop.close()

def run2():
    # 1s、2s、4s 分別返回結(jié)果
    tasks = [asyncio.ensure_future(do_some_work(x)) for x in [2,1,4]]
    for task in tasks:
        task.add_done_callback(callback)
    loop = asyncio.get_event_loop()
    # wait 返回完成的和未完成的任務
    done, pending = loop.run_until_complete(asyncio.wait(tasks))
    # results = loop.run_until_complete(asyncio.gather(*tasks))
    loop.close()
  • 停止協(xié)程
import asyncio

async def do_some_work(x):
    print("waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

def callback(future):
    print('callback',future.result())

def run():
    # 1s襟雷、2s刃滓、4s 分別返回結(jié)果
    tasks = [asyncio.ensure_future(do_some_work(x)) for x in [2,1,4]]
    for task in tasks:
        task.add_done_callback(callback)

    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:
        for task in asyncio.Task.all_tasks():
            print(task.cancel())
        loop.stop()
        # loop stop 之后還需要再次開啟事件循環(huán),最后再 close耸弄,不然還會拋出異常
        loop.run_forever()
    finally:
        loop.close()

0x05 協(xié)程與線程配合使用

import asyncio
import threading

async def do_some_work(x):
    print("waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

def callback(future):
    print('callback',future.result())

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever() # 和 run_until_complete 作用一樣咧虎,不過這個在協(xié)程運行完后不會停止

def run():
    # 在子線程中運行協(xié)程loop
    sub_loop = asyncio.new_event_loop()
    thread = threading.Thread(target=start_loop, args=(sub_loop,))
    thread.start()

    # 在主線程給子線程的 loop 添加協(xié)程任務
    futures = [asyncio.run_coroutine_threadsafe(do_some_work(x), sub_loop) for x in [2,1,4]]
    for future in futures:
        future.add_done_callback(callback)

    print('test...')
    
    try:
        while True:pass
    except KeyboardInterrupt as e:
        import sys;sys.exit('user aborted!')
    finally:
        sub_loop.call_soon_threadsafe(sub_loop.stop)

run()
?  python3 tmp.py
waiting: 2
test...
waiting: 1
waiting: 4
callback Done after 1s
callback Done after 2s
callback Done after 4s
^Cuser aborted!

0x06 通過協(xié)程實現(xiàn)全端口 tcp 快速掃描

import time
import asyncio
import threading

class PortScan(object):
    """docstring for PortScan"""
    def __init__(self, ip_list=["127.0.0.1"], all_ports=False, rate=2000):
        super(PortScan, self).__init__()
        self.ip_list = ip_list
        self.rate = rate
        self.all_ports = all_ports
        self.open_list = {}
        self.common_port = "21,22,23,25,53,69,80,81,82,83,84,85,86,87,88,89,110,111,135,139,143,161,389,443,445,465,513,873,993,995,1080,1099,1158,1433,1521,1533,1863,2049,2100,2181,3128,3306,3307,3308,3389,3690,5000,5432,5900,6379,7001,8000,8001,8002,8003,8004,8005,8006,8007,8008,8009,8010,8011,8012,8013,8014,8015,8016,8017,8018,8019,8020,8021,8022,8023,8024,8025,8026,8027,8028,8029,8030,8031,8032,8033,8034,8035,8036,8037,8038,8039,8040,8041,8042,8043,8044,8045,8046,8047,8048,8049,8050,8051,8052,8053,8054,8055,8056,8057,8058,8059,8060,8061,8062,8063,8064,8065,8066,8067,8068,8069,8070,8071,8072,8073,8074,8075,8076,8077,8078,8079,8080,8081,8082,8083,8084,8085,8086,8087,8088,8089,8090,8888,9000,9080,9090,9200,9300,9418,11211,27017,27018,27019,50060"

    async def async_port_check(self, semaphore, ip_port):
        async with semaphore:
            ip,port = ip_port
            conn = asyncio.open_connection(ip, port)
            try:
                reader, writer = await asyncio.wait_for(conn, timeout=10)
                return (ip, port, 'open')
            except Exception as e:
                # print(e)
                return (ip, port, 'close')

    def callback(self, future):
        ip,port,status = future.result()
        if status == "open":
            print(ip,port,status)
            try:
                if ip in self.open_list:
                    self.open_list[ip].append(port)
                else:
                    self.open_list[ip] = [port]
            except Exception as e:
                print(e)
        else:
            pass

    def async_tcp_port_scan(self):
        ports = [port for port in range(11,65535)] if self.all_ports else self.common_port.split(',')
        ip_port_list = [(ip,int(port)) for ip in self.ip_list for port in ports]

        sem = asyncio.Semaphore(self.rate) # 限制并發(fā)量
        loop = asyncio.get_event_loop()

        tasks = list()
        for ip_port in ip_port_list:
            task = asyncio.ensure_future(self.async_port_check(sem, ip_port))
            task.add_done_callback(self.callback)
            tasks.append(task)

        loop.run_until_complete(asyncio.wait(tasks))

        print(self.open_list)


if __name__ == '__main__':
    ip_list = ["59.108.35.198"]

    now = time.time

    start = now()
    ps = PortScan(ip_list,True,2000)
    ps.async_tcp_port_scan()
    print("Time:",now()-start)
?  python3 tmp.py
59.108.35.198 22 open
59.108.35.198 80 open
59.108.35.198 8888 open
59.108.35.198 50050 open
{'59.108.35.198': [22, 80, 8888, 50050]}
Time: 49.96410322189331


Reference(侵刪):

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市计呈,隨后出現(xiàn)的幾起案子砰诵,更是在濱河造成了極大的恐慌,老刑警劉巖捌显,帶你破解...
    沈念sama閱讀 221,198評論 6 514
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件茁彭,死亡現(xiàn)場離奇詭異,居然都是意外死亡扶歪,警方通過查閱死者的電腦和手機理肺,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,334評論 3 398
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來善镰,“玉大人妹萨,你說我怎么就攤上這事§牌郏” “怎么了乎完?”我有些...
    開封第一講書人閱讀 167,643評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長品洛。 經(jīng)常有香客問我树姨,道長,這世上最難降的妖魔是什么桥状? 我笑而不...
    開封第一講書人閱讀 59,495評論 1 296
  • 正文 為了忘掉前任帽揪,我火速辦了婚禮,結(jié)果婚禮上岛宦,老公的妹妹穿的比我還像新娘台丛。我一直安慰自己耍缴,他們只是感情好砾肺,可當我...
    茶點故事閱讀 68,502評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著防嗡,像睡著了一般变汪。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上蚁趁,一...
    開封第一講書人閱讀 52,156評論 1 308
  • 那天裙盾,我揣著相機與錄音,去河邊找鬼。 笑死番官,一個胖子當著我的面吹牛庐完,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播徘熔,決...
    沈念sama閱讀 40,743評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼门躯,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了酷师?” 一聲冷哼從身側(cè)響起讶凉,我...
    開封第一講書人閱讀 39,659評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎山孔,沒想到半個月后懂讯,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,200評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡台颠,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,282評論 3 340
  • 正文 我和宋清朗相戀三年褐望,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片串前。...
    茶點故事閱讀 40,424評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡譬挚,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出酪呻,到底是詐尸還是另有隱情减宣,我是刑警寧澤,帶...
    沈念sama閱讀 36,107評論 5 349
  • 正文 年R本政府宣布玩荠,位于F島的核電站漆腌,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏阶冈。R本人自食惡果不足惜闷尿,卻給世界環(huán)境...
    茶點故事閱讀 41,789評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望女坑。 院中可真熱鬧填具,春花似錦、人聲如沸匆骗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,264評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽碉就。三九已至盟广,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間瓮钥,已是汗流浹背筋量。 一陣腳步聲響...
    開封第一講書人閱讀 33,390評論 1 271
  • 我被黑心中介騙來泰國打工烹吵, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人桨武。 一個月前我還...
    沈念sama閱讀 48,798評論 3 376
  • 正文 我出身青樓肋拔,卻偏偏與公主長得像,于是被迫代替她去往敵國和親呀酸。 傳聞我的和親對象是個殘疾皇子只损,可洞房花燭夜當晚...
    茶點故事閱讀 45,435評論 2 359