diff options
-rw-r--r-- | oslo.messaging/locale/fr/LC_MESSAGES/oslo.messaging.po | 40 | ||||
-rw-r--r-- | oslo_messaging/_drivers/amqp.py | 12 | ||||
-rw-r--r-- | oslo_messaging/_drivers/amqpdriver.py | 13 | ||||
-rw-r--r-- | oslo_messaging/_drivers/base.py | 2 | ||||
-rw-r--r-- | oslo_messaging/_drivers/impl_qpid.py | 11 | ||||
-rw-r--r-- | oslo_messaging/_drivers/impl_rabbit.py | 170 | ||||
-rw-r--r-- | oslo_messaging/tests/drivers/test_impl_rabbit.py | 36 |
7 files changed, 185 insertions, 99 deletions
diff --git a/oslo.messaging/locale/fr/LC_MESSAGES/oslo.messaging.po b/oslo.messaging/locale/fr/LC_MESSAGES/oslo.messaging.po index 0d30fde..9336eb6 100644 --- a/oslo.messaging/locale/fr/LC_MESSAGES/oslo.messaging.po +++ b/oslo.messaging/locale/fr/LC_MESSAGES/oslo.messaging.po @@ -5,14 +5,14 @@ # # Translators: # Jonathan Dupart <jonathan+transifex@dupart.org>, 2014 -# Maxime COQUEREL <max.coquerel@gmail.com>, 2014 +# Maxime COQUEREL <max.coquerel@gmail.com>, 2014-2015 msgid "" msgstr "" "Project-Id-Version: oslo.messaging\n" "Report-Msgid-Bugs-To: EMAIL@ADDRESS\n" -"POT-Creation-Date: 2015-05-28 06:08+0000\n" -"PO-Revision-Date: 2015-03-13 10:28+0000\n" -"Last-Translator: openstackjenkins <jenkins@openstack.org>\n" +"POT-Creation-Date: 2015-06-04 06:08+0000\n" +"PO-Revision-Date: 2015-06-03 23:46+0000\n" +"Last-Translator: Maxime COQUEREL <max.coquerel@gmail.com>\n" "Language-Team: French (http://www.transifex.com/projects/p/oslomessaging/" "language/fr/)\n" "Plural-Forms: nplurals=2; plural=(n > 1)\n" @@ -22,6 +22,34 @@ msgstr "" "Generated-By: Babel 1.3\n" #, python-format +msgid "" +"%(what)s is deprecated as of %(as_of)s and may be removed in %(remove_in)s. " +"It will not be superseded." +msgstr "" +"%(what)s déprécié depuis %(as_of)s et sera sans doute retiré dans " +"%(remove_in)s. Ne sera pas remplacé. " + +#, python-format +msgid "" +"%(what)s is deprecated as of %(as_of)s in favor of %(in_favor_of)s and may " +"be removed in %(remove_in)s." +msgstr "" +"%(what)s déprécié depuis %(as_of)s au bénéfice de %(in_favor_of)s et sera " +"sans doute retiré dans %(remove_in)s. " + +#, python-format +msgid "%(what)s is deprecated as of %(as_of)s in favor of %(in_favor_of)s." +msgstr "%(what)s déprécié depuis %(as_of)s au bénéfice de %(in_favor_of)s. " + +#, python-format +msgid "%(what)s is deprecated as of %(as_of)s. It will not be superseded." +msgstr "%(what)s déprécié depuis %(as_of)s. Ne sera pas remplacé. " + +#, python-format +msgid "Deprecated: %s" +msgstr "Déprécié: %s" + +#, python-format msgid "Exception during message handling: %s" msgstr "Exception lors de la manipulation du message: %s" @@ -30,6 +58,10 @@ msgid "Failed to load any notifiers for %s" msgstr "Echec de chargement des notifications pour %s" #, python-format +msgid "Fatal call to deprecated config: %(msg)s" +msgstr "Appel fatal à config dépréciée : %(msg)s " + +#, python-format msgid "Routing '%(event)s' notification to '%(driver)s' driver" msgstr "Routage '%(event)s' notification du pilote %(driver)s'" diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py index ce5c21e..a91be1a 100644 --- a/oslo_messaging/_drivers/amqp.py +++ b/oslo_messaging/_drivers/amqp.py @@ -49,6 +49,18 @@ amqp_opts = [ default=False, deprecated_group='DEFAULT', help='Auto-delete queues in AMQP.'), + cfg.BoolOpt('send_single_reply', + default=False, + help='Send a single AMQP reply to call message. The current ' + 'behaviour since oslo-incubator is to send two AMQP ' + 'replies - first one with the payload, a second one to ' + 'ensure the other have finish to send the payload. We ' + 'are going to remove it in the N release, but we must ' + 'keep backward compatible at the same time. This option ' + 'provides such compatibility - it defaults to False in ' + 'Liberty and can be turned on for early adopters with a ' + 'new installations or for testing. Please note, that ' + 'this option will be removed in M release.') ] UNIQUE_ID = '_unique_id' diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 6e19dd8..3a1d9bb 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -71,8 +71,12 @@ class AMQPIncomingMessage(base.IncomingMessage): return with self.listener.driver._get_connection( rpc_amqp.PURPOSE_SEND) as conn: - self._send_reply(conn, reply, failure, log_failure=log_failure) - self._send_reply(conn, ending=True) + if self.listener.driver.send_single_reply: + self._send_reply(conn, reply, failure, log_failure=log_failure, + ending=True) + else: + self._send_reply(conn, reply, failure, log_failure=log_failure) + self._send_reply(conn, ending=True) def acknowledge(self): self.listener.msg_id_cache.add(self.unique_id) @@ -257,7 +261,8 @@ class ReplyWaiter(object): class AMQPDriverBase(base.BaseDriver): def __init__(self, conf, url, connection_pool, - default_exchange=None, allowed_remote_exmods=None): + default_exchange=None, allowed_remote_exmods=None, + send_single_reply=False): super(AMQPDriverBase, self).__init__(conf, url, default_exchange, allowed_remote_exmods) @@ -270,6 +275,8 @@ class AMQPDriverBase(base.BaseDriver): self._reply_q_conn = None self._waiter = None + self.send_single_reply = send_single_reply + def _get_exchange(self, target): return target.exchange or self._default_exchange diff --git a/oslo_messaging/_drivers/base.py b/oslo_messaging/_drivers/base.py index fc24168..2051e9a 100644 --- a/oslo_messaging/_drivers/base.py +++ b/oslo_messaging/_drivers/base.py @@ -17,7 +17,7 @@ import abc import six -from oslo.config import cfg +from oslo_config import cfg from oslo_messaging import exceptions base_opts = [ diff --git a/oslo_messaging/_drivers/impl_qpid.py b/oslo_messaging/_drivers/impl_qpid.py index 8153aba..c4dd117 100644 --- a/oslo_messaging/_drivers/impl_qpid.py +++ b/oslo_messaging/_drivers/impl_qpid.py @@ -778,7 +778,10 @@ class QpidDriver(amqpdriver.AMQPDriverBase): conf, conf.oslo_messaging_qpid.rpc_conn_pool_size, url, Connection) - super(QpidDriver, self).__init__(conf, url, - connection_pool, - default_exchange, - allowed_remote_exmods) + super(QpidDriver, self).__init__( + conf, url, + connection_pool, + default_exchange, + allowed_remote_exmods, + conf.oslo_messaging_qpid.send_single_reply, + ) diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 8c696cb..7bc7c43 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -70,6 +70,18 @@ rabbit_opts = [ deprecated_group='DEFAULT', help='How long to wait before reconnecting in response to an ' 'AMQP consumer cancel notification.'), + cfg.IntOpt('kombu_reconnect_timeout', + # NOTE(dhellmann): We want this to be similar to + # rpc_response_timeout, but we can't use + # "$rpc_response_timeout" as a default because that + # option may not have been defined by the time this + # option is accessed. Instead, document the intent in + # the help text for this option and provide a separate + # literal default value. + default=60, + help='How long to wait before considering a reconnect ' + 'attempt to have failed. This value should not be ' + 'longer than rpc_response_timeout.'), cfg.StrOpt('rabbit_host', default='localhost', deprecated_group='DEFAULT', @@ -144,7 +156,7 @@ rabbit_opts = [ LOG = logging.getLogger(__name__) -def _get_queue_arguments(conf): +def _get_queue_arguments(rabbit_ha_queues): """Construct the arguments for declaring a queue. If the rabbit_ha_queues option is set, we declare a mirrored queue @@ -155,7 +167,7 @@ def _get_queue_arguments(conf): Setting x-ha-policy to all means that the queue will be mirrored to all nodes in the cluster. """ - return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} + return {'x-ha-policy': 'all'} if rabbit_ha_queues else {} class RabbitMessage(dict): @@ -174,8 +186,8 @@ class RabbitMessage(dict): class Consumer(object): """Consumer class.""" - def __init__(self, conf, exchange_name, queue_name, routing_key, type, - durable, auto_delete, callback, nowait=True): + def __init__(self, exchange_name, queue_name, routing_key, type, durable, + auto_delete, callback, nowait=True, rabbit_ha_queues=None): """Init the Publisher class with the exchange_name, routing_key, type, durable auto_delete """ @@ -187,7 +199,7 @@ class Consumer(object): self.callback = callback self.type = type self.nowait = nowait - self.queue_arguments = _get_queue_arguments(conf) + self.queue_arguments = _get_queue_arguments(rabbit_ha_queues) self.queue = None self.exchange = kombu.entity.Exchange( @@ -231,6 +243,9 @@ class Consumer(object): consumer_tag=six.text_type(tag), nowait=self.nowait) + def cancel(self, tag): + self.queue.cancel(six.text_type(tag)) + def _callback(self, message): """Call callback with deserialized message. @@ -362,26 +377,50 @@ class Connection(object): pools = {} def __init__(self, conf, url, purpose): - self.conf = conf - self.driver_conf = self.conf.oslo_messaging_rabbit - self.max_retries = self.driver_conf.rabbit_max_retries + # NOTE(viktors): Parse config options + driver_conf = conf.oslo_messaging_rabbit + + self.max_retries = driver_conf.rabbit_max_retries + self.interval_start = driver_conf.rabbit_retry_interval + self.interval_stepping = driver_conf.rabbit_retry_backoff + + self.login_method = driver_conf.rabbit_login_method + self.fake_rabbit = driver_conf.fake_rabbit + self.virtual_host = driver_conf.rabbit_virtual_host + self.rabbit_hosts = driver_conf.rabbit_hosts + self.rabbit_port = driver_conf.rabbit_port + self.rabbit_userid = driver_conf.rabbit_userid + self.rabbit_password = driver_conf.rabbit_password + self.rabbit_ha_queues = driver_conf.rabbit_ha_queues + self.heartbeat_timeout_threshold = \ + driver_conf.heartbeat_timeout_threshold + self.heartbeat_rate = driver_conf.heartbeat_rate + self.kombu_reconnect_delay = driver_conf.kombu_reconnect_delay + self.amqp_durable_queues = driver_conf.amqp_durable_queues + self.amqp_auto_delete = driver_conf.amqp_auto_delete + self.rabbit_use_ssl = driver_conf.rabbit_use_ssl + self.kombu_reconnect_timeout = driver_conf.kombu_reconnect_timeout + + if self.rabbit_use_ssl: + self.kombu_ssl_version = driver_conf.kombu_ssl_version + self.kombu_ssl_keyfile = driver_conf.kombu_ssl_keyfile + self.kombu_ssl_certfile = driver_conf.kombu_ssl_certfile + self.kombu_ssl_ca_certs = driver_conf.kombu_ssl_ca_certs + # Try forever? if self.max_retries <= 0: self.max_retries = None - self.interval_start = self.driver_conf.rabbit_retry_interval - self.interval_stepping = self.driver_conf.rabbit_retry_backoff + # max retry-interval = 30 seconds self.interval_max = 30 - self._login_method = self.driver_conf.rabbit_login_method - if url.virtual_host is not None: virtual_host = url.virtual_host else: - virtual_host = self.driver_conf.rabbit_virtual_host + virtual_host = self.virtual_host self._url = '' - if self.driver_conf.fake_rabbit: + if self.fake_rabbit: LOG.warn("Deprecated: fake_rabbit option is deprecated, set " "rpc_backend to kombu+memory or use the fake " "driver instead.") @@ -408,13 +447,13 @@ class Connection(object): transport = url.transport.replace('kombu+', '') self._url = "%s://%s" % (transport, virtual_host) else: - for adr in self.driver_conf.rabbit_hosts: + for adr in self.rabbit_hosts: hostname, port = netutils.parse_host_port( - adr, default_port=self.driver_conf.rabbit_port) + adr, default_port=self.rabbit_port) self._url += '%samqp://%s:%s@%s:%s/%s' % ( ";" if self._url else '', - parse.quote(self.driver_conf.rabbit_userid), - parse.quote(self.driver_conf.rabbit_password), + parse.quote(self.rabbit_userid), + parse.quote(self.rabbit_password), self._parse_url_hostname(hostname), port, virtual_host) @@ -435,9 +474,9 @@ class Connection(object): self.connection = kombu.connection.Connection( self._url, ssl=self._fetch_ssl_params(), - login_method=self._login_method, + login_method=self.login_method, failover_strategy="shuffle", - heartbeat=self.driver_conf.heartbeat_timeout_threshold, + heartbeat=self.heartbeat_timeout_threshold, transport_options={'confirm_publish': True}) LOG.info(_LI('Connecting to AMQP server on %(hostname)s:%(port)s'), @@ -452,8 +491,8 @@ class Connection(object): # (heatbeat_timeout/heartbeat_rate/2.0, default kombu # heartbeat_rate is 2) self._heartbeat_wait_timeout = ( - float(self.driver_conf.heartbeat_timeout_threshold) / - float(self.driver_conf.heartbeat_rate) / 2.0) + float(self.heartbeat_timeout_threshold) / + float(self.heartbeat_rate) / 2.0) self._heartbeat_support_log_emitted = False # NOTE(sileht): just ensure the connection is setuped at startup @@ -523,19 +562,19 @@ class Connection(object): """Handles fetching what ssl params should be used for the connection (if any). """ - if self.driver_conf.rabbit_use_ssl: + if self.rabbit_use_ssl: ssl_params = dict() # http://docs.python.org/library/ssl.html - ssl.wrap_socket - if self.driver_conf.kombu_ssl_version: + if self.kombu_ssl_version: ssl_params['ssl_version'] = self.validate_ssl_version( - self.driver_conf.kombu_ssl_version) - if self.driver_conf.kombu_ssl_keyfile: - ssl_params['keyfile'] = self.driver_conf.kombu_ssl_keyfile - if self.driver_conf.kombu_ssl_certfile: - ssl_params['certfile'] = self.driver_conf.kombu_ssl_certfile - if self.driver_conf.kombu_ssl_ca_certs: - ssl_params['ca_certs'] = self.driver_conf.kombu_ssl_ca_certs + self.kombu_ssl_version) + if self.kombu_ssl_keyfile: + ssl_params['keyfile'] = self.kombu_ssl_keyfile + if self.kombu_ssl_certfile: + ssl_params['certfile'] = self.kombu_ssl_certfile + if self.kombu_ssl_ca_certs: + ssl_params['ca_certs'] = self.kombu_ssl_ca_certs # We might want to allow variations in the # future with this? ssl_params['cert_reqs'] = ssl.CERT_REQUIRED @@ -576,8 +615,8 @@ class Connection(object): recoverable_error_callback and recoverable_error_callback(exc) - interval = (self.driver_conf.kombu_reconnect_delay + interval - if self.driver_conf.kombu_reconnect_delay > 0 + interval = (self.kombu_reconnect_delay + interval + if self.kombu_reconnect_delay > 0 else interval) info = {'err_str': exc, 'sleep_time': interval} @@ -602,8 +641,8 @@ class Connection(object): # use kombu for HA connection, the interval_step # should sufficient, because the underlying kombu transport # connection object freed. - if self.driver_conf.kombu_reconnect_delay > 0: - time.sleep(self.driver_conf.kombu_reconnect_delay) + if self.kombu_reconnect_delay > 0: + time.sleep(self.kombu_reconnect_delay) def on_reconnection(new_channel): """Callback invoked when the kombu reconnects and creates @@ -689,14 +728,15 @@ class Connection(object): with self._connection_lock: try: - self._set_current_channel(self.connection.channel()) + for tag, consumer in enumerate(self._consumers): + consumer.cancel(tag=tag) except recoverable_errors: self._set_current_channel(None) self.ensure_connection() - self._consumers = [] + self._consumers = [] def _heartbeat_supported_and_enabled(self): - if self.driver_conf.heartbeat_timeout_threshold <= 0: + if self.heartbeat_timeout_threshold <= 0: return False if self.connection.supports_heartbeats: @@ -725,9 +765,9 @@ class Connection(object): # NOTE(sileht): we are suposed to send at least one heartbeat # every heartbeat_timeout_threshold, so no need to way more with self._transport_socket_timeout( - self.driver_conf.heartbeat_timeout_threshold): + self.heartbeat_timeout_threshold): self.connection.heartbeat_check( - rate=self.driver_conf.heartbeat_rate) + rate=self.heartbeat_rate) def _heartbeat_start(self): if self._heartbeat_supported_and_enabled(): @@ -864,28 +904,28 @@ class Connection(object): responses for call/multicall """ - consumer = Consumer(self.driver_conf, - exchange_name=topic, + consumer = Consumer(exchange_name=topic, queue_name=topic, routing_key=topic, type='direct', durable=False, auto_delete=True, - callback=callback) + callback=callback, + rabbit_ha_queues=self.rabbit_ha_queues) self.declare_consumer(consumer) def declare_topic_consumer(self, exchange_name, topic, callback=None, queue_name=None): """Create a 'topic' consumer.""" - consumer = Consumer(self.driver_conf, - exchange_name=exchange_name, + consumer = Consumer(exchange_name=exchange_name, queue_name=queue_name or topic, routing_key=topic, type='topic', - durable=self.driver_conf.amqp_durable_queues, - auto_delete=self.driver_conf.amqp_auto_delete, - callback=callback) + durable=self.amqp_durable_queues, + auto_delete=self.amqp_auto_delete, + callback=callback, + rabbit_ha_queues=self.rabbit_ha_queues) self.declare_consumer(consumer) @@ -896,15 +936,14 @@ class Connection(object): exchange_name = '%s_fanout' % topic queue_name = '%s_fanout_%s' % (topic, unique) - consumer = Consumer(self.driver_conf, - exchange_name=exchange_name, + consumer = Consumer(exchange_name=exchange_name, queue_name=queue_name, routing_key=topic, type='fanout', durable=False, auto_delete=True, callback=callback, - nowait=False) + rabbit_ha_queues=self.rabbit_ha_queues) self.declare_consumer(consumer) @@ -939,7 +978,7 @@ class Connection(object): # a answer before timeout is reached transport_timeout = timeout - heartbeat_timeout = self.driver_conf.heartbeat_timeout_threshold + heartbeat_timeout = self.heartbeat_timeout_threshold if (self._heartbeat_supported_and_enabled() and ( transport_timeout is None or transport_timeout > heartbeat_timeout)): @@ -983,7 +1022,7 @@ class Connection(object): auto_delete=exchange.auto_delete, name=routing_key, routing_key=routing_key, - queue_arguments=_get_queue_arguments(self.driver_conf)) + queue_arguments=_get_queue_arguments(self.rabbit_ha_queues)) queue.declare() self.PUBLISHER_DECLARED_QUEUES[self.channel].add(queue_indentifier) @@ -1001,8 +1040,10 @@ class Connection(object): # TODO(sileht): use @retrying # NOTE(sileht): no need to wait the application expect a response # before timeout is exshauted - duration = (timeout if timeout is None - else self.conf.rpc_response_timeout) + duration = ( + timeout if timeout is not None + else self.kombu_reconnect_timeout + ) timer = rpc_common.DecayingTimer(duration=duration) timer.start() @@ -1047,8 +1088,8 @@ class Connection(object): exchange = kombu.entity.Exchange( name=exchange_name, type='topic', - durable=self.driver_conf.amqp_durable_queues, - auto_delete=self.driver_conf.amqp_auto_delete) + durable=self.amqp_durable_queues, + auto_delete=self.amqp_auto_delete) self._ensure_publishing(self._publish, exchange, msg, routing_key=topic, retry=retry) @@ -1067,8 +1108,8 @@ class Connection(object): exchange = kombu.entity.Exchange( name=exchange_name, type='topic', - durable=self.driver_conf.amqp_durable_queues, - auto_delete=self.driver_conf.amqp_auto_delete) + durable=self.amqp_durable_queues, + auto_delete=self.amqp_auto_delete) self._ensure_publishing(self._publish_and_creates_default_queue, exchange, msg, routing_key=topic, retry=retry) @@ -1090,10 +1131,13 @@ class RabbitDriver(amqpdriver.AMQPDriverBase): conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size, url, Connection) - super(RabbitDriver, self).__init__(conf, url, - connection_pool, - default_exchange, - allowed_remote_exmods) + super(RabbitDriver, self).__init__( + conf, url, + connection_pool, + default_exchange, + allowed_remote_exmods, + conf.oslo_messaging_rabbit.send_single_reply, + ) def require_features(self, requeue=True): pass diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index 9a9a747..660d861 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -257,18 +257,6 @@ class TestRabbitConsume(test_utils.BaseTestCase): self.assertEqual(0, int(deadline - time.time())) - def test_connection_reset_always_succeed(self): - transport = oslo_messaging.get_transport(self.conf, - 'kombu+memory:////') - self.addCleanup(transport.cleanup) - channel = mock.Mock() - with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn: - conn.connection.connection.recoverable_channel_errors = (IOError,) - with mock.patch.object(conn.connection.connection, 'channel', - side_effect=[IOError, IOError, channel]): - conn.connection.reset() - self.assertEqual(channel, conn.connection.channel) - def test_connection_ack_have_disconnected_kombu_connection(self): transport = oslo_messaging.get_transport(self.conf, 'kombu+memory:////') @@ -376,17 +364,25 @@ class TestSendReceive(test_utils.BaseTestCase): ('timeout', dict(timeout=0.01)), # FIXME(markmc): timeout=0 is broken? ] + _reply_ending = [ + ('old_behavior', dict(send_single_reply=False)), + ('new_behavior', dict(send_single_reply=True)), + ] + @classmethod def generate_scenarios(cls): cls.scenarios = testscenarios.multiply_scenarios(cls._n_senders, cls._context, cls._reply, cls._failure, - cls._timeout) + cls._timeout, + cls._reply_ending) def test_send_receive(self): self.config(heartbeat_timeout_threshold=0, group="oslo_messaging_rabbit") + self.config(send_single_reply=self.send_single_reply, + group="oslo_messaging_rabbit") transport = oslo_messaging.get_transport(self.conf, 'kombu+memory:////') self.addCleanup(transport.cleanup) @@ -611,7 +607,7 @@ def _declare_queue(target): channel=channel, exchange=exchange, routing_key=target.topic) - if target.server: + elif target.server: exchange = kombu.entity.Exchange(name='openstack', type='topic', durable=False, @@ -643,10 +639,8 @@ class TestRequestWireFormat(test_utils.BaseTestCase): dict(topic='testtopic', server=None, fanout=False)), ('server_target', dict(topic='testtopic', server='testserver', fanout=False)), - # NOTE(markmc): https://github.com/celery/kombu/issues/195 ('fanout_target', - dict(topic='testtopic', server=None, fanout=True, - skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')), + dict(topic='testtopic', server=None, fanout=True)), ] _msg = [ @@ -684,8 +678,6 @@ class TestRequestWireFormat(test_utils.BaseTestCase): return self.uuids[-1] def test_request_wire_format(self): - if hasattr(self, 'skip_msg'): - self.skipTest(self.skip_msg) transport = oslo_messaging.get_transport(self.conf, 'kombu+memory:////') @@ -787,10 +779,8 @@ class TestReplyWireFormat(test_utils.BaseTestCase): dict(topic='testtopic', server=None, fanout=False)), ('server_target', dict(topic='testtopic', server='testserver', fanout=False)), - # NOTE(markmc): https://github.com/celery/kombu/issues/195 ('fanout_target', - dict(topic='testtopic', server=None, fanout=True, - skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')), + dict(topic='testtopic', server=None, fanout=True)), ] _msg = [ @@ -818,8 +808,6 @@ class TestReplyWireFormat(test_utils.BaseTestCase): cls._target) def test_reply_wire_format(self): - if hasattr(self, 'skip_msg'): - self.skipTest(self.skip_msg) transport = oslo_messaging.get_transport(self.conf, 'kombu+memory:////') |