summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2017-12-13 00:41:58 +0200
committerAlex Grönholm <alex.gronholm@nextday.fi>2017-12-13 00:41:58 +0200
commit7cad405ac074d13e2615c2e7aa65af5d243dfd1f (patch)
tree0d8bb03118dcf7862b34ae5326770cca4788a767
parent488cd5cfd74bcfa71b7b493e14f30af90ee2156e (diff)
downloadapscheduler-7cad405ac074d13e2615c2e7aa65af5d243dfd1f.tar.gz
Cancel all pending futures on AsyncIOExecutor shutdown
Fixes #233.
-rw-r--r--apscheduler/executors/asyncio.py11
-rw-r--r--docs/versionhistory.rst2
-rw-r--r--tests/test_executors_py35.py16
3 files changed, 29 insertions, 0 deletions
diff --git a/apscheduler/executors/asyncio.py b/apscheduler/executors/asyncio.py
index dd16ee7..5139622 100644
--- a/apscheduler/executors/asyncio.py
+++ b/apscheduler/executors/asyncio.py
@@ -26,9 +26,19 @@ class AsyncIOExecutor(BaseExecutor):
def start(self, scheduler, alias):
super(AsyncIOExecutor, self).start(scheduler, alias)
self._eventloop = scheduler._eventloop
+ self._pending_futures = set()
+
+ def shutdown(self, wait=True):
+ # There is no way to honor wait=True without converting this method into a coroutine method
+ for f in self._pending_futures:
+ if not f.done():
+ f.cancel()
+
+ self._pending_futures.clear()
def _do_submit_job(self, job, run_times):
def callback(f):
+ self._pending_futures.discard(f)
try:
events = f.result()
except BaseException:
@@ -47,3 +57,4 @@ class AsyncIOExecutor(BaseExecutor):
self._logger.name)
f.add_done_callback(callback)
+ self._pending_futures.add(f)
diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst
index 9bcf89e..9736736 100644
--- a/docs/versionhistory.rst
+++ b/docs/versionhistory.rst
@@ -24,6 +24,8 @@ APScheduler, see the :doc:`migration section <migration>`.
* Fixed passing ``wait=True`` to ``AsyncIOScheduler.shutdown()`` (although it doesn't do much)
+* Cancel all pending futures when ``AsyncIOExecutor`` is shut down
+
3.4.0
-----
diff --git a/tests/test_executors_py35.py b/tests/test_executors_py35.py
index cc25d88..7849eb0 100644
--- a/tests/test_executors_py35.py
+++ b/tests/test_executors_py35.py
@@ -1,4 +1,5 @@
"""Contains test functions using Python 3.3+ syntax."""
+from asyncio import CancelledError
from datetime import datetime
import pytest
@@ -84,3 +85,18 @@ async def test_run_coroutine_job_tornado(tornado_scheduler, tornado_executor, ex
assert str(events[0].exception) == 'dummy error'
else:
assert events[0].retval is True
+
+
+@pytest.mark.asyncio
+async def test_asyncio_executor_shutdown(asyncio_scheduler, asyncio_executor):
+ """Test that the AsyncIO executor cancels its pending tasks on shutdown."""
+ from asyncio import sleep
+
+ job = asyncio_scheduler.add_job(waiter, 'interval', seconds=1, args=[sleep, None])
+ asyncio_executor.submit_job(job, [datetime.now(utc)])
+ futures = asyncio_executor._pending_futures.copy()
+ assert len(futures) == 1
+
+ asyncio_executor.shutdown()
+ with pytest.raises(CancelledError):
+ await futures.pop()