summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-09-23 01:33:04 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-09-23 01:33:04 +0300
commitb421037421bde1d139e3844f6067ee3f1aeb6852 (patch)
treeca1eef22d25e1cf4f522d9bfc3c33481b9ea4927
parente9c560fa24abcc64c6b426181ec76d6d6cdeb490 (diff)
downloadapscheduler-b421037421bde1d139e3844f6067ee3f1aeb6852.tar.gz
Improved scheduler logging and fixed wait time calculation
-rw-r--r--src/apscheduler/schedulers/async_.py19
-rw-r--r--src/apscheduler/schedulers/sync.py16
2 files changed, 28 insertions, 7 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py
index 751eb2c..b8f2e0b 100644
--- a/src/apscheduler/schedulers/async_.py
+++ b/src/apscheduler/schedulers/async_.py
@@ -20,7 +20,7 @@ from ..datastores.memory import MemoryDataStore
from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState
from ..eventbrokers.async_local import LocalAsyncEventBroker
from ..events import (
- JobReleased, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated)
+ Event, JobReleased, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated)
from ..exceptions import JobCancelled, JobDeadlineMissed, JobLookupError
from ..marshalling import callable_to_ref
from ..structures import JobResult, Task
@@ -71,7 +71,7 @@ class AsyncScheduler:
# Wake up the scheduler if the data store emits a significant schedule event
self._exit_stack.enter_context(
self.data_store.events.subscribe(
- lambda event: self._wakeup_event.set(), {ScheduleAdded, ScheduleUpdated}
+ self._schedule_added_or_modified, {ScheduleAdded, ScheduleUpdated}
)
)
@@ -97,6 +97,10 @@ class AsyncScheduler:
self._state = RunState.stopped
del self._wakeup_event
+ def _schedule_added_or_modified(self, event: Event) -> None:
+ self.logger.debug('Detected a %s event – waking up the scheduler', type(event).__name__)
+ self._wakeup_event.set()
+
async def add_schedule(
self, func_or_task_id: str | Callable, trigger: Trigger, *, id: Optional[str] = None,
args: Optional[Iterable] = None, kwargs: Optional[Mapping[str, Any]] = None,
@@ -296,12 +300,17 @@ class AsyncScheduler:
if len(schedules) < 100:
next_fire_time = await self.data_store.get_next_schedule_run_time()
if next_fire_time:
- wait_time = (datetime.now(timezone.utc) - next_fire_time).total_seconds()
+ wait_time = (next_fire_time - datetime.now(timezone.utc)).total_seconds()
+ self.logger.debug('Sleeping %.3f seconds until the next fire time (%s)',
+ wait_time, next_fire_time)
+ else:
+ self.logger.debug('Waiting for any due schedules to appear')
+ else:
+ self.logger.debug('Processing more schedules on the next iteration')
with move_on_after(wait_time):
await self._wakeup_event.wait()
-
- self._wakeup_event = anyio.Event()
+ self._wakeup_event = anyio.Event()
except get_cancelled_exc_class():
pass
except BaseException as exc:
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py
index c49359c..748c8aa 100644
--- a/src/apscheduler/schedulers/sync.py
+++ b/src/apscheduler/schedulers/sync.py
@@ -74,7 +74,7 @@ class Scheduler:
# Wake up the scheduler if the data store emits a significant schedule event
self._exit_stack.enter_context(
self.data_store.events.subscribe(
- lambda event: self._wakeup_event.set(), {ScheduleAdded, ScheduleUpdated}
+ self._schedule_added_or_modified, {ScheduleAdded, ScheduleUpdated}
)
)
@@ -107,6 +107,10 @@ class Scheduler:
self._state = RunState.stopped
del self._wakeup_event
+ def _schedule_added_or_modified(self, event: Event) -> None:
+ self.logger.debug('Detected a %s event – waking up the scheduler', type(event).__name__)
+ self._wakeup_event.set()
+
def add_schedule(
self, func_or_task_id: str | Callable, trigger: Trigger, *, id: Optional[str] = None,
args: Optional[Iterable] = None, kwargs: Optional[Mapping[str, Any]] = None,
@@ -242,6 +246,8 @@ class Scheduler:
try:
while self._state is RunState.started:
schedules = self.data_store.acquire_schedules(self.identity, 100)
+ self.logger.debug('Processing %d schedules retrieved from the data store',
+ len(schedules))
now = datetime.now(timezone.utc)
for schedule in schedules:
# Calculate a next fire time for the schedule, if possible
@@ -305,7 +311,13 @@ class Scheduler:
if len(schedules) < 100:
next_fire_time = self.data_store.get_next_schedule_run_time()
if next_fire_time:
- wait_time = (datetime.now(timezone.utc) - next_fire_time).total_seconds()
+ wait_time = (next_fire_time - datetime.now(timezone.utc)).total_seconds()
+ self.logger.debug('Sleeping %.3f seconds until the next fire time (%s)',
+ wait_time, next_fire_time)
+ else:
+ self.logger.debug('Waiting for any due schedules to appear')
+ else:
+ self.logger.debug('Processing more schedules on the next iteration')
if self._wakeup_event.wait(wait_time):
self._wakeup_event = threading.Event()