# Handy pattern for doing work while capping the number of concurrent HTTP requests:
# - asyncio.create_task() + semaphore acquire/release explicitly bound the parallelism
# - async_timeout enforces a 5-second per-request timeout
# - even when an exception occurs, every request runs to completion and the
#   result (body or error) is kept per URL
"""Fetch multiple URLs concurrently with a bounded number of in-flight requests.

- asyncio.create_task() + an asyncio.Semaphore cap the parallelism
- async_timeout enforces a per-request timeout
- failures are recorded per URL instead of aborting the whole run
"""

import asyncio

import aiohttp
import async_timeout

CONCURRENCY = 2   # maximum number of simultaneous connections
TIMEOUT_SEC = 5   # timeout for a single request, in seconds


async def fetch(
    url: str,
    session: aiohttp.ClientSession,
    semaphore: asyncio.Semaphore,
    results: dict[str, str],
) -> None:
    """Fetch *url* and record the response body (or an error string) in *results*.

    Never raises: any exception — timeout, connection error, HTTP error
    status — is caught and stored as ``"ERROR: ..."`` under the URL's key.
    """
    async with semaphore:  # hold one concurrency slot for the whole request
        try:
            print(f"[REQ] {url}")
            async with async_timeout.timeout(TIMEOUT_SEC):
                async with session.get(url) as resp:
                    resp.raise_for_status()  # treat 4xx/5xx as failures
                    results[url] = await resp.text()
                    # resp.text() decodes to str, so len() counts characters,
                    # not bytes — the message reflects that.
                    print(f"[OK ] {url} ({len(results[url])} chars)")
        except Exception as exc:  # deliberate catch-all: record, never propagate
            results[url] = f"ERROR: {exc}"
            print(f"[ERR] {url}: {exc}")


async def main() -> None:
    """Fan out one fetch task per URL, then print a per-URL summary."""
    urls = [
        "http://example.com",
        "http://example.org",
        "http://example.net",
        "http://example.edu",
        "http://example.io",
    ]
    semaphore = asyncio.Semaphore(CONCURRENCY)
    results: dict[str, str] = {}  # URL -> response body or "ERROR: ..." string

    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.create_task(fetch(u, session, semaphore, results))
            for u in urls
        ]
        await asyncio.gather(*tasks)  # wait for every task to finish

    # ── result summary ──────────────────────────────────────────────
    for url in urls:
        body = results[url]
        if body.startswith("ERROR"):
            print(f"{url}: {body}")
        else:
            print(f"{url}: {len(body)} chars")


if __name__ == "__main__":
    asyncio.run(main())