diff options
author | Rohan McGovern <rohan@mcgovern.id.au> | 2019-04-04 17:56:43 +1000 |
---|---|---|
committer | Omer Katz <omer.drow@gmail.com> | 2019-04-04 10:56:43 +0300 |
commit | 57f464895dfdf7bbf2b4eafbe180d09c51bb889e (patch) | |
tree | 9566c5be9f31e8d7b25ecbe11c9807bdb805a920 | |
parent | fcc4c620bf017eb456b7d9e694b64e834401ff12 (diff) | |
download | kombu-57f464895dfdf7bbf2b4eafbe180d09c51bb889e.tar.gz |
qpid: ensure supported exchange types are declared (#1034)
This is a follow-up to ddba8aeaf0f68. Prior to that commit,
"implements" was missing entirely for qpid. The earlier commit
added it, but only declared that the transport was async-capable.
Turns out it's important also to declare the supported exchange
types. In particular, if the transport doesn't declare support
for the 'fanout' exchange type, pidbox wouldn't be used[1] and so
it would still not be possible to revoke a task with terminate=True
when using the qpid transport.
[1] https://github.com/celery/celery/commit/2f58c35340f648
-rw-r--r-- | kombu/transport/qpid.py | 1 | ||||
-rw-r--r-- | t/unit/transport/test_qpid.py | 6 |
2 files changed, 7 insertions, 0 deletions
diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py index 524fab04..3eaaf914 100644 --- a/kombu/transport/qpid.py +++ b/kombu/transport/qpid.py @@ -1408,6 +1408,7 @@ class Transport(base.Transport): # This Transport does support the Celery asynchronous event model. implements = virtual.Transport.implements.extend( asynchronous=True, + exchange_type=frozenset(['direct', 'topic', 'fanout']), ) # The driver type and name for identification purposes. diff --git a/t/unit/transport/test_qpid.py b/t/unit/transport/test_qpid.py index 874ebc60..e2999c3e 100644 --- a/t/unit/transport/test_qpid.py +++ b/t/unit/transport/test_qpid.py @@ -1667,6 +1667,12 @@ class test_Transport_class_attributes(object): assert Transport.driver_type == 'qpid' assert Transport.driver_name == 'qpid' + def test_verify_implements_exchange_types(self): + assert 'fanout' in Transport.implements.exchange_type + assert 'direct' in Transport.implements.exchange_type + assert 'topic' in Transport.implements.exchange_type + assert 'frobnitz' not in Transport.implements.exchange_type + def test_transport_verify_recoverable_connection_errors(self): connection_errors = Transport.recoverable_connection_errors assert ConnectionError in connection_errors |