summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMehdi Abaakouk <mehdi.abaakouk@enovance.com>2015-01-21 09:13:10 +0100
committerMehdi Abaakouk <mehdi.abaakouk@enovance.com>2015-03-24 17:34:55 +0100
commit64bdd80c5fe4d53ac8d7ab3ed906ec9feaeb7ec4 (patch)
treece0e0b0175e8afbb3a7d14e2292551e3c7b63f97
parentff4eb4db1b58227d0c0736a0c00d99d3fc73d79b (diff)
downloadoslo-messaging-64bdd80c5fe4d53ac8d7ab3ed906ec9feaeb7ec4.tar.gz
rabbit: heartbeat implementation
AMQP offers a heartbeat feature to ensure that the application layer promptly finds out about disrupted connections (and also completely unresponsive peers). If the client requests heartbeats on connection, rabbit server will regularly send messages to each connections with the expectation of a response. To acheive this, each driver connection object spawn a thread that send/retrieve heartbeat packets exchanged between the server and the client. To protect the concurrency access to the kombu connection between the driver and this thread use a lock that always prioritize the heartbeat thread. So when the heartbeat thread wakes up it will acquire the lock quickly, to ensure we have no heartbeat starvation when the driver sends a lot of messages. Also when we are polling the broker, the lock can be held for a long time by the 'consume' method, so this one does the heartbeat stuffs itself. DocImpact: 2 new configuration options for Rabbit driver Co-Authored-By: Oleksii Zamiatin <ozamiatin@mirantis.com> Co-Authored-By: Ilya Pekelny <ipekelny@mirantis.com> Related-Bug: #1371723 Closes-Bug: #856764 Change-Id: I1d3a635f3853bc13ffc14034468f1ac6262c11a3 (cherry picked from commit b9e134d7e955b9180482d2f7c8844501c750adf6)
-rw-r--r--oslo_messaging/_drivers/amqp.py32
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py15
-rw-r--r--oslo_messaging/_drivers/impl_qpid.py2
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py264
-rw-r--r--oslo_messaging/tests/drivers/test_impl_qpid.py7
-rw-r--r--oslo_messaging/tests/drivers/test_impl_rabbit.py122
-rw-r--r--tests/drivers/test_impl_qpid.py7
-rw-r--r--tests/drivers/test_impl_rabbit.py14
8 files changed, 424 insertions, 39 deletions
diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py
index ebae514..0d3dc51 100644
--- a/oslo_messaging/_drivers/amqp.py
+++ b/oslo_messaging/_drivers/amqp.py
@@ -55,6 +55,26 @@ amqp_opts = [
UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__)
+# NOTE(sileht): Even if rabbit/qpid have only one Connection class,
+# this connection can be used for two purposes:
+# * wait and receive amqp messages (only do read stuffs on the socket)
+# * send messages to the broker (only do write stuffs on the socket)
+# The code inside a connection class is not concurrency safe.
+# Using one Connection class instance for doing both, will result
+# of eventlet complaining of multiple greenthreads that read/write the
+# same fd concurrently... because 'send' and 'listen' run in different
+# greenthread.
+# So, a connection cannot be shared between thread/greenthread and
+# this two variables permit to define the purpose of the connection
+# to allow drivers to add special handling if needed (like heatbeat).
+# amqp drivers create 3 kind of connections:
+# * driver.listen*(): each call create a new 'PURPOSE_LISTEN' connection
+# * driver.send*(): a pool of 'PURPOSE_SEND' connections is used
+# * driver internally have another 'PURPOSE_LISTEN' connection dedicated
+# to wait replies of rpc call
+PURPOSE_LISTEN = 'listen'
+PURPOSE_SEND = 'send'
+
class ConnectionPool(pool.Pool):
"""Class that implements a Pool of Connections."""
@@ -66,9 +86,11 @@ class ConnectionPool(pool.Pool):
self.reply_proxy = None
# TODO(comstud): Timeout connections not used in a while
- def create(self):
+ def create(self, purpose=None):
+ if purpose is None:
+ purpose = PURPOSE_SEND
LOG.debug('Pool creating new connection')
- return self.connection_cls(self.conf, self.url)
+ return self.connection_cls(self.conf, self.url, purpose)
def empty(self):
for item in self.iter_free():
@@ -87,16 +109,18 @@ class ConnectionContext(rpc_common.Connection):
If possible the function makes sure to return a connection to the pool.
"""
- def __init__(self, connection_pool, pooled=True):
+ def __init__(self, connection_pool, purpose):
"""Create a new connection, or get one from the pool."""
self.connection = None
self.connection_pool = connection_pool
+ pooled = purpose == PURPOSE_SEND
if pooled:
self.connection = connection_pool.get()
else:
# a non-pooled connection is requested, so create a new connection
- self.connection = connection_pool.create()
+ self.connection = connection_pool.create(purpose)
self.pooled = pooled
+ self.connection.pooled = pooled
def __enter__(self):
"""When with ConnectionContext() is used, return self."""
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index 9e967c4..bbc4b78 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -69,7 +69,8 @@ class AMQPIncomingMessage(base.IncomingMessage):
# NOTE(Alexei_987) not sending reply, if msg_id is empty
# because reply should not be expected by caller side
return
- with self.listener.driver._get_connection() as conn:
+ with self.listener.driver._get_connection(
+ rpc_amqp.PURPOSE_SEND) as conn:
self._send_reply(conn, reply, failure, log_failure=log_failure)
self._send_reply(conn, ending=True)
@@ -268,9 +269,9 @@ class AMQPDriverBase(base.BaseDriver):
def _get_exchange(self, target):
return target.exchange or self._default_exchange
- def _get_connection(self, pooled=True):
+ def _get_connection(self, purpose=rpc_amqp.PURPOSE_SEND):
return rpc_amqp.ConnectionContext(self._connection_pool,
- pooled=pooled)
+ purpose=purpose)
def _get_reply_q(self):
with self._reply_q_lock:
@@ -279,7 +280,7 @@ class AMQPDriverBase(base.BaseDriver):
reply_q = 'reply_' + uuid.uuid4().hex
- conn = self._get_connection(pooled=False)
+ conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)
self._waiter = ReplyWaiter(reply_q, conn,
self._allowed_remote_exmods)
@@ -320,7 +321,7 @@ class AMQPDriverBase(base.BaseDriver):
self._waiter.listen(msg_id)
try:
- with self._get_connection() as conn:
+ with self._get_connection(rpc_amqp.PURPOSE_SEND) as conn:
if notify:
conn.notify_send(self._get_exchange(target),
target.topic, msg, retry=retry)
@@ -353,7 +354,7 @@ class AMQPDriverBase(base.BaseDriver):
envelope=(version == 2.0), notify=True, retry=retry)
def listen(self, target):
- conn = self._get_connection(pooled=False)
+ conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)
listener = AMQPListener(self, conn)
@@ -369,7 +370,7 @@ class AMQPDriverBase(base.BaseDriver):
return listener
def listen_for_notifications(self, targets_and_priorities, pool):
- conn = self._get_connection(pooled=False)
+ conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)
listener = AMQPListener(self, conn)
for target, priority in targets_and_priorities:
diff --git a/oslo_messaging/_drivers/impl_qpid.py b/oslo_messaging/_drivers/impl_qpid.py
index 4640230..3a2b296 100644
--- a/oslo_messaging/_drivers/impl_qpid.py
+++ b/oslo_messaging/_drivers/impl_qpid.py
@@ -462,7 +462,7 @@ class Connection(object):
pools = {}
- def __init__(self, conf, url):
+ def __init__(self, conf, url, purpose):
if not qpid_messaging:
raise ImportError("Failed to import qpid.messaging")
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index b06d290..d10ac22 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -12,19 +12,20 @@
# License for the specific language governing permissions and limitations
# under the License.
+import contextlib
import functools
import itertools
import logging
import os
import socket
import ssl
+import threading
import time
import uuid
import kombu
import kombu.connection
import kombu.entity
-import kombu.exceptions
import kombu.messaging
from oslo_config import cfg
from oslo_utils import netutils
@@ -35,6 +36,7 @@ from oslo_messaging._drivers import amqp as rpc_amqp
from oslo_messaging._drivers import amqpdriver
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._i18n import _
+from oslo_messaging._i18n import _LE
from oslo_messaging._i18n import _LI
from oslo_messaging._i18n import _LW
from oslo_messaging import exceptions
@@ -120,6 +122,15 @@ rabbit_opts = [
help='Use HA queues in RabbitMQ (x-ha-policy: all). '
'If you change this option, you must wipe the '
'RabbitMQ database.'),
+ cfg.IntOpt('heartbeat_timeout_threshold',
+ default=60,
+ help="Number of seconds after which the Rabbit broker is "
+ "considered down if heartbeat's keep-alive fails "
+ "(0 disable the heartbeat)."),
+ cfg.IntOpt('heartbeat_rate',
+ default=2,
+ help='How often times during the heartbeat_timeout_threshold '
+ 'we check the heartbeat.'),
# NOTE(sileht): deprecated option since oslo_messaging 1.5.0,
cfg.BoolOpt('fake_rabbit',
@@ -460,12 +471,119 @@ class NotifyPublisher(TopicPublisher):
queue.declare()
+class DummyConnectionLock(object):
+ def acquire(self):
+ pass
+
+ def release(self):
+ pass
+
+ def heartbeat_acquire(self):
+ pass
+
+ def __enter__(self):
+ self.acquire()
+
+ def __exit__(self, type, value, traceback):
+ self.release()
+
+
+class ConnectionLock(DummyConnectionLock):
+ """Lock object to protect access the the kombu connection
+
+ This is a lock object to protect access the the kombu connection
+ object between the heartbeat thread and the driver thread.
+
+ They are two way to acquire this lock:
+ * lock.acquire()
+ * lock.heartbeat_acquire()
+
+ In both case lock.release(), release the lock.
+
+ The goal is that the heartbeat thread always have the priority
+ for acquiring the lock. This ensures we have no heartbeat
+ starvation when the driver sends a lot of messages.
+
+ So when lock.heartbeat_acquire() is called next time the lock
+ is released(), the caller unconditionnaly acquires
+ the lock, even someone else have asked for the lock before it.
+ """
+
+ def __init__(self):
+ self._workers_waiting = 0
+ self._heartbeat_waiting = False
+ self._lock_acquired = None
+ self._monitor = threading.Lock()
+ self._workers_locks = threading.Condition(self._monitor)
+ self._heartbeat_lock = threading.Condition(self._monitor)
+ self._get_thread_id = self._fetch_current_thread_functor()
+
+ def acquire(self):
+ with self._monitor:
+ while self._lock_acquired:
+ self._workers_waiting += 1
+ self._workers_locks.wait()
+ self._workers_waiting -= 1
+ self._lock_acquired = self._get_thread_id()
+
+ def heartbeat_acquire(self):
+ # NOTE(sileht): must be called only one time
+ with self._monitor:
+ while self._lock_acquired is not None:
+ self._heartbeat_waiting = True
+ self._heartbeat_lock.wait()
+ self._heartbeat_waiting = False
+ self._lock_acquired = self._get_thread_id()
+
+ def release(self):
+ with self._monitor:
+ if self._lock_acquired is None:
+ raise RuntimeError("We can't release a not acquired lock")
+ thread_id = self._get_thread_id()
+ if self._lock_acquired != thread_id:
+ raise RuntimeError("We can't release lock acquired by another "
+ "thread/greenthread; %s vs %s" %
+ (self._lock_acquired, thread_id))
+ self._lock_acquired = None
+ if self._heartbeat_waiting:
+ self._heartbeat_lock.notify()
+ elif self._workers_waiting > 0:
+ self._workers_locks.notify()
+
+ @contextlib.contextmanager
+ def for_heartbeat(self):
+ self.heartbeat_acquire()
+ try:
+ yield
+ finally:
+ self.release()
+
+ @staticmethod
+ def _fetch_current_thread_functor():
+ # Until https://github.com/eventlet/eventlet/issues/172 is resolved
+ # or addressed we have to use complicated workaround to get a object
+ # that will not be recycled; the usage of threading.current_thread()
+ # doesn't appear to currently be monkey patched and therefore isn't
+ # reliable to use (and breaks badly when used as all threads share
+ # the same current_thread() object)...
+ try:
+ import eventlet
+ from eventlet import patcher
+ green_threaded = patcher.is_monkey_patched('thread')
+ except ImportError:
+ green_threaded = False
+ if green_threaded:
+ return lambda: eventlet.getcurrent()
+ else:
+ return lambda: threading.current_thread()
+
+
class Connection(object):
"""Connection object."""
pools = {}
- def __init__(self, conf, url):
+ def __init__(self, conf, url, purpose):
self.consumers = []
self.consumer_num = itertools.count(1)
self.conf = conf
@@ -527,18 +645,47 @@ class Connection(object):
self.do_consume = True
self._consume_loop_stopped = False
-
self.channel = None
+
+ # NOTE(sileht): if purpose is PURPOSE_LISTEN
+ # we don't need the lock because we don't
+ # have a heartbeat thread
+ if purpose == rpc_amqp.PURPOSE_SEND:
+ self._connection_lock = ConnectionLock()
+ else:
+ self._connection_lock = DummyConnectionLock()
+
self.connection = kombu.connection.Connection(
self._url, ssl=self._fetch_ssl_params(),
login_method=self._login_method,
- failover_strategy="shuffle")
+ failover_strategy="shuffle",
+ heartbeat=self.driver_conf.heartbeat_timeout_threshold)
LOG.info(_LI('Connecting to AMQP server on %(hostname)s:%(port)d'),
self.connection.info())
+
+ # NOTE(sileht): kombu recommend to run heartbeat_check every
+ # seconds, but we use a lock around the kombu connection
+ # so, to not lock to much this lock to most of the time do nothing
+ # expected waiting the events drain, we start heartbeat_check and
+ # retreive the server heartbeat packet only two times more than
+ # the minimum required for the heartbeat works
+ # (heatbeat_timeout/heartbeat_rate/2.0, default kombu
+ # heartbeat_rate is 2)
+ self._heartbeat_wait_timeout = (
+ float(self.driver_conf.heartbeat_timeout_threshold) /
+ float(self.driver_conf.heartbeat_rate) / 2.0)
+ self._heartbeat_support_log_emitted = False
+
# NOTE(sileht): just ensure the connection is setuped at startup
- self.ensure(error_callback=None,
- method=lambda: True)
+ self.ensure_connection()
+
+ # NOTE(sileht): if purpose is PURPOSE_LISTEN
+ # the consume code does the heartbeat stuff
+ # we don't need a thread
+ if purpose == rpc_amqp.PURPOSE_SEND:
+ self._heartbeat_start()
+
LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d'),
self.connection.info())
@@ -602,6 +749,10 @@ class Connection(object):
return ssl_params or True
return False
+ def ensure_connection(self):
+ self.ensure(error_callback=None,
+ method=lambda: True)
+
def ensure(self, error_callback, method, retry=None,
timeout_is_error=True):
"""Will retry up to retry number of times.
@@ -609,6 +760,8 @@ class Connection(object):
retry = -1 means to retry forever
retry = 0 means no retry
retry = N means N retries
+
+ NOTE(sileht): Must be called within the connection lock
"""
current_pid = os.getpid()
@@ -676,6 +829,7 @@ class Connection(object):
recoverable_errors = (self.connection.recoverable_channel_errors +
self.connection.recoverable_connection_errors)
+
try:
autoretry_method = self.connection.autoretry(
execute_method, channel=self.channel,
@@ -703,12 +857,17 @@ class Connection(object):
raise exceptions.MessageDeliveryFailure(msg)
def _set_current_channel(self, new_channel):
+ """Change the channel to use.
+
+ NOTE(sileht): Must be called within the connection lock
+ """
if self.channel is not None and new_channel != self.channel:
self.connection.maybe_close_channel(self.channel)
self.channel = new_channel
def close(self):
"""Close/release this connection."""
+ self._heartbeat_stop()
if self.connection:
self._set_current_channel(None)
self.connection.release()
@@ -716,10 +875,74 @@ class Connection(object):
def reset(self):
"""Reset a connection so it can be used again."""
- self._set_current_channel(self.connection.channel())
+ with self._connection_lock:
+ self._set_current_channel(self.connection.channel())
self.consumers = []
self.consumer_num = itertools.count(1)
+ def _heartbeat_supported_and_enabled(self):
+ if self.driver_conf.heartbeat_timeout_threshold <= 0:
+ return False
+
+ if self.connection.supports_heartbeats:
+ return True
+ elif not self._heartbeat_support_log_emitted:
+ LOG.warn(_LW("Heartbeat support requested but it is not supported "
+ "by the kombu driver or the broker"))
+ self._heartbeat_support_log_emitted = True
+ return False
+
+ def _heartbeat_start(self):
+ if self._heartbeat_supported_and_enabled():
+ self._heartbeat_exit_event = threading.Event()
+ self._heartbeat_thread = threading.Thread(
+ target=self._heartbeat_thread_job)
+ self._heartbeat_thread.daemon = True
+ self._heartbeat_thread.start()
+ else:
+ self._heartbeat_thread = None
+
+ def _heartbeat_stop(self):
+ if self._heartbeat_thread is not None:
+ self._heartbeat_exit_event.set()
+ self._heartbeat_thread.join()
+ self._heartbeat_thread = None
+
+ def _heartbeat_thread_job(self):
+ """Thread that maintains inactive connections
+ """
+ while not self._heartbeat_exit_event.is_set():
+ with self._connection_lock.for_heartbeat():
+
+ recoverable_errors = (
+ self.connection.recoverable_channel_errors +
+ self.connection.recoverable_connection_errors)
+
+ try:
+ try:
+ self.connection.heartbeat_check(
+ rate=self.driver_conf.heartbeat_rate)
+ # NOTE(sileht): We need to drain event to receive
+ # heartbeat from the broker but don't hold the
+ # connection too much times. In amqpdriver a connection
+ # is used exclusivly for read or for write, so we have
+ # to do this for connection used for write drain_events
+ # already do that for other connection
+ try:
+ self.connection.drain_events(timeout=0.001)
+ except socket.timeout:
+ pass
+ except recoverable_errors as exc:
+ LOG.info(_LI("A recoverable connection/channel error "
+ "occurs, try to reconnect: %s"), exc)
+ except Exception:
+ LOG.exception(_LE("Unexpected error during heartbeart "
+ "thread processing, retrying..."))
+
+ self._heartbeat_exit_event.wait(
+ timeout=self._heartbeat_wait_timeout)
+ self._heartbeat_exit_event.clear()
+
def declare_consumer(self, consumer_cls, topic, callback):
"""Create a Consumer using the class that was passed in and
add it to our list of consumers
@@ -736,10 +959,14 @@ class Connection(object):
self.consumers.append(consumer)
return consumer
- return self.ensure(_connect_error, _declare_consumer)
+ with self._connection_lock:
+ return self.ensure(_connect_error, _declare_consumer)
def iterconsume(self, limit=None, timeout=None):
- """Return an iterator that will consume from all queues/consumers."""
+ """Return an iterator that will consume from all queues/consumers.
+
+ NOTE(sileht): Must be called within the connection lock
+ """
timer = rpc_common.DecayingTimer(duration=timeout)
timer.start()
@@ -770,6 +997,9 @@ class Connection(object):
self._consume_loop_stopped = False
raise StopIteration
+ if self._heartbeat_supported_and_enabled():
+ self.connection.heartbeat_check(
+ rate=self.driver_conf.heartbeat_rate)
try:
return self.connection.drain_events(timeout=poll_timeout)
except socket.timeout as exc:
@@ -795,7 +1025,8 @@ class Connection(object):
**kwargs)
publisher.send(msg, timeout)
- self.ensure(_error_callback, _publish, retry=retry)
+ with self._connection_lock:
+ self.ensure(_error_callback, _publish, retry=retry)
def declare_direct_consumer(self, topic, callback):
"""Create a 'direct' queue.
@@ -861,12 +1092,13 @@ class Connection(object):
def consume(self, limit=None, timeout=None):
"""Consume from all queues/consumers."""
- it = self.iterconsume(limit=limit, timeout=timeout)
- while True:
- try:
- six.next(it)
- except StopIteration:
- return
+ with self._connection_lock:
+ it = self.iterconsume(limit=limit, timeout=timeout)
+ while True:
+ try:
+ six.next(it)
+ except StopIteration:
+ return
def stop_consuming(self):
self._consume_loop_stopped = True
diff --git a/oslo_messaging/tests/drivers/test_impl_qpid.py b/oslo_messaging/tests/drivers/test_impl_qpid.py
index 2d7dd6a..e39f72a 100644
--- a/oslo_messaging/tests/drivers/test_impl_qpid.py
+++ b/oslo_messaging/tests/drivers/test_impl_qpid.py
@@ -27,6 +27,7 @@ import testscenarios
import testtools
import oslo_messaging
+from oslo_messaging._drivers import amqp
from oslo_messaging._drivers import impl_qpid as qpid_driver
from oslo_messaging.tests import utils as test_utils
@@ -564,7 +565,8 @@ class TestQpidReconnectOrder(test_utils.BaseTestCase):
with mock.patch('qpid.messaging.Connection') as conn_mock:
# starting from the first broker in the list
url = oslo_messaging.TransportURL.parse(self.conf, None)
- connection = qpid_driver.Connection(self.conf, url)
+ connection = qpid_driver.Connection(self.conf, url,
+ amqp.PURPOSE_SEND)
# reconnect will advance to the next broker, one broker per
# attempt, and then wrap to the start of the list once the end is
@@ -806,7 +808,8 @@ class QPidHATestCase(test_utils.BaseTestCase):
# starting from the first broker in the list
url = oslo_messaging.TransportURL.parse(self.conf, None)
- self.connection = qpid_driver.Connection(self.conf, url)
+ self.connection = qpid_driver.Connection(self.conf, url,
+ amqp.PURPOSE_SEND)
self.addCleanup(self.connection.close)
self.info.update({'attempt': 0,
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index df0f3b3..b4e691d 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -21,6 +21,7 @@ import uuid
import fixtures
import kombu
+import kombu.transport.memory
import mock
from oslo_config import cfg
from oslo_serialization import jsonutils
@@ -28,6 +29,7 @@ from oslotest import mockpatch
import testscenarios
import oslo_messaging
+from oslo_messaging._drivers import amqp
from oslo_messaging._drivers import amqpdriver
from oslo_messaging._drivers import common as driver_common
from oslo_messaging._drivers import impl_rabbit as rabbit_driver
@@ -54,6 +56,52 @@ class TestDeprecatedRabbitDriverLoad(test_utils.BaseTestCase):
self.assertEqual('memory:////', url)
+class TestHeartbeat(test_utils.BaseTestCase):
+
+ @mock.patch('oslo_messaging._drivers.impl_rabbit.LOG')
+ @mock.patch('kombu.connection.Connection.heartbeat_check')
+ @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.'
+ '_heartbeat_supported_and_enabled', return_value=True)
+ def _do_test_heartbeat_sent(self, fake_heartbeat_support, fake_heartbeat,
+ fake_logger, heartbeat_side_effect=None,
+ info=None):
+
+ event = threading.Event()
+
+ def heartbeat_check(rate=2):
+ event.set()
+ if heartbeat_side_effect:
+ raise heartbeat_side_effect
+
+ fake_heartbeat.side_effect = heartbeat_check
+
+ transport = oslo_messaging.get_transport(self.conf,
+ 'kombu+memory:////')
+ self.addCleanup(transport.cleanup)
+ conn = transport._driver._get_connection()
+ event.wait()
+ conn._heartbeat_stop()
+
+ # check heartbeat have been called
+ self.assertLess(0, fake_heartbeat.call_count)
+
+ if not heartbeat_side_effect:
+ self.assertEqual(2, fake_logger.info.call_count)
+ else:
+ self.assertEqual(3, fake_logger.info.call_count)
+ self.assertIn(mock.call(info, mock.ANY),
+ fake_logger.info.mock_calls)
+
+ def test_test_heartbeat_sent_default(self):
+ self._do_test_heartbeat_sent()
+
+ def test_test_heartbeat_sent_connection_fail(self):
+ self._do_test_heartbeat_sent(
+ heartbeat_side_effect=kombu.exceptions.ConnectionError,
+ info='A recoverable connection/channel error occurs, '
+ 'try to reconnect: %s')
+
+
class TestRabbitDriverLoad(test_utils.BaseTestCase):
scenarios = [
@@ -68,6 +116,8 @@ class TestRabbitDriverLoad(test_utils.BaseTestCase):
@mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.ensure')
@mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.reset')
def test_driver_load(self, fake_ensure, fake_reset):
+ self.config(heartbeat_timeout_threshold=0,
+ group='oslo_messaging_rabbit')
self.messaging_conf.transport_driver = self.transport_driver
transport = oslo_messaging.get_transport(self.conf)
self.addCleanup(transport.cleanup)
@@ -107,8 +157,8 @@ class TestRabbitDriverLoadSSL(test_utils.BaseTestCase):
transport._driver._get_connection()
connection_klass.assert_called_once_with(
- 'memory:///', ssl=self.expected,
- login_method='AMQPLAIN', failover_strategy="shuffle")
+ 'memory:///', ssl=self.expected, login_method='AMQPLAIN',
+ heartbeat=60, failover_strategy="shuffle")
class TestRabbitIterconsume(test_utils.BaseTestCase):
@@ -118,7 +168,7 @@ class TestRabbitIterconsume(test_utils.BaseTestCase):
'kombu+memory:////')
self.addCleanup(transport.cleanup)
deadline = time.time() + 3
- with transport._driver._get_connection() as conn:
+ with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
conn.iterconsume(timeout=3)
# kombu memory transport doesn't really raise error
# so just simulate a real driver behavior
@@ -170,10 +220,12 @@ class TestRabbitTransportURL(test_utils.BaseTestCase):
def setUp(self):
super(TestRabbitTransportURL, self).setUp()
self.messaging_conf.transport_driver = 'rabbit'
+ self.config(heartbeat_timeout_threshold=0,
+ group='oslo_messaging_rabbit')
@mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.ensure')
@mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.reset')
- def test_transport_url(self, fake_ensure_connection, fake_reset):
+ def test_transport_url(self, fake_reset, fake_ensure):
transport = oslo_messaging.get_transport(self.conf, self.url)
self.addCleanup(transport.cleanup)
driver = transport._driver
@@ -223,6 +275,8 @@ class TestSendReceive(test_utils.BaseTestCase):
cls._timeout)
def test_send_receive(self):
+ self.config(heartbeat_timeout_threshold=0,
+ group="oslo_messaging_rabbit")
transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////')
self.addCleanup(transport.cleanup)
@@ -708,6 +762,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
rabbit_retry_interval=0.01,
rabbit_retry_backoff=0.01,
kombu_reconnect_delay=0,
+ heartbeat_timeout_threshold=0,
group="oslo_messaging_rabbit")
self.kombu_connect = mock.Mock()
@@ -719,7 +774,8 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
# starting from the first broker in the list
url = oslo_messaging.TransportURL.parse(self.conf, None)
- self.connection = rabbit_driver.Connection(self.conf, url)
+ self.connection = rabbit_driver.Connection(self.conf, url,
+ amqp.PURPOSE_SEND)
self.addCleanup(self.connection.close)
def test_ensure_four_retry(self):
@@ -745,3 +801,59 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
retry=0)
self.assertEqual(1, self.kombu_connect.call_count)
self.assertEqual(2, mock_callback.call_count)
+
+
+class ConnectionLockTestCase(test_utils.BaseTestCase):
+ def _thread(self, lock, sleep, heartbeat=False):
+ def thread_task():
+ if heartbeat:
+ with lock.for_heartbeat():
+ time.sleep(sleep)
+ else:
+ with lock:
+ time.sleep(sleep)
+
+ t = threading.Thread(target=thread_task)
+ t.daemon = True
+ t.start()
+ start = time.time()
+
+ def get_elapsed_time():
+ t.join()
+ return time.time() - start
+
+ return get_elapsed_time
+
+ def test_workers_only(self):
+ l = rabbit_driver.ConnectionLock()
+ t1 = self._thread(l, 1)
+ t2 = self._thread(l, 1)
+ self.assertAlmostEqual(1, t1(), places=1)
+ self.assertAlmostEqual(2, t2(), places=1)
+
+ def test_worker_and_heartbeat(self):
+ l = rabbit_driver.ConnectionLock()
+ t1 = self._thread(l, 1)
+ t2 = self._thread(l, 1, heartbeat=True)
+ self.assertAlmostEqual(1, t1(), places=1)
+ self.assertAlmostEqual(2, t2(), places=1)
+
+ def test_workers_and_heartbeat(self):
+ l = rabbit_driver.ConnectionLock()
+ t1 = self._thread(l, 1)
+ t2 = self._thread(l, 1)
+ t3 = self._thread(l, 1)
+ t4 = self._thread(l, 1, heartbeat=True)
+ t5 = self._thread(l, 1)
+ self.assertAlmostEqual(1, t1(), places=1)
+ self.assertAlmostEqual(2, t4(), places=1)
+ self.assertAlmostEqual(3, t2(), places=1)
+ self.assertAlmostEqual(4, t3(), places=1)
+ self.assertAlmostEqual(5, t5(), places=1)
+
+ def test_heartbeat(self):
+ l = rabbit_driver.ConnectionLock()
+ t1 = self._thread(l, 1, heartbeat=True)
+ t2 = self._thread(l, 1)
+ self.assertAlmostEqual(1, t1(), places=1)
+ self.assertAlmostEqual(2, t2(), places=1)
diff --git a/tests/drivers/test_impl_qpid.py b/tests/drivers/test_impl_qpid.py
index d8cd1e7..2c2c0a5 100644
--- a/tests/drivers/test_impl_qpid.py
+++ b/tests/drivers/test_impl_qpid.py
@@ -27,6 +27,7 @@ import testscenarios
import testtools
from oslo import messaging
+from oslo_messaging._drivers import amqp
from oslo_messaging._drivers import impl_qpid as qpid_driver
from oslo_messaging.tests import utils as test_utils
@@ -564,7 +565,8 @@ class TestQpidReconnectOrder(test_utils.BaseTestCase):
with mock.patch('qpid.messaging.Connection') as conn_mock:
# starting from the first broker in the list
url = messaging.TransportURL.parse(self.conf, None)
- connection = qpid_driver.Connection(self.conf, url)
+ connection = qpid_driver.Connection(self.conf, url,
+ amqp.PURPOSE_SEND)
# reconnect will advance to the next broker, one broker per
# attempt, and then wrap to the start of the list once the end is
@@ -806,7 +808,8 @@ class QPidHATestCase(test_utils.BaseTestCase):
# starting from the first broker in the list
url = messaging.TransportURL.parse(self.conf, None)
- self.connection = qpid_driver.Connection(self.conf, url)
+ self.connection = qpid_driver.Connection(self.conf, url,
+ amqp.PURPOSE_SEND)
self.addCleanup(self.connection.close)
self.info.update({'attempt': 0,
diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py
index 8e9b29e..783afd8 100644
--- a/tests/drivers/test_impl_rabbit.py
+++ b/tests/drivers/test_impl_rabbit.py
@@ -27,6 +27,7 @@ import testscenarios
from oslo.config import cfg
from oslo import messaging
from oslo.serialization import jsonutils
+from oslo_messaging._drivers import amqp
from oslo_messaging._drivers import amqpdriver
from oslo_messaging._drivers import common as driver_common
from oslo_messaging._drivers import impl_rabbit as rabbit_driver
@@ -44,6 +45,8 @@ class TestDeprecatedRabbitDriverLoad(test_utils.BaseTestCase):
self.config(fake_rabbit=True, group="oslo_messaging_rabbit")
def test_driver_load(self):
+ self.config(heartbeat_timeout_threshold=0,
+ group='oslo_messaging_rabbit')
transport = messaging.get_transport(self.conf)
self.addCleanup(transport.cleanup)
driver = transport._driver
@@ -67,6 +70,8 @@ class TestRabbitDriverLoad(test_utils.BaseTestCase):
@mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.ensure')
@mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.reset')
def test_driver_load(self, fake_ensure, fake_reset):
+ self.config(heartbeat_timeout_threshold=0,
+ group='oslo_messaging_rabbit')
self.messaging_conf.transport_driver = self.transport_driver
transport = messaging.get_transport(self.conf)
self.addCleanup(transport.cleanup)
@@ -83,7 +88,7 @@ class TestRabbitIterconsume(test_utils.BaseTestCase):
transport = messaging.get_transport(self.conf, 'kombu+memory:////')
self.addCleanup(transport.cleanup)
deadline = time.time() + 3
- with transport._driver._get_connection() as conn:
+ with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
conn.iterconsume(timeout=3)
# kombu memory transport doesn't really raise error
# so just simulate a real driver behavior
@@ -140,6 +145,8 @@ class TestRabbitTransportURL(test_utils.BaseTestCase):
def setUp(self):
super(TestRabbitTransportURL, self).setUp()
+ self.config(heartbeat_timeout_threshold=0,
+ group='oslo_messaging_rabbit')
self.messaging_conf.transport_driver = 'rabbit'
@mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.ensure')
@@ -200,6 +207,8 @@ class TestSendReceive(test_utils.BaseTestCase):
cls._timeout)
def test_send_receive(self):
+ self.config(heartbeat_timeout_threshold=0,
+ group='oslo_messaging_rabbit')
transport = messaging.get_transport(self.conf, 'kombu+memory:////')
self.addCleanup(transport.cleanup)
@@ -710,7 +719,8 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
# starting from the first broker in the list
url = messaging.TransportURL.parse(self.conf, None)
- self.connection = rabbit_driver.Connection(self.conf, url)
+ self.connection = rabbit_driver.Connection(self.conf, url,
+ amqp.PURPOSE_SEND)
self.addCleanup(self.connection.close)
def test_ensure_four_retry(self):