summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCrag Wolfe <cwolfe@redhat.com>2016-08-27 03:11:22 -0400
committerCrag Wolfe <cwolfe@redhat.com>2016-11-28 20:13:35 -0800
commit882a640f18c74e1e81ac03743898ae397fd337de (patch)
tree20e8e5e85783a934d4b6eacdbeb13bb2415799e8
parent902990097b3bda6ccee59d839814a26152c3b0d8 (diff)
downloadheat-882a640f18c74e1e81ac03743898ae397fd337de.tar.gz
Refactor purge_deleted, operate on batches of stacks
Avoid large sql "in" clauses by operating on smaller batches of stacks at a time. To avoid transaction overhead and contention on the resource table, the first deletions occur outside of a transaction (are autocommitted). This is OK because the purge is re-rentrant -- we won't lose any stack_id's to delete if something goes wrong before the conn.begin() block. That is, we will not orphan any rows if the purge is run multiple times where an error occurs. Change-Id: I9edf0558ed54820842193560e323df6501411d1d
-rw-r--r--heat/cmd/manage.py10
-rw-r--r--heat/db/sqlalchemy/api.py174
-rw-r--r--heat/db/utils.py4
-rw-r--r--heat/tests/db/test_sqlalchemy_api.py12
4 files changed, 126 insertions, 74 deletions
diff --git a/heat/cmd/manage.py b/heat/cmd/manage.py
index 0bda99907..6f1b6e1f5 100644
--- a/heat/cmd/manage.py
+++ b/heat/cmd/manage.py
@@ -133,7 +133,8 @@ def purge_deleted():
"""Remove database records that have been previously soft deleted."""
utils.purge_deleted(CONF.command.age,
CONF.command.granularity,
- CONF.command.project_id)
+ CONF.command.project_id,
+ CONF.command.batch_size)
def do_crypt_parameters_and_properties():
@@ -179,6 +180,13 @@ def add_command_parsers(subparsers):
parser.add_argument(
'-p', '--project-id',
help=_('Project ID to purge deleted stacks.'))
+ # optional parameter, can be skipped. default='20'
+ parser.add_argument(
+ '-b', '--batch_size', default='20',
+ help=_('Number of stacks to delete at a time (per transaction). '
+ 'Note that a single stack may have many db rows '
+ '(events, etc.) associated with it.'))
+
# update_params parser
parser = subparsers.add_parser('update_params')
parser.set_defaults(func=do_crypt_parameters_and_properties)
diff --git a/heat/db/sqlalchemy/api.py b/heat/db/sqlalchemy/api.py
index 854d7e6ea..f1d2c8e23 100644
--- a/heat/db/sqlalchemy/api.py
+++ b/heat/db/sqlalchemy/api.py
@@ -13,6 +13,7 @@
"""Implementation of SQLAlchemy backend."""
import datetime
+import itertools
import sys
from oslo_config import cfg
@@ -1176,13 +1177,18 @@ def service_get_all_by_args(context, host, binary, hostname):
filter_by(hostname=hostname).all())
-def purge_deleted(age, granularity='days', project_id=None):
- try:
- age = int(age)
- except ValueError:
- raise exception.Error(_("age should be an integer"))
- if age < 0:
- raise exception.Error(_("age should be a positive integer"))
+def purge_deleted(age, granularity='days', project_id=None, batch_size=20):
+ def _validate_positive_integer(val, argname):
+ try:
+ return int(val)
+ except ValueError:
+ raise exception.Error(_("%s should be an integer") % argname)
+ if val < 0:
+ raise exception.Error(_("%s should be a positive integer")
+ % argname)
+
+ age = _validate_positive_integer(age, 'age')
+ batch_size = _validate_positive_integer(batch_size, 'batch_size')
if granularity not in ('days', 'hours', 'minutes', 'seconds'):
raise exception.Error(
@@ -1201,6 +1207,46 @@ def purge_deleted(age, granularity='days', project_id=None):
meta.bind = engine
stack = sqlalchemy.Table('stack', meta, autoload=True)
+ service = sqlalchemy.Table('service', meta, autoload=True)
+
+ # Purge deleted services
+ srvc_del = service.delete().where(service.c.deleted_at < time_line)
+ engine.execute(srvc_del)
+
+ # find the soft-deleted stacks that are past their expiry
+ sel = sqlalchemy.select([stack.c.id, stack.c.raw_template_id,
+ stack.c.prev_raw_template_id,
+ stack.c.user_creds_id,
+ stack.c.action,
+ stack.c.status,
+ stack.c.name])
+ if project_id:
+ stack_where = sel.where(and_(
+ stack.c.tenant == project_id,
+ stack.c.deleted_at < time_line))
+ else:
+ stack_where = sel.where(
+ stack.c.deleted_at < time_line)
+
+ stacks = engine.execute(stack_where)
+
+ while True:
+ next_stacks_to_purge = list(itertools.islice(stacks, batch_size))
+ if len(next_stacks_to_purge):
+ _purge_stacks(next_stacks_to_purge, engine, meta)
+ else:
+ break
+
+
+def _purge_stacks(stack_infos, engine, meta):
+    """Purge some stacks and their related events, raw_templates, etc.
+
+ stack_infos is a list of lists of selected stack columns:
+ [[id, raw_template_id, prev_raw_template_id, user_creds_id,
+ action, status, name], ...]
+ """
+
+ stack = sqlalchemy.Table('stack', meta, autoload=True)
stack_lock = sqlalchemy.Table('stack_lock', meta, autoload=True)
stack_tag = sqlalchemy.Table('stack_tag', meta, autoload=True)
resource = sqlalchemy.Table('resource', meta, autoload=True)
@@ -1210,102 +1256,88 @@ def purge_deleted(age, granularity='days', project_id=None):
raw_template_files = sqlalchemy.Table('raw_template_files', meta,
autoload=True)
user_creds = sqlalchemy.Table('user_creds', meta, autoload=True)
- service = sqlalchemy.Table('service', meta, autoload=True)
syncpoint = sqlalchemy.Table('sync_point', meta, autoload=True)
- # find the soft-deleted stacks that are past their expiry
- if project_id:
- stack_where = sqlalchemy.select([
- stack.c.id, stack.c.raw_template_id,
- stack.c.prev_raw_template_id,
- stack.c.user_creds_id]).where(and_(
- stack.c.tenant == project_id,
- stack.c.deleted_at < time_line))
- else:
- stack_where = sqlalchemy.select([
- stack.c.id, stack.c.raw_template_id,
- stack.c.prev_raw_template_id,
- stack.c.user_creds_id]).where(
- stack.c.deleted_at < time_line)
-
- stacks = list(engine.execute(stack_where))
- if stacks:
- stack_ids = [i[0] for i in stacks]
- # delete stack locks (just in case some got stuck)
- stack_lock_del = stack_lock.delete().where(
- stack_lock.c.stack_id.in_(stack_ids))
- engine.execute(stack_lock_del)
- # delete stack tags
- stack_tag_del = stack_tag.delete().where(
- stack_tag.c.stack_id.in_(stack_ids))
- engine.execute(stack_tag_del)
- # delete resource_data
- res_where = sqlalchemy.select([resource.c.id]).where(
- resource.c.stack_id.in_(stack_ids))
- res_data_del = resource_data.delete().where(
- resource_data.c.resource_id.in_(res_where))
- engine.execute(res_data_del)
- # delete resources
- res_del = resource.delete().where(resource.c.stack_id.in_(stack_ids))
- engine.execute(res_del)
- # delete events
- event_del = event.delete().where(event.c.stack_id.in_(stack_ids))
- engine.execute(event_del)
- # clean up any sync_points that may have lingered
- sync_del = syncpoint.delete().where(
- syncpoint.c.stack_id.in_(stack_ids))
- engine.execute(sync_del)
+ stack_info_str = ','.join([str(i) for i in stack_infos])
+ LOG.info("Purging stacks %s" % stack_info_str)
+
+ stack_ids = [stack_info[0] for stack_info in stack_infos]
+ # delete stack locks (just in case some got stuck)
+ stack_lock_del = stack_lock.delete().where(
+ stack_lock.c.stack_id.in_(stack_ids))
+ engine.execute(stack_lock_del)
+ # delete stack tags
+ stack_tag_del = stack_tag.delete().where(
+ stack_tag.c.stack_id.in_(stack_ids))
+ engine.execute(stack_tag_del)
+ # delete resource_data
+ res_where = sqlalchemy.select([resource.c.id]).where(
+ resource.c.stack_id.in_(stack_ids))
+ res_data_del = resource_data.delete().where(
+ resource_data.c.resource_id.in_(res_where))
+ engine.execute(res_data_del)
+ # delete resources (normally there shouldn't be any)
+ res_del = resource.delete().where(resource.c.stack_id.in_(stack_ids))
+ engine.execute(res_del)
+ # delete events
+ event_del = event.delete().where(event.c.stack_id.in_(stack_ids))
+ engine.execute(event_del)
+ # clean up any sync_points that may have lingered
+ sync_del = syncpoint.delete().where(
+ syncpoint.c.stack_id.in_(stack_ids))
+ engine.execute(sync_del)
+
+ conn = engine.connect()
+ with conn.begin(): # these deletes in a transaction
# delete the stacks
stack_del = stack.delete().where(stack.c.id.in_(stack_ids))
- engine.execute(stack_del)
+ conn.execute(stack_del)
# delete orphaned raw templates
- raw_template_ids = [i[1] for i in stacks if i[1] is not None]
- raw_template_ids.extend(i[2] for i in stacks if i[2] is not None)
- if raw_template_ids:
- # keep those still referenced
+ raw_template_ids = [i[1] for i in stack_infos if i[1] is not None]
+ raw_template_ids.extend(i[2] for i in stack_infos if i[2] is not None)
+ if raw_template_ids: # keep those still referenced
raw_tmpl_sel = sqlalchemy.select([stack.c.raw_template_id]).where(
stack.c.raw_template_id.in_(raw_template_ids))
- raw_tmpl = [i[0] for i in engine.execute(raw_tmpl_sel)]
+ raw_tmpl = [i[0] for i in conn.execute(raw_tmpl_sel)]
raw_template_ids = set(raw_template_ids) - set(raw_tmpl)
+ if raw_template_ids: # keep those still referenced (previous tmpl)
raw_tmpl_sel = sqlalchemy.select(
[stack.c.prev_raw_template_id]).where(
stack.c.prev_raw_template_id.in_(raw_template_ids))
- raw_tmpl = [i[0] for i in engine.execute(raw_tmpl_sel)]
+ raw_tmpl = [i[0] for i in conn.execute(raw_tmpl_sel)]
raw_template_ids = raw_template_ids - set(raw_tmpl)
+ if raw_template_ids: # delete raw_templates if we have any
raw_tmpl_file_sel = sqlalchemy.select(
[raw_template.c.files_id]).where(
raw_template.c.id.in_(raw_template_ids))
- raw_tmpl_file_ids = [i[0] for i in engine.execute(
+ raw_tmpl_file_ids = [i[0] for i in conn.execute(
raw_tmpl_file_sel)]
raw_templ_del = raw_template.delete().where(
raw_template.c.id.in_(raw_template_ids))
- engine.execute(raw_templ_del)
- # purge any raw_template_files that are no longer referenced
- if raw_tmpl_file_ids:
+ conn.execute(raw_templ_del)
+ if raw_tmpl_file_ids: # keep _files still referenced
raw_tmpl_file_sel = sqlalchemy.select(
[raw_template.c.files_id]).where(
raw_template.c.files_id.in_(raw_tmpl_file_ids))
- raw_tmpl_files = [i[0] for i in engine.execute(
+ raw_tmpl_files = [i[0] for i in conn.execute(
raw_tmpl_file_sel)]
raw_tmpl_file_ids = set(raw_tmpl_file_ids) \
- set(raw_tmpl_files)
+ if raw_tmpl_file_ids: # delete _files if we have any
raw_tmpl_file_del = raw_template_files.delete().where(
raw_template_files.c.id.in_(raw_tmpl_file_ids))
- engine.execute(raw_tmpl_file_del)
+ conn.execute(raw_tmpl_file_del)
# purge any user creds that are no longer referenced
- user_creds_ids = [i[3] for i in stacks if i[3] is not None]
- if user_creds_ids:
- # keep those still referenced
+ user_creds_ids = [i[3] for i in stack_infos if i[3] is not None]
+ if user_creds_ids: # keep those still referenced
user_sel = sqlalchemy.select([stack.c.user_creds_id]).where(
stack.c.user_creds_id.in_(user_creds_ids))
- users = [i[0] for i in engine.execute(user_sel)]
+ users = [i[0] for i in conn.execute(user_sel)]
user_creds_ids = set(user_creds_ids) - set(users)
+ if user_creds_ids: # delete if we have any
usr_creds_del = user_creds.delete().where(
user_creds.c.id.in_(user_creds_ids))
- engine.execute(usr_creds_del)
- # Purge deleted services
- srvc_del = service.delete().where(service.c.deleted_at < time_line)
- engine.execute(srvc_del)
+ conn.execute(usr_creds_del)
def sync_point_delete_all_by_stack_and_traversal(context, stack_id,
diff --git a/heat/db/utils.py b/heat/db/utils.py
index 537d99ed3..23905502a 100644
--- a/heat/db/utils.py
+++ b/heat/db/utils.py
@@ -43,8 +43,8 @@ IMPL = LazyPluggable('backend',
sqlalchemy='heat.db.sqlalchemy.api')
-def purge_deleted(age, granularity='days', project_id=None):
- IMPL.purge_deleted(age, granularity, project_id)
+def purge_deleted(age, granularity='days', project_id=None, batch_size=20):
+ IMPL.purge_deleted(age, granularity, project_id, batch_size)
def encrypt_parameters_and_properties(ctxt, encryption_key, verbose):
diff --git a/heat/tests/db/test_sqlalchemy_api.py b/heat/tests/db/test_sqlalchemy_api.py
index e955f9d21..5eb48f5f1 100644
--- a/heat/tests/db/test_sqlalchemy_api.py
+++ b/heat/tests/db/test_sqlalchemy_api.py
@@ -2171,6 +2171,18 @@ class DBAPIStackTest(common.HeatTestCase):
self.assertIsNone(db_api.user_creds_get(
self.ctx, stacks[s].user_creds_id))
+ def test_purge_deleted_batch_arg(self):
+ now = timeutils.utcnow()
+ delta = datetime.timedelta(seconds=3600)
+ deleted = now - delta
+ for i in range(7):
+ create_stack(self.ctx, self.template, self.user_creds,
+ deleted_at=deleted)
+
+ with mock.patch('heat.db.sqlalchemy.api._purge_stacks') as mock_ps:
+ db_api.purge_deleted(age=0, batch_size=2)
+ self.assertEqual(4, mock_ps.call_count)
+
def test_stack_get_root_id(self):
root = create_stack(self.ctx, self.template, self.user_creds,
name='root stack')