author     Alex Grönholm <alex.gronholm@nextday.fi>    2016-09-15 15:10:08 +0300
committer  Alex Grönholm <alex.gronholm@nextday.fi>    2016-09-15 15:10:47 +0300
commit     d4bce351a4dc15a5eb4be1887999426d005ba2ac (patch)
tree       2e3d107852e44aa0b10e4d99d423ffd56d9c5dcd
parent     9fa8a833bbc5ed75cbc9f5e3e4aa14ba2279b5dc (diff)
download   apscheduler-d4bce351a4dc15a5eb4be1887999426d005ba2ac.tar.gz
Added coroutine support for asyncio and Tornado executors (fixes #96, #98)
-rw-r--r--  apscheduler/executors/asyncio.py    24
-rw-r--r--  apscheduler/executors/base_py3.py   41
-rw-r--r--  apscheduler/executors/tornado.py    54
-rw-r--r--  apscheduler/schedulers/asyncio.py    6
-rw-r--r--  apscheduler/schedulers/tornado.py    6
-rw-r--r--  docs/versionhistory.rst              7
-rw-r--r--  setup.py                             2
-rw-r--r--  tests/conftest.py                    4
-rw-r--r--  tests/test_executors_py35.py        86
-rw-r--r--  tox.ini                              4
10 files changed, 229 insertions, 5 deletions
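
For context, a minimal usage sketch of what this change enables on the asyncio side (assumes Python 3.5+ and a build containing this commit; the tick() job is illustrative, not part of the patch):

import asyncio
from datetime import datetime

from apscheduler.schedulers.asyncio import AsyncIOScheduler


async def tick():
    # With this commit, a native coroutine job runs on the event loop itself
    # instead of being handed to the loop's default (thread pool) executor.
    print('Tick!', datetime.now())


scheduler = AsyncIOScheduler()
scheduler.add_job(tick, 'interval', seconds=3)
scheduler.start()

try:
    asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
    pass
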
diff --git a/apscheduler/executors/asyncio.py b/apscheduler/executors/asyncio.py
index fb26e7e..4479429 100644
--- a/apscheduler/executors/asyncio.py
+++ b/apscheduler/executors/asyncio.py
@@ -1,13 +1,25 @@
from __future__ import absolute_import
+
import sys
from apscheduler.executors.base import BaseExecutor, run_job
+try:
+ from asyncio import iscoroutinefunction
+ from apscheduler.executors.base_py3 import run_coroutine_job
+except ImportError:
+ from trollius import iscoroutinefunction
+ run_coroutine_job = None
+
class AsyncIOExecutor(BaseExecutor):
"""
Runs jobs in the default executor of the event loop.
+ If the job function is a native coroutine function, it is scheduled to be run directly in the
+ event loop as soon as possible. All other functions are run in the event loop's default
+ executor which is usually a thread pool.
+
Plugin alias: ``asyncio``
"""
@@ -24,6 +36,14 @@ class AsyncIOExecutor(BaseExecutor):
else:
self._run_job_success(job.id, events)
- f = self._eventloop.run_in_executor(None, run_job, job, job._jobstore_alias, run_times,
- self._logger.name)
+ if iscoroutinefunction(job.func):
+ if run_coroutine_job is not None:
+ coro = run_coroutine_job(job, job._jobstore_alias, run_times, self._logger.name)
+ f = self._eventloop.create_task(coro)
+ else:
+ raise Exception('Executing coroutine based jobs is not supported with Trollius')
+ else:
+ f = self._eventloop.run_in_executor(None, run_job, job, job._jobstore_alias, run_times,
+ self._logger.name)
+
f.add_done_callback(callback)
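
The dispatch above hinges on asyncio.iscoroutinefunction(); a quick sketch of which job callables take which path (Python 3.5+, function names are illustrative):

from asyncio import iscoroutinefunction


async def native_job():
    return 42


def blocking_job():
    return 42


assert iscoroutinefunction(native_job)        # scheduled with create_task()
assert not iscoroutinefunction(blocking_job)  # still goes through run_in_executor()
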
diff --git a/apscheduler/executors/base_py3.py b/apscheduler/executors/base_py3.py
new file mode 100644
index 0000000..4712425
--- /dev/null
+++ b/apscheduler/executors/base_py3.py
@@ -0,0 +1,41 @@
+import logging
+import sys
+from datetime import datetime, timedelta
+from traceback import format_tb
+
+from pytz import utc
+
+from apscheduler.events import (
+ JobExecutionEvent, EVENT_JOB_MISSED, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED)
+
+
+async def run_coroutine_job(job, jobstore_alias, run_times, logger_name):
+ """Coroutine version of run_job()."""
+ events = []
+ logger = logging.getLogger(logger_name)
+ for run_time in run_times:
+ # See if the job missed its run time window, and handle possible misfires accordingly
+ if job.misfire_grace_time is not None:
+ difference = datetime.now(utc) - run_time
+ grace_time = timedelta(seconds=job.misfire_grace_time)
+ if difference > grace_time:
+ events.append(JobExecutionEvent(EVENT_JOB_MISSED, job.id, jobstore_alias,
+ run_time))
+ logger.warning('Run time of job "%s" was missed by %s', job, difference)
+ continue
+
+ logger.info('Running job "%s" (scheduled at %s)', job, run_time)
+ try:
+ retval = await job.func(*job.args, **job.kwargs)
+ except:
+ exc, tb = sys.exc_info()[1:]
+ formatted_tb = ''.join(format_tb(tb))
+ events.append(JobExecutionEvent(EVENT_JOB_ERROR, job.id, jobstore_alias, run_time,
+ exception=exc, traceback=formatted_tb))
+ logger.exception('Job "%s" raised an exception', job)
+ else:
+ events.append(JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time,
+ retval=retval))
+ logger.info('Job "%s" executed successfully', job)
+
+ return events
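
To illustrate the misfire check in run_coroutine_job() above: a run time is skipped when the current time is more than misfire_grace_time seconds past it. The numbers below are made up for the example:

from datetime import datetime, timedelta

from pytz import utc

misfire_grace_time = 30                               # seconds, as configured on the job
run_time = datetime.now(utc) - timedelta(seconds=45)  # a run we are 45 seconds late for

difference = datetime.now(utc) - run_time
missed = difference > timedelta(seconds=misfire_grace_time)
print(missed)  # True: 45 s late exceeds the 30 s grace time, so EVENT_JOB_MISSED is emitted
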
diff --git a/apscheduler/executors/tornado.py b/apscheduler/executors/tornado.py
new file mode 100644
index 0000000..bcba8c1
--- /dev/null
+++ b/apscheduler/executors/tornado.py
@@ -0,0 +1,54 @@
+from __future__ import absolute_import
+
+import sys
+from concurrent.futures import ThreadPoolExecutor
+
+from tornado.gen import convert_yielded
+
+from apscheduler.executors.base import BaseExecutor, run_job
+
+try:
+ from inspect import iscoroutinefunction
+ from apscheduler.executors.base_py3 import run_coroutine_job
+except ImportError:
+ def iscoroutinefunction(func):
+ return False
+
+
+class TornadoExecutor(BaseExecutor):
+ """
+ Runs jobs either in a thread pool or directly on the I/O loop.
+
+ If the job function is a native coroutine function, it is scheduled to be run directly in the
+ I/O loop as soon as possible. All other functions are run in a thread pool.
+
+ Plugin alias: ``tornado``
+
+ :param int max_workers: maximum number of worker threads in the thread pool
+ """
+
+ def __init__(self, max_workers=10):
+ super().__init__()
+ self.executor = ThreadPoolExecutor(max_workers)
+
+ def start(self, scheduler, alias):
+ super(TornadoExecutor, self).start(scheduler, alias)
+ self._ioloop = scheduler._ioloop
+
+ def _do_submit_job(self, job, run_times):
+ def callback(f):
+ try:
+ events = f.result()
+ except:
+ self._run_job_error(job.id, *sys.exc_info()[1:])
+ else:
+ self._run_job_success(job.id, events)
+
+ if iscoroutinefunction(job.func):
+ f = run_coroutine_job(job, job._jobstore_alias, run_times, self._logger.name)
+ else:
+ f = self.executor.submit(run_job, job, job._jobstore_alias, run_times,
+ self._logger.name)
+
+ f = convert_yielded(f)
+ f.add_done_callback(callback)
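
convert_yielded() is what lets the executor treat both branches uniformly: it wraps a native coroutine (or the Future returned by the thread pool) in a Tornado Future that a single done callback can observe. A hedged standalone sketch, assuming Tornado >= 4.3 on Python 3.5+ (sample_job() and report() are illustrative):

from tornado.gen import convert_yielded, sleep
from tornado.ioloop import IOLoop


async def sample_job():
    await sleep(0.1)
    return 'done'


def report(future):
    print('job finished with', future.result())
    IOLoop.current().stop()


future = convert_yielded(sample_job())  # a Tornado Future driving the coroutine on the IOLoop
future.add_done_callback(report)
IOLoop.current().start()
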
diff --git a/apscheduler/schedulers/asyncio.py b/apscheduler/schedulers/asyncio.py
index 2929bd6..a272b1a 100644
--- a/apscheduler/schedulers/asyncio.py
+++ b/apscheduler/schedulers/asyncio.py
@@ -16,8 +16,8 @@ except ImportError: # pragma: nocover
def run_in_event_loop(func):
@wraps(func)
- def wrapper(self, *args, **kwargs):
- self._eventloop.call_soon_threadsafe(func, self, *args, **kwargs)
+ def wrapper(self, *args):
+ self._eventloop.call_soon_threadsafe(func, self, *args)
return wrapper
@@ -25,6 +25,8 @@ class AsyncIOScheduler(BaseScheduler):
"""
A scheduler that runs on an asyncio (:pep:`3156`) event loop.
+ The default executor can run jobs based on native coroutines (``async def``).
+
Extra options:
============== =============================================================
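
The narrowed wrapper signature above presumably reflects that call_soon_threadsafe() only forwards positional arguments to the callback; keyword arguments have to be bound up front, conventionally with functools.partial. A small standalone sketch of that asyncio convention (not APScheduler code):

import asyncio
from functools import partial


def greet(name, punctuation='!'):
    print('hello ' + name + punctuation)


loop = asyncio.get_event_loop()
loop.call_soon_threadsafe(greet, 'world')                            # positional arguments are forwarded
loop.call_soon_threadsafe(partial(greet, 'world', punctuation='?'))  # keyword arguments need partial
loop.run_until_complete(asyncio.sleep(0))  # run one loop iteration so the queued callbacks fire
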
diff --git a/apscheduler/schedulers/tornado.py b/apscheduler/schedulers/tornado.py
index 29b2750..0a9171f 100644
--- a/apscheduler/schedulers/tornado.py
+++ b/apscheduler/schedulers/tornado.py
@@ -23,6 +23,8 @@ class TornadoScheduler(BaseScheduler):
"""
A scheduler that runs on a Tornado IOLoop.
+ The default executor can run jobs based on native coroutines (``async def``).
+
=========== ===============================================================
``io_loop`` Tornado IOLoop instance to use (defaults to the global IO loop)
=========== ===============================================================
@@ -50,6 +52,10 @@ class TornadoScheduler(BaseScheduler):
self._ioloop.remove_timeout(self._timeout)
del self._timeout
+ def _create_default_executor(self):
+ from apscheduler.executors.tornado import TornadoExecutor
+ return TornadoExecutor()
+
@run_in_ioloop
def wakeup(self):
self._stop_timer()
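
And the Tornado counterpart end to end: with TornadoExecutor now the default executor, an ``async def`` job can be added directly. A minimal sketch (assumes tornado >= 4.3 and Python 3.5+; tick() is illustrative):

from datetime import datetime

from tornado.ioloop import IOLoop

from apscheduler.schedulers.tornado import TornadoScheduler


async def tick():
    # Runs directly on the IOLoop via the new TornadoExecutor default.
    print('Tick!', datetime.now())


scheduler = TornadoScheduler()
scheduler.add_job(tick, 'interval', seconds=3)
scheduler.start()

try:
    IOLoop.current().start()
except (KeyboardInterrupt, SystemExit):
    pass
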
diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst
index 4395116..842495d 100644
--- a/docs/versionhistory.rst
+++ b/docs/versionhistory.rst
@@ -7,8 +7,15 @@ APScheduler, see the :doc:`migration section <migration>`.
3.3.0
-----
+* The asyncio and Tornado schedulers can now run jobs targeting coroutine functions
+ (requires Python 3.5; only native coroutines (``async def``) are supported)
+
+* The Tornado scheduler now uses TornadoExecutor as its default executor (see above for why)
+
* Added ZooKeeper job store (thanks to Jose Ignacio Villar for the patch)
+* Added the ability to execute coroutine functions with asyncio/trollius and Twisted
+
* Improved import logic in ``ref_to_obj()`` to avoid errors in cases where traversing the path with
``getattr()`` would not work (thanks to Jarek Glowacki for the patch)
diff --git a/setup.py b/setup.py
index d396206..916e791 100644
--- a/setup.py
+++ b/setup.py
@@ -52,6 +52,7 @@ setup(
'mongodb': ['pymongo >= 2.8'],
'rethinkdb': ['rethinkdb'],
'redis': ['redis'],
+ 'tornado': ['tornado >= 4.3'],
'zookeeper': ['kazoo']
},
zip_safe=False,
@@ -67,6 +68,7 @@ setup(
'processpool = apscheduler.executors.pool:ProcessPoolExecutor',
'asyncio = apscheduler.executors.asyncio:AsyncIOExecutor [asyncio]',
'gevent = apscheduler.executors.gevent:GeventExecutor [gevent]',
+ 'tornado = apscheduler.executors.tornado:TornadoExecutor [tornado]',
'twisted = apscheduler.executors.twisted:TwistedExecutor [twisted]'
],
'apscheduler.jobstores': [
diff --git a/tests/conftest.py b/tests/conftest.py
index 8d32424..19fba99 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -15,6 +15,10 @@ except ImportError:
from mock import Mock
+def pytest_ignore_collect(path, config):
+ return path.basename.endswith('_py35.py') and sys.version_info < (3, 5)
+
+
def minpython(*version):
version_str = '.'.join([str(num) for num in version])
diff --git a/tests/test_executors_py35.py b/tests/test_executors_py35.py
new file mode 100644
index 0000000..cc25d88
--- /dev/null
+++ b/tests/test_executors_py35.py
@@ -0,0 +1,86 @@
+"""Contains test functions using Python 3.3+ syntax."""
+from datetime import datetime
+
+import pytest
+from apscheduler.executors.asyncio import AsyncIOExecutor
+from apscheduler.executors.tornado import TornadoExecutor
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
+from apscheduler.schedulers.tornado import TornadoScheduler
+from pytz import utc
+
+
+@pytest.fixture
+def asyncio_scheduler(event_loop):
+ scheduler = AsyncIOScheduler(event_loop=event_loop)
+ scheduler.start(paused=True)
+ yield scheduler
+ scheduler.shutdown(False)
+
+
+@pytest.fixture
+def asyncio_executor(asyncio_scheduler):
+ executor = AsyncIOExecutor()
+ executor.start(asyncio_scheduler, 'default')
+ yield executor
+ executor.shutdown()
+
+
+@pytest.fixture
+def tornado_scheduler(io_loop):
+ scheduler = TornadoScheduler(io_loop=io_loop)
+ scheduler.start(paused=True)
+ yield scheduler
+ scheduler.shutdown(False)
+
+
+@pytest.fixture
+def tornado_executor(tornado_scheduler):
+ executor = TornadoExecutor()
+ executor.start(tornado_scheduler, 'default')
+ yield executor
+ executor.shutdown()
+
+
+async def waiter(sleep, exception):
+ await sleep(0.1)
+ if exception:
+ raise Exception('dummy error')
+ else:
+ return True
+
+
+@pytest.mark.parametrize('exception', [False, True])
+@pytest.mark.asyncio
+async def test_run_coroutine_job(asyncio_scheduler, asyncio_executor, exception):
+ from asyncio import Future, sleep
+
+ future = Future()
+ job = asyncio_scheduler.add_job(waiter, 'interval', seconds=1, args=[sleep, exception])
+ asyncio_executor._run_job_success = lambda job_id, events: future.set_result(events)
+ asyncio_executor._run_job_error = lambda job_id, exc, tb: future.set_exception(exc)
+ asyncio_executor.submit_job(job, [datetime.now(utc)])
+ events = await future
+ assert len(events) == 1
+ if exception:
+ assert str(events[0].exception) == 'dummy error'
+ else:
+ assert events[0].retval is True
+
+
+@pytest.mark.parametrize('exception', [False, True])
+@pytest.mark.gen_test
+async def test_run_coroutine_job_tornado(tornado_scheduler, tornado_executor, exception):
+ from tornado.concurrent import Future
+ from tornado.gen import sleep
+
+ future = Future()
+ job = tornado_scheduler.add_job(waiter, 'interval', seconds=1, args=[sleep, exception])
+ tornado_executor._run_job_success = lambda job_id, events: future.set_result(events)
+ tornado_executor._run_job_error = lambda job_id, exc, tb: future.set_exception(exc)
+ tornado_executor.submit_job(job, [datetime.now(utc)])
+ events = await future
+ assert len(events) == 1
+ if exception:
+ assert str(events[0].exception) == 'dummy error'
+ else:
+ assert events[0].retval is True
diff --git a/tox.ini b/tox.ini
index 7668afc..e637b8a 100644
--- a/tox.ini
+++ b/tox.ini
@@ -10,10 +10,11 @@ skip_missing_interpreters = true
pypy = pypy
[testenv]
-commands = py.test {posargs}
+commands = pytest {posargs}
deps = pytest
pytest-cov
pytest-catchlog
+ pytest-tornado
sqlalchemy
pymongo
kazoo
@@ -25,6 +26,7 @@ deps = pytest
{py27,pypy}: mock
{py27,pypy}: trollius
{py33}: asyncio
+ {py33,py34,py35}: pytest-asyncio
[testenv:flake8]
deps = flake8