summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRohan McGovern <rohan@mcgovern.id.au>2019-04-04 17:56:43 +1000
committerOmer Katz <omer.drow@gmail.com>2019-04-04 10:56:43 +0300
commit57f464895dfdf7bbf2b4eafbe180d09c51bb889e (patch)
tree9566c5be9f31e8d7b25ecbe11c9807bdb805a920
parentfcc4c620bf017eb456b7d9e694b64e834401ff12 (diff)
downloadkombu-57f464895dfdf7bbf2b4eafbe180d09c51bb889e.tar.gz
qpid: ensure supported exchange types are declared (#1034)
This is a follow-up to ddba8aeaf0f68. Prior to that commit, "implements" was missing entirely for qpid. The earlier commit added it, but only declared that the transport was async-capable. Turns out it's important also to declare the supported exchange types. In particular, if the transport doesn't declare support for the 'fanout' exchange type, pidbox wouldn't be used[1] and so it would still not be possible to revoke a task with terminate=True when using the qpid transport. [1] https://github.com/celery/celery/commit/2f58c35340f648
-rw-r--r--kombu/transport/qpid.py1
-rw-r--r--t/unit/transport/test_qpid.py6
2 files changed, 7 insertions, 0 deletions
diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
index 524fab04..3eaaf914 100644
--- a/kombu/transport/qpid.py
+++ b/kombu/transport/qpid.py
@@ -1408,6 +1408,7 @@ class Transport(base.Transport):
# This Transport does support the Celery asynchronous event model.
implements = virtual.Transport.implements.extend(
asynchronous=True,
+ exchange_type=frozenset(['direct', 'topic', 'fanout']),
)
# The driver type and name for identification purposes.
diff --git a/t/unit/transport/test_qpid.py b/t/unit/transport/test_qpid.py
index 874ebc60..e2999c3e 100644
--- a/t/unit/transport/test_qpid.py
+++ b/t/unit/transport/test_qpid.py
@@ -1667,6 +1667,12 @@ class test_Transport_class_attributes(object):
assert Transport.driver_type == 'qpid'
assert Transport.driver_name == 'qpid'
+ def test_verify_implements_exchange_types(self):
+ assert 'fanout' in Transport.implements.exchange_type
+ assert 'direct' in Transport.implements.exchange_type
+ assert 'topic' in Transport.implements.exchange_type
+ assert 'frobnitz' not in Transport.implements.exchange_type
+
def test_transport_verify_recoverable_connection_errors(self):
connection_errors = Transport.recoverable_connection_errors
assert ConnectionError in connection_errors