diff options
author | Crag Wolfe <cwolfe@redhat.com> | 2016-08-27 03:11:22 -0400 |
---|---|---|
committer | Crag Wolfe <cwolfe@redhat.com> | 2016-11-28 20:13:35 -0800 |
commit | 882a640f18c74e1e81ac03743898ae397fd337de (patch) | |
tree | 20e8e5e85783a934d4b6eacdbeb13bb2415799e8 | |
parent | 902990097b3bda6ccee59d839814a26152c3b0d8 (diff) | |
download | heat-882a640f18c74e1e81ac03743898ae397fd337de.tar.gz |
Refactor purge_deleted, operate on batches of stacks
Avoid large sql "in" clauses by operating on smaller batches of stacks
at a time.
To avoid transaction overhead and contention on the resource table,
the first deletions occur outside of a transaction (are
autocommitted). This is OK because the purge is re-rentrant -- we
won't lose any stack_id's to delete if something goes wrong before the
conn.begin() block. That is, we will not orphan any rows if the purge
is run multiple times where an error occurs.
Change-Id: I9edf0558ed54820842193560e323df6501411d1d
-rw-r--r-- | heat/cmd/manage.py | 10 | ||||
-rw-r--r-- | heat/db/sqlalchemy/api.py | 174 | ||||
-rw-r--r-- | heat/db/utils.py | 4 | ||||
-rw-r--r-- | heat/tests/db/test_sqlalchemy_api.py | 12 |
4 files changed, 126 insertions, 74 deletions
diff --git a/heat/cmd/manage.py b/heat/cmd/manage.py index 0bda99907..6f1b6e1f5 100644 --- a/heat/cmd/manage.py +++ b/heat/cmd/manage.py @@ -133,7 +133,8 @@ def purge_deleted(): """Remove database records that have been previously soft deleted.""" utils.purge_deleted(CONF.command.age, CONF.command.granularity, - CONF.command.project_id) + CONF.command.project_id, + CONF.command.batch_size) def do_crypt_parameters_and_properties(): @@ -179,6 +180,13 @@ def add_command_parsers(subparsers): parser.add_argument( '-p', '--project-id', help=_('Project ID to purge deleted stacks.')) + # optional parameter, can be skipped. default='20' + parser.add_argument( + '-b', '--batch_size', default='20', + help=_('Number of stacks to delete at a time (per transaction). ' + 'Note that a single stack may have many db rows ' + '(events, etc.) associated with it.')) + # update_params parser parser = subparsers.add_parser('update_params') parser.set_defaults(func=do_crypt_parameters_and_properties) diff --git a/heat/db/sqlalchemy/api.py b/heat/db/sqlalchemy/api.py index 854d7e6ea..f1d2c8e23 100644 --- a/heat/db/sqlalchemy/api.py +++ b/heat/db/sqlalchemy/api.py @@ -13,6 +13,7 @@ """Implementation of SQLAlchemy backend.""" import datetime +import itertools import sys from oslo_config import cfg @@ -1176,13 +1177,18 @@ def service_get_all_by_args(context, host, binary, hostname): filter_by(hostname=hostname).all()) -def purge_deleted(age, granularity='days', project_id=None): - try: - age = int(age) - except ValueError: - raise exception.Error(_("age should be an integer")) - if age < 0: - raise exception.Error(_("age should be a positive integer")) +def purge_deleted(age, granularity='days', project_id=None, batch_size=20): + def _validate_positive_integer(val, argname): + try: + return int(val) + except ValueError: + raise exception.Error(_("%s should be an integer") % argname) + if val < 0: + raise exception.Error(_("%s should be a positive integer") + % argname) + + age = _validate_positive_integer(age, 'age') + batch_size = _validate_positive_integer(batch_size, 'batch_size') if granularity not in ('days', 'hours', 'minutes', 'seconds'): raise exception.Error( @@ -1201,6 +1207,46 @@ def purge_deleted(age, granularity='days', project_id=None): meta.bind = engine stack = sqlalchemy.Table('stack', meta, autoload=True) + service = sqlalchemy.Table('service', meta, autoload=True) + + # Purge deleted services + srvc_del = service.delete().where(service.c.deleted_at < time_line) + engine.execute(srvc_del) + + # find the soft-deleted stacks that are past their expiry + sel = sqlalchemy.select([stack.c.id, stack.c.raw_template_id, + stack.c.prev_raw_template_id, + stack.c.user_creds_id, + stack.c.action, + stack.c.status, + stack.c.name]) + if project_id: + stack_where = sel.where(and_( + stack.c.tenant == project_id, + stack.c.deleted_at < time_line)) + else: + stack_where = sel.where( + stack.c.deleted_at < time_line) + + stacks = engine.execute(stack_where) + + while True: + next_stacks_to_purge = list(itertools.islice(stacks, batch_size)) + if len(next_stacks_to_purge): + _purge_stacks(next_stacks_to_purge, engine, meta) + else: + break + + +def _purge_stacks(stack_infos, engine, meta): + """Purge some stacks and their releated events, raw_templates, etc. + + stack_infos is a list of lists of selected stack columns: + [[id, raw_template_id, prev_raw_template_id, user_creds_id, + action, status, name], ...] + """ + + stack = sqlalchemy.Table('stack', meta, autoload=True) stack_lock = sqlalchemy.Table('stack_lock', meta, autoload=True) stack_tag = sqlalchemy.Table('stack_tag', meta, autoload=True) resource = sqlalchemy.Table('resource', meta, autoload=True) @@ -1210,102 +1256,88 @@ def purge_deleted(age, granularity='days', project_id=None): raw_template_files = sqlalchemy.Table('raw_template_files', meta, autoload=True) user_creds = sqlalchemy.Table('user_creds', meta, autoload=True) - service = sqlalchemy.Table('service', meta, autoload=True) syncpoint = sqlalchemy.Table('sync_point', meta, autoload=True) - # find the soft-deleted stacks that are past their expiry - if project_id: - stack_where = sqlalchemy.select([ - stack.c.id, stack.c.raw_template_id, - stack.c.prev_raw_template_id, - stack.c.user_creds_id]).where(and_( - stack.c.tenant == project_id, - stack.c.deleted_at < time_line)) - else: - stack_where = sqlalchemy.select([ - stack.c.id, stack.c.raw_template_id, - stack.c.prev_raw_template_id, - stack.c.user_creds_id]).where( - stack.c.deleted_at < time_line) - - stacks = list(engine.execute(stack_where)) - if stacks: - stack_ids = [i[0] for i in stacks] - # delete stack locks (just in case some got stuck) - stack_lock_del = stack_lock.delete().where( - stack_lock.c.stack_id.in_(stack_ids)) - engine.execute(stack_lock_del) - # delete stack tags - stack_tag_del = stack_tag.delete().where( - stack_tag.c.stack_id.in_(stack_ids)) - engine.execute(stack_tag_del) - # delete resource_data - res_where = sqlalchemy.select([resource.c.id]).where( - resource.c.stack_id.in_(stack_ids)) - res_data_del = resource_data.delete().where( - resource_data.c.resource_id.in_(res_where)) - engine.execute(res_data_del) - # delete resources - res_del = resource.delete().where(resource.c.stack_id.in_(stack_ids)) - engine.execute(res_del) - # delete events - event_del = event.delete().where(event.c.stack_id.in_(stack_ids)) - engine.execute(event_del) - # clean up any sync_points that may have lingered - sync_del = syncpoint.delete().where( - syncpoint.c.stack_id.in_(stack_ids)) - engine.execute(sync_del) + stack_info_str = ','.join([str(i) for i in stack_infos]) + LOG.info("Purging stacks %s" % stack_info_str) + + stack_ids = [stack_info[0] for stack_info in stack_infos] + # delete stack locks (just in case some got stuck) + stack_lock_del = stack_lock.delete().where( + stack_lock.c.stack_id.in_(stack_ids)) + engine.execute(stack_lock_del) + # delete stack tags + stack_tag_del = stack_tag.delete().where( + stack_tag.c.stack_id.in_(stack_ids)) + engine.execute(stack_tag_del) + # delete resource_data + res_where = sqlalchemy.select([resource.c.id]).where( + resource.c.stack_id.in_(stack_ids)) + res_data_del = resource_data.delete().where( + resource_data.c.resource_id.in_(res_where)) + engine.execute(res_data_del) + # delete resources (normally there shouldn't be any) + res_del = resource.delete().where(resource.c.stack_id.in_(stack_ids)) + engine.execute(res_del) + # delete events + event_del = event.delete().where(event.c.stack_id.in_(stack_ids)) + engine.execute(event_del) + # clean up any sync_points that may have lingered + sync_del = syncpoint.delete().where( + syncpoint.c.stack_id.in_(stack_ids)) + engine.execute(sync_del) + + conn = engine.connect() + with conn.begin(): # these deletes in a transaction # delete the stacks stack_del = stack.delete().where(stack.c.id.in_(stack_ids)) - engine.execute(stack_del) + conn.execute(stack_del) # delete orphaned raw templates - raw_template_ids = [i[1] for i in stacks if i[1] is not None] - raw_template_ids.extend(i[2] for i in stacks if i[2] is not None) - if raw_template_ids: - # keep those still referenced + raw_template_ids = [i[1] for i in stack_infos if i[1] is not None] + raw_template_ids.extend(i[2] for i in stack_infos if i[2] is not None) + if raw_template_ids: # keep those still referenced raw_tmpl_sel = sqlalchemy.select([stack.c.raw_template_id]).where( stack.c.raw_template_id.in_(raw_template_ids)) - raw_tmpl = [i[0] for i in engine.execute(raw_tmpl_sel)] + raw_tmpl = [i[0] for i in conn.execute(raw_tmpl_sel)] raw_template_ids = set(raw_template_ids) - set(raw_tmpl) + if raw_template_ids: # keep those still referenced (previous tmpl) raw_tmpl_sel = sqlalchemy.select( [stack.c.prev_raw_template_id]).where( stack.c.prev_raw_template_id.in_(raw_template_ids)) - raw_tmpl = [i[0] for i in engine.execute(raw_tmpl_sel)] + raw_tmpl = [i[0] for i in conn.execute(raw_tmpl_sel)] raw_template_ids = raw_template_ids - set(raw_tmpl) + if raw_template_ids: # delete raw_templates if we have any raw_tmpl_file_sel = sqlalchemy.select( [raw_template.c.files_id]).where( raw_template.c.id.in_(raw_template_ids)) - raw_tmpl_file_ids = [i[0] for i in engine.execute( + raw_tmpl_file_ids = [i[0] for i in conn.execute( raw_tmpl_file_sel)] raw_templ_del = raw_template.delete().where( raw_template.c.id.in_(raw_template_ids)) - engine.execute(raw_templ_del) - # purge any raw_template_files that are no longer referenced - if raw_tmpl_file_ids: + conn.execute(raw_templ_del) + if raw_tmpl_file_ids: # keep _files still referenced raw_tmpl_file_sel = sqlalchemy.select( [raw_template.c.files_id]).where( raw_template.c.files_id.in_(raw_tmpl_file_ids)) - raw_tmpl_files = [i[0] for i in engine.execute( + raw_tmpl_files = [i[0] for i in conn.execute( raw_tmpl_file_sel)] raw_tmpl_file_ids = set(raw_tmpl_file_ids) \ - set(raw_tmpl_files) + if raw_tmpl_file_ids: # delete _files if we have any raw_tmpl_file_del = raw_template_files.delete().where( raw_template_files.c.id.in_(raw_tmpl_file_ids)) - engine.execute(raw_tmpl_file_del) + conn.execute(raw_tmpl_file_del) # purge any user creds that are no longer referenced - user_creds_ids = [i[3] for i in stacks if i[3] is not None] - if user_creds_ids: - # keep those still referenced + user_creds_ids = [i[3] for i in stack_infos if i[3] is not None] + if user_creds_ids: # keep those still referenced user_sel = sqlalchemy.select([stack.c.user_creds_id]).where( stack.c.user_creds_id.in_(user_creds_ids)) - users = [i[0] for i in engine.execute(user_sel)] + users = [i[0] for i in conn.execute(user_sel)] user_creds_ids = set(user_creds_ids) - set(users) + if user_creds_ids: # delete if we have any usr_creds_del = user_creds.delete().where( user_creds.c.id.in_(user_creds_ids)) - engine.execute(usr_creds_del) - # Purge deleted services - srvc_del = service.delete().where(service.c.deleted_at < time_line) - engine.execute(srvc_del) + conn.execute(usr_creds_del) def sync_point_delete_all_by_stack_and_traversal(context, stack_id, diff --git a/heat/db/utils.py b/heat/db/utils.py index 537d99ed3..23905502a 100644 --- a/heat/db/utils.py +++ b/heat/db/utils.py @@ -43,8 +43,8 @@ IMPL = LazyPluggable('backend', sqlalchemy='heat.db.sqlalchemy.api') -def purge_deleted(age, granularity='days', project_id=None): - IMPL.purge_deleted(age, granularity, project_id) +def purge_deleted(age, granularity='days', project_id=None, batch_size=20): + IMPL.purge_deleted(age, granularity, project_id, batch_size) def encrypt_parameters_and_properties(ctxt, encryption_key, verbose): diff --git a/heat/tests/db/test_sqlalchemy_api.py b/heat/tests/db/test_sqlalchemy_api.py index e955f9d21..5eb48f5f1 100644 --- a/heat/tests/db/test_sqlalchemy_api.py +++ b/heat/tests/db/test_sqlalchemy_api.py @@ -2171,6 +2171,18 @@ class DBAPIStackTest(common.HeatTestCase): self.assertIsNone(db_api.user_creds_get( self.ctx, stacks[s].user_creds_id)) + def test_purge_deleted_batch_arg(self): + now = timeutils.utcnow() + delta = datetime.timedelta(seconds=3600) + deleted = now - delta + for i in range(7): + create_stack(self.ctx, self.template, self.user_creds, + deleted_at=deleted) + + with mock.patch('heat.db.sqlalchemy.api._purge_stacks') as mock_ps: + db_api.purge_deleted(age=0, batch_size=2) + self.assertEqual(4, mock_ps.call_count) + def test_stack_get_root_id(self): root = create_stack(self.ctx, self.template, self.user_creds, name='root stack') |