summaryrefslogtreecommitdiff
path: root/apscheduler/schedulers/tornado.py
blob: 78093308aefe85cd29bd7ef25ce2fcbf41a53880 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
from __future__ import absolute_import
from datetime import timedelta
from functools import wraps

from apscheduler.schedulers.base import BaseScheduler
from apscheduler.util import maybe_ref

try:
    from tornado.ioloop import IOLoop
except ImportError:  # pragma: nocover
    raise ImportError('TornadoScheduler requires tornado installed')


def run_in_ioloop(func):
    """
    Decorator that defers a method call to the IOLoop of the object it is called on.

    Calling the decorated method does not run ``func`` directly. Instead, ``func`` and the call
    arguments are handed to ``self._ioloop.add_callback()`` and ``None`` is returned.

    :param func: the method to wrap; its first argument must be an object that has an
        ``_ioloop`` attribute providing ``add_callback()``
    :return: the wrapping function, carrying the metadata of ``func``
    """
    def wrapper(self, *args, **kwargs):
        # Scheduling only: whatever func returns is never passed back to the caller.
        ioloop = self._ioloop
        ioloop.add_callback(func, self, *args, **kwargs)

    return wraps(func)(wrapper)


class TornadoScheduler(BaseScheduler):
    """
    A scheduler that runs on a Tornado IOLoop.

    =========== ===============================================================
    ``io_loop`` Tornado IOLoop instance to use (defaults to the global IO loop)
    =========== ===============================================================

    The ``io_loop`` option is consumed in :meth:`_configure`; when it is missing or falsy,
    ``IOLoop.current()`` is used instead.
    """

    # Handle of the pending wakeup timeout; None whenever no timeout is scheduled.
    _timeout = None
    # IOLoop the scheduler runs on; assigned in _configure().
    _ioloop = None

    def start(self):
        """Start the scheduler via the base class, then request a first wakeup."""
        super(TornadoScheduler, self).start()
        self.wakeup()

    @run_in_ioloop
    def shutdown(self, wait=True):
        """
        Shut down the scheduler and cancel the pending wakeup timeout.

        Because of the ``run_in_ioloop`` decorator, this body is executed as an IOLoop callback
        rather than at the moment of the call.

        :param wait: passed on unchanged to the base class ``shutdown()``
        """
        super(TornadoScheduler, self).shutdown(wait)
        self._stop_timer()

    def _configure(self, config):
        """
        Pick the IOLoop to use, then let the base class process the remaining options.

        :param config: mapping of options; the ``io_loop`` key is removed from it here
        """
        io_loop = maybe_ref(config.pop('io_loop', None))
        if not io_loop:
            # No usable loop was configured, so fall back to the current one.
            io_loop = IOLoop.current()

        self._ioloop = io_loop
        super(TornadoScheduler, self)._configure(config)

    def _start_timer(self, wait_seconds):
        """
        Replace any pending wakeup timeout with one firing after ``wait_seconds``.

        :param wait_seconds: delay in seconds before :meth:`wakeup` is called again, or ``None``
            to leave no timeout scheduled
        """
        self._stop_timer()
        if wait_seconds is None:
            return

        delay = timedelta(seconds=wait_seconds)
        self._timeout = self._ioloop.add_timeout(delay, self.wakeup)

    def _stop_timer(self):
        """Cancel the pending wakeup timeout, if there is one."""
        pending = self._timeout
        if not pending:
            return

        self._ioloop.remove_timeout(pending)
        # Dropping the instance attribute exposes the class-level default (None) again.
        del self._timeout

    @run_in_ioloop
    def wakeup(self):
        """
        Run ``_process_jobs()`` and schedule the next wakeup from its result.

        Because of the ``run_in_ioloop`` decorator, this body is executed as an IOLoop callback
        rather than at the moment of the call.
        """
        self._stop_timer()
        self._start_timer(self._process_jobs())