summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Jenkins <jenkins@review.openstack.org> 2014-10-02 00:23:50 +0000
committer Gerrit Code Review <review@openstack.org> 2014-10-02 00:23:51 +0000
commit c77c41803a276700c3f523a562452d1868161dca (patch)
tree dbb957eccc2f08d7f5c97853d5b2afe8f7f9d304
parent 6a42cee7786f496ec454575ce140f259b9a206cd (diff)
parent d6a277130b72534e499c64e3e468ec5e572fcc4d (diff)
download ironic-c77c41803a276700c3f523a562452d1868161dca.tar.gz
Merge "Add "affinity" tracking to nodes and conductors"
-rw-r--r-- ironic/api/controllers/v1/node.py 3
-rw-r--r-- ironic/conductor/manager.py 33
-rw-r--r-- ironic/db/api.py 12
-rw-r--r-- ironic/db/sqlalchemy/alembic/versions/487deb87cc9d_add_conductor_affinity_and_online.py 45
-rw-r--r-- ironic/db/sqlalchemy/api.py 44
-rw-r--r-- ironic/db/sqlalchemy/models.py 15
-rw-r--r-- ironic/objects/node.py 7
-rw-r--r-- ironic/tests/conductor/test_manager.py 1
-rw-r--r-- ironic/tests/db/sqlalchemy/test_types.py 4
-rw-r--r-- ironic/tests/db/test_conductor.py 36
-rw-r--r-- ironic/tests/db/utils.py 8
11 files changed, 168 insertions, 40 deletions
diff --git a/ironic/api/controllers/v1/node.py b/ironic/api/controllers/v1/node.py
index e587ce351..91041d2e0 100644
--- a/ironic/api/controllers/v1/node.py
+++ b/ironic/api/controllers/v1/node.py
@@ -432,6 +432,9 @@ class Node(base.APIBase):
ports = wsme.wsattr([link.Link], readonly=True)
"Links to the collection of ports on this node"
+ # NOTE(deva): "conductor_affinity" shouldn't be presented on the
+ # API because it's an internal value. Don't add it here.
+
def __init__(self, **kwargs):
self.fields = []
fields = objects.Node.fields.keys()
diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py
index e9439fb4b..c872f8966 100644
--- a/ironic/conductor/manager.py
+++ b/ironic/conductor/manager.py
@@ -187,15 +187,19 @@ class ConductorManager(periodic_task.PeriodicTasks):
"""List of driver names which this conductor supports."""
try:
- self.dbapi.register_conductor({'hostname': self.host,
- 'drivers': self.drivers})
+ # Register this conductor with the cluster
+ cdr = self.dbapi.register_conductor({'hostname': self.host,
+ 'drivers': self.drivers})
except exception.ConductorAlreadyRegistered:
+ # This conductor was already registered and did not shut down
+ # properly, so log a warning and update the record.
LOG.warn(_LW("A conductor with hostname %(hostname)s "
"was previously registered. Updating registration"),
{'hostname': self.host})
- self.dbapi.unregister_conductor(self.host)
- self.dbapi.register_conductor({'hostname': self.host,
- 'drivers': self.drivers})
+ cdr = self.dbapi.register_conductor({'hostname': self.host,
+ 'drivers': self.drivers},
+ update_existing=True)
+ self.conductor = cdr
self.ring_manager = hash.HashRingManager()
"""Consistent hash ring which maps drivers to conductors."""
@@ -219,6 +223,8 @@ class ConductorManager(periodic_task.PeriodicTasks):
def del_host(self):
self._keepalive_evt.set()
try:
+ # Inform the cluster that this conductor is shutting down.
+ # Note that rebalancing won't begin until after heartbeat timeout.
self.dbapi.unregister_conductor(self.host)
LOG.info(_LI('Successfully stopped conductor with hostname '
'%(hostname)s.'),
@@ -428,6 +434,8 @@ class ConductorManager(periodic_task.PeriodicTasks):
"""
if isinstance(e, exception.NoFreeConductorWorker):
+ # NOTE(deva): there is no need to clear conductor_affinity
+ # because it isn't updated on a failed deploy
node.provision_state = provision_state
node.target_provision_state = target_provision_state
node.last_error = (_("No free conductor workers available"))
@@ -525,6 +533,10 @@ class ConductorManager(periodic_task.PeriodicTasks):
try:
task.driver.deploy.prepare(task)
new_state = task.driver.deploy.deploy(task)
+
+ # Update conductor_affinity to reference this conductor's ID
+ # since there may be local persistent state
+ node.conductor_affinity = self.conductor.id
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.warning(_LW('Error in deploy of node %(node)s: %(err)s'),
@@ -532,6 +544,7 @@ class ConductorManager(periodic_task.PeriodicTasks):
node.last_error = _("Failed to deploy. Error: %s") % e
node.provision_state = states.DEPLOYFAIL
node.target_provision_state = states.NOSTATE
+ # NOTE(deva): there is no need to clear conductor_affinity
else:
# NOTE(deva): Some drivers may return states.DEPLOYWAIT
# eg. if they are waiting for a callback
@@ -632,7 +645,10 @@ class ConductorManager(periodic_task.PeriodicTasks):
else:
node.provision_state = new_state
finally:
- # Clean the instance_info
+ # NOTE(deva): there is no need to unset conductor_affinity
+ # because it is a reference to the most recent conductor which
+ # deployed a node, and does not limit any future actions.
+ # But we do need to clear the instance_info
node.instance_info = {}
node.save()
@@ -869,7 +885,7 @@ class ConductorManager(periodic_task.PeriodicTasks):
except exception.DriverNotFound:
return False
- return self.host == ring.get_hosts(node_uuid)[0]
+ return self.host in ring.get_hosts(node_uuid)
@messaging.expected_exceptions(exception.NodeLocked)
def validate_driver_interfaces(self, context, node_id):
@@ -1060,6 +1076,9 @@ class ConductorManager(periodic_task.PeriodicTasks):
try:
if enabled:
task.driver.console.start_console(task)
+ # TODO(deva): We should be updating conductor_affinity here
+ # but there is no support for console sessions in
+ # take_over() right now.
else:
task.driver.console.stop_console(task)
except Exception as e:
diff --git a/ironic/db/api.py b/ironic/db/api.py
index 1b2f95bf9..c038bb71d 100644
--- a/ironic/db/api.py
+++ b/ironic/db/api.py
@@ -317,8 +317,8 @@ class Connection(object):
"""
@abc.abstractmethod
- def register_conductor(self, values):
- """Register a new conductor service at the specified hostname.
+ def register_conductor(self, values, update_existing=False):
+ """Register an active conductor with the cluster.
:param values: A dict of values which must contain the following:
{
@@ -326,13 +326,17 @@ class Connection(object):
this Conductor service.
'drivers': a list of supported drivers.
}
+ :param update_existing: When false, registration will raise an
+ exception when a conflicting online record
+ is found. When true, will overwrite the
+ existing record. Default: False.
:returns: A conductor.
:raises: ConductorAlreadyRegistered
"""
@abc.abstractmethod
def get_conductor(self, hostname):
- """Retrieve a conductor service record from the database.
+ """Retrieve a conductor's service record from the database.
:param hostname: The hostname of the conductor service.
:returns: A conductor.
@@ -341,7 +345,7 @@ class Connection(object):
@abc.abstractmethod
def unregister_conductor(self, hostname):
- """Unregister this conductor with the service registry.
+ """Remove this conductor from the service registry immediately.
:param hostname: The hostname of this conductor service.
:raises: ConductorNotFound
diff --git a/ironic/db/sqlalchemy/alembic/versions/487deb87cc9d_add_conductor_affinity_and_online.py b/ironic/db/sqlalchemy/alembic/versions/487deb87cc9d_add_conductor_affinity_and_online.py
new file mode 100644
index 000000000..264aeea6d
--- /dev/null
+++ b/ironic/db/sqlalchemy/alembic/versions/487deb87cc9d_add_conductor_affinity_and_online.py
@@ -0,0 +1,45 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""add conductor_affinity and online
+
+Revision ID: 487deb87cc9d
+Revises: 3bea56f25597
+Create Date: 2014-09-26 16:16:30.988900
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '487deb87cc9d'
+down_revision = '3bea56f25597'
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+ op.add_column(
+ 'conductors',
+ sa.Column('online', sa.Boolean(), default=True))
+ op.add_column(
+ 'nodes',
+ sa.Column('conductor_affinity', sa.Integer(),
+ sa.ForeignKey('conductors.id',
+ name='nodes_conductor_affinity_fk'),
+ nullable=True))
+
+
+def downgrade():
+ op.drop_constraint('nodes_conductor_affinity_fk', 'nodes',
+ type_='foreignkey')
+ op.drop_column('nodes', 'conductor_affinity')
+ op.drop_column('conductors', 'online')
diff --git a/ironic/db/sqlalchemy/api.py b/ironic/db/sqlalchemy/api.py
index 3a74605f8..d01b2359d 100644
--- a/ironic/db/sqlalchemy/api.py
+++ b/ironic/db/sqlalchemy/api.py
@@ -502,23 +502,31 @@ class Connection(api.Connection):
if count != 1:
raise exception.ChassisNotFound(chassis=chassis_id)
- def register_conductor(self, values):
- try:
- conductor = models.Conductor()
- conductor.update(values)
- # NOTE(deva): ensure updated_at field has a non-null initial value
- if not conductor.get('updated_at'):
- conductor.update({'updated_at': timeutils.utcnow()})
- conductor.save()
- return conductor
- except db_exc.DBDuplicateEntry:
- raise exception.ConductorAlreadyRegistered(
- conductor=values['hostname'])
+ def register_conductor(self, values, update_existing=False):
+ session = get_session()
+ with session.begin():
+ query = model_query(models.Conductor, session=session).\
+ filter_by(hostname=values['hostname'])
+ try:
+ ref = query.one()
+ if ref.online is True and not update_existing:
+ raise exception.ConductorAlreadyRegistered(
+ conductor=values['hostname'])
+ except NoResultFound:
+ ref = models.Conductor()
+ ref.update(values)
+ # always set online and updated_at fields when registering
+ # a conductor, especially when updating an existing one
+ ref.update({'updated_at': timeutils.utcnow(),
+ 'online': True})
+ ref.save(session)
+ return ref
def get_conductor(self, hostname):
try:
return model_query(models.Conductor).\
- filter_by(hostname=hostname).\
+ filter_by(hostname=hostname,
+ online=True).\
one()
except NoResultFound:
raise exception.ConductorNotFound(conductor=hostname)
@@ -527,8 +535,9 @@ class Connection(api.Connection):
session = get_session()
with session.begin():
query = model_query(models.Conductor, session=session).\
- filter_by(hostname=hostname)
- count = query.delete()
+ filter_by(hostname=hostname,
+ online=True)
+ count = query.update({'online': False})
if count == 0:
raise exception.ConductorNotFound(conductor=hostname)
@@ -538,7 +547,9 @@ class Connection(api.Connection):
query = model_query(models.Conductor, session=session).\
filter_by(hostname=hostname)
# since we're not changing any other field, manually set updated_at
- count = query.update({'updated_at': timeutils.utcnow()})
+ # and since we're heartbeating, make sure that online=True
+ count = query.update({'updated_at': timeutils.utcnow(),
+ 'online': True})
if count == 0:
raise exception.ConductorNotFound(conductor=hostname)
@@ -548,6 +559,7 @@ class Connection(api.Connection):
limit = timeutils.utcnow() - datetime.timedelta(seconds=interval)
result = model_query(models.Conductor).\
+ filter_by(online=True).\
filter(models.Conductor.updated_at >= limit).\
all()
diff --git a/ironic/db/sqlalchemy/models.py b/ironic/db/sqlalchemy/models.py
index 50cf9c885..691214a84 100644
--- a/ironic/db/sqlalchemy/models.py
+++ b/ironic/db/sqlalchemy/models.py
@@ -135,6 +135,7 @@ class Conductor(Base):
id = Column(Integer, primary_key=True)
hostname = Column(String(255), nullable=False)
drivers = Column(JSONEncodedList)
+ online = Column(Boolean, default=True)
class Node(Base):
@@ -163,7 +164,21 @@ class Node(Base):
properties = Column(JSONEncodedDict)
driver = Column(String(15))
driver_info = Column(JSONEncodedDict)
+
+ # NOTE(deva): this is the host name of the conductor which has
+ # acquired a TaskManager lock on the node.
+ # We should use an INT FK (conductors.id) in the future.
reservation = Column(String(255), nullable=True)
+
+ # NOTE(deva): this is the id of the last conductor which prepared local
+ # state for the node (eg, a PXE config file).
+ # When affinity and the hash ring's mapping do not match,
+ # this indicates that a conductor should rebuild local state.
+ conductor_affinity = Column(Integer,
+ ForeignKey('conductors.id',
+ name='nodes_conductor_affinity_fk'),
+ nullable=True)
+
maintenance = Column(Boolean, default=False)
console_enabled = Column(Boolean, default=False)
extra = Column(JSONEncodedDict)
diff --git a/ironic/objects/node.py b/ironic/objects/node.py
index df8911051..abe6b47bc 100644
--- a/ironic/objects/node.py
+++ b/ironic/objects/node.py
@@ -29,7 +29,8 @@ class Node(base.IronicObject):
# Version 1.4: Add get_by_instance_uuid()
# Version 1.5: Add list()
# Version 1.6: Add reserve() and release()
- VERSION = '1.6'
+ # Version 1.7: Add conductor_affinity
+ VERSION = '1.7'
dbapi = db_api.get_instance()
@@ -46,6 +47,10 @@ class Node(base.IronicObject):
'instance_info': obj_utils.dict_or_none,
'properties': obj_utils.dict_or_none,
'reservation': obj_utils.str_or_none,
+ # a reference to the id of the conductor service, not its hostname,
+ # that has most recently performed some action which could require
+ # local state to be maintained (eg, built a PXE config)
+ 'conductor_affinity': obj_utils.int_or_none,
# One of states.POWER_ON|POWER_OFF|NOSTATE|ERROR
'power_state': obj_utils.str_or_none,
diff --git a/ironic/tests/conductor/test_manager.py b/ironic/tests/conductor/test_manager.py
index 9ea64ea2c..23b4ad0e3 100644
--- a/ironic/tests/conductor/test_manager.py
+++ b/ironic/tests/conductor/test_manager.py
@@ -694,6 +694,7 @@ class DoNodeDeployTearDownTestCase(_ServiceSetUpMixin,
@mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
def test__do_node_deploy_ok(self, mock_deploy):
+ self._start_service()
# test when driver.deploy.deploy returns DEPLOYDONE
mock_deploy.return_value = states.DEPLOYDONE
node = obj_utils.create_test_node(self.context, driver='fake',
diff --git a/ironic/tests/db/sqlalchemy/test_types.py b/ironic/tests/db/sqlalchemy/test_types.py
index d1e5a7408..71577e3ef 100644
--- a/ironic/tests/db/sqlalchemy/test_types.py
+++ b/ironic/tests/db/sqlalchemy/test_types.py
@@ -78,5 +78,5 @@ class SqlAlchemyCustomTypesTestCase(base.DbTestCase):
def test_JSONEncodedList_type_check(self):
self.assertRaises(db_exc.DBError,
self.dbapi.register_conductor,
- {'drivers':
- {'this is not a list': 'test'}})
+ {'hostname': 'test_host3',
+ 'drivers': {'this is not a list': 'test'}})
diff --git a/ironic/tests/db/test_conductor.py b/ironic/tests/db/test_conductor.py
index e9b54de6b..4501d3218 100644
--- a/ironic/tests/db/test_conductor.py
+++ b/ironic/tests/db/test_conductor.py
@@ -32,17 +32,23 @@ class DbConductorTestCase(base.DbTestCase):
super(DbConductorTestCase, self).setUp()
self.dbapi = dbapi.get_instance()
+ def test_register_conductor_existing_fails(self):
+ c = utils.get_test_conductor()
+ self.dbapi.register_conductor(c)
+ self.assertRaises(
+ exception.ConductorAlreadyRegistered,
+ self.dbapi.register_conductor,
+ c)
+
+ def test_register_conductor_override(self):
+ c = utils.get_test_conductor()
+ self.dbapi.register_conductor(c)
+ self.dbapi.register_conductor(c, update_existing=True)
+
def _create_test_cdr(self, **kwargs):
c = utils.get_test_conductor(**kwargs)
return self.dbapi.register_conductor(c)
- def test_register_conductor(self):
- self._create_test_cdr(id=1)
- self.assertRaises(
- exception.ConductorAlreadyRegistered,
- self._create_test_cdr,
- id=2)
-
def test_get_conductor(self):
c1 = self._create_test_cdr()
c2 = self.dbapi.get_conductor(c1.hostname)
@@ -67,7 +73,7 @@ class DbConductorTestCase(base.DbTestCase):
def test_touch_conductor(self, mock_utcnow):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
- c = self._create_test_cdr(updated_at=test_time)
+ c = self._create_test_cdr()
self.assertEqual(test_time, timeutils.normalize_time(c.updated_at))
test_time = datetime.datetime(2000, 1, 1, 0, 1)
@@ -77,12 +83,26 @@ class DbConductorTestCase(base.DbTestCase):
self.assertEqual(test_time, timeutils.normalize_time(c.updated_at))
def test_touch_conductor_not_found(self):
+ # A conductor's heartbeat will not create a new record,
+ # it will only update existing ones
self._create_test_cdr()
self.assertRaises(
exception.ConductorNotFound,
self.dbapi.touch_conductor,
'bad-hostname')
+ def test_touch_offline_conductor(self):
+ # Ensure that a conductor's periodic heartbeat task can make the
+ # conductor visible again, even if it was spuriously marked offline
+ c = self._create_test_cdr()
+ self.dbapi.unregister_conductor(c.hostname)
+ self.assertRaises(
+ exception.ConductorNotFound,
+ self.dbapi.get_conductor,
+ c.hostname)
+ self.dbapi.touch_conductor(c.hostname)
+ self.dbapi.get_conductor(c.hostname)
+
@mock.patch.object(timeutils, 'utcnow')
def test_get_active_driver_dict_one_host_no_driver(self, mock_utcnow):
h = 'fake-host'
diff --git a/ironic/tests/db/utils.py b/ironic/tests/db/utils.py
index f55a108fa..edbf1d4d9 100644
--- a/ironic/tests/db/utils.py
+++ b/ironic/tests/db/utils.py
@@ -14,6 +14,9 @@
# under the License.
"""Ironic test utilities."""
+
+from oslo.utils import timeutils
+
from ironic.common import states
@@ -151,6 +154,7 @@ def get_test_node(**kw):
'id': kw.get('id', 123),
'uuid': kw.get('uuid', '1be26c0b-03f2-4d2e-ae87-c02d7f33c123'),
'chassis_id': kw.get('chassis_id', 42),
+ 'conductor_affinity': kw.get('conductor_affinity', None),
'power_state': kw.get('power_state', states.NOSTATE),
'target_power_state': kw.get('target_power_state', states.NOSTATE),
'provision_state': kw.get('provision_state', states.NOSTATE),
@@ -200,6 +204,6 @@ def get_test_conductor(**kw):
'id': kw.get('id', 6),
'hostname': kw.get('hostname', 'test-conductor-node'),
'drivers': kw.get('drivers', ['fake-driver', 'null-driver']),
- 'created_at': kw.get('created_at'),
- 'updated_at': kw.get('updated_at'),
+ 'created_at': kw.get('created_at', timeutils.utcnow()),
+ 'updated_at': kw.get('updated_at', timeutils.utcnow()),
}