summaryrefslogtreecommitdiff
path: root/src/apscheduler/workers
diff options
context:
space:
mode:
Diffstat (limited to 'src/apscheduler/workers')
-rw-r--r--src/apscheduler/workers/async_.py29
-rw-r--r--src/apscheduler/workers/sync.py33
2 files changed, 26 insertions, 36 deletions
diff --git a/src/apscheduler/workers/async_.py b/src/apscheduler/workers/async_.py
index 7acef67..268a286 100644
--- a/src/apscheduler/workers/async_.py
+++ b/src/apscheduler/workers/async_.py
@@ -6,7 +6,7 @@ from contextlib import AsyncExitStack
from datetime import datetime, timezone
from inspect import isawaitable
from logging import Logger, getLogger
-from typing import Any, Callable, Iterable, Optional
+from typing import Callable, Optional
from uuid import UUID
import anyio
@@ -17,18 +17,16 @@ from ..abc import AsyncDataStore, DataStore, EventSource, Job
from ..datastores.async_adapter import AsyncDataStoreAdapter
from ..enums import JobOutcome, RunState
from ..eventbrokers.async_local import LocalAsyncEventBroker
-from ..events import (
- Event, JobAdded, JobEnded, JobStarted, SubscriptionToken, WorkerStarted, WorkerStopped)
+from ..events import JobAdded, JobEnded, JobStarted, WorkerStarted, WorkerStopped
from ..structures import JobResult
-class AsyncWorker(EventSource):
+class AsyncWorker:
"""Runs jobs locally in a task group."""
_stop_event: Optional[anyio.Event] = None
_state: RunState = RunState.stopped
_acquire_cancel_scope: Optional[CancelScope] = None
- _datastore_subscription: SubscriptionToken
_wakeup_event: anyio.Event
def __init__(self, data_store: DataStore | AsyncDataStore, *,
@@ -51,6 +49,10 @@ class AsyncWorker(EventSource):
self.data_store = data_store
@property
+ def events(self) -> EventSource:
+ return self._events
+
+ @property
def state(self) -> RunState:
return self._state
@@ -60,15 +62,15 @@ class AsyncWorker(EventSource):
await self._exit_stack.__aenter__()
await self._exit_stack.enter_async_context(self._events)
- # Initialize the data store
+ # Initialize the data store and start relaying events to the worker's event broker
await self._exit_stack.enter_async_context(self.data_store)
- relay_token = self._events.relay_events_from(self.data_store.events)
- self._exit_stack.callback(self.data_store.events.unsubscribe, relay_token)
+ relay_subscription = self.data_store.events.subscribe(self._events.publish)
+ self._exit_stack.callback(relay_subscription.unsubscribe)
# Wake up the worker if the data store emits a significant job event
- wakeup_token = self.data_store.events.subscribe(
+ wakeup_subscription = self.data_store.events.subscribe(
lambda event: self._wakeup_event.set(), {JobAdded})
- self._exit_stack.callback(self.data_store.events.unsubscribe, wakeup_token)
+ self._exit_stack.callback(wakeup_subscription.unsubscribe)
# Start the actual worker
task_group = create_task_group()
@@ -82,13 +84,6 @@ class AsyncWorker(EventSource):
await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb)
del self._wakeup_event
- def subscribe(self, callback: Callable[[Event], Any],
- event_types: Optional[Iterable[type[Event]]] = None) -> SubscriptionToken:
- return self._events.subscribe(callback, event_types)
-
- def unsubscribe(self, token: SubscriptionToken) -> None:
- self._events.unsubscribe(token)
-
async def run(self, *, task_status=TASK_STATUS_IGNORED) -> None:
if self._state is not RunState.starting:
raise RuntimeError(f'This function cannot be called while the worker is in the '
diff --git a/src/apscheduler/workers/sync.py b/src/apscheduler/workers/sync.py
index 45f0c5f..f9d3674 100644
--- a/src/apscheduler/workers/sync.py
+++ b/src/apscheduler/workers/sync.py
@@ -7,19 +7,17 @@ from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from contextlib import ExitStack
from datetime import datetime, timezone
from logging import Logger, getLogger
-from typing import Any, Callable, Iterable, Optional
+from typing import Callable, Optional
from uuid import UUID
-from .. import events
from ..abc import DataStore, EventSource
from ..enums import JobOutcome, RunState
from ..eventbrokers.local import LocalEventBroker
-from ..events import (
- JobAdded, JobEnded, JobStarted, SubscriptionToken, WorkerStarted, WorkerStopped)
+from ..events import JobAdded, JobEnded, JobStarted, WorkerStarted, WorkerStopped
from ..structures import Job, JobResult
-class Worker(EventSource):
+class Worker:
"""Runs jobs locally in a thread pool."""
_executor: ThreadPoolExecutor
@@ -42,6 +40,10 @@ class Worker(EventSource):
self.data_store = data_store
@property
+ def events(self) -> EventSource:
+ return self._events
+
+ @property
def state(self) -> RunState:
return self._state
@@ -51,25 +53,25 @@ class Worker(EventSource):
self._exit_stack.__enter__()
self._exit_stack.enter_context(self._events)
- # Initialize the data store
+ # Initialize the data store and start relaying events to the worker's event broker
self._exit_stack.enter_context(self.data_store)
- relay_token = self._events.relay_events_from(self.data_store.events)
- self._exit_stack.callback(self.data_store.events.unsubscribe, relay_token)
+ relay_subscription = self.data_store.events.subscribe(self._events.publish)
+ self._exit_stack.callback(relay_subscription.unsubscribe)
# Wake up the worker if the data store emits a significant job event
- wakeup_token = self.data_store.events.subscribe(
+ wakeup_subscription = self.data_store.events.subscribe(
lambda event: self._wakeup_event.set(), {JobAdded})
- self._exit_stack.callback(self.data_store.events.unsubscribe, wakeup_token)
+ self._exit_stack.callback(wakeup_subscription.unsubscribe)
# Start the worker and return when it has signalled readiness or raised an exception
start_future: Future[None] = Future()
- token = self._events.subscribe(start_future.set_result)
+ start_subscription = self._events.subscribe(start_future.set_result)
self._executor = ThreadPoolExecutor(1)
run_future = self._executor.submit(self.run)
try:
wait([start_future, run_future], return_when=FIRST_COMPLETED)
finally:
- self._events.unsubscribe(token)
+ start_subscription.unsubscribe()
if run_future.done():
run_future.result()
@@ -83,13 +85,6 @@ class Worker(EventSource):
self._exit_stack.__exit__(exc_type, exc_val, exc_tb)
del self._wakeup_event
- def subscribe(self, callback: Callable[[events.Event], Any],
- event_types: Optional[Iterable[type[events.Event]]] = None) -> SubscriptionToken:
- return self._events.subscribe(callback, event_types)
-
- def unsubscribe(self, token: events.SubscriptionToken) -> None:
- self._events.unsubscribe(token)
-
def run(self) -> None:
if self._state is not RunState.starting:
raise RuntimeError(f'This function cannot be called while the worker is in the '