Python asyncio + aiojobs簡單異步框架

Python的異步async處理猾封,自3.5之后,基本可以成熟使用了抛丽。
使用async def來定義異步事件谤职,在需要等待耗時任務(wù)時,用await返回亿鲜,讓系統(tǒng)調(diào)度其它任務(wù)異步執(zhí)行柬帕。

最簡async程序(需要Python3.7):

https://docs.python.org/zh-cn/3/library/asyncio.html

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    #await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

如果多個協(xié)程需要并行執(zhí)行,則使用asyncio.gather

async def main():
    print(f"started at {time.strftime('%X')}")

    await asyncio.gather(
        say_after(1, 'hello'),
        say_after(2, 'world') )

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

注意:異步調(diào)用是非常復(fù)雜的狡门,你僅在需要時陷寝,才使用async/await
使用Pycharm查看調(diào)用:

image.png

局部放大:


image.png

異步協(xié)程,非常適用于爬蟲應(yīng)用其馏。
對于普通爬蟲凤跑,不需要Scrapy這么重的框架,我們只需要加入簡單的控制并行數(shù)量叛复、異常重試等功能就夠了仔引。

常用庫:

  • aiojobs 簡單的并行控制
  • aiohttp requests的異步版本
  • aiofile 異步讀寫文件
  • asyncpg 異步連接postgresql
  • aioredis 異步連接redis
  • uvloop 更快的asyncio實現(xiàn)

實例

輸入為urls.txt文本扔仓,每一行是一個url地址:

https://regex101.com/
https://docs.python.org/3/this-url-will-404.html
https://www.nytimes.com/guides/
https://www.mediamatters.org/
https://1.1.1.1/
https://www.politico.com/tipsheets/morning-money
https://www.bloomberg.com/markets/economics
https://www.ietf.org/rfc/rfc2616.txt

爬蟲并行訪問這些地址fetch_html(),然后處理parse()咖耘。這里的例子是提取每個地址html文件里翘簇,包含的所有url字符串,并行寫入到文件foundurls.txt儿倒。

設(shè)想我們想控制版保,爬蟲最多同時進(jìn)行N(N=3)個http get協(xié)程: aiojobs.create_scheduler(limit=3)
等待所有任務(wù)完成后,程序結(jié)束:

while scheduler._jobs:
            await asyncio.sleep(1)
        await scheduler.close()

完整代碼:

#!/usr/bin/env python3

"""Asynchronously get links embedded in multiple pages' HMTL."""

import asyncio
import logging
import re
import sys
from typing import IO
import urllib.error
import urllib.parse

import aiofiles
import aiohttp
import aiojobs as aiojobs
import uvloop
from aiohttp import ClientSession

logging.basicConfig(
    format="%(asctime)s %(levelname)s:%(name)s: %(message)s",
    level=logging.DEBUG,
    datefmt="%H:%M:%S",
    stream=sys.stderr,
)
logger = logging.getLogger("areq")
logging.getLogger("chardet.charsetprober").disabled = True

HREF_RE = re.compile(r'href="(.*?)"')


async def fetch_html(url: str, session: ClientSession, **kwargs) -> str:
    """GET request wrapper to fetch page HTML.

    kwargs are passed to `session.request()`.
    """

    resp = await session.request(method="GET", url=url, **kwargs)
    resp.raise_for_status()
    logger.info("Got response [%s] for URL: %s", resp.status, url)
    html = await resp.text()
    return html


async def parse(url: str, session: ClientSession, **kwargs) -> set:
    """Find HREFs in the HTML of `url`."""
    found = set()
    try:
        html = await fetch_html(url=url, session=session, **kwargs)
    except (
            aiohttp.ClientError,
            # aiohttp.http_exceptions.HttpProcessingError,
    ) as e:
        logger.error(
            "aiohttp exception for %s [%s]: %s",
            url,
            getattr(e, "status", None),
            getattr(e, "message", None),
        )
        return found
    except Exception as e:
        logger.exception(
            "Non-aiohttp exception occured:  %s", getattr(e, "__dict__", {})
        )
        return found
    else:
        for link in HREF_RE.findall(html):
            try:
                abslink = urllib.parse.urljoin(url, link)
            except (urllib.error.URLError, ValueError):
                logger.exception("Error parsing URL: %s", link)
                pass
            else:
                found.add(abslink)
        logger.info("Found %d links for %s", len(found), url)
        return found


async def write_one(file: IO, url: str, **kwargs) -> None:
    """Write the found HREFs from `url` to `file`."""
    res = await parse(url=url, **kwargs)
    if not res:
        return None
    async with aiofiles.open(file, "a") as f:
        for p in res:
            await f.write(f"{url}\t{p}\n")
        logger.info("Wrote results for source URL: %s", url)


async def bulk_crawl_and_write(file: IO, urlset: set, **kwargs) -> None:
    """Crawl & write concurrently to `file` for multiple `urls`."""
    scheduler = await aiojobs.create_scheduler(limit=3)
    async with ClientSession() as session:
        for url in urlset:
            await scheduler.spawn(write_one(file=file, url=url, session=session, **kwargs))
        print(f'{scheduler} active_count:{scheduler.active_count} pending_count:{scheduler.pending_count}')
        # await asyncio.sleep(3)
        while scheduler._jobs:
            # print(f'{scheduler} active_count:{scheduler.active_count} pending_count:{scheduler.pending_count}')
            await asyncio.sleep(1)
        await scheduler.close()

if __name__ == "__main__":
    import pathlib
    import sys
    import time

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    outpath = here.joinpath("foundurls.txt")
    with open(outpath, "w") as outfile:
        outfile.write("source_url\tparsed_url\n")
    t0 = time.perf_counter()
    asyncio.run(bulk_crawl_and_write(file=outpath, urlset=urls))
    print(f'{__file__} finish in {time.perf_counter()-t0:.2f}sec.') 

很好的入門tutorial:https://realpython.com/async-io-python/

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末夫否,一起剝皮案震驚了整個濱河市彻犁,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌凰慈,老刑警劉巖汞幢,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異微谓,居然都是意外死亡森篷,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進(jìn)店門豺型,熙熙樓的掌柜王于貴愁眉苦臉地迎上來仲智,“玉大人,你說我怎么就攤上這事触创】裁辏” “怎么了为牍?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵哼绑,是天一觀的道長。 經(jīng)常有香客問我碉咆,道長抖韩,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任疫铜,我火速辦了婚禮茂浮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘壳咕。我一直安慰自己席揽,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布谓厘。 她就那樣靜靜地躺著幌羞,像睡著了一般。 火紅的嫁衣襯著肌膚如雪竟稳。 梳的紋絲不亂的頭發(fā)上属桦,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天熊痴,我揣著相機(jī)與錄音,去河邊找鬼聂宾。 笑死果善,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的系谐。 我是一名探鬼主播巾陕,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼蔚鸥!你這毒婦竟也來了惜论?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤止喷,失蹤者是張志新(化名)和其女友劉穎馆类,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體弹谁,經(jīng)...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡乾巧,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了预愤。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片沟于。...
    茶點故事閱讀 37,997評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖植康,靈堂內(nèi)的尸體忽然破棺而出旷太,到底是詐尸還是另有隱情,我是刑警寧澤销睁,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布供璧,位于F島的核電站,受9級特大地震影響冻记,放射性物質(zhì)發(fā)生泄漏睡毒。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一冗栗、第九天 我趴在偏房一處隱蔽的房頂上張望演顾。 院中可真熱鬧,春花似錦隅居、人聲如沸钠至。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽棉钧。三九已至,卻和暖如春乒融,著一層夾襖步出監(jiān)牢的瞬間掰盘,已是汗流浹背摄悯。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留愧捕,地道東北人奢驯。 一個月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像次绘,于是被迫代替她去往敵國和親瘪阁。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,722評論 2 345