summaryrefslogtreecommitdiff
path: root/src/apscheduler/eventbrokers/async_local.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/apscheduler/eventbrokers/async_local.py')
-rw-r--r--src/apscheduler/eventbrokers/async_local.py26
1 files changed, 16 insertions, 10 deletions
diff --git a/src/apscheduler/eventbrokers/async_local.py b/src/apscheduler/eventbrokers/async_local.py
index 590f0cb..79030f3 100644
--- a/src/apscheduler/eventbrokers/async_local.py
+++ b/src/apscheduler/eventbrokers/async_local.py
@@ -36,15 +36,21 @@ class LocalAsyncEventBroker(AsyncEventBroker, BaseEventBroker):
await self.publish_local(event)
async def publish_local(self, event: Event) -> None:
- async def deliver_event(func: Callable[[Event], Any]) -> None:
- try:
- retval = func(event)
- if iscoroutine(retval):
- await retval
- except BaseException:
- self._logger.exception('Error delivering %s event', event.__class__.__name__)
-
event_type = type(event)
- for subscription in self._subscriptions.values():
+ one_shot_tokens: list[object] = []
+ for token, subscription in self._subscriptions.items():
if subscription.event_types is None or event_type in subscription.event_types:
- self._task_group.start_soon(deliver_event, subscription.callback)
+ self._task_group.start_soon(self._deliver_event, subscription.callback, event)
+ if subscription.one_shot:
+ one_shot_tokens.append(subscription.token)
+
+ for token in one_shot_tokens:
+ super().unsubscribe(token)
+
+ async def _deliver_event(self, func: Callable[[Event], Any], event: Event) -> None:
+ try:
+ retval = func(event)
+ if iscoroutine(retval):
+ await retval
+ except BaseException:
+ self._logger.exception('Error delivering %s event', event.__class__.__name__)