summaryrefslogtreecommitdiff
path: root/t
diff options
context:
space:
mode:
authorDenis Kubashevskiy <dehax12@gmail.com>2022-01-28 17:16:22 +0300
committerGitHub <noreply@github.com>2022-01-28 20:16:22 +0600
commit3ec6dc0fd1e9af32bddd058dda105d97804cdbdd (patch)
tree8c1f4fdef3ca98c47ff8585184e063b0c61d9904 /t
parent5bed2a8f983a3bf61c12443e7704ffd89991ef9a (diff)
downloadkombu-3ec6dc0fd1e9af32bddd058dda105d97804cdbdd.tar.gz
Set redelivered property for Celery with Redis (#1484)
* Set redelivered property for Celery with Redis Fixed setting `redelivered` value for Celery when Redis broker is used. * Add `test_do_restore_message_celery` test * Fix long line
Diffstat (limited to 't')
-rw-r--r--t/unit/transport/test_redis.py63
1 files changed, 63 insertions, 0 deletions
diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py
index 7046cf7b..ce448b37 100644
--- a/t/unit/transport/test_redis.py
+++ b/t/unit/transport/test_redis.py
@@ -1,3 +1,5 @@
+import base64
+import copy
import socket
import types
from collections import defaultdict
@@ -399,6 +401,67 @@ class test_Channel:
)
crit.assert_called()
+ def test_do_restore_message_celery(self):
+ # Payload value from real Celery project
+ payload = {
+ "body": base64.b64encode(dumps([
+ [],
+ {},
+ {
+ "callbacks": None,
+ "errbacks": None,
+ "chain": None,
+ "chord": None,
+ },
+ ]).encode()).decode(),
+ "content-encoding": "utf-8",
+ "content-type": "application/json",
+ "headers": {
+ "lang": "py",
+ "task": "common.tasks.test_task",
+ "id": "980ad2bf-104c-4ce0-8643-67d1947173f6",
+ "shadow": None,
+ "eta": None,
+ "expires": None,
+ "group": None,
+ "group_index": None,
+ "retries": 0,
+ "timelimit": [None, None],
+ "root_id": "980ad2bf-104c-4ce0-8643-67d1947173f6",
+ "parent_id": None,
+ "argsrepr": "()",
+ "kwargsrepr": "{}",
+ "origin": "gen3437@Desktop",
+ "ignore_result": False,
+ },
+ "properties": {
+ "correlation_id": "980ad2bf-104c-4ce0-8643-67d1947173f6",
+ "reply_to": "512f2489-ca40-3585-bc10-9b801a981782",
+ "delivery_mode": 2,
+ "delivery_info": {
+ "exchange": "",
+ "routing_key": "celery",
+ },
+ "priority": 0,
+ "body_encoding": "base64",
+ "delivery_tag": "badb725e-9c3e-45be-b0a4-07e44630519f",
+ },
+ }
+ result_payload = copy.deepcopy(payload)
+ result_payload['headers']['redelivered'] = True
+ result_payload['properties']['delivery_info']['redelivered'] = True
+ queue = 'celery'
+
+ client = Mock(name='client')
+ lookup = self.channel._lookup = Mock(name='_lookup')
+ lookup.return_value = [queue]
+
+ self.channel._do_restore_message(
+ payload, 'exchange', 'routing_key', client,
+ )
+
+ client.rpush.assert_called_with(queue, dumps(result_payload))
+
def test_restore_no_messages(self):
message = Mock(name='message')