From 7cad405ac074d13e2615c2e7aa65af5d243dfd1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Wed, 13 Dec 2017 00:41:58 +0200 Subject: Cancel all pending futures on AsyncIOExecutor shutdown Fixes #233. --- apscheduler/executors/asyncio.py | 11 +++++++++++ docs/versionhistory.rst | 2 ++ tests/test_executors_py35.py | 16 ++++++++++++++++ 3 files changed, 29 insertions(+) diff --git a/apscheduler/executors/asyncio.py b/apscheduler/executors/asyncio.py index dd16ee7..5139622 100644 --- a/apscheduler/executors/asyncio.py +++ b/apscheduler/executors/asyncio.py @@ -26,9 +26,19 @@ class AsyncIOExecutor(BaseExecutor): def start(self, scheduler, alias): super(AsyncIOExecutor, self).start(scheduler, alias) self._eventloop = scheduler._eventloop + self._pending_futures = set() + + def shutdown(self, wait=True): + # There is no way to honor wait=True without converting this method into a coroutine method + for f in self._pending_futures: + if not f.done(): + f.cancel() + + self._pending_futures.clear() def _do_submit_job(self, job, run_times): def callback(f): + self._pending_futures.discard(f) try: events = f.result() except BaseException: @@ -47,3 +57,4 @@ class AsyncIOExecutor(BaseExecutor): self._logger.name) f.add_done_callback(callback) + self._pending_futures.add(f) diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index 9bcf89e..9736736 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -24,6 +24,8 @@ APScheduler, see the :doc:`migration section `. * Fixed passing ``wait=True`` to ``AsyncIOScheduler.shutdown()`` (although it doesn't do much) +* Cancel all pending futures when ``AsyncIOExecutor`` is shut down + 3.4.0 ----- diff --git a/tests/test_executors_py35.py b/tests/test_executors_py35.py index cc25d88..7849eb0 100644 --- a/tests/test_executors_py35.py +++ b/tests/test_executors_py35.py @@ -1,4 +1,5 @@ """Contains test functions using Python 3.3+ syntax.""" +from asyncio import CancelledError from datetime import datetime import pytest @@ -84,3 +85,18 @@ async def test_run_coroutine_job_tornado(tornado_scheduler, tornado_executor, ex assert str(events[0].exception) == 'dummy error' else: assert events[0].retval is True + + +@pytest.mark.asyncio +async def test_asyncio_executor_shutdown(asyncio_scheduler, asyncio_executor): + """Test that the AsyncIO executor cancels its pending tasks on shutdown.""" + from asyncio import sleep + + job = asyncio_scheduler.add_job(waiter, 'interval', seconds=1, args=[sleep, None]) + asyncio_executor.submit_job(job, [datetime.now(utc)]) + futures = asyncio_executor._pending_futures.copy() + assert len(futures) == 1 + + asyncio_executor.shutdown() + with pytest.raises(CancelledError): + await futures.pop() -- cgit v1.2.1