"""Timer scheduling Python callbacks."""
import heapq
import sys
from collections import namedtuple
from datetime import datetime
from functools import total_ordering
from time import time as _time
from weakref import proxy as weakrefproxy

from vine.utils import wraps

from kombu.five import monotonic, python_2_unicode_compatible
from kombu.log import get_logger

try:
    from pytz import utc
except ImportError:  # pragma: no cover
    utc = None

__all__ = ('Entry', 'Timer', 'to_timestamp')

logger = get_logger(__name__)

#: Fallback for ``Timer.max_interval`` (seconds).
DEFAULT_MAX_INTERVAL = 2

#: The Unix epoch.  Timezone-aware when pytz is importable, naive otherwise
#: (``utc`` is then ``None``).  Built with the constructor because
#: ``datetime.utcfromtimestamp`` is deprecated since Python 3.12; the
#: resulting value is the same.
EPOCH = datetime(1970, 1, 1, tzinfo=utc)

IS_PYPY = hasattr(sys, 'pypy_version_info')

#: Heap item.  Tuples compare field by field, so the heap is ordered by
#: ``eta`` first, then ``priority``, then by the entry itself.
scheduled = namedtuple('scheduled', ('eta', 'priority', 'entry'))


def to_timestamp(d, default_timezone=utc, time=monotonic):
    """Convert datetime to timestamp.

    If ``d`` is already a timestamp, then that will be used.

    Arguments:
        d (datetime.datetime, float): Value to convert.
        default_timezone (datetime.tzinfo): Timezone attached to ``d``
            when it is a naive datetime.
        time (Callable): Zero-argument clock the result is expressed in.

    Returns:
        The number of seconds between ``EPOCH`` and ``d``, shifted by the
        current difference between wall-clock time and ``time()`` and
        clamped so it is never negative.  Anything that is not a
        datetime is returned unchanged.
    """
    if isinstance(d, datetime):
        if d.tzinfo is None:
            d = d.replace(tzinfo=default_timezone)
        # Offset between the wall clock and the scheduling clock, so the
        # result is comparable with values returned by ``time()``.
        diff = _time() - time()
        return max((d - EPOCH).total_seconds() - diff, 0)
    return d


@total_ordering
@python_2_unicode_compatible
class Entry:
    """Schedule Entry."""

    if not IS_PYPY:  # pragma: no cover
        __slots__ = (
            'fun', 'args', 'kwargs', 'tref', 'canceled',
            '_last_run', '__weakref__',
        )

    def __init__(self, fun, args=None, kwargs=None):
        self.fun = fun
        self.args = args or []
        self.kwargs = kwargs or {}
        # Weak proxy to ourselves: cancelling through it raises
        # ReferenceError (handled in ``cancel``) once the entry is gone.
        self.tref = weakrefproxy(self)
        self._last_run = None
        self.canceled = False

    def __call__(self):
        """Invoke the wrapped function with the stored arguments."""
        return self.fun(*self.args, **self.kwargs)

    def cancel(self):
        """Flag the entry as canceled so the timer will not run it."""
        try:
            self.tref.canceled = True
        except ReferenceError:  # pragma: no cover
            pass

    # NOTE(review): the reviewed text was truncated from the middle of
    # ``__repr__`` up to the ``lsince >= secs`` test in
    # ``Timer.call_repeatedly``.  Everything between this note and the
    # matching "end of restored section" note was restored from the names
    # the intact code relies on (``_queue``, ``max_interval``, ``on_error``,
    # ``stop``, ``tref``, ``_last_run``) -- confirm against version control.
    def __repr__(self):
        return '<TimerEntry: {0}(*{1!r}, **{2!r})'.format(
            self.fun.__name__, self.args, self.kwargs)

    # must not use hash() to order entries
    def __lt__(self, other):
        return id(self) < id(other)

    @property
    def cancelled(self):
        """Alternate spelling of :attr:`canceled`."""
        return self.canceled

    @cancelled.setter
    def cancelled(self, value):
        self.canceled = value


class Timer:
    """Async timer implementation."""

    Entry = Entry

    on_error = None

    def __init__(self, max_interval=None, on_error=None, **kwargs):
        self.max_interval = float(max_interval or DEFAULT_MAX_INTERVAL)
        self.on_error = on_error or self.on_error
        self._queue = []

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.stop()

    def call_at(self, eta, fun, args=(), kwargs=None, priority=0):
        """Schedule ``fun(*args, **kwargs)`` to run at ``eta``."""
        return self.enter_at(self.Entry(fun, args, kwargs), eta, priority)

    def call_after(self, secs, fun, args=(), kwargs=None, priority=0):
        """Schedule ``fun(*args, **kwargs)`` to run ``secs`` from now."""
        return self.enter_after(secs, self.Entry(fun, args, kwargs), priority)

    def call_repeatedly(self, secs, fun, args=(), kwargs=None, priority=0):
        """Schedule ``fun(*args, **kwargs)`` to run every ``secs`` seconds.

        Returns:
            Entry: The entry being rescheduled; cancel it to stop.
        """
        tref = self.Entry(fun, args, kwargs)

        @wraps(fun)
        def _reschedules(*args, **kwargs):
            last, now = tref._last_run, monotonic()
            lsince = (now - tref._last_run) if last else secs
            try:
                # NOTE(review): end of restored section -- the code from
                # the ``>= secs`` comparison onwards is intact.
                if lsince and lsince >= secs:
                    tref._last_run = now
                    return fun(*args, **kwargs)
            finally:
                # Re-arm even if ``fun`` raised, unless it was canceled.
                if not tref.canceled:
                    last = tref._last_run
                    delay = secs - (now - last) if last else secs
                    self.enter_after(delay, tref, priority)

        tref.fun = _reschedules
        tref._last_run = None
        return self.enter_after(secs, tref, priority)

    def enter_at(self, entry, eta=None, priority=0, time=monotonic):
        """Enter function into the scheduler.

        Arguments:
            entry (~kombu.asynchronous.timer.Entry): Item to enter.
            eta (datetime.datetime): Scheduled time.  A plain timestamp is
                used as is; :const:`None` means "now" according to
                ``time()``.
            priority (int): Stored next to ``eta`` in the heap item, so it
                orders entries whose ``eta`` is equal (lower sorts first).
            time (Callable): Clock used when ``eta`` is :const:`None`.

        Returns:
            The entry, or :const:`None` when converting ``eta`` failed and
            the error was consumed by :meth:`handle_error`.

        Raises:
            Exception: Whatever the ``eta`` conversion raised, when no
                error handler is installed.
        """
        if eta is None:
            eta = time()
        if isinstance(eta, datetime):
            try:
                eta = to_timestamp(eta)
            except Exception as exc:
                if not self.handle_error(exc):
                    raise
                return
        return self._enter(eta, priority, entry)

    def enter_after(self, secs, entry, priority=0, time=monotonic):
        """Schedule ``entry`` to run ``secs`` seconds after ``time()``."""
        return self.enter_at(entry, time() + secs, priority)

    def _enter(self, eta, priority, entry, push=heapq.heappush):
        """Push ``entry`` onto the heap and return it."""
        push(self._queue, scheduled(eta, priority, entry))
        return entry

    def apply_entry(self, entry):
        """Call ``entry``; errors go to the handler or are logged."""
        try:
            entry()
        except Exception as exc:
            if not self.handle_error(exc):
                logger.error('Error in timer: %r', exc, exc_info=True)

    def handle_error(self, exc_info):
        """Pass ``exc_info`` to ``on_error`` if one is set.

        Returns:
            :const:`True` when a handler was called, otherwise
            :const:`None`.
        """
        if self.on_error:
            self.on_error(exc_info)
            return True

    def stop(self):
        """Do nothing; present so the timer can be stopped uniformly."""
        pass

    def __iter__(self, min=min, nowfun=monotonic,
                 pop=heapq.heappop, push=heapq.heappush):
        """Iterate over schedule.

        This iterator yields a tuple of ``(wait_seconds, entry)``,
        where if entry is :const:`None` the caller should
        wait for ``wait_seconds`` until it polls the schedule again.
        """
        max_interval = self.max_interval
        queue = self._queue

        while True:
            if queue:
                eventA = queue[0]
                now, eta = nowfun(), eventA[0]

                if now < eta:
                    yield min(eta - now, max_interval), None
                else:
                    eventB = pop(queue)

                    # Only act if the popped item is the one inspected
                    # above; otherwise the heap changed in between, so
                    # put the item back and look again.
                    if eventB is eventA:
                        entry = eventA[2]
                        if not entry.canceled:
                            yield None, entry
                        continue
                    else:
                        push(queue, eventB)
            else:
                yield None, None

    def clear(self):
        """Remove every scheduled item."""
        self._queue[:] = []  # atomic, without creating a new list.

    def cancel(self, tref):
        """Cancel the entry ``tref``."""
        tref.cancel()

    def __len__(self):
        return len(self._queue)

    def __nonzero__(self):
        return True

    # Python 3 looks up ``__bool__``; without this alias truthiness falls
    # back to ``__len__`` and an empty timer would evaluate as false.
    __bool__ = __nonzero__

    @property
    def queue(self, _pop=heapq.heappop):
        """Snapshot of underlying datastructure."""
        # Pop from a copy so the live heap is untouched; popping yields
        # the items in scheduling order.
        events = list(self._queue)
        return [_pop(events) for _ in range(len(events))]

    @property
    def schedule(self):
        """The timer itself."""
        return self