summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--oslo.messaging/locale/fr/LC_MESSAGES/oslo.messaging.po40
-rw-r--r--oslo_messaging/_drivers/amqp.py12
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py13
-rw-r--r--oslo_messaging/_drivers/base.py2
-rw-r--r--oslo_messaging/_drivers/impl_qpid.py11
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py170
-rw-r--r--oslo_messaging/tests/drivers/test_impl_rabbit.py36
7 files changed, 185 insertions, 99 deletions
diff --git a/oslo.messaging/locale/fr/LC_MESSAGES/oslo.messaging.po b/oslo.messaging/locale/fr/LC_MESSAGES/oslo.messaging.po
index 0d30fde..9336eb6 100644
--- a/oslo.messaging/locale/fr/LC_MESSAGES/oslo.messaging.po
+++ b/oslo.messaging/locale/fr/LC_MESSAGES/oslo.messaging.po
@@ -5,14 +5,14 @@
#
# Translators:
# Jonathan Dupart <jonathan+transifex@dupart.org>, 2014
-# Maxime COQUEREL <max.coquerel@gmail.com>, 2014
+# Maxime COQUEREL <max.coquerel@gmail.com>, 2014-2015
msgid ""
msgstr ""
"Project-Id-Version: oslo.messaging\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
-"POT-Creation-Date: 2015-05-28 06:08+0000\n"
-"PO-Revision-Date: 2015-03-13 10:28+0000\n"
-"Last-Translator: openstackjenkins <jenkins@openstack.org>\n"
+"POT-Creation-Date: 2015-06-04 06:08+0000\n"
+"PO-Revision-Date: 2015-06-03 23:46+0000\n"
+"Last-Translator: Maxime COQUEREL <max.coquerel@gmail.com>\n"
"Language-Team: French (http://www.transifex.com/projects/p/oslomessaging/"
"language/fr/)\n"
"Plural-Forms: nplurals=2; plural=(n > 1)\n"
@@ -22,6 +22,34 @@ msgstr ""
"Generated-By: Babel 1.3\n"
#, python-format
+msgid ""
+"%(what)s is deprecated as of %(as_of)s and may be removed in %(remove_in)s. "
+"It will not be superseded."
+msgstr ""
+"%(what)s déprécié depuis %(as_of)s et sera sans doute retiré dans "
+"%(remove_in)s. Ne sera pas remplacé. "
+
+#, python-format
+msgid ""
+"%(what)s is deprecated as of %(as_of)s in favor of %(in_favor_of)s and may "
+"be removed in %(remove_in)s."
+msgstr ""
+"%(what)s déprécié depuis %(as_of)s au bénéfice de %(in_favor_of)s et sera "
+"sans doute retiré dans %(remove_in)s. "
+
+#, python-format
+msgid "%(what)s is deprecated as of %(as_of)s in favor of %(in_favor_of)s."
+msgstr "%(what)s déprécié depuis %(as_of)s au bénéfice de %(in_favor_of)s. "
+
+#, python-format
+msgid "%(what)s is deprecated as of %(as_of)s. It will not be superseded."
+msgstr "%(what)s déprécié depuis %(as_of)s. Ne sera pas remplacé. "
+
+#, python-format
+msgid "Deprecated: %s"
+msgstr "Déprécié: %s"
+
+#, python-format
msgid "Exception during message handling: %s"
msgstr "Exception lors de la manipulation du message: %s"
@@ -30,6 +58,10 @@ msgid "Failed to load any notifiers for %s"
msgstr "Echec de chargement des notifications pour %s"
#, python-format
+msgid "Fatal call to deprecated config: %(msg)s"
+msgstr "Appel fatal à config dépréciée : %(msg)s "
+
+#, python-format
msgid "Routing '%(event)s' notification to '%(driver)s' driver"
msgstr "Routage '%(event)s' notification du pilote %(driver)s'"
diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py
index ce5c21e..a91be1a 100644
--- a/oslo_messaging/_drivers/amqp.py
+++ b/oslo_messaging/_drivers/amqp.py
@@ -49,6 +49,18 @@ amqp_opts = [
default=False,
deprecated_group='DEFAULT',
help='Auto-delete queues in AMQP.'),
+ cfg.BoolOpt('send_single_reply',
+ default=False,
+ help='Send a single AMQP reply to call message. The current '
+ 'behaviour since oslo-incubator is to send two AMQP '
+ 'replies - first one with the payload, a second one to '
+ 'ensure the other have finish to send the payload. We '
+ 'are going to remove it in the N release, but we must '
+ 'keep backward compatible at the same time. This option '
+ 'provides such compatibility - it defaults to False in '
+ 'Liberty and can be turned on for early adopters with a '
+ 'new installations or for testing. Please note, that '
+ 'this option will be removed in M release.')
]
UNIQUE_ID = '_unique_id'
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index 6e19dd8..3a1d9bb 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -71,8 +71,12 @@ class AMQPIncomingMessage(base.IncomingMessage):
return
with self.listener.driver._get_connection(
rpc_amqp.PURPOSE_SEND) as conn:
- self._send_reply(conn, reply, failure, log_failure=log_failure)
- self._send_reply(conn, ending=True)
+ if self.listener.driver.send_single_reply:
+ self._send_reply(conn, reply, failure, log_failure=log_failure,
+ ending=True)
+ else:
+ self._send_reply(conn, reply, failure, log_failure=log_failure)
+ self._send_reply(conn, ending=True)
def acknowledge(self):
self.listener.msg_id_cache.add(self.unique_id)
@@ -257,7 +261,8 @@ class ReplyWaiter(object):
class AMQPDriverBase(base.BaseDriver):
def __init__(self, conf, url, connection_pool,
- default_exchange=None, allowed_remote_exmods=None):
+ default_exchange=None, allowed_remote_exmods=None,
+ send_single_reply=False):
super(AMQPDriverBase, self).__init__(conf, url, default_exchange,
allowed_remote_exmods)
@@ -270,6 +275,8 @@ class AMQPDriverBase(base.BaseDriver):
self._reply_q_conn = None
self._waiter = None
+ self.send_single_reply = send_single_reply
+
def _get_exchange(self, target):
return target.exchange or self._default_exchange
diff --git a/oslo_messaging/_drivers/base.py b/oslo_messaging/_drivers/base.py
index fc24168..2051e9a 100644
--- a/oslo_messaging/_drivers/base.py
+++ b/oslo_messaging/_drivers/base.py
@@ -17,7 +17,7 @@ import abc
import six
-from oslo.config import cfg
+from oslo_config import cfg
from oslo_messaging import exceptions
base_opts = [
diff --git a/oslo_messaging/_drivers/impl_qpid.py b/oslo_messaging/_drivers/impl_qpid.py
index 8153aba..c4dd117 100644
--- a/oslo_messaging/_drivers/impl_qpid.py
+++ b/oslo_messaging/_drivers/impl_qpid.py
@@ -778,7 +778,10 @@ class QpidDriver(amqpdriver.AMQPDriverBase):
conf, conf.oslo_messaging_qpid.rpc_conn_pool_size,
url, Connection)
- super(QpidDriver, self).__init__(conf, url,
- connection_pool,
- default_exchange,
- allowed_remote_exmods)
+ super(QpidDriver, self).__init__(
+ conf, url,
+ connection_pool,
+ default_exchange,
+ allowed_remote_exmods,
+ conf.oslo_messaging_qpid.send_single_reply,
+ )
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 8c696cb..7bc7c43 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -70,6 +70,18 @@ rabbit_opts = [
deprecated_group='DEFAULT',
help='How long to wait before reconnecting in response to an '
'AMQP consumer cancel notification.'),
+ cfg.IntOpt('kombu_reconnect_timeout',
+ # NOTE(dhellmann): We want this to be similar to
+ # rpc_response_timeout, but we can't use
+ # "$rpc_response_timeout" as a default because that
+ # option may not have been defined by the time this
+ # option is accessed. Instead, document the intent in
+ # the help text for this option and provide a separate
+ # literal default value.
+ default=60,
+ help='How long to wait before considering a reconnect '
+ 'attempt to have failed. This value should not be '
+ 'longer than rpc_response_timeout.'),
cfg.StrOpt('rabbit_host',
default='localhost',
deprecated_group='DEFAULT',
@@ -144,7 +156,7 @@ rabbit_opts = [
LOG = logging.getLogger(__name__)
-def _get_queue_arguments(conf):
+def _get_queue_arguments(rabbit_ha_queues):
"""Construct the arguments for declaring a queue.
If the rabbit_ha_queues option is set, we declare a mirrored queue
@@ -155,7 +167,7 @@ def _get_queue_arguments(conf):
Setting x-ha-policy to all means that the queue will be mirrored
to all nodes in the cluster.
"""
- return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
+ return {'x-ha-policy': 'all'} if rabbit_ha_queues else {}
class RabbitMessage(dict):
@@ -174,8 +186,8 @@ class RabbitMessage(dict):
class Consumer(object):
"""Consumer class."""
- def __init__(self, conf, exchange_name, queue_name, routing_key, type,
- durable, auto_delete, callback, nowait=True):
+ def __init__(self, exchange_name, queue_name, routing_key, type, durable,
+ auto_delete, callback, nowait=True, rabbit_ha_queues=None):
"""Init the Publisher class with the exchange_name, routing_key,
type, durable auto_delete
"""
@@ -187,7 +199,7 @@ class Consumer(object):
self.callback = callback
self.type = type
self.nowait = nowait
- self.queue_arguments = _get_queue_arguments(conf)
+ self.queue_arguments = _get_queue_arguments(rabbit_ha_queues)
self.queue = None
self.exchange = kombu.entity.Exchange(
@@ -231,6 +243,9 @@ class Consumer(object):
consumer_tag=six.text_type(tag),
nowait=self.nowait)
+ def cancel(self, tag):
+ self.queue.cancel(six.text_type(tag))
+
def _callback(self, message):
"""Call callback with deserialized message.
@@ -362,26 +377,50 @@ class Connection(object):
pools = {}
def __init__(self, conf, url, purpose):
- self.conf = conf
- self.driver_conf = self.conf.oslo_messaging_rabbit
- self.max_retries = self.driver_conf.rabbit_max_retries
+ # NOTE(viktors): Parse config options
+ driver_conf = conf.oslo_messaging_rabbit
+
+ self.max_retries = driver_conf.rabbit_max_retries
+ self.interval_start = driver_conf.rabbit_retry_interval
+ self.interval_stepping = driver_conf.rabbit_retry_backoff
+
+ self.login_method = driver_conf.rabbit_login_method
+ self.fake_rabbit = driver_conf.fake_rabbit
+ self.virtual_host = driver_conf.rabbit_virtual_host
+ self.rabbit_hosts = driver_conf.rabbit_hosts
+ self.rabbit_port = driver_conf.rabbit_port
+ self.rabbit_userid = driver_conf.rabbit_userid
+ self.rabbit_password = driver_conf.rabbit_password
+ self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
+ self.heartbeat_timeout_threshold = \
+ driver_conf.heartbeat_timeout_threshold
+ self.heartbeat_rate = driver_conf.heartbeat_rate
+ self.kombu_reconnect_delay = driver_conf.kombu_reconnect_delay
+ self.amqp_durable_queues = driver_conf.amqp_durable_queues
+ self.amqp_auto_delete = driver_conf.amqp_auto_delete
+ self.rabbit_use_ssl = driver_conf.rabbit_use_ssl
+ self.kombu_reconnect_timeout = driver_conf.kombu_reconnect_timeout
+
+ if self.rabbit_use_ssl:
+ self.kombu_ssl_version = driver_conf.kombu_ssl_version
+ self.kombu_ssl_keyfile = driver_conf.kombu_ssl_keyfile
+ self.kombu_ssl_certfile = driver_conf.kombu_ssl_certfile
+ self.kombu_ssl_ca_certs = driver_conf.kombu_ssl_ca_certs
+
# Try forever?
if self.max_retries <= 0:
self.max_retries = None
- self.interval_start = self.driver_conf.rabbit_retry_interval
- self.interval_stepping = self.driver_conf.rabbit_retry_backoff
+
# max retry-interval = 30 seconds
self.interval_max = 30
- self._login_method = self.driver_conf.rabbit_login_method
-
if url.virtual_host is not None:
virtual_host = url.virtual_host
else:
- virtual_host = self.driver_conf.rabbit_virtual_host
+ virtual_host = self.virtual_host
self._url = ''
- if self.driver_conf.fake_rabbit:
+ if self.fake_rabbit:
LOG.warn("Deprecated: fake_rabbit option is deprecated, set "
"rpc_backend to kombu+memory or use the fake "
"driver instead.")
@@ -408,13 +447,13 @@ class Connection(object):
transport = url.transport.replace('kombu+', '')
self._url = "%s://%s" % (transport, virtual_host)
else:
- for adr in self.driver_conf.rabbit_hosts:
+ for adr in self.rabbit_hosts:
hostname, port = netutils.parse_host_port(
- adr, default_port=self.driver_conf.rabbit_port)
+ adr, default_port=self.rabbit_port)
self._url += '%samqp://%s:%s@%s:%s/%s' % (
";" if self._url else '',
- parse.quote(self.driver_conf.rabbit_userid),
- parse.quote(self.driver_conf.rabbit_password),
+ parse.quote(self.rabbit_userid),
+ parse.quote(self.rabbit_password),
self._parse_url_hostname(hostname), port,
virtual_host)
@@ -435,9 +474,9 @@ class Connection(object):
self.connection = kombu.connection.Connection(
self._url, ssl=self._fetch_ssl_params(),
- login_method=self._login_method,
+ login_method=self.login_method,
failover_strategy="shuffle",
- heartbeat=self.driver_conf.heartbeat_timeout_threshold,
+ heartbeat=self.heartbeat_timeout_threshold,
transport_options={'confirm_publish': True})
LOG.info(_LI('Connecting to AMQP server on %(hostname)s:%(port)s'),
@@ -452,8 +491,8 @@ class Connection(object):
# (heatbeat_timeout/heartbeat_rate/2.0, default kombu
# heartbeat_rate is 2)
self._heartbeat_wait_timeout = (
- float(self.driver_conf.heartbeat_timeout_threshold) /
- float(self.driver_conf.heartbeat_rate) / 2.0)
+ float(self.heartbeat_timeout_threshold) /
+ float(self.heartbeat_rate) / 2.0)
self._heartbeat_support_log_emitted = False
# NOTE(sileht): just ensure the connection is setuped at startup
@@ -523,19 +562,19 @@ class Connection(object):
"""Handles fetching what ssl params should be used for the connection
(if any).
"""
- if self.driver_conf.rabbit_use_ssl:
+ if self.rabbit_use_ssl:
ssl_params = dict()
# http://docs.python.org/library/ssl.html - ssl.wrap_socket
- if self.driver_conf.kombu_ssl_version:
+ if self.kombu_ssl_version:
ssl_params['ssl_version'] = self.validate_ssl_version(
- self.driver_conf.kombu_ssl_version)
- if self.driver_conf.kombu_ssl_keyfile:
- ssl_params['keyfile'] = self.driver_conf.kombu_ssl_keyfile
- if self.driver_conf.kombu_ssl_certfile:
- ssl_params['certfile'] = self.driver_conf.kombu_ssl_certfile
- if self.driver_conf.kombu_ssl_ca_certs:
- ssl_params['ca_certs'] = self.driver_conf.kombu_ssl_ca_certs
+ self.kombu_ssl_version)
+ if self.kombu_ssl_keyfile:
+ ssl_params['keyfile'] = self.kombu_ssl_keyfile
+ if self.kombu_ssl_certfile:
+ ssl_params['certfile'] = self.kombu_ssl_certfile
+ if self.kombu_ssl_ca_certs:
+ ssl_params['ca_certs'] = self.kombu_ssl_ca_certs
# We might want to allow variations in the
# future with this?
ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
@@ -576,8 +615,8 @@ class Connection(object):
recoverable_error_callback and recoverable_error_callback(exc)
- interval = (self.driver_conf.kombu_reconnect_delay + interval
- if self.driver_conf.kombu_reconnect_delay > 0
+ interval = (self.kombu_reconnect_delay + interval
+ if self.kombu_reconnect_delay > 0
else interval)
info = {'err_str': exc, 'sleep_time': interval}
@@ -602,8 +641,8 @@ class Connection(object):
# use kombu for HA connection, the interval_step
# should sufficient, because the underlying kombu transport
# connection object freed.
- if self.driver_conf.kombu_reconnect_delay > 0:
- time.sleep(self.driver_conf.kombu_reconnect_delay)
+ if self.kombu_reconnect_delay > 0:
+ time.sleep(self.kombu_reconnect_delay)
def on_reconnection(new_channel):
"""Callback invoked when the kombu reconnects and creates
@@ -689,14 +728,15 @@ class Connection(object):
with self._connection_lock:
try:
- self._set_current_channel(self.connection.channel())
+ for tag, consumer in enumerate(self._consumers):
+ consumer.cancel(tag=tag)
except recoverable_errors:
self._set_current_channel(None)
self.ensure_connection()
- self._consumers = []
+ self._consumers = []
def _heartbeat_supported_and_enabled(self):
- if self.driver_conf.heartbeat_timeout_threshold <= 0:
+ if self.heartbeat_timeout_threshold <= 0:
return False
if self.connection.supports_heartbeats:
@@ -725,9 +765,9 @@ class Connection(object):
# NOTE(sileht): we are suposed to send at least one heartbeat
# every heartbeat_timeout_threshold, so no need to way more
with self._transport_socket_timeout(
- self.driver_conf.heartbeat_timeout_threshold):
+ self.heartbeat_timeout_threshold):
self.connection.heartbeat_check(
- rate=self.driver_conf.heartbeat_rate)
+ rate=self.heartbeat_rate)
def _heartbeat_start(self):
if self._heartbeat_supported_and_enabled():
@@ -864,28 +904,28 @@ class Connection(object):
responses for call/multicall
"""
- consumer = Consumer(self.driver_conf,
- exchange_name=topic,
+ consumer = Consumer(exchange_name=topic,
queue_name=topic,
routing_key=topic,
type='direct',
durable=False,
auto_delete=True,
- callback=callback)
+ callback=callback,
+ rabbit_ha_queues=self.rabbit_ha_queues)
self.declare_consumer(consumer)
def declare_topic_consumer(self, exchange_name, topic, callback=None,
queue_name=None):
"""Create a 'topic' consumer."""
- consumer = Consumer(self.driver_conf,
- exchange_name=exchange_name,
+ consumer = Consumer(exchange_name=exchange_name,
queue_name=queue_name or topic,
routing_key=topic,
type='topic',
- durable=self.driver_conf.amqp_durable_queues,
- auto_delete=self.driver_conf.amqp_auto_delete,
- callback=callback)
+ durable=self.amqp_durable_queues,
+ auto_delete=self.amqp_auto_delete,
+ callback=callback,
+ rabbit_ha_queues=self.rabbit_ha_queues)
self.declare_consumer(consumer)
@@ -896,15 +936,14 @@ class Connection(object):
exchange_name = '%s_fanout' % topic
queue_name = '%s_fanout_%s' % (topic, unique)
- consumer = Consumer(self.driver_conf,
- exchange_name=exchange_name,
+ consumer = Consumer(exchange_name=exchange_name,
queue_name=queue_name,
routing_key=topic,
type='fanout',
durable=False,
auto_delete=True,
callback=callback,
- nowait=False)
+ rabbit_ha_queues=self.rabbit_ha_queues)
self.declare_consumer(consumer)
@@ -939,7 +978,7 @@ class Connection(object):
# a answer before timeout is reached
transport_timeout = timeout
- heartbeat_timeout = self.driver_conf.heartbeat_timeout_threshold
+ heartbeat_timeout = self.heartbeat_timeout_threshold
if (self._heartbeat_supported_and_enabled() and (
transport_timeout is None or
transport_timeout > heartbeat_timeout)):
@@ -983,7 +1022,7 @@ class Connection(object):
auto_delete=exchange.auto_delete,
name=routing_key,
routing_key=routing_key,
- queue_arguments=_get_queue_arguments(self.driver_conf))
+ queue_arguments=_get_queue_arguments(self.rabbit_ha_queues))
queue.declare()
self.PUBLISHER_DECLARED_QUEUES[self.channel].add(queue_indentifier)
@@ -1001,8 +1040,10 @@ class Connection(object):
# TODO(sileht): use @retrying
# NOTE(sileht): no need to wait the application expect a response
# before timeout is exshauted
- duration = (timeout if timeout is None
- else self.conf.rpc_response_timeout)
+ duration = (
+ timeout if timeout is not None
+ else self.kombu_reconnect_timeout
+ )
timer = rpc_common.DecayingTimer(duration=duration)
timer.start()
@@ -1047,8 +1088,8 @@ class Connection(object):
exchange = kombu.entity.Exchange(
name=exchange_name,
type='topic',
- durable=self.driver_conf.amqp_durable_queues,
- auto_delete=self.driver_conf.amqp_auto_delete)
+ durable=self.amqp_durable_queues,
+ auto_delete=self.amqp_auto_delete)
self._ensure_publishing(self._publish, exchange, msg,
routing_key=topic, retry=retry)
@@ -1067,8 +1108,8 @@ class Connection(object):
exchange = kombu.entity.Exchange(
name=exchange_name,
type='topic',
- durable=self.driver_conf.amqp_durable_queues,
- auto_delete=self.driver_conf.amqp_auto_delete)
+ durable=self.amqp_durable_queues,
+ auto_delete=self.amqp_auto_delete)
self._ensure_publishing(self._publish_and_creates_default_queue,
exchange, msg, routing_key=topic, retry=retry)
@@ -1090,10 +1131,13 @@ class RabbitDriver(amqpdriver.AMQPDriverBase):
conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size,
url, Connection)
- super(RabbitDriver, self).__init__(conf, url,
- connection_pool,
- default_exchange,
- allowed_remote_exmods)
+ super(RabbitDriver, self).__init__(
+ conf, url,
+ connection_pool,
+ default_exchange,
+ allowed_remote_exmods,
+ conf.oslo_messaging_rabbit.send_single_reply,
+ )
def require_features(self, requeue=True):
pass
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index 9a9a747..660d861 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -257,18 +257,6 @@ class TestRabbitConsume(test_utils.BaseTestCase):
self.assertEqual(0, int(deadline - time.time()))
- def test_connection_reset_always_succeed(self):
- transport = oslo_messaging.get_transport(self.conf,
- 'kombu+memory:////')
- self.addCleanup(transport.cleanup)
- channel = mock.Mock()
- with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
- conn.connection.connection.recoverable_channel_errors = (IOError,)
- with mock.patch.object(conn.connection.connection, 'channel',
- side_effect=[IOError, IOError, channel]):
- conn.connection.reset()
- self.assertEqual(channel, conn.connection.channel)
-
def test_connection_ack_have_disconnected_kombu_connection(self):
transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////')
@@ -376,17 +364,25 @@ class TestSendReceive(test_utils.BaseTestCase):
('timeout', dict(timeout=0.01)), # FIXME(markmc): timeout=0 is broken?
]
+ _reply_ending = [
+ ('old_behavior', dict(send_single_reply=False)),
+ ('new_behavior', dict(send_single_reply=True)),
+ ]
+
@classmethod
def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(cls._n_senders,
cls._context,
cls._reply,
cls._failure,
- cls._timeout)
+ cls._timeout,
+ cls._reply_ending)
def test_send_receive(self):
self.config(heartbeat_timeout_threshold=0,
group="oslo_messaging_rabbit")
+ self.config(send_single_reply=self.send_single_reply,
+ group="oslo_messaging_rabbit")
transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////')
self.addCleanup(transport.cleanup)
@@ -611,7 +607,7 @@ def _declare_queue(target):
channel=channel,
exchange=exchange,
routing_key=target.topic)
- if target.server:
+ elif target.server:
exchange = kombu.entity.Exchange(name='openstack',
type='topic',
durable=False,
@@ -643,10 +639,8 @@ class TestRequestWireFormat(test_utils.BaseTestCase):
dict(topic='testtopic', server=None, fanout=False)),
('server_target',
dict(topic='testtopic', server='testserver', fanout=False)),
- # NOTE(markmc): https://github.com/celery/kombu/issues/195
('fanout_target',
- dict(topic='testtopic', server=None, fanout=True,
- skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')),
+ dict(topic='testtopic', server=None, fanout=True)),
]
_msg = [
@@ -684,8 +678,6 @@ class TestRequestWireFormat(test_utils.BaseTestCase):
return self.uuids[-1]
def test_request_wire_format(self):
- if hasattr(self, 'skip_msg'):
- self.skipTest(self.skip_msg)
transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////')
@@ -787,10 +779,8 @@ class TestReplyWireFormat(test_utils.BaseTestCase):
dict(topic='testtopic', server=None, fanout=False)),
('server_target',
dict(topic='testtopic', server='testserver', fanout=False)),
- # NOTE(markmc): https://github.com/celery/kombu/issues/195
('fanout_target',
- dict(topic='testtopic', server=None, fanout=True,
- skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')),
+ dict(topic='testtopic', server=None, fanout=True)),
]
_msg = [
@@ -818,8 +808,6 @@ class TestReplyWireFormat(test_utils.BaseTestCase):
cls._target)
def test_reply_wire_format(self):
- if hasattr(self, 'skip_msg'):
- self.skipTest(self.skip_msg)
transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////')