summaryrefslogtreecommitdiff
path: root/src/apscheduler/eventbrokers/asyncpg.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/apscheduler/eventbrokers/asyncpg.py')
-rw-r--r--src/apscheduler/eventbrokers/asyncpg.py11
1 files changed, 2 insertions, 9 deletions
diff --git a/src/apscheduler/eventbrokers/asyncpg.py b/src/apscheduler/eventbrokers/asyncpg.py
index 447bee8..93bfd6a 100644
--- a/src/apscheduler/eventbrokers/asyncpg.py
+++ b/src/apscheduler/eventbrokers/asyncpg.py
@@ -1,7 +1,6 @@
from __future__ import annotations
from contextlib import asynccontextmanager
-from logging import Logger, getLogger
from typing import TYPE_CHECKING, AsyncContextManager, AsyncGenerator, Callable
import attr
@@ -25,7 +24,6 @@ class AsyncpgEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin):
connection_factory: Callable[[], AsyncContextManager[Connection]]
channel: str = attr.field(kw_only=True, default='apscheduler')
max_idle_time: float = attr.field(kw_only=True, default=30)
- _logger: Logger = attr.field(init=False, factory=lambda: getLogger(__name__))
@classmethod
def from_asyncpg_pool(cls, pool: Pool) -> AsyncpgEventBroker:
@@ -50,19 +48,14 @@ class AsyncpgEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin):
async def __aenter__(self) -> LocalAsyncEventBroker:
await super().__aenter__()
await self._task_group.start(self._listen_notifications)
+ self._exit_stack.callback(self._task_group.cancel_scope.cancel)
return self
- async def __aexit__(self, exc_type, exc_val, exc_tb):
- self._task_group.cancel_scope.cancel()
- await super().__aexit__(exc_type, exc_val, exc_tb)
-
async def _listen_notifications(self, *, task_status=TASK_STATUS_IGNORED) -> None:
- local_publish = super(AsyncpgEventBroker, self).publish
-
def callback(connection, pid, channel: str, payload: str) -> None:
event = self.reconstitute_event_str(payload)
if event is not None:
- self._task_group.start_soon(local_publish, event)
+ self._task_group.start_soon(self.publish_local, event)
task_started_sent = False
while True: