diff options
author | Zuul <zuul@review.opendev.org> | 2023-02-28 17:47:46 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2023-02-28 17:47:46 +0000 |
commit | 63de82c3d3724d7989ffe119cc78d2aa9399aa77 (patch) | |
tree | fa23f3b55fbd6a3a9da2e10a589de7f08ebf7c56 /ironic | |
parent | 75c05be8a76d7babd1f60b1c60506d4aee6b399e (diff) | |
parent | e54ee2ba4cb818e25c75fcdc69f7ff1dc4956c73 (diff) | |
download | ironic-63de82c3d3724d7989ffe119cc78d2aa9399aa77.tar.gz |
Merge "Respond to rpc requests on stop until hash ring reset"
Diffstat (limited to 'ironic')
-rw-r--r-- | ironic/common/rpc_service.py | 27 | ||||
-rw-r--r-- | ironic/conductor/base_manager.py | 4 | ||||
-rw-r--r-- | ironic/tests/unit/common/test_rpc_service.py | 81 |
3 files changed, 105 insertions, 7 deletions
diff --git a/ironic/common/rpc_service.py b/ironic/common/rpc_service.py index b0eec7758..cb0f23c98 100644 --- a/ironic/common/rpc_service.py +++ b/ironic/common/rpc_service.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import datetime import signal import sys import time @@ -24,6 +25,7 @@ from oslo_log import log import oslo_messaging as messaging from oslo_service import service from oslo_utils import importutils +from oslo_utils import timeutils from ironic.common import context from ironic.common import rpc @@ -93,6 +95,26 @@ class RPCService(service.Service): 'transport': CONF.rpc_transport}) def stop(self): + initial_time = timeutils.utcnow() + extend_time = initial_time + datetime.timedelta( + seconds=CONF.hash_ring_reset_interval) + + try: + self.manager.del_host(deregister=self.deregister) + except Exception as e: + LOG.exception('Service error occurred when cleaning up ' + 'the RPC manager. Error: %s', e) + + if self.manager.get_online_conductor_count() > 1: + # Delay stopping the server until the hash ring has been + # reset on the cluster + stop_time = timeutils.utcnow() + if stop_time < extend_time: + stop_wait = max(0, (extend_time - stop_time).seconds) + LOG.info('Waiting %(stop_wait)s seconds for hash ring reset.', + {'stop_wait': stop_wait}) + time.sleep(stop_wait) + try: if self.rpcserver is not None: self.rpcserver.stop() @@ -100,11 +122,6 @@ class RPCService(service.Service): except Exception as e: LOG.exception('Service error occurred when stopping the ' 'RPC server. Error: %s', e) - try: - self.manager.del_host(deregister=self.deregister) - except Exception as e: - LOG.exception('Service error occurred when cleaning up ' - 'the RPC manager. Error: %s', e) super(RPCService, self).stop(graceful=True) LOG.info('Stopped RPC server for service %(service)s on host ' diff --git a/ironic/conductor/base_manager.py b/ironic/conductor/base_manager.py index 22ebd57f5..5c2e4ea95 100644 --- a/ironic/conductor/base_manager.py +++ b/ironic/conductor/base_manager.py @@ -334,6 +334,10 @@ class BaseConductorManager(object): self._started = False + def get_online_conductor_count(self): + """Return a count of currently online conductors""" + return len(self.dbapi.get_online_conductors()) + def _register_and_validate_hardware_interfaces(self, hardware_types): """Register and validate hardware interfaces for this conductor. diff --git a/ironic/tests/unit/common/test_rpc_service.py b/ironic/tests/unit/common/test_rpc_service.py index 8483bfb22..09446ecf8 100644 --- a/ironic/tests/unit/common/test_rpc_service.py +++ b/ironic/tests/unit/common/test_rpc_service.py @@ -10,24 +10,28 @@ # License for the specific language governing permissions and limitations # under the License. +import datetime +import time from unittest import mock from oslo_config import cfg import oslo_messaging from oslo_service import service as base_service +from oslo_utils import timeutils from ironic.common import context from ironic.common import rpc from ironic.common import rpc_service from ironic.conductor import manager from ironic.objects import base as objects_base -from ironic.tests import base +from ironic.tests.unit.db import base as db_base +from ironic.tests.unit.db import utils as db_utils CONF = cfg.CONF @mock.patch.object(base_service.Service, '__init__', lambda *_, **__: None) -class TestRPCService(base.TestCase): +class TestRPCService(db_base.DbTestCase): def setUp(self): super(TestRPCService, self).setUp() @@ -35,6 +39,7 @@ class TestRPCService(base.TestCase): mgr_module = "ironic.conductor.manager" mgr_class = "ConductorManager" self.rpc_svc = rpc_service.RPCService(host, mgr_module, mgr_class) + self.rpc_svc.manager.dbapi = self.dbapi @mock.patch.object(manager.ConductorManager, 'prepare_host', autospec=True) @mock.patch.object(oslo_messaging, 'Target', autospec=True) @@ -108,3 +113,75 @@ class TestRPCService(base.TestCase): self.assertFalse(self.rpc_svc._started) self.assertIn("boom", self.rpc_svc._failure) self.assertRaises(SystemExit, self.rpc_svc.wait_for_start) + + @mock.patch.object(timeutils, 'utcnow', autospec=True) + @mock.patch.object(time, 'sleep', autospec=True) + def test_stop_instant(self, mock_sleep, mock_utcnow): + # del_host returns instantly + mock_utcnow.return_value = datetime.datetime(2023, 2, 2, 21, 10, 0) + conductor1 = db_utils.get_test_conductor(hostname='fake_host') + with mock.patch.object(self.dbapi, 'get_online_conductors', + autospec=True) as mock_cond_list: + mock_cond_list.return_value = [conductor1] + self.rpc_svc.stop() + + # single conductor so exit immediately without waiting + mock_sleep.assert_not_called() + + @mock.patch.object(timeutils, 'utcnow', autospec=True) + @mock.patch.object(time, 'sleep', autospec=True) + def test_stop_after_full_reset_interval(self, mock_sleep, mock_utcnow): + # del_host returns instantly + mock_utcnow.return_value = datetime.datetime(2023, 2, 2, 21, 10, 0) + conductor1 = db_utils.get_test_conductor(hostname='fake_host') + conductor2 = db_utils.get_test_conductor(hostname='other_fake_host') + with mock.patch.object(self.dbapi, 'get_online_conductors', + autospec=True) as mock_cond_list: + # multiple conductors, so wait for hash_ring_reset_interval + mock_cond_list.return_value = [conductor1, conductor2] + self.rpc_svc.stop() + + # wait the total CONF.hash_ring_reset_interval 15 seconds + mock_sleep.assert_has_calls([mock.call(15)]) + + @mock.patch.object(timeutils, 'utcnow', autospec=True) + @mock.patch.object(time, 'sleep', autospec=True) + def test_stop_after_remaining_interval(self, mock_sleep, mock_utcnow): + mock_utcnow.return_value = datetime.datetime(2023, 2, 2, 21, 10, 0) + conductor1 = db_utils.get_test_conductor(hostname='fake_host') + conductor2 = db_utils.get_test_conductor(hostname='other_fake_host') + + # del_host returns after 5 seconds + mock_utcnow.side_effect = [ + datetime.datetime(2023, 2, 2, 21, 10, 0), + datetime.datetime(2023, 2, 2, 21, 10, 5), + ] + with mock.patch.object(self.dbapi, 'get_online_conductors', + autospec=True) as mock_cond_list: + # multiple conductors, so wait for hash_ring_reset_interval + mock_cond_list.return_value = [conductor1, conductor2] + self.rpc_svc.stop() + + # wait the remaining 10 seconds + mock_sleep.assert_has_calls([mock.call(10)]) + + @mock.patch.object(timeutils, 'utcnow', autospec=True) + @mock.patch.object(time, 'sleep', autospec=True) + def test_stop_slow(self, mock_sleep, mock_utcnow): + mock_utcnow.return_value = datetime.datetime(2023, 2, 2, 21, 10, 0) + conductor1 = db_utils.get_test_conductor(hostname='fake_host') + conductor2 = db_utils.get_test_conductor(hostname='other_fake_host') + + # del_host returns after 16 seconds + mock_utcnow.side_effect = [ + datetime.datetime(2023, 2, 2, 21, 10, 0), + datetime.datetime(2023, 2, 2, 21, 10, 16), + ] + with mock.patch.object(self.dbapi, 'get_online_conductors', + autospec=True) as mock_cond_list: + # multiple conductors, so wait for hash_ring_reset_interval + mock_cond_list.return_value = [conductor1, conductor2] + self.rpc_svc.stop() + + # no wait required, CONF.hash_ring_reset_interval already exceeded + mock_sleep.assert_not_called() |