import asyncio


class UnsafeCounter:
    """Counter whose ``increment`` is intentionally not atomic.

    The read and the write of ``count`` are separated by an ``await``, so
    increments that run concurrently can overwrite each other's updates.
    """

    def __init__(self):
        self.count = 0

    async def increment(self):
        """Read ``count``, suspend briefly, then store the read value plus one."""
        observed = self.count
        # Deliberately hand control back to the event loop between the read
        # and the write so that other tasks get a chance to run.
        await asyncio.sleep(0.001)
        self.count = observed + 1


async def main():
    """Run 1000 concurrent increments on one counter and print the total."""
    counter = UnsafeCounter()
    pending = []
    for _ in range(1000):
        pending.append(asyncio.create_task(counter.increment()))
    await asyncio.gather(*pending)
    print(f"Final count: {counter.count}")


asyncio.run(main())
awaitはタスクのスイッチを起こす
await は、非同期処理の実行中にコルーチンを一時停止できるポイントを明示的に示す構文です。Python の非同期ランタイム(asyncio など)では、await した対象がまだ完了していない場合、現在のコルーチンの実行を中断し、制御をイベントループに戻します。このとき、イベントループはスケジュールされている他のタスクを選んで実行することができます。これにより、単一スレッドでも複数の非同期タスクが協調的に切り替わりながら並行処理されます。つまり、await はタスク切り替え(コンテキストスイッチ)が発生しうるタイミングを明示的に示すものであると言えます。
def __step(self, exc=None):
    """Advance the wrapped coroutine by one step and arrange the next one.

    The coroutine is resumed with ``send(None)`` when ``exc`` is None and
    with ``throw(exc)`` otherwise. Depending on how the coroutine responds,
    the task is completed (result, exception or cancellation), made to wait
    on the yielded future, or rescheduled on the loop with ``call_soon``.

    Args:
        exc: Exception to throw into the coroutine, or None to resume it
            normally.

    Raises:
        exceptions.InvalidStateError: If the task is already done.
        KeyboardInterrupt, SystemExit: Re-raised after being stored on the
            task via ``set_exception``.
    """
    if self.done():
        raise exceptions.InvalidStateError(
            f'_step(): already done: {self!r}, {exc!r}')
    if self._must_cancel:
        # A cancellation is pending: make sure the coroutine receives a
        # CancelledError, keeping the one passed in if it already is one.
        if not isinstance(exc, exceptions.CancelledError):
            exc = self._make_cancelled_error()
        self._must_cancel = False
    coro = self._coro
    self._fut_waiter = None

    _enter_task(self._loop, self)
    # Call either coro.throw(exc) or coro.send(None).
    try:
        if exc is None:
            # We use the `send` method directly, because coroutines
            # don't have `__iter__` and `__next__` methods.
            result = coro.send(None)
        else:
            result = coro.throw(exc)
    except StopIteration as exc:
        # The coroutine finished; exc.value is used as the task's result.
        if self._must_cancel:
            # Task is cancelled right before coro stops.
            self._must_cancel = False
            super().cancel(msg=self._cancel_message)
        else:
            super().set_result(exc.value)
    except exceptions.CancelledError as exc:
        # Save the original exception so we can chain it later.
        self._cancelled_exc = exc
        super().cancel()  # I.e., Future.cancel(self).
    except (KeyboardInterrupt, SystemExit) as exc:
        # Record the exception on the task, then let it propagate to the
        # caller instead of swallowing it.
        super().set_exception(exc)
        raise
    except BaseException as exc:
        super().set_exception(exc)
    else:
        # The coroutine yielded `result` instead of finishing; decide how
        # (and whether) this task gets stepped again.
        blocking = getattr(result, '_asyncio_future_blocking', None)
        if blocking is not None:
            # Yielded Future must come from Future.__iter__().
            if futures._get_loop(result) is not self._loop:
                new_exc = RuntimeError(
                    f'Task {self!r} got Future '
                    f'{result!r} attached to a different loop')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            elif blocking:
                if result is self:
                    new_exc = RuntimeError(
                        f'Task cannot await on itself: {self!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                else:
                    # Clear the flag and register __wakeup as a done
                    # callback on the yielded future; no step is scheduled
                    # on the loop in this branch.
                    result._asyncio_future_blocking = False
                    result.add_done_callback(
                        self.__wakeup, context=self._context)
                    self._fut_waiter = result
                    if self._must_cancel:
                        # _must_cancel was reset at the top of this call,
                        # so it was set again since then; pass the
                        # cancellation on to the future being waited on.
                        if self._fut_waiter.cancel(
                                msg=self._cancel_message):
                            self._must_cancel = False
            else:
                new_exc = RuntimeError(
                    f'yield was used instead of yield from '
                    f'in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)

        elif result is None:
            # Bare yield relinquishes control for one event loop iteration.
            self._loop.call_soon(self.__step, context=self._context)
        elif inspect.isgenerator(result):
            # Yielding a generator is just wrong.
            new_exc = RuntimeError(
                f'yield was used instead of yield from for '
                f'generator in task {self!r} with {result!r}')
            self._loop.call_soon(
                self.__step, new_exc, context=self._context)
        else:
            # Yielding something else is an error.
            new_exc = RuntimeError(f'Task got bad yield: {result!r}')
            self._loop.call_soon(
                self.__step, new_exc, context=self._context)
    finally:
        _leave_task(self._loop, self)
        self = None  # Needed to break cycles when an exception occurs.