summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-03-25 00:22:11 +0000
committerGerrit Code Review <review@openstack.org>2015-03-25 00:22:11 +0000
commit77c3604b406af36b2ca9efaa0a95a17e0239389d (patch)
tree27bbb8af9b28ce846832896dda677e0feebb84e5
parent9b14d1aa8aab0209b48cd706b771b9b55d0fece2 (diff)
parent64bdd80c5fe4d53ac8d7ab3ed906ec9feaeb7ec4 (diff)
downloadoslo-messaging-77c3604b406af36b2ca9efaa0a95a17e0239389d.tar.gz
Merge "rabbit: heartbeat implementation" into stable/kilo
-rw-r--r--oslo_messaging/_drivers/amqp.py32
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py15
-rw-r--r--oslo_messaging/_drivers/impl_qpid.py2
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py264
-rw-r--r--oslo_messaging/tests/drivers/test_impl_qpid.py7
-rw-r--r--oslo_messaging/tests/drivers/test_impl_rabbit.py122
-rw-r--r--tests/drivers/test_impl_qpid.py7
-rw-r--r--tests/drivers/test_impl_rabbit.py14
8 files changed, 424 insertions, 39 deletions
diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py
index ebae514..0d3dc51 100644
--- a/oslo_messaging/_drivers/amqp.py
+++ b/oslo_messaging/_drivers/amqp.py
@@ -55,6 +55,26 @@ amqp_opts = [
UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__)
+# NOTE(sileht): Even if rabbit/qpid have only one Connection class,
+# this connection can be used for two purposes:
+# * wait and receive amqp messages (only do read stuffs on the socket)
+# * send messages to the broker (only do write stuffs on the socket)
+# The code inside a connection class is not concurrency safe.
+# Using one Connection class instance for doing both, will result
+# of eventlet complaining of multiple greenthreads that read/write the
+# same fd concurrently... because 'send' and 'listen' run in different
+# greenthread.
+# So, a connection cannot be shared between thread/greenthread and
+# this two variables permit to define the purpose of the connection
+# to allow drivers to add special handling if needed (like heatbeat).
+# amqp drivers create 3 kind of connections:
+# * driver.listen*(): each call create a new 'PURPOSE_LISTEN' connection
+# * driver.send*(): a pool of 'PURPOSE_SEND' connections is used
+# * driver internally have another 'PURPOSE_LISTEN' connection dedicated
+# to wait replies of rpc call
+PURPOSE_LISTEN = 'listen'
+PURPOSE_SEND = 'send'
+
class ConnectionPool(pool.Pool):
"""Class that implements a Pool of Connections."""
@@ -66,9 +86,11 @@ class ConnectionPool(pool.Pool):
self.reply_proxy = None
# TODO(comstud): Timeout connections not used in a while
- def create(self):
+ def create(self, purpose=None):
+ if purpose is None:
+ purpose = PURPOSE_SEND
LOG.debug('Pool creating new connection')
- return self.connection_cls(self.conf, self.url)
+ return self.connection_cls(self.conf, self.url, purpose)
def empty(self):
for item in self.iter_free():
@@ -87,16 +109,18 @@ class ConnectionContext(rpc_common.Connection):
If possible the function makes sure to return a connection to the pool.
"""
- def __init__(self, connection_pool, pooled=True):
+ def __init__(self, connection_pool, purpose):
"""Create a new connection, or get one from the pool."""
self.connection = None
self.connection_pool = connection_pool
+ pooled = purpose == PURPOSE_SEND
if pooled:
self.connection = connection_pool.get()
else:
# a non-pooled connection is requested, so create a new connection
- self.connection = connection_pool.create()
+ self.connection = connection_pool.create(purpose)
self.pooled = pooled
+ self.connection.pooled = pooled
def __enter__(self):
"""When with ConnectionContext() is used, return self."""
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index 9e967c4..bbc4b78 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -69,7 +69,8 @@ class AMQPIncomingMessage(base.IncomingMessage):
# NOTE(Alexei_987) not sending reply, if msg_id is empty
# because reply should not be expected by caller side
return
- with self.listener.driver._get_connection() as conn:
+ with self.listener.driver._get_connection(
+ rpc_amqp.PURPOSE_SEND) as conn:
self._send_reply(conn, reply, failure, log_failure=log_failure)
self._send_reply(conn, ending=True)
@@ -268,9 +269,9 @@ class AMQPDriverBase(base.BaseDriver):
def _get_exchange(self, target):
return target.exchange or self._default_exchange
- def _get_connection(self, pooled=True):
+ def _get_connection(self, purpose=rpc_amqp.PURPOSE_SEND):
return rpc_amqp.ConnectionContext(self._connection_pool,
- pooled=pooled)
+ purpose=purpose)
def _get_reply_q(self):
with self._reply_q_lock:
@@ -279,7 +280,7 @@ class AMQPDriverBase(base.BaseDriver):
reply_q = 'reply_' + uuid.uuid4().hex
- conn = self._get_connection(pooled=False)
+ conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)
self._waiter = ReplyWaiter(reply_q, conn,
self._allowed_remote_exmods)
@@ -320,7 +321,7 @@ class AMQPDriverBase(base.BaseDriver):
self._waiter.listen(msg_id)
try:
- with self._get_connection() as conn:
+ with self._get_connection(rpc_amqp.PURPOSE_SEND) as conn:
if notify:
conn.notify_send(self._get_exchange(target),
target.topic, msg, retry=retry)
@@ -353,7 +354,7 @@ class AMQPDriverBase(base.BaseDriver):
envelope=(version == 2.0), notify=True, retry=retry)
def listen(self, target):
- conn = self._get_connection(pooled=False)
+ conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)
listener = AMQPListener(self, conn)
@@ -369,7 +370,7 @@ class AMQPDriverBase(base.BaseDriver):
return listener
def listen_for_notifications(self, targets_and_priorities, pool):
- conn = self._get_connection(pooled=False)
+ conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)
listener = AMQPListener(self, conn)
for target, priority in targets_and_priorities:
diff --git a/oslo_messaging/_drivers/impl_qpid.py b/oslo_messaging/_drivers/impl_qpid.py
index 4640230..3a2b296 100644
--- a/oslo_messaging/_drivers/impl_qpid.py
+++ b/oslo_messaging/_drivers/impl_qpid.py
@@ -462,7 +462,7 @@ class Connection(object):
pools = {}
- def __init__(self, conf, url):
+ def __init__(self, conf, url, purpose):
if not qpid_messaging:
raise ImportError("Failed to import qpid.messaging")
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index b06d290..d10ac22 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -12,19 +12,20 @@
# License for the specific language governing permissions and limitations
# under the License.
+import contextlib
import functools
import itertools
import logging
import os
import socket
import ssl
+import threading
import time
import uuid
import kombu
import kombu.connection
import kombu.entity
-import kombu.exceptions
import kombu.messaging
from oslo_config import cfg
from oslo_utils import netutils
@@ -35,6 +36,7 @@ from oslo_messaging._drivers import amqp as rpc_amqp
from oslo_messaging._drivers import amqpdriver
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._i18n import _
+from oslo_messaging._i18n import _LE
from oslo_messaging._i18n import _LI
from oslo_messaging._i18n import _LW
from oslo_messaging import exceptions
@@ -120,6 +122,15 @@ rabbit_opts = [
help='Use HA queues in RabbitMQ (x-ha-policy: all). '
'If you change this option, you must wipe the '
'RabbitMQ database.'),
+ cfg.IntOpt('heartbeat_timeout_threshold',
+ default=60,
+ help="Number of seconds after which the Rabbit broker is "
+ "considered down if heartbeat's keep-alive fails "
+ "(0 disable the heartbeat)."),
+ cfg.IntOpt('heartbeat_rate',
+ default=2,
+ help='How often times during the heartbeat_timeout_threshold '
+ 'we check the heartbeat.'),
# NOTE(sileht): deprecated option since oslo_messaging 1.5.0,
cfg.BoolOpt('fake_rabbit',
@@ -460,12 +471,119 @@ class NotifyPublisher(TopicPublisher):
queue.declare()
+class DummyConnectionLock(object):
+ def acquire(self):
+ pass
+
+ def release(self):
+ pass
+
+ def heartbeat_acquire(self):
+ pass
+
+ def __enter__(self):
+ self.acquire()
+
+ def __exit__(self, type, value, traceback):
+ self.release()
+
+
+class ConnectionLock(DummyConnectionLock):
+ """Lock object to protect access the the kombu connection
+
+ This is a lock object to protect access the the kombu connection
+ object between the heartbeat thread and the driver thread.
+
+ They are two way to acquire this lock:
+ * lock.acquire()
+ * lock.heartbeat_acquire()
+
+ In both case lock.release(), release the lock.
+
+ The goal is that the heartbeat thread always have the priority
+ for acquiring the lock. This ensures we have no heartbeat
+ starvation when the driver sends a lot of messages.
+
+ So when lock.heartbeat_acquire() is called next time the lock
+ is released(), the caller unconditionnaly acquires
+ the lock, even someone else have asked for the lock before it.
+ """
+
+ def __init__(self):
+ self._workers_waiting = 0
+ self._heartbeat_waiting = False
+ self._lock_acquired = None
+ self._monitor = threading.Lock()
+ self._workers_locks = threading.Condition(self._monitor)
+ self._heartbeat_lock = threading.Condition(self._monitor)
+ self._get_thread_id = self._fetch_current_thread_functor()
+
+ def acquire(self):
+ with self._monitor:
+ while self._lock_acquired:
+ self._workers_waiting += 1
+ self._workers_locks.wait()
+ self._workers_waiting -= 1
+ self._lock_acquired = self._get_thread_id()
+
+ def heartbeat_acquire(self):
+ # NOTE(sileht): must be called only one time
+ with self._monitor:
+ while self._lock_acquired is not None:
+ self._heartbeat_waiting = True
+ self._heartbeat_lock.wait()
+ self._heartbeat_waiting = False
+ self._lock_acquired = self._get_thread_id()
+
+ def release(self):
+ with self._monitor:
+ if self._lock_acquired is None:
+ raise RuntimeError("We can't release a not acquired lock")
+ thread_id = self._get_thread_id()
+ if self._lock_acquired != thread_id:
+ raise RuntimeError("We can't release lock acquired by another "
+ "thread/greenthread; %s vs %s" %
+ (self._lock_acquired, thread_id))
+ self._lock_acquired = None
+ if self._heartbeat_waiting:
+ self._heartbeat_lock.notify()
+ elif self._workers_waiting > 0:
+ self._workers_locks.notify()
+
+ @contextlib.contextmanager
+ def for_heartbeat(self):
+ self.heartbeat_acquire()
+ try:
+ yield
+ finally:
+ self.release()
+
+ @staticmethod
+ def _fetch_current_thread_functor():
+ # Until https://github.com/eventlet/eventlet/issues/172 is resolved
+ # or addressed we have to use complicated workaround to get a object
+ # that will not be recycled; the usage of threading.current_thread()
+ # doesn't appear to currently be monkey patched and therefore isn't
+ # reliable to use (and breaks badly when used as all threads share
+ # the same current_thread() object)...
+ try:
+ import eventlet
+ from eventlet import patcher
+ green_threaded = patcher.is_monkey_patched('thread')
+ except ImportError:
+ green_threaded = False
+ if green_threaded:
+ return lambda: eventlet.getcurrent()
+ else:
+ return lambda: threading.current_thread()
+
+
class Connection(object):
"""Connection object."""
pools = {}
- def __init__(self, conf, url):
+ def __init__(self, conf, url, purpose):
self.consumers = []
self.consumer_num = itertools.count(1)
self.conf = conf
@@ -527,18 +645,47 @@ class Connection(object):
self.do_consume = True
self._consume_loop_stopped = False
-
self.channel = None
+
+ # NOTE(sileht): if purpose is PURPOSE_LISTEN
+ # we don't need the lock because we don't
+ # have a heartbeat thread
+ if purpose == rpc_amqp.PURPOSE_SEND:
+ self._connection_lock = ConnectionLock()
+ else:
+ self._connection_lock = DummyConnectionLock()
+
self.connection = kombu.connection.Connection(
self._url, ssl=self._fetch_ssl_params(),
login_method=self._login_method,
- failover_strategy="shuffle")
+ failover_strategy="shuffle",
+ heartbeat=self.driver_conf.heartbeat_timeout_threshold)
LOG.info(_LI('Connecting to AMQP server on %(hostname)s:%(port)d'),
self.connection.info())
+
+ # NOTE(sileht): kombu recommend to run heartbeat_check every
+ # seconds, but we use a lock around the kombu connection
+ # so, to not lock to much this lock to most of the time do nothing
+ # expected waiting the events drain, we start heartbeat_check and
+ # retreive the server heartbeat packet only two times more than
+ # the minimum required for the heartbeat works
+ # (heatbeat_timeout/heartbeat_rate/2.0, default kombu
+ # heartbeat_rate is 2)
+ self._heartbeat_wait_timeout = (
+ float(self.driver_conf.heartbeat_timeout_threshold) /
+ float(self.driver_conf.heartbeat_rate) / 2.0)
+ self._heartbeat_support_log_emitted = False
+
# NOTE(sileht): just ensure the connection is setuped at startup
- self.ensure(error_callback=None,
- method=lambda: True)
+ self.ensure_connection()
+
+ # NOTE(sileht): if purpose is PURPOSE_LISTEN
+ # the consume code does the heartbeat stuff
+ # we don't need a thread
+ if purpose == rpc_amqp.PURPOSE_SEND:
+ self._heartbeat_start()
+
LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d'),
self.connection.info())
@@ -602,6 +749,10 @@ class Connection(object):
return ssl_params or True
return False
+ def ensure_connection(self):
+ self.ensure(error_callback=None,
+ method=lambda: True)
+
def ensure(self, error_callback, method, retry=None,
timeout_is_error=True):
"""Will retry up to retry number of times.
@@ -609,6 +760,8 @@ class Connection(object):
retry = -1 means to retry forever
retry = 0 means no retry
retry = N means N retries
+
+ NOTE(sileht): Must be called within the connection lock
"""
current_pid = os.getpid()
@@ -676,6 +829,7 @@ class Connection(object):
recoverable_errors = (self.connection.recoverable_channel_errors +
self.connection.recoverable_connection_errors)
+
try:
autoretry_method = self.connection.autoretry(
execute_method, channel=self.channel,
@@ -703,12 +857,17 @@ class Connection(object):
raise exceptions.MessageDeliveryFailure(msg)
def _set_current_channel(self, new_channel):
+ """Change the channel to use.
+
+ NOTE(sileht): Must be called within the connection lock
+ """
if self.channel is not None and new_channel != self.channel:
self.connection.maybe_close_channel(self.channel)
self.channel = new_channel
def close(self):
"""Close/release this connection."""
+ self._heartbeat_stop()
if self.connection:
self._set_current_channel(None)
self.connection.release()
@@ -716,10 +875,74 @@ class Connection(object):
def reset(self):
"""Reset a connection so it can be used again."""
- self._set_current_channel(self.connection.channel())
+ with self._connection_lock:
+ self._set_current_channel(self.connection.channel())
self.consumers = []
self.consumer_num = itertools.count(1)
+ def _heartbeat_supported_and_enabled(self):
+ if self.driver_conf.heartbeat_timeout_threshold <= 0:
+ return False
+
+ if self.connection.supports_heartbeats:
+ return True
+ elif not self._heartbeat_support_log_emitted:
+ LOG.warn(_LW("Heartbeat support requested but it is not supported "
+ "by the kombu driver or the broker"))
+ self._heartbeat_support_log_emitted = True
+ return False
+
+ def _heartbeat_start(self):
+ if self._heartbeat_supported_and_enabled():
+ self._heartbeat_exit_event = threading.Event()
+ self._heartbeat_thread = threading.Thread(
+ target=self._heartbeat_thread_job)
+ self._heartbeat_thread.daemon = True
+ self._heartbeat_thread.start()
+ else:
+ self._heartbeat_thread = None
+
+ def _heartbeat_stop(self):
+ if self._heartbeat_thread is not None:
+ self._heartbeat_exit_event.set()
+ self._heartbeat_thread.join()
+ self._heartbeat_thread = None
+
+ def _heartbeat_thread_job(self):
+ """Thread that maintains inactive connections
+ """
+ while not self._heartbeat_exit_event.is_set():
+ with self._connection_lock.for_heartbeat():
+
+ recoverable_errors = (
+ self.connection.recoverable_channel_errors +
+ self.connection.recoverable_connection_errors)
+
+ try:
+ try:
+ self.connection.heartbeat_check(
+ rate=self.driver_conf.heartbeat_rate)
+ # NOTE(sileht): We need to drain event to receive
+ # heartbeat from the broker but don't hold the
+ # connection too much times. In amqpdriver a connection
+ # is used exclusivly for read or for write, so we have
+ # to do this for connection used for write drain_events
+ # already do that for other connection
+ try:
+ self.connection.drain_events(timeout=0.001)
+ except socket.timeout:
+ pass
+ except recoverable_errors as exc:
+ LOG.info(_LI("A recoverable connection/channel error "
+ "occurs, try to reconnect: %s"), exc)
+ except Exception:
+ LOG.exception(_LE("Unexpected error during heartbeart "
+ "thread processing, retrying..."))
+
+ self._heartbeat_exit_event.wait(
+ timeout=self._heartbeat_wait_timeout)
+ self._heartbeat_exit_event.clear()
+
def declare_consumer(self, consumer_cls, topic, callback):
"""Create a Consumer using the class that was passed in and
add it to our list of consumers
@@ -736,10 +959,14 @@ class Connection(object):
self.consumers.append(consumer)
return consumer
- return self.ensure(_connect_error, _declare_consumer)
+ with self._connection_lock:
+ return self.ensure(_connect_error, _declare_consumer)
def iterconsume(self, limit=None, timeout=None):
- """Return an iterator that will consume from all queues/consumers."""
+ """Return an iterator that will consume from all queues/consumers.
+
+ NOTE(sileht): Must be called within the connection lock
+ """
timer = rpc_common.DecayingTimer(duration=timeout)
timer.start()
@@ -770,6 +997,9 @@ class Connection(object):
self._consume_loop_stopped = False
raise StopIteration
+ if self._heartbeat_supported_and_enabled():
+ self.connection.heartbeat_check(
+ rate=self.driver_conf.heartbeat_rate)
try:
return self.connection.drain_events(timeout=poll_timeout)
except socket.timeout as exc:
@@ -795,7 +1025,8 @@ class Connection(object):
**kwargs)
publisher.send(msg, timeout)
- self.ensure(_error_callback, _publish, retry=retry)
+ with self._connection_lock:
+ self.ensure(_error_callback, _publish, retry=retry)
def declare_direct_consumer(self, topic, callback):
"""Create a 'direct' queue.
@@ -861,12 +1092,13 @@ class Connection(object):
def consume(self, limit=None, timeout=None):
"""Consume from all queues/consumers."""
- it = self.iterconsume(limit=limit, timeout=timeout)
- while True:
- try:
- six.next(it)
- except StopIteration:
- return
+ with self._connection_lock:
+ it = self.iterconsume(limit=limit, timeout=timeout)
+ while True:
+ try:
+ six.next(it)
+ except StopIteration:
+ return
def stop_consuming(self):
self._consume_loop_stopped = True
diff --git a/oslo_messaging/tests/drivers/test_impl_qpid.py b/oslo_messaging/tests/drivers/test_impl_qpid.py
index 2d7dd6a..e39f72a 100644
--- a/oslo_messaging/tests/drivers/test_impl_qpid.py
+++ b/oslo_messaging/tests/drivers/test_impl_qpid.py
@@ -27,6 +27,7 @@ import testscenarios
import testtools
import oslo_messaging
+from oslo_messaging._drivers import amqp
from oslo_messaging._drivers import impl_qpid as qpid_driver
from oslo_messaging.tests import utils as test_utils
@@ -564,7 +565,8 @@ class TestQpidReconnectOrder(test_utils.BaseTestCase):
with mock.patch('qpid.messaging.Connection') as conn_mock:
# starting from the first broker in the list
url = oslo_messaging.TransportURL.parse(self.conf, None)
- connection = qpid_driver.Connection(self.conf, url)
+ connection = qpid_driver.Connection(self.conf, url,
+ amqp.PURPOSE_SEND)
# reconnect will advance to the next broker, one broker per
# attempt, and then wrap to the start of the list once the end is
@@ -806,7 +808,8 @@ class QPidHATestCase(test_utils.BaseTestCase):
# starting from the first broker in the list
url = oslo_messaging.TransportURL.parse(self.conf, None)
- self.connection = qpid_driver.Connection(self.conf, url)
+ self.connection = qpid_driver.Connection(self.conf, url,
+ amqp.PURPOSE_SEND)
self.addCleanup(self.connection.close)
self.info.update({'attempt': 0,
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index df0f3b3..b4e691d 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -21,6 +21,7 @@ import uuid
import fixtures
import kombu
+import kombu.transport.memory
import mock
from oslo_config import cfg
from oslo_serialization import jsonutils
@@ -28,6 +29,7 @@ from oslotest import mockpatch
import testscenarios
import oslo_messaging
+from oslo_messaging._drivers import amqp
from oslo_messaging._drivers import amqpdriver
from oslo_messaging._drivers import common as driver_common
from oslo_messaging._drivers import impl_rabbit as rabbit_driver
@@ -54,6 +56,52 @@ class TestDeprecatedRabbitDriverLoad(test_utils.BaseTestCase):
self.assertEqual('memory:////', url)
+class TestHeartbeat(test_utils.BaseTestCase):
+
+ @mock.patch('oslo_messaging._drivers.impl_rabbit.LOG')
+ @mock.patch('kombu.connection.Connection.heartbeat_check')
+ @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.'
+ '_heartbeat_supported_and_enabled', return_value=True)
+ def _do_test_heartbeat_sent(self, fake_heartbeat_support, fake_heartbeat,
+ fake_logger, heartbeat_side_effect=None,
+ info=None):
+
+ event = threading.Event()
+
+ def heartbeat_check(rate=2):
+ event.set()
+ if heartbeat_side_effect:
+ raise heartbeat_side_effect
+
+ fake_heartbeat.side_effect = heartbeat_check
+
+ transport = oslo_messaging.get_transport(self.conf,
+ 'kombu+memory:////')
+ self.addCleanup(transport.cleanup)
+ conn = transport._driver._get_connection()
+ event.wait()
+ conn._heartbeat_stop()
+
+ # check heartbeat have been called
+ self.assertLess(0, fake_heartbeat.call_count)
+
+ if not heartbeat_side_effect:
+ self.assertEqual(2, fake_logger.info.call_count)
+ else:
+ self.assertEqual(3, fake_logger.info.call_count)
+ self.assertIn(mock.call(info, mock.ANY),
+ fake_logger.info.mock_calls)
+
+ def test_test_heartbeat_sent_default(self):
+ self._do_test_heartbeat_sent()
+
+ def test_test_heartbeat_sent_connection_fail(self):
+ self._do_test_heartbeat_sent(
+ heartbeat_side_effect=kombu.exceptions.ConnectionError,
+ info='A recoverable connection/channel error occurs, '
+ 'try to reconnect: %s')
+
+
class TestRabbitDriverLoad(test_utils.BaseTestCase):
scenarios = [
@@ -68,6 +116,8 @@ class TestRabbitDriverLoad(test_utils.BaseTestCase):
@mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.ensure')
@mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.reset')
def test_driver_load(self, fake_ensure, fake_reset):
+ self.config(heartbeat_timeout_threshold=0,
+ group='oslo_messaging_rabbit')
self.messaging_conf.transport_driver = self.transport_driver
transport = oslo_messaging.get_transport(self.conf)
self.addCleanup(transport.cleanup)
@@ -107,8 +157,8 @@ class TestRabbitDriverLoadSSL(test_utils.BaseTestCase):
transport._driver._get_connection()
connection_klass.assert_called_once_with(
- 'memory:///', ssl=self.expected,
- login_method='AMQPLAIN', failover_strategy="shuffle")
+ 'memory:///', ssl=self.expected, login_method='AMQPLAIN',
+ heartbeat=60, failover_strategy="shuffle")
class TestRabbitIterconsume(test_utils.BaseTestCase):
@@ -118,7 +168,7 @@ class TestRabbitIterconsume(test_utils.BaseTestCase):
'kombu+memory:////')
self.addCleanup(transport.cleanup)
deadline = time.time() + 3
- with transport._driver._get_connection() as conn:
+ with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
conn.iterconsume(timeout=3)
# kombu memory transport doesn't really raise error
# so just simulate a real driver behavior
@@ -170,10 +220,12 @@ class TestRabbitTransportURL(test_utils.BaseTestCase):
def setUp(self):
super(TestRabbitTransportURL, self).setUp()
self.messaging_conf.transport_driver = 'rabbit'
+ self.config(heartbeat_timeout_threshold=0,
+ group='oslo_messaging_rabbit')
@mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.ensure')
@mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.reset')
- def test_transport_url(self, fake_ensure_connection, fake_reset):
+ def test_transport_url(self, fake_reset, fake_ensure):
transport = oslo_messaging.get_transport(self.conf, self.url)
self.addCleanup(transport.cleanup)
driver = transport._driver
@@ -223,6 +275,8 @@ class TestSendReceive(test_utils.BaseTestCase):
cls._timeout)
def test_send_receive(self):
+ self.config(heartbeat_timeout_threshold=0,
+ group="oslo_messaging_rabbit")
transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////')
self.addCleanup(transport.cleanup)
@@ -708,6 +762,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
rabbit_retry_interval=0.01,
rabbit_retry_backoff=0.01,
kombu_reconnect_delay=0,
+ heartbeat_timeout_threshold=0,
group="oslo_messaging_rabbit")
self.kombu_connect = mock.Mock()
@@ -719,7 +774,8 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
# starting from the first broker in the list
url = oslo_messaging.TransportURL.parse(self.conf, None)
- self.connection = rabbit_driver.Connection(self.conf, url)
+ self.connection = rabbit_driver.Connection(self.conf, url,
+ amqp.PURPOSE_SEND)
self.addCleanup(self.connection.close)
def test_ensure_four_retry(self):
@@ -745,3 +801,59 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
retry=0)
self.assertEqual(1, self.kombu_connect.call_count)
self.assertEqual(2, mock_callback.call_count)
+
+
+class ConnectionLockTestCase(test_utils.BaseTestCase):
+ def _thread(self, lock, sleep, heartbeat=False):
+ def thread_task():
+ if heartbeat:
+ with lock.for_heartbeat():
+ time.sleep(sleep)
+ else:
+ with lock:
+ time.sleep(sleep)
+
+ t = threading.Thread(target=thread_task)
+ t.daemon = True
+ t.start()
+ start = time.time()
+
+ def get_elapsed_time():
+ t.join()
+ return time.time() - start
+
+ return get_elapsed_time
+
+ def test_workers_only(self):
+ l = rabbit_driver.ConnectionLock()
+ t1 = self._thread(l, 1)
+ t2 = self._thread(l, 1)
+ self.assertAlmostEqual(1, t1(), places=1)
+ self.assertAlmostEqual(2, t2(), places=1)
+
+ def test_worker_and_heartbeat(self):
+ l = rabbit_driver.ConnectionLock()
+ t1 = self._thread(l, 1)
+ t2 = self._thread(l, 1, heartbeat=True)
+ self.assertAlmostEqual(1, t1(), places=1)
+ self.assertAlmostEqual(2, t2(), places=1)
+
+ def test_workers_and_heartbeat(self):
+ l = rabbit_driver.ConnectionLock()
+ t1 = self._thread(l, 1)
+ t2 = self._thread(l, 1)
+ t3 = self._thread(l, 1)
+ t4 = self._thread(l, 1, heartbeat=True)
+ t5 = self._thread(l, 1)
+ self.assertAlmostEqual(1, t1(), places=1)
+ self.assertAlmostEqual(2, t4(), places=1)
+ self.assertAlmostEqual(3, t2(), places=1)
+ self.assertAlmostEqual(4, t3(), places=1)
+ self.assertAlmostEqual(5, t5(), places=1)
+
+ def test_heartbeat(self):
+ l = rabbit_driver.ConnectionLock()
+ t1 = self._thread(l, 1, heartbeat=True)
+ t2 = self._thread(l, 1)
+ self.assertAlmostEqual(1, t1(), places=1)
+ self.assertAlmostEqual(2, t2(), places=1)
diff --git a/tests/drivers/test_impl_qpid.py b/tests/drivers/test_impl_qpid.py
index d8cd1e7..2c2c0a5 100644
--- a/tests/drivers/test_impl_qpid.py
+++ b/tests/drivers/test_impl_qpid.py
@@ -27,6 +27,7 @@ import testscenarios
import testtools
from oslo import messaging
+from oslo_messaging._drivers import amqp
from oslo_messaging._drivers import impl_qpid as qpid_driver
from oslo_messaging.tests import utils as test_utils
@@ -564,7 +565,8 @@ class TestQpidReconnectOrder(test_utils.BaseTestCase):
with mock.patch('qpid.messaging.Connection') as conn_mock:
# starting from the first broker in the list
url = messaging.TransportURL.parse(self.conf, None)
- connection = qpid_driver.Connection(self.conf, url)
+ connection = qpid_driver.Connection(self.conf, url,
+ amqp.PURPOSE_SEND)
# reconnect will advance to the next broker, one broker per
# attempt, and then wrap to the start of the list once the end is
@@ -806,7 +808,8 @@ class QPidHATestCase(test_utils.BaseTestCase):
# starting from the first broker in the list
url = messaging.TransportURL.parse(self.conf, None)
- self.connection = qpid_driver.Connection(self.conf, url)
+ self.connection = qpid_driver.Connection(self.conf, url,
+ amqp.PURPOSE_SEND)
self.addCleanup(self.connection.close)
self.info.update({'attempt': 0,
diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py
index 8e9b29e..783afd8 100644
--- a/tests/drivers/test_impl_rabbit.py
+++ b/tests/drivers/test_impl_rabbit.py
@@ -27,6 +27,7 @@ import testscenarios
from oslo.config import cfg
from oslo import messaging
from oslo.serialization import jsonutils
+from oslo_messaging._drivers import amqp
from oslo_messaging._drivers import amqpdriver
from oslo_messaging._drivers import common as driver_common
from oslo_messaging._drivers import impl_rabbit as rabbit_driver
@@ -44,6 +45,8 @@ class TestDeprecatedRabbitDriverLoad(test_utils.BaseTestCase):
self.config(fake_rabbit=True, group="oslo_messaging_rabbit")
def test_driver_load(self):
+ self.config(heartbeat_timeout_threshold=0,
+ group='oslo_messaging_rabbit')
transport = messaging.get_transport(self.conf)
self.addCleanup(transport.cleanup)
driver = transport._driver
@@ -67,6 +70,8 @@ class TestRabbitDriverLoad(test_utils.BaseTestCase):
@mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.ensure')
@mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.reset')
def test_driver_load(self, fake_ensure, fake_reset):
+ self.config(heartbeat_timeout_threshold=0,
+ group='oslo_messaging_rabbit')
self.messaging_conf.transport_driver = self.transport_driver
transport = messaging.get_transport(self.conf)
self.addCleanup(transport.cleanup)
@@ -83,7 +88,7 @@ class TestRabbitIterconsume(test_utils.BaseTestCase):
transport = messaging.get_transport(self.conf, 'kombu+memory:////')
self.addCleanup(transport.cleanup)
deadline = time.time() + 3
- with transport._driver._get_connection() as conn:
+ with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
conn.iterconsume(timeout=3)
# kombu memory transport doesn't really raise error
# so just simulate a real driver behavior
@@ -140,6 +145,8 @@ class TestRabbitTransportURL(test_utils.BaseTestCase):
def setUp(self):
super(TestRabbitTransportURL, self).setUp()
+ self.config(heartbeat_timeout_threshold=0,
+ group='oslo_messaging_rabbit')
self.messaging_conf.transport_driver = 'rabbit'
@mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.ensure')
@@ -200,6 +207,8 @@ class TestSendReceive(test_utils.BaseTestCase):
cls._timeout)
def test_send_receive(self):
+ self.config(heartbeat_timeout_threshold=0,
+ group='oslo_messaging_rabbit')
transport = messaging.get_transport(self.conf, 'kombu+memory:////')
self.addCleanup(transport.cleanup)
@@ -710,7 +719,8 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
# starting from the first broker in the list
url = messaging.TransportURL.parse(self.conf, None)
- self.connection = rabbit_driver.Connection(self.conf, url)
+ self.connection = rabbit_driver.Connection(self.conf, url,
+ amqp.PURPOSE_SEND)
self.addCleanup(self.connection.close)
def test_ensure_four_retry(self):