summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsif Saif Uddin (Auvi) <auvipy@gmail.com>2020-08-12 15:34:55 +0600
committerAsif Saif Uddin (Auvi) <auvipy@gmail.com>2020-08-12 15:34:55 +0600
commit3f1ba7e24e9c4d8f5f0a73900d2297bbd4ba9448 (patch)
tree4e8c2f33d244fe6564e89b3b79eff31a985a8c4a
parent63a77efd6d0777e48fb88f78454a632b22792a3d (diff)
downloadkombu-await.tar.gz
initial attempt for async/await to common functionsawait
-rw-r--r--kombu/common.py14
1 files changed, 7 insertions, 7 deletions
diff --git a/kombu/common.py b/kombu/common.py
index 6ab33098..5cfb4816 100644
--- a/kombu/common.py
+++ b/kombu/common.py
@@ -112,11 +112,11 @@ def declaration_cached(entity, channel):
return entity in channel.connection.client.declared_entities
-def maybe_declare(entity, channel=None, retry=False, **retry_policy):
+async def maybe_declare(entity, channel=None, retry=False, **retry_policy):
"""Declare entity (cached)."""
if retry:
- return _imaybe_declare(entity, channel, **retry_policy)
- return _maybe_declare(entity, channel)
+ return await _imaybe_declare(entity, channel, **retry_policy)
+ return await _maybe_declare(entity, channel)
def _ensure_channel_is_bound(entity, channel):
@@ -135,7 +135,7 @@ def _ensure_channel_is_bound(entity, channel):
return entity
-def _maybe_declare(entity, channel):
+async def _maybe_declare(entity, channel):
# _maybe_declare sets name on original for autogen queues
orig = entity
@@ -156,7 +156,7 @@ def _maybe_declare(entity, channel):
if not channel.connection:
raise RecoverableConnectionError('channel disconnected')
- entity.declare(channel=channel)
+ await entity.declare(channel=channel)
if declared is not None and ident:
declared.add(ident)
if orig is not None:
@@ -164,13 +164,13 @@ def _maybe_declare(entity, channel):
return True
-def _imaybe_declare(entity, channel, **retry_policy):
+async def _imaybe_declare(entity, channel, **retry_policy):
_ensure_channel_is_bound(entity, channel)
if not entity.channel.connection:
raise RecoverableConnectionError('channel disconnected')
- return entity.channel.connection.client.ensure(
+ return await entity.channel.connection.client.ensure(
entity, _maybe_declare, **retry_policy)(entity, channel)