地方エンジニアの学習日記

興味ある技術の雑なメモだったりを書いてくブログ。たまに日記とガジェット紹介。

python async

async

イベントループの取得

asyncio.get_event_loop()

カレントイベントループを取得。カレントスレッドにカレントイベントループがなければ自動的にイベントループを作り,それをカレントイベントループに設定する。一度設定し,再度作っても同じカレントイベントループになる。

loop = asyncio.get_event_loop()
loop_2 = asyncio.get_event_loop()
print(loop is loop_2) # True
asyncio.new_event_loop()

新しいイベントループを作る。ただしカレントイベントループではない。

new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

引数loopをカレントイベントループに設定する。

new_loop = asyncio.new_event_loop()
loop = asyncio.set_event_loop(new_loop)
asyncio.get_running_loop()

カレントスレッドで実行中のイベントループを返す。もしイベントループがなければ例外(RuntimeError)を発生させる。この関数はコルーチンもしくはコールバックからでのみ使用可能。

async def main():
    print(asyncio.get_running_loop()) # <_UnixSelectorEventLoop running=True closed=False debug=False>
    print('Hello') # Hello
    await asyncio.sleep(1)
    print('World') # World

asyncio.run(main())

ループの実行と停止

loop.run_until_complete(future)

引数future(Futureのインスタンス)が完了するまで実行する。引数がコルーチンの場合は,asyncio.Taskとして実行するまで予約される。Futureの戻り値か例外を返す。

async def main():
    print('Hello') # Hello
    await asyncio.sleep(1)
    print('World') # World

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
loop.run_forever()

loop.stop()が呼ばれるまでループし続ける。

def main(loop):
    print('Hello World')
    loop.stop()

loop = asyncio.get_event_loop()
loop.call_soon(main, loop)

try:
    loop.run_forever()
finally:
    loop.close()
loop.stop()

イベントループを停止する。

loop.is_running()

イベントループが実行中ならTrueを返す。

loop.is_closed()

イベントループが閉じられたらTrueを返す。

loop.close()

イベントループを閉じる。

(コルーチン関数)loop.shutdown_asyncgens()

非同期ジェネレーターオブジェクトを閉じる。

async def my_async_generator(iterable):
    for num in iterable:
        await asyncio.sleep(1)
        yield num

async def main():
    generator = my_async_generator([1, 2, 3, 4, 5])
    async for item in generator:
        print(item)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.call_soon(loop.shutdown_asyncgens)
    loop.close()