diff options
Diffstat (limited to 'src/apscheduler/eventbrokers/async_local.py')
-rw-r--r-- | src/apscheduler/eventbrokers/async_local.py | 26 |
1 files changed, 16 insertions, 10 deletions
diff --git a/src/apscheduler/eventbrokers/async_local.py b/src/apscheduler/eventbrokers/async_local.py index 590f0cb..79030f3 100644 --- a/src/apscheduler/eventbrokers/async_local.py +++ b/src/apscheduler/eventbrokers/async_local.py @@ -36,15 +36,21 @@ class LocalAsyncEventBroker(AsyncEventBroker, BaseEventBroker): await self.publish_local(event) async def publish_local(self, event: Event) -> None: - async def deliver_event(func: Callable[[Event], Any]) -> None: - try: - retval = func(event) - if iscoroutine(retval): - await retval - except BaseException: - self._logger.exception('Error delivering %s event', event.__class__.__name__) - event_type = type(event) - for subscription in self._subscriptions.values(): + one_shot_tokens: list[object] = [] + for token, subscription in self._subscriptions.items(): if subscription.event_types is None or event_type in subscription.event_types: - self._task_group.start_soon(deliver_event, subscription.callback) + self._task_group.start_soon(self._deliver_event, subscription.callback, event) + if subscription.one_shot: + one_shot_tokens.append(subscription.token) + + for token in one_shot_tokens: + super().unsubscribe(token) + + async def _deliver_event(self, func: Callable[[Event], Any], event: Event) -> None: + try: + retval = func(event) + if iscoroutine(retval): + await retval + except BaseException: + self._logger.exception('Error delivering %s event', event.__class__.__name__) |