summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.rst5
-rw-r--r--doc/source/notification_listener.rst2
-rw-r--r--doc/source/static/.placeholder (renamed from tests/functional/__init__.py)0
-rw-r--r--oslo.messaging/locale/fr/LC_MESSAGES/oslo.messaging.po18
-rw-r--r--oslo/messaging/notify/__init__.py1
-rw-r--r--oslo/messaging/notify/_impl_test.py13
-rw-r--r--oslo_messaging/_cmd/zmq_receiver.py3
-rw-r--r--oslo_messaging/_drivers/amqp.py10
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py160
-rw-r--r--oslo_messaging/_drivers/common.py7
-rw-r--r--oslo_messaging/_drivers/impl_qpid.py53
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py164
-rw-r--r--oslo_messaging/_drivers/impl_zmq.py32
-rw-r--r--oslo_messaging/_drivers/matchmaker.py2
-rw-r--r--oslo_messaging/_drivers/matchmaker_redis.py7
-rw-r--r--oslo_messaging/_drivers/matchmaker_ring.py3
-rw-r--r--oslo_messaging/_drivers/protocols/amqp/controller.py2
-rw-r--r--oslo_messaging/_drivers/protocols/amqp/driver.py2
-rw-r--r--oslo_messaging/_drivers/protocols/amqp/opts.py2
-rw-r--r--oslo_messaging/_executors/base.py3
-rw-r--r--oslo_messaging/_executors/impl_eventlet.py2
-rw-r--r--oslo_messaging/_executors/impl_thread.py2
-rw-r--r--oslo_messaging/_i18n.py4
-rw-r--r--oslo_messaging/conffixture.py16
-rw-r--r--oslo_messaging/notify/__init__.py2
-rw-r--r--oslo_messaging/notify/_impl_log.py3
-rw-r--r--oslo_messaging/notify/_impl_routing.py2
-rw-r--r--oslo_messaging/notify/dispatcher.py9
-rw-r--r--oslo_messaging/notify/filter.py77
-rw-r--r--oslo_messaging/notify/listener.py10
-rw-r--r--oslo_messaging/notify/log_handler.py2
-rw-r--r--oslo_messaging/notify/logger.py3
-rw-r--r--oslo_messaging/notify/middleware.py4
-rw-r--r--oslo_messaging/notify/notifier.py4
-rw-r--r--oslo_messaging/opts.py7
-rw-r--r--oslo_messaging/rpc/client.py2
-rw-r--r--oslo_messaging/rpc/server.py2
-rw-r--r--oslo_messaging/tests/drivers/test_impl_qpid.py38
-rw-r--r--oslo_messaging/tests/drivers/test_impl_rabbit.py9
-rw-r--r--oslo_messaging/tests/drivers/test_impl_zmq.py144
-rw-r--r--oslo_messaging/tests/drivers/test_matchmaker.py2
-rw-r--r--oslo_messaging/tests/drivers/test_matchmaker_redis.py2
-rw-r--r--oslo_messaging/tests/drivers/test_matchmaker_ring.py2
-rwxr-xr-xoslo_messaging/tests/functional/gate/gate_hook.sh52
-rwxr-xr-xoslo_messaging/tests/functional/gate/post_test_hook.sh20
-rw-r--r--oslo_messaging/tests/functional/utils.py5
-rw-r--r--oslo_messaging/tests/notify/test_dispatcher.py106
-rw-r--r--oslo_messaging/tests/notify/test_listener.py2
-rw-r--r--oslo_messaging/tests/notify/test_logger.py6
-rw-r--r--oslo_messaging/tests/notify/test_notifier.py10
-rw-r--r--oslo_messaging/tests/rpc/test_client.py2
-rw-r--r--oslo_messaging/tests/rpc/test_server.py2
-rw-r--r--oslo_messaging/tests/test_amqp_driver.py2
-rw-r--r--oslo_messaging/tests/test_exception_serialization.py2
-rw-r--r--oslo_messaging/tests/test_opts.py4
-rw-r--r--oslo_messaging/tests/test_transport.py2
-rw-r--r--oslo_messaging/tests/test_utils.py26
-rw-r--r--oslo_messaging/tests/utils.py6
-rw-r--r--oslo_messaging/transport.py2
-rw-r--r--requirements-py3.txt2
-rw-r--r--requirements.txt4
-rw-r--r--setup.cfg4
-rw-r--r--tests/drivers/test_impl_qpid.py38
-rw-r--r--tests/drivers/test_impl_rabbit.py28
-rw-r--r--tests/drivers/test_impl_zmq.py135
-rw-r--r--tests/drivers/test_matchmaker_redis.py5
-rw-r--r--tests/functional/test_functional.py279
-rw-r--r--tests/functional/utils.py343
-rw-r--r--tests/notify/test_dispatcher.py2
-rw-r--r--tests/notify/test_logger.py6
-rw-r--r--tests/notify/test_notifier.py10
-rw-r--r--tox.ini17
72 files changed, 771 insertions, 1188 deletions
diff --git a/README.rst b/README.rst
index 96342fa..625651b 100644
--- a/README.rst
+++ b/README.rst
@@ -4,4 +4,7 @@ Oslo Messaging Library
The Oslo messaging API supports RPC and notifications over a number of
different messaging transports.
-See also: `Library Documentation <http://docs.openstack.org/developer/oslo.messaging>`_
+* License: Apache License, Version 2.0
+* Documentation: http://docs.openstack.org/developer/oslo.messaging
+* Source: http://git.openstack.org/cgit/openstack/oslo.messaging
+* Bugs: http://bugs.launchpad.net/oslo.messaging
diff --git a/doc/source/notification_listener.rst b/doc/source/notification_listener.rst
index 0f555e5..3a69070 100644
--- a/doc/source/notification_listener.rst
+++ b/doc/source/notification_listener.rst
@@ -10,5 +10,7 @@ Notification Listener
.. autoclass:: MessageHandlingServer
:members:
+ :noindex:
.. autofunction:: get_local_context
+ :noindex:
diff --git a/tests/functional/__init__.py b/doc/source/static/.placeholder
index e69de29..e69de29 100644
--- a/tests/functional/__init__.py
+++ b/doc/source/static/.placeholder
diff --git a/oslo.messaging/locale/fr/LC_MESSAGES/oslo.messaging.po b/oslo.messaging/locale/fr/LC_MESSAGES/oslo.messaging.po
index 90ba808..83e9124 100644
--- a/oslo.messaging/locale/fr/LC_MESSAGES/oslo.messaging.po
+++ b/oslo.messaging/locale/fr/LC_MESSAGES/oslo.messaging.po
@@ -1,5 +1,5 @@
# French translations for oslo.messaging.
-# Copyright (C) 2014 ORGANIZATION
+# Copyright (C) 2015 ORGANIZATION
# This file is distributed under the same license as the oslo.messaging
# project.
#
@@ -10,9 +10,9 @@ msgid ""
msgstr ""
"Project-Id-Version: oslo.messaging\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
-"POT-Creation-Date: 2014-12-03 06:10+0000\n"
-"PO-Revision-Date: 2014-12-01 13:11+0000\n"
-"Last-Translator: openstackjenkins <jenkins@openstack.org>\n"
+"POT-Creation-Date: 2015-02-02 06:12+0000\n"
+"PO-Revision-Date: 2014-12-20 02:07+0000\n"
+"Last-Translator: Maxime COQUEREL <max.coquerel@gmail.com>\n"
"Language-Team: French "
"(http://www.transifex.com/projects/p/oslomessaging/language/fr/)\n"
"Plural-Forms: nplurals=2; plural=(n > 1)\n"
@@ -21,21 +21,21 @@ msgstr ""
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 1.3\n"
-#: oslo/messaging/_executors/impl_blocking.py:50
+#: oslo_messaging/_executors/impl_blocking.py:50
msgid "Unexpected exception occurred."
-msgstr ""
+msgstr "Exception inattendue s'est produite."
-#: oslo/messaging/notify/_impl_routing.py:80
+#: oslo_messaging/notify/_impl_routing.py:80
#, python-format
msgid "Failed to load any notifiers for %s"
msgstr "Echec de chargement des notifications pour %s"
-#: oslo/messaging/notify/_impl_routing.py:117
+#: oslo_messaging/notify/_impl_routing.py:117
#, python-format
msgid "Routing '%(event)s' notification to '%(driver)s' driver"
msgstr "Routage '%(event)s' notification du pilote %(driver)s'"
-#: oslo/messaging/rpc/dispatcher.py:145
+#: oslo_messaging/rpc/dispatcher.py:150
#, python-format
msgid "Exception during message handling: %s"
msgstr "Exception lors de la manipulation du message: %s"
diff --git a/oslo/messaging/notify/__init__.py b/oslo/messaging/notify/__init__.py
index c5032db..9de8331 100644
--- a/oslo/messaging/notify/__init__.py
+++ b/oslo/messaging/notify/__init__.py
@@ -25,3 +25,4 @@ from .listener import *
from .log_handler import *
from .logger import *
from .dispatcher import NotificationResult
+from oslo_messaging.notify import _impl_test
diff --git a/oslo/messaging/notify/_impl_test.py b/oslo/messaging/notify/_impl_test.py
deleted file mode 100644
index 4e3caf7..0000000
--- a/oslo/messaging/notify/_impl_test.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_messaging.notify._impl_test import * # noqa
diff --git a/oslo_messaging/_cmd/zmq_receiver.py b/oslo_messaging/_cmd/zmq_receiver.py
index 6346d6a..cbcdfe8 100644
--- a/oslo_messaging/_cmd/zmq_receiver.py
+++ b/oslo_messaging/_cmd/zmq_receiver.py
@@ -21,7 +21,8 @@ import contextlib
import logging
import sys
-from oslo.config import cfg
+from oslo_config import cfg
+
from oslo_messaging._drivers import impl_zmq
from oslo_messaging._executors import base # FIXME(markmc)
diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py
index 0648b1e..ebae514 100644
--- a/oslo_messaging/_drivers/amqp.py
+++ b/oslo_messaging/_drivers/amqp.py
@@ -27,10 +27,10 @@ import collections
import logging
import uuid
+from oslo_config import cfg
+from oslo_utils import strutils
import six
-from oslo.config import cfg
-from oslo.utils import strutils
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers import pool
@@ -42,11 +42,13 @@ amqp_opts = [
help='Use durable queues in AMQP.'),
cfg.BoolOpt('amqp_auto_delete',
default=False,
+ deprecated_group='DEFAULT',
help='Auto-delete queues in AMQP.'),
# FIXME(markmc): this was toplevel in openstack.common.rpc
cfg.IntOpt('rpc_conn_pool_size',
default=30,
+ deprecated_group='DEFAULT',
help='Size of RPC connection pool.'),
]
@@ -56,11 +58,11 @@ LOG = logging.getLogger(__name__)
class ConnectionPool(pool.Pool):
"""Class that implements a Pool of Connections."""
- def __init__(self, conf, url, connection_cls):
+ def __init__(self, conf, rpc_conn_pool_size, url, connection_cls):
self.connection_cls = connection_cls
self.conf = conf
self.url = url
- super(ConnectionPool, self).__init__(self.conf.rpc_conn_pool_size)
+ super(ConnectionPool, self).__init__(rpc_conn_pool_size)
self.reply_proxy = None
# TODO(comstud): Timeout connections not used in a while
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index c222bce..9e967c4 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -147,12 +147,6 @@ class ReplyWaiters(object):
'Timed out waiting for a reply '
'to message ID %s' % msg_id)
- def check(self, msg_id):
- try:
- return self._queues[msg_id].get(block=False)
- except moves.queue.Empty:
- return None
-
def put(self, msg_id, message_data):
queue = self._queues.get(msg_id)
if not queue:
@@ -162,13 +156,8 @@ class ReplyWaiters(object):
else:
queue.put(message_data)
- def wake_all(self, except_id):
- msg_ids = [i for i in self._queues.keys() if i != except_id]
- for msg_id in msg_ids:
- self.put(msg_id, self.WAKE_UP)
-
- def add(self, msg_id, queue):
- self._queues[msg_id] = queue
+ def add(self, msg_id):
+ self._queues[msg_id] = moves.queue.Queue()
if len(self._queues) > self._wrn_threshold:
LOG.warn('Number of call queues is greater than warning '
'threshold: %d. There could be a leak. Increasing'
@@ -181,27 +170,41 @@ class ReplyWaiters(object):
class ReplyWaiter(object):
-
- def __init__(self, conf, reply_q, conn, allowed_remote_exmods):
- self.conf = conf
+ def __init__(self, reply_q, conn, allowed_remote_exmods):
self.conn = conn
- self.reply_q = reply_q
self.allowed_remote_exmods = allowed_remote_exmods
-
- self.conn_lock = threading.Lock()
- self.incoming = []
self.msg_id_cache = rpc_amqp._MsgIdCache()
self.waiters = ReplyWaiters()
- conn.declare_direct_consumer(reply_q, self)
+ self.conn.declare_direct_consumer(reply_q, self)
+
+ self._thread_exit_event = threading.Event()
+ self._thread = threading.Thread(target=self.poll)
+ self._thread.daemon = True
+ self._thread.start()
+
+ def stop(self):
+ if self._thread:
+ self._thread_exit_event.set()
+ self.conn.stop_consuming()
+ self._thread.join()
+ self._thread = None
+
+ def poll(self):
+ while not self._thread_exit_event.is_set():
+ try:
+ self.conn.consume(limit=1)
+ except Exception:
+ LOG.exception("Failed to process incoming message, "
+ "retrying...")
def __call__(self, message):
message.acknowledge()
- self.incoming.append(message)
+ incoming_msg_id = message.pop('_msg_id', None)
+ self.waiters.put(incoming_msg_id, message)
def listen(self, msg_id):
- queue = moves.queue.Queue()
- self.waiters.add(msg_id, queue)
+ self.waiters.add(msg_id)
def unlisten(self, msg_id):
self.waiters.remove(msg_id)
@@ -225,96 +228,25 @@ class ReplyWaiter(object):
result = data['result']
return result, ending
- def _poll_connection(self, msg_id, timer):
- while True:
- while self.incoming:
- message_data = self.incoming.pop(0)
-
- incoming_msg_id = message_data.pop('_msg_id', None)
- if incoming_msg_id == msg_id:
- return self._process_reply(message_data)
-
- self.waiters.put(incoming_msg_id, message_data)
-
+ def wait(self, msg_id, timeout):
+ # NOTE(sileht): for each msg_id we receive two amqp message
+ # first one with the payload, a second one to ensure the other
+ # have finish to send the payload
+ timer = rpc_common.DecayingTimer(duration=timeout)
+ timer.start()
+ final_reply = None
+ ending = False
+ while not ending:
timeout = timer.check_return(self._raise_timeout_exception, msg_id)
try:
- self.conn.consume(limit=1, timeout=timeout)
- except rpc_common.Timeout:
+ message = self.waiters.get(msg_id, timeout=timeout)
+ except moves.queue.Empty:
self._raise_timeout_exception(msg_id)
- def _poll_queue(self, msg_id, timer):
- timeout = timer.check_return(self._raise_timeout_exception, msg_id)
- message = self.waiters.get(msg_id, timeout=timeout)
- if message is self.waiters.WAKE_UP:
- return None, None, True # lock was released
-
- reply, ending = self._process_reply(message)
- return reply, ending, False
-
- def _check_queue(self, msg_id):
- while True:
- message = self.waiters.check(msg_id)
- if message is self.waiters.WAKE_UP:
- continue
- if message is None:
- return None, None, True # queue is empty
-
reply, ending = self._process_reply(message)
- return reply, ending, False
-
- def wait(self, msg_id, timeout):
- #
- # NOTE(markmc): we're waiting for a reply for msg_id to come in for on
- # the reply_q, but there may be other threads also waiting for replies
- # to other msg_ids
- #
- # Only one thread can be consuming from the queue using this connection
- # and we don't want to hold open a connection per thread, so instead we
- # have the first thread take responsibility for passing replies not
- # intended for itself to the appropriate thread.
- #
- timer = rpc_common.DecayingTimer(duration=timeout)
- timer.start()
- final_reply = None
- while True:
- if self.conn_lock.acquire(False):
- # Ok, we're the thread responsible for polling the connection
- try:
- # Check the queue to see if a previous lock-holding thread
- # queued up a reply already
- while True:
- reply, ending, empty = self._check_queue(msg_id)
- if empty:
- break
- if not ending:
- final_reply = reply
- else:
- return final_reply
-
- # Now actually poll the connection
- while True:
- reply, ending = self._poll_connection(msg_id, timer)
- if not ending:
- final_reply = reply
- else:
- return final_reply
- finally:
- self.conn_lock.release()
- # We've got our reply, tell the other threads to wake up
- # so that one of them will take over the responsibility for
- # polling the connection
- self.waiters.wake_all(msg_id)
- else:
- # We're going to wait for the first thread to pass us our reply
- reply, ending, trylock = self._poll_queue(msg_id, timer)
- if trylock:
- # The first thread got its reply, let's try and take over
- # the responsibility for polling
- continue
- if not ending:
- final_reply = reply
- else:
- return final_reply
+ if not ending:
+ final_reply = reply
+ return final_reply
class AMQPDriverBase(base.BaseDriver):
@@ -349,7 +281,7 @@ class AMQPDriverBase(base.BaseDriver):
conn = self._get_connection(pooled=False)
- self._waiter = ReplyWaiter(self.conf, reply_q, conn,
+ self._waiter = ReplyWaiter(reply_q, conn,
self._allowed_remote_exmods)
self._reply_q = reply_q
@@ -451,3 +383,11 @@ class AMQPDriverBase(base.BaseDriver):
if self._connection_pool:
self._connection_pool.empty()
self._connection_pool = None
+
+ with self._reply_q_lock:
+ if self._reply_q is not None:
+ self._waiter.stop()
+ self._reply_q_conn.close()
+ self._reply_q_conn = None
+ self._reply_q = None
+ self._waiter = None
diff --git a/oslo_messaging/_drivers/common.py b/oslo_messaging/_drivers/common.py
index 3b0247f..b2d97b3 100644
--- a/oslo_messaging/_drivers/common.py
+++ b/oslo_messaging/_drivers/common.py
@@ -21,9 +21,9 @@ import sys
import time
import traceback
+from oslo_serialization import jsonutils
import six
-from oslo.serialization import jsonutils
import oslo_messaging
from oslo_messaging._i18n import _
from oslo_messaging import _utils as utils
@@ -341,8 +341,9 @@ class DecayingTimer(object):
if self._duration is not None:
self._ends_at = time.time() + max(0, self._duration)
- def check_return(self, timeout_callback, *args, **kwargs):
+ def check_return(self, timeout_callback=None, *args, **kwargs):
maximum = kwargs.pop('maximum', None)
+
if self._duration is None:
return None if maximum is None else maximum
if self._ends_at is None:
@@ -350,7 +351,7 @@ class DecayingTimer(object):
" that has not been started."))
left = self._ends_at - time.time()
- if left <= 0:
+ if left <= 0 and timeout_callback is not None:
timeout_callback(*args, **kwargs)
return left if maximum is None else min(left, maximum)
diff --git a/oslo_messaging/_drivers/impl_qpid.py b/oslo_messaging/_drivers/impl_qpid.py
index 7bd5272..4640230 100644
--- a/oslo_messaging/_drivers/impl_qpid.py
+++ b/oslo_messaging/_drivers/impl_qpid.py
@@ -20,12 +20,12 @@ import os
import random
import time
+from oslo_config import cfg
+from oslo_serialization import jsonutils
+from oslo_utils import importutils
+from oslo_utils import netutils
import six
-from oslo.config import cfg
-from oslo.serialization import jsonutils
-from oslo.utils import importutils
-from oslo.utils import netutils
from oslo_messaging._drivers import amqp as rpc_amqp
from oslo_messaging._drivers import amqpdriver
from oslo_messaging._drivers import common as rpc_common
@@ -41,41 +41,52 @@ LOG = logging.getLogger(__name__)
qpid_opts = [
cfg.StrOpt('qpid_hostname',
default='localhost',
+ deprecated_group='DEFAULT',
help='Qpid broker hostname.'),
cfg.IntOpt('qpid_port',
default=5672,
+ deprecated_group='DEFAULT',
help='Qpid broker port.'),
cfg.ListOpt('qpid_hosts',
default=['$qpid_hostname:$qpid_port'],
+ deprecated_group='DEFAULT',
help='Qpid HA cluster host:port pairs.'),
cfg.StrOpt('qpid_username',
default='',
+ deprecated_group='DEFAULT',
help='Username for Qpid connection.'),
cfg.StrOpt('qpid_password',
default='',
+ deprecated_group='DEFAULT',
help='Password for Qpid connection.',
secret=True),
cfg.StrOpt('qpid_sasl_mechanisms',
default='',
+ deprecated_group='DEFAULT',
help='Space separated list of SASL mechanisms to use for '
'auth.'),
cfg.IntOpt('qpid_heartbeat',
default=60,
+ deprecated_group='DEFAULT',
help='Seconds between connection keepalive heartbeats.'),
cfg.StrOpt('qpid_protocol',
default='tcp',
+ deprecated_group='DEFAULT',
help="Transport to use, either 'tcp' or 'ssl'."),
cfg.BoolOpt('qpid_tcp_nodelay',
default=True,
+ deprecated_group='DEFAULT',
help='Whether to disable the Nagle algorithm.'),
cfg.IntOpt('qpid_receiver_capacity',
default=1,
+ deprecated_group='DEFAULT',
help='The number of prefetched messages held by receiver.'),
# NOTE(russellb) If any additional versions are added (beyond 1 and 2),
# this file could probably use some additional refactoring so that the
# differences between each version are split into different classes.
cfg.IntOpt('qpid_topology_version',
default=1,
+ deprecated_group='DEFAULT',
help="The qpid topology version to use. Version 1 is what "
"was originally used by impl_qpid. Version 2 includes "
"some backwards-incompatible changes that allow broker "
@@ -459,6 +470,7 @@ class Connection(object):
self.session = None
self.consumers = {}
self.conf = conf
+ self.driver_conf = conf.oslo_messaging_qpid
self._consume_loop_stopped = False
@@ -476,7 +488,7 @@ class Connection(object):
self.brokers_params.append(params)
else:
# Old configuration format
- for adr in self.conf.qpid_hosts:
+ for adr in self.driver_conf.qpid_hosts:
hostname, port = netutils.parse_host_port(
adr, default_port=5672)
@@ -485,8 +497,8 @@ class Connection(object):
params = {
'host': '%s:%d' % (hostname, port),
- 'username': self.conf.qpid_username,
- 'password': self.conf.qpid_password,
+ 'username': self.driver_conf.qpid_username,
+ 'password': self.driver_conf.qpid_password,
}
self.brokers_params.append(params)
@@ -505,12 +517,12 @@ class Connection(object):
self.connection.username = broker['username']
self.connection.password = broker['password']
- self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
+ self.connection.sasl_mechanisms = self.driver_conf.qpid_sasl_mechanisms
# Reconnection is done by self.reconnect()
self.connection.reconnect = False
- self.connection.heartbeat = self.conf.qpid_heartbeat
- self.connection.transport = self.conf.qpid_protocol
- self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
+ self.connection.heartbeat = self.driver_conf.qpid_heartbeat
+ self.connection.transport = self.driver_conf.qpid_protocol
+ self.connection.tcp_nodelay = self.driver_conf.qpid_tcp_nodelay
self.connection.open()
def _register_consumer(self, consumer):
@@ -633,7 +645,8 @@ class Connection(object):
"%(err_str)s"), log_info)
def _declare_consumer():
- consumer = consumer_cls(self.conf, self.session, topic, callback)
+ consumer = consumer_cls(self.driver_conf, self.session, topic,
+ callback)
self._register_consumer(consumer)
return consumer
@@ -693,7 +706,8 @@ class Connection(object):
"'%(topic)s': %(err_str)s"), log_info)
def _publisher_send():
- publisher = cls(self.conf, self.session, topic=topic, **kwargs)
+ publisher = cls(self.driver_conf, self.session, topic=topic,
+ **kwargs)
publisher.send(msg)
return self.ensure(_connect_error, _publisher_send, retry=retry)
@@ -764,10 +778,15 @@ class QpidDriver(amqpdriver.AMQPDriverBase):
def __init__(self, conf, url,
default_exchange=None, allowed_remote_exmods=None):
- conf.register_opts(qpid_opts)
- conf.register_opts(rpc_amqp.amqp_opts)
-
- connection_pool = rpc_amqp.ConnectionPool(conf, url, Connection)
+ opt_group = cfg.OptGroup(name='oslo_messaging_qpid',
+ title='QPID driver options')
+ conf.register_group(opt_group)
+ conf.register_opts(qpid_opts, group=opt_group)
+ conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)
+
+ connection_pool = rpc_amqp.ConnectionPool(
+ conf, conf.oslo_messaging_qpid.rpc_conn_pool_size,
+ url, Connection)
super(QpidDriver, self).__init__(conf, url,
connection_pool,
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 0347f3d..86fed1f 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -26,11 +26,11 @@ import kombu.connection
import kombu.entity
import kombu.exceptions
import kombu.messaging
+from oslo_config import cfg
+from oslo_utils import netutils
import six
from six.moves.urllib import parse
-from oslo.config import cfg
-from oslo.utils import netutils
from oslo_messaging._drivers import amqp as rpc_amqp
from oslo_messaging._drivers import amqpdriver
from oslo_messaging._drivers import common as rpc_common
@@ -43,6 +43,7 @@ from oslo_messaging import exceptions
rabbit_opts = [
cfg.StrOpt('kombu_ssl_version',
default='',
+ deprecated_group='DEFAULT',
help='SSL version to use (valid only if SSL enabled). '
'Valid values are TLSv1 and SSLv23. SSLv2, SSLv3, '
'TLSv1_1, and TLSv1_2 may be available on some '
@@ -50,57 +51,72 @@ rabbit_opts = [
),
cfg.StrOpt('kombu_ssl_keyfile',
default='',
+ deprecated_group='DEFAULT',
help='SSL key file (valid only if SSL enabled).'),
cfg.StrOpt('kombu_ssl_certfile',
default='',
+ deprecated_group='DEFAULT',
help='SSL cert file (valid only if SSL enabled).'),
cfg.StrOpt('kombu_ssl_ca_certs',
default='',
+ deprecated_group='DEFAULT',
help='SSL certification authority file '
'(valid only if SSL enabled).'),
cfg.FloatOpt('kombu_reconnect_delay',
default=1.0,
+ deprecated_group='DEFAULT',
help='How long to wait before reconnecting in response to an '
'AMQP consumer cancel notification.'),
cfg.StrOpt('rabbit_host',
default='localhost',
+ deprecated_group='DEFAULT',
help='The RabbitMQ broker address where a single node is '
'used.'),
cfg.IntOpt('rabbit_port',
default=5672,
+ deprecated_group='DEFAULT',
help='The RabbitMQ broker port where a single node is used.'),
cfg.ListOpt('rabbit_hosts',
default=['$rabbit_host:$rabbit_port'],
+ deprecated_group='DEFAULT',
help='RabbitMQ HA cluster host:port pairs.'),
cfg.BoolOpt('rabbit_use_ssl',
default=False,
+ deprecated_group='DEFAULT',
help='Connect over SSL for RabbitMQ.'),
cfg.StrOpt('rabbit_userid',
default='guest',
+ deprecated_group='DEFAULT',
help='The RabbitMQ userid.'),
cfg.StrOpt('rabbit_password',
default='guest',
+ deprecated_group='DEFAULT',
help='The RabbitMQ password.',
secret=True),
cfg.StrOpt('rabbit_login_method',
default='AMQPLAIN',
+ deprecated_group='DEFAULT',
help='The RabbitMQ login method.'),
cfg.StrOpt('rabbit_virtual_host',
default='/',
+ deprecated_group='DEFAULT',
help='The RabbitMQ virtual host.'),
cfg.IntOpt('rabbit_retry_interval',
default=1,
help='How frequently to retry connecting with RabbitMQ.'),
cfg.IntOpt('rabbit_retry_backoff',
default=2,
+ deprecated_group='DEFAULT',
help='How long to backoff for between retries when connecting '
'to RabbitMQ.'),
cfg.IntOpt('rabbit_max_retries',
default=0,
+ deprecated_group='DEFAULT',
help='Maximum number of RabbitMQ connection retries. '
'Default is 0 (infinite retry count).'),
cfg.BoolOpt('rabbit_ha_queues',
default=False,
+ deprecated_group='DEFAULT',
help='Use HA queues in RabbitMQ (x-ha-policy: all). '
'If you change this option, you must wipe the '
'RabbitMQ database.'),
@@ -108,6 +124,7 @@ rabbit_opts = [
# NOTE(sileht): deprecated option since oslo_messaging 1.5.0,
cfg.BoolOpt('fake_rabbit',
default=False,
+ deprecated_group='DEFAULT',
help='Deprecated, use rpc_backend=kombu+memory or '
'rpc_backend=fake'),
]
@@ -376,7 +393,8 @@ class DirectPublisher(Publisher):
options = {'durable': False,
'auto_delete': True,
- 'exclusive': False}
+ 'exclusive': False,
+ 'passive': True}
options.update(kwargs)
super(DirectPublisher, self).__init__(channel, topic, topic,
type='direct', **options)
@@ -392,6 +410,7 @@ class TopicPublisher(Publisher):
options = {'durable': conf.amqp_durable_queues,
'auto_delete': conf.amqp_auto_delete,
'exclusive': False}
+
options.update(kwargs)
super(TopicPublisher, self).__init__(channel,
exchange_name,
@@ -448,25 +467,26 @@ class Connection(object):
self.consumers = []
self.consumer_num = itertools.count(1)
self.conf = conf
- self.max_retries = self.conf.rabbit_max_retries
+ self.driver_conf = self.conf.oslo_messaging_rabbit
+ self.max_retries = self.driver_conf.rabbit_max_retries
# Try forever?
if self.max_retries <= 0:
self.max_retries = None
- self.interval_start = self.conf.rabbit_retry_interval
- self.interval_stepping = self.conf.rabbit_retry_backoff
+ self.interval_start = self.driver_conf.rabbit_retry_interval
+ self.interval_stepping = self.driver_conf.rabbit_retry_backoff
# max retry-interval = 30 seconds
self.interval_max = 30
self._ssl_params = self._fetch_ssl_params()
- self._login_method = self.conf.rabbit_login_method
+ self._login_method = self.driver_conf.rabbit_login_method
if url.virtual_host is not None:
virtual_host = url.virtual_host
else:
- virtual_host = self.conf.rabbit_virtual_host
+ virtual_host = self.driver_conf.rabbit_virtual_host
self._url = ''
- if self.conf.fake_rabbit:
+ if self.driver_conf.fake_rabbit:
LOG.warn("Deprecated: fake_rabbit option is deprecated, set "
"rpc_backend to kombu+memory or use the fake "
"driver instead.")
@@ -492,13 +512,13 @@ class Connection(object):
transport = url.transport.replace('kombu+', '')
self._url = "%s://%s" % (transport, virtual_host)
else:
- for adr in self.conf.rabbit_hosts:
+ for adr in self.driver_conf.rabbit_hosts:
hostname, port = netutils.parse_host_port(
- adr, default_port=self.conf.rabbit_port)
+ adr, default_port=self.driver_conf.rabbit_port)
self._url += '%samqp://%s:%s@%s:%s/%s' % (
";" if self._url else '',
- parse.quote(self.conf.rabbit_userid),
- parse.quote(self.conf.rabbit_password),
+ parse.quote(self.driver_conf.rabbit_userid),
+ parse.quote(self.driver_conf.rabbit_password),
hostname, port,
virtual_host)
@@ -516,13 +536,19 @@ class Connection(object):
self.connection.info())
# NOTE(sileht): just ensure the connection is setuped at startup
self.ensure(error_callback=None,
- method=lambda channel: True)
+ method=lambda: True)
LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d'),
self.connection.info())
+ # NOTE(sileht):
+ # value choosen according the best practice from kombu:
+ # http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
+ self._poll_timeout = 1
+
if self._url.startswith('memory://'):
# Kludge to speed up tests.
self.connection.transport.polling_interval = 0.0
+ self._poll_timeout = 0.05
# FIXME(markmc): use oslo sslutils when it is available as a library
_SSL_PROTOCOLS = {
@@ -558,15 +584,15 @@ class Connection(object):
ssl_params = dict()
# http://docs.python.org/library/ssl.html - ssl.wrap_socket
- if self.conf.kombu_ssl_version:
+ if self.driver_conf.kombu_ssl_version:
ssl_params['ssl_version'] = self.validate_ssl_version(
- self.conf.kombu_ssl_version)
- if self.conf.kombu_ssl_keyfile:
- ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
- if self.conf.kombu_ssl_certfile:
- ssl_params['certfile'] = self.conf.kombu_ssl_certfile
- if self.conf.kombu_ssl_ca_certs:
- ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
+ self.driver_conf.kombu_ssl_version)
+ if self.driver_conf.kombu_ssl_keyfile:
+ ssl_params['keyfile'] = self.driver_conf.kombu_ssl_keyfile
+ if self.driver_conf.kombu_ssl_certfile:
+ ssl_params['certfile'] = self.driver_conf.kombu_ssl_certfile
+ if self.driver_conf.kombu_ssl_ca_certs:
+ ssl_params['ca_certs'] = self.driver_conf.kombu_ssl_ca_certs
# We might want to allow variations in the
# future with this?
ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
@@ -597,12 +623,11 @@ class Connection(object):
retry = None
def on_error(exc, interval):
- self.channel = None
-
error_callback and error_callback(exc)
- interval = (self.conf.kombu_reconnect_delay + interval
- if self.conf.kombu_reconnect_delay > 0 else interval)
+ interval = (self.driver_conf.kombu_reconnect_delay + interval
+ if self.driver_conf.kombu_reconnect_delay > 0
+ else interval)
info = {'err_str': exc, 'sleep_time': interval}
info.update(self.connection.info())
@@ -626,13 +651,14 @@ class Connection(object):
# use kombu for HA connection, the interval_step
# should sufficient, because the underlying kombu transport
# connection object freed.
- if self.conf.kombu_reconnect_delay > 0:
- time.sleep(self.conf.kombu_reconnect_delay)
+ if self.driver_conf.kombu_reconnect_delay > 0:
+ time.sleep(self.driver_conf.kombu_reconnect_delay)
def on_reconnection(new_channel):
"""Callback invoked when the kombu reconnects and creates
a new channel, we use it the reconfigure our consumers.
"""
+ self._set_current_channel(new_channel)
self.consumer_num = itertools.count(1)
for consumer in self.consumers:
consumer.reconnect(new_channel)
@@ -642,11 +668,15 @@ class Connection(object):
{'hostname': self.connection.hostname,
'port': self.connection.port})
+ def execute_method(channel):
+ self._set_current_channel(channel)
+ method()
+
recoverable_errors = (self.connection.recoverable_channel_errors +
self.connection.recoverable_connection_errors)
try:
autoretry_method = self.connection.autoretry(
- method, channel=self.channel,
+ execute_method, channel=self.channel,
max_retries=retry,
errback=on_error,
interval_start=self.interval_start or 1,
@@ -654,10 +684,10 @@ class Connection(object):
on_revive=on_reconnection,
)
ret, channel = autoretry_method()
- self.channel = channel
+ self._set_current_channel(channel)
return ret
except recoverable_errors as exc:
- self.channel = None
+ self._set_current_channel(None)
# NOTE(sileht): number of retry exceeded and the connection
# is still broken
msg = _('Unable to connect to AMQP server on '
@@ -670,17 +700,21 @@ class Connection(object):
LOG.error(msg)
raise exceptions.MessageDeliveryFailure(msg)
+ def _set_current_channel(self, new_channel):
+ if self.channel is not None and new_channel != self.channel:
+ self.connection.maybe_close_channel(self.channel)
+ self.channel = new_channel
+
def close(self):
"""Close/release this connection."""
if self.connection:
+ self._set_current_channel(None)
self.connection.release()
self.connection = None
def reset(self):
"""Reset a connection so it can be used again."""
- if self.channel is not None:
- self.channel.close()
- self.channel = self.connection.channel()
+ self._set_current_channel(self.connection.channel())
self.consumers = []
self.consumer_num = itertools.count(1)
@@ -694,9 +728,9 @@ class Connection(object):
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
"%(err_str)s"), log_info)
- def _declare_consumer(channel):
- consumer = consumer_cls(self.conf, channel, topic, callback,
- six.next(self.consumer_num))
+ def _declare_consumer():
+ consumer = consumer_cls(self.driver_conf, self.channel, topic,
+ callback, six.next(self.consumer_num))
self.consumers.append(consumer)
return consumer
@@ -718,7 +752,7 @@ class Connection(object):
LOG.exception(_('Failed to consume message from queue: %s'),
exc)
- def _consume(channel):
+ def _consume():
if self.do_consume:
queues_head = self.consumers[:-1] # not fanout.
queues_tail = self.consumers[-1] # fanout
@@ -727,10 +761,8 @@ class Connection(object):
queues_tail.consume(nowait=False)
self.do_consume = False
- # NOTE(sileht):
- # maximun value choosen according the best practice from kombu:
- # http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
- poll_timeout = 1 if timeout is None else min(timeout, 1)
+ poll_timeout = (self._poll_timeout if timeout is None
+ else min(timeout, self._poll_timeout))
while True:
if self._consume_loop_stopped:
self._consume_loop_stopped = False
@@ -739,8 +771,8 @@ class Connection(object):
try:
return self.connection.drain_events(timeout=poll_timeout)
except socket.timeout as exc:
- poll_timeout = timer.check_return(_raise_timeout, exc,
- maximum=1)
+ poll_timeout = timer.check_return(
+ _raise_timeout, exc, maximum=self._poll_timeout)
for iteration in itertools.count(0):
if limit and iteration >= limit:
@@ -756,8 +788,9 @@ class Connection(object):
LOG.exception(_("Failed to publish message to topic "
"'%(topic)s': %(err_str)s"), log_info)
- def _publish(channel):
- publisher = cls(self.conf, channel, topic=topic, **kwargs)
+ def _publish():
+ publisher = cls(self.driver_conf, self.channel, topic=topic,
+ **kwargs)
publisher.send(msg, timeout)
self.ensure(_error_callback, _publish, retry=retry)
@@ -784,7 +817,31 @@ class Connection(object):
def direct_send(self, msg_id, msg):
"""Send a 'direct' message."""
- self.publisher_send(DirectPublisher, msg_id, msg)
+
+ timer = rpc_common.DecayingTimer(duration=60)
+ timer.start()
+ # NOTE(sileht): retry at least 60sec, after we have a good change
+ # that the caller is really dead too...
+
+ while True:
+ try:
+ self.publisher_send(DirectPublisher, msg_id, msg)
+ except self.connection.channel_errors as exc:
+ # NOTE(noelbk/sileht):
+ # If rabbit dies, the consumer can be disconnected before the
+ # publisher sends, and if the consumer hasn't declared the
+ # queue, the publisher's will send a message to an exchange
+ # that's not bound to a queue, and the message wll be lost.
+ # So we set passive=True to the publisher exchange and catch
+ # the 404 kombu ChannelError and retry until the exchange
+ # appears
+ if exc.code == 404 and timer.check_return() > 0:
+ LOG.info(_LI("The exchange to reply to %s doesn't "
+ "exist yet, retrying...") % msg_id)
+ time.sleep(1)
+ continue
+ raise
+ return
def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
"""Send a 'topic' message."""
@@ -818,10 +875,15 @@ class RabbitDriver(amqpdriver.AMQPDriverBase):
def __init__(self, conf, url,
default_exchange=None,
allowed_remote_exmods=None):
- conf.register_opts(rabbit_opts)
- conf.register_opts(rpc_amqp.amqp_opts)
-
- connection_pool = rpc_amqp.ConnectionPool(conf, url, Connection)
+ opt_group = cfg.OptGroup(name='oslo_messaging_rabbit',
+ title='RabbitMQ driver options')
+ conf.register_group(opt_group)
+ conf.register_opts(rabbit_opts, group=opt_group)
+ conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)
+
+ connection_pool = rpc_amqp.ConnectionPool(
+ conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size,
+ url, Connection)
super(RabbitDriver, self).__init__(conf, url,
connection_pool,
diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py
index 678b3f5..23961ee 100644
--- a/oslo_messaging/_drivers/impl_zmq.py
+++ b/oslo_messaging/_drivers/impl_zmq.py
@@ -25,17 +25,17 @@ import uuid
import eventlet
import greenlet
+from oslo_config import cfg
+from oslo_serialization import jsonutils
+from oslo_utils import excutils
+from oslo_utils import importutils
import six
from six import moves
-from oslo.config import cfg
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._executors import base as executor_base # FIXME(markmc)
from oslo_messaging._i18n import _, _LE
-from oslo.serialization import jsonutils
-from oslo.utils import excutils
-from oslo.utils import importutils
zmq = importutils.try_import('eventlet.green.zmq')
@@ -302,9 +302,9 @@ class InternalContext(object):
data.setdefault('args', {})
try:
- result = proxy.dispatch(
- ctx, data['version'], data['method'],
- data.get('namespace'), **data['args'])
+ if not data.get("method"):
+ raise KeyError
+ result = proxy.dispatch(ctx, data)
return ConsumerBase.normalize_reply(result, ctx.replies)
except greenlet.GreenletExit:
# ignore these since they are just from shutdowns
@@ -368,18 +368,13 @@ class ConsumerBase(object):
# Method starting with - are
# processed internally. (non-valid method name)
method = data.get('method')
- if not method:
- LOG.error(_("RPC message did not include method."))
- return
-
# Internal method
# uses internal context for safety.
if method == '-reply':
self.private_ctx.reply(ctx, proxy, **data['args'])
return
- proxy.dispatch(ctx, data['version'],
- data['method'], data.get('namespace'), **data['args'])
+ proxy.dispatch(ctx, data)
class ZmqBaseReactor(ConsumerBase):
@@ -834,16 +829,7 @@ class ZmqListener(base.Listener):
super(ZmqListener, self).__init__(driver)
self.incoming_queue = moves.queue.Queue()
- def dispatch(self, ctxt, version, method, namespace, **kwargs):
- message = {
- 'method': method,
- 'args': kwargs
- }
- if version:
- message['version'] = version
- if namespace:
- message['namespace'] = namespace
-
+ def dispatch(self, ctxt, message):
incoming = ZmqIncomingMessage(self,
ctxt.to_dict(),
message)
diff --git a/oslo_messaging/_drivers/matchmaker.py b/oslo_messaging/_drivers/matchmaker.py
index 0b3ba4b..bea3f7f 100644
--- a/oslo_messaging/_drivers/matchmaker.py
+++ b/oslo_messaging/_drivers/matchmaker.py
@@ -20,8 +20,8 @@ import contextlib
import logging
import eventlet
+from oslo_config import cfg
-from oslo.config import cfg
from oslo_messaging._i18n import _
matchmaker_opts = [
diff --git a/oslo_messaging/_drivers/matchmaker_redis.py b/oslo_messaging/_drivers/matchmaker_redis.py
index 760561d..4ba5447 100644
--- a/oslo_messaging/_drivers/matchmaker_redis.py
+++ b/oslo_messaging/_drivers/matchmaker_redis.py
@@ -16,8 +16,9 @@ The MatchMaker classes should accept a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.
"""
-from oslo.config import cfg
-from oslo.utils import importutils
+from oslo_config import cfg
+from oslo_utils import importutils
+
from oslo_messaging._drivers import matchmaker as mm_common
redis = importutils.try_import('redis')
@@ -127,7 +128,7 @@ class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
# No value is needed, we just
# care if it exists. Sets aren't viable
# because only keys can expire.
- pipe.set(key_host, '')
+ pipe.sadd(key_host, '')
pipe.execute()
diff --git a/oslo_messaging/_drivers/matchmaker_ring.py b/oslo_messaging/_drivers/matchmaker_ring.py
index 0b65875..0fd918c 100644
--- a/oslo_messaging/_drivers/matchmaker_ring.py
+++ b/oslo_messaging/_drivers/matchmaker_ring.py
@@ -20,7 +20,8 @@ import itertools
import json
import logging
-from oslo.config import cfg
+from oslo_config import cfg
+
from oslo_messaging._drivers import matchmaker as mm
from oslo_messaging._i18n import _
diff --git a/oslo_messaging/_drivers/protocols/amqp/controller.py b/oslo_messaging/_drivers/protocols/amqp/controller.py
index ea0950e..73f5cf1 100644
--- a/oslo_messaging/_drivers/protocols/amqp/controller.py
+++ b/oslo_messaging/_drivers/protocols/amqp/controller.py
@@ -29,11 +29,11 @@ import logging
import threading
import uuid
+from oslo_config import cfg
import proton
import pyngus
from six import moves
-from oslo.config import cfg
from oslo_messaging._drivers.protocols.amqp import eventloop
from oslo_messaging._drivers.protocols.amqp import opts
from oslo_messaging import transport
diff --git a/oslo_messaging/_drivers/protocols/amqp/driver.py b/oslo_messaging/_drivers/protocols/amqp/driver.py
index 8ec8b9e..ba27f0c 100644
--- a/oslo_messaging/_drivers/protocols/amqp/driver.py
+++ b/oslo_messaging/_drivers/protocols/amqp/driver.py
@@ -25,10 +25,10 @@ import os
import threading
import time
+from oslo_serialization import jsonutils
import proton
from six import moves
-from oslo.serialization import jsonutils
import oslo_messaging
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common
diff --git a/oslo_messaging/_drivers/protocols/amqp/opts.py b/oslo_messaging/_drivers/protocols/amqp/opts.py
index 7aade7a..0eae3be 100644
--- a/oslo_messaging/_drivers/protocols/amqp/opts.py
+++ b/oslo_messaging/_drivers/protocols/amqp/opts.py
@@ -12,7 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
-from oslo.config import cfg
+from oslo_config import cfg
amqp1_opts = [
diff --git a/oslo_messaging/_executors/base.py b/oslo_messaging/_executors/base.py
index d60d656..2cc667e 100644
--- a/oslo_messaging/_executors/base.py
+++ b/oslo_messaging/_executors/base.py
@@ -14,10 +14,9 @@
import abc
+from oslo_config import cfg
import six
-from oslo.config import cfg
-
_pool_opts = [
cfg.IntOpt('rpc_thread_pool_size',
default=64,
diff --git a/oslo_messaging/_executors/impl_eventlet.py b/oslo_messaging/_executors/impl_eventlet.py
index 95d0a2f..555ae03 100644
--- a/oslo_messaging/_executors/impl_eventlet.py
+++ b/oslo_messaging/_executors/impl_eventlet.py
@@ -20,8 +20,8 @@ import eventlet
from eventlet.green import threading as greenthreading
from eventlet import greenpool
import greenlet
+from oslo_utils import excutils
-from oslo.utils import excutils
from oslo_messaging._executors import base
from oslo_messaging import localcontext
diff --git a/oslo_messaging/_executors/impl_thread.py b/oslo_messaging/_executors/impl_thread.py
index 2ed4560..ca8ebc7 100644
--- a/oslo_messaging/_executors/impl_thread.py
+++ b/oslo_messaging/_executors/impl_thread.py
@@ -20,9 +20,9 @@ import sys
import threading
from concurrent import futures
+from oslo_utils import excutils
import six
-from oslo.utils import excutils
from oslo_messaging._executors import base
diff --git a/oslo_messaging/_i18n.py b/oslo_messaging/_i18n.py
index c4a0e7f..a012b58 100644
--- a/oslo_messaging/_i18n.py
+++ b/oslo_messaging/_i18n.py
@@ -16,10 +16,10 @@ See http://docs.openstack.org/developer/oslo.i18n/usage.html
"""
-from oslo import i18n
+import oslo_i18n
-_translators = i18n.TranslatorFactory(domain='oslo.messaging')
+_translators = oslo_i18n.TranslatorFactory(domain='oslo.messaging')
# The primary translation function using the well-known name "_"
_ = _translators.primary
diff --git a/oslo_messaging/conffixture.py b/oslo_messaging/conffixture.py
index c41da38..bda151f 100644
--- a/oslo_messaging/conffixture.py
+++ b/oslo_messaging/conffixture.py
@@ -20,9 +20,9 @@ import sys
import fixtures
-def _import_opts(conf, module, opts):
+def _import_opts(conf, module, opts, group=None):
__import__(module)
- conf.register_opts(getattr(sys.modules[module], opts))
+ conf.register_opts(getattr(sys.modules[module], opts), group=group)
class ConfFixture(fixtures.Fixture):
@@ -45,11 +45,17 @@ class ConfFixture(fixtures.Fixture):
def __init__(self, conf):
self.conf = conf
_import_opts(self.conf,
- 'oslo_messaging._drivers.impl_rabbit', 'rabbit_opts')
+ 'oslo_messaging._drivers.impl_rabbit', 'rabbit_opts',
+ 'oslo_messaging_rabbit')
_import_opts(self.conf,
- 'oslo_messaging._drivers.impl_qpid', 'qpid_opts')
+ 'oslo_messaging._drivers.amqp', 'amqp_opts',
+ 'oslo_messaging_rabbit')
_import_opts(self.conf,
- 'oslo_messaging._drivers.amqp', 'amqp_opts')
+ 'oslo_messaging._drivers.impl_qpid', 'qpid_opts',
+ 'oslo_messaging_qpid')
+ _import_opts(self.conf,
+ 'oslo_messaging._drivers.amqp', 'amqp_opts',
+ 'oslo_messaging_qpid')
_import_opts(self.conf, 'oslo_messaging.rpc.client', '_client_opts')
_import_opts(self.conf, 'oslo_messaging.transport', '_transport_opts')
_import_opts(self.conf,
diff --git a/oslo_messaging/notify/__init__.py b/oslo_messaging/notify/__init__.py
index c5032db..dd5304d 100644
--- a/oslo_messaging/notify/__init__.py
+++ b/oslo_messaging/notify/__init__.py
@@ -17,9 +17,11 @@ __all__ = ['Notifier',
'LoggingNotificationHandler',
'get_notification_listener',
'NotificationResult',
+ 'NotificationFilter',
'PublishErrorsHandler',
'LoggingErrorNotificationHandler']
+from .filter import NotificationFilter
from .notifier import *
from .listener import *
from .log_handler import *
diff --git a/oslo_messaging/notify/_impl_log.py b/oslo_messaging/notify/_impl_log.py
index e04aa61..400f556 100644
--- a/oslo_messaging/notify/_impl_log.py
+++ b/oslo_messaging/notify/_impl_log.py
@@ -17,7 +17,8 @@
import logging
-from oslo.serialization import jsonutils
+from oslo_serialization import jsonutils
+
from oslo_messaging.notify import notifier
diff --git a/oslo_messaging/notify/_impl_routing.py b/oslo_messaging/notify/_impl_routing.py
index 666ad4d..b420870 100644
--- a/oslo_messaging/notify/_impl_routing.py
+++ b/oslo_messaging/notify/_impl_routing.py
@@ -16,11 +16,11 @@
import fnmatch
import logging
+from oslo_config import cfg
import six
from stevedore import dispatch
import yaml
-from oslo.config import cfg
from oslo_messaging._i18n import _
from oslo_messaging.notify import notifier
diff --git a/oslo_messaging/notify/dispatcher.py b/oslo_messaging/notify/dispatcher.py
index a9e8cc6..48c4e20 100644
--- a/oslo_messaging/notify/dispatcher.py
+++ b/oslo_messaging/notify/dispatcher.py
@@ -56,7 +56,9 @@ class NotificationDispatcher(object):
for endpoint, prio in itertools.product(endpoints, PRIORITIES):
if hasattr(endpoint, prio):
method = getattr(endpoint, prio)
- self._callbacks_by_priority.setdefault(prio, []).append(method)
+ screen = getattr(endpoint, 'filter_rule', None)
+ self._callbacks_by_priority.setdefault(prio, []).append(
+ (screen, method))
priorities = self._callbacks_by_priority.keys()
self._targets_priorities = set(itertools.product(self.targets,
@@ -118,7 +120,10 @@ class NotificationDispatcher(object):
payload = self.serializer.deserialize_entity(ctxt,
message.get('payload'))
- for callback in self._callbacks_by_priority.get(priority, []):
+ for screen, callback in self._callbacks_by_priority.get(priority, []):
+ if screen and not screen.match(ctxt, publisher_id, event_type,
+ metadata, payload):
+ continue
localcontext.set_local_context(ctxt)
try:
if executor_callback:
diff --git a/oslo_messaging/notify/filter.py b/oslo_messaging/notify/filter.py
new file mode 100644
index 0000000..b23fac4
--- /dev/null
+++ b/oslo_messaging/notify/filter.py
@@ -0,0 +1,77 @@
+#
+# Copyright 2013 eNovance
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import re
+
+
+class NotificationFilter(object):
+
+ """Filter notification messages
+
+ The NotificationFilter class is used to filter notifications that an
+ endpoint will received.
+
+ The notification can be filter on different fields: context,
+ publisher_id, event_type, metadata and payload.
+
+ The filter is done via a regular expression
+
+ filter_rule = NotificationFilter(
+ publisher_id='^compute.*',
+ context={'tenant_id': '^5f643cfc-664b-4c69-8000-ce2ed7b08216$',
+ 'roles='private'},
+ event_type='^compute\.instance\..*',
+ metadata={'timestamp': 'Aug'},
+ payload={'state': '^active$')
+
+ """
+
+ def __init__(self, context=None, publisher_id=None, event_type=None,
+ metadata=None, payload=None):
+ self._regex_publisher_id = None
+ self._regex_event_type = None
+
+ if publisher_id is not None:
+ self._regex_publisher_id = re.compile(publisher_id)
+ if event_type is not None:
+ self._regex_event_type = re.compile(event_type)
+ self._regexs_context = self._build_regex_dict(context)
+ self._regexs_metadata = self._build_regex_dict(metadata)
+ self._regexs_payload = self._build_regex_dict(payload)
+
+ @staticmethod
+ def _build_regex_dict(regex_list):
+ if regex_list is None:
+ return {}
+ return dict((k, re.compile(regex_list[k])) for k in regex_list)
+
+ @staticmethod
+ def _check_for_mismatch(data, regex):
+ if isinstance(regex, dict):
+ for k in regex:
+ if (k not in data or not regex[k].match(data[k])):
+ return True
+ elif regex is not None and not regex.match(data):
+ return True
+ return False
+
+ def match(self, context, publisher_id, event_type, metadata, payload):
+ if (self._check_for_mismatch(publisher_id, self._regex_publisher_id) or
+ self._check_for_mismatch(event_type, self._regex_event_type) or
+ self._check_for_mismatch(context, self._regexs_context) or
+ self._check_for_mismatch(metadata, self._regexs_metadata) or
+ self._check_for_mismatch(payload, self._regexs_payload)):
+ return False
+ return True
diff --git a/oslo_messaging/notify/listener.py b/oslo_messaging/notify/listener.py
index a1586dd..1aa4613 100644
--- a/oslo_messaging/notify/listener.py
+++ b/oslo_messaging/notify/listener.py
@@ -40,14 +40,19 @@ and eventlet executors available.
A simple example of a notification listener with multiple endpoints might be::
- from oslo.config import cfg
+ from oslo_config import cfg
import oslo_messaging
class NotificationEndpoint(object):
+ filter_rule = NotificationFilter(publisher_id='^compute.*')
+
def warn(self, ctxt, publisher_id, event_type, payload, metadata):
do_something(payload)
class ErrorEndpoint(object):
+ filter_rule = NotificationFilter(event_type='^instance\..*\.start$',
+ context={'ctxt_key': 'regexp'})
+
def error(self, ctxt, publisher_id, event_type, payload, metadata):
do_something(payload)
@@ -69,7 +74,8 @@ A simple example of a notification listener with multiple endpoints might be::
A notifier sends a notification on a topic with a priority, the notification
listener will receive this notification if the topic of this one have been set
in one of the targets and if an endpoint implements the method named like the
-priority
+priority and if the notification match the NotificationFilter rule set into
+the filter_rule attribute of the endpoint.
Parameters to endpoint methods are the request context supplied by the client,
the publisher_id of the notification message, the event_type, the payload and
diff --git a/oslo_messaging/notify/log_handler.py b/oslo_messaging/notify/log_handler.py
index dbbc67f..eb4b35c 100644
--- a/oslo_messaging/notify/log_handler.py
+++ b/oslo_messaging/notify/log_handler.py
@@ -12,7 +12,7 @@
import logging
-from oslo.config import cfg
+from oslo_config import cfg
class LoggingErrorNotificationHandler(logging.Handler):
diff --git a/oslo_messaging/notify/logger.py b/oslo_messaging/notify/logger.py
index eb8e445..4117c2c 100644
--- a/oslo_messaging/notify/logger.py
+++ b/oslo_messaging/notify/logger.py
@@ -16,7 +16,8 @@ Driver for the Python logging package that sends log records as a notification.
"""
import logging
-from oslo.config import cfg
+from oslo_config import cfg
+
from oslo_messaging.notify import notifier
from oslo_messaging import transport
diff --git a/oslo_messaging/notify/middleware.py b/oslo_messaging/notify/middleware.py
index 5529713..4297e29 100644
--- a/oslo_messaging/notify/middleware.py
+++ b/oslo_messaging/notify/middleware.py
@@ -21,11 +21,11 @@ import os.path
import sys
import traceback as tb
+from oslo_config import cfg
+from oslo_middleware import base
import six
import webob.dec
-from oslo.config import cfg
-from oslo.middleware import base
import oslo_messaging
from oslo_messaging._i18n import _LE
from oslo_messaging import notify
diff --git a/oslo_messaging/notify/notifier.py b/oslo_messaging/notify/notifier.py
index 7cc3d9c..5565d72 100644
--- a/oslo_messaging/notify/notifier.py
+++ b/oslo_messaging/notify/notifier.py
@@ -19,11 +19,11 @@ import abc
import logging
import uuid
+from oslo_config import cfg
+from oslo_utils import timeutils
import six
from stevedore import named
-from oslo.config import cfg
-from oslo.utils import timeutils
from oslo_messaging import serializer as msg_serializer
_notifier_opts = [
diff --git a/oslo_messaging/opts.py b/oslo_messaging/opts.py
index 4008465..664e586 100644
--- a/oslo_messaging/opts.py
+++ b/oslo_messaging/opts.py
@@ -34,9 +34,6 @@ from oslo_messaging.rpc import client
from oslo_messaging import transport
_global_opt_lists = [
- amqp.amqp_opts,
- impl_qpid.qpid_opts,
- impl_rabbit.rabbit_opts,
impl_zmq.zmq_opts,
matchmaker.matchmaker_opts,
base._pool_opts,
@@ -50,6 +47,10 @@ _opts = [
('matchmaker_redis', matchmaker_redis.matchmaker_redis_opts),
('matchmaker_ring', matchmaker_ring.matchmaker_opts),
('oslo_messaging_amqp', amqp_opts.amqp1_opts),
+ ('oslo_messaging_rabbit', list(itertools.chain(amqp.amqp_opts,
+ impl_rabbit.rabbit_opts))),
+ ('oslo_messaging_qpid', list(itertools.chain(amqp.amqp_opts,
+ impl_qpid.qpid_opts)))
]
diff --git a/oslo_messaging/rpc/client.py b/oslo_messaging/rpc/client.py
index 8ac73a1..3ea7034 100644
--- a/oslo_messaging/rpc/client.py
+++ b/oslo_messaging/rpc/client.py
@@ -23,9 +23,9 @@ __all__ = [
'RemoteError',
]
+from oslo_config import cfg
import six
-from oslo.config import cfg
from oslo_messaging._drivers import base as driver_base
from oslo_messaging import _utils as utils
from oslo_messaging import exceptions
diff --git a/oslo_messaging/rpc/server.py b/oslo_messaging/rpc/server.py
index f1cbc12..7f6295b 100644
--- a/oslo_messaging/rpc/server.py
+++ b/oslo_messaging/rpc/server.py
@@ -42,7 +42,7 @@ complete.
A simple example of an RPC server with multiple endpoints might be::
- from oslo.config import cfg
+ from oslo_config import cfg
import oslo_messaging
class ServerControlEndpoint(object):
diff --git a/oslo_messaging/tests/drivers/test_impl_qpid.py b/oslo_messaging/tests/drivers/test_impl_qpid.py
index 1c90831..2d7dd6a 100644
--- a/oslo_messaging/tests/drivers/test_impl_qpid.py
+++ b/oslo_messaging/tests/drivers/test_impl_qpid.py
@@ -187,7 +187,8 @@ class TestQpidInvalidTopologyVersion(_QpidBaseTestCase):
def setUp(self):
super(TestQpidInvalidTopologyVersion, self).setUp()
- self.config(qpid_topology_version=-1)
+ self.config(qpid_topology_version=-1,
+ group='oslo_messaging_qpid')
def test_invalid_topology_version(self):
def consumer_callback(msg):
@@ -199,11 +200,11 @@ class TestQpidInvalidTopologyVersion(_QpidBaseTestCase):
# 1. qpid driver raises Exception(msg) for invalid topology version
# 2. flake8 - H202 assertRaises Exception too broad
exception_msg = ("Invalid value for qpid_topology_version: %d" %
- self.conf.qpid_topology_version)
+ self.conf.oslo_messaging_qpid.qpid_topology_version)
recvd_exc_msg = ''
try:
- self.consumer_cls(self.conf,
+ self.consumer_cls(self.conf.oslo_messaging_qpid,
self.session_receive,
msgid_or_topic,
consumer_callback,
@@ -215,7 +216,7 @@ class TestQpidInvalidTopologyVersion(_QpidBaseTestCase):
recvd_exc_msg = ''
try:
- self.publisher_cls(self.conf,
+ self.publisher_cls(self.conf.oslo_messaging_qpid,
self.session_send,
topic=msgid_or_topic,
**self.publisher_kwargs)
@@ -258,13 +259,15 @@ class TestQpidDirectConsumerPublisher(_QpidBaseTestCase):
self.msgid = str(random.randint(1, 100))
# create a DirectConsumer and DirectPublisher class objects
- self.dir_cons = qpid_driver.DirectConsumer(self.conf,
- self.session_receive,
- self.msgid,
- self.consumer_callback)
- self.dir_pub = qpid_driver.DirectPublisher(self.conf,
- self.session_send,
- self.msgid)
+ self.dir_cons = qpid_driver.DirectConsumer(
+ self.conf.oslo_messaging_qpid,
+ self.session_receive,
+ self.msgid,
+ self.consumer_callback)
+ self.dir_pub = qpid_driver.DirectPublisher(
+ self.conf.oslo_messaging_qpid,
+ self.session_send,
+ self.msgid)
def try_send_msg(no_msgs):
for i in range(no_msgs):
@@ -418,7 +421,7 @@ class TestQpidTopicAndFanout(_QpidBaseTestCase):
def test_qpid_topic_and_fanout(self):
for receiver_id in range(self.no_receivers):
- consumer = self.consumer_cls(self.conf,
+ consumer = self.consumer_cls(self.conf.oslo_messaging_qpid,
self.session_receive,
self.receive_topic,
self.consumer_callback,
@@ -431,7 +434,7 @@ class TestQpidTopicAndFanout(_QpidBaseTestCase):
self._receiver_threads.append(thread)
for sender_id in range(self.no_senders):
- publisher = self.publisher_cls(self.conf,
+ publisher = self.publisher_cls(self.conf.oslo_messaging_qpid,
self.session_send,
topic=self.topic,
**self.publisher_kwargs)
@@ -483,7 +486,8 @@ class TestDriverInterface(_QpidBaseTestCase):
def setUp(self):
super(TestDriverInterface, self).setUp()
- self.config(qpid_topology_version=2)
+ self.config(qpid_topology_version=2,
+ group='oslo_messaging_qpid')
transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
@@ -554,7 +558,8 @@ class TestQpidReconnectOrder(test_utils.BaseTestCase):
brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
brokers_count = len(brokers)
- self.config(qpid_hosts=brokers)
+ self.config(qpid_hosts=brokers,
+ group='oslo_messaging_qpid')
with mock.patch('qpid.messaging.Connection') as conn_mock:
# starting from the first broker in the list
@@ -777,7 +782,8 @@ class QPidHATestCase(test_utils.BaseTestCase):
self.config(qpid_hosts=self.brokers,
qpid_username=None,
- qpid_password=None)
+ qpid_password=None,
+ group='oslo_messaging_qpid')
hostname_sets = set()
self.info = {'attempt': 0,
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index 3aace8b..e60bd3b 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -21,11 +21,11 @@ import uuid
import fixtures
import kombu
import mock
+from oslo_config import cfg
+from oslo_serialization import jsonutils
from oslotest import mockpatch
import testscenarios
-from oslo.config import cfg
-from oslo.serialization import jsonutils
import oslo_messaging
from oslo_messaging._drivers import amqpdriver
from oslo_messaging._drivers import common as driver_common
@@ -41,7 +41,7 @@ class TestDeprecatedRabbitDriverLoad(test_utils.BaseTestCase):
super(TestDeprecatedRabbitDriverLoad, self).setUp(
conf=cfg.ConfigOpts())
self.messaging_conf.transport_driver = 'rabbit'
- self.config(fake_rabbit=True)
+ self.config(fake_rabbit=True, group="oslo_messaging_rabbit")
def test_driver_load(self):
transport = oslo_messaging.get_transport(self.conf)
@@ -673,7 +673,8 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
self.config(rabbit_hosts=self.brokers,
rabbit_retry_interval=0.01,
rabbit_retry_backoff=0.01,
- kombu_reconnect_delay=0)
+ kombu_reconnect_delay=0,
+ group="oslo_messaging_rabbit")
self.kombu_connect = mock.Mock()
self.useFixture(mockpatch.Patch(
diff --git a/oslo_messaging/tests/drivers/test_impl_zmq.py b/oslo_messaging/tests/drivers/test_impl_zmq.py
index d3b03f6..5593884 100644
--- a/oslo_messaging/tests/drivers/test_impl_zmq.py
+++ b/oslo_messaging/tests/drivers/test_impl_zmq.py
@@ -18,14 +18,12 @@ import socket
import fixtures
import mock
+from oslo_utils import importutils
import testtools
-from oslo.utils import importutils
import oslo_messaging
from oslo_messaging.tests import utils as test_utils
-# NOTE(jamespage) the zmq driver implementation is currently tied
-# to eventlet so we have to monkey_patch to support testing
# eventlet is not yet py3 compatible, so skip if not installed
eventlet = importutils.try_import('eventlet')
@@ -44,34 +42,12 @@ def get_unused_port():
return port
-class TestConfZmqDriverLoad(test_utils.BaseTestCase):
-
- @testtools.skipIf(impl_zmq is None, "zmq not available")
- def setUp(self):
- super(TestConfZmqDriverLoad, self).setUp()
- self.messaging_conf.transport_driver = 'zmq'
-
- def test_driver_load(self):
- transport = oslo_messaging.get_transport(self.conf)
- self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver)
-
-
-class stopRpc(object):
- def __init__(self, attrs):
- self.attrs = attrs
-
- def __call__(self):
- if self.attrs['reactor']:
- self.attrs['reactor'].close()
- if self.attrs['driver']:
- self.attrs['driver'].cleanup()
-
-
-class TestZmqBasics(test_utils.BaseTestCase):
+class ZmqBaseTestCase(test_utils.BaseTestCase):
+ """Base test case for all ZMQ tests that make use of the ZMQ Proxy"""
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
- super(TestZmqBasics, self).setUp()
+ super(ZmqBaseTestCase, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = oslo_messaging.get_transport(self.conf)
@@ -94,6 +70,32 @@ class TestZmqBasics(test_utils.BaseTestCase):
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
self.addCleanup(stopRpc(self.__dict__))
+
+class TestConfZmqDriverLoad(test_utils.BaseTestCase):
+
+ @testtools.skipIf(impl_zmq is None, "zmq not available")
+ def setUp(self):
+ super(TestConfZmqDriverLoad, self).setUp()
+ self.messaging_conf.transport_driver = 'zmq'
+
+ def test_driver_load(self):
+ transport = oslo_messaging.get_transport(self.conf)
+ self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver)
+
+
+class stopRpc(object):
+ def __init__(self, attrs):
+ self.attrs = attrs
+
+ def __call__(self):
+ if self.attrs['reactor']:
+ self.attrs['reactor'].close()
+ if self.attrs['driver']:
+ self.attrs['driver'].cleanup()
+
+
+class TestZmqBasics(ZmqBaseTestCase):
+
def test_start_stop_listener(self):
target = oslo_messaging.Target(topic='testtopic')
listener = self.driver.listen(target)
@@ -277,32 +279,7 @@ class TestZmqIncomingMessage(test_utils.BaseTestCase):
msg.requeue()
-class TestZmqConnection(test_utils.BaseTestCase):
-
- @testtools.skipIf(impl_zmq is None, "zmq not available")
- def setUp(self):
- super(TestZmqConnection, self).setUp()
- self.messaging_conf.transport_driver = 'zmq'
- # Get driver
- transport = oslo_messaging.get_transport(self.conf)
- self.driver = transport._driver
-
- # Set config values
- self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
- kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
- 'rpc_zmq_host': '127.0.0.1',
- 'rpc_response_timeout': 5,
- 'rpc_zmq_port': get_unused_port(),
- 'rpc_zmq_ipc_dir': self.internal_ipc_dir}
- self.config(**kwargs)
-
- # Start RPC
- LOG.info("Running internal zmq receiver.")
- self.reactor = impl_zmq.ZmqProxy(self.conf)
- self.reactor.consume_in_thread()
-
- self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
- self.addCleanup(stopRpc(self.__dict__))
+class TestZmqConnection(ZmqBaseTestCase):
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_create_consumer(self, mock_reactor):
@@ -379,32 +356,7 @@ class TestZmqConnection(test_utils.BaseTestCase):
self.assertTrue(conn.reactor.consume_in_thread.called)
-class TestZmqListener(test_utils.BaseTestCase):
-
- @testtools.skipIf(impl_zmq is None, "zmq not available")
- def setUp(self):
- super(TestZmqListener, self).setUp()
- self.messaging_conf.transport_driver = 'zmq'
- # Get driver
- transport = oslo_messaging.get_transport(self.conf)
- self.driver = transport._driver
-
- # Set config values
- self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
- kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
- 'rpc_zmq_host': '127.0.0.1',
- 'rpc_response_timeout': 5,
- 'rpc_zmq_port': get_unused_port(),
- 'rpc_zmq_ipc_dir': self.internal_ipc_dir}
- self.config(**kwargs)
-
- # Start RPC
- LOG.info("Running internal zmq receiver.")
- self.reactor = impl_zmq.ZmqProxy(self.conf)
- self.reactor.consume_in_thread()
-
- self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
- self.addCleanup(stopRpc(self.__dict__))
+class TestZmqListener(ZmqBaseTestCase):
def test_zmqlistener_no_msg(self):
listener = impl_zmq.ZmqListener(self.driver)
@@ -416,40 +368,16 @@ class TestZmqListener(test_utils.BaseTestCase):
kwargs = {'a': 1, 'b': 2}
m = mock.Mock()
ctxt = mock.Mock(autospec=impl_zmq.RpcContext)
- eventlet.spawn_n(listener.dispatch, ctxt, 0,
- m.fake_method, 'name.space', **kwargs)
+ message = {'namespace': 'name.space', 'method': m.fake_method,
+ 'args': kwargs}
+ eventlet.spawn_n(listener.dispatch, ctxt, message)
resp = listener.poll(timeout=10)
msg = {'method': m.fake_method, 'namespace': 'name.space',
'args': kwargs}
self.assertEqual(resp.message, msg)
-class TestZmqDriver(test_utils.BaseTestCase):
-
- @testtools.skipIf(impl_zmq is None, "zmq not available")
- def setUp(self):
- super(TestZmqDriver, self).setUp()
- self.messaging_conf.transport_driver = 'zmq'
- # Get driver
- transport = oslo_messaging.get_transport(self.conf)
- self.driver = transport._driver
-
- # Set config values
- self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
- kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
- 'rpc_zmq_host': '127.0.0.1',
- 'rpc_response_timeout': 5,
- 'rpc_zmq_port': get_unused_port(),
- 'rpc_zmq_ipc_dir': self.internal_ipc_dir}
- self.config(**kwargs)
-
- # Start RPC
- LOG.info("Running internal zmq receiver.")
- self.reactor = impl_zmq.ZmqProxy(self.conf)
- self.reactor.consume_in_thread()
-
- self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
- self.addCleanup(stopRpc(self.__dict__))
+class TestZmqDriver(ZmqBaseTestCase):
@mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True)
diff --git a/oslo_messaging/tests/drivers/test_matchmaker.py b/oslo_messaging/tests/drivers/test_matchmaker.py
index 40608f4..61c37a9 100644
--- a/oslo_messaging/tests/drivers/test_matchmaker.py
+++ b/oslo_messaging/tests/drivers/test_matchmaker.py
@@ -12,9 +12,9 @@
# License for the specific language governing permissions and limitations
# under the License.
+from oslo_utils import importutils
import testtools
-from oslo.utils import importutils
from oslo_messaging.tests import utils as test_utils
# NOTE(jamespage) matchmaker tied directly to eventlet
diff --git a/oslo_messaging/tests/drivers/test_matchmaker_redis.py b/oslo_messaging/tests/drivers/test_matchmaker_redis.py
index 19f6bc1..a0c4d7f 100644
--- a/oslo_messaging/tests/drivers/test_matchmaker_redis.py
+++ b/oslo_messaging/tests/drivers/test_matchmaker_redis.py
@@ -12,9 +12,9 @@
# License for the specific language governing permissions and limitations
# under the License.
+from oslo_utils import importutils
import testtools
-from oslo.utils import importutils
from oslo_messaging.tests import utils as test_utils
redis = importutils.try_import('redis')
diff --git a/oslo_messaging/tests/drivers/test_matchmaker_ring.py b/oslo_messaging/tests/drivers/test_matchmaker_ring.py
index 48c5a65..5f15600 100644
--- a/oslo_messaging/tests/drivers/test_matchmaker_ring.py
+++ b/oslo_messaging/tests/drivers/test_matchmaker_ring.py
@@ -12,9 +12,9 @@
# License for the specific language governing permissions and limitations
# under the License.
+from oslo_utils import importutils
import testtools
-from oslo.utils import importutils
from oslo_messaging.tests import utils as test_utils
# NOTE(jamespage) matchmaker tied directly to eventlet
diff --git a/oslo_messaging/tests/functional/gate/gate_hook.sh b/oslo_messaging/tests/functional/gate/gate_hook.sh
new file mode 100755
index 0000000..c181c95
--- /dev/null
+++ b/oslo_messaging/tests/functional/gate/gate_hook.sh
@@ -0,0 +1,52 @@
+#!/bin/bash
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# This script is executed inside gate_hook function in devstack gate.
+
+RPC_BACKEND=$1
+
+DEVSTACK_LOCAL_CONFIG=""
+
+case $RPC_BACKEND in
+ rabbit)
+ DEVSTACK_LOCAL_CONFIG+=$'RABBIT_HOST=127.0.0.1\n'
+ DEVSTACK_LOCAL_CONFIG+=$'RABBIT_USERID=stackrabbit\n'
+ DEVSTACK_LOCAL_CONFIG+=$'RABBIT_PASSWORD=secretrabbit\n'
+ ;;
+ qpid)
+ export DEVSTACK_GATE_QPID=1
+ DEVSTACK_LOCAL_CONFIG+=$'QPID_HOST=127.0.0.1\n'
+ DEVSTACK_LOCAL_CONFIG+=$'QPID_USERNAME=stackqpid\n'
+ DEVSTACK_LOCAL_CONFIG+=$'QPID_PASSWORD=secretqpid\n'
+ ;;
+ amqp1)
+ export DEVSTACK_GATE_QPID=1
+ DEVSTACK_LOCAL_CONFIG+=$'QPID_HOST=127.0.0.1\n'
+ DEVSTACK_LOCAL_CONFIG+=$'QPID_USERNAME=stackqpid\n'
+ DEVSTACK_LOCAL_CONFIG+=$'QPID_PASSWORD=secretqpid\n'
+ DEVSTACK_LOCAL_CONFIG+=$'RPC_MESSAGING_PROTOCOL=AMQP1\n'
+ ;;
+ zeromq)
+ export DEVSTACK_GATE_ZEROMQ=1
+ DEVSTACK_LOCAL_CONFIG+=$'ZEROMQ_MATCHMAKER=redis\n'
+ DEVSTACK_LOCAL_CONFIG+=$'MATCHMAKER_REDIS_HOST=127.0.0.1\n'
+ ;;
+esac
+
+export DEVSTACK_LOCAL_CONFIG
+export DEVSTACK_GATE_INSTALL_TESTONLY=1
+export DEVSTACK_GATE_NO_SERVICES=1
+export KEEP_LOCALRC=1
+
+$BASE/new/devstack-gate/devstack-vm-gate.sh
diff --git a/oslo_messaging/tests/functional/gate/post_test_hook.sh b/oslo_messaging/tests/functional/gate/post_test_hook.sh
new file mode 100755
index 0000000..df6c51a
--- /dev/null
+++ b/oslo_messaging/tests/functional/gate/post_test_hook.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# This script is executed inside post_test_hook function in devstack gate.
+
+RPC_BACKEND=$1
+
+cd $BASE/new/oslo.messaging
+sudo -H -u stack tox -e py27-func-$RPC_BACKEND
diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py
index bbdd11b..e9d68ba 100644
--- a/oslo_messaging/tests/functional/utils.py
+++ b/oslo_messaging/tests/functional/utils.py
@@ -17,9 +17,9 @@ import time
import uuid
import fixtures
+from oslo_config import cfg
from six import moves
-from oslo.config import cfg
import oslo_messaging
from oslo_messaging.notify import notifier
from oslo_messaging.tests import utils as test_utils
@@ -270,6 +270,9 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
self.url = os.environ.get('TRANSPORT_URL')
if not self.url:
self.skipTest("No transport url configured")
+ zmq_matchmaker = os.environ.get('ZMQ_MATCHMAKER')
+ if zmq_matchmaker:
+ self.conf.rpc_zmq_matchmaker = zmq_matchmaker
class NotificationFixture(fixtures.Fixture):
diff --git a/oslo_messaging/tests/notify/test_dispatcher.py b/oslo_messaging/tests/notify/test_dispatcher.py
index 029f737..0449f8a 100644
--- a/oslo_messaging/tests/notify/test_dispatcher.py
+++ b/oslo_messaging/tests/notify/test_dispatcher.py
@@ -16,9 +16,9 @@
import itertools
import mock
+from oslo_utils import timeutils
import testscenarios
-from oslo.utils import timeutils
import oslo_messaging
from oslo_messaging.notify import dispatcher as notify_dispatcher
from oslo_messaging.tests import utils as test_utils
@@ -147,3 +147,107 @@ class TestDispatcher(test_utils.BaseTestCase):
callback()
mylog.warning.assert_called_once_with('Unknown priority "%s"',
'what???')
+
+
+class TestDispatcherFilter(test_utils.BaseTestCase):
+ scenarios = [
+ ('publisher_id_match',
+ dict(filter_rule=dict(publisher_id='^compute.*'),
+ publisher_id='compute01.manager',
+ event_type='instance.create.start',
+ context={},
+ match=True)),
+ ('publisher_id_nomatch',
+ dict(filter_rule=dict(publisher_id='^compute.*'),
+ publisher_id='network01.manager',
+ event_type='instance.create.start',
+ context={},
+ match=False)),
+ ('event_type_match',
+ dict(filter_rule=dict(event_type='^instance\.create'),
+ publisher_id='compute01.manager',
+ event_type='instance.create.start',
+ context={},
+ match=True)),
+ ('event_type_nomatch',
+ dict(filter_rule=dict(event_type='^instance\.delete'),
+ publisher_id='compute01.manager',
+ event_type='instance.create.start',
+ context={},
+ match=False)),
+ ('context_match',
+ dict(filter_rule=dict(context={'user': '^adm'}),
+ publisher_id='compute01.manager',
+ event_type='instance.create.start',
+ context={'user': 'admin'},
+ match=True)),
+ ('context_key_missing',
+ dict(filter_rule=dict(context={'user': '^adm'}),
+ publisher_id='compute01.manager',
+ event_type='instance.create.start',
+ context={'project': 'admin'},
+ metadata={},
+ match=False)),
+ ('metadata_match',
+ dict(filter_rule=dict(metadata={'message_id': '^99'}),
+ publisher_id='compute01.manager',
+ event_type='instance.create.start',
+ context={},
+ match=True)),
+ ('metadata_key_missing',
+ dict(filter_rule=dict(metadata={'user': '^adm'}),
+ publisher_id='compute01.manager',
+ event_type='instance.create.start',
+ context={},
+ match=False)),
+ ('payload_match',
+ dict(filter_rule=dict(payload={'state': '^active$'}),
+ publisher_id='compute01.manager',
+ event_type='instance.create.start',
+ context={},
+ match=True)),
+ ('payload_no_match',
+ dict(filter_rule=dict(payload={'state': '^deleted$'}),
+ publisher_id='compute01.manager',
+ event_type='instance.create.start',
+ context={},
+ match=False)),
+ ('payload_key_missing',
+ dict(filter_rule=dict(payload={'user': '^adm'}),
+ publisher_id='compute01.manager',
+ event_type='instance.create.start',
+ context={},
+ match=False)),
+ ('mix_match',
+ dict(filter_rule=dict(event_type='^instance\.create',
+ publisher_id='^compute',
+ context={'user': '^adm'}),
+ publisher_id='compute01.manager',
+ event_type='instance.create.start',
+ context={'user': 'admin'},
+ match=True)),
+
+ ]
+
+ def test_filters(self):
+ notification_filter = oslo_messaging.NotificationFilter(
+ **self.filter_rule)
+ endpoint = mock.Mock(spec=['info'], filter_rule=notification_filter)
+
+ targets = [oslo_messaging.Target(topic='notifications')]
+ dispatcher = notify_dispatcher.NotificationDispatcher(
+ targets, [endpoint], serializer=None, allow_requeue=True)
+ message = {'payload': {'state': 'active'},
+ 'priority': 'info',
+ 'publisher_id': self.publisher_id,
+ 'event_type': self.event_type,
+ 'timestamp': '2014-03-03 18:21:04.369234',
+ 'message_id': '99863dda-97f0-443a-a0c1-6ed317b7fd45'}
+ incoming = mock.Mock(ctxt=self.context, message=message)
+ with dispatcher(incoming) as callback:
+ callback()
+
+ if self.match:
+ self.assertEqual(1, endpoint.info.call_count)
+ else:
+ self.assertEqual(0, endpoint.info.call_count)
diff --git a/oslo_messaging/tests/notify/test_listener.py b/oslo_messaging/tests/notify/test_listener.py
index 3c1e404..0bd032c 100644
--- a/oslo_messaging/tests/notify/test_listener.py
+++ b/oslo_messaging/tests/notify/test_listener.py
@@ -17,9 +17,9 @@ import threading
import time
import mock
+from oslo_config import cfg
import testscenarios
-from oslo.config import cfg
import oslo_messaging
from oslo_messaging.notify import dispatcher
from oslo_messaging.tests import utils as test_utils
diff --git a/oslo_messaging/tests/notify/test_logger.py b/oslo_messaging/tests/notify/test_logger.py
index c551493..693249b 100644
--- a/oslo_messaging/tests/notify/test_logger.py
+++ b/oslo_messaging/tests/notify/test_logger.py
@@ -19,10 +19,10 @@ import os
import sys
import mock
+from oslo_utils import timeutils
import testscenarios
import testtools
-from oslo.utils import timeutils
import oslo_messaging
from oslo_messaging.tests.notify import test_notifier
from oslo_messaging.tests import utils as test_utils
@@ -56,7 +56,7 @@ class TestLogNotifier(test_utils.BaseTestCase):
# eventlet
logging.logThreads = 0
- @mock.patch('oslo.utils.timeutils.utcnow')
+ @mock.patch('oslo_utils.timeutils.utcnow')
def test_logger(self, mock_utcnow):
with mock.patch('oslo_messaging.transport.get_transport',
return_value=test_notifier._FakeTransport(self.conf)):
@@ -99,7 +99,7 @@ class TestLogNotifier(test_utils.BaseTestCase):
@testtools.skipUnless(hasattr(logging.config, 'dictConfig'),
"Need logging.config.dictConfig (Python >= 2.7)")
- @mock.patch('oslo.utils.timeutils.utcnow')
+ @mock.patch('oslo_utils.timeutils.utcnow')
def test_logging_conf(self, mock_utcnow):
with mock.patch('oslo_messaging.transport.get_transport',
return_value=test_notifier._FakeTransport(self.conf)):
diff --git a/oslo_messaging/tests/notify/test_notifier.py b/oslo_messaging/tests/notify/test_notifier.py
index 6b94d15..a8733f7 100644
--- a/oslo_messaging/tests/notify/test_notifier.py
+++ b/oslo_messaging/tests/notify/test_notifier.py
@@ -20,13 +20,13 @@ import uuid
import fixtures
import mock
+from oslo_serialization import jsonutils
+from oslo_utils import timeutils
from stevedore import dispatch
from stevedore import extension
import testscenarios
import yaml
-from oslo.serialization import jsonutils
-from oslo.utils import timeutils
import oslo_messaging
from oslo_messaging.notify import _impl_log
from oslo_messaging.notify import _impl_messaging
@@ -147,7 +147,7 @@ class TestMessagingNotifier(test_utils.BaseTestCase):
self.stubs.Set(_impl_messaging, 'LOG', self.logger)
self.stubs.Set(msg_notifier, '_LOG', self.logger)
- @mock.patch('oslo.utils.timeutils.utcnow')
+ @mock.patch('oslo_utils.timeutils.utcnow')
def test_notifier(self, mock_utcnow):
drivers = []
if self.v1:
@@ -223,7 +223,7 @@ class TestSerializer(test_utils.BaseTestCase):
super(TestSerializer, self).setUp()
self.addCleanup(_impl_test.reset)
- @mock.patch('oslo.utils.timeutils.utcnow')
+ @mock.patch('oslo_utils.timeutils.utcnow')
def test_serializer(self, mock_utcnow):
transport = _FakeTransport(self.conf)
@@ -266,7 +266,7 @@ class TestSerializer(test_utils.BaseTestCase):
class TestLogNotifier(test_utils.BaseTestCase):
- @mock.patch('oslo.utils.timeutils.utcnow')
+ @mock.patch('oslo_utils.timeutils.utcnow')
def test_notifier(self, mock_utcnow):
self.config(notification_driver=['log'])
diff --git a/oslo_messaging/tests/rpc/test_client.py b/oslo_messaging/tests/rpc/test_client.py
index 2276e6b..e163a92 100644
--- a/oslo_messaging/tests/rpc/test_client.py
+++ b/oslo_messaging/tests/rpc/test_client.py
@@ -13,9 +13,9 @@
# License for the specific language governing permissions and limitations
# under the License.
+from oslo_config import cfg
import testscenarios
-from oslo.config import cfg
import oslo_messaging
from oslo_messaging import exceptions
from oslo_messaging import serializer as msg_serializer
diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py
index 3970b95..7b14085 100644
--- a/oslo_messaging/tests/rpc/test_server.py
+++ b/oslo_messaging/tests/rpc/test_server.py
@@ -16,9 +16,9 @@
import threading
import mock
+from oslo_config import cfg
import testscenarios
-from oslo.config import cfg
import oslo_messaging
from oslo_messaging.tests import utils as test_utils
diff --git a/oslo_messaging/tests/test_amqp_driver.py b/oslo_messaging/tests/test_amqp_driver.py
index 213a832..a1f5ea5 100644
--- a/oslo_messaging/tests/test_amqp_driver.py
+++ b/oslo_messaging/tests/test_amqp_driver.py
@@ -20,10 +20,10 @@ import threading
import time
import uuid
+from oslo_utils import importutils
from six import moves
import testtools
-from oslo.utils import importutils
import oslo_messaging
from oslo_messaging.tests import utils as test_utils
diff --git a/oslo_messaging/tests/test_exception_serialization.py b/oslo_messaging/tests/test_exception_serialization.py
index 1d1de31..c1079c0 100644
--- a/oslo_messaging/tests/test_exception_serialization.py
+++ b/oslo_messaging/tests/test_exception_serialization.py
@@ -15,10 +15,10 @@
import sys
+from oslo_serialization import jsonutils
import six
import testscenarios
-from oslo.serialization import jsonutils
import oslo_messaging
from oslo_messaging._drivers import common as exceptions
from oslo_messaging.tests import utils as test_utils
diff --git a/oslo_messaging/tests/test_opts.py b/oslo_messaging/tests/test_opts.py
index 37b621f..e150cbc 100644
--- a/oslo_messaging/tests/test_opts.py
+++ b/oslo_messaging/tests/test_opts.py
@@ -29,13 +29,15 @@ class OptsTestCase(test_utils.BaseTestCase):
super(OptsTestCase, self).setUp()
def _test_list_opts(self, result):
- self.assertEqual(4, len(result))
+ self.assertEqual(6, len(result))
groups = [g for (g, l) in result]
self.assertIn(None, groups)
self.assertIn('matchmaker_ring', groups)
self.assertIn('matchmaker_redis', groups)
self.assertIn('oslo_messaging_amqp', groups)
+ self.assertIn('oslo_messaging_rabbit', groups)
+ self.assertIn('oslo_messaging_qpid', groups)
opt_names = [o.name for (g, l) in result for o in l]
self.assertIn('rpc_backend', opt_names)
diff --git a/oslo_messaging/tests/test_transport.py b/oslo_messaging/tests/test_transport.py
index 53d991a..a09953a 100644
--- a/oslo_messaging/tests/test_transport.py
+++ b/oslo_messaging/tests/test_transport.py
@@ -14,12 +14,12 @@
# under the License.
import fixtures
+from oslo_config import cfg
import six
from six.moves import mox
from stevedore import driver
import testscenarios
-from oslo.config import cfg
import oslo_messaging
from oslo_messaging.tests import utils as test_utils
from oslo_messaging import transport
diff --git a/oslo_messaging/tests/test_utils.py b/oslo_messaging/tests/test_utils.py
index d57e32e..5754205 100644
--- a/oslo_messaging/tests/test_utils.py
+++ b/oslo_messaging/tests/test_utils.py
@@ -13,6 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
+import time
+
+import mock
+
from oslo_messaging._drivers import common
from oslo_messaging import _utils as utils
from oslo_messaging.tests import utils as test_utils
@@ -51,14 +55,28 @@ class VersionIsCompatibleTestCase(test_utils.BaseTestCase):
class TimerTestCase(test_utils.BaseTestCase):
- def test_duration_is_none(self):
+ def test_no_duration_no_callback(self):
t = common.DecayingTimer()
t.start()
- remaining = t.check_return(None)
+ remaining = t.check_return()
self.assertEqual(None, remaining)
- def test_duration_is_none_and_maximun_set(self):
+ def test_no_duration_but_maximun(self):
t = common.DecayingTimer()
t.start()
- remaining = t.check_return(None, maximum=2)
+ remaining = t.check_return(maximum=2)
self.assertEqual(2, remaining)
+
+ def test_duration_expired_no_callback(self):
+ t = common.DecayingTimer(2)
+ t._ends_at = time.time() - 10
+ remaining = t.check_return()
+ self.assertAlmostEqual(-10, remaining, 0)
+
+ def test_duration_callback(self):
+ t = common.DecayingTimer(2)
+ t._ends_at = time.time() - 10
+ callback = mock.Mock()
+ remaining = t.check_return(callback)
+ self.assertAlmostEqual(-10, remaining, 0)
+ callback.assert_called_once
diff --git a/oslo_messaging/tests/utils.py b/oslo_messaging/tests/utils.py
index f1b44a6..bfa73a8 100644
--- a/oslo_messaging/tests/utils.py
+++ b/oslo_messaging/tests/utils.py
@@ -19,12 +19,10 @@
"""Common utilities used in testing"""
-import six
-
-from oslo.config import cfg
-
+from oslo_config import cfg
from oslotest import base
from oslotest import moxstubout
+import six
TRUE_VALUES = ('true', '1', 'yes')
diff --git a/oslo_messaging/transport.py b/oslo_messaging/transport.py
index 181cacb..5a8c389 100644
--- a/oslo_messaging/transport.py
+++ b/oslo_messaging/transport.py
@@ -27,11 +27,11 @@ __all__ = [
'set_transport_defaults',
]
+from oslo_config import cfg
import six
from six.moves.urllib import parse
from stevedore import driver
-from oslo.config import cfg
from oslo_messaging import exceptions
diff --git a/requirements-py3.txt b/requirements-py3.txt
index 2c55212..64f3cb8 100644
--- a/requirements-py3.txt
+++ b/requirements-py3.txt
@@ -5,7 +5,7 @@
oslo.config>=1.6.0 # Apache-2.0
oslo.serialization>=1.2.0 # Apache-2.0
oslo.utils>=1.2.0 # Apache-2.0
-oslo.i18n>=1.0.0 # Apache-2.0
+oslo.i18n>=1.3.0 # Apache-2.0
stevedore>=1.1.0 # Apache-2.0
# for jsonutils
diff --git a/requirements.txt b/requirements.txt
index be424f2..352b14a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,7 +7,7 @@ pbr>=0.6,!=0.7,<1.0
oslo.config>=1.6.0 # Apache-2.0
oslo.utils>=1.2.0 # Apache-2.0
oslo.serialization>=1.2.0 # Apache-2.0
-oslo.i18n>=1.0.0 # Apache-2.0
+oslo.i18n>=1.3.0 # Apache-2.0
stevedore>=1.1.0 # Apache-2.0
# for jsonutils
@@ -16,7 +16,7 @@ six>=1.7.0
# FIXME(markmc): remove this when the drivers no longer
# import eventlet
-eventlet>=0.15.2
+eventlet>=0.16.1
# for the routing notifier
PyYAML>=3.1.0
diff --git a/setup.cfg b/setup.cfg
index 57dfb21..3f94cb9 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -44,6 +44,7 @@ oslo.messaging.drivers =
oslo.messaging.executors =
blocking = oslo_messaging._executors.impl_blocking:BlockingExecutor
eventlet = oslo_messaging._executors.impl_eventlet:EventletExecutor
+ threading = oslo_messaging._executors.impl_thread:ThreadExecutor
oslo.messaging.notify.drivers =
messagingv2 = oslo_messaging.notify._impl_messaging:MessagingV2Driver
@@ -77,3 +78,6 @@ input_file = oslo.messaging/locale/oslo.messaging.pot
keywords = _ gettext ngettext l_ lazy_gettext
mapping_file = babel.cfg
output_file = oslo.messaging/locale/oslo.messaging.pot
+
+[pbr]
+warnerrors = true
diff --git a/tests/drivers/test_impl_qpid.py b/tests/drivers/test_impl_qpid.py
index c8ae401..d8cd1e7 100644
--- a/tests/drivers/test_impl_qpid.py
+++ b/tests/drivers/test_impl_qpid.py
@@ -187,7 +187,8 @@ class TestQpidInvalidTopologyVersion(_QpidBaseTestCase):
def setUp(self):
super(TestQpidInvalidTopologyVersion, self).setUp()
- self.config(qpid_topology_version=-1)
+ self.config(qpid_topology_version=-1,
+ group='oslo_messaging_qpid')
def test_invalid_topology_version(self):
def consumer_callback(msg):
@@ -199,11 +200,11 @@ class TestQpidInvalidTopologyVersion(_QpidBaseTestCase):
# 1. qpid driver raises Exception(msg) for invalid topology version
# 2. flake8 - H202 assertRaises Exception too broad
exception_msg = ("Invalid value for qpid_topology_version: %d" %
- self.conf.qpid_topology_version)
+ self.conf.oslo_messaging_qpid.qpid_topology_version)
recvd_exc_msg = ''
try:
- self.consumer_cls(self.conf,
+ self.consumer_cls(self.conf.oslo_messaging_qpid,
self.session_receive,
msgid_or_topic,
consumer_callback,
@@ -215,7 +216,7 @@ class TestQpidInvalidTopologyVersion(_QpidBaseTestCase):
recvd_exc_msg = ''
try:
- self.publisher_cls(self.conf,
+ self.publisher_cls(self.conf.oslo_messaging_qpid,
self.session_send,
topic=msgid_or_topic,
**self.publisher_kwargs)
@@ -258,13 +259,15 @@ class TestQpidDirectConsumerPublisher(_QpidBaseTestCase):
self.msgid = str(random.randint(1, 100))
# create a DirectConsumer and DirectPublisher class objects
- self.dir_cons = qpid_driver.DirectConsumer(self.conf,
- self.session_receive,
- self.msgid,
- self.consumer_callback)
- self.dir_pub = qpid_driver.DirectPublisher(self.conf,
- self.session_send,
- self.msgid)
+ self.dir_cons = qpid_driver.DirectConsumer(
+ self.conf.oslo_messaging_qpid,
+ self.session_receive,
+ self.msgid,
+ self.consumer_callback)
+ self.dir_pub = qpid_driver.DirectPublisher(
+ self.conf.oslo_messaging_qpid,
+ self.session_send,
+ self.msgid)
def try_send_msg(no_msgs):
for i in range(no_msgs):
@@ -418,7 +421,7 @@ class TestQpidTopicAndFanout(_QpidBaseTestCase):
def test_qpid_topic_and_fanout(self):
for receiver_id in range(self.no_receivers):
- consumer = self.consumer_cls(self.conf,
+ consumer = self.consumer_cls(self.conf.oslo_messaging_qpid,
self.session_receive,
self.receive_topic,
self.consumer_callback,
@@ -431,7 +434,7 @@ class TestQpidTopicAndFanout(_QpidBaseTestCase):
self._receiver_threads.append(thread)
for sender_id in range(self.no_senders):
- publisher = self.publisher_cls(self.conf,
+ publisher = self.publisher_cls(self.conf.oslo_messaging_qpid,
self.session_send,
topic=self.topic,
**self.publisher_kwargs)
@@ -483,7 +486,8 @@ class TestDriverInterface(_QpidBaseTestCase):
def setUp(self):
super(TestDriverInterface, self).setUp()
- self.config(qpid_topology_version=2)
+ self.config(qpid_topology_version=2,
+ group='oslo_messaging_qpid')
transport = messaging.get_transport(self.conf)
self.driver = transport._driver
@@ -554,7 +558,8 @@ class TestQpidReconnectOrder(test_utils.BaseTestCase):
brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
brokers_count = len(brokers)
- self.config(qpid_hosts=brokers)
+ self.config(qpid_hosts=brokers,
+ group='oslo_messaging_qpid')
with mock.patch('qpid.messaging.Connection') as conn_mock:
# starting from the first broker in the list
@@ -777,7 +782,8 @@ class QPidHATestCase(test_utils.BaseTestCase):
self.config(qpid_hosts=self.brokers,
qpid_username=None,
- qpid_password=None)
+ qpid_password=None,
+ group='oslo_messaging_qpid')
hostname_sets = set()
self.info = {'attempt': 0,
diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py
index 3dfbf13..8e9b29e 100644
--- a/tests/drivers/test_impl_rabbit.py
+++ b/tests/drivers/test_impl_rabbit.py
@@ -41,7 +41,7 @@ class TestDeprecatedRabbitDriverLoad(test_utils.BaseTestCase):
super(TestDeprecatedRabbitDriverLoad, self).setUp(
conf=cfg.ConfigOpts())
self.messaging_conf.transport_driver = 'rabbit'
- self.config(fake_rabbit=True)
+ self.config(fake_rabbit=True, group="oslo_messaging_rabbit")
def test_driver_load(self):
transport = messaging.get_transport(self.conf)
@@ -259,8 +259,27 @@ class TestSendReceive(test_utils.BaseTestCase):
raise ZeroDivisionError
except Exception:
failure = sys.exc_info()
- msgs[i].reply(failure=failure,
- log_failure=not self.expected)
+
+ # NOTE(noelbk) confirm that Publisher exchanges
+ # are always declared with passive=True
+ outer_self = self
+ test_exchange_was_called = [False]
+ old_init = kombu.entity.Exchange.__init__
+
+ def new_init(self, *args, **kwargs):
+ test_exchange_was_called[0] = True
+ outer_self.assertTrue(kwargs['passive'])
+ old_init(self, *args, **kwargs)
+ kombu.entity.Exchange.__init__ = new_init
+
+ try:
+ msgs[i].reply(failure=failure,
+ log_failure=not self.expected)
+ finally:
+ kombu.entity.Exchange.__init__ = old_init
+
+ self.assertTrue(test_exchange_was_called[0])
+
elif self.rx_id:
msgs[i].reply({'rx_id': i})
else:
@@ -679,7 +698,8 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
self.config(rabbit_hosts=self.brokers,
rabbit_retry_interval=0.01,
rabbit_retry_backoff=0.01,
- kombu_reconnect_delay=0)
+ kombu_reconnect_delay=0,
+ group="oslo_messaging_rabbit")
self.kombu_connect = mock.Mock()
self.useFixture(mockpatch.Patch(
diff --git a/tests/drivers/test_impl_zmq.py b/tests/drivers/test_impl_zmq.py
index 30277c9..091ebe8 100644
--- a/tests/drivers/test_impl_zmq.py
+++ b/tests/drivers/test_impl_zmq.py
@@ -42,34 +42,12 @@ def get_unused_port():
return port
-class TestConfZmqDriverLoad(test_utils.BaseTestCase):
-
- @testtools.skipIf(impl_zmq is None, "zmq not available")
- def setUp(self):
- super(TestConfZmqDriverLoad, self).setUp()
- self.messaging_conf.transport_driver = 'zmq'
-
- def test_driver_load(self):
- transport = messaging.get_transport(self.conf)
- self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver)
-
-
-class stopRpc(object):
- def __init__(self, attrs):
- self.attrs = attrs
-
- def __call__(self):
- if self.attrs['reactor']:
- self.attrs['reactor'].close()
- if self.attrs['driver']:
- self.attrs['driver'].cleanup()
-
-
-class TestZmqBasics(test_utils.BaseTestCase):
+class ZmqBaseTestCase(test_utils.BaseTestCase):
+ """Base test case for all ZMQ tests that make use of the ZMQ Proxy"""
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
- super(TestZmqBasics, self).setUp()
+ super(ZmqBaseTestCase, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = messaging.get_transport(self.conf)
@@ -92,6 +70,32 @@ class TestZmqBasics(test_utils.BaseTestCase):
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
self.addCleanup(stopRpc(self.__dict__))
+
+class TestConfZmqDriverLoad(test_utils.BaseTestCase):
+
+ @testtools.skipIf(impl_zmq is None, "zmq not available")
+ def setUp(self):
+ super(TestConfZmqDriverLoad, self).setUp()
+ self.messaging_conf.transport_driver = 'zmq'
+
+ def test_driver_load(self):
+ transport = messaging.get_transport(self.conf)
+ self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver)
+
+
+class stopRpc(object):
+ def __init__(self, attrs):
+ self.attrs = attrs
+
+ def __call__(self):
+ if self.attrs['reactor']:
+ self.attrs['reactor'].close()
+ if self.attrs['driver']:
+ self.attrs['driver'].cleanup()
+
+
+class TestZmqBasics(ZmqBaseTestCase):
+
def test_start_stop_listener(self):
target = messaging.Target(topic='testtopic')
listener = self.driver.listen(target)
@@ -275,32 +279,7 @@ class TestZmqIncomingMessage(test_utils.BaseTestCase):
msg.requeue()
-class TestZmqConnection(test_utils.BaseTestCase):
-
- @testtools.skipIf(impl_zmq is None, "zmq not available")
- def setUp(self):
- super(TestZmqConnection, self).setUp()
- self.messaging_conf.transport_driver = 'zmq'
- # Get driver
- transport = messaging.get_transport(self.conf)
- self.driver = transport._driver
-
- # Set config values
- self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
- kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
- 'rpc_zmq_host': '127.0.0.1',
- 'rpc_response_timeout': 5,
- 'rpc_zmq_port': get_unused_port(),
- 'rpc_zmq_ipc_dir': self.internal_ipc_dir}
- self.config(**kwargs)
-
- # Start RPC
- LOG.info("Running internal zmq receiver.")
- self.reactor = impl_zmq.ZmqProxy(self.conf)
- self.reactor.consume_in_thread()
-
- self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
- self.addCleanup(stopRpc(self.__dict__))
+class TestZmqConnection(ZmqBaseTestCase):
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_create_consumer(self, mock_reactor):
@@ -377,32 +356,7 @@ class TestZmqConnection(test_utils.BaseTestCase):
self.assertTrue(conn.reactor.consume_in_thread.called)
-class TestZmqListener(test_utils.BaseTestCase):
-
- @testtools.skipIf(impl_zmq is None, "zmq not available")
- def setUp(self):
- super(TestZmqListener, self).setUp()
- self.messaging_conf.transport_driver = 'zmq'
- # Get driver
- transport = messaging.get_transport(self.conf)
- self.driver = transport._driver
-
- # Set config values
- self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
- kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
- 'rpc_zmq_host': '127.0.0.1',
- 'rpc_response_timeout': 5,
- 'rpc_zmq_port': get_unused_port(),
- 'rpc_zmq_ipc_dir': self.internal_ipc_dir}
- self.config(**kwargs)
-
- # Start RPC
- LOG.info("Running internal zmq receiver.")
- self.reactor = impl_zmq.ZmqProxy(self.conf)
- self.reactor.consume_in_thread()
-
- self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
- self.addCleanup(stopRpc(self.__dict__))
+class TestZmqListener(ZmqBaseTestCase):
def test_zmqlistener_no_msg(self):
listener = impl_zmq.ZmqListener(self.driver)
@@ -422,32 +376,7 @@ class TestZmqListener(test_utils.BaseTestCase):
self.assertEqual(resp.message, msg)
-class TestZmqDriver(test_utils.BaseTestCase):
-
- @testtools.skipIf(impl_zmq is None, "zmq not available")
- def setUp(self):
- super(TestZmqDriver, self).setUp()
- self.messaging_conf.transport_driver = 'zmq'
- # Get driver
- transport = messaging.get_transport(self.conf)
- self.driver = transport._driver
-
- # Set config values
- self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
- kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
- 'rpc_zmq_host': '127.0.0.1',
- 'rpc_response_timeout': 5,
- 'rpc_zmq_port': get_unused_port(),
- 'rpc_zmq_ipc_dir': self.internal_ipc_dir}
- self.config(**kwargs)
-
- # Start RPC
- LOG.info("Running internal zmq receiver.")
- self.reactor = impl_zmq.ZmqProxy(self.conf)
- self.reactor.consume_in_thread()
-
- self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
- self.addCleanup(stopRpc(self.__dict__))
+class TestZmqDriver(ZmqBaseTestCase):
@mock.patch('oslo.messaging._drivers.impl_zmq._cast', autospec=True)
@mock.patch('oslo.messaging._drivers.impl_zmq._multi_send', autospec=True)
diff --git a/tests/drivers/test_matchmaker_redis.py b/tests/drivers/test_matchmaker_redis.py
index 95bde9e..a36e14a 100644
--- a/tests/drivers/test_matchmaker_redis.py
+++ b/tests/drivers/test_matchmaker_redis.py
@@ -46,6 +46,7 @@ class RedisMatchMakerTest(test_utils.BaseTestCase):
"network": ["controller1", "node1", "node2", "node3"],
"cert": ["controller1"],
"console": ["controller1"],
+ "l3_agent.node1": ["node1"],
"consoleauth": ["controller1"]}
self.matcher = matchmaker_redis.MatchMakerRedis()
self.populate()
@@ -70,6 +71,10 @@ class RedisMatchMakerTest(test_utils.BaseTestCase):
self.assertEqual(
sorted(self.matcher.redis.smembers('cert')),
['cert.controller1', 'cert.keymaster'])
+ self.matcher.register('l3_agent.node1', 'node1')
+ self.assertEqual(
+ sorted(self.matcher.redis.smembers('l3_agent.node1')),
+ ['l3_agent.node1.node1'])
def test_unregister(self):
self.matcher.unregister('conductor', 'controller1')
diff --git a/tests/functional/test_functional.py b/tests/functional/test_functional.py
deleted file mode 100644
index 5c40774..0000000
--- a/tests/functional/test_functional.py
+++ /dev/null
@@ -1,279 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo import messaging
-
-from testtools import matchers
-
-from tests.functional.utils import ClientStub
-from tests.functional.utils import IsValidDistributionOf
-from tests.functional.utils import NotificationFixture
-from tests.functional.utils import RpcServerGroupFixture
-from tests.functional.utils import SkipIfNoTransportURL
-from tests.functional.utils import TransportFixture
-
-
-class CallTestCase(SkipIfNoTransportURL):
- def test_specific_server(self):
- group = self.useFixture(RpcServerGroupFixture(self.url))
- client = group.client(1)
- client.append(text='open')
- self.assertEqual('openstack', client.append(text='stack'))
- client.add(increment=2)
- self.assertEqual(12, client.add(increment=10))
- self.assertEqual(9, client.subtract(increment=3))
- self.assertEqual('openstack', group.servers[1].endpoint.sval)
- self.assertEqual(9, group.servers[1].endpoint.ival)
- for i in [0, 2]:
- self.assertEqual('', group.servers[i].endpoint.sval)
- self.assertEqual(0, group.servers[i].endpoint.ival)
-
- def test_server_in_group(self):
- group = self.useFixture(RpcServerGroupFixture(self.url))
-
- client = group.client()
- data = [c for c in 'abcdefghijklmn']
- for i in data:
- client.append(text=i)
-
- for s in group.servers:
- self.assertThat(len(s.endpoint.sval), matchers.GreaterThan(0))
- actual = [[c for c in s.endpoint.sval] for s in group.servers]
- self.assertThat(actual, IsValidDistributionOf(data))
-
- def test_different_exchanges(self):
- t = self.useFixture(TransportFixture(self.url))
- # If the different exchanges are not honoured, then the
- # teardown may hang unless we broadcast all control messages
- # to each server
- group1 = self.useFixture(RpcServerGroupFixture(self.url, transport=t,
- use_fanout_ctrl=True))
- group2 = self.useFixture(RpcServerGroupFixture(self.url, exchange="a",
- transport=t,
- use_fanout_ctrl=True))
- group3 = self.useFixture(RpcServerGroupFixture(self.url, exchange="b",
- transport=t,
- use_fanout_ctrl=True))
-
- client1 = group1.client(1)
- data1 = [c for c in 'abcdefghijklmn']
- for i in data1:
- client1.append(text=i)
-
- client2 = group2.client()
- data2 = [c for c in 'opqrstuvwxyz']
- for i in data2:
- client2.append(text=i)
-
- actual1 = [[c for c in s.endpoint.sval] for s in group1.servers]
- self.assertThat(actual1, IsValidDistributionOf(data1))
- actual1 = [c for c in group1.servers[1].endpoint.sval]
- self.assertThat([actual1], IsValidDistributionOf(data1))
- for s in group1.servers:
- expected = len(data1) if group1.servers.index(s) == 1 else 0
- self.assertEqual(expected, len(s.endpoint.sval))
- self.assertEqual(0, s.endpoint.ival)
-
- actual2 = [[c for c in s.endpoint.sval] for s in group2.servers]
- for s in group2.servers:
- self.assertThat(len(s.endpoint.sval), matchers.GreaterThan(0))
- self.assertEqual(0, s.endpoint.ival)
- self.assertThat(actual2, IsValidDistributionOf(data2))
-
- for s in group3.servers:
- self.assertEqual(0, len(s.endpoint.sval))
- self.assertEqual(0, s.endpoint.ival)
-
- def test_timeout(self):
- transport = self.useFixture(TransportFixture(self.url))
- target = messaging.Target(topic="no_such_topic")
- c = ClientStub(transport.transport, target, timeout=1)
- self.assertThat(c.ping, matchers.raises(messaging.MessagingTimeout))
-
- def test_exception(self):
- group = self.useFixture(RpcServerGroupFixture(self.url))
- client = group.client(1)
- client.add(increment=2)
- f = lambda: client.subtract(increment=3)
- self.assertThat(f, matchers.raises(ValueError))
-
-
-class CastTestCase(SkipIfNoTransportURL):
- # Note: casts return immediately, so these tests utilise a special
- # internal sync() cast to ensure prior casts are complete before
- # making the necessary assertions.
-
- def test_specific_server(self):
- group = self.useFixture(RpcServerGroupFixture(self.url))
- client = group.client(1, cast=True)
- client.append(text='open')
- client.append(text='stack')
- client.add(increment=2)
- client.add(increment=10)
- group.sync()
-
- self.assertEqual('openstack', group.servers[1].endpoint.sval)
- self.assertEqual(12, group.servers[1].endpoint.ival)
- for i in [0, 2]:
- self.assertEqual('', group.servers[i].endpoint.sval)
- self.assertEqual(0, group.servers[i].endpoint.ival)
-
- def test_server_in_group(self):
- group = self.useFixture(RpcServerGroupFixture(self.url))
- client = group.client(cast=True)
- for i in range(20):
- client.add(increment=1)
- group.sync()
- total = 0
- for s in group.servers:
- ival = s.endpoint.ival
- self.assertThat(ival, matchers.GreaterThan(0))
- self.assertThat(ival, matchers.LessThan(20))
- total += ival
- self.assertEqual(20, total)
-
- def test_fanout(self):
- group = self.useFixture(RpcServerGroupFixture(self.url))
- client = group.client('all', cast=True)
- client.append(text='open')
- client.append(text='stack')
- client.add(increment=2)
- client.add(increment=10)
- group.sync(server='all')
- for s in group.servers:
- self.assertEqual('openstack', s.endpoint.sval)
- self.assertEqual(12, s.endpoint.ival)
-
-
-class NotifyTestCase(SkipIfNoTransportURL):
- # NOTE(sileht): Each test must not use the same topics
- # to be run in parallel
-
- def test_simple(self):
- transport = self.useFixture(TransportFixture(self.url))
- listener = self.useFixture(NotificationFixture(transport.transport,
- ['test_simple']))
- transport.wait()
- notifier = listener.notifier('abc')
-
- notifier.info({}, 'test', 'Hello World!')
- event = listener.events.get(timeout=1)
- self.assertEqual('info', event[0])
- self.assertEqual('test', event[1])
- self.assertEqual('Hello World!', event[2])
- self.assertEqual('abc', event[3])
-
- def test_multiple_topics(self):
- transport = self.useFixture(TransportFixture(self.url))
- listener = self.useFixture(NotificationFixture(transport.transport,
- ['a', 'b']))
- transport.wait()
- a = listener.notifier('pub-a', topic='a')
- b = listener.notifier('pub-b', topic='b')
-
- sent = {
- 'pub-a': [a, 'test-a', 'payload-a'],
- 'pub-b': [b, 'test-b', 'payload-b']
- }
- for e in sent.values():
- e[0].info({}, e[1], e[2])
-
- received = {}
- while len(received) < len(sent):
- e = listener.events.get(timeout=1)
- received[e[3]] = e
-
- for key in received:
- actual = received[key]
- expected = sent[key]
- self.assertEqual('info', actual[0])
- self.assertEqual(expected[1], actual[1])
- self.assertEqual(expected[2], actual[2])
-
- def test_multiple_servers(self):
- transport = self.useFixture(TransportFixture(self.url))
- listener_a = self.useFixture(NotificationFixture(transport.transport,
- ['test-topic']))
- listener_b = self.useFixture(NotificationFixture(transport.transport,
- ['test-topic']))
- transport.wait()
- n = listener_a.notifier('pub')
-
- events_out = [('test-%s' % c, 'payload-%s' % c) for c in 'abcdefgh']
-
- for event_type, payload in events_out:
- n.info({}, event_type, payload)
-
- events_in = [[(e[1], e[2]) for e in listener_a.get_events()],
- [(e[1], e[2]) for e in listener_b.get_events()]]
-
- self.assertThat(events_in, IsValidDistributionOf(events_out))
- for stream in events_in:
- self.assertThat(len(stream), matchers.GreaterThan(0))
-
- def test_independent_topics(self):
- transport = self.useFixture(TransportFixture(self.url))
- listener_a = self.useFixture(NotificationFixture(transport.transport,
- ['1']))
- listener_b = self.useFixture(NotificationFixture(transport.transport,
- ['2']))
- transport.wait()
-
- a = listener_a.notifier('pub-1', topic='1')
- b = listener_b.notifier('pub-2', topic='2')
-
- a_out = [('test-1-%s' % c, 'payload-1-%s' % c) for c in 'abcdefgh']
- for event_type, payload in a_out:
- a.info({}, event_type, payload)
-
- b_out = [('test-2-%s' % c, 'payload-2-%s' % c) for c in 'ijklmnop']
- for event_type, payload in b_out:
- b.info({}, event_type, payload)
-
- for expected in a_out:
- actual = listener_a.events.get(timeout=0.5)
- self.assertEqual('info', actual[0])
- self.assertEqual(expected[0], actual[1])
- self.assertEqual(expected[1], actual[2])
- self.assertEqual('pub-1', actual[3])
-
- for expected in b_out:
- actual = listener_b.events.get(timeout=0.5)
- self.assertEqual('info', actual[0])
- self.assertEqual(expected[0], actual[1])
- self.assertEqual(expected[1], actual[2])
- self.assertEqual('pub-2', actual[3])
-
- def test_all_categories(self):
- transport = self.useFixture(TransportFixture(self.url))
- listener = self.useFixture(NotificationFixture(
- transport.transport, ['test_all_categories']))
- transport.wait()
- n = listener.notifier('abc')
-
- cats = ['debug', 'audit', 'info', 'warn', 'error', 'critical']
- events = [(getattr(n, c), c, 'type-' + c, c + '-data') for c in cats]
- for e in events:
- e[0]({}, e[2], e[3])
-
- # order between events with different categories is not guaranteed
- received = {}
- for expected in events:
- e = listener.events.get(timeout=0.5)
- received[e[0]] = e
-
- for expected in events:
- actual = received[expected[1]]
- self.assertEqual(expected[1], actual[0])
- self.assertEqual(expected[2], actual[1])
- self.assertEqual(expected[3], actual[2])
diff --git a/tests/functional/utils.py b/tests/functional/utils.py
deleted file mode 100644
index fc0c780..0000000
--- a/tests/functional/utils.py
+++ /dev/null
@@ -1,343 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import os
-import threading
-import time
-import uuid
-
-import fixtures
-from six import moves
-
-from oslo.config import cfg
-from oslo import messaging
-from oslo.messaging.notify import notifier
-from oslo_messaging.tests import utils as test_utils
-
-
-class TestServerEndpoint(object):
- """This MessagingServer that will be used during functional testing."""
-
- def __init__(self):
- self.ival = 0
- self.sval = ''
-
- def add(self, ctxt, increment):
- self.ival += increment
- return self.ival
-
- def subtract(self, ctxt, increment):
- if self.ival < increment:
- raise ValueError("ival can't go negative!")
- self.ival -= increment
- return self.ival
-
- def append(self, ctxt, text):
- self.sval += text
- return self.sval
-
-
-class TransportFixture(fixtures.Fixture):
- """Fixture defined to setup the oslo.messaging transport."""
-
- def __init__(self, url):
- self.url = url
-
- def setUp(self):
- super(TransportFixture, self).setUp()
- self.transport = messaging.get_transport(cfg.CONF, url=self.url)
-
- def cleanUp(self):
- self.transport.cleanup()
- super(TransportFixture, self).cleanUp()
-
- def wait(self):
- if self.url.startswith("rabbit") or self.url.startswith("qpid"):
- time.sleep(0.5)
-
-
-class RpcServerFixture(fixtures.Fixture):
- """Fixture to setup the TestServerEndpoint."""
-
- def __init__(self, transport, target, endpoint=None, ctrl_target=None):
- super(RpcServerFixture, self).__init__()
- self.transport = transport
- self.target = target
- self.endpoint = endpoint or TestServerEndpoint()
- self.syncq = moves.queue.Queue()
- self.ctrl_target = ctrl_target or self.target
-
- def setUp(self):
- super(RpcServerFixture, self).setUp()
- endpoints = [self.endpoint, self]
- self.server = messaging.get_rpc_server(self.transport,
- self.target,
- endpoints)
- self._ctrl = messaging.RPCClient(self.transport, self.ctrl_target)
- self._start()
-
- def cleanUp(self):
- self._stop()
- super(RpcServerFixture, self).cleanUp()
-
- def _start(self):
- self.thread = threading.Thread(target=self.server.start)
- self.thread.daemon = True
- self.thread.start()
-
- def _stop(self):
- self.server.stop()
- self._ctrl.cast({}, 'ping')
- self.server.wait()
- self.thread.join()
-
- def ping(self, ctxt):
- pass
-
- def sync(self, ctxt, item):
- self.syncq.put(item)
-
-
-class RpcServerGroupFixture(fixtures.Fixture):
- def __init__(self, url, topic=None, names=None, exchange=None,
- transport=None, use_fanout_ctrl=False):
- self.url = url
- # NOTE(sileht): topic and servier_name must be uniq
- # to be able to run all tests in parallel
- self.topic = topic or str(uuid.uuid4())
- self.names = names or ["server_%i_%s" % (i, uuid.uuid4())
- for i in range(3)]
- self.exchange = exchange
- self.targets = [self._target(server=n) for n in self.names]
- self.transport = transport
- self.use_fanout_ctrl = use_fanout_ctrl
-
- def setUp(self):
- super(RpcServerGroupFixture, self).setUp()
- if not self.transport:
- self.transport = self.useFixture(TransportFixture(self.url))
- self.servers = [self.useFixture(self._server(t)) for t in self.targets]
- self.transport.wait()
-
- def _target(self, server=None, fanout=False):
- t = messaging.Target(exchange=self.exchange, topic=self.topic)
- t.server = server
- t.fanout = fanout
- return t
-
- def _server(self, target):
- ctrl = None
- if self.use_fanout_ctrl:
- ctrl = self._target(fanout=True)
- return RpcServerFixture(self.transport.transport, target,
- ctrl_target=ctrl)
-
- def client(self, server=None, cast=False):
- if server:
- if server == 'all':
- target = self._target(fanout=True)
- elif server >= 0 and server < len(self.targets):
- target = self.targets[server]
- else:
- raise ValueError("Invalid value for server: %r" % server)
- else:
- target = self._target()
- return ClientStub(self.transport.transport, target, cast=cast,
- timeout=5)
-
- def sync(self, server=None):
- if server:
- if server == 'all':
- c = self.client(server='all', cast=True)
- c.sync(item='x')
- for s in self.servers:
- s.syncq.get(timeout=5)
- elif server >= 0 and server < len(self.targets):
- c = self.client(server=server, cast=True)
- c.sync(item='x')
- self.servers[server].syncq.get(timeout=5)
- else:
- raise ValueError("Invalid value for server: %r" % server)
- else:
- for i in range(len(self.servers)):
- self.client(i).ping()
-
-
-class RpcCall(object):
- def __init__(self, client, method, context):
- self.client = client
- self.method = method
- self.context = context
-
- def __call__(self, **kwargs):
- self.context['time'] = time.ctime()
- self.context['cast'] = False
- result = self.client.call(self.context, self.method, **kwargs)
- return result
-
-
-class RpcCast(RpcCall):
- def __call__(self, **kwargs):
- self.context['time'] = time.ctime()
- self.context['cast'] = True
- self.client.cast(self.context, self.method, **kwargs)
-
-
-class ClientStub(object):
- def __init__(self, transport, target, cast=False, name=None, **kwargs):
- self.name = name or "functional-tests"
- self.cast = cast
- self.client = messaging.RPCClient(transport, target, **kwargs)
-
- def __getattr__(self, name):
- context = {"application": self.name}
- if self.cast:
- return RpcCast(self.client, name, context)
- else:
- return RpcCall(self.client, name, context)
-
-
-class InvalidDistribution(object):
- def __init__(self, original, received):
- self.original = original
- self.received = received
- self.missing = []
- self.extra = []
- self.wrong_order = []
-
- def describe(self):
- text = "Sent %s, got %s; " % (self.original, self.received)
- e1 = ["%r was missing" % m for m in self.missing]
- e2 = ["%r was not expected" % m for m in self.extra]
- e3 = ["%r expected before %r" % (m[0], m[1]) for m in self.wrong_order]
- return text + ", ".join(e1 + e2 + e3)
-
- def __len__(self):
- return len(self.extra) + len(self.missing) + len(self.wrong_order)
-
- def get_details(self):
- return {}
-
-
-class IsValidDistributionOf(object):
- """Test whether a given list can be split into particular
- sub-lists. All items in the original list must be in exactly one
- sub-list, and must appear in that sub-list in the same order with
- respect to any other items as in the original list.
- """
- def __init__(self, original):
- self.original = original
-
- def __str__(self):
- return 'IsValidDistribution(%s)' % self.original
-
- def match(self, actual):
- errors = InvalidDistribution(self.original, actual)
- received = [[i for i in l] for l in actual]
-
- def _remove(obj, lists):
- for l in lists:
- if obj in l:
- front = l[0]
- l.remove(obj)
- return front
- return None
-
- for item in self.original:
- o = _remove(item, received)
- if not o:
- errors.missing += item
- elif item != o:
- errors.wrong_order.append([item, o])
- for l in received:
- errors.extra += l
- return errors or None
-
-
-class SkipIfNoTransportURL(test_utils.BaseTestCase):
- def setUp(self):
- super(SkipIfNoTransportURL, self).setUp()
- self.url = os.environ.get('TRANSPORT_URL')
- if not self.url:
- self.skipTest("No transport url configured")
-
-
-class NotificationFixture(fixtures.Fixture):
- def __init__(self, transport, topics):
- super(NotificationFixture, self).__init__()
- self.transport = transport
- self.topics = topics
- self.events = moves.queue.Queue()
- self.name = str(id(self))
-
- def setUp(self):
- super(NotificationFixture, self).setUp()
- targets = [messaging.Target(topic=t) for t in self.topics]
- # add a special topic for internal notifications
- targets.append(messaging.Target(topic=self.name))
- self.server = messaging.get_notification_listener(self.transport,
- targets,
- [self])
- self._ctrl = self.notifier('internal', topic=self.name)
- self._start()
-
- def cleanUp(self):
- self._stop()
- super(NotificationFixture, self).cleanUp()
-
- def _start(self):
- self.thread = threading.Thread(target=self.server.start)
- self.thread.daemon = True
- self.thread.start()
-
- def _stop(self):
- self.server.stop()
- self._ctrl.sample({}, 'shutdown', 'shutdown')
- self.server.wait()
- self.thread.join()
-
- def notifier(self, publisher, topic=None):
- return notifier.Notifier(self.transport,
- publisher,
- driver='messaging',
- topic=topic or self.topics[0])
-
- def debug(self, ctxt, publisher, event_type, payload, metadata):
- self.events.put(['debug', event_type, payload, publisher])
-
- def audit(self, ctxt, publisher, event_type, payload, metadata):
- self.events.put(['audit', event_type, payload, publisher])
-
- def info(self, ctxt, publisher, event_type, payload, metadata):
- self.events.put(['info', event_type, payload, publisher])
-
- def warn(self, ctxt, publisher, event_type, payload, metadata):
- self.events.put(['warn', event_type, payload, publisher])
-
- def error(self, ctxt, publisher, event_type, payload, metadata):
- self.events.put(['error', event_type, payload, publisher])
-
- def critical(self, ctxt, publisher, event_type, payload, metadata):
- self.events.put(['critical', event_type, payload, publisher])
-
- def sample(self, ctxt, publisher, event_type, payload, metadata):
- pass # Just used for internal shutdown control
-
- def get_events(self, timeout=0.5):
- results = []
- try:
- while True:
- results.append(self.events.get(timeout=timeout))
- except moves.queue.Empty:
- pass
- return results
diff --git a/tests/notify/test_dispatcher.py b/tests/notify/test_dispatcher.py
index f2e32ac..adb8610 100644
--- a/tests/notify/test_dispatcher.py
+++ b/tests/notify/test_dispatcher.py
@@ -16,11 +16,11 @@
import itertools
import mock
+from oslo_utils import timeutils
import testscenarios
from oslo import messaging
from oslo.messaging.notify import dispatcher as notify_dispatcher
-from oslo.utils import timeutils
from oslo_messaging.tests import utils as test_utils
load_tests = testscenarios.load_tests_apply_scenarios
diff --git a/tests/notify/test_logger.py b/tests/notify/test_logger.py
index 1f770bd..6d1757a 100644
--- a/tests/notify/test_logger.py
+++ b/tests/notify/test_logger.py
@@ -19,11 +19,11 @@ import os
import sys
import mock
+from oslo_utils import timeutils
import testscenarios
import testtools
from oslo import messaging
-from oslo.utils import timeutils
import oslo_messaging
from oslo_messaging.tests.notify import test_notifier
from oslo_messaging.tests import utils as test_utils
@@ -57,7 +57,7 @@ class TestLogNotifier(test_utils.BaseTestCase):
# eventlet
logging.logThreads = 0
- @mock.patch('oslo.utils.timeutils.utcnow')
+ @mock.patch('oslo_utils.timeutils.utcnow')
def test_logger(self, mock_utcnow):
with mock.patch('oslo_messaging.transport.get_transport',
return_value=test_notifier._FakeTransport(self.conf)):
@@ -100,7 +100,7 @@ class TestLogNotifier(test_utils.BaseTestCase):
@testtools.skipUnless(hasattr(logging.config, 'dictConfig'),
"Need logging.config.dictConfig (Python >= 2.7)")
- @mock.patch('oslo.utils.timeutils.utcnow')
+ @mock.patch('oslo_utils.timeutils.utcnow')
def test_logging_conf(self, mock_utcnow):
with mock.patch('oslo_messaging.transport.get_transport',
return_value=test_notifier._FakeTransport(self.conf)):
diff --git a/tests/notify/test_notifier.py b/tests/notify/test_notifier.py
index e7306a7..e326ad8 100644
--- a/tests/notify/test_notifier.py
+++ b/tests/notify/test_notifier.py
@@ -20,6 +20,8 @@ import uuid
import fixtures
import mock
+from oslo_serialization import jsonutils
+from oslo_utils import timeutils
from stevedore import dispatch
from stevedore import extension
import testscenarios
@@ -27,8 +29,6 @@ import yaml
from oslo import messaging
from oslo.messaging import serializer as msg_serializer
-from oslo.serialization import jsonutils
-from oslo.utils import timeutils
from oslo_messaging.notify import _impl_log
from oslo_messaging.notify import _impl_messaging
from oslo_messaging.notify import _impl_test
@@ -147,7 +147,7 @@ class TestMessagingNotifier(test_utils.BaseTestCase):
self.stubs.Set(_impl_messaging, 'LOG', self.logger)
self.stubs.Set(msg_notifier, '_LOG', self.logger)
- @mock.patch('oslo.utils.timeutils.utcnow')
+ @mock.patch('oslo_utils.timeutils.utcnow')
def test_notifier(self, mock_utcnow):
drivers = []
if self.v1:
@@ -223,7 +223,7 @@ class TestSerializer(test_utils.BaseTestCase):
super(TestSerializer, self).setUp()
self.addCleanup(_impl_test.reset)
- @mock.patch('oslo.utils.timeutils.utcnow')
+ @mock.patch('oslo_utils.timeutils.utcnow')
def test_serializer(self, mock_utcnow):
transport = _FakeTransport(self.conf)
@@ -266,7 +266,7 @@ class TestSerializer(test_utils.BaseTestCase):
class TestLogNotifier(test_utils.BaseTestCase):
- @mock.patch('oslo.utils.timeutils.utcnow')
+ @mock.patch('oslo_utils.timeutils.utcnow')
def test_notifier(self, mock_utcnow):
self.config(notification_driver=['log'])
diff --git a/tox.ini b/tox.ini
index 428b051..7d2ad97 100644
--- a/tox.ini
+++ b/tox.ini
@@ -32,20 +32,25 @@ deps = -r{toxinidir}/requirements-py3.txt
-r{toxinidir}/test-requirements-py3.txt
[testenv:py27-func-qpid]
-setenv = TRANSPORT_URL=qpid://guest:password@localhost//
-commands = python setup.py testr --slowest --testr-args='tests.functional'
+setenv = TRANSPORT_URL=qpid://stackqpid:secretqpid@127.0.0.1//
+commands = python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
[testenv:py27-func-rabbit]
-setenv = TRANSPORT_URL=rabbit://guest:password@localhost//
-commands = python setup.py testr --slowest --testr-args='tests.functional'
+setenv = TRANSPORT_URL=rabbit://stackrabbit:secretrabbit@127.0.0.1//
+commands = python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
[testenv:py27-func-amqp1]
-setenv = TRANSPORT_URL=amqp://guest:password@localhost//
+setenv = TRANSPORT_URL=amqp://stackqpid:secretqpid@127.0.0.1//
deps = -r{toxinidir}/amqp1-requirements.txt
{[testenv]deps}
# NOTE(sileht): until ubuntu get proto packages, we run amqp_driver tests here
# because this is the only target to run fedora 20 in gate
-commands = python setup.py testr --slowest --testr-args='tests.(functional|test_amqp_driver)'
+commands = python setup.py testr --slowest --testr-args='oslo_messaging.tests.(functional|test_amqp_driver)'
+
+[testenv:py27-func-zeromq]
+setenv = TRANSPORT_URL=zmq://
+ ZMQ_MATCHMAKER=oslo_messaging._drivers.matchmaker_redis.MatchMakerRedis
+commands = python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
[flake8]
show-source = True