summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCrag Wolfe <cwolfe@redhat.com>2017-02-09 06:21:37 +0000
committerCrag Wolfe <cwolfe@redhat.com>2017-04-25 08:21:42 -0700
commita7376f7494b310e9367ebe5dcb43b432a4053023 (patch)
tree8b2ed66cf49b9d5369c2cd6ede36710fabeb0c4e
parent132e9a2fea5f89f647db0911d9782daa8fe84766 (diff)
downloadheat-a7376f7494b310e9367ebe5dcb43b432a4053023.tar.gz
Consolidate resource locking with state changes
Change-Id: I261b2f0968e16d35b7d5d791a3edb4b265a4f1d1 Closes-Bug: #1662585
-rw-r--r--heat/engine/resource.py314
-rw-r--r--heat/objects/resource.py11
-rw-r--r--heat/tests/engine/service/test_stack_resources.py12
-rw-r--r--heat/tests/generic_resource.py1
-rw-r--r--heat/tests/test_resource.py92
-rw-r--r--heat_integrationtests/scenario/test_aodh_alarm.py5
6 files changed, 248 insertions, 187 deletions
diff --git a/heat/engine/resource.py b/heat/engine/resource.py
index 45144c320..c1a84de6b 100644
--- a/heat/engine/resource.py
+++ b/heat/engine/resource.py
@@ -97,6 +97,9 @@ class PollDelay(Exception):
class Resource(status.ResourceStatus):
BASE_ATTRIBUTES = (SHOW, ) = (attributes.SHOW_ATTR, )
+ LOCK_ACTIONS = (LOCK_NONE, LOCK_ACQUIRE, LOCK_RELEASE) = (
+ 0, 1, -1)
+
# If True, this resource must be created before it can be referenced.
strict_dependency = True
@@ -254,6 +257,8 @@ class Resource(status.ResourceStatus):
self.replaced_by = None
self.current_template_id = None
self.root_stack_id = None
+ self._calling_engine_id = None
+ self._atomic_key = None
if stack.cache_data is None:
resource = stack.db_resource_get(name)
@@ -296,6 +301,7 @@ class Resource(status.ResourceStatus):
self.replaced_by = resource.replaced_by
self.current_template_id = resource.current_template_id
self.root_stack_id = resource.root_stack_id
+ self._atomic_key = resource.atomic_key
@property
def external_id(self):
@@ -448,7 +454,8 @@ class Resource(status.ResourceStatus):
LOG.debug('Setting metadata for %s', six.text_type(self))
if refresh:
metadata = merge_metadata(metadata, db_res.rsrc_metadata)
- db_res.update_metadata(metadata)
+ if db_res.update_metadata(metadata):
+ self._incr_atomic_key()
self._rsrc_metadata = metadata
def handle_metadata_reset(self):
@@ -747,6 +754,12 @@ class Resource(status.ResourceStatus):
def glance(self):
return self.client('glance')
+ def _incr_atomic_key(self):
+ if self._atomic_key is None:
+ self._atomic_key = 1
+ else:
+ self._atomic_key = self._atomic_key + 1
+
@contextlib.contextmanager
def _action_recorder(self, action, expected_exceptions=tuple()):
"""Return a context manager to record the progress of an action.
@@ -759,12 +772,22 @@ class Resource(status.ResourceStatus):
Expected exceptions are re-raised, with the Resource moved to the
COMPLETE state.
"""
+ if self.stack.convergence:
+ lock_acquire = self.LOCK_ACQUIRE
+ lock_release = self.LOCK_RELEASE
+ else:
+ lock_acquire = lock_release = self.LOCK_NONE
+
try:
- self.state_set(action, self.IN_PROGRESS)
+ self.state_set(action, self.IN_PROGRESS, lock=lock_acquire)
yield
+ except exception.UpdateInProgress as ex:
+ with excutils.save_and_reraise_exception():
+ LOG.info('Update in progress for %s', self.name)
except expected_exceptions as ex:
with excutils.save_and_reraise_exception():
- self.state_set(action, self.COMPLETE, six.text_type(ex))
+ self.state_set(action, self.COMPLETE, six.text_type(ex),
+ lock=lock_release)
LOG.debug('%s', six.text_type(ex))
except Exception as ex:
LOG.info('%(action)s: %(info)s',
@@ -772,7 +795,8 @@ class Resource(status.ResourceStatus):
"info": six.text_type(self)},
exc_info=True)
failure = exception.ResourceFailure(ex, self, action)
- self.state_set(action, self.FAILED, six.text_type(failure))
+ self.state_set(action, self.FAILED, six.text_type(failure),
+ lock=lock_release)
raise failure
except BaseException as exc:
with excutils.save_and_reraise_exception():
@@ -781,11 +805,12 @@ class Resource(status.ResourceStatus):
msg = '%s aborted' % action
if reason:
msg += ' (%s)' % reason
- self.state_set(action, self.FAILED, msg)
+ self.state_set(action, self.FAILED, msg,
+ lock=lock_release)
except Exception:
LOG.exception('Error marking resource as failed')
else:
- self.state_set(action, self.COMPLETE)
+ self.state_set(action, self.COMPLETE, lock=lock_release)
def action_handler_task(self, action, args=None, action_prefix=None):
"""A task to call the Resource subclass's handler methods for action.
@@ -910,19 +935,19 @@ class Resource(status.ResourceStatus):
def create_convergence(self, template_id, resource_data, engine_id,
timeout, progress_callback=None):
"""Creates the resource by invoking the scheduler TaskRunner."""
- with self.lock(engine_id):
- self.requires = list(
- set(data.primary_key for data in resource_data.values()
- if data is not None)
- )
- self.current_template_id = template_id
- if self.stack.adopt_stack_data is None:
- runner = scheduler.TaskRunner(self.create)
- else:
- adopt_data = self.stack._adopt_kwargs(self)
- runner = scheduler.TaskRunner(self.adopt, **adopt_data)
+ self._calling_engine_id = engine_id
+ self.requires = list(
+ set(data.primary_key for data in resource_data.values()
+ if data is not None)
+ )
+ self.current_template_id = template_id
+ if self.stack.adopt_stack_data is None:
+ runner = scheduler.TaskRunner(self.create)
+ else:
+ adopt_data = self.stack._adopt_kwargs(self)
+ runner = scheduler.TaskRunner(self.adopt, **adopt_data)
- runner(timeout=timeout, progress_callback=progress_callback)
+ runner(timeout=timeout, progress_callback=progress_callback)
def validate_external(self):
if self.external_id is not None:
@@ -1172,55 +1197,74 @@ class Resource(status.ResourceStatus):
resource_data and existing resource's requires, then updates the
resource by invoking the scheduler TaskRunner.
"""
- def update_tmpl_id_and_requires():
+ def update_templ_id_and_requires(persist=True):
self.current_template_id = template_id
self.requires = list(
set(data.primary_key for data in resource_data.values()
if data is not None)
)
+ if not persist:
+ return
- with self.lock(engine_id):
- registry = new_stack.env.registry
- new_res_def = new_stack.t.resource_definitions(
- new_stack)[self.name]
- new_res_type = registry.get_class_to_instantiate(
- new_res_def.resource_type, resource_name=self.name)
- restricted_actions = registry.get_rsrc_restricted_actions(
- self.name)
- is_substituted = self.check_is_substituted(new_res_type)
- if type(self) is not new_res_type and not is_substituted:
- self._check_for_convergence_replace(restricted_actions)
-
- action_rollback = self.stack.action == self.stack.ROLLBACK
- status_in_progress = self.stack.status == self.stack.IN_PROGRESS
- if action_rollback and status_in_progress and self.replaced_by:
- try:
- self.restore_prev_rsrc(convergence=True)
- except Exception as e:
- failure = exception.ResourceFailure(e, self, self.action)
- self.state_set(self.UPDATE, self.FAILED,
- six.text_type(failure))
- raise failure
-
- # Use new resource as update method if existing resource
- # need to be substituted.
- if is_substituted:
- substitute = new_res_type(self.name, self.t, self.stack)
- self.stack.resources[self.name] = substitute
- updater = substitute.update
- else:
- updater = self.update
- runner = scheduler.TaskRunner(updater, new_res_def)
+ rs = {'current_template_id': self.current_template_id,
+ 'updated_at': self.updated_time,
+ 'requires': self.requires}
+ if not resource_objects.Resource.select_and_update_by_id(
+ self.context, self.id, rs, expected_engine_id=None,
+ atomic_key=self._atomic_key):
+ LOG.info("Resource %s is locked, can't set template",
+ six.text_type(self))
+ LOG.debug('Resource id:%(resource_id)s locked. '
+ 'Expected atomic_key:%(atomic_key)s, '
+ 'accessing from engine_id:%(engine_id)s',
+ {'resource_id': self.id,
+ 'atomic_key': self._atomic_key,
+ 'engine_id': self._calling_engine_id})
+ raise exception.UpdateInProgress(self.name)
+ self._incr_atomic_key()
+
+ self._calling_engine_id = engine_id
+ registry = new_stack.env.registry
+ new_res_def = new_stack.t.resource_definitions(
+ new_stack)[self.name]
+ new_res_type = registry.get_class_to_instantiate(
+ new_res_def.resource_type, resource_name=self.name)
+ restricted_actions = registry.get_rsrc_restricted_actions(
+ self.name)
+ is_substituted = self.check_is_substituted(new_res_type)
+ if type(self) is not new_res_type and not is_substituted:
+ self._check_for_convergence_replace(restricted_actions)
+
+ action_rollback = self.stack.action == self.stack.ROLLBACK
+ status_in_progress = self.stack.status == self.stack.IN_PROGRESS
+ if action_rollback and status_in_progress and self.replaced_by:
try:
- runner(timeout=timeout, progress_callback=progress_callback)
- update_tmpl_id_and_requires()
- except UpdateReplace:
- raise
- except BaseException:
- with excutils.save_and_reraise_exception():
- update_tmpl_id_and_requires()
- else:
- update_tmpl_id_and_requires()
+ self.restore_prev_rsrc(convergence=True)
+ except Exception as e:
+ failure = exception.ResourceFailure(e, self, self.action)
+ self.state_set(self.UPDATE, self.FAILED,
+ six.text_type(failure))
+ raise failure
+
+ # Use new resource as update method if existing resource
+ # need to be substituted.
+ if is_substituted:
+ substitute = new_res_type(self.name, self.t, self.stack)
+ self.stack.resources[self.name] = substitute
+ substitute._calling_engine_id = engine_id
+ updater = substitute.update
+ else:
+ updater = self.update
+ runner = scheduler.TaskRunner(
+ updater, new_res_def,
+ update_templ_func=update_templ_id_and_requires)
+ try:
+ runner(timeout=timeout, progress_callback=progress_callback)
+ except UpdateReplace:
+ raise
+ except BaseException:
+ with excutils.save_and_reraise_exception():
+ update_templ_id_and_requires(persist=True)
def preview_update(self, after, before, after_props, before_props,
prev_resource, check_init_complete=False):
@@ -1316,7 +1360,8 @@ class Resource(status.ResourceStatus):
return False
@scheduler.wrappertask
- def update(self, after, before=None, prev_resource=None):
+ def update(self, after, before=None, prev_resource=None,
+ update_templ_func=None):
"""Return a task to update the resource.
Subclasses should provide a handle_update() method to customise update,
@@ -1353,11 +1398,15 @@ class Resource(status.ResourceStatus):
after_props,
before_props,
prev_resource):
+ if update_templ_func is not None:
+ update_templ_func(persist=True)
return
else:
if not self._needs_update(after, before,
after_props, before_props,
prev_resource):
+ if update_templ_func is not None:
+ update_templ_func(persist=True)
return
if not self.stack.convergence:
@@ -1389,6 +1438,9 @@ class Resource(status.ResourceStatus):
self.t = after
self.reparse()
self._update_stored_properties()
+ if update_templ_func is not None:
+ # template/requires will be persisted by _action_recorder()
+ update_templ_func(persist=False)
except exception.ResourceActionRestricted as ae:
# catch all ResourceActionRestricted exceptions
@@ -1617,12 +1669,12 @@ class Resource(status.ResourceStatus):
if (db_res.current_template_id == template_id):
# Following update failure is ignorable; another
# update might have locked/updated the resource.
- db_res.select_and_update(
- {'needed_by': self.needed_by,
- 'replaces': None},
- atomic_key=db_res.atomic_key,
- expected_engine_id=None
- )
+ if db_res.select_and_update(
+ {'needed_by': self.needed_by,
+ 'replaces': None},
+ atomic_key=db_res.atomic_key,
+ expected_engine_id=None):
+ self._incr_atomic_key()
def delete_convergence(self, template_id, input_data, engine_id, timeout,
progress_callback=None):
@@ -1636,22 +1688,22 @@ class Resource(status.ResourceStatus):
replaced by more recent resource, then delete this and update
the replacement resource's needed_by and replaces fields.
"""
- with self.lock(engine_id):
- self.needed_by = list(set(v for v in input_data.values()
- if v is not None))
+ self._calling_engine_id = engine_id
+ self.needed_by = list(set(v for v in input_data.values()
+ if v is not None))
- if self.current_template_id != template_id:
- # just delete the resources in INIT state
- if self.action == self.INIT:
- try:
- resource_objects.Resource.delete(self.context, self.id)
- except exception.NotFound:
- pass
- else:
- runner = scheduler.TaskRunner(self.delete)
- runner(timeout=timeout,
- progress_callback=progress_callback)
- self._update_replacement_data(template_id)
+ if self.current_template_id != template_id:
+ # just delete the resources in INIT state
+ if self.action == self.INIT:
+ try:
+ resource_objects.Resource.delete(self.context, self.id)
+ except exception.NotFound:
+ pass
+ else:
+ runner = scheduler.TaskRunner(self.delete)
+ runner(timeout=timeout,
+ progress_callback=progress_callback)
+ self._update_replacement_data(template_id)
def handle_delete(self):
"""Default implementation; should be overridden by resources."""
@@ -1769,7 +1821,7 @@ class Resource(status.ResourceStatus):
except Exception as ex:
LOG.warning('db error %s', ex)
- def store(self, set_metadata=False):
+ def store(self, set_metadata=False, lock=LOCK_NONE):
"""Create the resource in the database.
If self.id is set, we update the existing stack.
@@ -1800,14 +1852,43 @@ class Resource(status.ResourceStatus):
self._rsrc_metadata = metadata
if self.id is not None:
- resource_objects.Resource.update_by_id(
- self.context, self.id, rs)
+ if (lock == self.LOCK_NONE or self._calling_engine_id is None):
+ resource_objects.Resource.update_by_id(
+ self.context, self.id, rs)
+ if (lock != self.LOCK_NONE and
+ self._calling_engine_id is None):
+ LOG.warning('no calling_engine_id in store %s',
+ str(rs))
+ else:
+ self._store_with_lock(rs, lock)
else:
new_rs = resource_objects.Resource.create(self.context, rs)
self.id = new_rs.id
self.uuid = new_rs.uuid
self.created_time = new_rs.created_at
+ def _store_with_lock(self, rs, lock):
+ if lock == self.LOCK_ACQUIRE:
+ rs['engine_id'] = self._calling_engine_id
+ expected_engine_id = None
+ else: # self.LOCK_RELEASE
+ expected_engine_id = self._calling_engine_id
+ rs['engine_id'] = None
+ if resource_objects.Resource.select_and_update_by_id(
+ self.context, self.id, rs, expected_engine_id,
+ self._atomic_key):
+ self._incr_atomic_key()
+ else:
+ LOG.info('Resource %s is locked or does not exist',
+ six.text_type(self))
+ LOG.debug('Resource id:%(resource_id)s locked or does not exist. '
+ 'Expected atomic_key:%(atomic_key)s, '
+ 'accessing from engine_id:%(engine_id)s',
+ {'resource_id': self.id,
+ 'atomic_key': self._atomic_key,
+ 'engine_id': self._calling_engine_id})
+ raise exception.UpdateInProgress(self.name)
+
def _add_event(self, action, status, reason):
"""Add a state change event to the database."""
physical_res_id = self.resource_id or self.physical_resource_name()
@@ -1820,61 +1901,17 @@ class Resource(status.ResourceStatus):
@contextlib.contextmanager
def lock(self, engine_id):
- self._acquire(engine_id)
+ self._calling_engine_id = engine_id
try:
+ self._store_with_lock({}, self.LOCK_ACQUIRE)
yield
- except: # noqa
+ except exception.UpdateInProgress:
+ raise
+ except BaseException:
with excutils.save_and_reraise_exception():
- self._release(engine_id)
+ self._store_with_lock({}, self.LOCK_RELEASE)
else:
- self._release(engine_id)
-
- def _acquire(self, engine_id):
- updated_ok = False
- try:
- rs = resource_objects.Resource.get_obj(self.context, self.id)
- updated_ok = rs.select_and_update(
- {'engine_id': engine_id},
- atomic_key=rs.atomic_key,
- expected_engine_id=None)
- except Exception as ex:
- LOG.error('DB error %s', ex)
- raise
-
- if not updated_ok:
- LOG.info('Resource %s is locked for update; deferring',
- six.text_type(self))
- LOG.debug(('Resource id:%(resource_id)s with '
- 'atomic_key:%(atomic_key)s, locked '
- 'by engine_id:%(rs_engine_id)s/%(engine_id)s') % {
- 'resource_id': rs.id, 'atomic_key': rs.atomic_key,
- 'rs_engine_id': rs.engine_id,
- 'engine_id': engine_id})
- raise exception.UpdateInProgress(self.name)
-
- def _release(self, engine_id):
- rs = None
- try:
- rs = resource_objects.Resource.get_obj(self.context, self.id)
- except (exception.NotFound, exception.EntityNotFound):
- # ignore: Resource is deleted holding a lock-on
- return
-
- atomic_key = rs.atomic_key
- if atomic_key is None:
- atomic_key = 0
-
- updated_ok = rs.select_and_update(
- {'engine_id': None,
- 'current_template_id': self.current_template_id,
- 'updated_at': self.updated_time,
- 'requires': self.requires,
- 'needed_by': self.needed_by},
- expected_engine_id=engine_id,
- atomic_key=atomic_key)
-
- if not updated_ok:
- LOG.warning('Failed to unlock resource %s', self.name)
+ self._store_with_lock({}, self.LOCK_RELEASE)
def _resolve_all_attributes(self, attr):
"""Method for resolving all attributes.
@@ -2010,7 +2047,8 @@ class Resource(status.ResourceStatus):
self.action = self.INIT
self.status = self.COMPLETE
- def state_set(self, action, status, reason="state changed"):
+ def state_set(self, action, status, reason="state changed",
+ lock=LOCK_NONE):
if action not in self.ACTIONS:
raise ValueError(_("Invalid action %s") % action)
@@ -2023,7 +2061,7 @@ class Resource(status.ResourceStatus):
self.action = action
self.status = status
self.status_reason = reason
- self.store(set_metadata)
+ self.store(set_metadata, lock=lock)
if new_state != old_state:
self._add_event(action, status, reason)
diff --git a/heat/objects/resource.py b/heat/objects/resource.py
index 0fea6db1b..38b8dc000 100644
--- a/heat/objects/resource.py
+++ b/heat/objects/resource.py
@@ -261,6 +261,14 @@ class Resource(
atomic_key=atomic_key,
expected_engine_id=expected_engine_id)
+ @classmethod
+ def select_and_update_by_id(cls, context, resource_id,
+ values, expected_engine_id=None,
+ atomic_key=0):
+ return db_api.resource_update(context, resource_id, values,
+ atomic_key=atomic_key,
+ expected_engine_id=expected_engine_id)
+
def refresh(self):
resource_db = db_api.resource_get(self._context, self.id, refresh=True)
return self.__class__._from_db_object(
@@ -282,3 +290,6 @@ class Resource(
if not rows_updated:
action = _('metadata setting for resource %s') % self.name
raise exception.ConcurrentTransaction(action=action)
+ return True
+ else:
+ return False
diff --git a/heat/tests/engine/service/test_stack_resources.py b/heat/tests/engine/service/test_stack_resources.py
index 812cbddd3..f226ea7cb 100644
--- a/heat/tests/engine/service/test_stack_resources.py
+++ b/heat/tests/engine/service/test_stack_resources.py
@@ -676,25 +676,25 @@ class StackResourcesServiceTest(common.HeatTestCase):
@tools.stack_context('service_mark_unhealthy_lock_converge_test_stk',
convergence=True)
def test_mark_unhealthy_stack_lock_convergence(self):
- mock_acquire = self.patchobject(res.Resource,
- '_acquire',
- return_value=None)
+ mock_store_with_lock = self.patchobject(res.Resource,
+ '_store_with_lock',
+ return_value=None)
self.eng.resource_mark_unhealthy(self.ctx, self.stack.identifier(),
'WebServer', True,
resource_status_reason="")
- mock_acquire.assert_called_once_with(self.eng.engine_id)
+ self.assertEqual(2, mock_store_with_lock.call_count)
@tools.stack_context('service_mark_unhealthy_lockexc_converge_test_stk',
convergence=True)
def test_mark_unhealthy_stack_lock_exc_convergence(self):
- def _acquire(*args, **kwargs):
+ def _store_with_lock(*args, **kwargs):
raise exception.UpdateInProgress(self.stack.name)
self.patchobject(
res.Resource,
- '_acquire',
+ '_store_with_lock',
return_value=None,
side_effect=exception.UpdateInProgress(self.stack.name))
ex = self.assertRaises(dispatcher.ExpectedException,
diff --git a/heat/tests/generic_resource.py b/heat/tests/generic_resource.py
index 72276e4f0..165c435f6 100644
--- a/heat/tests/generic_resource.py
+++ b/heat/tests/generic_resource.py
@@ -159,6 +159,7 @@ class ResourceWithProps(GenericResource):
properties_schema = {
'Foo': properties.Schema(properties.Schema.STRING),
'FooInt': properties.Schema(properties.Schema.INTEGER)}
+ atomic_key = None
class ResourceWithPropsRefPropOnDelete(ResourceWithProps):
diff --git a/heat/tests/test_resource.py b/heat/tests/test_resource.py
index 30a39c3ac..38b5c2828 100644
--- a/heat/tests/test_resource.py
+++ b/heat/tests/test_resource.py
@@ -1996,17 +1996,6 @@ class ResourceTest(common.HeatTestCase):
self.assertEqual(engine_id, rs.engine_id)
self.assertEqual(atomic_key, rs.atomic_key)
- @mock.patch.object(resource_objects.Resource, 'get_obj')
- @mock.patch.object(resource_objects.Resource, 'select_and_update')
- def test_release_ignores_not_found_error(self, mock_sau, mock_get_obj):
- tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
- res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
- res.store()
- res._acquire('engine-id')
- mock_get_obj.side_effect = exception.NotFound()
- res._release('engine-id')
- self.assertFalse(mock_sau.called)
-
def test_create_convergence(self):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
@@ -2024,7 +2013,7 @@ class ResourceTest(common.HeatTestCase):
self.assertTrue(mock_create.called)
self.assertItemsEqual([1, 3], res.requires)
- self._assert_resource_lock(res.id, None, 2)
+ self._assert_resource_lock(res.id, None, None)
def test_create_convergence_throws_timeout(self):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
@@ -2058,7 +2047,8 @@ class ResourceTest(common.HeatTestCase):
res.create_convergence, self.stack.t.id, res_data,
'engine-007', self.dummy_timeout, self.dummy_event)
self.assertItemsEqual([5, 3], res.requires)
- self._assert_resource_lock(res.id, None, 2)
+ # The locking happens in create which we mocked out
+ self._assert_resource_lock(res.id, None, None)
@mock.patch.object(resource.Resource, 'adopt')
def test_adopt_convergence_ok(self, mock_adopt):
@@ -2079,7 +2069,7 @@ class ResourceTest(common.HeatTestCase):
mock_adopt.assert_called_once_with(
resource_data={'resource_id': 'fluffy'})
self.assertItemsEqual([5, 3], res.requires)
- self._assert_resource_lock(res.id, None, 2)
+ self._assert_resource_lock(res.id, None, None)
def test_adopt_convergence_bad_data(self):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
@@ -2097,7 +2087,9 @@ class ResourceTest(common.HeatTestCase):
exc = self.assertRaises(exception.ResourceFailure, tr)
self.assertIn('Resource ID was not provided', six.text_type(exc))
- def test_update_convergence(self):
+ @mock.patch.object(resource.Resource, 'update_template_diff_properties')
+ @mock.patch.object(resource.Resource, '_needs_update')
+ def test_update_convergence(self, mock_nu, mock_utd):
tmpl = template.Template({
'HeatTemplateFormatVersion': '2012-12-12',
'Resources': {
@@ -2122,19 +2114,18 @@ class ResourceTest(common.HeatTestCase):
new_temp.store(stack.context)
new_stack = parser.Stack(utils.dummy_context(), 'test_stack',
new_temp, stack_id=self.stack.id)
+ res.stack.convergence = True
res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}},
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
res_data = node_data.load_resources_data(res_data)
- pcb = mock.Mock()
- with mock.patch.object(resource.Resource, 'update') as mock_update:
- tr = scheduler.TaskRunner(res.update_convergence, new_temp.id,
- res_data, 'engine-007', 120, new_stack,
- pcb)
- tr()
- self.assertTrue(mock_update.called)
+ tr = scheduler.TaskRunner(res.update_convergence, new_temp.id,
+ res_data, 'engine-007', 120, new_stack)
+ tr()
self.assertItemsEqual([3, 4], res.requires)
+ self.assertEqual(res.action, resource.Resource.UPDATE)
+ self.assertEqual(res.status, resource.Resource.COMPLETE)
self._assert_resource_lock(res.id, None, 2)
def test_update_convergence_throws_timeout(self):
@@ -2210,7 +2201,9 @@ class ResourceTest(common.HeatTestCase):
self.dummy_event)
self.assertRaises(resource.UpdateReplace, tr)
- def test_update_in_progress_convergence(self):
+ @mock.patch.object(resource.Resource, '_needs_update')
+ @mock.patch.object(resource.Resource, '_check_for_convergence_replace')
+ def test_update_in_progress_convergence(self, mock_cfcr, mock_nu):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
res.requires = [1, 2]
@@ -2219,21 +2212,34 @@ class ResourceTest(common.HeatTestCase):
rs.update_and_save({'engine_id': 'not-this'})
self._assert_resource_lock(res.id, 'not-this', None)
+ res.stack.convergence = True
+
res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}},
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
res_data = node_data.load_resources_data(res_data)
+ tmpl = template.Template({
+ 'HeatTemplateFormatVersion': '2012-12-12',
+ 'Resources': {
+ 'test_res': {'Type': 'ResourceWithPropsType'}
+ }}, env=self.env)
+ new_stack = parser.Stack(utils.dummy_context(), 'test_stack',
+ tmpl, stack_id=self.stack.id)
tr = scheduler.TaskRunner(res.update_convergence, 'template_key',
res_data, 'engine-007', self.dummy_timeout,
- mock.ANY, self.dummy_event)
+ new_stack)
ex = self.assertRaises(exception.UpdateInProgress, tr)
msg = ("The resource %s is already being updated." %
res.name)
self.assertEqual(msg, six.text_type(ex))
# ensure requirements are not updated for failed resource
- self.assertEqual([1, 2], res.requires)
+ rs = resource_objects.Resource.get_obj(self.stack.context, res.id)
+ self.assertEqual([1, 2], rs.requires)
- @mock.patch.object(resource.Resource, 'update')
- def test_update_resource_convergence_failed(self, mock_update):
+ @mock.patch.object(resource.Resource, 'update_template_diff_properties')
+ @mock.patch.object(resource.Resource, '_needs_update')
+ def test_update_resource_convergence_failed(self,
+ mock_needs_update,
+ mock_update_template_diff):
tmpl = template.Template({
'HeatTemplateFormatVersion': '2012-12-12',
'Resources': {
@@ -2259,26 +2265,25 @@ class ResourceTest(common.HeatTestCase):
res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}},
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
res_data = node_data.load_resources_data(res_data)
- exc = Exception(_('Resource update failed'))
new_stack = parser.Stack(utils.dummy_context(), 'test_stack',
new_temp, stack_id=self.stack.id)
- dummy_ex = exception.ResourceFailure(exc, res, action=res.UPDATE)
- mock_update.side_effect = dummy_ex
+
+ res.stack.convergence = True
+ res._calling_engine_id = 'engine-9'
+
tr = scheduler.TaskRunner(res.update_convergence, new_temp.id,
res_data, 'engine-007', 120, new_stack,
self.dummy_event)
self.assertRaises(exception.ResourceFailure, tr)
- expected_rsrc_def = new_temp.resource_definitions(self.stack)[res.name]
- mock_update.assert_called_once_with(expected_rsrc_def)
- # check if current_template_id was updated
self.assertEqual(new_temp.id, res.current_template_id)
# check if requires was updated
self.assertItemsEqual([3, 4], res.requires)
- self._assert_resource_lock(res.id, None, 2)
+ self.assertEqual(res.action, resource.Resource.UPDATE)
+ self.assertEqual(res.status, resource.Resource.FAILED)
+ self._assert_resource_lock(res.id, None, 3)
- @mock.patch.object(resource.Resource, 'update')
- def test_update_resource_convergence_update_replace(self, mock_update):
+ def test_update_resource_convergence_update_replace(self):
tmpl = template.Template({
'HeatTemplateFormatVersion': '2012-12-12',
'Resources': {
@@ -2301,10 +2306,11 @@ class ResourceTest(common.HeatTestCase):
}}, env=self.env)
new_temp.store(stack.context)
+ res.stack.convergence = True
+
res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}},
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
res_data = node_data.load_resources_data(res_data)
- mock_update.side_effect = resource.UpdateReplace
new_stack = parser.Stack(utils.dummy_context(), 'test_stack',
new_temp, stack_id=self.stack.id)
tr = scheduler.TaskRunner(res.update_convergence, new_temp.id,
@@ -2312,13 +2318,11 @@ class ResourceTest(common.HeatTestCase):
self.dummy_event)
self.assertRaises(resource.UpdateReplace, tr)
- expected_rsrc_def = new_temp.resource_definitions(self.stack)[res.name]
- mock_update.assert_called_once_with(expected_rsrc_def)
# ensure that current_template_id was not updated
self.assertEqual(stack.t.id, res.current_template_id)
# ensure that requires was not updated
self.assertItemsEqual([2], res.requires)
- self._assert_resource_lock(res.id, None, 2)
+ self._assert_resource_lock(res.id, None, None)
def test_convergence_update_replace_rollback(self):
rsrc_def = rsrc_defn.ResourceDefinition('test_res',
@@ -2386,7 +2390,7 @@ class ResourceTest(common.HeatTestCase):
tr()
self.assertTrue(mock_delete.called)
self.assertTrue(res._update_replacement_data.called)
- self._assert_resource_lock(res.id, None, 2)
+ self._assert_resource_lock(res.id, None, None)
def test_delete_convergence_does_not_delete_same_template_resource(self):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
@@ -2410,6 +2414,7 @@ class ResourceTest(common.HeatTestCase):
res_id = res.id
res.handle_delete = mock.Mock(side_effect=ValueError('test'))
self._assert_resource_lock(res.id, None, None)
+ res.stack.convergence = True
tr = scheduler.TaskRunner(res.delete_convergence, 2, {}, 'engine-007',
self.dummy_timeout, self.dummy_event)
self.assertRaises(exception.ResourceFailure, tr)
@@ -2428,12 +2433,13 @@ class ResourceTest(common.HeatTestCase):
res.status = res.COMPLETE
res.action = res.CREATE
res.store()
+ self.stack.convergence = True
+ res._calling_engine_id = 'engine-9'
rs = resource_objects.Resource.get_obj(self.stack.context, res.id)
rs.update_and_save({'engine_id': 'not-this'})
self._assert_resource_lock(res.id, 'not-this', None)
- tr = scheduler.TaskRunner(res.delete_convergence, 1, {}, 'engine-007',
- self.dummy_timeout, self.dummy_event)
+ tr = scheduler.TaskRunner(res.delete)
ex = self.assertRaises(exception.UpdateInProgress, tr)
msg = ("The resource %s is already being updated." %
res.name)
diff --git a/heat_integrationtests/scenario/test_aodh_alarm.py b/heat_integrationtests/scenario/test_aodh_alarm.py
index 90288a27a..49bcad983 100644
--- a/heat_integrationtests/scenario/test_aodh_alarm.py
+++ b/heat_integrationtests/scenario/test_aodh_alarm.py
@@ -55,3 +55,8 @@ class AodhAlarmTest(scenario_base.ScenarioTestsBase):
# Note: there is little point waiting more than 60s+time to scale up.
self.assertTrue(test.call_until_true(
120, 2, self.check_instance_count, stack_identifier, 2))
+
+ # Temporarily avoids a race condition, addressed in the
+ # next change https://review.openstack.org/#/c/449351/
+ import time
+ time.sleep(3)