diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2016-09-15 15:10:08 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2016-09-15 15:10:47 +0300 |
commit | d4bce351a4dc15a5eb4be1887999426d005ba2ac (patch) | |
tree | 2e3d107852e44aa0b10e4d99d423ffd56d9c5dcd | |
parent | 9fa8a833bbc5ed75cbc9f5e3e4aa14ba2279b5dc (diff) | |
download | apscheduler-d4bce351a4dc15a5eb4be1887999426d005ba2ac.tar.gz |
Added coroutine support for asyncio and Tornado executors (fixes #96, #98)
-rw-r--r-- | apscheduler/executors/asyncio.py | 24 | ||||
-rw-r--r-- | apscheduler/executors/base_py3.py | 41 | ||||
-rw-r--r-- | apscheduler/executors/tornado.py | 54 | ||||
-rw-r--r-- | apscheduler/schedulers/asyncio.py | 6 | ||||
-rw-r--r-- | apscheduler/schedulers/tornado.py | 6 | ||||
-rw-r--r-- | docs/versionhistory.rst | 7 | ||||
-rw-r--r-- | setup.py | 2 | ||||
-rw-r--r-- | tests/conftest.py | 4 | ||||
-rw-r--r-- | tests/test_executors_py35.py | 86 | ||||
-rw-r--r-- | tox.ini | 4 |
10 files changed, 229 insertions, 5 deletions
diff --git a/apscheduler/executors/asyncio.py b/apscheduler/executors/asyncio.py index fb26e7e..4479429 100644 --- a/apscheduler/executors/asyncio.py +++ b/apscheduler/executors/asyncio.py @@ -1,13 +1,25 @@ from __future__ import absolute_import + import sys from apscheduler.executors.base import BaseExecutor, run_job +try: + from asyncio import iscoroutinefunction + from apscheduler.executors.base_py3 import run_coroutine_job +except ImportError: + from trollius import iscoroutinefunction + run_coroutine_job = None + class AsyncIOExecutor(BaseExecutor): """ Runs jobs in the default executor of the event loop. + If the job function is a native coroutine function, it is scheduled to be run directly in the + event loop as soon as possible. All other functions are run in the event loop's default + executor which is usually a thread pool. + Plugin alias: ``asyncio`` """ @@ -24,6 +36,14 @@ class AsyncIOExecutor(BaseExecutor): else: self._run_job_success(job.id, events) - f = self._eventloop.run_in_executor(None, run_job, job, job._jobstore_alias, run_times, - self._logger.name) + if iscoroutinefunction(job.func): + if run_coroutine_job is not None: + coro = run_coroutine_job(job, job._jobstore_alias, run_times, self._logger.name) + f = self._eventloop.create_task(coro) + else: + raise Exception('Executing coroutine based jobs is not supported with Trollius') + else: + f = self._eventloop.run_in_executor(None, run_job, job, job._jobstore_alias, run_times, + self._logger.name) + f.add_done_callback(callback) diff --git a/apscheduler/executors/base_py3.py b/apscheduler/executors/base_py3.py new file mode 100644 index 0000000..4712425 --- /dev/null +++ b/apscheduler/executors/base_py3.py @@ -0,0 +1,41 @@ +import logging +import sys +from datetime import datetime, timedelta +from traceback import format_tb + +from pytz import utc + +from apscheduler.events import ( + JobExecutionEvent, EVENT_JOB_MISSED, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED) + + +async def run_coroutine_job(job, jobstore_alias, run_times, logger_name): + """Coroutine version of run_job().""" + events = [] + logger = logging.getLogger(logger_name) + for run_time in run_times: + # See if the job missed its run time window, and handle possible misfires accordingly + if job.misfire_grace_time is not None: + difference = datetime.now(utc) - run_time + grace_time = timedelta(seconds=job.misfire_grace_time) + if difference > grace_time: + events.append(JobExecutionEvent(EVENT_JOB_MISSED, job.id, jobstore_alias, + run_time)) + logger.warning('Run time of job "%s" was missed by %s', job, difference) + continue + + logger.info('Running job "%s" (scheduled at %s)', job, run_time) + try: + retval = await job.func(*job.args, **job.kwargs) + except: + exc, tb = sys.exc_info()[1:] + formatted_tb = ''.join(format_tb(tb)) + events.append(JobExecutionEvent(EVENT_JOB_ERROR, job.id, jobstore_alias, run_time, + exception=exc, traceback=formatted_tb)) + logger.exception('Job "%s" raised an exception', job) + else: + events.append(JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time, + retval=retval)) + logger.info('Job "%s" executed successfully', job) + + return events diff --git a/apscheduler/executors/tornado.py b/apscheduler/executors/tornado.py new file mode 100644 index 0000000..bcba8c1 --- /dev/null +++ b/apscheduler/executors/tornado.py @@ -0,0 +1,54 @@ +from __future__ import absolute_import + +import sys +from concurrent.futures import ThreadPoolExecutor + +from tornado.gen import convert_yielded + +from apscheduler.executors.base import BaseExecutor, run_job + +try: + from inspect import iscoroutinefunction + from apscheduler.executors.base_py3 import run_coroutine_job +except ImportError: + def iscoroutinefunction(func): + return False + + +class TornadoExecutor(BaseExecutor): + """ + Runs jobs either in a thread pool or directly on the I/O loop. + + If the job function is a native coroutine function, it is scheduled to be run directly in the + I/O loop as soon as possible. All other functions are run in a thread pool. + + Plugin alias: ``tornado`` + + :param int max_workers: maximum number of worker threads in the thread pool + """ + + def __init__(self, max_workers=10): + super().__init__() + self.executor = ThreadPoolExecutor(max_workers) + + def start(self, scheduler, alias): + super(TornadoExecutor, self).start(scheduler, alias) + self._ioloop = scheduler._ioloop + + def _do_submit_job(self, job, run_times): + def callback(f): + try: + events = f.result() + except: + self._run_job_error(job.id, *sys.exc_info()[1:]) + else: + self._run_job_success(job.id, events) + + if iscoroutinefunction(job.func): + f = run_coroutine_job(job, job._jobstore_alias, run_times, self._logger.name) + else: + f = self.executor.submit(run_job, job, job._jobstore_alias, run_times, + self._logger.name) + + f = convert_yielded(f) + f.add_done_callback(callback) diff --git a/apscheduler/schedulers/asyncio.py b/apscheduler/schedulers/asyncio.py index 2929bd6..a272b1a 100644 --- a/apscheduler/schedulers/asyncio.py +++ b/apscheduler/schedulers/asyncio.py @@ -16,8 +16,8 @@ except ImportError: # pragma: nocover def run_in_event_loop(func): @wraps(func) - def wrapper(self, *args, **kwargs): - self._eventloop.call_soon_threadsafe(func, self, *args, **kwargs) + def wrapper(self, *args): + self._eventloop.call_soon_threadsafe(func, self, *args) return wrapper @@ -25,6 +25,8 @@ class AsyncIOScheduler(BaseScheduler): """ A scheduler that runs on an asyncio (:pep:`3156`) event loop. + The default executor can run jobs based on native coroutines (``async def``). + Extra options: ============== ============================================================= diff --git a/apscheduler/schedulers/tornado.py b/apscheduler/schedulers/tornado.py index 29b2750..0a9171f 100644 --- a/apscheduler/schedulers/tornado.py +++ b/apscheduler/schedulers/tornado.py @@ -23,6 +23,8 @@ class TornadoScheduler(BaseScheduler): """ A scheduler that runs on a Tornado IOLoop. + The default executor can run jobs based on native coroutines (``async def``). + =========== =============================================================== ``io_loop`` Tornado IOLoop instance to use (defaults to the global IO loop) =========== =============================================================== @@ -50,6 +52,10 @@ class TornadoScheduler(BaseScheduler): self._ioloop.remove_timeout(self._timeout) del self._timeout + def _create_default_executor(self): + from apscheduler.executors.tornado import TornadoExecutor + return TornadoExecutor() + @run_in_ioloop def wakeup(self): self._stop_timer() diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index 4395116..842495d 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -7,8 +7,15 @@ APScheduler, see the :doc:`migration section <migration>`. 3.3.0 ----- +* The asyncio and Tornado schedulers can now run jobs targeting coroutine functions + (requires Python 3.5; only native coroutines (``async def``) are supported) + +* The Tornado scheduler now uses TornadoExecutor as its default executor (see above as for why) + * Added ZooKeeper job store (thanks to Jose Ignacio Villar for the patch) +* Added the ability to execute coroutine functions with asyncio/trollius and Twisted + * Improved import logic in ``ref_to_obj()`` to avoid errors in cases where traversing the path with ``getattr()`` would not work (thanks to Jarek Glowacki for the patch) @@ -52,6 +52,7 @@ setup( 'mongodb': ['pymongo >= 2.8'], 'rethinkdb': ['rethinkdb'], 'redis': ['redis'], + 'tornado': ['tornado >= 4.3'], 'zookeeper': ['kazoo'] }, zip_safe=False, @@ -67,6 +68,7 @@ setup( 'processpool = apscheduler.executors.pool:ProcessPoolExecutor', 'asyncio = apscheduler.executors.asyncio:AsyncIOExecutor [asyncio]', 'gevent = apscheduler.executors.gevent:GeventExecutor [gevent]', + 'tornado = apscheduler.executors.tornado:TornadoExecutor [tornado]', 'twisted = apscheduler.executors.twisted:TwistedExecutor [twisted]' ], 'apscheduler.jobstores': [ diff --git a/tests/conftest.py b/tests/conftest.py index 8d32424..19fba99 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -15,6 +15,10 @@ except ImportError: from mock import Mock +def pytest_ignore_collect(path, config): + return path.basename.endswith('_py35.py') and sys.version_info < (3, 5) + + def minpython(*version): version_str = '.'.join([str(num) for num in version]) diff --git a/tests/test_executors_py35.py b/tests/test_executors_py35.py new file mode 100644 index 0000000..cc25d88 --- /dev/null +++ b/tests/test_executors_py35.py @@ -0,0 +1,86 @@ +"""Contains test functions using Python 3.3+ syntax.""" +from datetime import datetime + +import pytest +from apscheduler.executors.asyncio import AsyncIOExecutor +from apscheduler.executors.tornado import TornadoExecutor +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.schedulers.tornado import TornadoScheduler +from pytz import utc + + +@pytest.fixture +def asyncio_scheduler(event_loop): + scheduler = AsyncIOScheduler(event_loop=event_loop) + scheduler.start(paused=True) + yield scheduler + scheduler.shutdown(False) + + +@pytest.fixture +def asyncio_executor(asyncio_scheduler): + executor = AsyncIOExecutor() + executor.start(asyncio_scheduler, 'default') + yield executor + executor.shutdown() + + +@pytest.fixture +def tornado_scheduler(io_loop): + scheduler = TornadoScheduler(io_loop=io_loop) + scheduler.start(paused=True) + yield scheduler + scheduler.shutdown(False) + + +@pytest.fixture +def tornado_executor(tornado_scheduler): + executor = TornadoExecutor() + executor.start(tornado_scheduler, 'default') + yield executor + executor.shutdown() + + +async def waiter(sleep, exception): + await sleep(0.1) + if exception: + raise Exception('dummy error') + else: + return True + + +@pytest.mark.parametrize('exception', [False, True]) +@pytest.mark.asyncio +async def test_run_coroutine_job(asyncio_scheduler, asyncio_executor, exception): + from asyncio import Future, sleep + + future = Future() + job = asyncio_scheduler.add_job(waiter, 'interval', seconds=1, args=[sleep, exception]) + asyncio_executor._run_job_success = lambda job_id, events: future.set_result(events) + asyncio_executor._run_job_error = lambda job_id, exc, tb: future.set_exception(exc) + asyncio_executor.submit_job(job, [datetime.now(utc)]) + events = await future + assert len(events) == 1 + if exception: + assert str(events[0].exception) == 'dummy error' + else: + assert events[0].retval is True + + +@pytest.mark.parametrize('exception', [False, True]) +@pytest.mark.gen_test +async def test_run_coroutine_job_tornado(tornado_scheduler, tornado_executor, exception): + from tornado.concurrent import Future + from tornado.gen import sleep + + future = Future() + job = tornado_scheduler.add_job(waiter, 'interval', seconds=1, args=[sleep, exception]) + tornado_executor._run_job_success = lambda job_id, events: future.set_result(events) + tornado_executor._run_job_error = lambda job_id, exc, tb: future.set_exception(exc) + tornado_executor.submit_job(job, [datetime.now(utc)]) + events = await future + assert len(events) == 1 + if exception: + assert str(events[0].exception) == 'dummy error' + else: + assert events[0].retval is True @@ -10,10 +10,11 @@ skip_missing_interpreters = true pypy = pypy [testenv] -commands = py.test {posargs} +commands = pytest {posargs} deps = pytest pytest-cov pytest-catchlog + pytest-tornado sqlalchemy pymongo kazoo @@ -25,6 +26,7 @@ deps = pytest {py27,pypy}: mock {py27,pypy}: trollius {py33}: asyncio + {py33,py34,py35}: pytest-asyncio [testenv:flake8] deps = flake8 |