author    Devananda van der Veen <devananda.vdv@gmail.com>    2014-09-27 18:41:46 -0700
committer Devananda van der Veen <devananda.vdv@gmail.com>    2014-10-02 09:08:30 -0700
commit    552a927e56030dc21221c035537d62b2077664a8 (patch)
tree      8720ce71858fcff8a6e60deb6fe5d6fad83c2e9a
parent    6b64010f4665f8ac141629e456504eebdc2394db (diff)
download  ironic-552a927e56030dc21221c035537d62b2077664a8.tar.gz
Add periodic task to rebuild conductor local state
This adds a periodic task which can rebuild the conductor's local state
(PXE config files, etc) when conductors join or leave the cluster. For any
node which is newly mapped to the conductor, this will trigger calling
prepare() and take_over() on that node's deploy interface.

This uses the periodic_max_workers setting like other periodic jobs,
starting the take-over process in separate threads. Thus, in a large
cluster, it may take some time for all nodes to settle down. It also adds
a new CONF option to control the timing of this job.

There is a lot of room for improvement and optimization in this, however
getting a fix in place is critical to the Juno release.

NOTE: This does not re-establish any console sessions.

Co-Authored-By: Lucas Alvares Gomes <lucasagomes@gmail.com>
Change-Id: I0dbe9a5a98ec5fd0c69f32d7590d8141da5a23c2
Closes-bug: #1279331
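In outline, each run of the task keeps only nodes that the hash ring maps to this conductor, skips nodes it already manages, and hands the rest to worker threads, stopping at the worker cap. The following is a condensed, illustrative sketch of that selection logic using plain dicts in place of Ironic's node objects and hash ring; it is not the conductor code itself (see the manager.py hunk below for the real thing):

# Illustrative only: plain-data sketch of the take-over selection loop.
def nodes_to_take_over(nodes, my_hostname, ring, max_workers):
    """Pick nodes this conductor should take over, up to max_workers."""
    picked = []
    for node in nodes:
        if ring.get(node['uuid']) != my_hostname:
            continue                    # mapped to another conductor
        if node['conductor_affinity'] == my_hostname:
            continue                    # we already manage this node
        picked.append(node)             # needs prepare() + take_over()
        if len(picked) == max_workers:
            break                       # respect periodic_max_workers
    return picked

ring = {'node-a': 'cond-1', 'node-b': 'cond-2'}
nodes = [{'uuid': 'node-a', 'conductor_affinity': 'cond-2'},
         {'uuid': 'node-b', 'conductor_affinity': 'cond-2'}]
print(nodes_to_take_over(nodes, 'cond-1', ring, max_workers=8))
# -> only node-a: it is mapped here but was last managed elsewhere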
-rw-r--r--  etc/ironic/ironic.conf.sample             8
-rw-r--r--  ironic/common/keystone.py                59
-rw-r--r--  ironic/conductor/manager.py              82
-rw-r--r--  ironic/tests/conductor/test_manager.py  184
-rw-r--r--  ironic/tests/test_keystone.py             7
5 files changed, 308 insertions(+), 32 deletions(-)
diff --git a/etc/ironic/ironic.conf.sample b/etc/ironic/ironic.conf.sample
index 9ed172e5f..87ecd0e17 100644
--- a/etc/ironic/ironic.conf.sample
+++ b/etc/ironic/ironic.conf.sample
@@ -573,6 +573,14 @@
 # meaning send all the sensor data. (list value)
 #send_sensor_data_types=ALL
 
+# When conductors join or leave the cluster, existing
+# conductors may need to update any persistent local state as
+# nodes are moved around the cluster. This option controls how
+# often, in seconds, each conductor will check for nodes that
+# it should "take over". Set it to a negative value to disable
+# the check entirely. (integer value)
+#sync_local_state_interval=180
+
 [console]
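To illustrate, an operator could relax or disable the new check in ironic.conf like this (the values are mine; the [conductor] group is inferred from conductor_opts in manager.py below, and 180 is the default):

[conductor]
# Check for nodes to take over once an hour instead of every 3 minutes.
sync_local_state_interval = 3600
# Or disable the periodic check entirely with a negative value:
#sync_local_state_interval = -1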
diff --git a/ironic/common/keystone.py b/ironic/common/keystone.py
index 45d7b7e20..aa37646fa 100644
--- a/ironic/common/keystone.py
+++ b/ironic/common/keystone.py
@@ -38,6 +38,33 @@ def _is_apiv3(auth_url, auth_version):
     return auth_version == 'v3.0' or '/v3' in parse.urlparse(auth_url).path
 
 
+def _get_ksclient():
+    auth_url = CONF.keystone_authtoken.auth_uri
+    if not auth_url:
+        raise exception.CatalogFailure(_('Keystone API endpoint is missing'))
+
+    auth_version = CONF.keystone_authtoken.auth_version
+    api_v3 = _is_apiv3(auth_url, auth_version)
+
+    if api_v3:
+        from keystoneclient.v3 import client
+    else:
+        from keystoneclient.v2_0 import client
+
+    auth_url = get_keystone_url(auth_url, auth_version)
+    try:
+        return client.Client(username=CONF.keystone_authtoken.admin_user,
+                             password=CONF.keystone_authtoken.admin_password,
+                             tenant_name=CONF.keystone_authtoken.admin_tenant_name,
+                             auth_url=auth_url)
+    except ksexception.Unauthorized:
+        raise exception.CatalogUnauthorized
+    except ksexception.AuthorizationFailure as err:
+        raise exception.CatalogFailure(_('Could not perform authorization '
+                                         'process for service catalog: %s')
+                                       % err)
+
+
 def get_keystone_url(auth_url, auth_version):
     """Gives an http/https url to contact keystone.
@@ -66,31 +93,7 @@ def get_service_url(service_type='baremetal', endpoint_type='internal'):
     :param endpoint_type: the type of endpoint for the service.
     :returns: an http/https url for the desired endpoint.
     """
-    auth_url = CONF.keystone_authtoken.auth_uri
-    if not auth_url:
-        raise exception.CatalogFailure(_('Keystone API endpoint is missing'))
-
-    auth_version = CONF.keystone_authtoken.auth_version
-    api_v3 = _is_apiv3(auth_url, auth_version)
-
-    if api_v3:
-        from keystoneclient.v3 import client
-    else:
-        from keystoneclient.v2_0 import client
-
-    auth_url = get_keystone_url(auth_url, auth_version)
-    try:
-        ksclient = client.Client(username=CONF.keystone_authtoken.admin_user,
-                                 password=CONF.keystone_authtoken.admin_password,
-                                 tenant_name=CONF.keystone_authtoken.admin_tenant_name,
-                                 auth_url=auth_url)
-    except ksexception.Unauthorized:
-        raise exception.CatalogUnauthorized
-
-    except ksexception.AuthorizationFailure as err:
-        raise exception.CatalogFailure(_('Could not perform authorization '
-                                         'process for service catalog: %s')
-                                       % err)
+    ksclient = _get_ksclient()
 
     if not ksclient.has_service_catalog():
         raise exception.CatalogFailure(_('No keystone service catalog loaded'))
@@ -103,3 +106,9 @@ def get_service_url(service_type='baremetal', endpoint_type='internal'):
                                                 endpoint_type=endpoint_type)
 
     return endpoint
+
+
+def get_admin_auth_token():
+    """Get an admin auth_token from Keystone."""
+    ksclient = _get_ksclient()
+    return ksclient.auth_token
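The new helper is consumed by the conductor below; roughly, callers attach a fresh admin token to an admin context so that service calls made during take-over (for example, to glance) authenticate correctly. This short snippet just mirrors the manager.py hunk that follows:

# Mirrors how manager.py (below) consumes the new helper.
from ironic.common import keystone
from ironic.openstack.common import context as ironic_context

admin_context = ironic_context.get_admin_context()
admin_context.auth_token = keystone.get_admin_auth_token()
# admin_context can now be used for privileged calls during take-over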
diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py
index 625c7bbfd..d310b4dc4 100644
--- a/ironic/conductor/manager.py
+++ b/ironic/conductor/manager.py
@@ -61,6 +61,7 @@ from ironic.common.i18n import _LC
 from ironic.common.i18n import _LE
 from ironic.common.i18n import _LI
 from ironic.common.i18n import _LW
+from ironic.common import keystone
 from ironic.common import rpc
 from ironic.common import states
 from ironic.common import utils as ironic_utils
@@ -68,6 +69,7 @@ from ironic.conductor import task_manager
 from ironic.conductor import utils
 from ironic.db import api as dbapi
 from ironic import objects
+from ironic.openstack.common import context as ironic_context
 from ironic.openstack.common import lockutils
 from ironic.openstack.common import log
 from ironic.openstack.common import periodic_task
@@ -141,6 +143,15 @@ conductor_opts = [
                         ' sent to Ceilometer. The default value, "ALL", is a '
                         'special value meaning send all the sensor data.'
                    ),
+        cfg.IntOpt('sync_local_state_interval',
+                   default=180,
+                   help='When conductors join or leave the cluster, existing '
+                        'conductors may need to update any persistent '
+                        'local state as nodes are moved around the cluster. '
+                        'This option controls how often, in seconds, each '
+                        'conductor will check for nodes that it should '
+                        '"take over". Set it to a negative value to disable '
+                        'the check entirely.'),
 ]
 
 CONF = cfg.CONF
@@ -862,15 +873,72 @@ class ConductorManager(periodic_task.PeriodicTasks):
             if workers_count == CONF.conductor.periodic_max_workers:
                 break
 
-    def rebalance_node_ring(self):
-        """Perform any actions necessary when rebalancing the consistent hash.
-
-        This may trigger several actions, such as calling driver.deploy.prepare
-        for nodes which are now mapped to this conductor.
+    def _do_takeover(self, task):
+        LOG.debug('Conductor %(cdr)s taking over node %(node)s',
+                  {'cdr': self.host, 'node': task.node.uuid})
+        task.driver.deploy.prepare(task)
+        task.driver.deploy.take_over(task)
+        # NOTE(lucasagomes): Set the ID of the new conductor managing
+        # this node
+        task.node.conductor_affinity = self.conductor.id
+        task.node.save()
+
+    @periodic_task.periodic_task(
+            spacing=CONF.conductor.sync_local_state_interval)
+    def _sync_local_state(self, context):
+        """Perform any actions necessary to sync local state.
+
+        This is called periodically to refresh the conductor's copy of the
+        consistent hash ring. If any mappings have changed, this method then
+        determines which, if any, nodes need to be "taken over".
+        The ensuing actions could include preparing a PXE environment,
+        updating the DHCP server, and so on.
         """
-        # TODO(deva): implement this
-        pass
+        self.ring_manager.reset()
+        filters = {'reserved': False,
+                   'maintenance': False,
+                   'provision_state': states.ACTIVE}
+        columns = ['id', 'uuid', 'driver', 'conductor_affinity']
+        node_list = self.dbapi.get_nodeinfo_list(
+                    columns=columns,
+                    filters=filters)
+
+        admin_context = None
+        workers_count = 0
+        for node_id, node_uuid, driver, conductor_affinity in node_list:
+            if not self._mapped_to_this_conductor(node_uuid, driver):
+                continue
+            if conductor_affinity == self.conductor.id:
+                continue
+
+            # NOTE(lucasagomes): The context provided by the periodic task
+            # will make the glance client fail with a 401 (Unauthorized),
+            # so we have to use the admin_context with an admin auth_token
+            if not admin_context:
+                admin_context = ironic_context.get_admin_context()
+                admin_context.auth_token = keystone.get_admin_auth_token()
+
+            # Node is mapped here, but was last managed by another conductor
+            try:
+                with task_manager.acquire(admin_context, node_id) as task:
+                    # NOTE(deva): now that we have the lock, check again to
+                    # avoid racing with deletes and other state changes
+                    node = task.node
+                    if (node.maintenance or
+                            node.conductor_affinity == self.conductor.id or
+                            node.provision_state != states.ACTIVE):
+                        continue
+
+                    task.spawn_after(self._spawn_worker,
+                                     self._do_takeover, task)
+
+            except exception.NoFreeConductorWorker:
+                break
+            except (exception.NodeLocked, exception.NodeNotFound):
+                continue
+            workers_count += 1
+            if workers_count == CONF.conductor.periodic_max_workers:
+                break
 
     def _mapped_to_this_conductor(self, node_uuid, driver):
         """Check that node is mapped to this conductor.
diff --git a/ironic/tests/conductor/test_manager.py b/ironic/tests/conductor/test_manager.py
index 75abec911..16fb9095d 100644
--- a/ironic/tests/conductor/test_manager.py
+++ b/ironic/tests/conductor/test_manager.py
@@ -26,6 +26,7 @@ from oslo import messaging
 from ironic.common import boot_devices
 from ironic.common import driver_factory
 from ironic.common import exception
+from ironic.common import keystone
 from ironic.common import states
 from ironic.common import utils as ironic_utils
 from ironic.conductor import manager
@@ -34,6 +35,7 @@ from ironic.conductor import utils as conductor_utils
 from ironic.db import api as dbapi
 from ironic.drivers import base as drivers_base
 from ironic import objects
+from ironic.openstack.common import context
 from ironic.tests import base as tests_base
 from ironic.tests.conductor import utils as mgr_utils
 from ironic.tests.db import base as tests_db_base
@@ -2292,3 +2294,185 @@ class ManagerTestProperties(tests_db_base.DbTestCase):
                           self.context, "bad-driver")
         # Compare true exception hidden by @messaging.expected_exceptions
         self.assertEqual(exception.DriverNotFound, exc.exc_info[0])
+
+
+@mock.patch.object(keystone, 'get_admin_auth_token')
+@mock.patch.object(task_manager, 'acquire')
+@mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
+@mock.patch.object(dbapi.IMPL, 'get_nodeinfo_list')
+class ManagerSyncLocalStateTestCase(_CommonMixIn, tests_db_base.DbTestCase):
+
+    def setUp(self):
+        super(ManagerSyncLocalStateTestCase, self).setUp()
+
+        self.dbapi = dbapi.get_instance()
+        self.service = manager.ConductorManager('hostname', 'test-topic')
+
+        self.service.conductor = mock.Mock()
+        self.service.dbapi = self.dbapi
+        self.service.ring_manager = mock.Mock()
+
+        self.node = self._create_node(provision_state=states.ACTIVE)
+        self.task = self._create_task(node=self.node)
+
+        self.filters = {'reserved': False,
+                        'maintenance': False,
+                        'provision_state': states.ACTIVE}
+        self.columns = ['id', 'uuid', 'driver', 'conductor_affinity']
+
+    def _assert_get_nodeinfo_args(self, get_nodeinfo_mock):
+        get_nodeinfo_mock.assert_called_once_with(
+            columns=self.columns, filters=self.filters)
+
+    def test_not_mapped(self, get_nodeinfo_mock, mapped_mock, acquire_mock,
+                        get_authtoken_mock):
+        get_nodeinfo_mock.return_value = self._get_nodeinfo_list_response()
+        mapped_mock.return_value = False
+
+        self.service._sync_local_state(self.context)
+
+        self._assert_get_nodeinfo_args(get_nodeinfo_mock)
+        mapped_mock.assert_called_once_with(self.node.uuid, self.node.driver)
+        self.assertFalse(acquire_mock.called)
+        self.assertFalse(get_authtoken_mock.called)
+        self.service.ring_manager.reset.assert_called_once_with()
+
+    def test_already_mapped(self, get_nodeinfo_mock, mapped_mock,
+                            acquire_mock, get_authtoken_mock):
+        # Node is already mapped to the conductor running the periodic task
+        self.node.conductor_affinity = 123
+        self.service.conductor.id = 123
+
+        get_nodeinfo_mock.return_value = self._get_nodeinfo_list_response()
+        mapped_mock.return_value = True
+
+        self.service._sync_local_state(self.context)
+
+        self._assert_get_nodeinfo_args(get_nodeinfo_mock)
+        mapped_mock.assert_called_once_with(self.node.uuid, self.node.driver)
+        self.assertFalse(acquire_mock.called)
+        self.assertFalse(get_authtoken_mock.called)
+        self.service.ring_manager.reset.assert_called_once_with()
+
+    @mock.patch.object(context, 'get_admin_context')
+    def test_good(self, get_ctx_mock, get_nodeinfo_mock, mapped_mock,
+                  acquire_mock, get_authtoken_mock):
+        get_ctx_mock.return_value = self.context
+        get_nodeinfo_mock.return_value = self._get_nodeinfo_list_response()
+        mapped_mock.return_value = True
+        acquire_mock.side_effect = self._get_acquire_side_effect(self.task)
+
+        self.service._sync_local_state(self.context)
+
+        self._assert_get_nodeinfo_args(get_nodeinfo_mock)
+        mapped_mock.assert_called_once_with(self.node.uuid, self.node.driver)
+        get_authtoken_mock.assert_called_once_with()
+        acquire_mock.assert_called_once_with(self.context, self.node.id)
+        # assert spawn_after has been called
+        self.task.spawn_after.assert_called_once_with(
+            self.service._spawn_worker,
+            self.service._do_takeover, self.task)
+
+    @mock.patch.object(context, 'get_admin_context')
+    def test_no_free_worker(self, get_ctx_mock, get_nodeinfo_mock,
+                            mapped_mock, acquire_mock, get_authtoken_mock):
+        get_ctx_mock.return_value = self.context
+        mapped_mock.return_value = True
+        acquire_mock.side_effect = \
+            self._get_acquire_side_effect([self.task] * 3)
+        self.task.spawn_after.side_effect = \
+            [None, exception.NoFreeConductorWorker('error')]
+
+        # 3 nodes to be checked
+        get_nodeinfo_mock.return_value = \
+            self._get_nodeinfo_list_response([self.node] * 3)
+
+        self.service._sync_local_state(self.context)
+
+        self._assert_get_nodeinfo_args(get_nodeinfo_mock)
+
+        # assert _mapped_to_this_conductor() gets called 2 times only
+        # instead of 3. When NoFreeConductorWorker is raised the loop
+        # should be broken
+        expected = [mock.call(self.node.uuid, self.node.driver)] * 2
+        self.assertEqual(expected, mapped_mock.call_args_list)
+
+        # assert acquire() gets called 2 times only instead of 3. When
+        # NoFreeConductorWorker is raised the loop should be broken
+        expected = [mock.call(self.context, self.node.id)] * 2
+        self.assertEqual(expected, acquire_mock.call_args_list)
+
+        # Only one auth token needed for all runs
+        get_authtoken_mock.assert_called_once_with()
+
+        # assert spawn_after has been called twice
+        expected = [mock.call(self.service._spawn_worker,
+                              self.service._do_takeover, self.task)] * 2
+        self.assertEqual(expected, self.task.spawn_after.call_args_list)
+
+    @mock.patch.object(context, 'get_admin_context')
+    def test_node_locked(self, get_ctx_mock, get_nodeinfo_mock, mapped_mock,
+                         acquire_mock, get_authtoken_mock):
+        get_ctx_mock.return_value = self.context
+        mapped_mock.return_value = True
+        acquire_mock.side_effect = self._get_acquire_side_effect(
+            [self.task, exception.NodeLocked('error'), self.task])
+        self.task.spawn_after.side_effect = [None, None]
+
+        # 3 nodes to be checked
+        get_nodeinfo_mock.return_value = \
+            self._get_nodeinfo_list_response([self.node] * 3)
+
+        self.service._sync_local_state(self.context)
+
+        self._assert_get_nodeinfo_args(get_nodeinfo_mock)
+
+        # assert _mapped_to_this_conductor() gets called 3 times
+        expected = [mock.call(self.node.uuid, self.node.driver)] * 3
+        self.assertEqual(expected, mapped_mock.call_args_list)
+
+        # assert acquire() gets called 3 times
+        expected = [mock.call(self.context, self.node.id)] * 3
+        self.assertEqual(expected, acquire_mock.call_args_list)
+
+        # Only one auth token needed for all runs
+        get_authtoken_mock.assert_called_once_with()
+
+        # assert spawn_after has been called only 2 times
+        expected = [mock.call(self.service._spawn_worker,
+                              self.service._do_takeover, self.task)] * 2
+        self.assertEqual(expected, self.task.spawn_after.call_args_list)
+
+    @mock.patch.object(context, 'get_admin_context')
+    def test_worker_limit(self, get_ctx_mock, get_nodeinfo_mock, mapped_mock,
+                          acquire_mock, get_authtoken_mock):
+        # Limit to only 1 worker
+        self.config(periodic_max_workers=1, group='conductor')
+        get_ctx_mock.return_value = self.context
+        mapped_mock.return_value = True
+        acquire_mock.side_effect = \
+            self._get_acquire_side_effect([self.task] * 3)
+        self.task.spawn_after.side_effect = [None] * 3
+
+        # 3 nodes to be checked
+        get_nodeinfo_mock.return_value = \
+            self._get_nodeinfo_list_response([self.node] * 3)
+
+        self.service._sync_local_state(self.context)
+
+        self._assert_get_nodeinfo_args(get_nodeinfo_mock)
+
+        # assert _mapped_to_this_conductor() gets called only once
+        # because of the worker limit
+        mapped_mock.assert_called_once_with(self.node.uuid, self.node.driver)
+
+        # assert acquire() gets called only once because of the worker limit
+        acquire_mock.assert_called_once_with(self.context, self.node.id)
+
+        # Only one auth token needed for all runs
+        get_authtoken_mock.assert_called_once_with()
+
+        # assert spawn_after has been called
+        self.task.spawn_after.assert_called_once_with(
+            self.service._spawn_worker,
+            self.service._do_takeover, self.task)
diff --git a/ironic/tests/test_keystone.py b/ironic/tests/test_keystone.py
index 3bc31699c..e70d00fba 100644
--- a/ironic/tests/test_keystone.py
+++ b/ironic/tests/test_keystone.py
@@ -110,3 +110,10 @@ class KeystoneTestCase(base.TestCase):
         mock_ks.assert_called_once_with(username='fake', password='fake',
                                         tenant_name='fake',
                                         auth_url=expected_url)
+
+    @mock.patch('keystoneclient.v2_0.client.Client')
+    def test_get_admin_auth_token(self, mock_ks):
+        fake_client = FakeClient()
+        fake_client.auth_token = '123456'
+        mock_ks.return_value = fake_client
+        self.assertEqual('123456', keystone.get_admin_auth_token())