summaryrefslogtreecommitdiff
path: root/src/apscheduler/workers/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/apscheduler/workers/sync.py')
-rw-r--r--src/apscheduler/workers/sync.py33
1 files changed, 14 insertions, 19 deletions
diff --git a/src/apscheduler/workers/sync.py b/src/apscheduler/workers/sync.py
index 45f0c5f..f9d3674 100644
--- a/src/apscheduler/workers/sync.py
+++ b/src/apscheduler/workers/sync.py
@@ -7,19 +7,17 @@ from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from contextlib import ExitStack
from datetime import datetime, timezone
from logging import Logger, getLogger
-from typing import Any, Callable, Iterable, Optional
+from typing import Callable, Optional
from uuid import UUID
-from .. import events
from ..abc import DataStore, EventSource
from ..enums import JobOutcome, RunState
from ..eventbrokers.local import LocalEventBroker
-from ..events import (
- JobAdded, JobEnded, JobStarted, SubscriptionToken, WorkerStarted, WorkerStopped)
+from ..events import JobAdded, JobEnded, JobStarted, WorkerStarted, WorkerStopped
from ..structures import Job, JobResult
-class Worker(EventSource):
+class Worker:
"""Runs jobs locally in a thread pool."""
_executor: ThreadPoolExecutor
@@ -42,6 +40,10 @@ class Worker(EventSource):
self.data_store = data_store
@property
+ def events(self) -> EventSource:
+ return self._events
+
+ @property
def state(self) -> RunState:
return self._state
@@ -51,25 +53,25 @@ class Worker(EventSource):
self._exit_stack.__enter__()
self._exit_stack.enter_context(self._events)
- # Initialize the data store
+ # Initialize the data store and start relaying events to the worker's event broker
self._exit_stack.enter_context(self.data_store)
- relay_token = self._events.relay_events_from(self.data_store.events)
- self._exit_stack.callback(self.data_store.events.unsubscribe, relay_token)
+ relay_subscription = self.data_store.events.subscribe(self._events.publish)
+ self._exit_stack.callback(relay_subscription.unsubscribe)
# Wake up the worker if the data store emits a significant job event
- wakeup_token = self.data_store.events.subscribe(
+ wakeup_subscription = self.data_store.events.subscribe(
lambda event: self._wakeup_event.set(), {JobAdded})
- self._exit_stack.callback(self.data_store.events.unsubscribe, wakeup_token)
+ self._exit_stack.callback(wakeup_subscription.unsubscribe)
# Start the worker and return when it has signalled readiness or raised an exception
start_future: Future[None] = Future()
- token = self._events.subscribe(start_future.set_result)
+ start_subscription = self._events.subscribe(start_future.set_result)
self._executor = ThreadPoolExecutor(1)
run_future = self._executor.submit(self.run)
try:
wait([start_future, run_future], return_when=FIRST_COMPLETED)
finally:
- self._events.unsubscribe(token)
+ start_subscription.unsubscribe()
if run_future.done():
run_future.result()
@@ -83,13 +85,6 @@ class Worker(EventSource):
self._exit_stack.__exit__(exc_type, exc_val, exc_tb)
del self._wakeup_event
- def subscribe(self, callback: Callable[[events.Event], Any],
- event_types: Optional[Iterable[type[events.Event]]] = None) -> SubscriptionToken:
- return self._events.subscribe(callback, event_types)
-
- def unsubscribe(self, token: events.SubscriptionToken) -> None:
- self._events.unsubscribe(token)
-
def run(self) -> None:
if self._state is not RunState.starting:
raise RuntimeError(f'This function cannot be called while the worker is in the '