Python's async support, available since 3.5, is now mature enough for everyday use. Define a coroutine with async def, and await a time-consuming task so the event loop can schedule other tasks to run in the meantime.
A minimal async program (requires Python 3.7):
https://docs.python.org/zh-cn/3/library/asyncio.html
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")
    await say_after(1, 'hello')
    # await say_after(2, 'world')
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
If several coroutines need to run concurrently, use asyncio.gather. With gather the two sleeps below overlap, so main finishes in about 2 seconds instead of 3:
async def main():
    print(f"started at {time.strftime('%X')}")
    await asyncio.gather(
        say_after(1, 'hello'),
        say_after(2, 'world'),
    )
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
Note: asynchronous code is genuinely complex; reach for async/await only when you actually need it.
[Screenshots: inspecting the coroutine calls in PyCharm, with a zoomed-in detail view.]
Async coroutines are a natural fit for crawlers. For an ordinary crawler you do not need a framework as heavy as Scrapy; adding simple concurrency limiting and retry-on-error is enough (a sketch follows).
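A minimal sketch of both ideas using only the standard library (the names fetch_with_retry and crawl, the limit of 3, and the backoff values are illustrative assumptions; the full example later in this post uses aiojobs for the limit instead):

import asyncio

async def fetch_with_retry(fetch, url, sem, retries=3, backoff=1.0):
    # Run `fetch(url)` while holding the semaphore; retry a few times on failure.
    async with sem:
        for attempt in range(1, retries + 1):
            try:
                return await fetch(url)
            except Exception:
                if attempt == retries:
                    raise
                await asyncio.sleep(backoff * attempt)  # simple linear backoff

async def crawl(fetch, urls):
    sem = asyncio.Semaphore(3)  # at most 3 fetches in flight at once
    return await asyncio.gather(*(fetch_with_retry(fetch, u, sem) for u in urls))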
Commonly used libraries:
- aiojobs: simple concurrency control
- aiohttp: the async counterpart of requests (see the short sketch after this list)
- aiofiles: async file reading and writing
- asyncpg: async PostgreSQL client
- aioredis: async Redis client
- uvloop: a faster asyncio event loop implementation
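To get a feel for aiohttp's "async requests" style, here is a minimal sketch (the target URL is just a placeholder; real code should reuse one ClientSession for many requests):

import asyncio
import aiohttp

async def fetch(url: str) -> str:
    # Open a session, GET the page, and return its text.
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            resp.raise_for_status()
            return await resp.text()

print(asyncio.run(fetch("https://example.org"))[:100])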
Example
The input is a urls.txt text file with one URL per line:
https://regex101.com/
https://docs.python.org/3/this-url-will-404.html
https://www.nytimes.com/guides/
https://www.mediamatters.org/
https://1.1.1.1/
https://www.politico.com/tipsheets/morning-money
https://www.bloomberg.com/markets/economics
https://www.ietf.org/rfc/rfc2616.txt
The crawler fetches these addresses concurrently with fetch_html() and then processes each page with parse(). In this example, we extract every URL string contained in each page's HTML and write the results concurrently to foundurls.txt.
Suppose we want to cap the crawler at N (here N = 3) concurrent HTTP GET coroutines: aiojobs.create_scheduler(limit=3)
Wait until all jobs have finished before shutting down (note that _jobs is a private attribute of the scheduler):

while scheduler._jobs:
    await asyncio.sleep(1)
await scheduler.close()
Full code:
#!/usr/bin/env python3
"""Asynchronously get links embedded in multiple pages' HMTL."""
import asyncio
import logging
import re
import sys
from typing import IO
import urllib.error
import urllib.parse
import aiofiles
import aiohttp
import aiojobs
import uvloop
from aiohttp import ClientSession
logging.basicConfig(
    format="%(asctime)s %(levelname)s:%(name)s: %(message)s",
    level=logging.DEBUG,
    datefmt="%H:%M:%S",
    stream=sys.stderr,
)
logger = logging.getLogger("areq")
logging.getLogger("chardet.charsetprober").disabled = True
HREF_RE = re.compile(r'href="(.*?)"')
async def fetch_html(url: str, session: ClientSession, **kwargs) -> str:
    """GET request wrapper to fetch page HTML.

    kwargs are passed to `session.request()`.
    """
    resp = await session.request(method="GET", url=url, **kwargs)
    resp.raise_for_status()
    logger.info("Got response [%s] for URL: %s", resp.status, url)
    html = await resp.text()
    return html

async def parse(url: str, session: ClientSession, **kwargs) -> set:
    """Find HREFs in the HTML of `url`."""
    found = set()
    try:
        html = await fetch_html(url=url, session=session, **kwargs)
    except (
        aiohttp.ClientError,
        # aiohttp.http_exceptions.HttpProcessingError,
    ) as e:
        logger.error(
            "aiohttp exception for %s [%s]: %s",
            url,
            getattr(e, "status", None),
            getattr(e, "message", None),
        )
        return found
    except Exception as e:
        logger.exception(
            "Non-aiohttp exception occurred: %s", getattr(e, "__dict__", {})
        )
        return found
    else:
        for link in HREF_RE.findall(html):
            try:
                abslink = urllib.parse.urljoin(url, link)
            except (urllib.error.URLError, ValueError):
                logger.exception("Error parsing URL: %s", link)
                pass
            else:
                found.add(abslink)
        logger.info("Found %d links for %s", len(found), url)
        return found

async def write_one(file: IO, url: str, **kwargs) -> None:
    """Write the found HREFs from `url` to `file`."""
    res = await parse(url=url, **kwargs)
    if not res:
        return None
    async with aiofiles.open(file, "a") as f:
        for p in res:
            await f.write(f"{url}\t{p}\n")
        logger.info("Wrote results for source URL: %s", url)

async def bulk_crawl_and_write(file: IO, urlset: set, **kwargs) -> None:
    """Crawl & write concurrently to `file` for multiple `urls`."""
    scheduler = await aiojobs.create_scheduler(limit=3)
    async with ClientSession() as session:
        for url in urlset:
            await scheduler.spawn(write_one(file=file, url=url, session=session, **kwargs))
            print(f'{scheduler} active_count:{scheduler.active_count} pending_count:{scheduler.pending_count}')
        # await asyncio.sleep(3)
        while scheduler._jobs:
            # print(f'{scheduler} active_count:{scheduler.active_count} pending_count:{scheduler.pending_count}')
            await asyncio.sleep(1)
        await scheduler.close()

if __name__ == "__main__":
    import pathlib
    import sys
    import time

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    outpath = here.joinpath("foundurls.txt")
    with open(outpath, "w") as outfile:
        outfile.write("source_url\tparsed_url\n")

    t0 = time.perf_counter()
    asyncio.run(bulk_crawl_and_write(file=outpath, urlset=urls))
    print(f'{__file__} finished in {time.perf_counter() - t0:.2f} sec.')
A very good introductory tutorial: https://realpython.com/async-io-python/