diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-12 01:47:31 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-12 01:47:31 +0300 |
commit | a58fca290e0831d377d496a69101e5e3dc4c604e (patch) | |
tree | 8beb7504e7113ff1f01fb610513bb72745fa91ba /src/apscheduler/eventbrokers/asyncpg.py | |
parent | 59ea7376985ef2c8b8b6b6d6df6b1b3be958480c (diff) | |
download | apscheduler-a58fca290e0831d377d496a69101e5e3dc4c604e.tar.gz |
Refactored event brokers to use exit stacks
Diffstat (limited to 'src/apscheduler/eventbrokers/asyncpg.py')
-rw-r--r-- | src/apscheduler/eventbrokers/asyncpg.py | 11 |
1 files changed, 2 insertions, 9 deletions
diff --git a/src/apscheduler/eventbrokers/asyncpg.py b/src/apscheduler/eventbrokers/asyncpg.py index 447bee8..93bfd6a 100644 --- a/src/apscheduler/eventbrokers/asyncpg.py +++ b/src/apscheduler/eventbrokers/asyncpg.py @@ -1,7 +1,6 @@ from __future__ import annotations from contextlib import asynccontextmanager -from logging import Logger, getLogger from typing import TYPE_CHECKING, AsyncContextManager, AsyncGenerator, Callable import attr @@ -25,7 +24,6 @@ class AsyncpgEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin): connection_factory: Callable[[], AsyncContextManager[Connection]] channel: str = attr.field(kw_only=True, default='apscheduler') max_idle_time: float = attr.field(kw_only=True, default=30) - _logger: Logger = attr.field(init=False, factory=lambda: getLogger(__name__)) @classmethod def from_asyncpg_pool(cls, pool: Pool) -> AsyncpgEventBroker: @@ -50,19 +48,14 @@ class AsyncpgEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin): async def __aenter__(self) -> LocalAsyncEventBroker: await super().__aenter__() await self._task_group.start(self._listen_notifications) + self._exit_stack.callback(self._task_group.cancel_scope.cancel) return self - async def __aexit__(self, exc_type, exc_val, exc_tb): - self._task_group.cancel_scope.cancel() - await super().__aexit__(exc_type, exc_val, exc_tb) - async def _listen_notifications(self, *, task_status=TASK_STATUS_IGNORED) -> None: - local_publish = super(AsyncpgEventBroker, self).publish - def callback(connection, pid, channel: str, payload: str) -> None: event = self.reconstitute_event_str(payload) if event is not None: - self._task_group.start_soon(local_publish, event) + self._task_group.start_soon(self.publish_local, event) task_started_sent = False while True: |