summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2019-08-08 15:50:07 +0000
committerGerrit Code Review <review@openstack.org>2019-08-08 15:50:07 +0000
commit1541b0c7f965b9defb02b9e63975db2d29d99242 (patch)
tree7b73b333fec0450f3306ad99e3e3c4478a786256
parent39196a4e90f0d46f6b8122b2e421b519b9c6d93f (diff)
parent22f240b82fffbd62be8568a7d0d3369134596ace (diff)
downloadoslo-messaging-1541b0c7f965b9defb02b9e63975db2d29d99242.tar.gz
Merge "Allow users run the rabbitmq heartbeat inside a standard pthread."
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py52
-rw-r--r--oslo_messaging/tests/drivers/test_impl_rabbit.py6
2 files changed, 57 insertions, 1 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index d12430c..ca76aaa 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -33,6 +33,7 @@ import kombu.messaging
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import eventletutils
+from oslo_utils import importutils
import six
import six.moves
from six.moves.urllib import parse
@@ -46,6 +47,18 @@ from oslo_messaging._drivers import pool
from oslo_messaging import _utils
from oslo_messaging import exceptions
+eventlet = importutils.try_import('eventlet')
+if eventlet and eventletutils.is_monkey_patched("thread"):
+ # Here we initialize module with the native python threading module
+ # if it was already monkey patched by eventlet/greenlet.
+ stdlib_threading = eventlet.patcher.original('threading')
+else:
+ # Manage the case where we run this driver in a non patched environment
+ # and where user even so configure the driver to run heartbeat through
+ # a python thread, if we don't do that when the heartbeat will start
+ # we will facing an issue by trying to override the threading module.
+ stdlib_threading = threading
+
# NOTE(sileht): don't exists in py2 socket module
TCP_USER_TIMEOUT = 18
@@ -75,6 +88,15 @@ rabbit_opts = [
deprecated_name='kombu_ssl_ca_certs',
help='SSL certification authority file '
'(valid only if SSL enabled).'),
+ cfg.BoolOpt('heartbeat_in_pthread',
+ default=False,
+ help="EXPERIMENTAL: Run the health check heartbeat thread"
+ "through a native python thread. By default if this"
+ "option isn't provided the health check heartbeat will"
+ "inherit the execution model from the parent process. By"
+ "example if the parent process have monkey patched the"
+ "stdlib by using eventlet/greenlet then the heartbeat"
+ "will be run through a green thread."),
cfg.FloatOpt('kombu_reconnect_delay',
default=1.0,
deprecated_group='DEFAULT',
@@ -442,6 +464,34 @@ class Connection(object):
driver_conf.kombu_missing_consumer_retry_timeout
self.kombu_failover_strategy = driver_conf.kombu_failover_strategy
self.kombu_compression = driver_conf.kombu_compression
+ self.heartbeat_in_pthread = driver_conf.heartbeat_in_pthread
+
+ if self.heartbeat_in_pthread:
+ # NOTE(hberaud): Experimental: threading module is in use to run
+ # the rabbitmq health check heartbeat. in some situation like
+ # with nova-api, nova need green threads to run the cells
+ # mechanismes in an async mode, so they used eventlet and
+ # greenlet to monkey patch the python stdlib and get green threads.
+ # The issue here is that nova-api run under the apache MPM prefork
+ # module and mod_wsgi. The apache prefork module doesn't support
+ # epoll and recent kernel features, and evenlet is built over epoll
+ # and libevent, so when we run the rabbitmq heartbeat we inherit
+ # from the execution model of the parent process (nova-api), and
+ # in this case we will run the heartbeat through a green thread.
+ # We want to allow users to choose between pthread and
+ # green threads if needed in some specific situations.
+ # This experimental feature allow user to use pthread in an env
+ # that doesn't support eventlet without forcing the parent process
+ # to stop to use eventlet if they need monkey patching for some
+ # specific reasons.
+ # If users want to use pthread we need to make sure that we
+ # will use the *native* threading module for
+ # initialize the heartbeat thread.
+ # Here we override globaly the previously imported
+ # threading module with the native python threading module
+ # if it was already monkey patched by eventlet/greenlet.
+ global threading
+ threading = stdlib_threading
if self.ssl:
self.ssl_version = driver_conf.ssl_version
@@ -905,7 +955,7 @@ class Connection(object):
def _heartbeat_start(self):
if self._heartbeat_supported_and_enabled():
- self._heartbeat_exit_event = eventletutils.Event()
+ self._heartbeat_exit_event = threading.Event()
self._heartbeat_thread = threading.Thread(
target=self._heartbeat_thread_job, name="Rabbit-heartbeat")
self._heartbeat_thread.daemon = True
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index 5f302db..ce6d11d 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -89,6 +89,11 @@ class TestHeartbeat(test_utils.BaseTestCase):
info='A recoverable connection/channel error occurred, '
'trying to reconnect: %s')
+ def test_run_heartbeat_in_pthread(self):
+ self.config(heartbeat_in_pthread=True,
+ group="oslo_messaging_rabbit")
+ self._do_test_heartbeat_sent()
+
class TestRabbitQos(test_utils.BaseTestCase):
@@ -999,6 +1004,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
class ConnectionLockTestCase(test_utils.BaseTestCase):
def _thread(self, lock, sleep, heartbeat=False):
+
def thread_task():
if heartbeat:
with lock.for_heartbeat():