diff options
72 files changed, 771 insertions, 1188 deletions
@@ -4,4 +4,7 @@ Oslo Messaging Library The Oslo messaging API supports RPC and notifications over a number of different messaging transports. -See also: `Library Documentation <http://docs.openstack.org/developer/oslo.messaging>`_ +* License: Apache License, Version 2.0 +* Documentation: http://docs.openstack.org/developer/oslo.messaging +* Source: http://git.openstack.org/cgit/openstack/oslo.messaging +* Bugs: http://bugs.launchpad.net/oslo.messaging diff --git a/doc/source/notification_listener.rst b/doc/source/notification_listener.rst index 0f555e5..3a69070 100644 --- a/doc/source/notification_listener.rst +++ b/doc/source/notification_listener.rst @@ -10,5 +10,7 @@ Notification Listener .. autoclass:: MessageHandlingServer :members: + :noindex: .. autofunction:: get_local_context + :noindex: diff --git a/tests/functional/__init__.py b/doc/source/static/.placeholder index e69de29..e69de29 100644 --- a/tests/functional/__init__.py +++ b/doc/source/static/.placeholder diff --git a/oslo.messaging/locale/fr/LC_MESSAGES/oslo.messaging.po b/oslo.messaging/locale/fr/LC_MESSAGES/oslo.messaging.po index 90ba808..83e9124 100644 --- a/oslo.messaging/locale/fr/LC_MESSAGES/oslo.messaging.po +++ b/oslo.messaging/locale/fr/LC_MESSAGES/oslo.messaging.po @@ -1,5 +1,5 @@ # French translations for oslo.messaging. -# Copyright (C) 2014 ORGANIZATION +# Copyright (C) 2015 ORGANIZATION # This file is distributed under the same license as the oslo.messaging # project. # @@ -10,9 +10,9 @@ msgid "" msgstr "" "Project-Id-Version: oslo.messaging\n" "Report-Msgid-Bugs-To: EMAIL@ADDRESS\n" -"POT-Creation-Date: 2014-12-03 06:10+0000\n" -"PO-Revision-Date: 2014-12-01 13:11+0000\n" -"Last-Translator: openstackjenkins <jenkins@openstack.org>\n" +"POT-Creation-Date: 2015-02-02 06:12+0000\n" +"PO-Revision-Date: 2014-12-20 02:07+0000\n" +"Last-Translator: Maxime COQUEREL <max.coquerel@gmail.com>\n" "Language-Team: French " "(http://www.transifex.com/projects/p/oslomessaging/language/fr/)\n" "Plural-Forms: nplurals=2; plural=(n > 1)\n" @@ -21,21 +21,21 @@ msgstr "" "Content-Transfer-Encoding: 8bit\n" "Generated-By: Babel 1.3\n" -#: oslo/messaging/_executors/impl_blocking.py:50 +#: oslo_messaging/_executors/impl_blocking.py:50 msgid "Unexpected exception occurred." -msgstr "" +msgstr "Exception inattendue s'est produite." -#: oslo/messaging/notify/_impl_routing.py:80 +#: oslo_messaging/notify/_impl_routing.py:80 #, python-format msgid "Failed to load any notifiers for %s" msgstr "Echec de chargement des notifications pour %s" -#: oslo/messaging/notify/_impl_routing.py:117 +#: oslo_messaging/notify/_impl_routing.py:117 #, python-format msgid "Routing '%(event)s' notification to '%(driver)s' driver" msgstr "Routage '%(event)s' notification du pilote %(driver)s'" -#: oslo/messaging/rpc/dispatcher.py:145 +#: oslo_messaging/rpc/dispatcher.py:150 #, python-format msgid "Exception during message handling: %s" msgstr "Exception lors de la manipulation du message: %s" diff --git a/oslo/messaging/notify/__init__.py b/oslo/messaging/notify/__init__.py index c5032db..9de8331 100644 --- a/oslo/messaging/notify/__init__.py +++ b/oslo/messaging/notify/__init__.py @@ -25,3 +25,4 @@ from .listener import * from .log_handler import * from .logger import * from .dispatcher import NotificationResult +from oslo_messaging.notify import _impl_test diff --git a/oslo/messaging/notify/_impl_test.py b/oslo/messaging/notify/_impl_test.py deleted file mode 100644 index 4e3caf7..0000000 --- a/oslo/messaging/notify/_impl_test.py +++ /dev/null @@ -1,13 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_messaging.notify._impl_test import * # noqa diff --git a/oslo_messaging/_cmd/zmq_receiver.py b/oslo_messaging/_cmd/zmq_receiver.py index 6346d6a..cbcdfe8 100644 --- a/oslo_messaging/_cmd/zmq_receiver.py +++ b/oslo_messaging/_cmd/zmq_receiver.py @@ -21,7 +21,8 @@ import contextlib import logging import sys -from oslo.config import cfg +from oslo_config import cfg + from oslo_messaging._drivers import impl_zmq from oslo_messaging._executors import base # FIXME(markmc) diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py index 0648b1e..ebae514 100644 --- a/oslo_messaging/_drivers/amqp.py +++ b/oslo_messaging/_drivers/amqp.py @@ -27,10 +27,10 @@ import collections import logging import uuid +from oslo_config import cfg +from oslo_utils import strutils import six -from oslo.config import cfg -from oslo.utils import strutils from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers import pool @@ -42,11 +42,13 @@ amqp_opts = [ help='Use durable queues in AMQP.'), cfg.BoolOpt('amqp_auto_delete', default=False, + deprecated_group='DEFAULT', help='Auto-delete queues in AMQP.'), # FIXME(markmc): this was toplevel in openstack.common.rpc cfg.IntOpt('rpc_conn_pool_size', default=30, + deprecated_group='DEFAULT', help='Size of RPC connection pool.'), ] @@ -56,11 +58,11 @@ LOG = logging.getLogger(__name__) class ConnectionPool(pool.Pool): """Class that implements a Pool of Connections.""" - def __init__(self, conf, url, connection_cls): + def __init__(self, conf, rpc_conn_pool_size, url, connection_cls): self.connection_cls = connection_cls self.conf = conf self.url = url - super(ConnectionPool, self).__init__(self.conf.rpc_conn_pool_size) + super(ConnectionPool, self).__init__(rpc_conn_pool_size) self.reply_proxy = None # TODO(comstud): Timeout connections not used in a while diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index c222bce..9e967c4 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -147,12 +147,6 @@ class ReplyWaiters(object): 'Timed out waiting for a reply ' 'to message ID %s' % msg_id) - def check(self, msg_id): - try: - return self._queues[msg_id].get(block=False) - except moves.queue.Empty: - return None - def put(self, msg_id, message_data): queue = self._queues.get(msg_id) if not queue: @@ -162,13 +156,8 @@ class ReplyWaiters(object): else: queue.put(message_data) - def wake_all(self, except_id): - msg_ids = [i for i in self._queues.keys() if i != except_id] - for msg_id in msg_ids: - self.put(msg_id, self.WAKE_UP) - - def add(self, msg_id, queue): - self._queues[msg_id] = queue + def add(self, msg_id): + self._queues[msg_id] = moves.queue.Queue() if len(self._queues) > self._wrn_threshold: LOG.warn('Number of call queues is greater than warning ' 'threshold: %d. There could be a leak. Increasing' @@ -181,27 +170,41 @@ class ReplyWaiters(object): class ReplyWaiter(object): - - def __init__(self, conf, reply_q, conn, allowed_remote_exmods): - self.conf = conf + def __init__(self, reply_q, conn, allowed_remote_exmods): self.conn = conn - self.reply_q = reply_q self.allowed_remote_exmods = allowed_remote_exmods - - self.conn_lock = threading.Lock() - self.incoming = [] self.msg_id_cache = rpc_amqp._MsgIdCache() self.waiters = ReplyWaiters() - conn.declare_direct_consumer(reply_q, self) + self.conn.declare_direct_consumer(reply_q, self) + + self._thread_exit_event = threading.Event() + self._thread = threading.Thread(target=self.poll) + self._thread.daemon = True + self._thread.start() + + def stop(self): + if self._thread: + self._thread_exit_event.set() + self.conn.stop_consuming() + self._thread.join() + self._thread = None + + def poll(self): + while not self._thread_exit_event.is_set(): + try: + self.conn.consume(limit=1) + except Exception: + LOG.exception("Failed to process incoming message, " + "retrying...") def __call__(self, message): message.acknowledge() - self.incoming.append(message) + incoming_msg_id = message.pop('_msg_id', None) + self.waiters.put(incoming_msg_id, message) def listen(self, msg_id): - queue = moves.queue.Queue() - self.waiters.add(msg_id, queue) + self.waiters.add(msg_id) def unlisten(self, msg_id): self.waiters.remove(msg_id) @@ -225,96 +228,25 @@ class ReplyWaiter(object): result = data['result'] return result, ending - def _poll_connection(self, msg_id, timer): - while True: - while self.incoming: - message_data = self.incoming.pop(0) - - incoming_msg_id = message_data.pop('_msg_id', None) - if incoming_msg_id == msg_id: - return self._process_reply(message_data) - - self.waiters.put(incoming_msg_id, message_data) - + def wait(self, msg_id, timeout): + # NOTE(sileht): for each msg_id we receive two amqp message + # first one with the payload, a second one to ensure the other + # have finish to send the payload + timer = rpc_common.DecayingTimer(duration=timeout) + timer.start() + final_reply = None + ending = False + while not ending: timeout = timer.check_return(self._raise_timeout_exception, msg_id) try: - self.conn.consume(limit=1, timeout=timeout) - except rpc_common.Timeout: + message = self.waiters.get(msg_id, timeout=timeout) + except moves.queue.Empty: self._raise_timeout_exception(msg_id) - def _poll_queue(self, msg_id, timer): - timeout = timer.check_return(self._raise_timeout_exception, msg_id) - message = self.waiters.get(msg_id, timeout=timeout) - if message is self.waiters.WAKE_UP: - return None, None, True # lock was released - - reply, ending = self._process_reply(message) - return reply, ending, False - - def _check_queue(self, msg_id): - while True: - message = self.waiters.check(msg_id) - if message is self.waiters.WAKE_UP: - continue - if message is None: - return None, None, True # queue is empty - reply, ending = self._process_reply(message) - return reply, ending, False - - def wait(self, msg_id, timeout): - # - # NOTE(markmc): we're waiting for a reply for msg_id to come in for on - # the reply_q, but there may be other threads also waiting for replies - # to other msg_ids - # - # Only one thread can be consuming from the queue using this connection - # and we don't want to hold open a connection per thread, so instead we - # have the first thread take responsibility for passing replies not - # intended for itself to the appropriate thread. - # - timer = rpc_common.DecayingTimer(duration=timeout) - timer.start() - final_reply = None - while True: - if self.conn_lock.acquire(False): - # Ok, we're the thread responsible for polling the connection - try: - # Check the queue to see if a previous lock-holding thread - # queued up a reply already - while True: - reply, ending, empty = self._check_queue(msg_id) - if empty: - break - if not ending: - final_reply = reply - else: - return final_reply - - # Now actually poll the connection - while True: - reply, ending = self._poll_connection(msg_id, timer) - if not ending: - final_reply = reply - else: - return final_reply - finally: - self.conn_lock.release() - # We've got our reply, tell the other threads to wake up - # so that one of them will take over the responsibility for - # polling the connection - self.waiters.wake_all(msg_id) - else: - # We're going to wait for the first thread to pass us our reply - reply, ending, trylock = self._poll_queue(msg_id, timer) - if trylock: - # The first thread got its reply, let's try and take over - # the responsibility for polling - continue - if not ending: - final_reply = reply - else: - return final_reply + if not ending: + final_reply = reply + return final_reply class AMQPDriverBase(base.BaseDriver): @@ -349,7 +281,7 @@ class AMQPDriverBase(base.BaseDriver): conn = self._get_connection(pooled=False) - self._waiter = ReplyWaiter(self.conf, reply_q, conn, + self._waiter = ReplyWaiter(reply_q, conn, self._allowed_remote_exmods) self._reply_q = reply_q @@ -451,3 +383,11 @@ class AMQPDriverBase(base.BaseDriver): if self._connection_pool: self._connection_pool.empty() self._connection_pool = None + + with self._reply_q_lock: + if self._reply_q is not None: + self._waiter.stop() + self._reply_q_conn.close() + self._reply_q_conn = None + self._reply_q = None + self._waiter = None diff --git a/oslo_messaging/_drivers/common.py b/oslo_messaging/_drivers/common.py index 3b0247f..b2d97b3 100644 --- a/oslo_messaging/_drivers/common.py +++ b/oslo_messaging/_drivers/common.py @@ -21,9 +21,9 @@ import sys import time import traceback +from oslo_serialization import jsonutils import six -from oslo.serialization import jsonutils import oslo_messaging from oslo_messaging._i18n import _ from oslo_messaging import _utils as utils @@ -341,8 +341,9 @@ class DecayingTimer(object): if self._duration is not None: self._ends_at = time.time() + max(0, self._duration) - def check_return(self, timeout_callback, *args, **kwargs): + def check_return(self, timeout_callback=None, *args, **kwargs): maximum = kwargs.pop('maximum', None) + if self._duration is None: return None if maximum is None else maximum if self._ends_at is None: @@ -350,7 +351,7 @@ class DecayingTimer(object): " that has not been started.")) left = self._ends_at - time.time() - if left <= 0: + if left <= 0 and timeout_callback is not None: timeout_callback(*args, **kwargs) return left if maximum is None else min(left, maximum) diff --git a/oslo_messaging/_drivers/impl_qpid.py b/oslo_messaging/_drivers/impl_qpid.py index 7bd5272..4640230 100644 --- a/oslo_messaging/_drivers/impl_qpid.py +++ b/oslo_messaging/_drivers/impl_qpid.py @@ -20,12 +20,12 @@ import os import random import time +from oslo_config import cfg +from oslo_serialization import jsonutils +from oslo_utils import importutils +from oslo_utils import netutils import six -from oslo.config import cfg -from oslo.serialization import jsonutils -from oslo.utils import importutils -from oslo.utils import netutils from oslo_messaging._drivers import amqp as rpc_amqp from oslo_messaging._drivers import amqpdriver from oslo_messaging._drivers import common as rpc_common @@ -41,41 +41,52 @@ LOG = logging.getLogger(__name__) qpid_opts = [ cfg.StrOpt('qpid_hostname', default='localhost', + deprecated_group='DEFAULT', help='Qpid broker hostname.'), cfg.IntOpt('qpid_port', default=5672, + deprecated_group='DEFAULT', help='Qpid broker port.'), cfg.ListOpt('qpid_hosts', default=['$qpid_hostname:$qpid_port'], + deprecated_group='DEFAULT', help='Qpid HA cluster host:port pairs.'), cfg.StrOpt('qpid_username', default='', + deprecated_group='DEFAULT', help='Username for Qpid connection.'), cfg.StrOpt('qpid_password', default='', + deprecated_group='DEFAULT', help='Password for Qpid connection.', secret=True), cfg.StrOpt('qpid_sasl_mechanisms', default='', + deprecated_group='DEFAULT', help='Space separated list of SASL mechanisms to use for ' 'auth.'), cfg.IntOpt('qpid_heartbeat', default=60, + deprecated_group='DEFAULT', help='Seconds between connection keepalive heartbeats.'), cfg.StrOpt('qpid_protocol', default='tcp', + deprecated_group='DEFAULT', help="Transport to use, either 'tcp' or 'ssl'."), cfg.BoolOpt('qpid_tcp_nodelay', default=True, + deprecated_group='DEFAULT', help='Whether to disable the Nagle algorithm.'), cfg.IntOpt('qpid_receiver_capacity', default=1, + deprecated_group='DEFAULT', help='The number of prefetched messages held by receiver.'), # NOTE(russellb) If any additional versions are added (beyond 1 and 2), # this file could probably use some additional refactoring so that the # differences between each version are split into different classes. cfg.IntOpt('qpid_topology_version', default=1, + deprecated_group='DEFAULT', help="The qpid topology version to use. Version 1 is what " "was originally used by impl_qpid. Version 2 includes " "some backwards-incompatible changes that allow broker " @@ -459,6 +470,7 @@ class Connection(object): self.session = None self.consumers = {} self.conf = conf + self.driver_conf = conf.oslo_messaging_qpid self._consume_loop_stopped = False @@ -476,7 +488,7 @@ class Connection(object): self.brokers_params.append(params) else: # Old configuration format - for adr in self.conf.qpid_hosts: + for adr in self.driver_conf.qpid_hosts: hostname, port = netutils.parse_host_port( adr, default_port=5672) @@ -485,8 +497,8 @@ class Connection(object): params = { 'host': '%s:%d' % (hostname, port), - 'username': self.conf.qpid_username, - 'password': self.conf.qpid_password, + 'username': self.driver_conf.qpid_username, + 'password': self.driver_conf.qpid_password, } self.brokers_params.append(params) @@ -505,12 +517,12 @@ class Connection(object): self.connection.username = broker['username'] self.connection.password = broker['password'] - self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms + self.connection.sasl_mechanisms = self.driver_conf.qpid_sasl_mechanisms # Reconnection is done by self.reconnect() self.connection.reconnect = False - self.connection.heartbeat = self.conf.qpid_heartbeat - self.connection.transport = self.conf.qpid_protocol - self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay + self.connection.heartbeat = self.driver_conf.qpid_heartbeat + self.connection.transport = self.driver_conf.qpid_protocol + self.connection.tcp_nodelay = self.driver_conf.qpid_tcp_nodelay self.connection.open() def _register_consumer(self, consumer): @@ -633,7 +645,8 @@ class Connection(object): "%(err_str)s"), log_info) def _declare_consumer(): - consumer = consumer_cls(self.conf, self.session, topic, callback) + consumer = consumer_cls(self.driver_conf, self.session, topic, + callback) self._register_consumer(consumer) return consumer @@ -693,7 +706,8 @@ class Connection(object): "'%(topic)s': %(err_str)s"), log_info) def _publisher_send(): - publisher = cls(self.conf, self.session, topic=topic, **kwargs) + publisher = cls(self.driver_conf, self.session, topic=topic, + **kwargs) publisher.send(msg) return self.ensure(_connect_error, _publisher_send, retry=retry) @@ -764,10 +778,15 @@ class QpidDriver(amqpdriver.AMQPDriverBase): def __init__(self, conf, url, default_exchange=None, allowed_remote_exmods=None): - conf.register_opts(qpid_opts) - conf.register_opts(rpc_amqp.amqp_opts) - - connection_pool = rpc_amqp.ConnectionPool(conf, url, Connection) + opt_group = cfg.OptGroup(name='oslo_messaging_qpid', + title='QPID driver options') + conf.register_group(opt_group) + conf.register_opts(qpid_opts, group=opt_group) + conf.register_opts(rpc_amqp.amqp_opts, group=opt_group) + + connection_pool = rpc_amqp.ConnectionPool( + conf, conf.oslo_messaging_qpid.rpc_conn_pool_size, + url, Connection) super(QpidDriver, self).__init__(conf, url, connection_pool, diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 0347f3d..86fed1f 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -26,11 +26,11 @@ import kombu.connection import kombu.entity import kombu.exceptions import kombu.messaging +from oslo_config import cfg +from oslo_utils import netutils import six from six.moves.urllib import parse -from oslo.config import cfg -from oslo.utils import netutils from oslo_messaging._drivers import amqp as rpc_amqp from oslo_messaging._drivers import amqpdriver from oslo_messaging._drivers import common as rpc_common @@ -43,6 +43,7 @@ from oslo_messaging import exceptions rabbit_opts = [ cfg.StrOpt('kombu_ssl_version', default='', + deprecated_group='DEFAULT', help='SSL version to use (valid only if SSL enabled). ' 'Valid values are TLSv1 and SSLv23. SSLv2, SSLv3, ' 'TLSv1_1, and TLSv1_2 may be available on some ' @@ -50,57 +51,72 @@ rabbit_opts = [ ), cfg.StrOpt('kombu_ssl_keyfile', default='', + deprecated_group='DEFAULT', help='SSL key file (valid only if SSL enabled).'), cfg.StrOpt('kombu_ssl_certfile', default='', + deprecated_group='DEFAULT', help='SSL cert file (valid only if SSL enabled).'), cfg.StrOpt('kombu_ssl_ca_certs', default='', + deprecated_group='DEFAULT', help='SSL certification authority file ' '(valid only if SSL enabled).'), cfg.FloatOpt('kombu_reconnect_delay', default=1.0, + deprecated_group='DEFAULT', help='How long to wait before reconnecting in response to an ' 'AMQP consumer cancel notification.'), cfg.StrOpt('rabbit_host', default='localhost', + deprecated_group='DEFAULT', help='The RabbitMQ broker address where a single node is ' 'used.'), cfg.IntOpt('rabbit_port', default=5672, + deprecated_group='DEFAULT', help='The RabbitMQ broker port where a single node is used.'), cfg.ListOpt('rabbit_hosts', default=['$rabbit_host:$rabbit_port'], + deprecated_group='DEFAULT', help='RabbitMQ HA cluster host:port pairs.'), cfg.BoolOpt('rabbit_use_ssl', default=False, + deprecated_group='DEFAULT', help='Connect over SSL for RabbitMQ.'), cfg.StrOpt('rabbit_userid', default='guest', + deprecated_group='DEFAULT', help='The RabbitMQ userid.'), cfg.StrOpt('rabbit_password', default='guest', + deprecated_group='DEFAULT', help='The RabbitMQ password.', secret=True), cfg.StrOpt('rabbit_login_method', default='AMQPLAIN', + deprecated_group='DEFAULT', help='The RabbitMQ login method.'), cfg.StrOpt('rabbit_virtual_host', default='/', + deprecated_group='DEFAULT', help='The RabbitMQ virtual host.'), cfg.IntOpt('rabbit_retry_interval', default=1, help='How frequently to retry connecting with RabbitMQ.'), cfg.IntOpt('rabbit_retry_backoff', default=2, + deprecated_group='DEFAULT', help='How long to backoff for between retries when connecting ' 'to RabbitMQ.'), cfg.IntOpt('rabbit_max_retries', default=0, + deprecated_group='DEFAULT', help='Maximum number of RabbitMQ connection retries. ' 'Default is 0 (infinite retry count).'), cfg.BoolOpt('rabbit_ha_queues', default=False, + deprecated_group='DEFAULT', help='Use HA queues in RabbitMQ (x-ha-policy: all). ' 'If you change this option, you must wipe the ' 'RabbitMQ database.'), @@ -108,6 +124,7 @@ rabbit_opts = [ # NOTE(sileht): deprecated option since oslo_messaging 1.5.0, cfg.BoolOpt('fake_rabbit', default=False, + deprecated_group='DEFAULT', help='Deprecated, use rpc_backend=kombu+memory or ' 'rpc_backend=fake'), ] @@ -376,7 +393,8 @@ class DirectPublisher(Publisher): options = {'durable': False, 'auto_delete': True, - 'exclusive': False} + 'exclusive': False, + 'passive': True} options.update(kwargs) super(DirectPublisher, self).__init__(channel, topic, topic, type='direct', **options) @@ -392,6 +410,7 @@ class TopicPublisher(Publisher): options = {'durable': conf.amqp_durable_queues, 'auto_delete': conf.amqp_auto_delete, 'exclusive': False} + options.update(kwargs) super(TopicPublisher, self).__init__(channel, exchange_name, @@ -448,25 +467,26 @@ class Connection(object): self.consumers = [] self.consumer_num = itertools.count(1) self.conf = conf - self.max_retries = self.conf.rabbit_max_retries + self.driver_conf = self.conf.oslo_messaging_rabbit + self.max_retries = self.driver_conf.rabbit_max_retries # Try forever? if self.max_retries <= 0: self.max_retries = None - self.interval_start = self.conf.rabbit_retry_interval - self.interval_stepping = self.conf.rabbit_retry_backoff + self.interval_start = self.driver_conf.rabbit_retry_interval + self.interval_stepping = self.driver_conf.rabbit_retry_backoff # max retry-interval = 30 seconds self.interval_max = 30 self._ssl_params = self._fetch_ssl_params() - self._login_method = self.conf.rabbit_login_method + self._login_method = self.driver_conf.rabbit_login_method if url.virtual_host is not None: virtual_host = url.virtual_host else: - virtual_host = self.conf.rabbit_virtual_host + virtual_host = self.driver_conf.rabbit_virtual_host self._url = '' - if self.conf.fake_rabbit: + if self.driver_conf.fake_rabbit: LOG.warn("Deprecated: fake_rabbit option is deprecated, set " "rpc_backend to kombu+memory or use the fake " "driver instead.") @@ -492,13 +512,13 @@ class Connection(object): transport = url.transport.replace('kombu+', '') self._url = "%s://%s" % (transport, virtual_host) else: - for adr in self.conf.rabbit_hosts: + for adr in self.driver_conf.rabbit_hosts: hostname, port = netutils.parse_host_port( - adr, default_port=self.conf.rabbit_port) + adr, default_port=self.driver_conf.rabbit_port) self._url += '%samqp://%s:%s@%s:%s/%s' % ( ";" if self._url else '', - parse.quote(self.conf.rabbit_userid), - parse.quote(self.conf.rabbit_password), + parse.quote(self.driver_conf.rabbit_userid), + parse.quote(self.driver_conf.rabbit_password), hostname, port, virtual_host) @@ -516,13 +536,19 @@ class Connection(object): self.connection.info()) # NOTE(sileht): just ensure the connection is setuped at startup self.ensure(error_callback=None, - method=lambda channel: True) + method=lambda: True) LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d'), self.connection.info()) + # NOTE(sileht): + # value choosen according the best practice from kombu: + # http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop + self._poll_timeout = 1 + if self._url.startswith('memory://'): # Kludge to speed up tests. self.connection.transport.polling_interval = 0.0 + self._poll_timeout = 0.05 # FIXME(markmc): use oslo sslutils when it is available as a library _SSL_PROTOCOLS = { @@ -558,15 +584,15 @@ class Connection(object): ssl_params = dict() # http://docs.python.org/library/ssl.html - ssl.wrap_socket - if self.conf.kombu_ssl_version: + if self.driver_conf.kombu_ssl_version: ssl_params['ssl_version'] = self.validate_ssl_version( - self.conf.kombu_ssl_version) - if self.conf.kombu_ssl_keyfile: - ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile - if self.conf.kombu_ssl_certfile: - ssl_params['certfile'] = self.conf.kombu_ssl_certfile - if self.conf.kombu_ssl_ca_certs: - ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs + self.driver_conf.kombu_ssl_version) + if self.driver_conf.kombu_ssl_keyfile: + ssl_params['keyfile'] = self.driver_conf.kombu_ssl_keyfile + if self.driver_conf.kombu_ssl_certfile: + ssl_params['certfile'] = self.driver_conf.kombu_ssl_certfile + if self.driver_conf.kombu_ssl_ca_certs: + ssl_params['ca_certs'] = self.driver_conf.kombu_ssl_ca_certs # We might want to allow variations in the # future with this? ssl_params['cert_reqs'] = ssl.CERT_REQUIRED @@ -597,12 +623,11 @@ class Connection(object): retry = None def on_error(exc, interval): - self.channel = None - error_callback and error_callback(exc) - interval = (self.conf.kombu_reconnect_delay + interval - if self.conf.kombu_reconnect_delay > 0 else interval) + interval = (self.driver_conf.kombu_reconnect_delay + interval + if self.driver_conf.kombu_reconnect_delay > 0 + else interval) info = {'err_str': exc, 'sleep_time': interval} info.update(self.connection.info()) @@ -626,13 +651,14 @@ class Connection(object): # use kombu for HA connection, the interval_step # should sufficient, because the underlying kombu transport # connection object freed. - if self.conf.kombu_reconnect_delay > 0: - time.sleep(self.conf.kombu_reconnect_delay) + if self.driver_conf.kombu_reconnect_delay > 0: + time.sleep(self.driver_conf.kombu_reconnect_delay) def on_reconnection(new_channel): """Callback invoked when the kombu reconnects and creates a new channel, we use it the reconfigure our consumers. """ + self._set_current_channel(new_channel) self.consumer_num = itertools.count(1) for consumer in self.consumers: consumer.reconnect(new_channel) @@ -642,11 +668,15 @@ class Connection(object): {'hostname': self.connection.hostname, 'port': self.connection.port}) + def execute_method(channel): + self._set_current_channel(channel) + method() + recoverable_errors = (self.connection.recoverable_channel_errors + self.connection.recoverable_connection_errors) try: autoretry_method = self.connection.autoretry( - method, channel=self.channel, + execute_method, channel=self.channel, max_retries=retry, errback=on_error, interval_start=self.interval_start or 1, @@ -654,10 +684,10 @@ class Connection(object): on_revive=on_reconnection, ) ret, channel = autoretry_method() - self.channel = channel + self._set_current_channel(channel) return ret except recoverable_errors as exc: - self.channel = None + self._set_current_channel(None) # NOTE(sileht): number of retry exceeded and the connection # is still broken msg = _('Unable to connect to AMQP server on ' @@ -670,17 +700,21 @@ class Connection(object): LOG.error(msg) raise exceptions.MessageDeliveryFailure(msg) + def _set_current_channel(self, new_channel): + if self.channel is not None and new_channel != self.channel: + self.connection.maybe_close_channel(self.channel) + self.channel = new_channel + def close(self): """Close/release this connection.""" if self.connection: + self._set_current_channel(None) self.connection.release() self.connection = None def reset(self): """Reset a connection so it can be used again.""" - if self.channel is not None: - self.channel.close() - self.channel = self.connection.channel() + self._set_current_channel(self.connection.channel()) self.consumers = [] self.consumer_num = itertools.count(1) @@ -694,9 +728,9 @@ class Connection(object): LOG.error(_("Failed to declare consumer for topic '%(topic)s': " "%(err_str)s"), log_info) - def _declare_consumer(channel): - consumer = consumer_cls(self.conf, channel, topic, callback, - six.next(self.consumer_num)) + def _declare_consumer(): + consumer = consumer_cls(self.driver_conf, self.channel, topic, + callback, six.next(self.consumer_num)) self.consumers.append(consumer) return consumer @@ -718,7 +752,7 @@ class Connection(object): LOG.exception(_('Failed to consume message from queue: %s'), exc) - def _consume(channel): + def _consume(): if self.do_consume: queues_head = self.consumers[:-1] # not fanout. queues_tail = self.consumers[-1] # fanout @@ -727,10 +761,8 @@ class Connection(object): queues_tail.consume(nowait=False) self.do_consume = False - # NOTE(sileht): - # maximun value choosen according the best practice from kombu: - # http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop - poll_timeout = 1 if timeout is None else min(timeout, 1) + poll_timeout = (self._poll_timeout if timeout is None + else min(timeout, self._poll_timeout)) while True: if self._consume_loop_stopped: self._consume_loop_stopped = False @@ -739,8 +771,8 @@ class Connection(object): try: return self.connection.drain_events(timeout=poll_timeout) except socket.timeout as exc: - poll_timeout = timer.check_return(_raise_timeout, exc, - maximum=1) + poll_timeout = timer.check_return( + _raise_timeout, exc, maximum=self._poll_timeout) for iteration in itertools.count(0): if limit and iteration >= limit: @@ -756,8 +788,9 @@ class Connection(object): LOG.exception(_("Failed to publish message to topic " "'%(topic)s': %(err_str)s"), log_info) - def _publish(channel): - publisher = cls(self.conf, channel, topic=topic, **kwargs) + def _publish(): + publisher = cls(self.driver_conf, self.channel, topic=topic, + **kwargs) publisher.send(msg, timeout) self.ensure(_error_callback, _publish, retry=retry) @@ -784,7 +817,31 @@ class Connection(object): def direct_send(self, msg_id, msg): """Send a 'direct' message.""" - self.publisher_send(DirectPublisher, msg_id, msg) + + timer = rpc_common.DecayingTimer(duration=60) + timer.start() + # NOTE(sileht): retry at least 60sec, after we have a good change + # that the caller is really dead too... + + while True: + try: + self.publisher_send(DirectPublisher, msg_id, msg) + except self.connection.channel_errors as exc: + # NOTE(noelbk/sileht): + # If rabbit dies, the consumer can be disconnected before the + # publisher sends, and if the consumer hasn't declared the + # queue, the publisher's will send a message to an exchange + # that's not bound to a queue, and the message wll be lost. + # So we set passive=True to the publisher exchange and catch + # the 404 kombu ChannelError and retry until the exchange + # appears + if exc.code == 404 and timer.check_return() > 0: + LOG.info(_LI("The exchange to reply to %s doesn't " + "exist yet, retrying...") % msg_id) + time.sleep(1) + continue + raise + return def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None): """Send a 'topic' message.""" @@ -818,10 +875,15 @@ class RabbitDriver(amqpdriver.AMQPDriverBase): def __init__(self, conf, url, default_exchange=None, allowed_remote_exmods=None): - conf.register_opts(rabbit_opts) - conf.register_opts(rpc_amqp.amqp_opts) - - connection_pool = rpc_amqp.ConnectionPool(conf, url, Connection) + opt_group = cfg.OptGroup(name='oslo_messaging_rabbit', + title='RabbitMQ driver options') + conf.register_group(opt_group) + conf.register_opts(rabbit_opts, group=opt_group) + conf.register_opts(rpc_amqp.amqp_opts, group=opt_group) + + connection_pool = rpc_amqp.ConnectionPool( + conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size, + url, Connection) super(RabbitDriver, self).__init__(conf, url, connection_pool, diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 678b3f5..23961ee 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -25,17 +25,17 @@ import uuid import eventlet import greenlet +from oslo_config import cfg +from oslo_serialization import jsonutils +from oslo_utils import excutils +from oslo_utils import importutils import six from six import moves -from oslo.config import cfg from oslo_messaging._drivers import base from oslo_messaging._drivers import common as rpc_common from oslo_messaging._executors import base as executor_base # FIXME(markmc) from oslo_messaging._i18n import _, _LE -from oslo.serialization import jsonutils -from oslo.utils import excutils -from oslo.utils import importutils zmq = importutils.try_import('eventlet.green.zmq') @@ -302,9 +302,9 @@ class InternalContext(object): data.setdefault('args', {}) try: - result = proxy.dispatch( - ctx, data['version'], data['method'], - data.get('namespace'), **data['args']) + if not data.get("method"): + raise KeyError + result = proxy.dispatch(ctx, data) return ConsumerBase.normalize_reply(result, ctx.replies) except greenlet.GreenletExit: # ignore these since they are just from shutdowns @@ -368,18 +368,13 @@ class ConsumerBase(object): # Method starting with - are # processed internally. (non-valid method name) method = data.get('method') - if not method: - LOG.error(_("RPC message did not include method.")) - return - # Internal method # uses internal context for safety. if method == '-reply': self.private_ctx.reply(ctx, proxy, **data['args']) return - proxy.dispatch(ctx, data['version'], - data['method'], data.get('namespace'), **data['args']) + proxy.dispatch(ctx, data) class ZmqBaseReactor(ConsumerBase): @@ -834,16 +829,7 @@ class ZmqListener(base.Listener): super(ZmqListener, self).__init__(driver) self.incoming_queue = moves.queue.Queue() - def dispatch(self, ctxt, version, method, namespace, **kwargs): - message = { - 'method': method, - 'args': kwargs - } - if version: - message['version'] = version - if namespace: - message['namespace'] = namespace - + def dispatch(self, ctxt, message): incoming = ZmqIncomingMessage(self, ctxt.to_dict(), message) diff --git a/oslo_messaging/_drivers/matchmaker.py b/oslo_messaging/_drivers/matchmaker.py index 0b3ba4b..bea3f7f 100644 --- a/oslo_messaging/_drivers/matchmaker.py +++ b/oslo_messaging/_drivers/matchmaker.py @@ -20,8 +20,8 @@ import contextlib import logging import eventlet +from oslo_config import cfg -from oslo.config import cfg from oslo_messaging._i18n import _ matchmaker_opts = [ diff --git a/oslo_messaging/_drivers/matchmaker_redis.py b/oslo_messaging/_drivers/matchmaker_redis.py index 760561d..4ba5447 100644 --- a/oslo_messaging/_drivers/matchmaker_redis.py +++ b/oslo_messaging/_drivers/matchmaker_redis.py @@ -16,8 +16,9 @@ The MatchMaker classes should accept a Topic or Fanout exchange key and return keys for direct exchanges, per (approximate) AMQP parlance. """ -from oslo.config import cfg -from oslo.utils import importutils +from oslo_config import cfg +from oslo_utils import importutils + from oslo_messaging._drivers import matchmaker as mm_common redis = importutils.try_import('redis') @@ -127,7 +128,7 @@ class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase): # No value is needed, we just # care if it exists. Sets aren't viable # because only keys can expire. - pipe.set(key_host, '') + pipe.sadd(key_host, '') pipe.execute() diff --git a/oslo_messaging/_drivers/matchmaker_ring.py b/oslo_messaging/_drivers/matchmaker_ring.py index 0b65875..0fd918c 100644 --- a/oslo_messaging/_drivers/matchmaker_ring.py +++ b/oslo_messaging/_drivers/matchmaker_ring.py @@ -20,7 +20,8 @@ import itertools import json import logging -from oslo.config import cfg +from oslo_config import cfg + from oslo_messaging._drivers import matchmaker as mm from oslo_messaging._i18n import _ diff --git a/oslo_messaging/_drivers/protocols/amqp/controller.py b/oslo_messaging/_drivers/protocols/amqp/controller.py index ea0950e..73f5cf1 100644 --- a/oslo_messaging/_drivers/protocols/amqp/controller.py +++ b/oslo_messaging/_drivers/protocols/amqp/controller.py @@ -29,11 +29,11 @@ import logging import threading import uuid +from oslo_config import cfg import proton import pyngus from six import moves -from oslo.config import cfg from oslo_messaging._drivers.protocols.amqp import eventloop from oslo_messaging._drivers.protocols.amqp import opts from oslo_messaging import transport diff --git a/oslo_messaging/_drivers/protocols/amqp/driver.py b/oslo_messaging/_drivers/protocols/amqp/driver.py index 8ec8b9e..ba27f0c 100644 --- a/oslo_messaging/_drivers/protocols/amqp/driver.py +++ b/oslo_messaging/_drivers/protocols/amqp/driver.py @@ -25,10 +25,10 @@ import os import threading import time +from oslo_serialization import jsonutils import proton from six import moves -from oslo.serialization import jsonutils import oslo_messaging from oslo_messaging._drivers import base from oslo_messaging._drivers import common diff --git a/oslo_messaging/_drivers/protocols/amqp/opts.py b/oslo_messaging/_drivers/protocols/amqp/opts.py index 7aade7a..0eae3be 100644 --- a/oslo_messaging/_drivers/protocols/amqp/opts.py +++ b/oslo_messaging/_drivers/protocols/amqp/opts.py @@ -12,7 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. -from oslo.config import cfg +from oslo_config import cfg amqp1_opts = [ diff --git a/oslo_messaging/_executors/base.py b/oslo_messaging/_executors/base.py index d60d656..2cc667e 100644 --- a/oslo_messaging/_executors/base.py +++ b/oslo_messaging/_executors/base.py @@ -14,10 +14,9 @@ import abc +from oslo_config import cfg import six -from oslo.config import cfg - _pool_opts = [ cfg.IntOpt('rpc_thread_pool_size', default=64, diff --git a/oslo_messaging/_executors/impl_eventlet.py b/oslo_messaging/_executors/impl_eventlet.py index 95d0a2f..555ae03 100644 --- a/oslo_messaging/_executors/impl_eventlet.py +++ b/oslo_messaging/_executors/impl_eventlet.py @@ -20,8 +20,8 @@ import eventlet from eventlet.green import threading as greenthreading from eventlet import greenpool import greenlet +from oslo_utils import excutils -from oslo.utils import excutils from oslo_messaging._executors import base from oslo_messaging import localcontext diff --git a/oslo_messaging/_executors/impl_thread.py b/oslo_messaging/_executors/impl_thread.py index 2ed4560..ca8ebc7 100644 --- a/oslo_messaging/_executors/impl_thread.py +++ b/oslo_messaging/_executors/impl_thread.py @@ -20,9 +20,9 @@ import sys import threading from concurrent import futures +from oslo_utils import excutils import six -from oslo.utils import excutils from oslo_messaging._executors import base diff --git a/oslo_messaging/_i18n.py b/oslo_messaging/_i18n.py index c4a0e7f..a012b58 100644 --- a/oslo_messaging/_i18n.py +++ b/oslo_messaging/_i18n.py @@ -16,10 +16,10 @@ See http://docs.openstack.org/developer/oslo.i18n/usage.html """ -from oslo import i18n +import oslo_i18n -_translators = i18n.TranslatorFactory(domain='oslo.messaging') +_translators = oslo_i18n.TranslatorFactory(domain='oslo.messaging') # The primary translation function using the well-known name "_" _ = _translators.primary diff --git a/oslo_messaging/conffixture.py b/oslo_messaging/conffixture.py index c41da38..bda151f 100644 --- a/oslo_messaging/conffixture.py +++ b/oslo_messaging/conffixture.py @@ -20,9 +20,9 @@ import sys import fixtures -def _import_opts(conf, module, opts): +def _import_opts(conf, module, opts, group=None): __import__(module) - conf.register_opts(getattr(sys.modules[module], opts)) + conf.register_opts(getattr(sys.modules[module], opts), group=group) class ConfFixture(fixtures.Fixture): @@ -45,11 +45,17 @@ class ConfFixture(fixtures.Fixture): def __init__(self, conf): self.conf = conf _import_opts(self.conf, - 'oslo_messaging._drivers.impl_rabbit', 'rabbit_opts') + 'oslo_messaging._drivers.impl_rabbit', 'rabbit_opts', + 'oslo_messaging_rabbit') _import_opts(self.conf, - 'oslo_messaging._drivers.impl_qpid', 'qpid_opts') + 'oslo_messaging._drivers.amqp', 'amqp_opts', + 'oslo_messaging_rabbit') _import_opts(self.conf, - 'oslo_messaging._drivers.amqp', 'amqp_opts') + 'oslo_messaging._drivers.impl_qpid', 'qpid_opts', + 'oslo_messaging_qpid') + _import_opts(self.conf, + 'oslo_messaging._drivers.amqp', 'amqp_opts', + 'oslo_messaging_qpid') _import_opts(self.conf, 'oslo_messaging.rpc.client', '_client_opts') _import_opts(self.conf, 'oslo_messaging.transport', '_transport_opts') _import_opts(self.conf, diff --git a/oslo_messaging/notify/__init__.py b/oslo_messaging/notify/__init__.py index c5032db..dd5304d 100644 --- a/oslo_messaging/notify/__init__.py +++ b/oslo_messaging/notify/__init__.py @@ -17,9 +17,11 @@ __all__ = ['Notifier', 'LoggingNotificationHandler', 'get_notification_listener', 'NotificationResult', + 'NotificationFilter', 'PublishErrorsHandler', 'LoggingErrorNotificationHandler'] +from .filter import NotificationFilter from .notifier import * from .listener import * from .log_handler import * diff --git a/oslo_messaging/notify/_impl_log.py b/oslo_messaging/notify/_impl_log.py index e04aa61..400f556 100644 --- a/oslo_messaging/notify/_impl_log.py +++ b/oslo_messaging/notify/_impl_log.py @@ -17,7 +17,8 @@ import logging -from oslo.serialization import jsonutils +from oslo_serialization import jsonutils + from oslo_messaging.notify import notifier diff --git a/oslo_messaging/notify/_impl_routing.py b/oslo_messaging/notify/_impl_routing.py index 666ad4d..b420870 100644 --- a/oslo_messaging/notify/_impl_routing.py +++ b/oslo_messaging/notify/_impl_routing.py @@ -16,11 +16,11 @@ import fnmatch import logging +from oslo_config import cfg import six from stevedore import dispatch import yaml -from oslo.config import cfg from oslo_messaging._i18n import _ from oslo_messaging.notify import notifier diff --git a/oslo_messaging/notify/dispatcher.py b/oslo_messaging/notify/dispatcher.py index a9e8cc6..48c4e20 100644 --- a/oslo_messaging/notify/dispatcher.py +++ b/oslo_messaging/notify/dispatcher.py @@ -56,7 +56,9 @@ class NotificationDispatcher(object): for endpoint, prio in itertools.product(endpoints, PRIORITIES): if hasattr(endpoint, prio): method = getattr(endpoint, prio) - self._callbacks_by_priority.setdefault(prio, []).append(method) + screen = getattr(endpoint, 'filter_rule', None) + self._callbacks_by_priority.setdefault(prio, []).append( + (screen, method)) priorities = self._callbacks_by_priority.keys() self._targets_priorities = set(itertools.product(self.targets, @@ -118,7 +120,10 @@ class NotificationDispatcher(object): payload = self.serializer.deserialize_entity(ctxt, message.get('payload')) - for callback in self._callbacks_by_priority.get(priority, []): + for screen, callback in self._callbacks_by_priority.get(priority, []): + if screen and not screen.match(ctxt, publisher_id, event_type, + metadata, payload): + continue localcontext.set_local_context(ctxt) try: if executor_callback: diff --git a/oslo_messaging/notify/filter.py b/oslo_messaging/notify/filter.py new file mode 100644 index 0000000..b23fac4 --- /dev/null +++ b/oslo_messaging/notify/filter.py @@ -0,0 +1,77 @@ +# +# Copyright 2013 eNovance +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import re + + +class NotificationFilter(object): + + """Filter notification messages + + The NotificationFilter class is used to filter notifications that an + endpoint will received. + + The notification can be filter on different fields: context, + publisher_id, event_type, metadata and payload. + + The filter is done via a regular expression + + filter_rule = NotificationFilter( + publisher_id='^compute.*', + context={'tenant_id': '^5f643cfc-664b-4c69-8000-ce2ed7b08216$', + 'roles='private'}, + event_type='^compute\.instance\..*', + metadata={'timestamp': 'Aug'}, + payload={'state': '^active$') + + """ + + def __init__(self, context=None, publisher_id=None, event_type=None, + metadata=None, payload=None): + self._regex_publisher_id = None + self._regex_event_type = None + + if publisher_id is not None: + self._regex_publisher_id = re.compile(publisher_id) + if event_type is not None: + self._regex_event_type = re.compile(event_type) + self._regexs_context = self._build_regex_dict(context) + self._regexs_metadata = self._build_regex_dict(metadata) + self._regexs_payload = self._build_regex_dict(payload) + + @staticmethod + def _build_regex_dict(regex_list): + if regex_list is None: + return {} + return dict((k, re.compile(regex_list[k])) for k in regex_list) + + @staticmethod + def _check_for_mismatch(data, regex): + if isinstance(regex, dict): + for k in regex: + if (k not in data or not regex[k].match(data[k])): + return True + elif regex is not None and not regex.match(data): + return True + return False + + def match(self, context, publisher_id, event_type, metadata, payload): + if (self._check_for_mismatch(publisher_id, self._regex_publisher_id) or + self._check_for_mismatch(event_type, self._regex_event_type) or + self._check_for_mismatch(context, self._regexs_context) or + self._check_for_mismatch(metadata, self._regexs_metadata) or + self._check_for_mismatch(payload, self._regexs_payload)): + return False + return True diff --git a/oslo_messaging/notify/listener.py b/oslo_messaging/notify/listener.py index a1586dd..1aa4613 100644 --- a/oslo_messaging/notify/listener.py +++ b/oslo_messaging/notify/listener.py @@ -40,14 +40,19 @@ and eventlet executors available. A simple example of a notification listener with multiple endpoints might be:: - from oslo.config import cfg + from oslo_config import cfg import oslo_messaging class NotificationEndpoint(object): + filter_rule = NotificationFilter(publisher_id='^compute.*') + def warn(self, ctxt, publisher_id, event_type, payload, metadata): do_something(payload) class ErrorEndpoint(object): + filter_rule = NotificationFilter(event_type='^instance\..*\.start$', + context={'ctxt_key': 'regexp'}) + def error(self, ctxt, publisher_id, event_type, payload, metadata): do_something(payload) @@ -69,7 +74,8 @@ A simple example of a notification listener with multiple endpoints might be:: A notifier sends a notification on a topic with a priority, the notification listener will receive this notification if the topic of this one have been set in one of the targets and if an endpoint implements the method named like the -priority +priority and if the notification match the NotificationFilter rule set into +the filter_rule attribute of the endpoint. Parameters to endpoint methods are the request context supplied by the client, the publisher_id of the notification message, the event_type, the payload and diff --git a/oslo_messaging/notify/log_handler.py b/oslo_messaging/notify/log_handler.py index dbbc67f..eb4b35c 100644 --- a/oslo_messaging/notify/log_handler.py +++ b/oslo_messaging/notify/log_handler.py @@ -12,7 +12,7 @@ import logging -from oslo.config import cfg +from oslo_config import cfg class LoggingErrorNotificationHandler(logging.Handler): diff --git a/oslo_messaging/notify/logger.py b/oslo_messaging/notify/logger.py index eb8e445..4117c2c 100644 --- a/oslo_messaging/notify/logger.py +++ b/oslo_messaging/notify/logger.py @@ -16,7 +16,8 @@ Driver for the Python logging package that sends log records as a notification. """ import logging -from oslo.config import cfg +from oslo_config import cfg + from oslo_messaging.notify import notifier from oslo_messaging import transport diff --git a/oslo_messaging/notify/middleware.py b/oslo_messaging/notify/middleware.py index 5529713..4297e29 100644 --- a/oslo_messaging/notify/middleware.py +++ b/oslo_messaging/notify/middleware.py @@ -21,11 +21,11 @@ import os.path import sys import traceback as tb +from oslo_config import cfg +from oslo_middleware import base import six import webob.dec -from oslo.config import cfg -from oslo.middleware import base import oslo_messaging from oslo_messaging._i18n import _LE from oslo_messaging import notify diff --git a/oslo_messaging/notify/notifier.py b/oslo_messaging/notify/notifier.py index 7cc3d9c..5565d72 100644 --- a/oslo_messaging/notify/notifier.py +++ b/oslo_messaging/notify/notifier.py @@ -19,11 +19,11 @@ import abc import logging import uuid +from oslo_config import cfg +from oslo_utils import timeutils import six from stevedore import named -from oslo.config import cfg -from oslo.utils import timeutils from oslo_messaging import serializer as msg_serializer _notifier_opts = [ diff --git a/oslo_messaging/opts.py b/oslo_messaging/opts.py index 4008465..664e586 100644 --- a/oslo_messaging/opts.py +++ b/oslo_messaging/opts.py @@ -34,9 +34,6 @@ from oslo_messaging.rpc import client from oslo_messaging import transport _global_opt_lists = [ - amqp.amqp_opts, - impl_qpid.qpid_opts, - impl_rabbit.rabbit_opts, impl_zmq.zmq_opts, matchmaker.matchmaker_opts, base._pool_opts, @@ -50,6 +47,10 @@ _opts = [ ('matchmaker_redis', matchmaker_redis.matchmaker_redis_opts), ('matchmaker_ring', matchmaker_ring.matchmaker_opts), ('oslo_messaging_amqp', amqp_opts.amqp1_opts), + ('oslo_messaging_rabbit', list(itertools.chain(amqp.amqp_opts, + impl_rabbit.rabbit_opts))), + ('oslo_messaging_qpid', list(itertools.chain(amqp.amqp_opts, + impl_qpid.qpid_opts))) ] diff --git a/oslo_messaging/rpc/client.py b/oslo_messaging/rpc/client.py index 8ac73a1..3ea7034 100644 --- a/oslo_messaging/rpc/client.py +++ b/oslo_messaging/rpc/client.py @@ -23,9 +23,9 @@ __all__ = [ 'RemoteError', ] +from oslo_config import cfg import six -from oslo.config import cfg from oslo_messaging._drivers import base as driver_base from oslo_messaging import _utils as utils from oslo_messaging import exceptions diff --git a/oslo_messaging/rpc/server.py b/oslo_messaging/rpc/server.py index f1cbc12..7f6295b 100644 --- a/oslo_messaging/rpc/server.py +++ b/oslo_messaging/rpc/server.py @@ -42,7 +42,7 @@ complete. A simple example of an RPC server with multiple endpoints might be:: - from oslo.config import cfg + from oslo_config import cfg import oslo_messaging class ServerControlEndpoint(object): diff --git a/oslo_messaging/tests/drivers/test_impl_qpid.py b/oslo_messaging/tests/drivers/test_impl_qpid.py index 1c90831..2d7dd6a 100644 --- a/oslo_messaging/tests/drivers/test_impl_qpid.py +++ b/oslo_messaging/tests/drivers/test_impl_qpid.py @@ -187,7 +187,8 @@ class TestQpidInvalidTopologyVersion(_QpidBaseTestCase): def setUp(self): super(TestQpidInvalidTopologyVersion, self).setUp() - self.config(qpid_topology_version=-1) + self.config(qpid_topology_version=-1, + group='oslo_messaging_qpid') def test_invalid_topology_version(self): def consumer_callback(msg): @@ -199,11 +200,11 @@ class TestQpidInvalidTopologyVersion(_QpidBaseTestCase): # 1. qpid driver raises Exception(msg) for invalid topology version # 2. flake8 - H202 assertRaises Exception too broad exception_msg = ("Invalid value for qpid_topology_version: %d" % - self.conf.qpid_topology_version) + self.conf.oslo_messaging_qpid.qpid_topology_version) recvd_exc_msg = '' try: - self.consumer_cls(self.conf, + self.consumer_cls(self.conf.oslo_messaging_qpid, self.session_receive, msgid_or_topic, consumer_callback, @@ -215,7 +216,7 @@ class TestQpidInvalidTopologyVersion(_QpidBaseTestCase): recvd_exc_msg = '' try: - self.publisher_cls(self.conf, + self.publisher_cls(self.conf.oslo_messaging_qpid, self.session_send, topic=msgid_or_topic, **self.publisher_kwargs) @@ -258,13 +259,15 @@ class TestQpidDirectConsumerPublisher(_QpidBaseTestCase): self.msgid = str(random.randint(1, 100)) # create a DirectConsumer and DirectPublisher class objects - self.dir_cons = qpid_driver.DirectConsumer(self.conf, - self.session_receive, - self.msgid, - self.consumer_callback) - self.dir_pub = qpid_driver.DirectPublisher(self.conf, - self.session_send, - self.msgid) + self.dir_cons = qpid_driver.DirectConsumer( + self.conf.oslo_messaging_qpid, + self.session_receive, + self.msgid, + self.consumer_callback) + self.dir_pub = qpid_driver.DirectPublisher( + self.conf.oslo_messaging_qpid, + self.session_send, + self.msgid) def try_send_msg(no_msgs): for i in range(no_msgs): @@ -418,7 +421,7 @@ class TestQpidTopicAndFanout(_QpidBaseTestCase): def test_qpid_topic_and_fanout(self): for receiver_id in range(self.no_receivers): - consumer = self.consumer_cls(self.conf, + consumer = self.consumer_cls(self.conf.oslo_messaging_qpid, self.session_receive, self.receive_topic, self.consumer_callback, @@ -431,7 +434,7 @@ class TestQpidTopicAndFanout(_QpidBaseTestCase): self._receiver_threads.append(thread) for sender_id in range(self.no_senders): - publisher = self.publisher_cls(self.conf, + publisher = self.publisher_cls(self.conf.oslo_messaging_qpid, self.session_send, topic=self.topic, **self.publisher_kwargs) @@ -483,7 +486,8 @@ class TestDriverInterface(_QpidBaseTestCase): def setUp(self): super(TestDriverInterface, self).setUp() - self.config(qpid_topology_version=2) + self.config(qpid_topology_version=2, + group='oslo_messaging_qpid') transport = oslo_messaging.get_transport(self.conf) self.driver = transport._driver @@ -554,7 +558,8 @@ class TestQpidReconnectOrder(test_utils.BaseTestCase): brokers = ['host1', 'host2', 'host3', 'host4', 'host5'] brokers_count = len(brokers) - self.config(qpid_hosts=brokers) + self.config(qpid_hosts=brokers, + group='oslo_messaging_qpid') with mock.patch('qpid.messaging.Connection') as conn_mock: # starting from the first broker in the list @@ -777,7 +782,8 @@ class QPidHATestCase(test_utils.BaseTestCase): self.config(qpid_hosts=self.brokers, qpid_username=None, - qpid_password=None) + qpid_password=None, + group='oslo_messaging_qpid') hostname_sets = set() self.info = {'attempt': 0, diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index 3aace8b..e60bd3b 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -21,11 +21,11 @@ import uuid import fixtures import kombu import mock +from oslo_config import cfg +from oslo_serialization import jsonutils from oslotest import mockpatch import testscenarios -from oslo.config import cfg -from oslo.serialization import jsonutils import oslo_messaging from oslo_messaging._drivers import amqpdriver from oslo_messaging._drivers import common as driver_common @@ -41,7 +41,7 @@ class TestDeprecatedRabbitDriverLoad(test_utils.BaseTestCase): super(TestDeprecatedRabbitDriverLoad, self).setUp( conf=cfg.ConfigOpts()) self.messaging_conf.transport_driver = 'rabbit' - self.config(fake_rabbit=True) + self.config(fake_rabbit=True, group="oslo_messaging_rabbit") def test_driver_load(self): transport = oslo_messaging.get_transport(self.conf) @@ -673,7 +673,8 @@ class RpcKombuHATestCase(test_utils.BaseTestCase): self.config(rabbit_hosts=self.brokers, rabbit_retry_interval=0.01, rabbit_retry_backoff=0.01, - kombu_reconnect_delay=0) + kombu_reconnect_delay=0, + group="oslo_messaging_rabbit") self.kombu_connect = mock.Mock() self.useFixture(mockpatch.Patch( diff --git a/oslo_messaging/tests/drivers/test_impl_zmq.py b/oslo_messaging/tests/drivers/test_impl_zmq.py index d3b03f6..5593884 100644 --- a/oslo_messaging/tests/drivers/test_impl_zmq.py +++ b/oslo_messaging/tests/drivers/test_impl_zmq.py @@ -18,14 +18,12 @@ import socket import fixtures import mock +from oslo_utils import importutils import testtools -from oslo.utils import importutils import oslo_messaging from oslo_messaging.tests import utils as test_utils -# NOTE(jamespage) the zmq driver implementation is currently tied -# to eventlet so we have to monkey_patch to support testing # eventlet is not yet py3 compatible, so skip if not installed eventlet = importutils.try_import('eventlet') @@ -44,34 +42,12 @@ def get_unused_port(): return port -class TestConfZmqDriverLoad(test_utils.BaseTestCase): - - @testtools.skipIf(impl_zmq is None, "zmq not available") - def setUp(self): - super(TestConfZmqDriverLoad, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - - def test_driver_load(self): - transport = oslo_messaging.get_transport(self.conf) - self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver) - - -class stopRpc(object): - def __init__(self, attrs): - self.attrs = attrs - - def __call__(self): - if self.attrs['reactor']: - self.attrs['reactor'].close() - if self.attrs['driver']: - self.attrs['driver'].cleanup() - - -class TestZmqBasics(test_utils.BaseTestCase): +class ZmqBaseTestCase(test_utils.BaseTestCase): + """Base test case for all ZMQ tests that make use of the ZMQ Proxy""" @testtools.skipIf(impl_zmq is None, "zmq not available") def setUp(self): - super(TestZmqBasics, self).setUp() + super(ZmqBaseTestCase, self).setUp() self.messaging_conf.transport_driver = 'zmq' # Get driver transport = oslo_messaging.get_transport(self.conf) @@ -94,6 +70,32 @@ class TestZmqBasics(test_utils.BaseTestCase): self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1') self.addCleanup(stopRpc(self.__dict__)) + +class TestConfZmqDriverLoad(test_utils.BaseTestCase): + + @testtools.skipIf(impl_zmq is None, "zmq not available") + def setUp(self): + super(TestConfZmqDriverLoad, self).setUp() + self.messaging_conf.transport_driver = 'zmq' + + def test_driver_load(self): + transport = oslo_messaging.get_transport(self.conf) + self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver) + + +class stopRpc(object): + def __init__(self, attrs): + self.attrs = attrs + + def __call__(self): + if self.attrs['reactor']: + self.attrs['reactor'].close() + if self.attrs['driver']: + self.attrs['driver'].cleanup() + + +class TestZmqBasics(ZmqBaseTestCase): + def test_start_stop_listener(self): target = oslo_messaging.Target(topic='testtopic') listener = self.driver.listen(target) @@ -277,32 +279,7 @@ class TestZmqIncomingMessage(test_utils.BaseTestCase): msg.requeue() -class TestZmqConnection(test_utils.BaseTestCase): - - @testtools.skipIf(impl_zmq is None, "zmq not available") - def setUp(self): - super(TestZmqConnection, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - # Get driver - transport = oslo_messaging.get_transport(self.conf) - self.driver = transport._driver - - # Set config values - self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path - kwargs = {'rpc_zmq_bind_address': '127.0.0.1', - 'rpc_zmq_host': '127.0.0.1', - 'rpc_response_timeout': 5, - 'rpc_zmq_port': get_unused_port(), - 'rpc_zmq_ipc_dir': self.internal_ipc_dir} - self.config(**kwargs) - - # Start RPC - LOG.info("Running internal zmq receiver.") - self.reactor = impl_zmq.ZmqProxy(self.conf) - self.reactor.consume_in_thread() - - self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1') - self.addCleanup(stopRpc(self.__dict__)) +class TestZmqConnection(ZmqBaseTestCase): @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True) def test_zmqconnection_create_consumer(self, mock_reactor): @@ -379,32 +356,7 @@ class TestZmqConnection(test_utils.BaseTestCase): self.assertTrue(conn.reactor.consume_in_thread.called) -class TestZmqListener(test_utils.BaseTestCase): - - @testtools.skipIf(impl_zmq is None, "zmq not available") - def setUp(self): - super(TestZmqListener, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - # Get driver - transport = oslo_messaging.get_transport(self.conf) - self.driver = transport._driver - - # Set config values - self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path - kwargs = {'rpc_zmq_bind_address': '127.0.0.1', - 'rpc_zmq_host': '127.0.0.1', - 'rpc_response_timeout': 5, - 'rpc_zmq_port': get_unused_port(), - 'rpc_zmq_ipc_dir': self.internal_ipc_dir} - self.config(**kwargs) - - # Start RPC - LOG.info("Running internal zmq receiver.") - self.reactor = impl_zmq.ZmqProxy(self.conf) - self.reactor.consume_in_thread() - - self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1') - self.addCleanup(stopRpc(self.__dict__)) +class TestZmqListener(ZmqBaseTestCase): def test_zmqlistener_no_msg(self): listener = impl_zmq.ZmqListener(self.driver) @@ -416,40 +368,16 @@ class TestZmqListener(test_utils.BaseTestCase): kwargs = {'a': 1, 'b': 2} m = mock.Mock() ctxt = mock.Mock(autospec=impl_zmq.RpcContext) - eventlet.spawn_n(listener.dispatch, ctxt, 0, - m.fake_method, 'name.space', **kwargs) + message = {'namespace': 'name.space', 'method': m.fake_method, + 'args': kwargs} + eventlet.spawn_n(listener.dispatch, ctxt, message) resp = listener.poll(timeout=10) msg = {'method': m.fake_method, 'namespace': 'name.space', 'args': kwargs} self.assertEqual(resp.message, msg) -class TestZmqDriver(test_utils.BaseTestCase): - - @testtools.skipIf(impl_zmq is None, "zmq not available") - def setUp(self): - super(TestZmqDriver, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - # Get driver - transport = oslo_messaging.get_transport(self.conf) - self.driver = transport._driver - - # Set config values - self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path - kwargs = {'rpc_zmq_bind_address': '127.0.0.1', - 'rpc_zmq_host': '127.0.0.1', - 'rpc_response_timeout': 5, - 'rpc_zmq_port': get_unused_port(), - 'rpc_zmq_ipc_dir': self.internal_ipc_dir} - self.config(**kwargs) - - # Start RPC - LOG.info("Running internal zmq receiver.") - self.reactor = impl_zmq.ZmqProxy(self.conf) - self.reactor.consume_in_thread() - - self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1') - self.addCleanup(stopRpc(self.__dict__)) +class TestZmqDriver(ZmqBaseTestCase): @mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True) @mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True) diff --git a/oslo_messaging/tests/drivers/test_matchmaker.py b/oslo_messaging/tests/drivers/test_matchmaker.py index 40608f4..61c37a9 100644 --- a/oslo_messaging/tests/drivers/test_matchmaker.py +++ b/oslo_messaging/tests/drivers/test_matchmaker.py @@ -12,9 +12,9 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo_utils import importutils import testtools -from oslo.utils import importutils from oslo_messaging.tests import utils as test_utils # NOTE(jamespage) matchmaker tied directly to eventlet diff --git a/oslo_messaging/tests/drivers/test_matchmaker_redis.py b/oslo_messaging/tests/drivers/test_matchmaker_redis.py index 19f6bc1..a0c4d7f 100644 --- a/oslo_messaging/tests/drivers/test_matchmaker_redis.py +++ b/oslo_messaging/tests/drivers/test_matchmaker_redis.py @@ -12,9 +12,9 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo_utils import importutils import testtools -from oslo.utils import importutils from oslo_messaging.tests import utils as test_utils redis = importutils.try_import('redis') diff --git a/oslo_messaging/tests/drivers/test_matchmaker_ring.py b/oslo_messaging/tests/drivers/test_matchmaker_ring.py index 48c5a65..5f15600 100644 --- a/oslo_messaging/tests/drivers/test_matchmaker_ring.py +++ b/oslo_messaging/tests/drivers/test_matchmaker_ring.py @@ -12,9 +12,9 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo_utils import importutils import testtools -from oslo.utils import importutils from oslo_messaging.tests import utils as test_utils # NOTE(jamespage) matchmaker tied directly to eventlet diff --git a/oslo_messaging/tests/functional/gate/gate_hook.sh b/oslo_messaging/tests/functional/gate/gate_hook.sh new file mode 100755 index 0000000..c181c95 --- /dev/null +++ b/oslo_messaging/tests/functional/gate/gate_hook.sh @@ -0,0 +1,52 @@ +#!/bin/bash +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# This script is executed inside gate_hook function in devstack gate. + +RPC_BACKEND=$1 + +DEVSTACK_LOCAL_CONFIG="" + +case $RPC_BACKEND in + rabbit) + DEVSTACK_LOCAL_CONFIG+=$'RABBIT_HOST=127.0.0.1\n' + DEVSTACK_LOCAL_CONFIG+=$'RABBIT_USERID=stackrabbit\n' + DEVSTACK_LOCAL_CONFIG+=$'RABBIT_PASSWORD=secretrabbit\n' + ;; + qpid) + export DEVSTACK_GATE_QPID=1 + DEVSTACK_LOCAL_CONFIG+=$'QPID_HOST=127.0.0.1\n' + DEVSTACK_LOCAL_CONFIG+=$'QPID_USERNAME=stackqpid\n' + DEVSTACK_LOCAL_CONFIG+=$'QPID_PASSWORD=secretqpid\n' + ;; + amqp1) + export DEVSTACK_GATE_QPID=1 + DEVSTACK_LOCAL_CONFIG+=$'QPID_HOST=127.0.0.1\n' + DEVSTACK_LOCAL_CONFIG+=$'QPID_USERNAME=stackqpid\n' + DEVSTACK_LOCAL_CONFIG+=$'QPID_PASSWORD=secretqpid\n' + DEVSTACK_LOCAL_CONFIG+=$'RPC_MESSAGING_PROTOCOL=AMQP1\n' + ;; + zeromq) + export DEVSTACK_GATE_ZEROMQ=1 + DEVSTACK_LOCAL_CONFIG+=$'ZEROMQ_MATCHMAKER=redis\n' + DEVSTACK_LOCAL_CONFIG+=$'MATCHMAKER_REDIS_HOST=127.0.0.1\n' + ;; +esac + +export DEVSTACK_LOCAL_CONFIG +export DEVSTACK_GATE_INSTALL_TESTONLY=1 +export DEVSTACK_GATE_NO_SERVICES=1 +export KEEP_LOCALRC=1 + +$BASE/new/devstack-gate/devstack-vm-gate.sh diff --git a/oslo_messaging/tests/functional/gate/post_test_hook.sh b/oslo_messaging/tests/functional/gate/post_test_hook.sh new file mode 100755 index 0000000..df6c51a --- /dev/null +++ b/oslo_messaging/tests/functional/gate/post_test_hook.sh @@ -0,0 +1,20 @@ +#!/bin/bash +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# This script is executed inside post_test_hook function in devstack gate. + +RPC_BACKEND=$1 + +cd $BASE/new/oslo.messaging +sudo -H -u stack tox -e py27-func-$RPC_BACKEND diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py index bbdd11b..e9d68ba 100644 --- a/oslo_messaging/tests/functional/utils.py +++ b/oslo_messaging/tests/functional/utils.py @@ -17,9 +17,9 @@ import time import uuid import fixtures +from oslo_config import cfg from six import moves -from oslo.config import cfg import oslo_messaging from oslo_messaging.notify import notifier from oslo_messaging.tests import utils as test_utils @@ -270,6 +270,9 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase): self.url = os.environ.get('TRANSPORT_URL') if not self.url: self.skipTest("No transport url configured") + zmq_matchmaker = os.environ.get('ZMQ_MATCHMAKER') + if zmq_matchmaker: + self.conf.rpc_zmq_matchmaker = zmq_matchmaker class NotificationFixture(fixtures.Fixture): diff --git a/oslo_messaging/tests/notify/test_dispatcher.py b/oslo_messaging/tests/notify/test_dispatcher.py index 029f737..0449f8a 100644 --- a/oslo_messaging/tests/notify/test_dispatcher.py +++ b/oslo_messaging/tests/notify/test_dispatcher.py @@ -16,9 +16,9 @@ import itertools import mock +from oslo_utils import timeutils import testscenarios -from oslo.utils import timeutils import oslo_messaging from oslo_messaging.notify import dispatcher as notify_dispatcher from oslo_messaging.tests import utils as test_utils @@ -147,3 +147,107 @@ class TestDispatcher(test_utils.BaseTestCase): callback() mylog.warning.assert_called_once_with('Unknown priority "%s"', 'what???') + + +class TestDispatcherFilter(test_utils.BaseTestCase): + scenarios = [ + ('publisher_id_match', + dict(filter_rule=dict(publisher_id='^compute.*'), + publisher_id='compute01.manager', + event_type='instance.create.start', + context={}, + match=True)), + ('publisher_id_nomatch', + dict(filter_rule=dict(publisher_id='^compute.*'), + publisher_id='network01.manager', + event_type='instance.create.start', + context={}, + match=False)), + ('event_type_match', + dict(filter_rule=dict(event_type='^instance\.create'), + publisher_id='compute01.manager', + event_type='instance.create.start', + context={}, + match=True)), + ('event_type_nomatch', + dict(filter_rule=dict(event_type='^instance\.delete'), + publisher_id='compute01.manager', + event_type='instance.create.start', + context={}, + match=False)), + ('context_match', + dict(filter_rule=dict(context={'user': '^adm'}), + publisher_id='compute01.manager', + event_type='instance.create.start', + context={'user': 'admin'}, + match=True)), + ('context_key_missing', + dict(filter_rule=dict(context={'user': '^adm'}), + publisher_id='compute01.manager', + event_type='instance.create.start', + context={'project': 'admin'}, + metadata={}, + match=False)), + ('metadata_match', + dict(filter_rule=dict(metadata={'message_id': '^99'}), + publisher_id='compute01.manager', + event_type='instance.create.start', + context={}, + match=True)), + ('metadata_key_missing', + dict(filter_rule=dict(metadata={'user': '^adm'}), + publisher_id='compute01.manager', + event_type='instance.create.start', + context={}, + match=False)), + ('payload_match', + dict(filter_rule=dict(payload={'state': '^active$'}), + publisher_id='compute01.manager', + event_type='instance.create.start', + context={}, + match=True)), + ('payload_no_match', + dict(filter_rule=dict(payload={'state': '^deleted$'}), + publisher_id='compute01.manager', + event_type='instance.create.start', + context={}, + match=False)), + ('payload_key_missing', + dict(filter_rule=dict(payload={'user': '^adm'}), + publisher_id='compute01.manager', + event_type='instance.create.start', + context={}, + match=False)), + ('mix_match', + dict(filter_rule=dict(event_type='^instance\.create', + publisher_id='^compute', + context={'user': '^adm'}), + publisher_id='compute01.manager', + event_type='instance.create.start', + context={'user': 'admin'}, + match=True)), + + ] + + def test_filters(self): + notification_filter = oslo_messaging.NotificationFilter( + **self.filter_rule) + endpoint = mock.Mock(spec=['info'], filter_rule=notification_filter) + + targets = [oslo_messaging.Target(topic='notifications')] + dispatcher = notify_dispatcher.NotificationDispatcher( + targets, [endpoint], serializer=None, allow_requeue=True) + message = {'payload': {'state': 'active'}, + 'priority': 'info', + 'publisher_id': self.publisher_id, + 'event_type': self.event_type, + 'timestamp': '2014-03-03 18:21:04.369234', + 'message_id': '99863dda-97f0-443a-a0c1-6ed317b7fd45'} + incoming = mock.Mock(ctxt=self.context, message=message) + with dispatcher(incoming) as callback: + callback() + + if self.match: + self.assertEqual(1, endpoint.info.call_count) + else: + self.assertEqual(0, endpoint.info.call_count) diff --git a/oslo_messaging/tests/notify/test_listener.py b/oslo_messaging/tests/notify/test_listener.py index 3c1e404..0bd032c 100644 --- a/oslo_messaging/tests/notify/test_listener.py +++ b/oslo_messaging/tests/notify/test_listener.py @@ -17,9 +17,9 @@ import threading import time import mock +from oslo_config import cfg import testscenarios -from oslo.config import cfg import oslo_messaging from oslo_messaging.notify import dispatcher from oslo_messaging.tests import utils as test_utils diff --git a/oslo_messaging/tests/notify/test_logger.py b/oslo_messaging/tests/notify/test_logger.py index c551493..693249b 100644 --- a/oslo_messaging/tests/notify/test_logger.py +++ b/oslo_messaging/tests/notify/test_logger.py @@ -19,10 +19,10 @@ import os import sys import mock +from oslo_utils import timeutils import testscenarios import testtools -from oslo.utils import timeutils import oslo_messaging from oslo_messaging.tests.notify import test_notifier from oslo_messaging.tests import utils as test_utils @@ -56,7 +56,7 @@ class TestLogNotifier(test_utils.BaseTestCase): # eventlet logging.logThreads = 0 - @mock.patch('oslo.utils.timeutils.utcnow') + @mock.patch('oslo_utils.timeutils.utcnow') def test_logger(self, mock_utcnow): with mock.patch('oslo_messaging.transport.get_transport', return_value=test_notifier._FakeTransport(self.conf)): @@ -99,7 +99,7 @@ class TestLogNotifier(test_utils.BaseTestCase): @testtools.skipUnless(hasattr(logging.config, 'dictConfig'), "Need logging.config.dictConfig (Python >= 2.7)") - @mock.patch('oslo.utils.timeutils.utcnow') + @mock.patch('oslo_utils.timeutils.utcnow') def test_logging_conf(self, mock_utcnow): with mock.patch('oslo_messaging.transport.get_transport', return_value=test_notifier._FakeTransport(self.conf)): diff --git a/oslo_messaging/tests/notify/test_notifier.py b/oslo_messaging/tests/notify/test_notifier.py index 6b94d15..a8733f7 100644 --- a/oslo_messaging/tests/notify/test_notifier.py +++ b/oslo_messaging/tests/notify/test_notifier.py @@ -20,13 +20,13 @@ import uuid import fixtures import mock +from oslo_serialization import jsonutils +from oslo_utils import timeutils from stevedore import dispatch from stevedore import extension import testscenarios import yaml -from oslo.serialization import jsonutils -from oslo.utils import timeutils import oslo_messaging from oslo_messaging.notify import _impl_log from oslo_messaging.notify import _impl_messaging @@ -147,7 +147,7 @@ class TestMessagingNotifier(test_utils.BaseTestCase): self.stubs.Set(_impl_messaging, 'LOG', self.logger) self.stubs.Set(msg_notifier, '_LOG', self.logger) - @mock.patch('oslo.utils.timeutils.utcnow') + @mock.patch('oslo_utils.timeutils.utcnow') def test_notifier(self, mock_utcnow): drivers = [] if self.v1: @@ -223,7 +223,7 @@ class TestSerializer(test_utils.BaseTestCase): super(TestSerializer, self).setUp() self.addCleanup(_impl_test.reset) - @mock.patch('oslo.utils.timeutils.utcnow') + @mock.patch('oslo_utils.timeutils.utcnow') def test_serializer(self, mock_utcnow): transport = _FakeTransport(self.conf) @@ -266,7 +266,7 @@ class TestSerializer(test_utils.BaseTestCase): class TestLogNotifier(test_utils.BaseTestCase): - @mock.patch('oslo.utils.timeutils.utcnow') + @mock.patch('oslo_utils.timeutils.utcnow') def test_notifier(self, mock_utcnow): self.config(notification_driver=['log']) diff --git a/oslo_messaging/tests/rpc/test_client.py b/oslo_messaging/tests/rpc/test_client.py index 2276e6b..e163a92 100644 --- a/oslo_messaging/tests/rpc/test_client.py +++ b/oslo_messaging/tests/rpc/test_client.py @@ -13,9 +13,9 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo_config import cfg import testscenarios -from oslo.config import cfg import oslo_messaging from oslo_messaging import exceptions from oslo_messaging import serializer as msg_serializer diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py index 3970b95..7b14085 100644 --- a/oslo_messaging/tests/rpc/test_server.py +++ b/oslo_messaging/tests/rpc/test_server.py @@ -16,9 +16,9 @@ import threading import mock +from oslo_config import cfg import testscenarios -from oslo.config import cfg import oslo_messaging from oslo_messaging.tests import utils as test_utils diff --git a/oslo_messaging/tests/test_amqp_driver.py b/oslo_messaging/tests/test_amqp_driver.py index 213a832..a1f5ea5 100644 --- a/oslo_messaging/tests/test_amqp_driver.py +++ b/oslo_messaging/tests/test_amqp_driver.py @@ -20,10 +20,10 @@ import threading import time import uuid +from oslo_utils import importutils from six import moves import testtools -from oslo.utils import importutils import oslo_messaging from oslo_messaging.tests import utils as test_utils diff --git a/oslo_messaging/tests/test_exception_serialization.py b/oslo_messaging/tests/test_exception_serialization.py index 1d1de31..c1079c0 100644 --- a/oslo_messaging/tests/test_exception_serialization.py +++ b/oslo_messaging/tests/test_exception_serialization.py @@ -15,10 +15,10 @@ import sys +from oslo_serialization import jsonutils import six import testscenarios -from oslo.serialization import jsonutils import oslo_messaging from oslo_messaging._drivers import common as exceptions from oslo_messaging.tests import utils as test_utils diff --git a/oslo_messaging/tests/test_opts.py b/oslo_messaging/tests/test_opts.py index 37b621f..e150cbc 100644 --- a/oslo_messaging/tests/test_opts.py +++ b/oslo_messaging/tests/test_opts.py @@ -29,13 +29,15 @@ class OptsTestCase(test_utils.BaseTestCase): super(OptsTestCase, self).setUp() def _test_list_opts(self, result): - self.assertEqual(4, len(result)) + self.assertEqual(6, len(result)) groups = [g for (g, l) in result] self.assertIn(None, groups) self.assertIn('matchmaker_ring', groups) self.assertIn('matchmaker_redis', groups) self.assertIn('oslo_messaging_amqp', groups) + self.assertIn('oslo_messaging_rabbit', groups) + self.assertIn('oslo_messaging_qpid', groups) opt_names = [o.name for (g, l) in result for o in l] self.assertIn('rpc_backend', opt_names) diff --git a/oslo_messaging/tests/test_transport.py b/oslo_messaging/tests/test_transport.py index 53d991a..a09953a 100644 --- a/oslo_messaging/tests/test_transport.py +++ b/oslo_messaging/tests/test_transport.py @@ -14,12 +14,12 @@ # under the License. import fixtures +from oslo_config import cfg import six from six.moves import mox from stevedore import driver import testscenarios -from oslo.config import cfg import oslo_messaging from oslo_messaging.tests import utils as test_utils from oslo_messaging import transport diff --git a/oslo_messaging/tests/test_utils.py b/oslo_messaging/tests/test_utils.py index d57e32e..5754205 100644 --- a/oslo_messaging/tests/test_utils.py +++ b/oslo_messaging/tests/test_utils.py @@ -13,6 +13,10 @@ # License for the specific language governing permissions and limitations # under the License. +import time + +import mock + from oslo_messaging._drivers import common from oslo_messaging import _utils as utils from oslo_messaging.tests import utils as test_utils @@ -51,14 +55,28 @@ class VersionIsCompatibleTestCase(test_utils.BaseTestCase): class TimerTestCase(test_utils.BaseTestCase): - def test_duration_is_none(self): + def test_no_duration_no_callback(self): t = common.DecayingTimer() t.start() - remaining = t.check_return(None) + remaining = t.check_return() self.assertEqual(None, remaining) - def test_duration_is_none_and_maximun_set(self): + def test_no_duration_but_maximun(self): t = common.DecayingTimer() t.start() - remaining = t.check_return(None, maximum=2) + remaining = t.check_return(maximum=2) self.assertEqual(2, remaining) + + def test_duration_expired_no_callback(self): + t = common.DecayingTimer(2) + t._ends_at = time.time() - 10 + remaining = t.check_return() + self.assertAlmostEqual(-10, remaining, 0) + + def test_duration_callback(self): + t = common.DecayingTimer(2) + t._ends_at = time.time() - 10 + callback = mock.Mock() + remaining = t.check_return(callback) + self.assertAlmostEqual(-10, remaining, 0) + callback.assert_called_once diff --git a/oslo_messaging/tests/utils.py b/oslo_messaging/tests/utils.py index f1b44a6..bfa73a8 100644 --- a/oslo_messaging/tests/utils.py +++ b/oslo_messaging/tests/utils.py @@ -19,12 +19,10 @@ """Common utilities used in testing""" -import six - -from oslo.config import cfg - +from oslo_config import cfg from oslotest import base from oslotest import moxstubout +import six TRUE_VALUES = ('true', '1', 'yes') diff --git a/oslo_messaging/transport.py b/oslo_messaging/transport.py index 181cacb..5a8c389 100644 --- a/oslo_messaging/transport.py +++ b/oslo_messaging/transport.py @@ -27,11 +27,11 @@ __all__ = [ 'set_transport_defaults', ] +from oslo_config import cfg import six from six.moves.urllib import parse from stevedore import driver -from oslo.config import cfg from oslo_messaging import exceptions diff --git a/requirements-py3.txt b/requirements-py3.txt index 2c55212..64f3cb8 100644 --- a/requirements-py3.txt +++ b/requirements-py3.txt @@ -5,7 +5,7 @@ oslo.config>=1.6.0 # Apache-2.0 oslo.serialization>=1.2.0 # Apache-2.0 oslo.utils>=1.2.0 # Apache-2.0 -oslo.i18n>=1.0.0 # Apache-2.0 +oslo.i18n>=1.3.0 # Apache-2.0 stevedore>=1.1.0 # Apache-2.0 # for jsonutils diff --git a/requirements.txt b/requirements.txt index be424f2..352b14a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,7 +7,7 @@ pbr>=0.6,!=0.7,<1.0 oslo.config>=1.6.0 # Apache-2.0 oslo.utils>=1.2.0 # Apache-2.0 oslo.serialization>=1.2.0 # Apache-2.0 -oslo.i18n>=1.0.0 # Apache-2.0 +oslo.i18n>=1.3.0 # Apache-2.0 stevedore>=1.1.0 # Apache-2.0 # for jsonutils @@ -16,7 +16,7 @@ six>=1.7.0 # FIXME(markmc): remove this when the drivers no longer # import eventlet -eventlet>=0.15.2 +eventlet>=0.16.1 # for the routing notifier PyYAML>=3.1.0 @@ -44,6 +44,7 @@ oslo.messaging.drivers = oslo.messaging.executors = blocking = oslo_messaging._executors.impl_blocking:BlockingExecutor eventlet = oslo_messaging._executors.impl_eventlet:EventletExecutor + threading = oslo_messaging._executors.impl_thread:ThreadExecutor oslo.messaging.notify.drivers = messagingv2 = oslo_messaging.notify._impl_messaging:MessagingV2Driver @@ -77,3 +78,6 @@ input_file = oslo.messaging/locale/oslo.messaging.pot keywords = _ gettext ngettext l_ lazy_gettext mapping_file = babel.cfg output_file = oslo.messaging/locale/oslo.messaging.pot + +[pbr] +warnerrors = true diff --git a/tests/drivers/test_impl_qpid.py b/tests/drivers/test_impl_qpid.py index c8ae401..d8cd1e7 100644 --- a/tests/drivers/test_impl_qpid.py +++ b/tests/drivers/test_impl_qpid.py @@ -187,7 +187,8 @@ class TestQpidInvalidTopologyVersion(_QpidBaseTestCase): def setUp(self): super(TestQpidInvalidTopologyVersion, self).setUp() - self.config(qpid_topology_version=-1) + self.config(qpid_topology_version=-1, + group='oslo_messaging_qpid') def test_invalid_topology_version(self): def consumer_callback(msg): @@ -199,11 +200,11 @@ class TestQpidInvalidTopologyVersion(_QpidBaseTestCase): # 1. qpid driver raises Exception(msg) for invalid topology version # 2. flake8 - H202 assertRaises Exception too broad exception_msg = ("Invalid value for qpid_topology_version: %d" % - self.conf.qpid_topology_version) + self.conf.oslo_messaging_qpid.qpid_topology_version) recvd_exc_msg = '' try: - self.consumer_cls(self.conf, + self.consumer_cls(self.conf.oslo_messaging_qpid, self.session_receive, msgid_or_topic, consumer_callback, @@ -215,7 +216,7 @@ class TestQpidInvalidTopologyVersion(_QpidBaseTestCase): recvd_exc_msg = '' try: - self.publisher_cls(self.conf, + self.publisher_cls(self.conf.oslo_messaging_qpid, self.session_send, topic=msgid_or_topic, **self.publisher_kwargs) @@ -258,13 +259,15 @@ class TestQpidDirectConsumerPublisher(_QpidBaseTestCase): self.msgid = str(random.randint(1, 100)) # create a DirectConsumer and DirectPublisher class objects - self.dir_cons = qpid_driver.DirectConsumer(self.conf, - self.session_receive, - self.msgid, - self.consumer_callback) - self.dir_pub = qpid_driver.DirectPublisher(self.conf, - self.session_send, - self.msgid) + self.dir_cons = qpid_driver.DirectConsumer( + self.conf.oslo_messaging_qpid, + self.session_receive, + self.msgid, + self.consumer_callback) + self.dir_pub = qpid_driver.DirectPublisher( + self.conf.oslo_messaging_qpid, + self.session_send, + self.msgid) def try_send_msg(no_msgs): for i in range(no_msgs): @@ -418,7 +421,7 @@ class TestQpidTopicAndFanout(_QpidBaseTestCase): def test_qpid_topic_and_fanout(self): for receiver_id in range(self.no_receivers): - consumer = self.consumer_cls(self.conf, + consumer = self.consumer_cls(self.conf.oslo_messaging_qpid, self.session_receive, self.receive_topic, self.consumer_callback, @@ -431,7 +434,7 @@ class TestQpidTopicAndFanout(_QpidBaseTestCase): self._receiver_threads.append(thread) for sender_id in range(self.no_senders): - publisher = self.publisher_cls(self.conf, + publisher = self.publisher_cls(self.conf.oslo_messaging_qpid, self.session_send, topic=self.topic, **self.publisher_kwargs) @@ -483,7 +486,8 @@ class TestDriverInterface(_QpidBaseTestCase): def setUp(self): super(TestDriverInterface, self).setUp() - self.config(qpid_topology_version=2) + self.config(qpid_topology_version=2, + group='oslo_messaging_qpid') transport = messaging.get_transport(self.conf) self.driver = transport._driver @@ -554,7 +558,8 @@ class TestQpidReconnectOrder(test_utils.BaseTestCase): brokers = ['host1', 'host2', 'host3', 'host4', 'host5'] brokers_count = len(brokers) - self.config(qpid_hosts=brokers) + self.config(qpid_hosts=brokers, + group='oslo_messaging_qpid') with mock.patch('qpid.messaging.Connection') as conn_mock: # starting from the first broker in the list @@ -777,7 +782,8 @@ class QPidHATestCase(test_utils.BaseTestCase): self.config(qpid_hosts=self.brokers, qpid_username=None, - qpid_password=None) + qpid_password=None, + group='oslo_messaging_qpid') hostname_sets = set() self.info = {'attempt': 0, diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py index 3dfbf13..8e9b29e 100644 --- a/tests/drivers/test_impl_rabbit.py +++ b/tests/drivers/test_impl_rabbit.py @@ -41,7 +41,7 @@ class TestDeprecatedRabbitDriverLoad(test_utils.BaseTestCase): super(TestDeprecatedRabbitDriverLoad, self).setUp( conf=cfg.ConfigOpts()) self.messaging_conf.transport_driver = 'rabbit' - self.config(fake_rabbit=True) + self.config(fake_rabbit=True, group="oslo_messaging_rabbit") def test_driver_load(self): transport = messaging.get_transport(self.conf) @@ -259,8 +259,27 @@ class TestSendReceive(test_utils.BaseTestCase): raise ZeroDivisionError except Exception: failure = sys.exc_info() - msgs[i].reply(failure=failure, - log_failure=not self.expected) + + # NOTE(noelbk) confirm that Publisher exchanges + # are always declared with passive=True + outer_self = self + test_exchange_was_called = [False] + old_init = kombu.entity.Exchange.__init__ + + def new_init(self, *args, **kwargs): + test_exchange_was_called[0] = True + outer_self.assertTrue(kwargs['passive']) + old_init(self, *args, **kwargs) + kombu.entity.Exchange.__init__ = new_init + + try: + msgs[i].reply(failure=failure, + log_failure=not self.expected) + finally: + kombu.entity.Exchange.__init__ = old_init + + self.assertTrue(test_exchange_was_called[0]) + elif self.rx_id: msgs[i].reply({'rx_id': i}) else: @@ -679,7 +698,8 @@ class RpcKombuHATestCase(test_utils.BaseTestCase): self.config(rabbit_hosts=self.brokers, rabbit_retry_interval=0.01, rabbit_retry_backoff=0.01, - kombu_reconnect_delay=0) + kombu_reconnect_delay=0, + group="oslo_messaging_rabbit") self.kombu_connect = mock.Mock() self.useFixture(mockpatch.Patch( diff --git a/tests/drivers/test_impl_zmq.py b/tests/drivers/test_impl_zmq.py index 30277c9..091ebe8 100644 --- a/tests/drivers/test_impl_zmq.py +++ b/tests/drivers/test_impl_zmq.py @@ -42,34 +42,12 @@ def get_unused_port(): return port -class TestConfZmqDriverLoad(test_utils.BaseTestCase): - - @testtools.skipIf(impl_zmq is None, "zmq not available") - def setUp(self): - super(TestConfZmqDriverLoad, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - - def test_driver_load(self): - transport = messaging.get_transport(self.conf) - self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver) - - -class stopRpc(object): - def __init__(self, attrs): - self.attrs = attrs - - def __call__(self): - if self.attrs['reactor']: - self.attrs['reactor'].close() - if self.attrs['driver']: - self.attrs['driver'].cleanup() - - -class TestZmqBasics(test_utils.BaseTestCase): +class ZmqBaseTestCase(test_utils.BaseTestCase): + """Base test case for all ZMQ tests that make use of the ZMQ Proxy""" @testtools.skipIf(impl_zmq is None, "zmq not available") def setUp(self): - super(TestZmqBasics, self).setUp() + super(ZmqBaseTestCase, self).setUp() self.messaging_conf.transport_driver = 'zmq' # Get driver transport = messaging.get_transport(self.conf) @@ -92,6 +70,32 @@ class TestZmqBasics(test_utils.BaseTestCase): self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1') self.addCleanup(stopRpc(self.__dict__)) + +class TestConfZmqDriverLoad(test_utils.BaseTestCase): + + @testtools.skipIf(impl_zmq is None, "zmq not available") + def setUp(self): + super(TestConfZmqDriverLoad, self).setUp() + self.messaging_conf.transport_driver = 'zmq' + + def test_driver_load(self): + transport = messaging.get_transport(self.conf) + self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver) + + +class stopRpc(object): + def __init__(self, attrs): + self.attrs = attrs + + def __call__(self): + if self.attrs['reactor']: + self.attrs['reactor'].close() + if self.attrs['driver']: + self.attrs['driver'].cleanup() + + +class TestZmqBasics(ZmqBaseTestCase): + def test_start_stop_listener(self): target = messaging.Target(topic='testtopic') listener = self.driver.listen(target) @@ -275,32 +279,7 @@ class TestZmqIncomingMessage(test_utils.BaseTestCase): msg.requeue() -class TestZmqConnection(test_utils.BaseTestCase): - - @testtools.skipIf(impl_zmq is None, "zmq not available") - def setUp(self): - super(TestZmqConnection, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - # Get driver - transport = messaging.get_transport(self.conf) - self.driver = transport._driver - - # Set config values - self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path - kwargs = {'rpc_zmq_bind_address': '127.0.0.1', - 'rpc_zmq_host': '127.0.0.1', - 'rpc_response_timeout': 5, - 'rpc_zmq_port': get_unused_port(), - 'rpc_zmq_ipc_dir': self.internal_ipc_dir} - self.config(**kwargs) - - # Start RPC - LOG.info("Running internal zmq receiver.") - self.reactor = impl_zmq.ZmqProxy(self.conf) - self.reactor.consume_in_thread() - - self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1') - self.addCleanup(stopRpc(self.__dict__)) +class TestZmqConnection(ZmqBaseTestCase): @mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True) def test_zmqconnection_create_consumer(self, mock_reactor): @@ -377,32 +356,7 @@ class TestZmqConnection(test_utils.BaseTestCase): self.assertTrue(conn.reactor.consume_in_thread.called) -class TestZmqListener(test_utils.BaseTestCase): - - @testtools.skipIf(impl_zmq is None, "zmq not available") - def setUp(self): - super(TestZmqListener, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - # Get driver - transport = messaging.get_transport(self.conf) - self.driver = transport._driver - - # Set config values - self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path - kwargs = {'rpc_zmq_bind_address': '127.0.0.1', - 'rpc_zmq_host': '127.0.0.1', - 'rpc_response_timeout': 5, - 'rpc_zmq_port': get_unused_port(), - 'rpc_zmq_ipc_dir': self.internal_ipc_dir} - self.config(**kwargs) - - # Start RPC - LOG.info("Running internal zmq receiver.") - self.reactor = impl_zmq.ZmqProxy(self.conf) - self.reactor.consume_in_thread() - - self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1') - self.addCleanup(stopRpc(self.__dict__)) +class TestZmqListener(ZmqBaseTestCase): def test_zmqlistener_no_msg(self): listener = impl_zmq.ZmqListener(self.driver) @@ -422,32 +376,7 @@ class TestZmqListener(test_utils.BaseTestCase): self.assertEqual(resp.message, msg) -class TestZmqDriver(test_utils.BaseTestCase): - - @testtools.skipIf(impl_zmq is None, "zmq not available") - def setUp(self): - super(TestZmqDriver, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - # Get driver - transport = messaging.get_transport(self.conf) - self.driver = transport._driver - - # Set config values - self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path - kwargs = {'rpc_zmq_bind_address': '127.0.0.1', - 'rpc_zmq_host': '127.0.0.1', - 'rpc_response_timeout': 5, - 'rpc_zmq_port': get_unused_port(), - 'rpc_zmq_ipc_dir': self.internal_ipc_dir} - self.config(**kwargs) - - # Start RPC - LOG.info("Running internal zmq receiver.") - self.reactor = impl_zmq.ZmqProxy(self.conf) - self.reactor.consume_in_thread() - - self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1') - self.addCleanup(stopRpc(self.__dict__)) +class TestZmqDriver(ZmqBaseTestCase): @mock.patch('oslo.messaging._drivers.impl_zmq._cast', autospec=True) @mock.patch('oslo.messaging._drivers.impl_zmq._multi_send', autospec=True) diff --git a/tests/drivers/test_matchmaker_redis.py b/tests/drivers/test_matchmaker_redis.py index 95bde9e..a36e14a 100644 --- a/tests/drivers/test_matchmaker_redis.py +++ b/tests/drivers/test_matchmaker_redis.py @@ -46,6 +46,7 @@ class RedisMatchMakerTest(test_utils.BaseTestCase): "network": ["controller1", "node1", "node2", "node3"], "cert": ["controller1"], "console": ["controller1"], + "l3_agent.node1": ["node1"], "consoleauth": ["controller1"]} self.matcher = matchmaker_redis.MatchMakerRedis() self.populate() @@ -70,6 +71,10 @@ class RedisMatchMakerTest(test_utils.BaseTestCase): self.assertEqual( sorted(self.matcher.redis.smembers('cert')), ['cert.controller1', 'cert.keymaster']) + self.matcher.register('l3_agent.node1', 'node1') + self.assertEqual( + sorted(self.matcher.redis.smembers('l3_agent.node1')), + ['l3_agent.node1.node1']) def test_unregister(self): self.matcher.unregister('conductor', 'controller1') diff --git a/tests/functional/test_functional.py b/tests/functional/test_functional.py deleted file mode 100644 index 5c40774..0000000 --- a/tests/functional/test_functional.py +++ /dev/null @@ -1,279 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo import messaging - -from testtools import matchers - -from tests.functional.utils import ClientStub -from tests.functional.utils import IsValidDistributionOf -from tests.functional.utils import NotificationFixture -from tests.functional.utils import RpcServerGroupFixture -from tests.functional.utils import SkipIfNoTransportURL -from tests.functional.utils import TransportFixture - - -class CallTestCase(SkipIfNoTransportURL): - def test_specific_server(self): - group = self.useFixture(RpcServerGroupFixture(self.url)) - client = group.client(1) - client.append(text='open') - self.assertEqual('openstack', client.append(text='stack')) - client.add(increment=2) - self.assertEqual(12, client.add(increment=10)) - self.assertEqual(9, client.subtract(increment=3)) - self.assertEqual('openstack', group.servers[1].endpoint.sval) - self.assertEqual(9, group.servers[1].endpoint.ival) - for i in [0, 2]: - self.assertEqual('', group.servers[i].endpoint.sval) - self.assertEqual(0, group.servers[i].endpoint.ival) - - def test_server_in_group(self): - group = self.useFixture(RpcServerGroupFixture(self.url)) - - client = group.client() - data = [c for c in 'abcdefghijklmn'] - for i in data: - client.append(text=i) - - for s in group.servers: - self.assertThat(len(s.endpoint.sval), matchers.GreaterThan(0)) - actual = [[c for c in s.endpoint.sval] for s in group.servers] - self.assertThat(actual, IsValidDistributionOf(data)) - - def test_different_exchanges(self): - t = self.useFixture(TransportFixture(self.url)) - # If the different exchanges are not honoured, then the - # teardown may hang unless we broadcast all control messages - # to each server - group1 = self.useFixture(RpcServerGroupFixture(self.url, transport=t, - use_fanout_ctrl=True)) - group2 = self.useFixture(RpcServerGroupFixture(self.url, exchange="a", - transport=t, - use_fanout_ctrl=True)) - group3 = self.useFixture(RpcServerGroupFixture(self.url, exchange="b", - transport=t, - use_fanout_ctrl=True)) - - client1 = group1.client(1) - data1 = [c for c in 'abcdefghijklmn'] - for i in data1: - client1.append(text=i) - - client2 = group2.client() - data2 = [c for c in 'opqrstuvwxyz'] - for i in data2: - client2.append(text=i) - - actual1 = [[c for c in s.endpoint.sval] for s in group1.servers] - self.assertThat(actual1, IsValidDistributionOf(data1)) - actual1 = [c for c in group1.servers[1].endpoint.sval] - self.assertThat([actual1], IsValidDistributionOf(data1)) - for s in group1.servers: - expected = len(data1) if group1.servers.index(s) == 1 else 0 - self.assertEqual(expected, len(s.endpoint.sval)) - self.assertEqual(0, s.endpoint.ival) - - actual2 = [[c for c in s.endpoint.sval] for s in group2.servers] - for s in group2.servers: - self.assertThat(len(s.endpoint.sval), matchers.GreaterThan(0)) - self.assertEqual(0, s.endpoint.ival) - self.assertThat(actual2, IsValidDistributionOf(data2)) - - for s in group3.servers: - self.assertEqual(0, len(s.endpoint.sval)) - self.assertEqual(0, s.endpoint.ival) - - def test_timeout(self): - transport = self.useFixture(TransportFixture(self.url)) - target = messaging.Target(topic="no_such_topic") - c = ClientStub(transport.transport, target, timeout=1) - self.assertThat(c.ping, matchers.raises(messaging.MessagingTimeout)) - - def test_exception(self): - group = self.useFixture(RpcServerGroupFixture(self.url)) - client = group.client(1) - client.add(increment=2) - f = lambda: client.subtract(increment=3) - self.assertThat(f, matchers.raises(ValueError)) - - -class CastTestCase(SkipIfNoTransportURL): - # Note: casts return immediately, so these tests utilise a special - # internal sync() cast to ensure prior casts are complete before - # making the necessary assertions. - - def test_specific_server(self): - group = self.useFixture(RpcServerGroupFixture(self.url)) - client = group.client(1, cast=True) - client.append(text='open') - client.append(text='stack') - client.add(increment=2) - client.add(increment=10) - group.sync() - - self.assertEqual('openstack', group.servers[1].endpoint.sval) - self.assertEqual(12, group.servers[1].endpoint.ival) - for i in [0, 2]: - self.assertEqual('', group.servers[i].endpoint.sval) - self.assertEqual(0, group.servers[i].endpoint.ival) - - def test_server_in_group(self): - group = self.useFixture(RpcServerGroupFixture(self.url)) - client = group.client(cast=True) - for i in range(20): - client.add(increment=1) - group.sync() - total = 0 - for s in group.servers: - ival = s.endpoint.ival - self.assertThat(ival, matchers.GreaterThan(0)) - self.assertThat(ival, matchers.LessThan(20)) - total += ival - self.assertEqual(20, total) - - def test_fanout(self): - group = self.useFixture(RpcServerGroupFixture(self.url)) - client = group.client('all', cast=True) - client.append(text='open') - client.append(text='stack') - client.add(increment=2) - client.add(increment=10) - group.sync(server='all') - for s in group.servers: - self.assertEqual('openstack', s.endpoint.sval) - self.assertEqual(12, s.endpoint.ival) - - -class NotifyTestCase(SkipIfNoTransportURL): - # NOTE(sileht): Each test must not use the same topics - # to be run in parallel - - def test_simple(self): - transport = self.useFixture(TransportFixture(self.url)) - listener = self.useFixture(NotificationFixture(transport.transport, - ['test_simple'])) - transport.wait() - notifier = listener.notifier('abc') - - notifier.info({}, 'test', 'Hello World!') - event = listener.events.get(timeout=1) - self.assertEqual('info', event[0]) - self.assertEqual('test', event[1]) - self.assertEqual('Hello World!', event[2]) - self.assertEqual('abc', event[3]) - - def test_multiple_topics(self): - transport = self.useFixture(TransportFixture(self.url)) - listener = self.useFixture(NotificationFixture(transport.transport, - ['a', 'b'])) - transport.wait() - a = listener.notifier('pub-a', topic='a') - b = listener.notifier('pub-b', topic='b') - - sent = { - 'pub-a': [a, 'test-a', 'payload-a'], - 'pub-b': [b, 'test-b', 'payload-b'] - } - for e in sent.values(): - e[0].info({}, e[1], e[2]) - - received = {} - while len(received) < len(sent): - e = listener.events.get(timeout=1) - received[e[3]] = e - - for key in received: - actual = received[key] - expected = sent[key] - self.assertEqual('info', actual[0]) - self.assertEqual(expected[1], actual[1]) - self.assertEqual(expected[2], actual[2]) - - def test_multiple_servers(self): - transport = self.useFixture(TransportFixture(self.url)) - listener_a = self.useFixture(NotificationFixture(transport.transport, - ['test-topic'])) - listener_b = self.useFixture(NotificationFixture(transport.transport, - ['test-topic'])) - transport.wait() - n = listener_a.notifier('pub') - - events_out = [('test-%s' % c, 'payload-%s' % c) for c in 'abcdefgh'] - - for event_type, payload in events_out: - n.info({}, event_type, payload) - - events_in = [[(e[1], e[2]) for e in listener_a.get_events()], - [(e[1], e[2]) for e in listener_b.get_events()]] - - self.assertThat(events_in, IsValidDistributionOf(events_out)) - for stream in events_in: - self.assertThat(len(stream), matchers.GreaterThan(0)) - - def test_independent_topics(self): - transport = self.useFixture(TransportFixture(self.url)) - listener_a = self.useFixture(NotificationFixture(transport.transport, - ['1'])) - listener_b = self.useFixture(NotificationFixture(transport.transport, - ['2'])) - transport.wait() - - a = listener_a.notifier('pub-1', topic='1') - b = listener_b.notifier('pub-2', topic='2') - - a_out = [('test-1-%s' % c, 'payload-1-%s' % c) for c in 'abcdefgh'] - for event_type, payload in a_out: - a.info({}, event_type, payload) - - b_out = [('test-2-%s' % c, 'payload-2-%s' % c) for c in 'ijklmnop'] - for event_type, payload in b_out: - b.info({}, event_type, payload) - - for expected in a_out: - actual = listener_a.events.get(timeout=0.5) - self.assertEqual('info', actual[0]) - self.assertEqual(expected[0], actual[1]) - self.assertEqual(expected[1], actual[2]) - self.assertEqual('pub-1', actual[3]) - - for expected in b_out: - actual = listener_b.events.get(timeout=0.5) - self.assertEqual('info', actual[0]) - self.assertEqual(expected[0], actual[1]) - self.assertEqual(expected[1], actual[2]) - self.assertEqual('pub-2', actual[3]) - - def test_all_categories(self): - transport = self.useFixture(TransportFixture(self.url)) - listener = self.useFixture(NotificationFixture( - transport.transport, ['test_all_categories'])) - transport.wait() - n = listener.notifier('abc') - - cats = ['debug', 'audit', 'info', 'warn', 'error', 'critical'] - events = [(getattr(n, c), c, 'type-' + c, c + '-data') for c in cats] - for e in events: - e[0]({}, e[2], e[3]) - - # order between events with different categories is not guaranteed - received = {} - for expected in events: - e = listener.events.get(timeout=0.5) - received[e[0]] = e - - for expected in events: - actual = received[expected[1]] - self.assertEqual(expected[1], actual[0]) - self.assertEqual(expected[2], actual[1]) - self.assertEqual(expected[3], actual[2]) diff --git a/tests/functional/utils.py b/tests/functional/utils.py deleted file mode 100644 index fc0c780..0000000 --- a/tests/functional/utils.py +++ /dev/null @@ -1,343 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import os -import threading -import time -import uuid - -import fixtures -from six import moves - -from oslo.config import cfg -from oslo import messaging -from oslo.messaging.notify import notifier -from oslo_messaging.tests import utils as test_utils - - -class TestServerEndpoint(object): - """This MessagingServer that will be used during functional testing.""" - - def __init__(self): - self.ival = 0 - self.sval = '' - - def add(self, ctxt, increment): - self.ival += increment - return self.ival - - def subtract(self, ctxt, increment): - if self.ival < increment: - raise ValueError("ival can't go negative!") - self.ival -= increment - return self.ival - - def append(self, ctxt, text): - self.sval += text - return self.sval - - -class TransportFixture(fixtures.Fixture): - """Fixture defined to setup the oslo.messaging transport.""" - - def __init__(self, url): - self.url = url - - def setUp(self): - super(TransportFixture, self).setUp() - self.transport = messaging.get_transport(cfg.CONF, url=self.url) - - def cleanUp(self): - self.transport.cleanup() - super(TransportFixture, self).cleanUp() - - def wait(self): - if self.url.startswith("rabbit") or self.url.startswith("qpid"): - time.sleep(0.5) - - -class RpcServerFixture(fixtures.Fixture): - """Fixture to setup the TestServerEndpoint.""" - - def __init__(self, transport, target, endpoint=None, ctrl_target=None): - super(RpcServerFixture, self).__init__() - self.transport = transport - self.target = target - self.endpoint = endpoint or TestServerEndpoint() - self.syncq = moves.queue.Queue() - self.ctrl_target = ctrl_target or self.target - - def setUp(self): - super(RpcServerFixture, self).setUp() - endpoints = [self.endpoint, self] - self.server = messaging.get_rpc_server(self.transport, - self.target, - endpoints) - self._ctrl = messaging.RPCClient(self.transport, self.ctrl_target) - self._start() - - def cleanUp(self): - self._stop() - super(RpcServerFixture, self).cleanUp() - - def _start(self): - self.thread = threading.Thread(target=self.server.start) - self.thread.daemon = True - self.thread.start() - - def _stop(self): - self.server.stop() - self._ctrl.cast({}, 'ping') - self.server.wait() - self.thread.join() - - def ping(self, ctxt): - pass - - def sync(self, ctxt, item): - self.syncq.put(item) - - -class RpcServerGroupFixture(fixtures.Fixture): - def __init__(self, url, topic=None, names=None, exchange=None, - transport=None, use_fanout_ctrl=False): - self.url = url - # NOTE(sileht): topic and servier_name must be uniq - # to be able to run all tests in parallel - self.topic = topic or str(uuid.uuid4()) - self.names = names or ["server_%i_%s" % (i, uuid.uuid4()) - for i in range(3)] - self.exchange = exchange - self.targets = [self._target(server=n) for n in self.names] - self.transport = transport - self.use_fanout_ctrl = use_fanout_ctrl - - def setUp(self): - super(RpcServerGroupFixture, self).setUp() - if not self.transport: - self.transport = self.useFixture(TransportFixture(self.url)) - self.servers = [self.useFixture(self._server(t)) for t in self.targets] - self.transport.wait() - - def _target(self, server=None, fanout=False): - t = messaging.Target(exchange=self.exchange, topic=self.topic) - t.server = server - t.fanout = fanout - return t - - def _server(self, target): - ctrl = None - if self.use_fanout_ctrl: - ctrl = self._target(fanout=True) - return RpcServerFixture(self.transport.transport, target, - ctrl_target=ctrl) - - def client(self, server=None, cast=False): - if server: - if server == 'all': - target = self._target(fanout=True) - elif server >= 0 and server < len(self.targets): - target = self.targets[server] - else: - raise ValueError("Invalid value for server: %r" % server) - else: - target = self._target() - return ClientStub(self.transport.transport, target, cast=cast, - timeout=5) - - def sync(self, server=None): - if server: - if server == 'all': - c = self.client(server='all', cast=True) - c.sync(item='x') - for s in self.servers: - s.syncq.get(timeout=5) - elif server >= 0 and server < len(self.targets): - c = self.client(server=server, cast=True) - c.sync(item='x') - self.servers[server].syncq.get(timeout=5) - else: - raise ValueError("Invalid value for server: %r" % server) - else: - for i in range(len(self.servers)): - self.client(i).ping() - - -class RpcCall(object): - def __init__(self, client, method, context): - self.client = client - self.method = method - self.context = context - - def __call__(self, **kwargs): - self.context['time'] = time.ctime() - self.context['cast'] = False - result = self.client.call(self.context, self.method, **kwargs) - return result - - -class RpcCast(RpcCall): - def __call__(self, **kwargs): - self.context['time'] = time.ctime() - self.context['cast'] = True - self.client.cast(self.context, self.method, **kwargs) - - -class ClientStub(object): - def __init__(self, transport, target, cast=False, name=None, **kwargs): - self.name = name or "functional-tests" - self.cast = cast - self.client = messaging.RPCClient(transport, target, **kwargs) - - def __getattr__(self, name): - context = {"application": self.name} - if self.cast: - return RpcCast(self.client, name, context) - else: - return RpcCall(self.client, name, context) - - -class InvalidDistribution(object): - def __init__(self, original, received): - self.original = original - self.received = received - self.missing = [] - self.extra = [] - self.wrong_order = [] - - def describe(self): - text = "Sent %s, got %s; " % (self.original, self.received) - e1 = ["%r was missing" % m for m in self.missing] - e2 = ["%r was not expected" % m for m in self.extra] - e3 = ["%r expected before %r" % (m[0], m[1]) for m in self.wrong_order] - return text + ", ".join(e1 + e2 + e3) - - def __len__(self): - return len(self.extra) + len(self.missing) + len(self.wrong_order) - - def get_details(self): - return {} - - -class IsValidDistributionOf(object): - """Test whether a given list can be split into particular - sub-lists. All items in the original list must be in exactly one - sub-list, and must appear in that sub-list in the same order with - respect to any other items as in the original list. - """ - def __init__(self, original): - self.original = original - - def __str__(self): - return 'IsValidDistribution(%s)' % self.original - - def match(self, actual): - errors = InvalidDistribution(self.original, actual) - received = [[i for i in l] for l in actual] - - def _remove(obj, lists): - for l in lists: - if obj in l: - front = l[0] - l.remove(obj) - return front - return None - - for item in self.original: - o = _remove(item, received) - if not o: - errors.missing += item - elif item != o: - errors.wrong_order.append([item, o]) - for l in received: - errors.extra += l - return errors or None - - -class SkipIfNoTransportURL(test_utils.BaseTestCase): - def setUp(self): - super(SkipIfNoTransportURL, self).setUp() - self.url = os.environ.get('TRANSPORT_URL') - if not self.url: - self.skipTest("No transport url configured") - - -class NotificationFixture(fixtures.Fixture): - def __init__(self, transport, topics): - super(NotificationFixture, self).__init__() - self.transport = transport - self.topics = topics - self.events = moves.queue.Queue() - self.name = str(id(self)) - - def setUp(self): - super(NotificationFixture, self).setUp() - targets = [messaging.Target(topic=t) for t in self.topics] - # add a special topic for internal notifications - targets.append(messaging.Target(topic=self.name)) - self.server = messaging.get_notification_listener(self.transport, - targets, - [self]) - self._ctrl = self.notifier('internal', topic=self.name) - self._start() - - def cleanUp(self): - self._stop() - super(NotificationFixture, self).cleanUp() - - def _start(self): - self.thread = threading.Thread(target=self.server.start) - self.thread.daemon = True - self.thread.start() - - def _stop(self): - self.server.stop() - self._ctrl.sample({}, 'shutdown', 'shutdown') - self.server.wait() - self.thread.join() - - def notifier(self, publisher, topic=None): - return notifier.Notifier(self.transport, - publisher, - driver='messaging', - topic=topic or self.topics[0]) - - def debug(self, ctxt, publisher, event_type, payload, metadata): - self.events.put(['debug', event_type, payload, publisher]) - - def audit(self, ctxt, publisher, event_type, payload, metadata): - self.events.put(['audit', event_type, payload, publisher]) - - def info(self, ctxt, publisher, event_type, payload, metadata): - self.events.put(['info', event_type, payload, publisher]) - - def warn(self, ctxt, publisher, event_type, payload, metadata): - self.events.put(['warn', event_type, payload, publisher]) - - def error(self, ctxt, publisher, event_type, payload, metadata): - self.events.put(['error', event_type, payload, publisher]) - - def critical(self, ctxt, publisher, event_type, payload, metadata): - self.events.put(['critical', event_type, payload, publisher]) - - def sample(self, ctxt, publisher, event_type, payload, metadata): - pass # Just used for internal shutdown control - - def get_events(self, timeout=0.5): - results = [] - try: - while True: - results.append(self.events.get(timeout=timeout)) - except moves.queue.Empty: - pass - return results diff --git a/tests/notify/test_dispatcher.py b/tests/notify/test_dispatcher.py index f2e32ac..adb8610 100644 --- a/tests/notify/test_dispatcher.py +++ b/tests/notify/test_dispatcher.py @@ -16,11 +16,11 @@ import itertools import mock +from oslo_utils import timeutils import testscenarios from oslo import messaging from oslo.messaging.notify import dispatcher as notify_dispatcher -from oslo.utils import timeutils from oslo_messaging.tests import utils as test_utils load_tests = testscenarios.load_tests_apply_scenarios diff --git a/tests/notify/test_logger.py b/tests/notify/test_logger.py index 1f770bd..6d1757a 100644 --- a/tests/notify/test_logger.py +++ b/tests/notify/test_logger.py @@ -19,11 +19,11 @@ import os import sys import mock +from oslo_utils import timeutils import testscenarios import testtools from oslo import messaging -from oslo.utils import timeutils import oslo_messaging from oslo_messaging.tests.notify import test_notifier from oslo_messaging.tests import utils as test_utils @@ -57,7 +57,7 @@ class TestLogNotifier(test_utils.BaseTestCase): # eventlet logging.logThreads = 0 - @mock.patch('oslo.utils.timeutils.utcnow') + @mock.patch('oslo_utils.timeutils.utcnow') def test_logger(self, mock_utcnow): with mock.patch('oslo_messaging.transport.get_transport', return_value=test_notifier._FakeTransport(self.conf)): @@ -100,7 +100,7 @@ class TestLogNotifier(test_utils.BaseTestCase): @testtools.skipUnless(hasattr(logging.config, 'dictConfig'), "Need logging.config.dictConfig (Python >= 2.7)") - @mock.patch('oslo.utils.timeutils.utcnow') + @mock.patch('oslo_utils.timeutils.utcnow') def test_logging_conf(self, mock_utcnow): with mock.patch('oslo_messaging.transport.get_transport', return_value=test_notifier._FakeTransport(self.conf)): diff --git a/tests/notify/test_notifier.py b/tests/notify/test_notifier.py index e7306a7..e326ad8 100644 --- a/tests/notify/test_notifier.py +++ b/tests/notify/test_notifier.py @@ -20,6 +20,8 @@ import uuid import fixtures import mock +from oslo_serialization import jsonutils +from oslo_utils import timeutils from stevedore import dispatch from stevedore import extension import testscenarios @@ -27,8 +29,6 @@ import yaml from oslo import messaging from oslo.messaging import serializer as msg_serializer -from oslo.serialization import jsonutils -from oslo.utils import timeutils from oslo_messaging.notify import _impl_log from oslo_messaging.notify import _impl_messaging from oslo_messaging.notify import _impl_test @@ -147,7 +147,7 @@ class TestMessagingNotifier(test_utils.BaseTestCase): self.stubs.Set(_impl_messaging, 'LOG', self.logger) self.stubs.Set(msg_notifier, '_LOG', self.logger) - @mock.patch('oslo.utils.timeutils.utcnow') + @mock.patch('oslo_utils.timeutils.utcnow') def test_notifier(self, mock_utcnow): drivers = [] if self.v1: @@ -223,7 +223,7 @@ class TestSerializer(test_utils.BaseTestCase): super(TestSerializer, self).setUp() self.addCleanup(_impl_test.reset) - @mock.patch('oslo.utils.timeutils.utcnow') + @mock.patch('oslo_utils.timeutils.utcnow') def test_serializer(self, mock_utcnow): transport = _FakeTransport(self.conf) @@ -266,7 +266,7 @@ class TestSerializer(test_utils.BaseTestCase): class TestLogNotifier(test_utils.BaseTestCase): - @mock.patch('oslo.utils.timeutils.utcnow') + @mock.patch('oslo_utils.timeutils.utcnow') def test_notifier(self, mock_utcnow): self.config(notification_driver=['log']) @@ -32,20 +32,25 @@ deps = -r{toxinidir}/requirements-py3.txt -r{toxinidir}/test-requirements-py3.txt [testenv:py27-func-qpid] -setenv = TRANSPORT_URL=qpid://guest:password@localhost// -commands = python setup.py testr --slowest --testr-args='tests.functional' +setenv = TRANSPORT_URL=qpid://stackqpid:secretqpid@127.0.0.1// +commands = python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' [testenv:py27-func-rabbit] -setenv = TRANSPORT_URL=rabbit://guest:password@localhost// -commands = python setup.py testr --slowest --testr-args='tests.functional' +setenv = TRANSPORT_URL=rabbit://stackrabbit:secretrabbit@127.0.0.1// +commands = python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' [testenv:py27-func-amqp1] -setenv = TRANSPORT_URL=amqp://guest:password@localhost// +setenv = TRANSPORT_URL=amqp://stackqpid:secretqpid@127.0.0.1// deps = -r{toxinidir}/amqp1-requirements.txt {[testenv]deps} # NOTE(sileht): until ubuntu get proto packages, we run amqp_driver tests here # because this is the only target to run fedora 20 in gate -commands = python setup.py testr --slowest --testr-args='tests.(functional|test_amqp_driver)' +commands = python setup.py testr --slowest --testr-args='oslo_messaging.tests.(functional|test_amqp_driver)' + +[testenv:py27-func-zeromq] +setenv = TRANSPORT_URL=zmq:// + ZMQ_MATCHMAKER=oslo_messaging._drivers.matchmaker_redis.MatchMakerRedis +commands = python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' [flake8] show-source = True |