From 0a83846dae6071d0edf48c3a8065ae2d0b641833 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Sat, 27 May 2023 08:23:00 +0100 Subject: [PATCH 1/7] Add TaskScope and let TaskGroup subclass it The rewritten TaskGroup still passes all existing test cases. --- Lib/asyncio/taskgroups.py | 214 ++++------------------------------- Lib/asyncio/taskscope.py | 231 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 250 insertions(+), 195 deletions(-) create mode 100644 Lib/asyncio/taskscope.py diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py index 06b2e0db86a1fe..03004776aaa779 100644 --- a/Lib/asyncio/taskgroups.py +++ b/Lib/asyncio/taskgroups.py @@ -6,10 +6,10 @@ from . import events from . import exceptions -from . import tasks +from . import taskscope -class TaskGroup: +class TaskGroup(taskscope.TaskScope): """Asynchronous context manager for managing groups of tasks. Example use: @@ -26,114 +26,21 @@ class TaskGroup: The exceptions are then combined and raised as an `ExceptionGroup`. """ def __init__(self): - self._entered = False - self._exiting = False - self._aborting = False - self._loop = None - self._parent_task = None - self._parent_cancel_requested = False - self._tasks = set() + super().__init__() self._errors = [] - self._base_error = None - self._on_completed_fut = None - def __repr__(self): - info = [''] - if self._tasks: - info.append(f'tasks={len(self._tasks)}') - if self._errors: - info.append(f'errors={len(self._errors)}') - if self._aborting: - info.append('cancelling') - elif self._entered: - info.append('entered') - - info_str = ' '.join(info) - return f'' - - async def __aenter__(self): - if self._entered: - raise RuntimeError( - f"TaskGroup {self!r} has been already entered") - self._entered = True - - if self._loop is None: - self._loop = events.get_running_loop() - - self._parent_task = tasks.current_task(self._loop) - if self._parent_task is None: - raise RuntimeError( - f'TaskGroup {self!r} cannot determine the parent task') + def create_task(self, coro, *, name=None, context=None): + """Create a new task in this group and return it. - return self + Similar to `asyncio.create_task`. + """ + task = super().create_task(coro, name=name, context=context) + if not task.done(): + task.add_done_callback(self._handle_completion_as_group) + return task async def __aexit__(self, et, exc, tb): - self._exiting = True - - if (exc is not None and - self._is_base_error(exc) and - self._base_error is None): - self._base_error = exc - - propagate_cancellation_error = \ - exc if et is exceptions.CancelledError else None - if self._parent_cancel_requested: - # If this flag is set we *must* call uncancel(). - if self._parent_task.uncancel() == 0: - # If there are no pending cancellations left, - # don't propagate CancelledError. - propagate_cancellation_error = None - - if et is not None: - if not self._aborting: - # Our parent task is being cancelled: - # - # async with TaskGroup() as g: - # g.create_task(...) - # await ... # <- CancelledError - # - # or there's an exception in "async with": - # - # async with TaskGroup() as g: - # g.create_task(...) - # 1 / 0 - # - self._abort() - - # We use while-loop here because "self._on_completed_fut" - # can be cancelled multiple times if our parent task - # is being cancelled repeatedly (or even once, when - # our own cancellation is already in progress) - while self._tasks: - if self._on_completed_fut is None: - self._on_completed_fut = self._loop.create_future() - - try: - await self._on_completed_fut - except exceptions.CancelledError as ex: - if not self._aborting: - # Our parent task is being cancelled: - # - # async def wrapper(): - # async with TaskGroup() as g: - # g.create_task(foo) - # - # "wrapper" is being cancelled while "foo" is - # still running. - propagate_cancellation_error = ex - self._abort() - - self._on_completed_fut = None - - assert not self._tasks - - if self._base_error is not None: - raise self._base_error - - # Propagate CancelledError if there is one, except if there - # are other errors -- those have priority. - if propagate_cancellation_error and not self._errors: - raise propagate_cancellation_error + await super().__aexit__(et, exc, tb) if et is not None and et is not exceptions.CancelledError: self._errors.append(exc) @@ -148,95 +55,12 @@ async def __aexit__(self, et, exc, tb): finally: self._errors = None - def create_task(self, coro, *, name=None, context=None): - """Create a new task in this group and return it. - - Similar to `asyncio.create_task`. - """ - if not self._entered: - raise RuntimeError(f"TaskGroup {self!r} has not been entered") - if self._exiting and not self._tasks: - raise RuntimeError(f"TaskGroup {self!r} is finished") - if self._aborting: - raise RuntimeError(f"TaskGroup {self!r} is shutting down") - if context is None: - task = self._loop.create_task(coro) - else: - task = self._loop.create_task(coro, context=context) - tasks._set_task_name(task, name) - # optimization: Immediately call the done callback if the task is - # already done (e.g. if the coro was able to complete eagerly), - # and skip scheduling a done callback - if task.done(): - self._on_task_done(task) - else: - self._tasks.add(task) - task.add_done_callback(self._on_task_done) - return task - - # Since Python 3.8 Tasks propagate all exceptions correctly, - # except for KeyboardInterrupt and SystemExit which are - # still considered special. - - def _is_base_error(self, exc: BaseException) -> bool: - assert isinstance(exc, BaseException) - return isinstance(exc, (SystemExit, KeyboardInterrupt)) - - def _abort(self): - self._aborting = True - - for t in self._tasks: - if not t.done(): - t.cancel() - - def _on_task_done(self, task): - self._tasks.discard(task) - - if self._on_completed_fut is not None and not self._tasks: - if not self._on_completed_fut.done(): - self._on_completed_fut.set_result(True) - + def _handle_completion_as_group(self, task): if task.cancelled(): return - - exc = task.exception() - if exc is None: - return - - self._errors.append(exc) - if self._is_base_error(exc) and self._base_error is None: - self._base_error = exc - - if self._parent_task.done(): - # Not sure if this case is possible, but we want to handle - # it anyways. - self._loop.call_exception_handler({ - 'message': f'Task {task!r} has errored out but its parent ' - f'task {self._parent_task} is already completed', - 'exception': exc, - 'task': task, - }) - return - - if not self._aborting and not self._parent_cancel_requested: - # If parent task *is not* being cancelled, it means that we want - # to manually cancel it to abort whatever is being run right now - # in the TaskGroup. But we want to mark parent task as - # "not cancelled" later in __aexit__. Example situation that - # we need to handle: - # - # async def foo(): - # try: - # async with TaskGroup() as g: - # g.create_task(crash_soon()) - # await something # <- this needs to be canceled - # # by the TaskGroup, e.g. - # # foo() needs to be cancelled - # except Exception: - # # Ignore any exceptions raised in the TaskGroup - # pass - # await something_else # this line has to be called - # # after TaskGroup is finished. - self._abort() - self._parent_cancel_requested = True - self._parent_task.cancel() + if (exc := task.exception()) is not None: + self._errors.append(exc) + if not self._aborting and not self._parent_cancel_requested: + self._abort() + self._parent_cancel_requested = True + self._parent_task.cancel() diff --git a/Lib/asyncio/taskscope.py b/Lib/asyncio/taskscope.py new file mode 100644 index 00000000000000..ce412d0b546e1e --- /dev/null +++ b/Lib/asyncio/taskscope.py @@ -0,0 +1,231 @@ +# Adapted with permission from the EdgeDB project; +# license: PSFL. + + +__all__ = ["TaskScope"] + +from . import events +from . import exceptions +from . import tasks + + +class TaskScope: + """Asynchronous context manager for managing a scope of subtasks. + + Example use: + + async with asyncio.TaskScope() as scope: + task1 = scope.create_task(some_coroutine(...)) + task2 = scope.create_task(other_coroutine(...)) + print("Both tasks have completed now.") + + All tasks are awaited when the context manager exits. + + Any exceptions other than `asyncio.CancelledError` raised within + a task will be ignored (TODO: allow a custom exception handler?) + and it is the caller's responsibility to catch them in task callbacks. + """ + def __init__(self): + self._entered = False + self._exiting = False + self._aborting = False + self._loop = None + self._parent_task = None + self._parent_cancel_requested = False + self._tasks = set() + self._errors = [] + self._base_error = None + self._on_completed_fut = None + + def __repr__(self): + info = [''] + if self._tasks: + info.append(f'tasks={len(self._tasks)}') + if self._errors: + info.append(f'errors={len(self._errors)}') + if self._aborting: + info.append('cancelling') + elif self._entered: + info.append('entered') + + info_str = ' '.join(info) + return f'<{type(self).__name__}{info_str}>' + + async def __aenter__(self): + if self._entered: + raise RuntimeError( + f"TaskGroup {self!r} has been already entered") + self._entered = True + + if self._loop is None: + self._loop = events.get_running_loop() + + self._parent_task = tasks.current_task(self._loop) + if self._parent_task is None: + raise RuntimeError( + f'TaskGroup {self!r} cannot determine the parent task') + + return self + + async def __aexit__(self, et, exc, tb): + self._exiting = True + + if (exc is not None and + self._is_base_error(exc) and + self._base_error is None): + self._base_error = exc + + propagate_cancellation_error = \ + exc if et is exceptions.CancelledError else None + if self._parent_cancel_requested: + # If this flag is set we *must* call uncancel(). + if self._parent_task.uncancel() == 0: + # If there are no pending cancellations left, + # don't propagate CancelledError. + propagate_cancellation_error = None + + if et is not None: + if not self._aborting: + # Our parent task is being cancelled: + # + # async with TaskGroup() as g: + # g.create_task(...) + # await ... # <- CancelledError + # + # or there's an exception in "async with": + # + # async with TaskGroup() as g: + # g.create_task(...) + # 1 / 0 + # + self._abort() + + # We use while-loop here because "self._on_completed_fut" + # can be cancelled multiple times if our parent task + # is being cancelled repeatedly (or even once, when + # our own cancellation is already in progress) + while self._tasks: + if self._on_completed_fut is None: + self._on_completed_fut = self._loop.create_future() + + try: + await self._on_completed_fut + except exceptions.CancelledError as ex: + if not self._aborting: + # Our parent task is being cancelled: + # + # async def wrapper(): + # async with TaskGroup() as g: + # g.create_task(foo) + # + # "wrapper" is being cancelled while "foo" is + # still running. + propagate_cancellation_error = ex + self._abort() + + self._on_completed_fut = None + + assert not self._tasks + + if self._base_error is not None: + raise self._base_error + + # Propagate CancelledError if there is one, except if there + # are other errors -- those have priority. + if propagate_cancellation_error and not self._errors: + raise propagate_cancellation_error + + def create_task(self, coro, *, name=None, context=None): + """Create a new task in this group and return it. + + Similar to `asyncio.create_task`. + """ + if not self._entered: + raise RuntimeError(f"TaskGroup {self!r} has not been entered") + if self._exiting and not self._tasks: + raise RuntimeError(f"TaskGroup {self!r} is finished") + if self._aborting: + raise RuntimeError(f"TaskGroup {self!r} is shutting down") + if context is None: + task = self._loop.create_task(coro) + else: + task = self._loop.create_task(coro, context=context) + tasks._set_task_name(task, name) + # optimization: Immediately call the done callback if the task is + # already done (e.g. if the coro was able to complete eagerly), + # and skip scheduling a done callback + if task.done(): + self._on_task_done(task) + else: + self._tasks.add(task) + task.add_done_callback(self._on_task_done) + return task + + # Since Python 3.8 Tasks propagate all exceptions correctly, + # except for KeyboardInterrupt and SystemExit which are + # still considered special. + + def _is_base_error(self, exc: BaseException) -> bool: + assert isinstance(exc, BaseException) + return isinstance(exc, (SystemExit, KeyboardInterrupt)) + + def _abort(self): + self._aborting = True + + for t in self._tasks: + if not t.done(): + t.cancel() + + shutdown = _abort # alias + + def _on_task_done(self, task): + self._tasks.discard(task) + + if self._on_completed_fut is not None and not self._tasks: + if not self._on_completed_fut.done(): + self._on_completed_fut.set_result(True) + + if task.cancelled(): + return + + exc = task.exception() + if exc is None: + return + + is_base_error = self._is_base_error(exc) + if is_base_error and self._base_error is None: + self._base_error = exc + + if self._parent_task.done(): + # Not sure if this case is possible, but we want to handle + # it anyways. + self._loop.call_exception_handler({ + 'message': f'Task {task!r} has errored out but its parent ' + f'task {self._parent_task} is already completed', + 'exception': exc, + 'task': task, + }) + return + + if is_base_error: + # If parent task *is not* being cancelled, it means that we want + # to manually cancel it to abort whatever is being run right now + # in the TaskGroup. But we want to mark parent task as + # "not cancelled" later in __aexit__. Example situation that + # we need to handle: + # + # async def foo(): + # try: + # async with TaskGroup() as g: + # g.create_task(crash_soon()) + # await something # <- this needs to be canceled + # # by the TaskGroup, e.g. + # # foo() needs to be cancelled + # except Exception: + # # Ignore any exceptions raised in the TaskGroup + # pass + # await something_else # this line has to be called + # # after TaskGroup is finished. + self._abort() + self._parent_cancel_requested = True + self._parent_task.cancel() From d683380c6152f1ffa383f961e1ea63aad6157e8a Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Sat, 27 May 2023 10:22:09 +0100 Subject: [PATCH 2/7] Work in progress: Add taskscope test cases - Also fixes `__all__` import of the taskgroups and taskscope modules --- Lib/asyncio/__init__.py | 3 + Lib/asyncio/taskgroups.py | 18 +- Lib/asyncio/taskscope.py | 65 +++- Lib/test/test_asyncio/test_taskscope.py | 496 ++++++++++++++++++++++++ 4 files changed, 566 insertions(+), 16 deletions(-) create mode 100644 Lib/test/test_asyncio/test_taskscope.py diff --git a/Lib/asyncio/__init__.py b/Lib/asyncio/__init__.py index fed16ec7c67fac..321513f263bd92 100644 --- a/Lib/asyncio/__init__.py +++ b/Lib/asyncio/__init__.py @@ -18,6 +18,7 @@ from .subprocess import * from .tasks import * from .taskgroups import * +from .taskscope import * from .timeouts import * from .threads import * from .transports import * @@ -34,6 +35,8 @@ streams.__all__ + subprocess.__all__ + tasks.__all__ + + taskgroups.__all__ + + taskscope.__all__ + threads.__all__ + timeouts.__all__ + transports.__all__) diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py index 03004776aaa779..04155dd5c64bec 100644 --- a/Lib/asyncio/taskgroups.py +++ b/Lib/asyncio/taskgroups.py @@ -2,7 +2,7 @@ # license: PSFL. -__all__ = ["TaskGroup"] +__all__ = "TaskGroup", from . import events from . import exceptions @@ -26,9 +26,23 @@ class TaskGroup(taskscope.TaskScope): The exceptions are then combined and raised as an `ExceptionGroup`. """ def __init__(self): - super().__init__() + super().__init__(delegate_errors=None) self._errors = [] + def __repr__(self): + info = [''] + if self._tasks: + info.append(f'tasks={len(self._tasks)}') + if self._errors: + info.append(f'errors={len(self._errors)}') + if self._aborting: + info.append('cancelling') + elif self._entered: + info.append('entered') + + info_str = ' '.join(info) + return f'' + def create_task(self, coro, *, name=None, context=None): """Create a new task in this group and return it. diff --git a/Lib/asyncio/taskscope.py b/Lib/asyncio/taskscope.py index ce412d0b546e1e..dbfc039942c760 100644 --- a/Lib/asyncio/taskscope.py +++ b/Lib/asyncio/taskscope.py @@ -2,13 +2,16 @@ # license: PSFL. -__all__ = ["TaskScope"] +__all__ = "TaskScope", from . import events from . import exceptions from . import tasks +_default_error_handler = object() + + class TaskScope: """Asynchronous context manager for managing a scope of subtasks. @@ -22,10 +25,15 @@ class TaskScope: All tasks are awaited when the context manager exits. Any exceptions other than `asyncio.CancelledError` raised within - a task will be ignored (TODO: allow a custom exception handler?) - and it is the caller's responsibility to catch them in task callbacks. + a task will be handled differently depending on `delegate_errors`. + + If `delegate_errors` is not set, it will run + `loop.call_exception_handler()`. + If it is set `None`, it will silently ignore the exception. + If it is set as a callable function, it will invoke it using the same + context argument of `loop.call_exception_handler()`. """ - def __init__(self): + def __init__(self, delegate_errors=_default_error_handler): self._entered = False self._exiting = False self._aborting = False @@ -33,28 +41,29 @@ def __init__(self): self._parent_task = None self._parent_cancel_requested = False self._tasks = set() - self._errors = [] self._base_error = None self._on_completed_fut = None + self._delegate_errors = delegate_errors + self._has_errors = False def __repr__(self): info = [''] if self._tasks: info.append(f'tasks={len(self._tasks)}') - if self._errors: - info.append(f'errors={len(self._errors)}') if self._aborting: info.append('cancelling') elif self._entered: info.append('entered') info_str = ' '.join(info) - return f'<{type(self).__name__}{info_str}>' + return f'' async def __aenter__(self): if self._entered: raise RuntimeError( - f"TaskGroup {self!r} has been already entered") + f'{type(self).__name__} {self!r} ' + f'has been already entered' + ) self._entered = True if self._loop is None: @@ -63,7 +72,9 @@ async def __aenter__(self): self._parent_task = tasks.current_task(self._loop) if self._parent_task is None: raise RuntimeError( - f'TaskGroup {self!r} cannot determine the parent task') + f'{type(self).__name__} {self!r} ' + f'cannot determine the parent task' + ) return self @@ -132,20 +143,27 @@ async def __aexit__(self, et, exc, tb): # Propagate CancelledError if there is one, except if there # are other errors -- those have priority. - if propagate_cancellation_error and not self._errors: + if propagate_cancellation_error and not self._has_errors: raise propagate_cancellation_error + if et is not None and et is not exceptions.CancelledError: + self._has_errors = True + def create_task(self, coro, *, name=None, context=None): """Create a new task in this group and return it. Similar to `asyncio.create_task`. """ if not self._entered: - raise RuntimeError(f"TaskGroup {self!r} has not been entered") + raise RuntimeError( + f"{type(self).__name__} {self!r} has not been entered" + ) if self._exiting and not self._tasks: - raise RuntimeError(f"TaskGroup {self!r} is finished") + raise RuntimeError(f"{type(self).__name__} {self!r} is finished") if self._aborting: - raise RuntimeError(f"TaskGroup {self!r} is shutting down") + raise RuntimeError( + f"{type(self).__name__} {self!r} is shutting down" + ) if context is None: task = self._loop.create_task(coro) else: @@ -192,6 +210,25 @@ def _on_task_done(self, task): if exc is None: return + self._has_errors = True + match self._delegate_errors: + case None: + pass # deliberately set to ignore errors + case func if callable(func): + func({ + 'message': f'Task {task!r} has errored inside the parent ' + f'task {self._parent_task}', + 'exception': exc, + 'task': task, + }) + case default if default is _default_error_handler: + self._loop.call_exception_handler({ + 'message': f'Task {task!r} has errored inside the parent ' + f'task {self._parent_task}', + 'exception': exc, + 'task': task, + }) + is_base_error = self._is_base_error(exc) if is_base_error and self._base_error is None: self._base_error = exc diff --git a/Lib/test/test_asyncio/test_taskscope.py b/Lib/test/test_asyncio/test_taskscope.py new file mode 100644 index 00000000000000..3b77fabbd14acf --- /dev/null +++ b/Lib/test/test_asyncio/test_taskscope.py @@ -0,0 +1,496 @@ +# license: PSFL. + +import unittest +from unittest import mock + +import asyncio +from asyncio import taskscope +from test.test_asyncio import utils as test_utils + + +# To prevent a warning "test altered the execution environment" +def tearDownModule(): + asyncio.set_event_loop_policy(None) + + +class MyExc(Exception): + pass + + +class MyBaseExc(BaseException): + pass + + +def get_error_types(eg): + return {type(exc) for exc in eg.exceptions} + + +class TestTaskScope(unittest.IsolatedAsyncioTestCase): + + async def test_children_complete_on_child_error(self): + async def zero_division(): + 1 / 0 + + async def foo1(): + await asyncio.sleep(0.1) + return 42 + + async def foo2(): + await asyncio.sleep(0.2) + return 11 + + loop = asyncio.get_running_loop() + exc_handler = mock.Mock() + with mock.patch.object(loop, 'call_exception_handler', exc_handler): + async with taskscope.TaskScope() as g: + t1 = g.create_task(foo1()) + t2 = g.create_task(foo2()) + t3 = g.create_task(zero_division()) + + self.assertEqual(t1.result(), 42) + self.assertEqual(t2.result(), 11) + exc_handler.assert_called_with({ + 'message': test_utils.MockPattern( + '^Task .* has errored inside the parent .*' + ), + 'exception': test_utils.MockInstanceOf(ZeroDivisionError), + 'task': t3, + }) + + async def test_inner_complete_on_child_error(self): + async def zero_division(): + 1 / 0 + + async def foo1(): + await asyncio.sleep(0.1) + return 42 + + async def foo2(): + await asyncio.sleep(0.2) + return 11 + + loop = asyncio.get_running_loop() + exc_handler = mock.Mock() + with mock.patch.object(loop, 'call_exception_handler', exc_handler): + async with taskscope.TaskScope() as g: + t1 = g.create_task(foo1()) + t2 = g.create_task(zero_division()) + r1 = await foo2() + + self.assertEqual(t1.result(), 42) + self.assertEqual(r1, 11) + exc_handler.assert_called_with({ + 'message': test_utils.MockPattern( + '^Task .* has errored inside the parent .*' + ), + 'exception': test_utils.MockInstanceOf(ZeroDivisionError), + 'task': t2, + }) + + async def test_children_exceptions_propagate(self): + async def zero_division(): + 1 / 0 + + async def value_error(): + await asyncio.sleep(0.2) + raise ValueError + + async def foo1(): + await asyncio.sleep(0.4) + return 42 + + loop = asyncio.get_running_loop() + exc_handler = mock.Mock() + with mock.patch.object(loop, 'call_exception_handler', exc_handler): + async with taskscope.TaskScope() as g: + t1 = g.create_task(zero_division()) + t2 = g.create_task(value_error()) + t3 = g.create_task(foo1()) + + exc_handler.assert_has_calls( + [ + mock.call({ + 'message': test_utils.MockPattern( + '^Task .* has errored inside the parent .*' + ), + 'exception': test_utils.MockInstanceOf(ZeroDivisionError), + 'task': t1, + }), + mock.call({ + 'message': test_utils.MockPattern( + '^Task .* has errored inside the parent .*' + ), + 'exception': test_utils.MockInstanceOf(ValueError), + 'task': t2, + }), + ], + any_order=True, + ) + self.assertEqual(t3.result(), 42) + + async def test_children_cancel_on_inner_failure(self): + async def zero_division(): + 1 / 0 + + async def foo1(): + await asyncio.sleep(0.2) + return 42 + + with self.assertRaises(ZeroDivisionError): + async with taskscope.TaskScope() as g: + t1 = g.create_task(foo1()) + await zero_division() + + self.assertTrue(t1.cancelled()) + + async def test_cancellation_01(self): + + NUM = 0 + + async def foo(): + nonlocal NUM + try: + await asyncio.sleep(5) + except asyncio.CancelledError: + NUM += 1 + raise + + async def runner(): + async with taskscope.TaskScope() as g: + for _ in range(5): + g.create_task(foo()) + + r = asyncio.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(asyncio.CancelledError) as cm: + await r + + self.assertEqual(NUM, 5) + + async def test_taskgroup_35(self): + + NUM = 0 + + async def foo(): + nonlocal NUM + try: + await asyncio.sleep(5) + except asyncio.CancelledError: + NUM += 1 + raise + + async def runner(): + nonlocal NUM + async with taskscope.TaskScope() as g: + for _ in range(5): + g.create_task(foo()) + + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + NUM += 10 + raise + + r = asyncio.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(asyncio.CancelledError): + await r + + self.assertEqual(NUM, 15) + + async def test_taskgroup_36(self): + + async def foo(): + try: + await asyncio.sleep(10) + finally: + 1 / 0 + + async def runner(): + async with taskscope.TaskScope() as g: + for _ in range(5): + g.create_task(foo()) + + await asyncio.sleep(10) + + loop = asyncio.get_running_loop() + exc_handler = mock.Mock() + with mock.patch.object(loop, 'call_exception_handler', exc_handler): + r = asyncio.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(asyncio.CancelledError): + await r + + exc_handler.assert_called_with({ + 'message': test_utils.MockPattern( + '^Task .* has errored inside the parent .*' + ), + 'exception': test_utils.MockInstanceOf(ZeroDivisionError), + 'task': mock.ANY, + }) + self.assertEqual(len(exc_handler.call_args_list), 5) + + async def test_taskgroup_37(self): + + async def foo(): + try: + await asyncio.sleep(10) + finally: + 1 / 0 + + async def runner(): + async with taskscope.TaskScope(): + async with taskscope.TaskScope() as g2: + for _ in range(5): + g2.create_task(foo()) + + await asyncio.sleep(10) + + loop = asyncio.get_running_loop() + exc_handler = mock.Mock() + with mock.patch.object(loop, 'call_exception_handler', exc_handler): + r = asyncio.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(asyncio.CancelledError): + await r + + exc_handler.assert_called_with({ + 'message': test_utils.MockPattern( + '^Task .* has errored inside the parent .*' + ), + 'exception': test_utils.MockInstanceOf(ZeroDivisionError), + 'task': mock.ANY, + }) + self.assertEqual(len(exc_handler.call_args_list), 5) + + async def test_taskgroup_38(self): + + async def foo(): + try: + await asyncio.sleep(10) + finally: + 1 / 0 + + async def runner(): + async with taskscope.TaskScope() as g1: + g1.create_task(asyncio.sleep(10)) + + async with taskscope.TaskScope() as g2: + for _ in range(5): + g2.create_task(foo()) + + await asyncio.sleep(10) + + loop = asyncio.get_running_loop() + exc_handler = mock.Mock() + with mock.patch.object(loop, 'call_exception_handler', exc_handler): + r = asyncio.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(asyncio.CancelledError): + await r + + exc_handler.assert_called_with({ + 'message': test_utils.MockPattern( + '^Task .* has errored inside the parent .*' + ), + 'exception': test_utils.MockInstanceOf(ZeroDivisionError), + 'task': mock.ANY, + }) + self.assertEqual(len(exc_handler.call_args_list), 5) + + async def test_taskgroup_39(self): + + async def crash_soon(): + await asyncio.sleep(0.3) + 1 / 0 + + async def runner(): + async with taskscope.TaskScope() as g1: + g1.create_task(crash_soon()) + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + await asyncio.sleep(0.5) + raise + + r = asyncio.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(ExceptionGroup) as cm: + await r + self.assertEqual(get_error_types(cm.exception), {ZeroDivisionError}) + + async def test_taskgroup_40(self): + + async def crash_soon(): + await asyncio.sleep(0.3) + 1 / 0 + + async def nested_runner(): + async with taskscope.TaskScope() as g1: + g1.create_task(crash_soon()) + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + await asyncio.sleep(0.5) + raise + + async def runner(): + t = asyncio.create_task(nested_runner()) + await t + + r = asyncio.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(ExceptionGroup) as cm: + await r + self.assertEqual(get_error_types(cm.exception), {ZeroDivisionError}) + + async def test_taskgroup_41(self): + + NUM = 0 + + async def runner(): + nonlocal NUM + async with taskscope.TaskScope(): + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + NUM += 10 + raise + + r = asyncio.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(asyncio.CancelledError): + await r + + self.assertEqual(NUM, 10) + + async def test_taskgroup_42(self): + + NUM = 0 + + async def runner(): + nonlocal NUM + async with taskscope.TaskScope(): + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + NUM += 10 + # This isn't a good idea, but we have to support + # this weird case. + raise MyExc + + r = asyncio.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + + try: + await r + except ExceptionGroup as t: + self.assertEqual(get_error_types(t),{MyExc}) + else: + self.fail('ExceptionGroup was not raised') + + self.assertEqual(NUM, 10) + + async def test_taskgroup_43(self): + + async def crash_soon(): + await asyncio.sleep(0.1) + 1 / 0 + + async def nested(): + try: + await asyncio.sleep(10) + finally: + raise MyExc + + async def runner(): + async with taskscope.TaskScope() as g: + g.create_task(crash_soon()) + await nested() + + r = asyncio.create_task(runner()) + try: + await r + except ExceptionGroup as t: + self.assertEqual(get_error_types(t), {MyExc, ZeroDivisionError}) + else: + self.fail('TasgGroupError was not raised') + + async def test_taskgroup_44(self): + + async def foo1(): + await asyncio.sleep(1) + return 42 + + async def foo2(): + await asyncio.sleep(2) + return 11 + + async def runner(): + async with taskscope.TaskScope() as g: + g.create_task(foo1()) + g.create_task(foo2()) + + r = asyncio.create_task(runner()) + await asyncio.sleep(0.05) + r.cancel() + + with self.assertRaises(asyncio.CancelledError): + await r + + async def test_taskgroup_45(self): + + NUM = 0 + + async def foo1(): + nonlocal NUM + await asyncio.sleep(0.2) + NUM += 1 + + async def foo2(): + nonlocal NUM + await asyncio.sleep(0.3) + NUM += 2 + + async def runner(): + async with taskscope.TaskScope() as g: + g.create_task(foo1()) + g.create_task(foo2()) + + r = asyncio.create_task(runner()) + await asyncio.sleep(0.05) + r.cancel() + + with self.assertRaises(asyncio.CancelledError): + await r + + self.assertEqual(NUM, 0) + + From 0756e63dcae5876028ed1dcf1e1eec4e40939259 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Sat, 27 May 2023 10:25:16 +0100 Subject: [PATCH 3/7] Follow the existing convention of the naming modules - asyncio's module names are plural. --- Lib/asyncio/__init__.py | 4 +- Lib/asyncio/taskgroups.py | 5 +-- Lib/asyncio/{taskscope.py => taskscopes.py} | 0 .../{test_taskscope.py => test_taskscopes.py} | 38 +++++++++---------- 4 files changed, 23 insertions(+), 24 deletions(-) rename Lib/asyncio/{taskscope.py => taskscopes.py} (100%) rename Lib/test/test_asyncio/{test_taskscope.py => test_taskscopes.py} (93%) diff --git a/Lib/asyncio/__init__.py b/Lib/asyncio/__init__.py index 321513f263bd92..4467ff9725a5fa 100644 --- a/Lib/asyncio/__init__.py +++ b/Lib/asyncio/__init__.py @@ -18,7 +18,7 @@ from .subprocess import * from .tasks import * from .taskgroups import * -from .taskscope import * +from .taskscopes import * from .timeouts import * from .threads import * from .transports import * @@ -36,7 +36,7 @@ subprocess.__all__ + tasks.__all__ + taskgroups.__all__ + - taskscope.__all__ + + taskscopes.__all__ + threads.__all__ + timeouts.__all__ + transports.__all__) diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py index 04155dd5c64bec..299bd52a019ff8 100644 --- a/Lib/asyncio/taskgroups.py +++ b/Lib/asyncio/taskgroups.py @@ -4,12 +4,11 @@ __all__ = "TaskGroup", -from . import events from . import exceptions -from . import taskscope +from . import taskscopes -class TaskGroup(taskscope.TaskScope): +class TaskGroup(taskscopes.TaskScope): """Asynchronous context manager for managing groups of tasks. Example use: diff --git a/Lib/asyncio/taskscope.py b/Lib/asyncio/taskscopes.py similarity index 100% rename from Lib/asyncio/taskscope.py rename to Lib/asyncio/taskscopes.py diff --git a/Lib/test/test_asyncio/test_taskscope.py b/Lib/test/test_asyncio/test_taskscopes.py similarity index 93% rename from Lib/test/test_asyncio/test_taskscope.py rename to Lib/test/test_asyncio/test_taskscopes.py index 3b77fabbd14acf..8b77cfc47ded14 100644 --- a/Lib/test/test_asyncio/test_taskscope.py +++ b/Lib/test/test_asyncio/test_taskscopes.py @@ -4,7 +4,7 @@ from unittest import mock import asyncio -from asyncio import taskscope +from asyncio import taskscopes from test.test_asyncio import utils as test_utils @@ -42,7 +42,7 @@ async def foo2(): loop = asyncio.get_running_loop() exc_handler = mock.Mock() with mock.patch.object(loop, 'call_exception_handler', exc_handler): - async with taskscope.TaskScope() as g: + async with taskscopes.TaskScope() as g: t1 = g.create_task(foo1()) t2 = g.create_task(foo2()) t3 = g.create_task(zero_division()) @@ -72,7 +72,7 @@ async def foo2(): loop = asyncio.get_running_loop() exc_handler = mock.Mock() with mock.patch.object(loop, 'call_exception_handler', exc_handler): - async with taskscope.TaskScope() as g: + async with taskscopes.TaskScope() as g: t1 = g.create_task(foo1()) t2 = g.create_task(zero_division()) r1 = await foo2() @@ -102,7 +102,7 @@ async def foo1(): loop = asyncio.get_running_loop() exc_handler = mock.Mock() with mock.patch.object(loop, 'call_exception_handler', exc_handler): - async with taskscope.TaskScope() as g: + async with taskscopes.TaskScope() as g: t1 = g.create_task(zero_division()) t2 = g.create_task(value_error()) t3 = g.create_task(foo1()) @@ -137,7 +137,7 @@ async def foo1(): return 42 with self.assertRaises(ZeroDivisionError): - async with taskscope.TaskScope() as g: + async with taskscopes.TaskScope() as g: t1 = g.create_task(foo1()) await zero_division() @@ -156,7 +156,7 @@ async def foo(): raise async def runner(): - async with taskscope.TaskScope() as g: + async with taskscopes.TaskScope() as g: for _ in range(5): g.create_task(foo()) @@ -184,7 +184,7 @@ async def foo(): async def runner(): nonlocal NUM - async with taskscope.TaskScope() as g: + async with taskscopes.TaskScope() as g: for _ in range(5): g.create_task(foo()) @@ -213,7 +213,7 @@ async def foo(): 1 / 0 async def runner(): - async with taskscope.TaskScope() as g: + async with taskscopes.TaskScope() as g: for _ in range(5): g.create_task(foo()) @@ -248,8 +248,8 @@ async def foo(): 1 / 0 async def runner(): - async with taskscope.TaskScope(): - async with taskscope.TaskScope() as g2: + async with taskscopes.TaskScope(): + async with taskscopes.TaskScope() as g2: for _ in range(5): g2.create_task(foo()) @@ -284,10 +284,10 @@ async def foo(): 1 / 0 async def runner(): - async with taskscope.TaskScope() as g1: + async with taskscopes.TaskScope() as g1: g1.create_task(asyncio.sleep(10)) - async with taskscope.TaskScope() as g2: + async with taskscopes.TaskScope() as g2: for _ in range(5): g2.create_task(foo()) @@ -320,7 +320,7 @@ async def crash_soon(): 1 / 0 async def runner(): - async with taskscope.TaskScope() as g1: + async with taskscopes.TaskScope() as g1: g1.create_task(crash_soon()) try: await asyncio.sleep(10) @@ -344,7 +344,7 @@ async def crash_soon(): 1 / 0 async def nested_runner(): - async with taskscope.TaskScope() as g1: + async with taskscopes.TaskScope() as g1: g1.create_task(crash_soon()) try: await asyncio.sleep(10) @@ -371,7 +371,7 @@ async def test_taskgroup_41(self): async def runner(): nonlocal NUM - async with taskscope.TaskScope(): + async with taskscopes.TaskScope(): try: await asyncio.sleep(10) except asyncio.CancelledError: @@ -394,7 +394,7 @@ async def test_taskgroup_42(self): async def runner(): nonlocal NUM - async with taskscope.TaskScope(): + async with taskscopes.TaskScope(): try: await asyncio.sleep(10) except asyncio.CancelledError: @@ -431,7 +431,7 @@ async def nested(): raise MyExc async def runner(): - async with taskscope.TaskScope() as g: + async with taskscopes.TaskScope() as g: g.create_task(crash_soon()) await nested() @@ -454,7 +454,7 @@ async def foo2(): return 11 async def runner(): - async with taskscope.TaskScope() as g: + async with taskscopes.TaskScope() as g: g.create_task(foo1()) g.create_task(foo2()) @@ -480,7 +480,7 @@ async def foo2(): NUM += 2 async def runner(): - async with taskscope.TaskScope() as g: + async with taskscopes.TaskScope() as g: g.create_task(foo1()) g.create_task(foo2()) From 62bb5f377464d4bba7d28649b6604d4a9c339c4c Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Sat, 27 May 2023 15:02:28 +0100 Subject: [PATCH 4/7] Update the rest of taskscope test cases --- Lib/test/test_asyncio/test_taskscopes.py | 85 +++++++++++++++--------- 1 file changed, 52 insertions(+), 33 deletions(-) diff --git a/Lib/test/test_asyncio/test_taskscopes.py b/Lib/test/test_asyncio/test_taskscopes.py index 8b77cfc47ded14..1d8c114771079a 100644 --- a/Lib/test/test_asyncio/test_taskscopes.py +++ b/Lib/test/test_asyncio/test_taskscopes.py @@ -21,10 +21,6 @@ class MyBaseExc(BaseException): pass -def get_error_types(eg): - return {type(exc) for exc in eg.exceptions} - - class TestTaskScope(unittest.IsolatedAsyncioTestCase): async def test_children_complete_on_child_error(self): @@ -328,14 +324,24 @@ async def runner(): await asyncio.sleep(0.5) raise - r = asyncio.create_task(runner()) - await asyncio.sleep(0.1) + loop = asyncio.get_running_loop() + exc_handler = mock.Mock() + with mock.patch.object(loop, 'call_exception_handler', exc_handler): + r = asyncio.create_task(runner()) + await asyncio.sleep(0.1) - self.assertFalse(r.done()) - r.cancel() - with self.assertRaises(ExceptionGroup) as cm: - await r - self.assertEqual(get_error_types(cm.exception), {ZeroDivisionError}) + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(asyncio.CancelledError): + await r + + exc_handler.assert_called_with({ + 'message': test_utils.MockPattern( + '^Task .* has errored inside the parent .*' + ), + 'exception': test_utils.MockInstanceOf(ZeroDivisionError), + 'task': mock.ANY, + }) async def test_taskgroup_40(self): @@ -356,14 +362,24 @@ async def runner(): t = asyncio.create_task(nested_runner()) await t - r = asyncio.create_task(runner()) - await asyncio.sleep(0.1) + loop = asyncio.get_running_loop() + exc_handler = mock.Mock() + with mock.patch.object(loop, 'call_exception_handler', exc_handler): + r = asyncio.create_task(runner()) + await asyncio.sleep(0.1) - self.assertFalse(r.done()) - r.cancel() - with self.assertRaises(ExceptionGroup) as cm: - await r - self.assertEqual(get_error_types(cm.exception), {ZeroDivisionError}) + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(asyncio.CancelledError): + await r + + exc_handler.assert_called_with({ + 'message': test_utils.MockPattern( + '^Task .* has errored inside the parent .*' + ), + 'exception': test_utils.MockInstanceOf(ZeroDivisionError), + 'task': mock.ANY, + }) async def test_taskgroup_41(self): @@ -409,16 +425,13 @@ async def runner(): self.assertFalse(r.done()) r.cancel() - try: + with self.assertRaises(MyExc): await r - except ExceptionGroup as t: - self.assertEqual(get_error_types(t),{MyExc}) - else: - self.fail('ExceptionGroup was not raised') self.assertEqual(NUM, 10) async def test_taskgroup_43(self): + t1 = None async def crash_soon(): await asyncio.sleep(0.1) @@ -431,17 +444,25 @@ async def nested(): raise MyExc async def runner(): + nonlocal t1 async with taskscopes.TaskScope() as g: - g.create_task(crash_soon()) + t1 = g.create_task(crash_soon()) await nested() - r = asyncio.create_task(runner()) - try: - await r - except ExceptionGroup as t: - self.assertEqual(get_error_types(t), {MyExc, ZeroDivisionError}) - else: - self.fail('TasgGroupError was not raised') + loop = asyncio.get_running_loop() + exc_handler = mock.Mock() + with mock.patch.object(loop, 'call_exception_handler', exc_handler): + r = asyncio.create_task(runner()) + with self.assertRaises(MyExc): + await r + + exc_handler.assert_called_with({ + 'message': test_utils.MockPattern( + '^Task .* has errored inside the parent .*' + ), + 'exception': test_utils.MockInstanceOf(ZeroDivisionError), + 'task': t1, + }) async def test_taskgroup_44(self): @@ -492,5 +513,3 @@ async def runner(): await r self.assertEqual(NUM, 0) - - From febf8405ad9022c37ccd503d8287f384fa31f539 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Sat, 27 May 2023 15:12:44 +0100 Subject: [PATCH 5/7] Add delegate_error test cases --- Lib/test/test_asyncio/test_taskscopes.py | 56 ++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/Lib/test/test_asyncio/test_taskscopes.py b/Lib/test/test_asyncio/test_taskscopes.py index 1d8c114771079a..b74edfcb58025f 100644 --- a/Lib/test/test_asyncio/test_taskscopes.py +++ b/Lib/test/test_asyncio/test_taskscopes.py @@ -166,6 +166,62 @@ async def runner(): self.assertEqual(NUM, 5) + async def test_delegate_error_ignore(self): + async def zero_division(): + 1 / 0 + + async def value_error(): + await asyncio.sleep(0.2) + raise ValueError + + async def foo1(): + await asyncio.sleep(0.4) + return 42 + + loop = asyncio.get_running_loop() + exc_handler = mock.Mock() + with mock.patch.object(loop, 'call_exception_handler', exc_handler): + async with taskscopes.TaskScope(delegate_errors=None) as g: + g.create_task(zero_division()) + g.create_task(value_error()) + g.create_task(foo1()) + + exc_handler.assert_not_called() + + async def test_delegate_error_custom(self): + async def zero_division(): + 1 / 0 + + async def value_error(): + await asyncio.sleep(0.2) + raise ValueError + + async def foo1(): + await asyncio.sleep(0.4) + return 42 + + catched_errors = [] + + def catch_error(context): + nonlocal catched_errors + catched_errors.append(context) + + async with taskscopes.TaskScope(delegate_errors=catch_error) as g: + t1 = g.create_task(zero_division()) + t2 = g.create_task(value_error()) + g.create_task(foo1()) + + match_count = 0 + for item in catched_errors: + match item["exception"]: + case ZeroDivisionError(): + self.assertIs(item["task"], t1) + match_count += 1 + case ValueError(): + self.assertIs(item["task"], t2) + match_count += 10 + self.assertEqual(match_count, 11) + async def test_taskgroup_35(self): NUM = 0 From ac1c4d7fbcf3b4acfb00525cf221b4d693307f95 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Sun, 28 May 2023 05:06:26 +0000 Subject: [PATCH 6/7] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20blu?= =?UTF-8?q?rb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2023-05-28-05-06-23.gh-issue-101581.KKTDqD.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2023-05-28-05-06-23.gh-issue-101581.KKTDqD.rst diff --git a/Misc/NEWS.d/next/Library/2023-05-28-05-06-23.gh-issue-101581.KKTDqD.rst b/Misc/NEWS.d/next/Library/2023-05-28-05-06-23.gh-issue-101581.KKTDqD.rst new file mode 100644 index 00000000000000..a042ed8899bd2f --- /dev/null +++ b/Misc/NEWS.d/next/Library/2023-05-28-05-06-23.gh-issue-101581.KKTDqD.rst @@ -0,0 +1 @@ +Add :class`asyncio.TaskScope` as a task cancellation scope primitive and now :class:`asyncio.TaskGroup` is an extension of it, allowing easier writing of safe coroutine lifecycle managers in 3rd party codes. Contribution by Andrea Tedeschi and Joongi Kim. From 84ef0064fbce2cd66f3d322faa92e7595cacb794 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Sun, 28 May 2023 14:07:14 +0900 Subject: [PATCH 7/7] Fix typo in news fragment --- .../next/Library/2023-05-28-05-06-23.gh-issue-101581.KKTDqD.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2023-05-28-05-06-23.gh-issue-101581.KKTDqD.rst b/Misc/NEWS.d/next/Library/2023-05-28-05-06-23.gh-issue-101581.KKTDqD.rst index a042ed8899bd2f..e16869a6c3c407 100644 --- a/Misc/NEWS.d/next/Library/2023-05-28-05-06-23.gh-issue-101581.KKTDqD.rst +++ b/Misc/NEWS.d/next/Library/2023-05-28-05-06-23.gh-issue-101581.KKTDqD.rst @@ -1 +1 @@ -Add :class`asyncio.TaskScope` as a task cancellation scope primitive and now :class:`asyncio.TaskGroup` is an extension of it, allowing easier writing of safe coroutine lifecycle managers in 3rd party codes. Contribution by Andrea Tedeschi and Joongi Kim. +Add :class:`asyncio.TaskScope` as a task cancellation scope primitive and now :class:`asyncio.TaskGroup` is an extension of it, allowing easier writing of safe coroutine lifecycle managers in 3rd party codes. Contribution by Andrea Tedeschi and Joongi Kim.