summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOleksii Chuprykov <ochuprykov@mirantis.com>2016-02-16 19:18:53 +0200
committerPeter Razumovsky <prazumovsky@mirantis.com>2016-08-22 10:40:29 +0300
commit68944d223091eacd8f9a4621a46b1cdfd98ac70b (patch)
tree16ae00103d5cb663c9ccf644db1acf80ee3b8b7c
parent110cf140b1a6b1a36c5da69ec7e099a0dba0635a (diff)
downloadheat-68944d223091eacd8f9a4621a46b1cdfd98ac70b.tar.gz
Migrate stacks from legacy to convergence engine
Run `heat-manage migrate-convergence-1 <stack_id>` to migrate legacy stack to convergence engine. Heat engine is used for doing migration i.e. migration can't be done offline. Change-Id: Ie7c2498b37937438f16d154b154b3a6ecbf9ff74 Implements-bp: convergence-migrate-stack
-rw-r--r--heat/cmd/manage.py23
-rw-r--r--heat/db/api.py4
-rw-r--r--heat/db/sqlalchemy/api.py11
-rw-r--r--heat/engine/service.py37
-rw-r--r--heat/engine/stack.py31
-rw-r--r--heat/objects/stack.py13
-rw-r--r--heat/rpc/client.py12
-rw-r--r--heat/tests/db/test_sqlalchemy_api.py30
-rw-r--r--heat/tests/engine/service/test_service_engine.py2
-rw-r--r--heat/tests/test_convg_stack.py33
10 files changed, 188 insertions, 8 deletions
diff --git a/heat/cmd/manage.py b/heat/cmd/manage.py
index 01b816c0f..e7669b583 100644
--- a/heat/cmd/manage.py
+++ b/heat/cmd/manage.py
@@ -22,11 +22,14 @@ from oslo_log import log
from six import moves
from heat.common import context
+from heat.common import exception
from heat.common.i18n import _
+from heat.common import messaging
from heat.common import service_utils
from heat.db import api as db_api
from heat.db import utils
from heat.objects import service as service_objects
+from heat.rpc import client as rpc_client
from heat import version
@@ -111,6 +114,21 @@ def do_reset_stack_status():
db_api.reset_stack_status(ctxt, CONF.command.stack_id)
+def do_migrate():
+ messaging.setup()
+ client = rpc_client.EngineClient()
+ ctxt = context.get_admin_context()
+ try:
+ client.migrate_convergence_1(ctxt, CONF.command.stack_id)
+ except exception.NotFound:
+ raise Exception(_("Stack with id %s can not be found.")
+ % CONF.command.stack_id)
+ except exception.ActionInProgress:
+ raise Exception(_("The stack or some of its nested stacks are "
+ "in progress. Note, that all the stacks should be "
+ "in COMPLETE state in order to be migrated."))
+
+
def purge_deleted():
"""Remove database records that have been previously soft deleted."""
utils.purge_deleted(CONF.command.age,
@@ -141,6 +159,11 @@ def add_command_parsers(subparsers):
# positional parameter, can be skipped. default=None
parser.add_argument('version', nargs='?')
+ # migrate-stacks parser
+ parser = subparsers.add_parser('migrate-convergence-1')
+ parser.set_defaults(func=do_migrate)
+ parser.add_argument('stack_id')
+
# purge_deleted parser
parser = subparsers.add_parser('purge_deleted')
parser.set_defaults(func=purge_deleted)
diff --git a/heat/db/api.py b/heat/db/api.py
index 247214eb4..dd5d0eedf 100644
--- a/heat/db/api.py
+++ b/heat/db/api.py
@@ -188,6 +188,10 @@ def stack_get_all_by_owner_id(context, owner_id):
return IMPL.stack_get_all_by_owner_id(context, owner_id)
+def stack_get_all_by_root_owner_id(context, owner_id):
+ return IMPL.stack_get_all_by_root_owner_id(context, owner_id)
+
+
def stack_count_all(context, filters=None,
show_deleted=False, show_nested=False, show_hidden=False,
tags=None, tags_any=None, not_tags=None,
diff --git a/heat/db/sqlalchemy/api.py b/heat/db/sqlalchemy/api.py
index 8c35c3a7e..f170b464d 100644
--- a/heat/db/sqlalchemy/api.py
+++ b/heat/db/sqlalchemy/api.py
@@ -224,7 +224,7 @@ def resource_purge_deleted(context, stack_id):
def resource_update(context, resource_id, values, atomic_key,
expected_engine_id=None):
session = context.session
- with session.begin():
+ with session.begin(subtransactions=True):
if atomic_key is None:
values['atomic_key'] = 1
else:
@@ -469,6 +469,13 @@ def stack_get_all_by_owner_id(context, owner_id):
return results
+def stack_get_all_by_root_owner_id(context, owner_id):
+ for stack in stack_get_all_by_owner_id(context, owner_id):
+ yield stack
+ for ch_st in stack_get_all_by_root_owner_id(context, stack.id):
+ yield ch_st
+
+
def _get_sort_keys(sort_keys, mapping):
"""Returns an array containing only whitelisted keys
@@ -627,7 +634,7 @@ def stack_update(context, stack_id, values, exp_trvsl=None):
session = context.session
- with session.begin():
+ with session.begin(subtransactions=True):
rows_updated = (session.query(models.Stack)
.filter(models.Stack.id == stack.id)
.filter(models.Stack.current_traversal
diff --git a/heat/engine/service.py b/heat/engine/service.py
index 49788d4da..d4d36f1c7 100644
--- a/heat/engine/service.py
+++ b/heat/engine/service.py
@@ -299,7 +299,7 @@ class EngineService(service.Service):
by the RPC caller.
"""
- RPC_API_VERSION = '1.33'
+ RPC_API_VERSION = '1.34'
def __init__(self, host, topic):
super(EngineService, self).__init__()
@@ -2221,6 +2221,41 @@ class EngineService(service.Service):
for srv in service_objects.Service.get_all(cnxt)]
return result
+ @context.request_context
+ def migrate_convergence_1(self, ctxt, stack_id):
+ parent_stack = parser.Stack.load(ctxt,
+ stack_id=stack_id,
+ show_deleted=False)
+ if parent_stack.convergence:
+ LOG.info(_LI("Convergence was already enabled for stack %s"),
+ stack_id)
+ return
+ db_stacks = stack_object.Stack.get_all_by_root_owner_id(
+ ctxt, parent_stack.id)
+ stacks = [parser.Stack.load(ctxt, stack_id=st.id,
+ stack=st) for st in db_stacks]
+ stacks.append(parent_stack)
+ locks = []
+ try:
+ for st in stacks:
+ lock = stack_lock.StackLock(ctxt, st.id, self.engine_id)
+ lock.acquire()
+ locks.append(lock)
+ sess = ctxt.session
+ sess.begin(subtransactions=True)
+ try:
+ for st in stacks:
+ if not st.convergence:
+ st.service_check_defer = True
+ st.migrate_to_convergence()
+ sess.commit()
+ except Exception:
+ sess.rollback()
+ raise
+ finally:
+ for lock in locks:
+ lock.release()
+
def service_manage_report(self):
cnxt = context.get_admin_context()
diff --git a/heat/engine/stack.py b/heat/engine/stack.py
index b09465ad8..a50f97713 100644
--- a/heat/engine/stack.py
+++ b/heat/engine/stack.py
@@ -603,7 +603,8 @@ class Stack(collections.Mapping):
return stack
@profiler.trace('Stack.store', hide_args=False)
- def store(self, backup=False, exp_trvsl=None):
+ def store(self, backup=False, exp_trvsl=None,
+ ignore_traversal_check=False):
"""Store the stack in the database and return its ID.
If self.id is set, we update the existing stack.
@@ -619,7 +620,7 @@ class Stack(collections.Mapping):
s['raw_template_id'] = self.t.id
if self.id:
- if exp_trvsl is None:
+ if exp_trvsl is None and not ignore_traversal_check:
exp_trvsl = self.current_traversal
if self.convergence:
@@ -1352,6 +1353,16 @@ class Stack(collections.Mapping):
rsrcs[existing_rsrc_db.name] = existing_rsrc_db
return rsrcs
+ def set_resource_deps(self):
+ curr_name_translated_dep = self.dependencies.translate(lambda res:
+ res.id)
+ ext_rsrcs_db = self.db_active_resources_get()
+ for r in self.dependencies:
+ r.needed_by = list(curr_name_translated_dep.required_by(r.id))
+ r.requires = list(curr_name_translated_dep.requires(r.id))
+ resource.Resource.set_needed_by(ext_rsrcs_db[r.id], r.needed_by)
+ resource.Resource.set_requires(ext_rsrcs_db[r.id], r.requires)
+
def _compute_convg_dependencies(self, existing_resources,
current_template_deps, current_resources):
def make_graph_key(rsrc):
@@ -2064,3 +2075,19 @@ class Stack(collections.Mapping):
return self.time_elapsed() > self.timeout_secs()
return False
+
+ def migrate_to_convergence(self):
+ values = {'current_template_id': self.t.id}
+ db_rsrcs = self.db_active_resources_get()
+ if db_rsrcs is not None:
+ for res in db_rsrcs.values():
+ res.update_and_save(values=values)
+ self.set_resource_deps()
+ self.current_traversal = uuidutils.generate_uuid()
+ self.convergence = True
+ prev_raw_template_id = self.prev_raw_template_id
+ self.prev_raw_template_id = None
+ self.store(ignore_traversal_check=True)
+ if prev_raw_template_id:
+ raw_template_object.RawTemplate.delete(self.context,
+ prev_raw_template_id)
diff --git a/heat/objects/stack.py b/heat/objects/stack.py
index ee19cf2b2..65280fca8 100644
--- a/heat/objects/stack.py
+++ b/heat/objects/stack.py
@@ -15,10 +15,9 @@
"""Stack object."""
-import six
-
from oslo_versionedobjects import base
from oslo_versionedobjects import fields
+import six
from heat.common import exception
from heat.common.i18n import _
@@ -146,6 +145,16 @@ class Stack(
pass
@classmethod
+ def get_all_by_root_owner_id(cls, context, root_owner_id):
+ db_stacks = db_api.stack_get_all_by_root_owner_id(context,
+ root_owner_id)
+ for db_stack in db_stacks:
+ try:
+ yield cls._from_db_object(context, cls(context), db_stack)
+ except exception.NotFound:
+ pass
+
+ @classmethod
def count_all(cls, context, **kwargs):
return db_api.stack_count_all(context, **kwargs)
diff --git a/heat/rpc/client.py b/heat/rpc/client.py
index 2ce1a6c58..7f539538d 100644
--- a/heat/rpc/client.py
+++ b/heat/rpc/client.py
@@ -56,6 +56,7 @@ class EngineClient(object):
1.32 - Add get_files call
1.33 - Remove tenant_safe from list_stacks, count_stacks
and list_software_configs
+ 1.34 - Add migrate_convergence_1 call
"""
BASE_RPC_API_VERSION = '1.0'
@@ -846,3 +847,14 @@ class EngineClient(object):
self.make_msg('export_stack',
stack_identity=stack_identity),
version='1.22')
+
+ def migrate_convergence_1(self, ctxt, stack_id):
+ """Migrate the stack to convergence engine
+
+ :param ctxt: RPC context
+ :param stack_name: Name of the stack you want to migrate
+ """
+ return self.call(ctxt,
+ self.make_msg('migrate_convergence_1',
+ stack_id=stack_id),
+ version='1.34')
diff --git a/heat/tests/db/test_sqlalchemy_api.py b/heat/tests/db/test_sqlalchemy_api.py
index e388c4816..9b9f7515c 100644
--- a/heat/tests/db/test_sqlalchemy_api.py
+++ b/heat/tests/db/test_sqlalchemy_api.py
@@ -1881,6 +1881,36 @@ class DBAPIStackTest(common.HeatTestCase):
parent_stack2.id)
self.assertEqual(2, len(stack2_children))
+ def test_stack_get_all_by_root_owner_id(self):
+ parent_stack1 = create_stack(self.ctx, self.template, self.user_creds)
+ parent_stack2 = create_stack(self.ctx, self.template, self.user_creds)
+ for i in range(3):
+ lvl1_st = create_stack(self.ctx, self.template, self.user_creds,
+ owner_id=parent_stack1.id)
+ for j in range(2):
+ create_stack(self.ctx, self.template, self.user_creds,
+ owner_id=lvl1_st.id)
+ for i in range(2):
+ lvl1_st = create_stack(self.ctx, self.template, self.user_creds,
+ owner_id=parent_stack2.id)
+ for j in range(4):
+ lvl2_st = create_stack(self.ctx, self.template,
+ self.user_creds, owner_id=lvl1_st.id)
+ for k in range(3):
+ create_stack(self.ctx, self.template,
+ self.user_creds, owner_id=lvl2_st.id)
+
+ stack1_children = db_api.stack_get_all_by_root_owner_id(
+ self.ctx,
+ parent_stack1.id)
+ # 3 stacks on the first level + 6 stack on the second
+ self.assertEqual(9, len(list(stack1_children)))
+ stack2_children = db_api.stack_get_all_by_root_owner_id(
+ self.ctx,
+ parent_stack2.id)
+ # 2 + 8 + 24
+ self.assertEqual(34, len(list(stack2_children)))
+
def test_stack_get_all_with_regular_tenant(self):
values = [
{'tenant': UUID1},
diff --git a/heat/tests/engine/service/test_service_engine.py b/heat/tests/engine/service/test_service_engine.py
index 0aa2d41b3..858ab0891 100644
--- a/heat/tests/engine/service/test_service_engine.py
+++ b/heat/tests/engine/service/test_service_engine.py
@@ -40,7 +40,7 @@ class ServiceEngineTest(common.HeatTestCase):
def test_make_sure_rpc_version(self):
self.assertEqual(
- '1.33',
+ '1.34',
service.EngineService.RPC_API_VERSION,
('RPC version is changed, please update this test to new version '
'and make sure additional test cases are added for RPC APIs '
diff --git a/heat/tests/test_convg_stack.py b/heat/tests/test_convg_stack.py
index 269acc04f..3e2353da9 100644
--- a/heat/tests/test_convg_stack.py
+++ b/heat/tests/test_convg_stack.py
@@ -873,3 +873,36 @@ class TestConvgComputeDependencies(common.HeatTestCase):
'((4, False), (3, False)), '
'((5, False), (3, False))])',
repr(self.stack._convg_deps))
+
+
+class TestConvergenceMigration(common.HeatTestCase):
+ def test_migration_to_convergence_engine(self):
+ self.ctx = utils.dummy_context()
+ self.stack = tools.get_stack('test_stack_convg', self.ctx,
+ template=tools.string_template_five)
+ self.stack.store()
+ for r in self.stack.resources.values():
+ r._store()
+ self.stack.migrate_to_convergence()
+ self.stack = self.stack.load(self.ctx, self.stack.id)
+
+ self.assertTrue(self.stack.convergence)
+ self.assertIsNone(self.stack.prev_raw_template_id)
+ exp_required_by = {'A': ['C'], 'B': ['C'], 'C': ['D', 'E'],
+ 'D': [], 'E': []}
+ exp_requires = {'A': [], 'B': [], 'C': ['A', 'B'], 'D': ['C'],
+ 'E': ['C']}
+ exp_tmpl_id = self.stack.t.id
+
+ def id_to_name(ids):
+ names = []
+ for r in self.stack.resources.values():
+ if r.id in ids:
+ names.append(r.name)
+ return names
+ for r in self.stack.resources.values():
+ self.assertEqual(sorted(exp_required_by[r.name]),
+ sorted(r.required_by()))
+ self.assertEqual(sorted(exp_requires[r.name]),
+ sorted(id_to_name(r.requires)))
+ self.assertEqual(exp_tmpl_id, r.current_template_id)