地方エンジニアの学習日記

興味ある技術の雑なメモだったりを書いてくブログ。たまに日記とガジェット紹介。

【Python】async IOでセマフォ

httpリクエストの同時実行数制御しつつ色々やりたい時に便利

  • asyncio.create_task()+semaphore.acquire()/release() で明示的に並列数を制御
  • async_timeout を使い 5 秒でタイムアウト
  • 例外が起きても最後まで走らせ、結果を URL ごとに保持
import asyncio
import aiohttp
import async_timeout

CONCURRENCY = 2        # 同時接続上限
TIMEOUT_SEC = 5        # 1 リクエストのタイムアウト

async def fetch(url: str, session: aiohttp.ClientSession,
                semaphore: asyncio.Semaphore, results: dict[str, str]) -> None:
    async with semaphore:                         # 上限までスロットを確保
        try:
            print(f"[REQ] {url}")
            async with async_timeout.timeout(TIMEOUT_SEC):
                async with session.get(url) as resp:
                    resp.raise_for_status()
                    results[url] = await resp.text()
                    print(f"[OK ] {url} ({len(results[url])} bytes)")
        except Exception as exc:                  # 何が起きても記録だけ
            results[url] = f"ERROR: {exc}"
            print(f"[ERR] {url}: {exc}")

async def main() -> None:
    urls = [
        "http://example.com",
        "http://example.org",
        "http://example.net",
        "http://example.edu",
        "http://example.io",
    ]

    semaphore = asyncio.Semaphore(CONCURRENCY)
    results: dict[str, str] = {}                  # URL → レスポンス or エラー

    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.create_task(fetch(u, session, semaphore, results))
                 for u in urls]
        await asyncio.gather(*tasks)              # ここで全タスク完了を待つ

    # ─ 結果表示 ───────────────────────────────────────────────
    for url in urls:
        body = results[url]
        if body.startswith("ERROR"):
            print(f"{url}: {body}")
        else:
            print(f"{url}: {len(body)} bytes")

if __name__ == "__main__":
    asyncio.run(main())