summaryrefslogtreecommitdiff
path: root/t/unit/transport/test_redis.py
diff options
context:
space:
mode:
Diffstat (limited to 't/unit/transport/test_redis.py')
-rw-r--r--t/unit/transport/test_redis.py63
1 files changed, 63 insertions, 0 deletions
diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py
index 7046cf7b..ce448b37 100644
--- a/t/unit/transport/test_redis.py
+++ b/t/unit/transport/test_redis.py
@@ -1,3 +1,5 @@
+import base64
+import copy
import socket
import types
from collections import defaultdict
@@ -399,6 +401,67 @@ class test_Channel:
)
crit.assert_called()
+ def test_do_restore_message_celery(self):
+ # Payload value from real Celery project
+ payload = {
+ "body": base64.b64encode(dumps([
+ [],
+ {},
+ {
+ "callbacks": None,
+ "errbacks": None,
+ "chain": None,
+ "chord": None,
+ },
+ ]).encode()).decode(),
+ "content-encoding": "utf-8",
+ "content-type": "application/json",
+ "headers": {
+ "lang": "py",
+ "task": "common.tasks.test_task",
+ "id": "980ad2bf-104c-4ce0-8643-67d1947173f6",
+ "shadow": None,
+ "eta": None,
+ "expires": None,
+ "group": None,
+ "group_index": None,
+ "retries": 0,
+ "timelimit": [None, None],
+ "root_id": "980ad2bf-104c-4ce0-8643-67d1947173f6",
+ "parent_id": None,
+ "argsrepr": "()",
+ "kwargsrepr": "{}",
+ "origin": "gen3437@Desktop",
+ "ignore_result": False,
+ },
+ "properties": {
+ "correlation_id": "980ad2bf-104c-4ce0-8643-67d1947173f6",
+ "reply_to": "512f2489-ca40-3585-bc10-9b801a981782",
+ "delivery_mode": 2,
+ "delivery_info": {
+ "exchange": "",
+ "routing_key": "celery",
+ },
+ "priority": 0,
+ "body_encoding": "base64",
+ "delivery_tag": "badb725e-9c3e-45be-b0a4-07e44630519f",
+ },
+ }
+ result_payload = copy.deepcopy(payload)
+ result_payload['headers']['redelivered'] = True
+ result_payload['properties']['delivery_info']['redelivered'] = True
+ queue = 'celery'
+
+ client = Mock(name='client')
+ lookup = self.channel._lookup = Mock(name='_lookup')
+ lookup.return_value = [queue]
+
+ self.channel._do_restore_message(
+ payload, 'exchange', 'routing_key', client,
+ )
+
+ client.rpush.assert_called_with(queue, dumps(result_payload))
+
def test_restore_no_messages(self):
message = Mock(name='message')