diff options
Diffstat (limited to 'src/apscheduler/workers')
-rw-r--r-- | src/apscheduler/workers/async_.py | 29 | ||||
-rw-r--r-- | src/apscheduler/workers/sync.py | 33 |
2 files changed, 26 insertions, 36 deletions
diff --git a/src/apscheduler/workers/async_.py b/src/apscheduler/workers/async_.py index 7acef67..268a286 100644 --- a/src/apscheduler/workers/async_.py +++ b/src/apscheduler/workers/async_.py @@ -6,7 +6,7 @@ from contextlib import AsyncExitStack from datetime import datetime, timezone from inspect import isawaitable from logging import Logger, getLogger -from typing import Any, Callable, Iterable, Optional +from typing import Callable, Optional from uuid import UUID import anyio @@ -17,18 +17,16 @@ from ..abc import AsyncDataStore, DataStore, EventSource, Job from ..datastores.async_adapter import AsyncDataStoreAdapter from ..enums import JobOutcome, RunState from ..eventbrokers.async_local import LocalAsyncEventBroker -from ..events import ( - Event, JobAdded, JobEnded, JobStarted, SubscriptionToken, WorkerStarted, WorkerStopped) +from ..events import JobAdded, JobEnded, JobStarted, WorkerStarted, WorkerStopped from ..structures import JobResult -class AsyncWorker(EventSource): +class AsyncWorker: """Runs jobs locally in a task group.""" _stop_event: Optional[anyio.Event] = None _state: RunState = RunState.stopped _acquire_cancel_scope: Optional[CancelScope] = None - _datastore_subscription: SubscriptionToken _wakeup_event: anyio.Event def __init__(self, data_store: DataStore | AsyncDataStore, *, @@ -51,6 +49,10 @@ class AsyncWorker(EventSource): self.data_store = data_store @property + def events(self) -> EventSource: + return self._events + + @property def state(self) -> RunState: return self._state @@ -60,15 +62,15 @@ class AsyncWorker(EventSource): await self._exit_stack.__aenter__() await self._exit_stack.enter_async_context(self._events) - # Initialize the data store + # Initialize the data store and start relaying events to the worker's event broker await self._exit_stack.enter_async_context(self.data_store) - relay_token = self._events.relay_events_from(self.data_store.events) - self._exit_stack.callback(self.data_store.events.unsubscribe, relay_token) + relay_subscription = self.data_store.events.subscribe(self._events.publish) + self._exit_stack.callback(relay_subscription.unsubscribe) # Wake up the worker if the data store emits a significant job event - wakeup_token = self.data_store.events.subscribe( + wakeup_subscription = self.data_store.events.subscribe( lambda event: self._wakeup_event.set(), {JobAdded}) - self._exit_stack.callback(self.data_store.events.unsubscribe, wakeup_token) + self._exit_stack.callback(wakeup_subscription.unsubscribe) # Start the actual worker task_group = create_task_group() @@ -82,13 +84,6 @@ class AsyncWorker(EventSource): await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb) del self._wakeup_event - def subscribe(self, callback: Callable[[Event], Any], - event_types: Optional[Iterable[type[Event]]] = None) -> SubscriptionToken: - return self._events.subscribe(callback, event_types) - - def unsubscribe(self, token: SubscriptionToken) -> None: - self._events.unsubscribe(token) - async def run(self, *, task_status=TASK_STATUS_IGNORED) -> None: if self._state is not RunState.starting: raise RuntimeError(f'This function cannot be called while the worker is in the ' diff --git a/src/apscheduler/workers/sync.py b/src/apscheduler/workers/sync.py index 45f0c5f..f9d3674 100644 --- a/src/apscheduler/workers/sync.py +++ b/src/apscheduler/workers/sync.py @@ -7,19 +7,17 @@ from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait from contextlib import ExitStack from datetime import datetime, timezone from logging import Logger, getLogger -from typing import Any, Callable, Iterable, Optional +from typing import Callable, Optional from uuid import UUID -from .. import events from ..abc import DataStore, EventSource from ..enums import JobOutcome, RunState from ..eventbrokers.local import LocalEventBroker -from ..events import ( - JobAdded, JobEnded, JobStarted, SubscriptionToken, WorkerStarted, WorkerStopped) +from ..events import JobAdded, JobEnded, JobStarted, WorkerStarted, WorkerStopped from ..structures import Job, JobResult -class Worker(EventSource): +class Worker: """Runs jobs locally in a thread pool.""" _executor: ThreadPoolExecutor @@ -42,6 +40,10 @@ class Worker(EventSource): self.data_store = data_store @property + def events(self) -> EventSource: + return self._events + + @property def state(self) -> RunState: return self._state @@ -51,25 +53,25 @@ class Worker(EventSource): self._exit_stack.__enter__() self._exit_stack.enter_context(self._events) - # Initialize the data store + # Initialize the data store and start relaying events to the worker's event broker self._exit_stack.enter_context(self.data_store) - relay_token = self._events.relay_events_from(self.data_store.events) - self._exit_stack.callback(self.data_store.events.unsubscribe, relay_token) + relay_subscription = self.data_store.events.subscribe(self._events.publish) + self._exit_stack.callback(relay_subscription.unsubscribe) # Wake up the worker if the data store emits a significant job event - wakeup_token = self.data_store.events.subscribe( + wakeup_subscription = self.data_store.events.subscribe( lambda event: self._wakeup_event.set(), {JobAdded}) - self._exit_stack.callback(self.data_store.events.unsubscribe, wakeup_token) + self._exit_stack.callback(wakeup_subscription.unsubscribe) # Start the worker and return when it has signalled readiness or raised an exception start_future: Future[None] = Future() - token = self._events.subscribe(start_future.set_result) + start_subscription = self._events.subscribe(start_future.set_result) self._executor = ThreadPoolExecutor(1) run_future = self._executor.submit(self.run) try: wait([start_future, run_future], return_when=FIRST_COMPLETED) finally: - self._events.unsubscribe(token) + start_subscription.unsubscribe() if run_future.done(): run_future.result() @@ -83,13 +85,6 @@ class Worker(EventSource): self._exit_stack.__exit__(exc_type, exc_val, exc_tb) del self._wakeup_event - def subscribe(self, callback: Callable[[events.Event], Any], - event_types: Optional[Iterable[type[events.Event]]] = None) -> SubscriptionToken: - return self._events.subscribe(callback, event_types) - - def unsubscribe(self, token: events.SubscriptionToken) -> None: - self._events.unsubscribe(token) - def run(self) -> None: if self._state is not RunState.starting: raise RuntimeError(f'This function cannot be called while the worker is in the ' |