summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.pre-commit-config.yaml2
-rw-r--r--.zuul.yaml2
-rw-r--r--doc/source/admin/rabbit.rst5
-rw-r--r--oslo_messaging/_drivers/amqp.py4
-rw-r--r--oslo_messaging/_drivers/amqp1_driver/controller.py3
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py7
-rw-r--r--oslo_messaging/_drivers/common.py4
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py167
-rw-r--r--oslo_messaging/_drivers/pool.py10
-rw-r--r--oslo_messaging/exceptions.py4
-rw-r--r--oslo_messaging/notify/messaging.py23
-rw-r--r--oslo_messaging/tests/drivers/test_impl_rabbit.py87
-rw-r--r--oslo_messaging/tests/drivers/test_pool.py4
-rw-r--r--oslo_messaging/tests/notify/test_notifier.py75
-rw-r--r--oslo_messaging/tests/test_transport.py2
-rw-r--r--releasenotes/notes/add-quorum-control-configurations-beed79811ff97ba2.yaml6
-rw-r--r--releasenotes/notes/adding_support_for_quorum_queues-3101d055b492289e.yaml11
-rw-r--r--releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml8
-rw-r--r--releasenotes/notes/do-not-run-heartbeat-in-pthread-by-default-42e1299f59b841f8.yaml9
-rw-r--r--releasenotes/notes/enforce_fips_mode-07dd259eb8a73c2b.yaml10
-rw-r--r--releasenotes/source/index.rst2
-rw-r--r--releasenotes/source/yoga.rst6
-rw-r--r--releasenotes/source/zed.rst6
-rw-r--r--setup.cfg5
-rw-r--r--test-requirements.txt2
25 files changed, 417 insertions, 47 deletions
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 08aef91..50d8dea 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -32,7 +32,7 @@ repos:
- id: flake8
name: flake8
additional_dependencies:
- - hacking>=3.0.1,<3.1.0
+ - hacking>=3.0.1,<=4.1.0
language: python
entry: flake8
files: '^.*\.py$'
diff --git a/.zuul.yaml b/.zuul.yaml
index e8fc581..92d0ffc 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -108,7 +108,7 @@
- check-requirements
- lib-forward-testing-python3
- openstack-cover-jobs
- - openstack-python3-yoga-jobs
+ - openstack-python3-zed-jobs
- periodic-stable-jobs
- publish-openstack-docs-pti
- release-notes-jobs-python3
diff --git a/doc/source/admin/rabbit.rst b/doc/source/admin/rabbit.rst
index 142bdf7..b9433a5 100644
--- a/doc/source/admin/rabbit.rst
+++ b/doc/source/admin/rabbit.rst
@@ -240,6 +240,10 @@ Consuming Options
^^^^^^^^^^^^^^^^^
- :oslo.config:option:`oslo_messaging_rabbit.rabbit_ha_queues`
+- :oslo.config:option:`oslo_messaging_rabbit.rabbit_quorum_queue`
+- :oslo.config:option:`oslo_messaging_rabbit.rabbit_quorum_delivery_limit`
+- :oslo.config:option:`oslo_messaging_rabbit.rabbit_quroum_max_memory_length`
+- :oslo.config:option:`oslo_messaging_rabbit.rabbit_quroum_max_memory_bytes`
- :oslo.config:option:`oslo_messaging_rabbit.rabbit_transient_queues_ttl`
Connection Options
@@ -266,3 +270,4 @@ Security Options
- :oslo.config:option:`oslo_messaging_rabbit.ssl_key_file`
- :oslo.config:option:`oslo_messaging_rabbit.ssl_cert_file`
- :oslo.config:option:`oslo_messaging_rabbit.rabbit_login_method`
+- :oslo.config:option:`oslo_messaging_rabbit.ssl_enforce_fips_mode`
diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py
index d4db2c6..b0c9551 100644
--- a/oslo_messaging/_drivers/amqp.py
+++ b/oslo_messaging/_drivers/amqp.py
@@ -32,7 +32,9 @@ from oslo_messaging._drivers import common as rpc_common
amqp_opts = [
cfg.BoolOpt('amqp_durable_queues',
default=False,
- help='Use durable queues in AMQP.'),
+ help='Use durable queues in AMQP. If rabbit_quorum_queue '
+ 'is enabled, queues will be durable and this value will '
+ 'be ignored.'),
cfg.BoolOpt('amqp_auto_delete',
default=False,
deprecated_group='DEFAULT',
diff --git a/oslo_messaging/_drivers/amqp1_driver/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py
index bba7228..44a6bb4 100644
--- a/oslo_messaging/_drivers/amqp1_driver/controller.py
+++ b/oslo_messaging/_drivers/amqp1_driver/controller.py
@@ -1245,6 +1245,7 @@ class Controller(pyngus.ConnectionEventHandler):
# service. Try to re-establish the connection:
if not self._reconnecting:
self._reconnecting = True
+ self.processor.wakeup(lambda: self._hard_reset(reason))
LOG.info("Delaying reconnect attempt for %d seconds",
self._delay)
self.processor.defer(lambda: self._do_reconnect(reason),
@@ -1261,7 +1262,6 @@ class Controller(pyngus.ConnectionEventHandler):
"""
self._reconnecting = False
if not self._closing:
- self._hard_reset(reason)
host = self.hosts.next()
LOG.info("Reconnecting to: %(hostname)s:%(port)s",
{'hostname': host.hostname, 'port': host.port})
@@ -1331,4 +1331,5 @@ class Controller(pyngus.ConnectionEventHandler):
def _active(self):
# Is the connection up
return (self._socket_connection and
+ self._socket_connection.pyngus_conn and
self._socket_connection.pyngus_conn.active)
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index 24fdbc7..991bf46 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -601,9 +601,10 @@ class AMQPDriverBase(base.BaseDriver):
def _get_exchange(self, target):
return target.exchange or self._default_exchange
- def _get_connection(self, purpose=rpc_common.PURPOSE_SEND):
+ def _get_connection(self, purpose=rpc_common.PURPOSE_SEND, retry=None):
return rpc_common.ConnectionContext(self._connection_pool,
- purpose=purpose)
+ purpose=purpose,
+ retry=retry)
def _get_reply_q(self):
with self._reply_q_lock:
@@ -649,7 +650,7 @@ class AMQPDriverBase(base.BaseDriver):
log_msg = "CAST unique_id: %s " % unique_id
try:
- with self._get_connection(rpc_common.PURPOSE_SEND) as conn:
+ with self._get_connection(rpc_common.PURPOSE_SEND, retry) as conn:
if notify:
exchange = self._get_exchange(target)
LOG.debug(log_msg + "NOTIFY exchange '%(exchange)s'"
diff --git a/oslo_messaging/_drivers/common.py b/oslo_messaging/_drivers/common.py
index 54c6f7f..b6c3adb 100644
--- a/oslo_messaging/_drivers/common.py
+++ b/oslo_messaging/_drivers/common.py
@@ -392,7 +392,7 @@ class ConnectionContext(Connection):
If possible the function makes sure to return a connection to the pool.
"""
- def __init__(self, connection_pool, purpose):
+ def __init__(self, connection_pool, purpose, retry):
"""Create a new connection, or get one from the pool."""
self.connection = None
self.connection_pool = connection_pool
@@ -420,7 +420,7 @@ class ConnectionContext(Connection):
pooled = purpose == PURPOSE_SEND
if pooled:
- self.connection = connection_pool.get()
+ self.connection = connection_pool.get(retry=retry)
else:
self.connection = connection_pool.create(purpose)
self.pooled = pooled
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index a1f5ce2..12bf82c 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import collections
import contextlib
import errno
import functools
@@ -45,6 +46,13 @@ from oslo_messaging._drivers import pool
from oslo_messaging import _utils
from oslo_messaging import exceptions
+
+# The QuorumMemConfig will hold the quorum queue memory configurations
+QuorumMemConfig = collections.namedtuple('QuorumMemConfig',
+ 'delivery_limit'
+ ' max_memory_length'
+ ' max_memory_bytes')
+
# NOTE(sileht): don't exist in py2 socket module
TCP_USER_TIMEOUT = 18
@@ -74,8 +82,17 @@ rabbit_opts = [
deprecated_name='kombu_ssl_ca_certs',
help='SSL certification authority file '
'(valid only if SSL enabled).'),
+ cfg.BoolOpt('ssl_enforce_fips_mode',
+ default=False,
+ help='Global toggle for enforcing the OpenSSL FIPS mode. '
+ 'This feature requires Python support. '
+ 'This is available in Python 3.9 in all '
+ 'environments and may have been backported to older '
+ 'Python versions on select environments. If the Python '
+ 'executable used does not support OpenSSL FIPS mode, '
+ 'an exception will be raised.'),
cfg.BoolOpt('heartbeat_in_pthread',
- default=True,
+ default=False,
help="Run the health check heartbeat thread "
"through a native python thread by default. If this "
"option is equal to False then the health check "
@@ -83,7 +100,9 @@ rabbit_opts = [
"from the parent process. For "
"example if the parent process has monkey patched the "
"stdlib by using eventlet/greenlet then the heartbeat "
- "will be run through a green thread.",
+ "will be run through a green thread. "
+ "This option should be set to True only for the "
+ "wsgi services.",
),
cfg.FloatOpt('kombu_reconnect_delay',
default=1.0,
@@ -108,7 +127,7 @@ rabbit_opts = [
'unavailable. Takes effect only if more than one '
'RabbitMQ node is provided in config.'),
cfg.StrOpt('rabbit_login_method',
- choices=('PLAIN', 'AMQPLAIN', 'RABBIT-CR-DEMO'),
+ choices=('PLAIN', 'AMQPLAIN', 'EXTERNAL', 'RABBIT-CR-DEMO'),
default='AMQPLAIN',
deprecated_group='DEFAULT',
help='The RabbitMQ login method.'),
@@ -136,6 +155,41 @@ rabbit_opts = [
'nodes, run: '
"""\"rabbitmqctl set_policy HA '^(?!amq\\.).*' """
"""'{"ha-mode": "all"}' \""""),
+ cfg.BoolOpt('rabbit_quorum_queue',
+ default=False,
+ help='Use quorum queues in RabbitMQ (x-queue-type: quorum). '
+ 'The quorum queue is a modern queue type for RabbitMQ '
+ 'implementing a durable, replicated FIFO queue based on the '
+ 'Raft consensus algorithm. It is available as of '
+ 'RabbitMQ 3.8.0. If set this option will conflict with '
+ 'the HA queues (``rabbit_ha_queues``) aka mirrored queues, '
+ 'in other words the HA queues should be disabled, quorum '
+ 'queues durable by default so the amqp_durable_queues '
+ 'opion is ignored when this option enabled.'),
+ cfg.IntOpt('rabbit_quorum_delivery_limit',
+ default=0,
+ help='Each time a message is redelivered to a consumer, '
+ 'a counter is incremented. Once the redelivery count '
+ 'exceeds the delivery limit the message gets dropped '
+ 'or dead-lettered (if a DLX exchange has been configured) '
+ 'Used only when rabbit_quorum_queue is enabled, '
+ 'Default 0 which means dont set a limit.'),
+ cfg.IntOpt('rabbit_quroum_max_memory_length',
+ default=0,
+ help='By default all messages are maintained in memory '
+ 'if a quorum queue grows in length it can put memory '
+ 'pressure on a cluster. This option can limit the number '
+ 'of messages in the quorum queue. '
+ 'Used only when rabbit_quorum_queue is enabled, '
+ 'Default 0 which means dont set a limit.'),
+ cfg.IntOpt('rabbit_quroum_max_memory_bytes',
+ default=0,
+ help='By default all messages are maintained in memory '
+ 'if a quorum queue grows in length it can put memory '
+ 'pressure on a cluster. This option can limit the number '
+ 'of memory bytes used by the quorum queue. '
+ 'Used only when rabbit_quorum_queue is enabled, '
+ 'Default 0 which means dont set a limit.'),
cfg.IntOpt('rabbit_transient_queues_ttl',
min=1,
default=1800,
@@ -178,7 +232,9 @@ rabbit_opts = [
LOG = logging.getLogger(__name__)
-def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl):
+def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl,
+ rabbit_quorum_queue,
+ rabbit_quorum_queue_config):
"""Construct the arguments for declaring a queue.
If the rabbit_ha_queues option is set, we try to declare a mirrored queue
@@ -201,12 +257,48 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl):
Setting a queue TTL causes the queue to be automatically deleted
if it is unused for the TTL duration. This is a helpful safeguard
to prevent queues with zero consumers from growing without bound.
+
+ If the rabbit_quorum_queue option is set, we try to declare a mirrored
+ queue as described here:
+
+ https://www.rabbitmq.com/quorum-queues.html
+
+ Setting x-queue-type to quorum means that replicated FIFO queue based on
+ the Raft consensus algorithm will be used. It is available as of
+ RabbitMQ 3.8.0. If set this option will conflict with
+ the HA queues (``rabbit_ha_queues``) aka mirrored queues,
+ in other words HA queues should be disabled.
+
+ rabbit_quorum_queue_config:
+ Quorum queues provides three options to handle message poisoning
+ and limit the resources the qourum queue can use
+ x-delivery-limit number of times the queue will try to deliver
+ a message before it decide to discard it
+ x-max-in-memory-length, x-max-in-memory-bytes control the size
+ of memory used by quorum queue
"""
args = {}
+ if rabbit_quorum_queue and rabbit_ha_queues:
+ raise RuntimeError('Configuration Error: rabbit_quorum_queue '
+ 'and rabbit_ha_queues both enabled, queue '
+ 'type is quorum or HA (mirrored) not both')
+
if rabbit_ha_queues:
args['x-ha-policy'] = 'all'
+ if rabbit_quorum_queue:
+ args['x-queue-type'] = 'quorum'
+ if rabbit_quorum_queue_config.delivery_limit:
+ args['x-delivery-limit'] = \
+ rabbit_quorum_queue_config.delivery_limit
+ if rabbit_quorum_queue_config.max_memory_length:
+ args['x-max-in-memory-length'] = \
+ rabbit_quorum_queue_config.max_memory_length
+ if rabbit_quorum_queue_config.max_memory_bytes:
+ args['x-max-in-memory-bytes'] = \
+ rabbit_quorum_queue_config.max_memory_bytes
+
if rabbit_queue_ttl > 0:
args['x-expires'] = rabbit_queue_ttl * 1000
@@ -235,7 +327,8 @@ class Consumer(object):
def __init__(self, exchange_name, queue_name, routing_key, type, durable,
exchange_auto_delete, queue_auto_delete, callback,
nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0,
- enable_cancel_on_failover=False):
+ enable_cancel_on_failover=False, rabbit_quorum_queue=False,
+ rabbit_quorum_queue_config=QuorumMemConfig(0, 0, 0)):
"""Init the Consumer class with the exchange_name, routing_key,
type, durable auto_delete
"""
@@ -248,8 +341,10 @@ class Consumer(object):
self.callback = callback
self.type = type
self.nowait = nowait
- self.queue_arguments = _get_queue_arguments(rabbit_ha_queues,
- rabbit_queue_ttl)
+ rabbit_quorum_queue_config = rabbit_quorum_queue_config or {}
+ self.queue_arguments = _get_queue_arguments(
+ rabbit_ha_queues, rabbit_queue_ttl, rabbit_quorum_queue,
+ rabbit_quorum_queue_config)
self.queue = None
self._declared_on = None
self.exchange = kombu.entity.Exchange(
@@ -496,16 +591,20 @@ class ConnectionLock(DummyConnectionLock):
class Connection(object):
"""Connection object."""
- def __init__(self, conf, url, purpose):
+ def __init__(self, conf, url, purpose, retry=None):
# NOTE(viktors): Parse config options
driver_conf = conf.oslo_messaging_rabbit
self.interval_start = driver_conf.rabbit_retry_interval
self.interval_stepping = driver_conf.rabbit_retry_backoff
self.interval_max = driver_conf.rabbit_interval_max
+ self.max_retries = retry
self.login_method = driver_conf.rabbit_login_method
self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
+ self.rabbit_quorum_queue = driver_conf.rabbit_quorum_queue
+ self.rabbit_quorum_queue_config = self._get_quorum_configurations(
+ driver_conf)
self.rabbit_transient_queues_ttl = \
driver_conf.rabbit_transient_queues_ttl
self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count
@@ -521,6 +620,7 @@ class Connection(object):
self.kombu_failover_strategy = driver_conf.kombu_failover_strategy
self.kombu_compression = driver_conf.kombu_compression
self.heartbeat_in_pthread = driver_conf.heartbeat_in_pthread
+ self.ssl_enforce_fips_mode = driver_conf.ssl_enforce_fips_mode
self.enable_cancel_on_failover = driver_conf.enable_cancel_on_failover
if self.heartbeat_in_pthread:
@@ -558,6 +658,19 @@ class Connection(object):
self.ssl_cert_file = driver_conf.ssl_cert_file
self.ssl_ca_file = driver_conf.ssl_ca_file
+ if self.ssl_enforce_fips_mode:
+ if hasattr(ssl, 'FIPS_mode'):
+ LOG.info("Enforcing the use of the OpenSSL FIPS mode")
+ ssl.FIPS_mode_set(1)
+ else:
+ raise exceptions.ConfigurationError(
+ "OpenSSL FIPS mode is not supported by your Python "
+ "version. You must either change the Python "
+ "executable used to a version with FIPS mode "
+ "support or disable FIPS mode by setting the "
+ "'[oslo_messaging_rabbit] ssl_enforce_fips_mode' "
+ "configuration option to 'False'.")
+
self._url = ''
if url.hosts:
if url.transport.startswith('kombu+'):
@@ -705,6 +818,12 @@ class Connection(object):
except AttributeError:
pass
+ @property
+ def durable(self):
+ # Quorum queues are durable by default, durable option should
+ # be enabled by default with quorum queues
+ return self.amqp_durable_queues or self.rabbit_quorum_queue
+
@classmethod
def validate_ssl_version(cls, version):
key = version.lower()
@@ -713,6 +832,14 @@ class Connection(object):
except KeyError:
raise RuntimeError("Invalid SSL version : %s" % version)
+ def _get_quorum_configurations(self, driver_conf):
+ """Get the quorum queue configurations"""
+ delivery_limit = driver_conf.rabbit_quorum_delivery_limit
+ max_memory_length = driver_conf.rabbit_quroum_max_memory_length
+ max_memory_bytes = driver_conf.rabbit_quroum_max_memory_bytes
+ return QuorumMemConfig(delivery_limit, max_memory_length,
+ max_memory_bytes)
+
# NOTE(moguimar): default_password in this function's context is just
# a fallback option, not a hardcoded password.
def _transform_transport_url(self, url, host, default_username='', # nosec
@@ -772,7 +899,13 @@ class Connection(object):
str(exc), interval)
self._set_current_channel(None)
- self.connection.ensure_connection(errback=on_error)
+ self.connection.ensure_connection(
+ errback=on_error,
+ max_retries=self.max_retries,
+ interval_start=self.interval_start or 1,
+ interval_step=self.interval_stepping,
+ interval_max=self.interval_max,
+ )
self._set_current_channel(self.connection.channel())
self.set_transport_socket_timeout()
@@ -1194,12 +1327,14 @@ class Connection(object):
queue_name=queue_name or topic,
routing_key=topic,
type='topic',
- durable=self.amqp_durable_queues,
+ durable=self.durable,
exchange_auto_delete=self.amqp_auto_delete,
queue_auto_delete=self.amqp_auto_delete,
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues,
- enable_cancel_on_failover=self.enable_cancel_on_failover)
+ enable_cancel_on_failover=self.enable_cancel_on_failover,
+ rabbit_quorum_queue=self.rabbit_quorum_queue,
+ rabbit_quorum_queue_config=self.rabbit_quorum_queue_config)
self.declare_consumer(consumer)
@@ -1323,7 +1458,11 @@ class Connection(object):
auto_delete=exchange.auto_delete,
name=routing_key,
routing_key=routing_key,
- queue_arguments=_get_queue_arguments(self.rabbit_ha_queues, 0))
+ queue_arguments=_get_queue_arguments(
+ self.rabbit_ha_queues,
+ 0,
+ self.rabbit_quorum_queue,
+ self.rabbit_quorum_queue_config))
log_info = {'key': routing_key, 'exchange': exchange}
LOG.trace(
'Connection._publish_and_creates_default_queue: '
@@ -1379,7 +1518,7 @@ class Connection(object):
exchange = kombu.entity.Exchange(
name=exchange_name,
type='topic',
- durable=self.amqp_durable_queues,
+ durable=self.durable,
auto_delete=self.amqp_auto_delete)
self._ensure_publishing(self._publish, exchange, msg,
@@ -1401,7 +1540,7 @@ class Connection(object):
exchange = kombu.entity.Exchange(
name=exchange_name,
type='topic',
- durable=self.amqp_durable_queues,
+ durable=self.durable,
auto_delete=self.amqp_auto_delete)
self._ensure_publishing(self._publish_and_creates_default_queue,
diff --git a/oslo_messaging/_drivers/pool.py b/oslo_messaging/_drivers/pool.py
index 8090e8d..9e5288d 100644
--- a/oslo_messaging/_drivers/pool.py
+++ b/oslo_messaging/_drivers/pool.py
@@ -69,7 +69,7 @@ class Pool(object, metaclass=abc.ABCMeta):
self._items.append((ttl_watch, item))
self._cond.notify()
- def get(self):
+ def get(self, retry=None):
"""Return an item from the pool, when one is available.
This may cause the calling thread to block.
@@ -95,7 +95,7 @@ class Pool(object, metaclass=abc.ABCMeta):
# We've grabbed a slot and dropped the lock, now do the creation
try:
- return self.create()
+ return self.create(retry=retry)
except Exception:
with self._cond:
self._current_size -= 1
@@ -111,7 +111,7 @@ class Pool(object, metaclass=abc.ABCMeta):
return
@abc.abstractmethod
- def create(self):
+ def create(self, retry=None):
"""Construct a new item."""
@@ -130,9 +130,9 @@ class ConnectionPool(Pool):
LOG.debug("Idle connection has expired and been closed."
" Pool size: %d" % len(self._items))
- def create(self, purpose=common.PURPOSE_SEND):
+ def create(self, purpose=common.PURPOSE_SEND, retry=None):
LOG.debug('Pool creating new connection')
- return self.connection_cls(self.conf, self.url, purpose)
+ return self.connection_cls(self.conf, self.url, purpose, retry=retry)
def empty(self):
for item in self.iter_free():
diff --git a/oslo_messaging/exceptions.py b/oslo_messaging/exceptions.py
index 48645b3..391fe46 100644
--- a/oslo_messaging/exceptions.py
+++ b/oslo_messaging/exceptions.py
@@ -47,3 +47,7 @@ class MessageUndeliverable(Exception):
self.exchange = exchange
self.routing_key = routing_key
self.message = message
+
+
+class ConfigurationError(Exception):
+ """Raised when messaging isn't configured correctly."""
diff --git a/oslo_messaging/notify/messaging.py b/oslo_messaging/notify/messaging.py
index 61c7357..da633d8 100644
--- a/oslo_messaging/notify/messaging.py
+++ b/oslo_messaging/notify/messaging.py
@@ -21,19 +21,30 @@ Notification drivers for sending notifications via messaging.
The messaging drivers publish notification messages to notification
listeners.
-The driver will block the notifier's thread until the notification message has
-been passed to the messaging transport. There is no guarantee that the
-notification message will be consumed by a notification listener.
+In case of the rabbit backend the driver will block the notifier's thread
+until the notification message has been passed to the messaging transport.
+There is no guarantee that the notification message will be consumed by a
+notification listener.
+
+In case of the kafka backend the driver will not block the notifier's thread
+but return immediately. The driver will try to deliver the message in the
+background.
Notification messages are sent 'at-most-once' - ensuring that they are not
duplicated.
If the connection to the messaging service is not active when a notification is
-sent this driver will block waiting for the connection to complete. If the
-connection fails to complete, the driver will try to re-establish that
+sent the rabbit backend will block waiting for the connection to complete.
+If the connection fails to complete, the driver will try to re-establish that
connection. By default this will continue indefinitely until the connection
completes. However, the retry parameter can be used to have the notification
-send fail with a MessageDeliveryFailure after the given number of retries.
+send fail. In this case an error is logged and the notifier's thread is resumed
+without any error.
+
+If the connection to the messaging service is not active when a notification is
+sent the kafka backend will return immediately and the backend tries to
+establish the connection and deliver the messages in the background.
+
"""
import logging
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index 8955661..dbbf33c 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -31,6 +31,7 @@ import oslo_messaging
from oslo_messaging._drivers import amqpdriver
from oslo_messaging._drivers import common as driver_common
from oslo_messaging._drivers import impl_rabbit as rabbit_driver
+from oslo_messaging.exceptions import ConfigurationError
from oslo_messaging.exceptions import MessageDeliveryFailure
from oslo_messaging.tests import utils as test_utils
from oslo_messaging.transport import DriverLoadFailure
@@ -205,6 +206,65 @@ class TestRabbitDriverLoadSSL(test_utils.BaseTestCase):
)
+class TestRabbitDriverLoadSSLWithFIPS(test_utils.BaseTestCase):
+ scenarios = [
+ ('ssl_fips_mode', dict(options=dict(ssl=True,
+ ssl_enforce_fips_mode=True),
+ expected=True)),
+ ]
+
+ @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection'
+ '.ensure_connection')
+ @mock.patch('kombu.connection.Connection')
+ def test_driver_load_with_fips_supported(self,
+ connection_klass, fake_ensure):
+ self.config(ssl=True, ssl_enforce_fips_mode=True,
+ group="oslo_messaging_rabbit")
+ transport = oslo_messaging.get_transport(self.conf,
+ 'kombu+memory:////')
+ self.addCleanup(transport.cleanup)
+
+ with mock.patch.object(ssl, 'FIPS_mode',
+ create=True, return_value=True):
+ with mock.patch.object(ssl, 'FIPS_mode_set', create=True):
+
+ connection = transport._driver._get_connection()
+ connection_klass.assert_called_once_with(
+ 'memory:///', transport_options={
+ 'client_properties': {
+ 'capabilities': {
+ 'connection.blocked': True,
+ 'consumer_cancel_notify': True,
+ 'authentication_failure_close': True,
+ },
+ 'connection_name': connection.name},
+ 'confirm_publish': True,
+ 'on_blocked': mock.ANY,
+ 'on_unblocked': mock.ANY},
+ ssl=self.expected, login_method='AMQPLAIN',
+ heartbeat=60, failover_strategy='round-robin'
+ )
+
+ @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection'
+ '.ensure_connection')
+ @mock.patch('oslo_messaging._drivers.impl_rabbit.ssl')
+ @mock.patch('kombu.connection.Connection')
+ def test_fips_unsupported(self, connection_klass, fake_ssl, fake_ensure):
+ self.config(ssl=True, ssl_enforce_fips_mode=True,
+ group="oslo_messaging_rabbit")
+ transport = oslo_messaging.get_transport(self.conf,
+ 'kombu+memory:////')
+ self.addCleanup(transport.cleanup)
+
+ del fake_ssl.FIPS_mode
+
+ # We do this test only if FIPS mode is not supported to
+ # ensure that we hard fail.
+ self.assertRaises(
+ ConfigurationError,
+ transport._driver._get_connection)
+
+
class TestRabbitPublisher(test_utils.BaseTestCase):
@mock.patch('kombu.messaging.Producer.publish')
def test_send_with_timeout(self, fake_publish):
@@ -1008,21 +1068,36 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
self.assertRaises(oslo_messaging.MessageDeliveryFailure,
self.connection.ensure, mock_callback,
retry=4)
- self.assertEqual(6, mock_callback.call_count)
+ # TODO(stephenfin): Remove when we drop support for kombu < 5.2.4
+ expected = 5
+ if kombu.VERSION < (5, 2, 4):
+ expected = 6
+ self.assertEqual(expected, mock_callback.call_count)
def test_ensure_one_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
self.assertRaises(oslo_messaging.MessageDeliveryFailure,
self.connection.ensure, mock_callback,
retry=1)
- self.assertEqual(3, mock_callback.call_count)
+ # TODO(stephenfin): Remove when we drop support for kombu < 5.2.4
+ expected = 2
+ if kombu.VERSION < (5, 2, 4):
+ expected = 3
+ self.assertEqual(expected, mock_callback.call_count)
def test_ensure_no_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
- self.assertRaises(oslo_messaging.MessageDeliveryFailure,
- self.connection.ensure, mock_callback,
- retry=0)
- self.assertEqual(2, mock_callback.call_count)
+ self.assertRaises(
+ oslo_messaging.MessageDeliveryFailure,
+ self.connection.ensure,
+ mock_callback,
+ retry=0,
+ )
+ # TODO(stephenfin): Remove when we drop support for kombu < 5.2.4
+ expected = 1
+ if kombu.VERSION < (5, 2, 4):
+ expected = 2
+ self.assertEqual(expected, mock_callback.call_count)
class ConnectionLockTestCase(test_utils.BaseTestCase):
diff --git a/oslo_messaging/tests/drivers/test_pool.py b/oslo_messaging/tests/drivers/test_pool.py
index d5c6420..82a10e1 100644
--- a/oslo_messaging/tests/drivers/test_pool.py
+++ b/oslo_messaging/tests/drivers/test_pool.py
@@ -44,7 +44,7 @@ class PoolTestCase(test_utils.BaseTestCase):
class TestPool(pool.Pool):
- def create(self):
+ def create(self, retry=None):
return uuid.uuid4()
class ThreadWaitWaiter(object):
@@ -82,7 +82,7 @@ class PoolTestCase(test_utils.BaseTestCase):
p = self.TestPool(**kwargs)
if self.create_error:
- def create_error():
+ def create_error(retry=None):
raise RuntimeError
orig_create = p.create
self.useFixture(fixtures.MockPatchObject(
diff --git a/oslo_messaging/tests/notify/test_notifier.py b/oslo_messaging/tests/notify/test_notifier.py
index c36a432..330bdab 100644
--- a/oslo_messaging/tests/notify/test_notifier.py
+++ b/oslo_messaging/tests/notify/test_notifier.py
@@ -18,6 +18,7 @@ import sys
import uuid
import fixtures
+from kombu import connection
from oslo_serialization import jsonutils
from oslo_utils import strutils
from oslo_utils import timeutils
@@ -228,6 +229,80 @@ class TestMessagingNotifier(test_utils.BaseTestCase):
TestMessagingNotifier.generate_scenarios()
+class TestMessagingNotifierRetry(test_utils.BaseTestCase):
+
+ class TestingException(BaseException):
+ pass
+
+ def test_notifier_retry_connection_fails_rabbit(self):
+ """This test sets a small retry number for notification sending and
+ configures a non reachable message bus. The expectation that after the
+ configured number of retries the driver gives up the message sending.
+ """
+ self.config(
+ driver=["messagingv2"],
+ topics=["test-retry"],
+ retry=2,
+ group="oslo_messaging_notifications")
+ self.config(
+ # just to speed up the test execution
+ rabbit_retry_backoff=0,
+ group="oslo_messaging_rabbit")
+ transport = oslo_messaging.get_notification_transport(
+ self.conf, url='rabbit://')
+ notifier = oslo_messaging.Notifier(transport)
+
+ orig_establish_connection = connection.Connection._establish_connection
+ calls = []
+
+ def wrapped_establish_connection(*args, **kwargs):
+ if len(calls) > 2:
+ raise self.TestingException(
+ "Connection should only be retried twice due to "
+ "configuration")
+ else:
+ calls.append((args, kwargs))
+ orig_establish_connection(*args, **kwargs)
+
+ with mock.patch(
+ 'kombu.connection.Connection._establish_connection',
+ new=wrapped_establish_connection
+ ):
+ with mock.patch(
+ 'oslo_messaging.notify.messaging.LOG.exception'
+ ) as mock_log:
+ notifier.info({}, "test", {})
+
+ # one normal call plus two retries
+ self.assertEqual(3, len(calls))
+ # the error was caught and logged
+ mock_log.assert_called_once()
+
+ def test_notifier_retry_connection_fails_kafka(self):
+ """This test sets a small retry number for notification sending and
+ configures a non reachable message bus. The expectation that after the
+ configured number of retries the driver gives up the message sending.
+ """
+
+ self.config(
+ driver=["messagingv2"],
+ topics=["test-retry"],
+ retry=2,
+ group='oslo_messaging_notifications')
+
+ transport = oslo_messaging.get_notification_transport(
+ self.conf, url='kafka://')
+
+ notifier = oslo_messaging.Notifier(transport)
+
+ # Kafka's message producer interface is async, and there is no way
+ # from the oslo interface to force sending a pending message. So this
+ # call simply returns without i) failing to deliver the message to
+ # the non existent kafka bus ii) retrying the message delivery twice
+ # as the configuration requested it.
+ notifier.info({}, "test", {})
+
+
class TestSerializer(test_utils.BaseTestCase):
def setUp(self):
diff --git a/oslo_messaging/tests/test_transport.py b/oslo_messaging/tests/test_transport.py
index 31fec16..cb12c16 100644
--- a/oslo_messaging/tests/test_transport.py
+++ b/oslo_messaging/tests/test_transport.py
@@ -115,7 +115,7 @@ class GetTransportTestCase(test_utils.BaseTestCase):
self.assertIsNotNone(transport_)
self.assertIs(transport_.conf, self.conf)
self.assertIs(transport_._driver, drvr)
- self.assertTrue(isinstance(transport_, transport.RPCTransport))
+ self.assertIsInstance(transport_, transport.RPCTransport)
driver.DriverManager.assert_called_once_with('oslo.messaging.drivers',
self.expect['backend'],
diff --git a/releasenotes/notes/add-quorum-control-configurations-beed79811ff97ba2.yaml b/releasenotes/notes/add-quorum-control-configurations-beed79811ff97ba2.yaml
new file mode 100644
index 0000000..42fdfbf
--- /dev/null
+++ b/releasenotes/notes/add-quorum-control-configurations-beed79811ff97ba2.yaml
@@ -0,0 +1,6 @@
+---
+features:
+ - |
+ Add quorum configuration x-max-in-memory-length,
+ x-max-in-memory-bytes, x-delivery-limit which control the quorum
+ queue memory usage and handle the message poisoning problem
diff --git a/releasenotes/notes/adding_support_for_quorum_queues-3101d055b492289e.yaml b/releasenotes/notes/adding_support_for_quorum_queues-3101d055b492289e.yaml
new file mode 100644
index 0000000..a88c5d8
--- /dev/null
+++ b/releasenotes/notes/adding_support_for_quorum_queues-3101d055b492289e.yaml
@@ -0,0 +1,11 @@
+---
+features:
+ - |
+ Adding support for quorum queues. Quorum queues are enabled if the
+ ``rabbit_quorum_queue`` parameter is sets (``x-queue-type: quorum``).
+ Setting x-queue-type to quorum means that replicated FIFO queue based on
+ the Raft consensus algorithm will be used. It is available as of
+ RabbitMQ 3.8.0. The quorum queues are durable by default
+ (``amqp_durable_queues``) will be ignored.
+ when enabled the HA queues (``rabbit_ha_queues``) aka mirrored queues
+ should be disabled since the queue can't be both types at the same time
diff --git a/releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml b/releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml
new file mode 100644
index 0000000..d3d62cb
--- /dev/null
+++ b/releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml
@@ -0,0 +1,8 @@
+---
+fixes:
+ - |
+ As a fix for `bug 1917645 <https://launchpad.net/bugs/1917645>`_ the rabbit
+ backend is changed to use the ``[oslo_messaging_notifications]retry``
+ parameter when driver tries to connect to the message bus during
+ notification sending. Before this fix the rabbit backend retried the
+ connection forever blocking the caller thread.
diff --git a/releasenotes/notes/do-not-run-heartbeat-in-pthread-by-default-42e1299f59b841f8.yaml b/releasenotes/notes/do-not-run-heartbeat-in-pthread-by-default-42e1299f59b841f8.yaml
new file mode 100644
index 0000000..265e709
--- /dev/null
+++ b/releasenotes/notes/do-not-run-heartbeat-in-pthread-by-default-42e1299f59b841f8.yaml
@@ -0,0 +1,9 @@
+---
+upgrade:
+ - |
+ The ``[oslo_messaging_rabbit] heartbeat_in_pthread`` config option
+ defaults to ``False`` again.
+ For wsgi applications it is recommended to set this value to ``True``
+ but enabling it for non-wsgi services may break such service.
+ Please check https://bugs.launchpad.net/oslo.messaging/+bug/1934937
+ for more details.
diff --git a/releasenotes/notes/enforce_fips_mode-07dd259eb8a73c2b.yaml b/releasenotes/notes/enforce_fips_mode-07dd259eb8a73c2b.yaml
new file mode 100644
index 0000000..ba6ac7f
--- /dev/null
+++ b/releasenotes/notes/enforce_fips_mode-07dd259eb8a73c2b.yaml
@@ -0,0 +1,10 @@
+---
+features:
+ - |
+ Adding a new option, ``[oslo_messaging_rabbit] ssl_enforce_fips_mode``, to
+ the rabbitmq driver to enforce the OpenSSL FIPS mode if supported by the
+ version of Python.
+security:
+ - |
+ We are now able to enforce the OpenSSL FIPS mode by using
+ ``[oslo_messaging_rabbit] ssl_enforce_fips_mode``.
diff --git a/releasenotes/source/index.rst b/releasenotes/source/index.rst
index b69bcf2..0654d71 100644
--- a/releasenotes/source/index.rst
+++ b/releasenotes/source/index.rst
@@ -6,6 +6,8 @@
:maxdepth: 1
unreleased
+ zed
+ yoga
xena
wallaby
victoria
diff --git a/releasenotes/source/yoga.rst b/releasenotes/source/yoga.rst
new file mode 100644
index 0000000..7cd5e90
--- /dev/null
+++ b/releasenotes/source/yoga.rst
@@ -0,0 +1,6 @@
+=========================
+Yoga Series Release Notes
+=========================
+
+.. release-notes::
+ :branch: stable/yoga
diff --git a/releasenotes/source/zed.rst b/releasenotes/source/zed.rst
new file mode 100644
index 0000000..9608c05
--- /dev/null
+++ b/releasenotes/source/zed.rst
@@ -0,0 +1,6 @@
+========================
+Zed Series Release Notes
+========================
+
+.. release-notes::
+ :branch: stable/zed
diff --git a/setup.cfg b/setup.cfg
index d4c97c6..d95c084 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -6,7 +6,7 @@ summary = Oslo Messaging API
description_file =
README.rst
home_page = https://docs.openstack.org/oslo.messaging/latest/
-python_requires = >=3.6
+python_requires = >=3.8
classifier =
Environment :: OpenStack
Intended Audience :: Developers
@@ -15,9 +15,8 @@ classifier =
Operating System :: OS Independent
Programming Language :: Python
Programming Language :: Python :: 3
- Programming Language :: Python :: 3.6
- Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
+ Programming Language :: Python :: 3.9
Programming Language :: Python :: 3 :: Only
Programming Language :: Python :: Implementation :: CPython
diff --git a/test-requirements.txt b/test-requirements.txt
index 983c1c9..3a7c44f 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -3,7 +3,7 @@
# process, which may cause wedges in the gate later.
# Hacking already pins down pep8, pyflakes and flake8
-hacking>=3.0.1,<3.1.0 # Apache-2.0
+hacking>=3.0.1,<=4.1.0 # Apache-2.0
fixtures>=3.0.0 # Apache-2.0/BSD
stestr>=2.0.0 # Apache-2.0