summaryrefslogtreecommitdiff
path: root/src/apscheduler/eventbrokers/local.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/apscheduler/eventbrokers/local.py')
-rw-r--r--src/apscheduler/eventbrokers/local.py13
1 files changed, 10 insertions, 3 deletions
diff --git a/src/apscheduler/eventbrokers/local.py b/src/apscheduler/eventbrokers/local.py
index 24de3eb..acf0c9a 100644
--- a/src/apscheduler/eventbrokers/local.py
+++ b/src/apscheduler/eventbrokers/local.py
@@ -31,13 +31,14 @@ class LocalEventBroker(BaseEventBroker):
del self._executor
def subscribe(self, callback: Callable[[Event], Any],
- event_types: Optional[Iterable[type[Event]]] = None) -> Subscription:
+ event_types: Optional[Iterable[type[Event]]] = None, *,
+ one_shot: bool = False) -> Subscription:
if iscoroutinefunction(callback):
raise ValueError('Coroutine functions are not supported as callbacks on a synchronous '
'event source')
with self._subscriptions_lock:
- return super().subscribe(callback, event_types)
+ return super().subscribe(callback, event_types, one_shot=one_shot)
def unsubscribe(self, token: object) -> None:
with self._subscriptions_lock:
@@ -49,9 +50,15 @@ class LocalEventBroker(BaseEventBroker):
def publish_local(self, event: Event) -> None:
event_type = type(event)
with self._subscriptions_lock:
- for subscription in self._subscriptions.values():
+ one_shot_tokens: list[object] = []
+ for token, subscription in self._subscriptions.items():
if subscription.event_types is None or event_type in subscription.event_types:
self._executor.submit(self._deliver_event, subscription.callback, event)
+ if subscription.one_shot:
+ one_shot_tokens.append(subscription.token)
+
+ for token in one_shot_tokens:
+ super().unsubscribe(token)
def _deliver_event(self, func: Callable[[Event], Any], event: Event) -> None:
try: