summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-09-12 14:50:22 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-09-12 14:50:22 +0300
commit48722053dfb43de077df18a139abb16b0a7f7e24 (patch)
treebd55e709e0d4c02619ef0ec54390a8f792da2f74
parenta58fca290e0831d377d496a69101e5e3dc4c604e (diff)
downloadapscheduler-48722053dfb43de077df18a139abb16b0a7f7e24.tar.gz
Improved the event subscription system
The subscribe() method now returns a subscription which has the unsubscribe() method in itself.
-rw-r--r--src/apscheduler/abc.py35
-rw-r--r--src/apscheduler/datastores/async_sqlalchemy.py15
-rw-r--r--src/apscheduler/datastores/mongodb.py13
-rw-r--r--src/apscheduler/datastores/sqlalchemy.py11
-rw-r--r--src/apscheduler/eventbrokers/async_adapter.py5
-rw-r--r--src/apscheduler/eventbrokers/base.py34
-rw-r--r--src/apscheduler/eventbrokers/local.py5
-rw-r--r--src/apscheduler/events.py14
-rw-r--r--src/apscheduler/schedulers/async_.py26
-rw-r--r--src/apscheduler/schedulers/sync.py32
-rw-r--r--src/apscheduler/workers/async_.py29
-rw-r--r--src/apscheduler/workers/sync.py33
-rw-r--r--tests/test_datastores.py4
-rw-r--r--tests/test_eventbrokers.py8
-rw-r--r--tests/test_schedulers.py4
-rw-r--r--tests/test_workers.py8
16 files changed, 119 insertions, 157 deletions
diff --git a/src/apscheduler/abc.py b/src/apscheduler/abc.py
index e267836..4b2da42 100644
--- a/src/apscheduler/abc.py
+++ b/src/apscheduler/abc.py
@@ -10,7 +10,7 @@ from .enums import ConflictPolicy
from .structures import Job, JobResult, Schedule, Task
if TYPE_CHECKING:
- from . import events
+ from .events import Event
class Trigger(Iterator[datetime], metaclass=ABCMeta):
@@ -64,14 +64,25 @@ class Serializer(metaclass=ABCMeta):
return self.deserialize(b64decode(serialized))
+class Subscription(metaclass=ABCMeta):
+ @abstractmethod
+ def unsubscribe(self) -> None:
+ """
+ Cancel this subscription.
+
+ Does nothing if the subscription has already been cancelled.
+
+ """
+
+
class EventSource(metaclass=ABCMeta):
"""Interface for objects that can deliver notifications to interested subscribers."""
@abstractmethod
def subscribe(
- self, callback: Callable[[events.Event], Any],
- event_types: Optional[Iterable[type[events.Event]]] = None
- ) -> events.SubscriptionToken:
+ self, callback: Callable[[Event], Any],
+ event_types: Optional[Iterable[type[Event]]] = None
+ ) -> Subscription:
"""
Subscribe to events from this event source.
@@ -79,14 +90,6 @@ class EventSource(metaclass=ABCMeta):
:param event_types: an iterable of concrete Event classes to subscribe to
"""
- @abstractmethod
- def unsubscribe(self, token: events.SubscriptionToken) -> None:
- """
- Cancel an event subscription.
-
- :param token: a token returned from :meth:`subscribe`
- """
-
class EventBroker(EventSource):
"""
@@ -102,11 +105,11 @@ class EventBroker(EventSource):
pass
@abstractmethod
- def publish(self, event: events.Event) -> None:
+ def publish(self, event: Event) -> None:
"""Publish an event."""
@abstractmethod
- def publish_local(self, event: events.Event) -> None:
+ def publish_local(self, event: Event) -> None:
"""Publish an event, but only to local subscribers."""
@@ -124,11 +127,11 @@ class AsyncEventBroker(EventSource):
pass
@abstractmethod
- async def publish(self, event: events.Event) -> None:
+ async def publish(self, event: Event) -> None:
"""Publish an event."""
@abstractmethod
- async def publish_local(self, event: events.Event) -> None:
+ async def publish_local(self, event: Event) -> None:
"""Publish an event, but only to local subscribers."""
diff --git a/src/apscheduler/datastores/async_sqlalchemy.py b/src/apscheduler/datastores/async_sqlalchemy.py
index a215d68..b15b154 100644
--- a/src/apscheduler/datastores/async_sqlalchemy.py
+++ b/src/apscheduler/datastores/async_sqlalchemy.py
@@ -2,7 +2,7 @@ from __future__ import annotations
from collections import defaultdict
from datetime import datetime, timedelta, timezone
-from typing import Any, Callable, Iterable, Optional
+from typing import Any, Iterable, Optional
from uuid import UUID
import attr
@@ -19,9 +19,9 @@ from ..abc import AsyncDataStore, AsyncEventBroker, EventSource, Job, Schedule
from ..enums import ConflictPolicy
from ..eventbrokers.async_local import LocalAsyncEventBroker
from ..events import (
- DataStoreEvent, Event, JobAdded, JobDeserializationFailed, ScheduleAdded,
- ScheduleDeserializationFailed, ScheduleRemoved, ScheduleUpdated, SubscriptionToken, TaskAdded,
- TaskRemoved, TaskUpdated)
+ DataStoreEvent, JobAdded, JobDeserializationFailed, ScheduleAdded,
+ ScheduleDeserializationFailed, ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved,
+ TaskUpdated)
from ..exceptions import ConflictingIdError, SerializationError, TaskLookupError
from ..marshalling import callable_to_ref
from ..structures import JobResult, Task
@@ -93,13 +93,6 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
return jobs
- def subscribe(self, callback: Callable[[Event], Any],
- event_types: Optional[Iterable[type[Event]]] = None) -> SubscriptionToken:
- return self.events.subscribe(callback, event_types)
-
- def unsubscribe(self, token: SubscriptionToken) -> None:
- self.events.unsubscribe(token)
-
async def add_task(self, task: Task) -> None:
insert = self.t_tasks.insert().\
values(id=task.id, func=callable_to_ref(task.func),
diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py
index 6e7d0aa..ad4d568 100644
--- a/src/apscheduler/datastores/mongodb.py
+++ b/src/apscheduler/datastores/mongodb.py
@@ -4,7 +4,7 @@ from collections import defaultdict
from contextlib import ExitStack
from datetime import datetime, timezone
from logging import Logger, getLogger
-from typing import Any, Callable, ClassVar, Iterable, Optional
+from typing import ClassVar, Iterable, Optional
from uuid import UUID
import attr
@@ -18,8 +18,8 @@ from ..abc import DataStore, EventBroker, EventSource, Job, Schedule, Serializer
from ..enums import ConflictPolicy
from ..eventbrokers.local import LocalEventBroker
from ..events import (
- DataStoreEvent, JobAdded, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, SubscriptionToken,
- TaskAdded, TaskRemoved, TaskUpdated)
+ DataStoreEvent, JobAdded, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, TaskAdded,
+ TaskRemoved, TaskUpdated)
from ..exceptions import (
ConflictingIdError, DeserializationError, SerializationError, TaskLookupError)
from ..serializers.pickle import PickleSerializer
@@ -91,13 +91,6 @@ class MongoDBDataStore(DataStore):
def __exit__(self, exc_type, exc_val, exc_tb):
self._exit_stack.__exit__(exc_type, exc_val, exc_tb)
- def subscribe(self, callback: Callable[[events.Event], Any],
- event_types: Optional[Iterable[type[events.Event]]] = None) -> SubscriptionToken:
- return self._events.subscribe(callback, event_types)
-
- def unsubscribe(self, token: events.SubscriptionToken) -> None:
- self._events.unsubscribe(token)
-
def add_task(self, task: Task) -> None:
previous = self._tasks.find_one_and_update(
{'_id': task.id},
diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py
index 3040ae4..8dea821 100644
--- a/src/apscheduler/datastores/sqlalchemy.py
+++ b/src/apscheduler/datastores/sqlalchemy.py
@@ -3,7 +3,7 @@ from __future__ import annotations
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from logging import Logger, getLogger
-from typing import Any, Callable, Iterable, Optional
+from typing import Any, Iterable, Optional
from uuid import UUID
import attr
@@ -21,7 +21,7 @@ from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome
from ..eventbrokers.local import LocalEventBroker
from ..events import (
Event, JobAdded, JobDeserializationFailed, ScheduleAdded, ScheduleDeserializationFailed,
- ScheduleRemoved, ScheduleUpdated, SubscriptionToken, TaskAdded, TaskRemoved, TaskUpdated)
+ ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved, TaskUpdated)
from ..exceptions import ConflictingIdError, SerializationError, TaskLookupError
from ..marshalling import callable_to_ref
from ..serializers.pickle import PickleSerializer
@@ -212,13 +212,6 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
def events(self) -> EventSource:
return self._events
- def subscribe(self, callback: Callable[[Event], Any],
- event_types: Optional[Iterable[type[Event]]] = None) -> SubscriptionToken:
- return self._events.subscribe(callback, event_types)
-
- def unsubscribe(self, token: SubscriptionToken) -> None:
- self._events.unsubscribe(token)
-
def add_task(self, task: Task) -> None:
insert = self.t_tasks.insert().\
values(id=task.id, func=callable_to_ref(task.func),
diff --git a/src/apscheduler/eventbrokers/async_adapter.py b/src/apscheduler/eventbrokers/async_adapter.py
index 1fb9177..c10ac56 100644
--- a/src/apscheduler/eventbrokers/async_adapter.py
+++ b/src/apscheduler/eventbrokers/async_adapter.py
@@ -28,8 +28,9 @@ class AsyncEventBrokerAdapter(LocalAsyncEventBroker):
await to_thread.run_sync(self.original.__enter__)
self._exit_stack.push_async_exit(partial(to_thread.run_sync, self.original.__exit__))
- token = self.original.subscribe(partial(self.portal.call, self.publish_local))
- self._exit_stack.callback(self.original.unsubscribe, token)
+ # Relay events from the original broker to this one
+ relay_subscription = self.original.subscribe(partial(self.portal.call, self.publish_local))
+ self._exit_stack.callback(relay_subscription.unsubscribe)
async def publish(self, event: Event) -> None:
await to_thread.run_sync(self.original.publish, event)
diff --git a/src/apscheduler/eventbrokers/base.py b/src/apscheduler/eventbrokers/base.py
index ce12055..23bae8a 100644
--- a/src/apscheduler/eventbrokers/base.py
+++ b/src/apscheduler/eventbrokers/base.py
@@ -6,33 +6,41 @@ from typing import Any, Callable, Iterable, Optional
import attr
-from .. import abc, events
-from ..abc import EventBroker, Serializer
-from ..events import Event, Subscription, SubscriptionToken
+from .. import events
+from ..abc import EventBroker, Serializer, Subscription
+from ..events import Event
from ..exceptions import DeserializationError
+@attr.define(eq=False, frozen=True)
+class LocalSubscription(Subscription):
+ callback: Callable[[Event], Any]
+ event_types: Optional[set[type[Event]]]
+ _source: BaseEventBroker
+ _token: object
+
+ def unsubscribe(self) -> None:
+ self._source.unsubscribe(self._token)
+
+
@attr.define(eq=False)
class BaseEventBroker(EventBroker):
_logger: Logger = attr.field(init=False)
- _subscriptions: dict[SubscriptionToken, Subscription] = attr.field(init=False, factory=dict)
+ _subscriptions: dict[object, Subscription] = attr.field(init=False, factory=dict)
def __attrs_post_init__(self) -> None:
self._logger = getLogger(self.__class__.__module__)
def subscribe(self, callback: Callable[[Event], Any],
- event_types: Optional[Iterable[type[Event]]] = None) -> SubscriptionToken:
+ event_types: Optional[Iterable[type[Event]]] = None) -> Subscription:
types = set(event_types) if event_types else None
- token = SubscriptionToken(object())
- subscription = Subscription(callback, types)
+ token = object()
+ subscription = LocalSubscription(callback, types, self, token)
self._subscriptions[token] = subscription
- return token
-
- def unsubscribe(self, token: SubscriptionToken) -> None:
- self._subscriptions.pop(token, None)
+ return subscription
- def relay_events_from(self, source: abc.EventSource) -> SubscriptionToken:
- return source.subscribe(self.publish)
+ def unsubscribe(self, token: object) -> None:
+ self._subscriptions.pop(token)
class DistributedEventBrokerMixin:
diff --git a/src/apscheduler/eventbrokers/local.py b/src/apscheduler/eventbrokers/local.py
index a657f4e..e5db7cd 100644
--- a/src/apscheduler/eventbrokers/local.py
+++ b/src/apscheduler/eventbrokers/local.py
@@ -7,7 +7,8 @@ from typing import Any, Callable, Iterable, Optional
import attr
-from ..events import Event, SubscriptionToken
+from ..abc import Subscription
+from ..events import Event
from ..util import reentrant
from .base import BaseEventBroker
@@ -28,7 +29,7 @@ class LocalEventBroker(BaseEventBroker):
del self._executor
def subscribe(self, callback: Callable[[Event], Any],
- event_types: Optional[Iterable[type[Event]]] = None) -> SubscriptionToken:
+ event_types: Optional[Iterable[type[Event]]] = None) -> Subscription:
if iscoroutinefunction(callback):
raise ValueError('Coroutine functions are not supported as callbacks on a synchronous '
'event source')
diff --git a/src/apscheduler/events.py b/src/apscheduler/events.py
index cd63ea3..9d04adc 100644
--- a/src/apscheduler/events.py
+++ b/src/apscheduler/events.py
@@ -2,7 +2,7 @@ from __future__ import annotations
from datetime import datetime, timezone
from functools import partial
-from typing import Any, Callable, NewType, Optional
+from typing import Optional
from uuid import UUID
import attr
@@ -12,8 +12,6 @@ from .converters import as_aware_datetime, as_uuid
from .enums import JobOutcome
from .structures import Job
-SubscriptionToken = NewType('SubscriptionToken', object)
-
@attr.define(kw_only=True, frozen=True)
class Event:
@@ -159,13 +157,3 @@ class JobEnded(JobExecutionEvent):
timestamp=datetime.now(timezone.utc), job_id=job.id, task_id=job.task_id,
schedule_id=job.schedule_id, scheduled_fire_time=job.scheduled_fire_time,
start_deadline=job.start_deadline, outcome=outcome, start_time=start_time)
-
-
-#
-# Event delivery
-#
-
-@attr.define(eq=False, frozen=True)
-class Subscription:
- callback: Callable[[Event], Any]
- event_types: Optional[set[type[Event]]]
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py
index 790baf4..dbece0e 100644
--- a/src/apscheduler/schedulers/async_.py
+++ b/src/apscheduler/schedulers/async_.py
@@ -17,14 +17,13 @@ from ..datastores.async_adapter import AsyncDataStoreAdapter
from ..datastores.memory import MemoryDataStore
from ..enums import CoalescePolicy, ConflictPolicy, RunState
from ..eventbrokers.async_local import LocalAsyncEventBroker
-from ..events import (
- Event, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated, SubscriptionToken)
+from ..events import ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated
from ..marshalling import callable_to_ref
from ..structures import Task
from ..workers.async_ import AsyncWorker
-class AsyncScheduler(EventSource):
+class AsyncScheduler:
"""An asynchronous (AnyIO based) scheduler implementation."""
data_store: AsyncDataStore
@@ -49,6 +48,10 @@ class AsyncScheduler(EventSource):
self.data_store = data_store
@property
+ def events(self) -> EventSource:
+ return self._events
+
+ @property
def worker(self) -> Optional[AsyncWorker]:
return self._worker
@@ -58,15 +61,15 @@ class AsyncScheduler(EventSource):
await self._exit_stack.__aenter__()
await self._exit_stack.enter_async_context(self._events)
- # Initialize the data store
+ # Initialize the data store and start relaying events to the scheduler's event broker
await self._exit_stack.enter_async_context(self.data_store)
- relay_token = self._events.relay_events_from(self.data_store.events)
- self._exit_stack.callback(self.data_store.events.unsubscribe, relay_token)
+ relay_subscription = self.data_store.events.subscribe(self._events.publish)
+ self._exit_stack.callback(relay_subscription.unsubscribe)
# Wake up the scheduler if the data store emits a significant schedule event
- wakeup_token = self.data_store.events.subscribe(
+ wakeup_subscription = self.data_store.events.subscribe(
lambda event: self._wakeup_event.set(), {ScheduleAdded, ScheduleUpdated})
- self._exit_stack.callback(self.data_store.events.unsubscribe, wakeup_token)
+ self._exit_stack.callback(wakeup_subscription.unsubscribe)
# Start the built-in worker, if configured to do so
if self.start_worker:
@@ -86,13 +89,6 @@ class AsyncScheduler(EventSource):
del self._task_group
del self._wakeup_event
- def subscribe(self, callback: Callable[[Event], Any],
- event_types: Optional[Iterable[type[Event]]] = None) -> SubscriptionToken:
- return self._events.subscribe(callback, event_types)
-
- def unsubscribe(self, token: SubscriptionToken) -> None:
- self._events.unsubscribe(token)
-
async def add_schedule(
self, func_or_task_id: str | Callable, trigger: Trigger, *, id: Optional[str] = None,
args: Optional[Iterable] = None, kwargs: Optional[Mapping[str, Any]] = None,
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py
index 5875ddc..d8e1397 100644
--- a/src/apscheduler/schedulers/sync.py
+++ b/src/apscheduler/schedulers/sync.py
@@ -14,14 +14,13 @@ from ..abc import DataStore, EventSource, Trigger
from ..datastores.memory import MemoryDataStore
from ..enums import CoalescePolicy, ConflictPolicy, RunState
from ..eventbrokers.local import LocalEventBroker
-from ..events import (
- Event, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated, SubscriptionToken)
+from ..events import Event, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated
from ..marshalling import callable_to_ref
from ..structures import Job, Schedule, Task
from ..workers.sync import Worker
-class Scheduler(EventSource):
+class Scheduler:
"""A synchronous scheduler implementation."""
_state: RunState = RunState.stopped
@@ -39,6 +38,10 @@ class Scheduler(EventSource):
self._events = LocalEventBroker()
@property
+ def events(self) -> EventSource:
+ return self._events
+
+ @property
def state(self) -> RunState:
return self._state
@@ -52,29 +55,29 @@ class Scheduler(EventSource):
self._exit_stack.__enter__()
self._exit_stack.enter_context(self._events)
- # Initialize the data store
+ # Initialize the data store and start relaying events to the scheduler's event broker
self._exit_stack.enter_context(self.data_store)
- relay_token = self._events.relay_events_from(self.data_store.events)
- self._exit_stack.callback(self.data_store.events.unsubscribe, relay_token)
+ relay_subscription = self.data_store.events.subscribe(self._events.publish)
+ self._exit_stack.callback(relay_subscription.unsubscribe)
# Wake up the scheduler if the data store emits a significant schedule event
- wakeup_token = self.data_store.events.subscribe(
+ wakeup_subscription = self.data_store.events.subscribe(
lambda event: self._wakeup_event.set(), {ScheduleAdded, ScheduleUpdated})
- self._exit_stack.callback(self.data_store.events.unsubscribe, wakeup_token)
+ self._exit_stack.callback(wakeup_subscription.unsubscribe)
# Start the built-in worker, if configured to do so
if self.start_worker:
self._worker = Worker(self.data_store)
self._exit_stack.enter_context(self._worker)
- # Start the worker and return when it has signalled readiness or raised an exception
+ # Start the scheduler and return when it has signalled readiness or raised an exception
start_future: Future[Event] = Future()
- token = self._events.subscribe(start_future.set_result)
+ start_subscription = self._events.subscribe(start_future.set_result)
run_future = self._executor.submit(self.run)
try:
wait([start_future, run_future], return_when=FIRST_COMPLETED)
finally:
- self._events.unsubscribe(token)
+ start_subscription.unsubscribe()
if run_future.done():
run_future.result()
@@ -88,13 +91,6 @@ class Scheduler(EventSource):
self._exit_stack.__exit__(exc_type, exc_val, exc_tb)
del self._wakeup_event
- def subscribe(self, callback: Callable[[Event], Any],
- event_types: Optional[Iterable[type[Event]]] = None) -> SubscriptionToken:
- return self._events.subscribe(callback, event_types)
-
- def unsubscribe(self, token: SubscriptionToken) -> None:
- self._events.unsubscribe(token)
-
def add_schedule(
self, func_or_task_id: str | Callable, trigger: Trigger, *, id: Optional[str] = None,
args: Optional[Iterable] = None, kwargs: Optional[Mapping[str, Any]] = None,
diff --git a/src/apscheduler/workers/async_.py b/src/apscheduler/workers/async_.py
index 7acef67..268a286 100644
--- a/src/apscheduler/workers/async_.py
+++ b/src/apscheduler/workers/async_.py
@@ -6,7 +6,7 @@ from contextlib import AsyncExitStack
from datetime import datetime, timezone
from inspect import isawaitable
from logging import Logger, getLogger
-from typing import Any, Callable, Iterable, Optional
+from typing import Callable, Optional
from uuid import UUID
import anyio
@@ -17,18 +17,16 @@ from ..abc import AsyncDataStore, DataStore, EventSource, Job
from ..datastores.async_adapter import AsyncDataStoreAdapter
from ..enums import JobOutcome, RunState
from ..eventbrokers.async_local import LocalAsyncEventBroker
-from ..events import (
- Event, JobAdded, JobEnded, JobStarted, SubscriptionToken, WorkerStarted, WorkerStopped)
+from ..events import JobAdded, JobEnded, JobStarted, WorkerStarted, WorkerStopped
from ..structures import JobResult
-class AsyncWorker(EventSource):
+class AsyncWorker:
"""Runs jobs locally in a task group."""
_stop_event: Optional[anyio.Event] = None
_state: RunState = RunState.stopped
_acquire_cancel_scope: Optional[CancelScope] = None
- _datastore_subscription: SubscriptionToken
_wakeup_event: anyio.Event
def __init__(self, data_store: DataStore | AsyncDataStore, *,
@@ -51,6 +49,10 @@ class AsyncWorker(EventSource):
self.data_store = data_store
@property
+ def events(self) -> EventSource:
+ return self._events
+
+ @property
def state(self) -> RunState:
return self._state
@@ -60,15 +62,15 @@ class AsyncWorker(EventSource):
await self._exit_stack.__aenter__()
await self._exit_stack.enter_async_context(self._events)
- # Initialize the data store
+ # Initialize the data store and start relaying events to the worker's event broker
await self._exit_stack.enter_async_context(self.data_store)
- relay_token = self._events.relay_events_from(self.data_store.events)
- self._exit_stack.callback(self.data_store.events.unsubscribe, relay_token)
+ relay_subscription = self.data_store.events.subscribe(self._events.publish)
+ self._exit_stack.callback(relay_subscription.unsubscribe)
# Wake up the worker if the data store emits a significant job event
- wakeup_token = self.data_store.events.subscribe(
+ wakeup_subscription = self.data_store.events.subscribe(
lambda event: self._wakeup_event.set(), {JobAdded})
- self._exit_stack.callback(self.data_store.events.unsubscribe, wakeup_token)
+ self._exit_stack.callback(wakeup_subscription.unsubscribe)
# Start the actual worker
task_group = create_task_group()
@@ -82,13 +84,6 @@ class AsyncWorker(EventSource):
await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb)
del self._wakeup_event
- def subscribe(self, callback: Callable[[Event], Any],
- event_types: Optional[Iterable[type[Event]]] = None) -> SubscriptionToken:
- return self._events.subscribe(callback, event_types)
-
- def unsubscribe(self, token: SubscriptionToken) -> None:
- self._events.unsubscribe(token)
-
async def run(self, *, task_status=TASK_STATUS_IGNORED) -> None:
if self._state is not RunState.starting:
raise RuntimeError(f'This function cannot be called while the worker is in the '
diff --git a/src/apscheduler/workers/sync.py b/src/apscheduler/workers/sync.py
index 45f0c5f..f9d3674 100644
--- a/src/apscheduler/workers/sync.py
+++ b/src/apscheduler/workers/sync.py
@@ -7,19 +7,17 @@ from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from contextlib import ExitStack
from datetime import datetime, timezone
from logging import Logger, getLogger
-from typing import Any, Callable, Iterable, Optional
+from typing import Callable, Optional
from uuid import UUID
-from .. import events
from ..abc import DataStore, EventSource
from ..enums import JobOutcome, RunState
from ..eventbrokers.local import LocalEventBroker
-from ..events import (
- JobAdded, JobEnded, JobStarted, SubscriptionToken, WorkerStarted, WorkerStopped)
+from ..events import JobAdded, JobEnded, JobStarted, WorkerStarted, WorkerStopped
from ..structures import Job, JobResult
-class Worker(EventSource):
+class Worker:
"""Runs jobs locally in a thread pool."""
_executor: ThreadPoolExecutor
@@ -42,6 +40,10 @@ class Worker(EventSource):
self.data_store = data_store
@property
+ def events(self) -> EventSource:
+ return self._events
+
+ @property
def state(self) -> RunState:
return self._state
@@ -51,25 +53,25 @@ class Worker(EventSource):
self._exit_stack.__enter__()
self._exit_stack.enter_context(self._events)
- # Initialize the data store
+ # Initialize the data store and start relaying events to the worker's event broker
self._exit_stack.enter_context(self.data_store)
- relay_token = self._events.relay_events_from(self.data_store.events)
- self._exit_stack.callback(self.data_store.events.unsubscribe, relay_token)
+ relay_subscription = self.data_store.events.subscribe(self._events.publish)
+ self._exit_stack.callback(relay_subscription.unsubscribe)
# Wake up the worker if the data store emits a significant job event
- wakeup_token = self.data_store.events.subscribe(
+ wakeup_subscription = self.data_store.events.subscribe(
lambda event: self._wakeup_event.set(), {JobAdded})
- self._exit_stack.callback(self.data_store.events.unsubscribe, wakeup_token)
+ self._exit_stack.callback(wakeup_subscription.unsubscribe)
# Start the worker and return when it has signalled readiness or raised an exception
start_future: Future[None] = Future()
- token = self._events.subscribe(start_future.set_result)
+ start_subscription = self._events.subscribe(start_future.set_result)
self._executor = ThreadPoolExecutor(1)
run_future = self._executor.submit(self.run)
try:
wait([start_future, run_future], return_when=FIRST_COMPLETED)
finally:
- self._events.unsubscribe(token)
+ start_subscription.unsubscribe()
if run_future.done():
run_future.result()
@@ -83,13 +85,6 @@ class Worker(EventSource):
self._exit_stack.__exit__(exc_type, exc_val, exc_tb)
del self._wakeup_event
- def subscribe(self, callback: Callable[[events.Event], Any],
- event_types: Optional[Iterable[type[events.Event]]] = None) -> SubscriptionToken:
- return self._events.subscribe(callback, event_types)
-
- def unsubscribe(self, token: events.SubscriptionToken) -> None:
- self._events.unsubscribe(token)
-
def run(self) -> None:
if self._state is not RunState.starting:
raise RuntimeError(f'This function cannot be called while the worker is in the '
diff --git a/tests/test_datastores.py b/tests/test_datastores.py
index 4389069..e5b50a7 100644
--- a/tests/test_datastores.py
+++ b/tests/test_datastores.py
@@ -151,11 +151,11 @@ async def capture_events(
events.append(event)
if len(events) == limit:
limit_event.set()
- datastore.events.unsubscribe(token)
+ subscription.unsubscribe()
events: List[Event] = []
limit_event = anyio.Event()
- token = datastore.events.subscribe(listener, event_types)
+ subscription = datastore.events.subscribe(listener, event_types)
yield events
if limit:
with anyio.fail_after(3):
diff --git a/tests/test_eventbrokers.py b/tests/test_eventbrokers.py
index f1f88c0..7097001 100644
--- a/tests/test_eventbrokers.py
+++ b/tests/test_eventbrokers.py
@@ -105,11 +105,11 @@ class TestEventBroker:
def test_unsubscribe(self, broker: EventBroker, caplog) -> None:
queue = Queue()
with broker:
- token = broker.subscribe(queue.put_nowait)
+ subscription = broker.subscribe(queue.put_nowait)
broker.publish(Event())
queue.get(timeout=3)
- broker.unsubscribe(token)
+ subscription.unsubscribe()
broker.publish(Event())
with pytest.raises(Empty):
queue.get(timeout=0.1)
@@ -168,12 +168,12 @@ class TestAsyncEventBroker:
async def test_unsubscribe(self, async_broker: AsyncEventBroker) -> None:
send, receive = create_memory_object_stream()
async with async_broker:
- token = async_broker.subscribe(send.send)
+ subscription = async_broker.subscribe(send.send)
await async_broker.publish(Event())
with fail_after(3):
await receive.receive()
- async_broker.unsubscribe(token)
+ subscription.unsubscribe()
await async_broker.publish(Event())
with pytest.raises(TimeoutError), fail_after(0.1):
await receive.receive()
diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py
index 990cfec..63729bb 100644
--- a/tests/test_schedulers.py
+++ b/tests/test_schedulers.py
@@ -33,7 +33,7 @@ class TestAsyncScheduler:
received_events: List[Event] = []
event = anyio.Event()
scheduler = AsyncScheduler(start_worker=False)
- scheduler.subscribe(listener)
+ scheduler.events.subscribe(listener)
trigger = DateTrigger(datetime.now(timezone.utc))
async with scheduler:
await scheduler.add_schedule(dummy_async_job, trigger, id='foo')
@@ -84,7 +84,7 @@ class TestSyncScheduler:
received_events: List[Event] = []
event = threading.Event()
scheduler = Scheduler(start_worker=False)
- scheduler.subscribe(listener)
+ scheduler.events.subscribe(listener)
trigger = DateTrigger(datetime.now(timezone.utc))
with scheduler:
scheduler.add_schedule(dummy_sync_job, trigger, id='foo')
diff --git a/tests/test_workers.py b/tests/test_workers.py
index 1a98801..f8522f3 100644
--- a/tests/test_workers.py
+++ b/tests/test_workers.py
@@ -49,7 +49,7 @@ class TestAsyncWorker:
event = anyio.Event()
data_store = MemoryDataStore()
worker = AsyncWorker(data_store)
- worker.subscribe(listener)
+ worker.events.subscribe(listener)
async with worker:
await worker.data_store.add_task(Task(id='task_id', func=target_func))
job = Job(task_id='task_id', args=(1, 2), kwargs={'x': 'foo', 'fail': fail})
@@ -108,7 +108,7 @@ class TestAsyncWorker:
event = anyio.Event()
data_store = MemoryDataStore()
worker = AsyncWorker(data_store)
- worker.subscribe(listener)
+ worker.events.subscribe(listener)
async with worker:
await worker.data_store.add_task(Task(id='task_id', func=fail_func))
job = Job(task_id='task_id', schedule_id='foo',
@@ -162,7 +162,7 @@ class TestSyncWorker:
event = threading.Event()
data_store = MemoryDataStore()
worker = Worker(data_store)
- worker.subscribe(listener)
+ worker.events.subscribe(listener)
with worker:
worker.data_store.add_task(Task(id='task_id', func=sync_func))
job = Job(task_id='task_id', args=(1, 2), kwargs={'x': 'foo', 'fail': fail})
@@ -220,7 +220,7 @@ class TestSyncWorker:
event = threading.Event()
data_store = MemoryDataStore()
worker = Worker(data_store)
- worker.subscribe(listener)
+ worker.events.subscribe(listener)
with worker:
worker.data_store.add_task(Task(id='task_id', func=fail_func))
job = Job(task_id='task_id', schedule_id='foo',