diff options
author | Denis Kubashevskiy <dehax12@gmail.com> | 2022-01-28 17:16:22 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-28 20:16:22 +0600 |
commit | 3ec6dc0fd1e9af32bddd058dda105d97804cdbdd (patch) | |
tree | 8c1f4fdef3ca98c47ff8585184e063b0c61d9904 /t | |
parent | 5bed2a8f983a3bf61c12443e7704ffd89991ef9a (diff) | |
download | kombu-3ec6dc0fd1e9af32bddd058dda105d97804cdbdd.tar.gz |
Set redelivered property for Celery with Redis (#1484)
* Set redelivered property for Celery with Redis
Fixed setting `redelivered` value for Celery when Redis broker is used.
* Add `test_do_restore_message_celery` test
* Fix long line
Diffstat (limited to 't')
-rw-r--r-- | t/unit/transport/test_redis.py | 63 |
1 files changed, 63 insertions, 0 deletions
diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py index 7046cf7b..ce448b37 100644 --- a/t/unit/transport/test_redis.py +++ b/t/unit/transport/test_redis.py @@ -1,3 +1,5 @@ +import base64 +import copy import socket import types from collections import defaultdict @@ -399,6 +401,67 @@ class test_Channel: ) crit.assert_called() + def test_do_restore_message_celery(self): + # Payload value from real Celery project + payload = { + "body": base64.b64encode(dumps([ + [], + {}, + { + "callbacks": None, + "errbacks": None, + "chain": None, + "chord": None, + }, + ]).encode()).decode(), + "content-encoding": "utf-8", + "content-type": "application/json", + "headers": { + "lang": "py", + "task": "common.tasks.test_task", + "id": "980ad2bf-104c-4ce0-8643-67d1947173f6", + "shadow": None, + "eta": None, + "expires": None, + "group": None, + "group_index": None, + "retries": 0, + "timelimit": [None, None], + "root_id": "980ad2bf-104c-4ce0-8643-67d1947173f6", + "parent_id": None, + "argsrepr": "()", + "kwargsrepr": "{}", + "origin": "gen3437@Desktop", + "ignore_result": False, + }, + "properties": { + "correlation_id": "980ad2bf-104c-4ce0-8643-67d1947173f6", + "reply_to": "512f2489-ca40-3585-bc10-9b801a981782", + "delivery_mode": 2, + "delivery_info": { + "exchange": "", + "routing_key": "celery", + }, + "priority": 0, + "body_encoding": "base64", + "delivery_tag": "badb725e-9c3e-45be-b0a4-07e44630519f", + }, + } + result_payload = copy.deepcopy(payload) + result_payload['headers']['redelivered'] = True + result_payload['properties']['delivery_info']['redelivered'] = True + queue = 'celery' + + client = Mock(name='client') + lookup = self.channel._lookup = Mock(name='_lookup') + lookup.return_value = [queue] + + self.channel._do_restore_message( + payload, 'exchange', 'routing_key', client, + ) + + client.rpush.assert_called_with(queue, dumps(result_payload)) + def test_restore_no_messages(self): message = Mock(name='message') |