diff options
author | Asif Saif Uddin (Auvi) <auvipy@gmail.com> | 2020-08-12 15:34:55 +0600 |
---|---|---|
committer | Asif Saif Uddin (Auvi) <auvipy@gmail.com> | 2020-08-12 15:34:55 +0600 |
commit | 3f1ba7e24e9c4d8f5f0a73900d2297bbd4ba9448 (patch) | |
tree | 4e8c2f33d244fe6564e89b3b79eff31a985a8c4a | |
parent | 63a77efd6d0777e48fb88f78454a632b22792a3d (diff) | |
download | kombu-await.tar.gz |
initial attempt for async/await to common functionsawait
-rw-r--r-- | kombu/common.py | 14 |
1 files changed, 7 insertions, 7 deletions
diff --git a/kombu/common.py b/kombu/common.py index 6ab33098..5cfb4816 100644 --- a/kombu/common.py +++ b/kombu/common.py @@ -112,11 +112,11 @@ def declaration_cached(entity, channel): return entity in channel.connection.client.declared_entities -def maybe_declare(entity, channel=None, retry=False, **retry_policy): +async def maybe_declare(entity, channel=None, retry=False, **retry_policy): """Declare entity (cached).""" if retry: - return _imaybe_declare(entity, channel, **retry_policy) - return _maybe_declare(entity, channel) + return await _imaybe_declare(entity, channel, **retry_policy) + return await _maybe_declare(entity, channel) def _ensure_channel_is_bound(entity, channel): @@ -135,7 +135,7 @@ def _ensure_channel_is_bound(entity, channel): return entity -def _maybe_declare(entity, channel): +async def _maybe_declare(entity, channel): # _maybe_declare sets name on original for autogen queues orig = entity @@ -156,7 +156,7 @@ def _maybe_declare(entity, channel): if not channel.connection: raise RecoverableConnectionError('channel disconnected') - entity.declare(channel=channel) + await entity.declare(channel=channel) if declared is not None and ident: declared.add(ident) if orig is not None: @@ -164,13 +164,13 @@ def _maybe_declare(entity, channel): return True -def _imaybe_declare(entity, channel, **retry_policy): +async def _imaybe_declare(entity, channel, **retry_policy): _ensure_channel_is_bound(entity, channel) if not entity.channel.connection: raise RecoverableConnectionError('channel disconnected') - return entity.channel.connection.client.ensure( + return await entity.channel.connection.client.ensure( entity, _maybe_declare, **retry_policy)(entity, channel) |