summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-09-12 15:13:19 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-09-12 15:13:19 +0300
commit4c7dab12eb64d23709df9ce1a2e248ce88f54f4a (patch)
treebee5d6a822228560c3f656718ce491c6810f8f65
parent48722053dfb43de077df18a139abb16b0a7f7e24 (diff)
downloadapscheduler-4c7dab12eb64d23709df9ce1a2e248ce88f54f4a.tar.gz
Added context manager support to event subscriptions
-rw-r--r--src/apscheduler/abc.py13
-rw-r--r--src/apscheduler/eventbrokers/async_adapter.py5
-rw-r--r--src/apscheduler/schedulers/async_.py11
-rw-r--r--src/apscheduler/schedulers/sync.py18
-rw-r--r--src/apscheduler/workers/async_.py9
-rw-r--r--src/apscheduler/workers/sync.py18
6 files changed, 40 insertions, 34 deletions
diff --git a/src/apscheduler/abc.py b/src/apscheduler/abc.py
index 4b2da42..3d85eed 100644
--- a/src/apscheduler/abc.py
+++ b/src/apscheduler/abc.py
@@ -65,13 +65,24 @@ class Serializer(metaclass=ABCMeta):
class Subscription(metaclass=ABCMeta):
+ """
+ Represents a subscription with an event source.
+
+ If used as a context manager, unsubscribes on exit.
+ """
+
+ def __enter__(self) -> Subscription:
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb) -> None:
+ self.unsubscribe()
+
@abstractmethod
def unsubscribe(self) -> None:
"""
Cancel this subscription.
Does nothing if the subscription has already been cancelled.
-
"""
diff --git a/src/apscheduler/eventbrokers/async_adapter.py b/src/apscheduler/eventbrokers/async_adapter.py
index c10ac56..a91aae3 100644
--- a/src/apscheduler/eventbrokers/async_adapter.py
+++ b/src/apscheduler/eventbrokers/async_adapter.py
@@ -29,8 +29,9 @@ class AsyncEventBrokerAdapter(LocalAsyncEventBroker):
self._exit_stack.push_async_exit(partial(to_thread.run_sync, self.original.__exit__))
# Relay events from the original broker to this one
- relay_subscription = self.original.subscribe(partial(self.portal.call, self.publish_local))
- self._exit_stack.callback(relay_subscription.unsubscribe)
+ self._exit_stack.enter_context(
+ self.original.subscribe(partial(self.portal.call, self.publish_local))
+ )
async def publish(self, event: Event) -> None:
await to_thread.run_sync(self.original.publish, event)
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py
index dbece0e..213b4de 100644
--- a/src/apscheduler/schedulers/async_.py
+++ b/src/apscheduler/schedulers/async_.py
@@ -63,13 +63,14 @@ class AsyncScheduler:
# Initialize the data store and start relaying events to the scheduler's event broker
await self._exit_stack.enter_async_context(self.data_store)
- relay_subscription = self.data_store.events.subscribe(self._events.publish)
- self._exit_stack.callback(relay_subscription.unsubscribe)
+ self._exit_stack.enter_context(self.data_store.events.subscribe(self._events.publish))
# Wake up the scheduler if the data store emits a significant schedule event
- wakeup_subscription = self.data_store.events.subscribe(
- lambda event: self._wakeup_event.set(), {ScheduleAdded, ScheduleUpdated})
- self._exit_stack.callback(wakeup_subscription.unsubscribe)
+ self._exit_stack.enter_context(
+ self.data_store.events.subscribe(
+ lambda event: self._wakeup_event.set(), {ScheduleAdded, ScheduleUpdated}
+ )
+ )
# Start the built-in worker, if configured to do so
if self.start_worker:
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py
index d8e1397..dd3f37e 100644
--- a/src/apscheduler/schedulers/sync.py
+++ b/src/apscheduler/schedulers/sync.py
@@ -57,13 +57,14 @@ class Scheduler:
# Initialize the data store and start relaying events to the scheduler's event broker
self._exit_stack.enter_context(self.data_store)
- relay_subscription = self.data_store.events.subscribe(self._events.publish)
- self._exit_stack.callback(relay_subscription.unsubscribe)
+ self._exit_stack.enter_context(self.data_store.events.subscribe(self._events.publish))
# Wake up the scheduler if the data store emits a significant schedule event
- wakeup_subscription = self.data_store.events.subscribe(
- lambda event: self._wakeup_event.set(), {ScheduleAdded, ScheduleUpdated})
- self._exit_stack.callback(wakeup_subscription.unsubscribe)
+ self._exit_stack.enter_context(
+ self.data_store.events.subscribe(
+ lambda event: self._wakeup_event.set(), {ScheduleAdded, ScheduleUpdated}
+ )
+ )
# Start the built-in worker, if configured to do so
if self.start_worker:
@@ -72,12 +73,9 @@ class Scheduler:
# Start the scheduler and return when it has signalled readiness or raised an exception
start_future: Future[Event] = Future()
- start_subscription = self._events.subscribe(start_future.set_result)
- run_future = self._executor.submit(self.run)
- try:
+ with self._events.subscribe(start_future.set_result):
+ run_future = self._executor.submit(self.run)
wait([start_future, run_future], return_when=FIRST_COMPLETED)
- finally:
- start_subscription.unsubscribe()
if run_future.done():
run_future.result()
diff --git a/src/apscheduler/workers/async_.py b/src/apscheduler/workers/async_.py
index 268a286..5e872cf 100644
--- a/src/apscheduler/workers/async_.py
+++ b/src/apscheduler/workers/async_.py
@@ -64,13 +64,12 @@ class AsyncWorker:
# Initialize the data store and start relaying events to the worker's event broker
await self._exit_stack.enter_async_context(self.data_store)
- relay_subscription = self.data_store.events.subscribe(self._events.publish)
- self._exit_stack.callback(relay_subscription.unsubscribe)
+ self._exit_stack.enter_context(self.data_store.events.subscribe(self._events.publish))
# Wake up the worker if the data store emits a significant job event
- wakeup_subscription = self.data_store.events.subscribe(
- lambda event: self._wakeup_event.set(), {JobAdded})
- self._exit_stack.callback(wakeup_subscription.unsubscribe)
+ self._exit_stack.enter_context(
+ self.data_store.events.subscribe(lambda event: self._wakeup_event.set(), {JobAdded})
+ )
# Start the actual worker
task_group = create_task_group()
diff --git a/src/apscheduler/workers/sync.py b/src/apscheduler/workers/sync.py
index f9d3674..824cce8 100644
--- a/src/apscheduler/workers/sync.py
+++ b/src/apscheduler/workers/sync.py
@@ -55,23 +55,19 @@ class Worker:
# Initialize the data store and start relaying events to the worker's event broker
self._exit_stack.enter_context(self.data_store)
- relay_subscription = self.data_store.events.subscribe(self._events.publish)
- self._exit_stack.callback(relay_subscription.unsubscribe)
+ self._exit_stack.enter_context(self.data_store.events.subscribe(self._events.publish))
# Wake up the worker if the data store emits a significant job event
- wakeup_subscription = self.data_store.events.subscribe(
- lambda event: self._wakeup_event.set(), {JobAdded})
- self._exit_stack.callback(wakeup_subscription.unsubscribe)
+ self._exit_stack.enter_context(
+ self.data_store.events.subscribe(lambda event: self._wakeup_event.set(), {JobAdded})
+ )
# Start the worker and return when it has signalled readiness or raised an exception
start_future: Future[None] = Future()
- start_subscription = self._events.subscribe(start_future.set_result)
- self._executor = ThreadPoolExecutor(1)
- run_future = self._executor.submit(self.run)
- try:
+ with self._events.subscribe(start_future.set_result):
+ self._executor = ThreadPoolExecutor(1)
+ run_future = self._executor.submit(self.run)
wait([start_future, run_future], return_when=FIRST_COMPLETED)
- finally:
- start_subscription.unsubscribe()
if run_future.done():
run_future.result()