summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2022-09-04 12:18:15 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2022-09-04 12:18:15 +0300
commite47af449940a29452a88f61c173e153139ed5b14 (patch)
tree504ba0631a0fe061a44d76ad244d98fb06340b1b
parent2a0eac6fb42391cd5f356a94fa77b901d4b6bcd8 (diff)
downloadapscheduler-e47af449940a29452a88f61c173e153139ed5b14.tar.gz
Fixed resource warnings in the asyncpg event broker when the listener task is cancelled
-rw-r--r--src/apscheduler/eventbrokers/asyncpg.py22
1 files changed, 16 insertions, 6 deletions
diff --git a/src/apscheduler/eventbrokers/asyncpg.py b/src/apscheduler/eventbrokers/asyncpg.py
index 27b1bed..7e3045e 100644
--- a/src/apscheduler/eventbrokers/asyncpg.py
+++ b/src/apscheduler/eventbrokers/asyncpg.py
@@ -145,13 +145,25 @@ class AsyncpgEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin):
self._logger.info("Stopped event broker")
async def _listen_notifications(self, *, task_status=TASK_STATUS_IGNORED) -> None:
- def callback(
+ conn: Connection
+
+ def listen_callback(
connection: Connection, pid: int, channel: str, payload: str
) -> None:
event = self.reconstitute_event_str(payload)
if event is not None:
self._task_group.start_soon(self.publish_local, event)
+ async def close_connection() -> None:
+ if not conn.is_closed():
+ with move_on_after(3, shield=True):
+ await conn.close()
+
+ async def unsubscribe() -> None:
+ if not conn.is_closed():
+ with move_on_after(3, shield=True):
+ await conn.remove_listener(self.channel, listen_callback)
+
task_started_sent = False
send, receive = create_memory_object_stream(100, str)
while True:
@@ -160,13 +172,11 @@ class AsyncpgEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin):
with attempt:
conn = await self.connection_factory()
- exit_stack.push_async_callback(conn.close)
+ exit_stack.push_async_callback(close_connection)
self._logger.info("Connection established")
try:
- await conn.add_listener(self.channel, callback)
- exit_stack.push_async_callback(
- conn.remove_listener, self.channel, callback
- )
+ await conn.add_listener(self.channel, listen_callback)
+ exit_stack.push_async_callback(unsubscribe)
if not task_started_sent:
task_status.started(send)
task_started_sent = True