summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-09-12 15:18:58 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-09-12 15:18:58 +0300
commit0a6b0f683edee8bf22d85dc655ad61a8285fd312 (patch)
treee011f50fa9fe3ced09efc3ff41d8c4773c3de9ee
parent4c7dab12eb64d23709df9ce1a2e248ce88f54f4a (diff)
downloadapscheduler-0a6b0f683edee8bf22d85dc655ad61a8285fd312.tar.gz
Guard subscriptions in the synchronous local event broker with a lock
This allows the local event broker to safely iterate through the original list of subscriptions while publishing an event, instead of having to atomically make a shallow copy every time.
-rw-r--r--src/apscheduler/eventbrokers/local.py16
1 files changed, 12 insertions, 4 deletions
diff --git a/src/apscheduler/eventbrokers/local.py b/src/apscheduler/eventbrokers/local.py
index e5db7cd..24de3eb 100644
--- a/src/apscheduler/eventbrokers/local.py
+++ b/src/apscheduler/eventbrokers/local.py
@@ -3,6 +3,7 @@ from __future__ import annotations
from asyncio import iscoroutinefunction
from concurrent.futures import ThreadPoolExecutor
from contextlib import ExitStack
+from threading import Lock
from typing import Any, Callable, Iterable, Optional
import attr
@@ -18,6 +19,7 @@ from .base import BaseEventBroker
class LocalEventBroker(BaseEventBroker):
_executor: ThreadPoolExecutor = attr.field(init=False)
_exit_stack: ExitStack = attr.field(init=False)
+ _subscriptions_lock: Lock = attr.field(init=False, factory=Lock)
def __enter__(self):
self._exit_stack = ExitStack()
@@ -34,16 +36,22 @@ class LocalEventBroker(BaseEventBroker):
raise ValueError('Coroutine functions are not supported as callbacks on a synchronous '
'event source')
- return super().subscribe(callback, event_types)
+ with self._subscriptions_lock:
+ return super().subscribe(callback, event_types)
+
+ def unsubscribe(self, token: object) -> None:
+ with self._subscriptions_lock:
+ super().unsubscribe(token)
def publish(self, event: Event) -> None:
self.publish_local(event)
def publish_local(self, event: Event) -> None:
event_type = type(event)
- for subscription in list(self._subscriptions.values()):
- if subscription.event_types is None or event_type in subscription.event_types:
- self._executor.submit(self._deliver_event, subscription.callback, event)
+ with self._subscriptions_lock:
+ for subscription in self._subscriptions.values():
+ if subscription.event_types is None or event_type in subscription.event_types:
+ self._executor.submit(self._deliver_event, subscription.callback, event)
def _deliver_event(self, func: Callable[[Event], Any], event: Event) -> None:
try: