diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-12 15:13:19 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-12 15:13:19 +0300 |
commit | 4c7dab12eb64d23709df9ce1a2e248ce88f54f4a (patch) | |
tree | bee5d6a822228560c3f656718ce491c6810f8f65 | |
parent | 48722053dfb43de077df18a139abb16b0a7f7e24 (diff) | |
download | apscheduler-4c7dab12eb64d23709df9ce1a2e248ce88f54f4a.tar.gz |
Added context manager support to event subscriptions
-rw-r--r-- | src/apscheduler/abc.py | 13 | ||||
-rw-r--r-- | src/apscheduler/eventbrokers/async_adapter.py | 5 | ||||
-rw-r--r-- | src/apscheduler/schedulers/async_.py | 11 | ||||
-rw-r--r-- | src/apscheduler/schedulers/sync.py | 18 | ||||
-rw-r--r-- | src/apscheduler/workers/async_.py | 9 | ||||
-rw-r--r-- | src/apscheduler/workers/sync.py | 18 |
6 files changed, 40 insertions, 34 deletions
diff --git a/src/apscheduler/abc.py b/src/apscheduler/abc.py index 4b2da42..3d85eed 100644 --- a/src/apscheduler/abc.py +++ b/src/apscheduler/abc.py @@ -65,13 +65,24 @@ class Serializer(metaclass=ABCMeta): class Subscription(metaclass=ABCMeta): + """ + Represents a subscription with an event source. + + If used as a context manager, unsubscribes on exit. + """ + + def __enter__(self) -> Subscription: + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + self.unsubscribe() + @abstractmethod def unsubscribe(self) -> None: """ Cancel this subscription. Does nothing if the subscription has already been cancelled. - """ diff --git a/src/apscheduler/eventbrokers/async_adapter.py b/src/apscheduler/eventbrokers/async_adapter.py index c10ac56..a91aae3 100644 --- a/src/apscheduler/eventbrokers/async_adapter.py +++ b/src/apscheduler/eventbrokers/async_adapter.py @@ -29,8 +29,9 @@ class AsyncEventBrokerAdapter(LocalAsyncEventBroker): self._exit_stack.push_async_exit(partial(to_thread.run_sync, self.original.__exit__)) # Relay events from the original broker to this one - relay_subscription = self.original.subscribe(partial(self.portal.call, self.publish_local)) - self._exit_stack.callback(relay_subscription.unsubscribe) + self._exit_stack.enter_context( + self.original.subscribe(partial(self.portal.call, self.publish_local)) + ) async def publish(self, event: Event) -> None: await to_thread.run_sync(self.original.publish, event) diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py index dbece0e..213b4de 100644 --- a/src/apscheduler/schedulers/async_.py +++ b/src/apscheduler/schedulers/async_.py @@ -63,13 +63,14 @@ class AsyncScheduler: # Initialize the data store and start relaying events to the scheduler's event broker await self._exit_stack.enter_async_context(self.data_store) - relay_subscription = self.data_store.events.subscribe(self._events.publish) - self._exit_stack.callback(relay_subscription.unsubscribe) + self._exit_stack.enter_context(self.data_store.events.subscribe(self._events.publish)) # Wake up the scheduler if the data store emits a significant schedule event - wakeup_subscription = self.data_store.events.subscribe( - lambda event: self._wakeup_event.set(), {ScheduleAdded, ScheduleUpdated}) - self._exit_stack.callback(wakeup_subscription.unsubscribe) + self._exit_stack.enter_context( + self.data_store.events.subscribe( + lambda event: self._wakeup_event.set(), {ScheduleAdded, ScheduleUpdated} + ) + ) # Start the built-in worker, if configured to do so if self.start_worker: diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py index d8e1397..dd3f37e 100644 --- a/src/apscheduler/schedulers/sync.py +++ b/src/apscheduler/schedulers/sync.py @@ -57,13 +57,14 @@ class Scheduler: # Initialize the data store and start relaying events to the scheduler's event broker self._exit_stack.enter_context(self.data_store) - relay_subscription = self.data_store.events.subscribe(self._events.publish) - self._exit_stack.callback(relay_subscription.unsubscribe) + self._exit_stack.enter_context(self.data_store.events.subscribe(self._events.publish)) # Wake up the scheduler if the data store emits a significant schedule event - wakeup_subscription = self.data_store.events.subscribe( - lambda event: self._wakeup_event.set(), {ScheduleAdded, ScheduleUpdated}) - self._exit_stack.callback(wakeup_subscription.unsubscribe) + self._exit_stack.enter_context( + self.data_store.events.subscribe( + lambda event: self._wakeup_event.set(), {ScheduleAdded, ScheduleUpdated} + ) + ) # Start the built-in worker, if configured to do so if self.start_worker: @@ -72,12 +73,9 @@ class Scheduler: # Start the scheduler and return when it has signalled readiness or raised an exception start_future: Future[Event] = Future() - start_subscription = self._events.subscribe(start_future.set_result) - run_future = self._executor.submit(self.run) - try: + with self._events.subscribe(start_future.set_result): + run_future = self._executor.submit(self.run) wait([start_future, run_future], return_when=FIRST_COMPLETED) - finally: - start_subscription.unsubscribe() if run_future.done(): run_future.result() diff --git a/src/apscheduler/workers/async_.py b/src/apscheduler/workers/async_.py index 268a286..5e872cf 100644 --- a/src/apscheduler/workers/async_.py +++ b/src/apscheduler/workers/async_.py @@ -64,13 +64,12 @@ class AsyncWorker: # Initialize the data store and start relaying events to the worker's event broker await self._exit_stack.enter_async_context(self.data_store) - relay_subscription = self.data_store.events.subscribe(self._events.publish) - self._exit_stack.callback(relay_subscription.unsubscribe) + self._exit_stack.enter_context(self.data_store.events.subscribe(self._events.publish)) # Wake up the worker if the data store emits a significant job event - wakeup_subscription = self.data_store.events.subscribe( - lambda event: self._wakeup_event.set(), {JobAdded}) - self._exit_stack.callback(wakeup_subscription.unsubscribe) + self._exit_stack.enter_context( + self.data_store.events.subscribe(lambda event: self._wakeup_event.set(), {JobAdded}) + ) # Start the actual worker task_group = create_task_group() diff --git a/src/apscheduler/workers/sync.py b/src/apscheduler/workers/sync.py index f9d3674..824cce8 100644 --- a/src/apscheduler/workers/sync.py +++ b/src/apscheduler/workers/sync.py @@ -55,23 +55,19 @@ class Worker: # Initialize the data store and start relaying events to the worker's event broker self._exit_stack.enter_context(self.data_store) - relay_subscription = self.data_store.events.subscribe(self._events.publish) - self._exit_stack.callback(relay_subscription.unsubscribe) + self._exit_stack.enter_context(self.data_store.events.subscribe(self._events.publish)) # Wake up the worker if the data store emits a significant job event - wakeup_subscription = self.data_store.events.subscribe( - lambda event: self._wakeup_event.set(), {JobAdded}) - self._exit_stack.callback(wakeup_subscription.unsubscribe) + self._exit_stack.enter_context( + self.data_store.events.subscribe(lambda event: self._wakeup_event.set(), {JobAdded}) + ) # Start the worker and return when it has signalled readiness or raised an exception start_future: Future[None] = Future() - start_subscription = self._events.subscribe(start_future.set_result) - self._executor = ThreadPoolExecutor(1) - run_future = self._executor.submit(self.run) - try: + with self._events.subscribe(start_future.set_result): + self._executor = ThreadPoolExecutor(1) + run_future = self._executor.submit(self.run) wait([start_future, run_future], return_when=FIRST_COMPLETED) - finally: - start_subscription.unsubscribe() if run_future.done(): run_future.result() |