diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-12 14:50:22 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-12 14:50:22 +0300 |
commit | 48722053dfb43de077df18a139abb16b0a7f7e24 (patch) | |
tree | bd55e709e0d4c02619ef0ec54390a8f792da2f74 | |
parent | a58fca290e0831d377d496a69101e5e3dc4c604e (diff) | |
download | apscheduler-48722053dfb43de077df18a139abb16b0a7f7e24.tar.gz |
Improved the event subscription system
The subscribe() method now returns a subscription object which itself has an unsubscribe() method.
-rw-r--r-- | src/apscheduler/abc.py | 35 | ||||
-rw-r--r-- | src/apscheduler/datastores/async_sqlalchemy.py | 15 | ||||
-rw-r--r-- | src/apscheduler/datastores/mongodb.py | 13 | ||||
-rw-r--r-- | src/apscheduler/datastores/sqlalchemy.py | 11 | ||||
-rw-r--r-- | src/apscheduler/eventbrokers/async_adapter.py | 5 | ||||
-rw-r--r-- | src/apscheduler/eventbrokers/base.py | 34 | ||||
-rw-r--r-- | src/apscheduler/eventbrokers/local.py | 5 | ||||
-rw-r--r-- | src/apscheduler/events.py | 14 | ||||
-rw-r--r-- | src/apscheduler/schedulers/async_.py | 26 | ||||
-rw-r--r-- | src/apscheduler/schedulers/sync.py | 32 | ||||
-rw-r--r-- | src/apscheduler/workers/async_.py | 29 | ||||
-rw-r--r-- | src/apscheduler/workers/sync.py | 33 | ||||
-rw-r--r-- | tests/test_datastores.py | 4 | ||||
-rw-r--r-- | tests/test_eventbrokers.py | 8 | ||||
-rw-r--r-- | tests/test_schedulers.py | 4 | ||||
-rw-r--r-- | tests/test_workers.py | 8 |
16 files changed, 119 insertions, 157 deletions
diff --git a/src/apscheduler/abc.py b/src/apscheduler/abc.py index e267836..4b2da42 100644 --- a/src/apscheduler/abc.py +++ b/src/apscheduler/abc.py @@ -10,7 +10,7 @@ from .enums import ConflictPolicy from .structures import Job, JobResult, Schedule, Task if TYPE_CHECKING: - from . import events + from .events import Event class Trigger(Iterator[datetime], metaclass=ABCMeta): @@ -64,14 +64,25 @@ class Serializer(metaclass=ABCMeta): return self.deserialize(b64decode(serialized)) +class Subscription(metaclass=ABCMeta): + @abstractmethod + def unsubscribe(self) -> None: + """ + Cancel this subscription. + + Does nothing if the subscription has already been cancelled. + + """ + + class EventSource(metaclass=ABCMeta): """Interface for objects that can deliver notifications to interested subscribers.""" @abstractmethod def subscribe( - self, callback: Callable[[events.Event], Any], - event_types: Optional[Iterable[type[events.Event]]] = None - ) -> events.SubscriptionToken: + self, callback: Callable[[Event], Any], + event_types: Optional[Iterable[type[Event]]] = None + ) -> Subscription: """ Subscribe to events from this event source. @@ -79,14 +90,6 @@ class EventSource(metaclass=ABCMeta): :param event_types: an iterable of concrete Event classes to subscribe to """ - @abstractmethod - def unsubscribe(self, token: events.SubscriptionToken) -> None: - """ - Cancel an event subscription. 
- - :param token: a token returned from :meth:`subscribe` - """ - class EventBroker(EventSource): """ @@ -102,11 +105,11 @@ class EventBroker(EventSource): pass @abstractmethod - def publish(self, event: events.Event) -> None: + def publish(self, event: Event) -> None: """Publish an event.""" @abstractmethod - def publish_local(self, event: events.Event) -> None: + def publish_local(self, event: Event) -> None: """Publish an event, but only to local subscribers.""" @@ -124,11 +127,11 @@ class AsyncEventBroker(EventSource): pass @abstractmethod - async def publish(self, event: events.Event) -> None: + async def publish(self, event: Event) -> None: """Publish an event.""" @abstractmethod - async def publish_local(self, event: events.Event) -> None: + async def publish_local(self, event: Event) -> None: """Publish an event, but only to local subscribers.""" diff --git a/src/apscheduler/datastores/async_sqlalchemy.py b/src/apscheduler/datastores/async_sqlalchemy.py index a215d68..b15b154 100644 --- a/src/apscheduler/datastores/async_sqlalchemy.py +++ b/src/apscheduler/datastores/async_sqlalchemy.py @@ -2,7 +2,7 @@ from __future__ import annotations from collections import defaultdict from datetime import datetime, timedelta, timezone -from typing import Any, Callable, Iterable, Optional +from typing import Any, Iterable, Optional from uuid import UUID import attr @@ -19,9 +19,9 @@ from ..abc import AsyncDataStore, AsyncEventBroker, EventSource, Job, Schedule from ..enums import ConflictPolicy from ..eventbrokers.async_local import LocalAsyncEventBroker from ..events import ( - DataStoreEvent, Event, JobAdded, JobDeserializationFailed, ScheduleAdded, - ScheduleDeserializationFailed, ScheduleRemoved, ScheduleUpdated, SubscriptionToken, TaskAdded, - TaskRemoved, TaskUpdated) + DataStoreEvent, JobAdded, JobDeserializationFailed, ScheduleAdded, + ScheduleDeserializationFailed, ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved, + TaskUpdated) from ..exceptions import 
ConflictingIdError, SerializationError, TaskLookupError from ..marshalling import callable_to_ref from ..structures import JobResult, Task @@ -93,13 +93,6 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore): return jobs - def subscribe(self, callback: Callable[[Event], Any], - event_types: Optional[Iterable[type[Event]]] = None) -> SubscriptionToken: - return self.events.subscribe(callback, event_types) - - def unsubscribe(self, token: SubscriptionToken) -> None: - self.events.unsubscribe(token) - async def add_task(self, task: Task) -> None: insert = self.t_tasks.insert().\ values(id=task.id, func=callable_to_ref(task.func), diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py index 6e7d0aa..ad4d568 100644 --- a/src/apscheduler/datastores/mongodb.py +++ b/src/apscheduler/datastores/mongodb.py @@ -4,7 +4,7 @@ from collections import defaultdict from contextlib import ExitStack from datetime import datetime, timezone from logging import Logger, getLogger -from typing import Any, Callable, ClassVar, Iterable, Optional +from typing import ClassVar, Iterable, Optional from uuid import UUID import attr @@ -18,8 +18,8 @@ from ..abc import DataStore, EventBroker, EventSource, Job, Schedule, Serializer from ..enums import ConflictPolicy from ..eventbrokers.local import LocalEventBroker from ..events import ( - DataStoreEvent, JobAdded, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, SubscriptionToken, - TaskAdded, TaskRemoved, TaskUpdated) + DataStoreEvent, JobAdded, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, TaskAdded, + TaskRemoved, TaskUpdated) from ..exceptions import ( ConflictingIdError, DeserializationError, SerializationError, TaskLookupError) from ..serializers.pickle import PickleSerializer @@ -91,13 +91,6 @@ class MongoDBDataStore(DataStore): def __exit__(self, exc_type, exc_val, exc_tb): self._exit_stack.__exit__(exc_type, exc_val, exc_tb) - def subscribe(self, callback: 
Callable[[events.Event], Any], - event_types: Optional[Iterable[type[events.Event]]] = None) -> SubscriptionToken: - return self._events.subscribe(callback, event_types) - - def unsubscribe(self, token: events.SubscriptionToken) -> None: - self._events.unsubscribe(token) - def add_task(self, task: Task) -> None: previous = self._tasks.find_one_and_update( {'_id': task.id}, diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py index 3040ae4..8dea821 100644 --- a/src/apscheduler/datastores/sqlalchemy.py +++ b/src/apscheduler/datastores/sqlalchemy.py @@ -3,7 +3,7 @@ from __future__ import annotations from collections import defaultdict from datetime import datetime, timedelta, timezone from logging import Logger, getLogger -from typing import Any, Callable, Iterable, Optional +from typing import Any, Iterable, Optional from uuid import UUID import attr @@ -21,7 +21,7 @@ from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome from ..eventbrokers.local import LocalEventBroker from ..events import ( Event, JobAdded, JobDeserializationFailed, ScheduleAdded, ScheduleDeserializationFailed, - ScheduleRemoved, ScheduleUpdated, SubscriptionToken, TaskAdded, TaskRemoved, TaskUpdated) + ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved, TaskUpdated) from ..exceptions import ConflictingIdError, SerializationError, TaskLookupError from ..marshalling import callable_to_ref from ..serializers.pickle import PickleSerializer @@ -212,13 +212,6 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore): def events(self) -> EventSource: return self._events - def subscribe(self, callback: Callable[[Event], Any], - event_types: Optional[Iterable[type[Event]]] = None) -> SubscriptionToken: - return self._events.subscribe(callback, event_types) - - def unsubscribe(self, token: SubscriptionToken) -> None: - self._events.unsubscribe(token) - def add_task(self, task: Task) -> None: insert = self.t_tasks.insert().\ 
values(id=task.id, func=callable_to_ref(task.func), diff --git a/src/apscheduler/eventbrokers/async_adapter.py b/src/apscheduler/eventbrokers/async_adapter.py index 1fb9177..c10ac56 100644 --- a/src/apscheduler/eventbrokers/async_adapter.py +++ b/src/apscheduler/eventbrokers/async_adapter.py @@ -28,8 +28,9 @@ class AsyncEventBrokerAdapter(LocalAsyncEventBroker): await to_thread.run_sync(self.original.__enter__) self._exit_stack.push_async_exit(partial(to_thread.run_sync, self.original.__exit__)) - token = self.original.subscribe(partial(self.portal.call, self.publish_local)) - self._exit_stack.callback(self.original.unsubscribe, token) + # Relay events from the original broker to this one + relay_subscription = self.original.subscribe(partial(self.portal.call, self.publish_local)) + self._exit_stack.callback(relay_subscription.unsubscribe) async def publish(self, event: Event) -> None: await to_thread.run_sync(self.original.publish, event) diff --git a/src/apscheduler/eventbrokers/base.py b/src/apscheduler/eventbrokers/base.py index ce12055..23bae8a 100644 --- a/src/apscheduler/eventbrokers/base.py +++ b/src/apscheduler/eventbrokers/base.py @@ -6,33 +6,41 @@ from typing import Any, Callable, Iterable, Optional import attr -from .. import abc, events -from ..abc import EventBroker, Serializer -from ..events import Event, Subscription, SubscriptionToken +from .. 
import events +from ..abc import EventBroker, Serializer, Subscription +from ..events import Event from ..exceptions import DeserializationError +@attr.define(eq=False, frozen=True) +class LocalSubscription(Subscription): + callback: Callable[[Event], Any] + event_types: Optional[set[type[Event]]] + _source: BaseEventBroker + _token: object + + def unsubscribe(self) -> None: + self._source.unsubscribe(self._token) + + @attr.define(eq=False) class BaseEventBroker(EventBroker): _logger: Logger = attr.field(init=False) - _subscriptions: dict[SubscriptionToken, Subscription] = attr.field(init=False, factory=dict) + _subscriptions: dict[object, Subscription] = attr.field(init=False, factory=dict) def __attrs_post_init__(self) -> None: self._logger = getLogger(self.__class__.__module__) def subscribe(self, callback: Callable[[Event], Any], - event_types: Optional[Iterable[type[Event]]] = None) -> SubscriptionToken: + event_types: Optional[Iterable[type[Event]]] = None) -> Subscription: types = set(event_types) if event_types else None - token = SubscriptionToken(object()) - subscription = Subscription(callback, types) + token = object() + subscription = LocalSubscription(callback, types, self, token) self._subscriptions[token] = subscription - return token - - def unsubscribe(self, token: SubscriptionToken) -> None: - self._subscriptions.pop(token, None) + return subscription - def relay_events_from(self, source: abc.EventSource) -> SubscriptionToken: - return source.subscribe(self.publish) + def unsubscribe(self, token: object) -> None: + self._subscriptions.pop(token) class DistributedEventBrokerMixin: diff --git a/src/apscheduler/eventbrokers/local.py b/src/apscheduler/eventbrokers/local.py index a657f4e..e5db7cd 100644 --- a/src/apscheduler/eventbrokers/local.py +++ b/src/apscheduler/eventbrokers/local.py @@ -7,7 +7,8 @@ from typing import Any, Callable, Iterable, Optional import attr -from ..events import Event, SubscriptionToken +from ..abc import Subscription 
+from ..events import Event from ..util import reentrant from .base import BaseEventBroker @@ -28,7 +29,7 @@ class LocalEventBroker(BaseEventBroker): del self._executor def subscribe(self, callback: Callable[[Event], Any], - event_types: Optional[Iterable[type[Event]]] = None) -> SubscriptionToken: + event_types: Optional[Iterable[type[Event]]] = None) -> Subscription: if iscoroutinefunction(callback): raise ValueError('Coroutine functions are not supported as callbacks on a synchronous ' 'event source') diff --git a/src/apscheduler/events.py b/src/apscheduler/events.py index cd63ea3..9d04adc 100644 --- a/src/apscheduler/events.py +++ b/src/apscheduler/events.py @@ -2,7 +2,7 @@ from __future__ import annotations from datetime import datetime, timezone from functools import partial -from typing import Any, Callable, NewType, Optional +from typing import Optional from uuid import UUID import attr @@ -12,8 +12,6 @@ from .converters import as_aware_datetime, as_uuid from .enums import JobOutcome from .structures import Job -SubscriptionToken = NewType('SubscriptionToken', object) - @attr.define(kw_only=True, frozen=True) class Event: @@ -159,13 +157,3 @@ class JobEnded(JobExecutionEvent): timestamp=datetime.now(timezone.utc), job_id=job.id, task_id=job.task_id, schedule_id=job.schedule_id, scheduled_fire_time=job.scheduled_fire_time, start_deadline=job.start_deadline, outcome=outcome, start_time=start_time) - - -# -# Event delivery -# - -@attr.define(eq=False, frozen=True) -class Subscription: - callback: Callable[[Event], Any] - event_types: Optional[set[type[Event]]] diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py index 790baf4..dbece0e 100644 --- a/src/apscheduler/schedulers/async_.py +++ b/src/apscheduler/schedulers/async_.py @@ -17,14 +17,13 @@ from ..datastores.async_adapter import AsyncDataStoreAdapter from ..datastores.memory import MemoryDataStore from ..enums import CoalescePolicy, ConflictPolicy, RunState from 
..eventbrokers.async_local import LocalAsyncEventBroker -from ..events import ( - Event, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated, SubscriptionToken) +from ..events import ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated from ..marshalling import callable_to_ref from ..structures import Task from ..workers.async_ import AsyncWorker -class AsyncScheduler(EventSource): +class AsyncScheduler: """An asynchronous (AnyIO based) scheduler implementation.""" data_store: AsyncDataStore @@ -49,6 +48,10 @@ class AsyncScheduler(EventSource): self.data_store = data_store @property + def events(self) -> EventSource: + return self._events + + @property def worker(self) -> Optional[AsyncWorker]: return self._worker @@ -58,15 +61,15 @@ class AsyncScheduler(EventSource): await self._exit_stack.__aenter__() await self._exit_stack.enter_async_context(self._events) - # Initialize the data store + # Initialize the data store and start relaying events to the scheduler's event broker await self._exit_stack.enter_async_context(self.data_store) - relay_token = self._events.relay_events_from(self.data_store.events) - self._exit_stack.callback(self.data_store.events.unsubscribe, relay_token) + relay_subscription = self.data_store.events.subscribe(self._events.publish) + self._exit_stack.callback(relay_subscription.unsubscribe) # Wake up the scheduler if the data store emits a significant schedule event - wakeup_token = self.data_store.events.subscribe( + wakeup_subscription = self.data_store.events.subscribe( lambda event: self._wakeup_event.set(), {ScheduleAdded, ScheduleUpdated}) - self._exit_stack.callback(self.data_store.events.unsubscribe, wakeup_token) + self._exit_stack.callback(wakeup_subscription.unsubscribe) # Start the built-in worker, if configured to do so if self.start_worker: @@ -86,13 +89,6 @@ class AsyncScheduler(EventSource): del self._task_group del self._wakeup_event - def subscribe(self, callback: Callable[[Event], Any], - 
event_types: Optional[Iterable[type[Event]]] = None) -> SubscriptionToken: - return self._events.subscribe(callback, event_types) - - def unsubscribe(self, token: SubscriptionToken) -> None: - self._events.unsubscribe(token) - async def add_schedule( self, func_or_task_id: str | Callable, trigger: Trigger, *, id: Optional[str] = None, args: Optional[Iterable] = None, kwargs: Optional[Mapping[str, Any]] = None, diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py index 5875ddc..d8e1397 100644 --- a/src/apscheduler/schedulers/sync.py +++ b/src/apscheduler/schedulers/sync.py @@ -14,14 +14,13 @@ from ..abc import DataStore, EventSource, Trigger from ..datastores.memory import MemoryDataStore from ..enums import CoalescePolicy, ConflictPolicy, RunState from ..eventbrokers.local import LocalEventBroker -from ..events import ( - Event, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated, SubscriptionToken) +from ..events import Event, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated from ..marshalling import callable_to_ref from ..structures import Job, Schedule, Task from ..workers.sync import Worker -class Scheduler(EventSource): +class Scheduler: """A synchronous scheduler implementation.""" _state: RunState = RunState.stopped @@ -39,6 +38,10 @@ class Scheduler(EventSource): self._events = LocalEventBroker() @property + def events(self) -> EventSource: + return self._events + + @property def state(self) -> RunState: return self._state @@ -52,29 +55,29 @@ class Scheduler(EventSource): self._exit_stack.__enter__() self._exit_stack.enter_context(self._events) - # Initialize the data store + # Initialize the data store and start relaying events to the scheduler's event broker self._exit_stack.enter_context(self.data_store) - relay_token = self._events.relay_events_from(self.data_store.events) - self._exit_stack.callback(self.data_store.events.unsubscribe, relay_token) + relay_subscription = 
self.data_store.events.subscribe(self._events.publish) + self._exit_stack.callback(relay_subscription.unsubscribe) # Wake up the scheduler if the data store emits a significant schedule event - wakeup_token = self.data_store.events.subscribe( + wakeup_subscription = self.data_store.events.subscribe( lambda event: self._wakeup_event.set(), {ScheduleAdded, ScheduleUpdated}) - self._exit_stack.callback(self.data_store.events.unsubscribe, wakeup_token) + self._exit_stack.callback(wakeup_subscription.unsubscribe) # Start the built-in worker, if configured to do so if self.start_worker: self._worker = Worker(self.data_store) self._exit_stack.enter_context(self._worker) - # Start the worker and return when it has signalled readiness or raised an exception + # Start the scheduler and return when it has signalled readiness or raised an exception start_future: Future[Event] = Future() - token = self._events.subscribe(start_future.set_result) + start_subscription = self._events.subscribe(start_future.set_result) run_future = self._executor.submit(self.run) try: wait([start_future, run_future], return_when=FIRST_COMPLETED) finally: - self._events.unsubscribe(token) + start_subscription.unsubscribe() if run_future.done(): run_future.result() @@ -88,13 +91,6 @@ class Scheduler(EventSource): self._exit_stack.__exit__(exc_type, exc_val, exc_tb) del self._wakeup_event - def subscribe(self, callback: Callable[[Event], Any], - event_types: Optional[Iterable[type[Event]]] = None) -> SubscriptionToken: - return self._events.subscribe(callback, event_types) - - def unsubscribe(self, token: SubscriptionToken) -> None: - self._events.unsubscribe(token) - def add_schedule( self, func_or_task_id: str | Callable, trigger: Trigger, *, id: Optional[str] = None, args: Optional[Iterable] = None, kwargs: Optional[Mapping[str, Any]] = None, diff --git a/src/apscheduler/workers/async_.py b/src/apscheduler/workers/async_.py index 7acef67..268a286 100644 --- a/src/apscheduler/workers/async_.py +++ 
b/src/apscheduler/workers/async_.py @@ -6,7 +6,7 @@ from contextlib import AsyncExitStack from datetime import datetime, timezone from inspect import isawaitable from logging import Logger, getLogger -from typing import Any, Callable, Iterable, Optional +from typing import Callable, Optional from uuid import UUID import anyio @@ -17,18 +17,16 @@ from ..abc import AsyncDataStore, DataStore, EventSource, Job from ..datastores.async_adapter import AsyncDataStoreAdapter from ..enums import JobOutcome, RunState from ..eventbrokers.async_local import LocalAsyncEventBroker -from ..events import ( - Event, JobAdded, JobEnded, JobStarted, SubscriptionToken, WorkerStarted, WorkerStopped) +from ..events import JobAdded, JobEnded, JobStarted, WorkerStarted, WorkerStopped from ..structures import JobResult -class AsyncWorker(EventSource): +class AsyncWorker: """Runs jobs locally in a task group.""" _stop_event: Optional[anyio.Event] = None _state: RunState = RunState.stopped _acquire_cancel_scope: Optional[CancelScope] = None - _datastore_subscription: SubscriptionToken _wakeup_event: anyio.Event def __init__(self, data_store: DataStore | AsyncDataStore, *, @@ -51,6 +49,10 @@ class AsyncWorker(EventSource): self.data_store = data_store @property + def events(self) -> EventSource: + return self._events + + @property def state(self) -> RunState: return self._state @@ -60,15 +62,15 @@ class AsyncWorker(EventSource): await self._exit_stack.__aenter__() await self._exit_stack.enter_async_context(self._events) - # Initialize the data store + # Initialize the data store and start relaying events to the worker's event broker await self._exit_stack.enter_async_context(self.data_store) - relay_token = self._events.relay_events_from(self.data_store.events) - self._exit_stack.callback(self.data_store.events.unsubscribe, relay_token) + relay_subscription = self.data_store.events.subscribe(self._events.publish) + self._exit_stack.callback(relay_subscription.unsubscribe) # Wake up the worker 
if the data store emits a significant job event - wakeup_token = self.data_store.events.subscribe( + wakeup_subscription = self.data_store.events.subscribe( lambda event: self._wakeup_event.set(), {JobAdded}) - self._exit_stack.callback(self.data_store.events.unsubscribe, wakeup_token) + self._exit_stack.callback(wakeup_subscription.unsubscribe) # Start the actual worker task_group = create_task_group() @@ -82,13 +84,6 @@ class AsyncWorker(EventSource): await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb) del self._wakeup_event - def subscribe(self, callback: Callable[[Event], Any], - event_types: Optional[Iterable[type[Event]]] = None) -> SubscriptionToken: - return self._events.subscribe(callback, event_types) - - def unsubscribe(self, token: SubscriptionToken) -> None: - self._events.unsubscribe(token) - async def run(self, *, task_status=TASK_STATUS_IGNORED) -> None: if self._state is not RunState.starting: raise RuntimeError(f'This function cannot be called while the worker is in the ' diff --git a/src/apscheduler/workers/sync.py b/src/apscheduler/workers/sync.py index 45f0c5f..f9d3674 100644 --- a/src/apscheduler/workers/sync.py +++ b/src/apscheduler/workers/sync.py @@ -7,19 +7,17 @@ from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait from contextlib import ExitStack from datetime import datetime, timezone from logging import Logger, getLogger -from typing import Any, Callable, Iterable, Optional +from typing import Callable, Optional from uuid import UUID -from .. 
import events from ..abc import DataStore, EventSource from ..enums import JobOutcome, RunState from ..eventbrokers.local import LocalEventBroker -from ..events import ( - JobAdded, JobEnded, JobStarted, SubscriptionToken, WorkerStarted, WorkerStopped) +from ..events import JobAdded, JobEnded, JobStarted, WorkerStarted, WorkerStopped from ..structures import Job, JobResult -class Worker(EventSource): +class Worker: """Runs jobs locally in a thread pool.""" _executor: ThreadPoolExecutor @@ -42,6 +40,10 @@ class Worker(EventSource): self.data_store = data_store @property + def events(self) -> EventSource: + return self._events + + @property def state(self) -> RunState: return self._state @@ -51,25 +53,25 @@ class Worker(EventSource): self._exit_stack.__enter__() self._exit_stack.enter_context(self._events) - # Initialize the data store + # Initialize the data store and start relaying events to the worker's event broker self._exit_stack.enter_context(self.data_store) - relay_token = self._events.relay_events_from(self.data_store.events) - self._exit_stack.callback(self.data_store.events.unsubscribe, relay_token) + relay_subscription = self.data_store.events.subscribe(self._events.publish) + self._exit_stack.callback(relay_subscription.unsubscribe) # Wake up the worker if the data store emits a significant job event - wakeup_token = self.data_store.events.subscribe( + wakeup_subscription = self.data_store.events.subscribe( lambda event: self._wakeup_event.set(), {JobAdded}) - self._exit_stack.callback(self.data_store.events.unsubscribe, wakeup_token) + self._exit_stack.callback(wakeup_subscription.unsubscribe) # Start the worker and return when it has signalled readiness or raised an exception start_future: Future[None] = Future() - token = self._events.subscribe(start_future.set_result) + start_subscription = self._events.subscribe(start_future.set_result) self._executor = ThreadPoolExecutor(1) run_future = self._executor.submit(self.run) try: wait([start_future, 
run_future], return_when=FIRST_COMPLETED) finally: - self._events.unsubscribe(token) + start_subscription.unsubscribe() if run_future.done(): run_future.result() @@ -83,13 +85,6 @@ class Worker(EventSource): self._exit_stack.__exit__(exc_type, exc_val, exc_tb) del self._wakeup_event - def subscribe(self, callback: Callable[[events.Event], Any], - event_types: Optional[Iterable[type[events.Event]]] = None) -> SubscriptionToken: - return self._events.subscribe(callback, event_types) - - def unsubscribe(self, token: events.SubscriptionToken) -> None: - self._events.unsubscribe(token) - def run(self) -> None: if self._state is not RunState.starting: raise RuntimeError(f'This function cannot be called while the worker is in the ' diff --git a/tests/test_datastores.py b/tests/test_datastores.py index 4389069..e5b50a7 100644 --- a/tests/test_datastores.py +++ b/tests/test_datastores.py @@ -151,11 +151,11 @@ async def capture_events( events.append(event) if len(events) == limit: limit_event.set() - datastore.events.unsubscribe(token) + subscription.unsubscribe() events: List[Event] = [] limit_event = anyio.Event() - token = datastore.events.subscribe(listener, event_types) + subscription = datastore.events.subscribe(listener, event_types) yield events if limit: with anyio.fail_after(3): diff --git a/tests/test_eventbrokers.py b/tests/test_eventbrokers.py index f1f88c0..7097001 100644 --- a/tests/test_eventbrokers.py +++ b/tests/test_eventbrokers.py @@ -105,11 +105,11 @@ class TestEventBroker: def test_unsubscribe(self, broker: EventBroker, caplog) -> None: queue = Queue() with broker: - token = broker.subscribe(queue.put_nowait) + subscription = broker.subscribe(queue.put_nowait) broker.publish(Event()) queue.get(timeout=3) - broker.unsubscribe(token) + subscription.unsubscribe() broker.publish(Event()) with pytest.raises(Empty): queue.get(timeout=0.1) @@ -168,12 +168,12 @@ class TestAsyncEventBroker: async def test_unsubscribe(self, async_broker: AsyncEventBroker) -> 
None: send, receive = create_memory_object_stream() async with async_broker: - token = async_broker.subscribe(send.send) + subscription = async_broker.subscribe(send.send) await async_broker.publish(Event()) with fail_after(3): await receive.receive() - async_broker.unsubscribe(token) + subscription.unsubscribe() await async_broker.publish(Event()) with pytest.raises(TimeoutError), fail_after(0.1): await receive.receive() diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index 990cfec..63729bb 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -33,7 +33,7 @@ class TestAsyncScheduler: received_events: List[Event] = [] event = anyio.Event() scheduler = AsyncScheduler(start_worker=False) - scheduler.subscribe(listener) + scheduler.events.subscribe(listener) trigger = DateTrigger(datetime.now(timezone.utc)) async with scheduler: await scheduler.add_schedule(dummy_async_job, trigger, id='foo') @@ -84,7 +84,7 @@ class TestSyncScheduler: received_events: List[Event] = [] event = threading.Event() scheduler = Scheduler(start_worker=False) - scheduler.subscribe(listener) + scheduler.events.subscribe(listener) trigger = DateTrigger(datetime.now(timezone.utc)) with scheduler: scheduler.add_schedule(dummy_sync_job, trigger, id='foo') diff --git a/tests/test_workers.py b/tests/test_workers.py index 1a98801..f8522f3 100644 --- a/tests/test_workers.py +++ b/tests/test_workers.py @@ -49,7 +49,7 @@ class TestAsyncWorker: event = anyio.Event() data_store = MemoryDataStore() worker = AsyncWorker(data_store) - worker.subscribe(listener) + worker.events.subscribe(listener) async with worker: await worker.data_store.add_task(Task(id='task_id', func=target_func)) job = Job(task_id='task_id', args=(1, 2), kwargs={'x': 'foo', 'fail': fail}) @@ -108,7 +108,7 @@ class TestAsyncWorker: event = anyio.Event() data_store = MemoryDataStore() worker = AsyncWorker(data_store) - worker.subscribe(listener) + worker.events.subscribe(listener) async with worker: 
await worker.data_store.add_task(Task(id='task_id', func=fail_func)) job = Job(task_id='task_id', schedule_id='foo', @@ -162,7 +162,7 @@ class TestSyncWorker: event = threading.Event() data_store = MemoryDataStore() worker = Worker(data_store) - worker.subscribe(listener) + worker.events.subscribe(listener) with worker: worker.data_store.add_task(Task(id='task_id', func=sync_func)) job = Job(task_id='task_id', args=(1, 2), kwargs={'x': 'foo', 'fail': fail}) @@ -220,7 +220,7 @@ class TestSyncWorker: event = threading.Event() data_store = MemoryDataStore() worker = Worker(data_store) - worker.subscribe(listener) + worker.events.subscribe(listener) with worker: worker.data_store.add_task(Task(id='task_id', func=fail_func)) job = Job(task_id='task_id', schedule_id='foo', |