diff options
author | Jenkins <jenkins@review.openstack.org> | 2015-06-17 17:28:27 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2015-06-17 17:28:27 +0000 |
commit | ef14a15a8ee270d831851864b22bcbcc2ae078e0 (patch) | |
tree | f3728e11d6f836d2d317585c6be33b4026505044 | |
parent | e1613d1bbe80be6fd76e74381eeecbc088fea1a8 (diff) | |
parent | 2f743b2f20a559c5d2e05aa716b9137e928eb017 (diff) | |
download | nova-ef14a15a8ee270d831851864b22bcbcc2ae078e0.tar.gz |
Merge "Implement online schema migrations"
-rw-r--r-- | nova/cmd/manage.py | 18 | ||||
-rw-r--r-- | nova/db/migration.py | 15 | ||||
-rw-r--r-- | nova/db/sqlalchemy/migration.py | 984 | ||||
-rw-r--r-- | nova/db/sqlalchemy/models.py | 9 | ||||
-rw-r--r-- | nova/exception.py | 4 | ||||
-rw-r--r-- | nova/tests/unit/db/test_migrations.py | 288 | ||||
-rw-r--r-- | nova/tests/unit/db/test_sqlalchemy_migration.py | 5 |
7 files changed, 1320 insertions, 3 deletions
diff --git a/nova/cmd/manage.py b/nova/cmd/manage.py index 6253168ecf..3d230d00d7 100644 --- a/nova/cmd/manage.py +++ b/nova/cmd/manage.py @@ -908,6 +908,24 @@ class DbCommands(object): """Sync the database up to the most recent version.""" return migration.db_sync(version) + @args('--dry-run', action='store_true', dest='dry_run', + default=False, help='Print SQL statements instead of executing') + def expand(self, dry_run): + """Expand database schema.""" + return migration.db_expand(dry_run) + + @args('--dry-run', action='store_true', dest='dry_run', + default=False, help='Print SQL statements instead of executing') + def migrate(self, dry_run): + """Migrate database schema.""" + return migration.db_migrate(dry_run) + + @args('--dry-run', action='store_true', dest='dry_run', + default=False, help='Print SQL statements instead of executing') + def contract(self, dry_run): + """Contract database schema.""" + return migration.db_contract(dry_run) + def version(self): """Print the current database version.""" print(migration.db_version()) diff --git a/nova/db/migration.py b/nova/db/migration.py index 0575b4528e..7df3c796c0 100644 --- a/nova/db/migration.py +++ b/nova/db/migration.py @@ -26,6 +26,21 @@ def db_sync(version=None, database='main'): return IMPL.db_sync(version=version, database=database) +def db_expand(dryrun=False, database='main'): + """Expand database schema.""" + return IMPL.db_expand(dryrun=dryrun, database=database) + + +def db_migrate(dryrun=False, database='main'): + """Migrate database schema.""" + return IMPL.db_migrate(dryrun=dryrun, database=database) + + +def db_contract(dryrun=False, database='main'): + """Contract database schema.""" + return IMPL.db_contract(dryrun=dryrun, database=database) + + def db_version(database='main'): """Display the current database version.""" return IMPL.db_version(database=database) diff --git a/nova/db/sqlalchemy/migration.py b/nova/db/sqlalchemy/migration.py index 067862f869..851b8eadbb 100644 --- a/nova/db/sqlalchemy/migration.py +++ b/nova/db/sqlalchemy/migration.py @@ -14,8 +14,13 @@ # License for the specific language governing permissions and limitations # under the License. +import base64 import os +import random +import alembic.autogenerate +import alembic.migration +import alembic.operations from migrate import exceptions as versioning_exceptions from migrate.versioning import api as versioning_api from migrate.versioning.repository import Repository @@ -25,6 +30,7 @@ import sqlalchemy from sqlalchemy.sql import null from nova.db.sqlalchemy import api as db_session +from nova.db.sqlalchemy import models from nova import exception from nova.i18n import _ @@ -43,6 +49,44 @@ def get_engine(database='main'): return db_session.get_api_engine() +def _db_sync_locked(repository): + engine = get_engine() + try: + migrate_version = db_utils.get_table(engine, repository.version_table) + except sqlalchemy.exc.NoSuchTableError: + # Table doesn't exist yet, cannot be locked + return False + + row = sqlalchemy.sql.select([migrate_version]).\ + where(migrate_version.c.repository_id == repository.id).\ + execute().fetchone() + if not row or 'locked' not in row: + # 'db expand' will create row if missing and it will add locked + # column if missing. If 'db expand' hasn't been run, then the + # repo can't be locked + return False + + return row['locked'] + + +def _set_db_sync_lock(repository, locked): + locked = locked and 1 or 0 + + engine = get_engine() + migrate_version = db_utils.get_table(engine, repository.version_table) + return migrate_version.update().\ + where(migrate_version.c.repository_id == repository.id).\ + values(locked=locked).execute().rowcount + + +def _set_db_sync_version(repository, version): + engine = get_engine() + migrate_version = db_utils.get_table(engine, repository.version_table) + migrate_version.update().\ + where(migrate_version.c.repository_id == repository.id).\ + values(version=version).execute() + + def db_sync(version=None, database='main'): if version is not None: try: @@ -52,6 +96,11 @@ def db_sync(version=None, database='main'): current_version = db_version(database) repository = _find_migrate_repo(database) + + if _db_sync_locked(repository): + msg = _("Cannot run 'db sync' until 'db contract' is run") + raise exception.DatabaseMigrationError(reason=msg) + if version is None or version > current_version: return versioning_api.upgrade(get_engine(database), repository, version) @@ -84,6 +133,941 @@ def db_initial_version(database='main'): return INIT_VERSION[database] +def _ignore_table(name): + # Anything starting with dump_ is a backup from previous + # migration scripts. + return name.startswith('dump_') + + +def _include_object(object_, name, type_, reflected, compare_to): + if type_ == 'table': + return not _ignore_table(name) + + return True + + +_REFLECT_CACHE = {} + + +def _compare_type(context, inspector_column, metadata_column, + inspector_type, metadata_type): + # Types can be silently modified on the server side. For instance, + # under MySQL, the "BOOL" type is an alias to "TINYINT". As a result, + # creating a column with "BOOL" will reflect as "TINYINT". Instead of + # manually maintaining a mapping of types per database engine, let's + # create a temporary table with metadata_type, reflect that, then + # compare the two reflected values to see if they match. + + # Check with the alembic implementation first. I haven't seen a false + # negative yet (where it considers the types the same when they really + # aren't), but there are some false positives dealing with aliasing + # of types and some metadata. + if not context.impl.compare_type(inspector_column, metadata_column): + return False + + key = str(metadata_type) + reflected_type = _REFLECT_CACHE.get(key) + if reflected_type is None: + conn = context.bind.connect() + tmp_meta = sqlalchemy.MetaData(conn) + + randchars = ''.join(chr(random.randint(0, 255)) for _ in xrange(5)) + tmp_table_name = 'reflect_' + base64.b32encode(randchars).lower() + tmp_table = sqlalchemy.Table(tmp_table_name, tmp_meta, + sqlalchemy.Column('a', metadata_type), + prefixes=['TEMPORARY']) + tmp_table.create() + + inspect = sqlalchemy.inspect(conn) + columns = inspect.get_columns(tmp_table_name) + + tmp_table.drop() + + column_types = {c['name']: c['type'] for c in columns} + reflected_type = column_types['a'] + _REFLECT_CACHE[key] = reflected_type + + # Now compare the types + if inspector_type.__class__ is not reflected_type.__class__: + return True + + # And compare any specific attributes about this type + # TODO(johannes): This should handle dialect specific attributes + # (eg charset, collation, etc on MySQL VARCHAR type). This is + # probably best done in alembic correctly + for attr in ('length', 'display_width'): + if (hasattr(inspector_type, attr) and + getattr(inspector_type, attr) != getattr(reflected_type, attr)): + return True + + return False + + +def _create_migration_context(as_sql=False): + engine = get_engine() + opts = { + 'include_object': _include_object, + 'compare_type': _compare_type, + 'as_sql': as_sql, + } + return alembic.migration.MigrationContext.configure(engine, opts=opts) + + +class OperationBase(object): + # Be conservative by default + desired_phase = 'migrate' + removes = False + + def __init__(self): + self.provides = set() + self.depends = set() + self.conflicts = set() + + +class AddTable(OperationBase): + # Always safe since no old code should be using this table + desired_phase = 'expand' + + def __init__(self, table): + super(AddTable, self).__init__() + + self.table = table + + self.provides = set(('column', table.name, c.name) + for c in table.columns) + + def execute(self, ddlop): + # Need to copy columns so they are unlinked from metadata table + columns = [c.copy() for c in self.table.columns] + ddlop.create_table(self.table.name, *columns, + mysql_engine='InnoDB', + mysql_charset='utf8') + + def __repr__(self): + return '<AddTable table_name=%s>' % self.table.name + + +class DropTable(OperationBase): + # Always safe since no new code should be using this table + desired_phase = 'contract' + removes = True + + def __init__(self, table): + super(DropTable, self).__init__() + + self.table = table + + provides = set(('column', table.name, c.name) + for c in table.columns) + + # Indexes and constraints are implicitly dropped for a DROP TABLE. + # Add the indexcol so foreign keys get ordered correctly + for index in table.indexes: + column_names = [c.name for c in index.columns] + provides.add(('indexcol', table.name, column_names[0])) + + for constraint in table.constraints: + if not isinstance(constraint, (sqlalchemy.UniqueConstraint, + sqlalchemy.PrimaryKeyConstraint)): + continue + + column_names = [c.name for c in constraint.columns] + # SQLAlchemy can add a PrimaryKeyConstraint even if one + # doesn't exist. In that case, column_names will be empty + if column_names: + provides.add(('indexcol', table.name, column_names[0])) + + self.provides = provides + + def execute(self, ddlop): + ddlop.drop_table(self.table.name) + + def __repr__(self): + return '<DropTable table_name=%r>' % self.table.name + + +class AddColumn(OperationBase): + def __init__(self, table_name, column, desired_phase=None): + super(AddColumn, self).__init__() + + self.table_name = table_name + self.column = column + + if desired_phase: + self.desired_phase = desired_phase + + self.provides = set([('column', table_name, column.name)]) + + def execute(self, ddlop): + column = self.column.copy() + ddlop.add_column(self.table_name, column) + + def __repr__(self): + return ('<AddColumn column={table_name=%s column_name=%s type=%r}>' % + (self.table_name, self.column.name, self.column.type)) + + +class AlterColumn(OperationBase): + def __init__(self, table_name, column_name, args): + super(AlterColumn, self).__init__() + + self.table_name = table_name + self.column_name = column_name + self.args = args + + self.provides = [('column', table_name, column_name)] + # Cannot alter column with foreign key + self.conflicts = [('fkcol', table_name, column_name)] + + def execute(self, ddlop): + ddlop.alter_column(self.table_name, self.column_name, **self.args) + + def __repr__(self): + return ('<AlterColumn table_name=%s column_name=%s args=%r>' % + (self.table_name, self.column_name, self.args)) + + +class DropColumn(OperationBase): + # Always online safe since no new code should be using this column + desired_phase = 'contract' + removes = True + + def __init__(self, table_name, column): + super(DropColumn, self).__init__() + + self.table_name = table_name + self.column = column + + self.provides = set([('column', table_name, column.name)]) + + def execute(self, ddlop): + ddlop.drop_column(self.table_name, self.column.name) + + def __repr__(self): + return ('<DropColumn column={table_name=%s column_name=%s}>' % + (self.table_name, self.column.name)) + + +class AddIndex(OperationBase): + def __init__(self, index, args): + super(AddIndex, self).__init__() + + table_name = index.table.name + column_names = [c.name for c in index.columns] + + self.index = index + self.args = args + + # Adding a unique index isn't semantically safe since code may + # not be aware of the new constraint on the column(s). + self.desired_phase = 'migrate' if index.unique else 'expand' + + self.provides = set([ + ('index', table_name, index.name), + ('indexcol', table_name, column_names[0]), + ]) + # Columns need to exist before index is created + self.depends = set(('column', table_name, name) + for name in column_names) + + def execute(self, ddlop): + name = self.index.name + table_name = self.index.table.name + column_names = [c.name for c in self.index.columns] + ddlop.create_index(name, table_name, column_names, + unique=self.index.unique, **self.args) + + def __repr__(self): + index = self.index + column_names = [c.name for c in index.columns] + return ('<AddIndex index={table_name=%s name=%s column_names=(%s)} ' + 'args=%r>' % (index.table.name, index.name, + ', '.join(column_names), self.args)) + + +class DropIndex(OperationBase): + removes = True + + def __init__(self, index): + super(DropIndex, self).__init__() + + self.index = index + # This is used for conflicts + self.column_names = [c.name for c in index.columns] + + # Removing a unique index should happen in migrate since + # new code may assume there isn't any restriction anymore. + self.desired_phase = 'migrate' if index.unique else 'contract' + + table_name = index.table.name + + self.provides = set([ + ('index', table_name, index.name), + ('indexcol', table_name, self.column_names[0]), + ]) + # Can't remove an index if there is a FK potentially using it + self.conflicts = set(('fkcol', table_name, name) + for name in self.column_names) + + def execute(self, ddlop): + ddlop.drop_index(self.index.name, self.index.table.name) + + def __repr__(self): + index = self.index + return ('<DropIndex index={table_name=%s name=%s}>' % + (index.table.name, index.name)) + + +class AddUniqueConstraint(OperationBase): + def __init__(self, uc, desired_phase=None): + super(AddUniqueConstraint, self).__init__() + + self.uc = uc + if desired_phase: + self.desired_phase = desired_phase + + table = uc.table + column_names = [c.name for c in uc.columns] + + self.provides = set([ + # So drop_constraint is ordered before against add_uc + ('uc', table.name, uc.name), + ('indexcol', table.name, column_names[0]), + ]) + # Columns need to exist before constraint is created + self.depends = set(('column', table.name, c.name) + for c in uc.columns) + + def execute(self, ddlop): + uc = self.uc + table = uc.table + column_names = [c.name for c in uc.columns] + + ddlop.create_unique_constraint(uc.name, table.name, column_names) + + def __repr__(self): + uc = self.uc + column_names = [c.name for c in uc.columns] + + return ('<AddUniqueConstraint uc={name=%s table_name=%s ' + 'column_names=(%s)}>' % (uc.name, uc.table.name, + ', '.join(column_names))) + + +class DropUniqueConstraint(OperationBase): + removes = True + + def __init__(self, uc): + super(DropUniqueConstraint, self).__init__() + + self.uc = uc + + table = uc.table + + # So this gets ordered against Add correctly + self.provides = set([('uc', table.name, uc.name)]) + + # Should be scheduled before any columns dropped + self.depends = set(('column', table.name, c.name) + for c in uc.columns) + + def execute(self, ddlop): + uc = self.uc + table = uc.table + + ddlop.drop_constraint(uc.name, table.name, type_='unique') + + def __repr__(self): + uc = self.uc + column_names = [c.name for c in uc.columns] + + return ('<DropUniqueConstraint uc={name=%s table_name=%s ' + 'column_names=(%s)}>' % (uc.name, uc.table.name, + ', '.join(column_names))) + + +class AddForeignKey(OperationBase): + def __init__(self, fkc, desired_phase=None): + super(AddForeignKey, self).__init__() + + self.fkc = fkc + if desired_phase: + self.desired_phase = desired_phase + + fk = fkc.elements[0] + src_table_name = fk.parent.table.name + ref_table_name = fk.column.table.name + + provides = set([('fk', src_table_name, fkc.name)]) + depends = set([ + ('indexcol', src_table_name, fk.parent.name), + ('indexcol', ref_table_name, fk.column.name), + ]) + + for fk in fkc.elements: + provides.update([ + ('fkcol', src_table_name, fk.parent.name), + ('fkcol', ref_table_name, fk.column.name), + ]) + depends.update([ + ('column', src_table_name, fk.parent.name), + ('column', ref_table_name, fk.column.name), + ]) + + self.provides = provides + self.depends = depends + + def execute(self, ddlop): + fkc = self.fkc + + src_table_name = fkc.elements[0].parent.table.name + src_column_names = [fk.parent.name for fk in fkc.elements] + ref_table_name = fkc.elements[0].column.table.name + ref_column_names = [fk.column.name for fk in fkc.elements] + + ddlop.create_foreign_key(fkc.name, + src_table_name, ref_table_name, + src_column_names, ref_column_names) + + def __repr__(self): + fkc = self.fkc + src_table_name = fkc.elements[0].parent.table.name + src_column_names = [fk.parent.name for fk in fkc.elements] + ref_table_name = fkc.elements[0].column.table.name + ref_column_names = [fk.column.name for fk in fkc.elements] + + return ('<AddForeignKey fk={name=%r src_columns=%s.(%s) ' + 'ref_columns=%s.(%s)}>' % (fkc.name, src_table_name, + ', '.join(src_column_names), ref_table_name, + ', '.join(ref_column_names))) + + +class DropForeignKey(OperationBase): + removes = True + + def __init__(self, fkc, desired_phase=None): + super(DropForeignKey, self).__init__() + + self.fkc = fkc + if desired_phase: + self.desired_phase = desired_phase + + fk = fkc.elements[0] + src_table_name = fk.parent.table.name + ref_table_name = fk.column.table.name + + provides = set([('fk', src_table_name, fkc.name)]) + depends = set([ + ('indexcol', src_table_name, fk.parent.name), + ('indexcol', ref_table_name, fk.column.name), + ]) + + for fk in fkc.elements: + provides.update([ + ('fkcol', src_table_name, fk.parent.name), + ('fkcol', ref_table_name, fk.column.name), + ]) + + self.provides = provides + self.depends = depends + + def execute(self, ddlop): + fkc = self.fkc + table = fkc.table + + ddlop.drop_constraint(fkc.name, table.name, type_='foreignkey') + + def __repr__(self): + fkc = self.fkc + src_table_name = fkc.elements[0].parent.table.name + src_column_names = [fk.parent.name for fk in fkc.elements] + ref_table_name = fkc.elements[0].column.table.name + ref_column_names = [fk.column.name for fk in fkc.elements] + + return ('<DropForeignKey fkc={name=%s src_columns=%s.(%s) ' + 'ref_columns=%s.(%s)}>' % + (fkc.name, src_table_name, ', '.join(src_column_names), + ref_table_name, ', '.join(ref_column_names))) + + +def _table_fk_constraints(table): + return [c for c in table.constraints + if isinstance(c, sqlalchemy.ForeignKeyConstraint)] + + +def _fkc_matches_key(metadata, ckey): + for table in metadata.tables.values(): + for fkc in _table_fk_constraints(table): + fk = fkc.elements[0] + src_table_name = fk.parent.table.name + ref_table_name = fk.column.table.name + + for fk in fkc.elements: + for key in [('fkcol', src_table_name, fk.parent.name), + ('fkcol', ref_table_name, fk.column.name)]: + if key == ckey: + yield fkc + break + + +def _compare_fkc(afkc, bfkc): + # Comparing name is best, but new foreign key constraints might not + # have a name set yet + if afkc.name != bfkc.name: + return False + + afk = afkc.elements[0] + bfk = bfkc.elements[0] + if afk.parent.table.name != bfk.parent.table.name: + return False + + acolumns = [(fk.parent.name, fk.column.name) for fk in afkc.elements] + bcolumns = [(fk.parent.name, fk.column.name) for fk in bfkc.elements] + if acolumns != bcolumns: + return False + + return True + + +class Converter(object): + def _handle_add_table(self, table): + # ('add_table', Table) + + # alembic can take some operations as part of op.create_table() + # but not all. We also want to separate foreign keys since they + # can potentially create a dependency on another op we haven't + # seen yet. As a result, this one diff from alembic might be + # split up into multiple ops we track and apply in different + # phases. + tblop = AddTable(table) + yield tblop + + for uc in [c for c in table.constraints + if isinstance(c, sqlalchemy.UniqueConstraint)]: + yield AddUniqueConstraint(uc, desired_phase=tblop.desired_phase) + + for fkc in _table_fk_constraints(table): + yield AddForeignKey(fkc, desired_phase=tblop.desired_phase) + + def _handle_remove_table(self, table): + # ('remove_table', Table) + tblop = DropTable(table) + yield tblop + + for fkc in _table_fk_constraints(table): + yield DropForeignKey(fkc, desired_phase=tblop.desired_phase) + + def _handle_add_column(self, schema, table_name, column): + # ('add_column', schema, table_name, Column) + + kwargs = {} + if table_name == 'migrate_version': + # The column added to migrate_version needs to exist after the + # expand phase runs so locking out 'db sync' can happen. + kwargs['desired_phase'] = 'expand' + + yield AddColumn(table_name, column, **kwargs) + + def _handle_remove_column(self, schema, table_name, column): + # ('remove_column', schema, table_name, Column) + yield DropColumn(table_name, column) + + def _handle_add_constraint(self, constraint): + # ('add_constraint', UniqueConstraint) + if not isinstance(constraint, sqlalchemy.UniqueConstraint): + raise ValueError('Unknown constraint type %r' % constraint) + + yield AddUniqueConstraint(constraint) + + def _handle_remove_constraint(self, constraint): + # ('remove_constraint', Constraint) + if not isinstance(constraint, sqlalchemy.UniqueConstraint): + raise ValueError('Unknown constraint type %r' % constraint) + + yield DropUniqueConstraint(constraint) + + def _handle_add_index(self, index): + # ('add_index', Index) + + # Include any dialect specific options (mysql_length, etc) + args = {} + for dialect, options in index.dialect_options.items(): + for k, v in options.items(): + args['%s_%s' % (dialect, k)] = v + + yield AddIndex(index, args) + + def _handle_remove_index(self, index): + # ('remove_index', Index) + yield DropIndex(index) + + def _handle_add_fk(self, fkc): + # ('add_fk', ForeignKeyConstraint) + yield AddForeignKey(fkc) + + def _handle_remove_fk(self, fkc): + # ('remove_fk', ForeignKeyConstraint) + yield DropForeignKey(fkc) + + def _column_changes(self, diffs): + # Column change (type, nullable, etc) + table_name = diffs[0][2] + column_name = diffs[0][3] + + args = {} + for diff in diffs: + cmd = diff[0] + + if cmd == 'modify_nullable': + # ('modify_nullable', None, table_name, column_name, + # {'existing_server_default': None, + # 'existing_type': VARCHAR(length=36)}, + # conn_nullable, metadata_nullable) + existing_type = diff[4]['existing_type'] + nullable = diff[6] + + args['existing_type'] = existing_type + args['nullable'] = nullable + elif cmd == 'modify_type': + # ('modify_type', None, table_name, column_name, + # {'existing_nullable': True, + # 'existing_server_default': None}, + # TINYINT(display_width=1), Boolean()) + existing_nullable = diff[4]['existing_nullable'] + new_type = diff[6] + + if 'nullable' not in args: + args['nullable'] = existing_nullable + args['type_'] = new_type + else: + msg = _('Unknown alembic cmd %s') % cmd + raise exception.DatabaseMigrationError(reason=msg) + + yield AlterColumn(table_name, column_name, args) + + def convert_alembic(self, diffs): + ops = [] + for diff in diffs: + # Parse out the format into something easier to use than the + # tuple/list format that alembic returns + + if isinstance(diff, list): + ret = self._column_changes(diff) + else: + cmd = diff[0] + + handler = getattr(self, '_handle_%s' % cmd, None) + if handler is None: + msg = _('Unknown alembic cmd %s') % cmd + raise exception.DatabaseMigrationError(reason=msg) + + ret = handler(*diff[1:]) + + ops.extend(list(ret)) + + return ops + + +class Scheduler(object): + def __init__(self, ops=None): + # Set of operations (vertexes) + self.ops = set() + # Operations that have conflicts to process + self.conflictops = set() + + # Indirect mapping of operations + self.exists = {} + self.nonexists = {} + # Dependencies and conflicts per op (resolve via mapping) + self.depends = {} + self.conflicts = {} + + # Edges per op + self.outbound = {} + self.inbound = {} + + if ops is not None: + for op in ops: + self.add(op) + + def handle_conflicts(self, metadata): + # Foreign keys can make certain operations fail. The foreign key + # needs to be removed before the operation and then recreated + # after the operation. + # + # This finds all foreign keys that currently exist and determines + # if they could conflict, then it finds any operations that are + # already in the schedule. If appropriate operations don't exist, + # then they are created. + for op in self.conflictops: + for key in self.conflicts[op]: + for fkc in _fkc_matches_key(metadata, key): + # Find any ops that match this key + dropops = self.nonexists.get(key, []) + dropops = [op for op in dropops + if _compare_fkc(fkc, op.fkc)] + + addops = self.exists.get(key, []) + addops = [op for op in addops + if _compare_fkc(fkc, op.fkc)] + + if not dropops and not addops: + # No drop or add operations for this FK, + # so create some + self.add(DropForeignKey(fkc)) + self.add(AddForeignKey(fkc)) + + # Ensure operation gets scheduled between the drop and add operations + for op in self.conflictops: + for key in self.conflicts[op]: + dropops = self.nonexists.get(key, []) + addops = self.exists.get(key, []) + + for dropop in dropops: + self.add_edge(op, dropop) + + for addop in addops: + self.add_edge(addop, op) + + def add(self, op): + self.ops.add(op) + self.inbound[op] = set() + self.outbound[op] = set() + + if op.removes: + mapping = self.nonexists + else: + mapping = self.exists + + for key in op.provides: + mapping.setdefault(key, set()).add(op) + self.depends[op] = op.depends + + if op.conflicts: + self.conflicts[op] = op.conflicts + self.conflictops.add(op) + + def add_edge(self, f, t): + self.inbound[t].add(f) + self.outbound[f].add(t) + + def sort(self): + # The topological sort modifies inbound, but we can't do a deepcopy + # since that would deepcopy the key too. + inbound = {} + for key, depends in self.inbound.items(): + inbound[key] = set(depends) + + toprocess = [v for v in self.ops if not inbound[v]] + inorder = [] + while toprocess: + op = toprocess.pop(0) + inorder.insert(0, op) + + for depop in self.outbound[op]: + inbound[depop].remove(op) + if not inbound[depop]: + toprocess.insert(0, depop) + + del inbound[op] + + # Anything remaining in inbound is a dependency loop + if inbound: + msg = _('Dependency loop exists in database migrations') + raise exception.DatabaseMigrationError(reason=msg) + + return inorder + + def order_drop_add(self): + # Alembic will emit drop/add for indexes if the covered columns change. + # Ensure that the add is scheduled after the drop. + keys = set(self.exists.keys()) & set(self.nonexists.keys()) + for key in keys: + dropops = self.nonexists[key] + addops = self.exists[key] + for dropop in dropops: + for addop in addops: + self.add_edge(addop, dropop) + + def schedule(self): + # Scheduling tries to move as much of the schema changes to be run + # while services are still running without affecting the services. + # Otherwise known as running the schema changes online. + # + # There are two major factors used: + # 1) Is the schema change compatible with running code? Adding a new + # table is since no code knows about it, but changing a column type + # may not be. + # 2) Does the DDL statement cause the database engine to block access + # to the table and affect running services? This can vary greatly + # depending on the database software (MySQL, PostgreSQL, etc), + # version (5.1, 5.5, 5.6, etc) and the storage engine (MyISAM, + # InnoDB, etc) + # + # Also, dependencies between operations might keep an operation that + # would otherwise be safe to be run online from being run online. + + self.order_drop_add() + + # Use mapping to create edges between operations + for op, depends in self.depends.items(): + if op.removes: + mapping = self.nonexists + else: + mapping = self.exists + + for key in depends: + refops = mapping.get(key, []) + + for refop in refops: + # Dependency is reversed for drop operations + if op.removes: + self.add_edge(refop, op) + else: + self.add_edge(op, refop) + + phases = { + 'expand': [], + 'migrate': [], + 'contract': [], + } + + # TODO(johannes): Schedule operations that are safe to run online + # depending on the capabilities of the database engine + + for op in self.sort(): + phase = op.desired_phase + + if phase == 'expand': + depphases = set(o.phase for o in self.outbound[op]) + depphases.discard(phase) + if depphases: + # Can't safely move this operation to expand because + # a dependency isn't in expand. + phase = 'migrate' + elif phase == 'contract': + # Since anything that depends on this hasn't had the + # phase determined yet, this has to be naive for now + if self.inbound[op]: + phase = 'migrate' + + op.phase = phase + phases[op.phase].append(op) + + return phases['expand'], phases['migrate'], phases['contract'] + + +def _add_generated_tables_to_model(metadata, database='main'): + tables = dict(metadata.tables) + for table_name, table in tables.items(): + if table_name.startswith('shadow_') or _ignore_table(table_name): + # Don't make a shadow of a shadow table or a table we + # explicitly ignore + continue + + shadow_table_name = 'shadow_' + table_name + if shadow_table_name in tables: + # Shadow table already exists in model + continue + + columns = [c.copy() for c in table.columns] + sqlalchemy.Table(shadow_table_name, metadata, *columns, + mysql_engine='InnoDB') + # Table is added to metadata as a side-effect of creating the object + + repository = _find_migrate_repo(database) + if repository.version_table not in tables: + # The existing migrate_version table is expanded with a locked + # column so the 'db sync' command can be locked out between + # running 'db expand' and 'db contract'. + + # locked is probably more appropriate a Boolean, but there is no + # portable way of using server_default in that case. SQLAlchemy + # issue #1204 + sqlalchemy.Table(repository.version_table, metadata, + sqlalchemy.Column('repository_id', sqlalchemy.String(250), + primary_key=True), + sqlalchemy.Column('repository_path', sqlalchemy.Text()), + sqlalchemy.Column('version', sqlalchemy.Integer()), + sqlalchemy.Column('locked', sqlalchemy.Integer(), + nullable=False, server_default='0')) + # Table is added to metadata as a side-effect of creating the object + + +def _schedule_schema_changes(context): + """Split the list of diffs into expand, migrate and contract phases.""" + + metadata = models.BASE.metadata + _add_generated_tables_to_model(metadata) + + # Take all of the diffs generated by Alembic and convert them into an + # easier to use format along with some dependency information. + diffs = alembic.autogenerate.compare_metadata(context, metadata) + + converter = Converter() + ops = converter.convert_alembic(diffs) + + scheduler = Scheduler(ops) + + reflected_metadata = sqlalchemy.MetaData() + reflected_metadata.reflect(context.bind) + + scheduler.handle_conflicts(reflected_metadata) + + return scheduler.schedule() + + +def db_expand(dryrun=False, database='main'): + context = _create_migration_context(as_sql=False) + expand, migrate, contract = _schedule_schema_changes(context) + + context = _create_migration_context(as_sql=dryrun) + ddlop = alembic.operations.Operations(context) + for op in expand: + op.execute(ddlop) + + repository = _find_migrate_repo(database) + if not _set_db_sync_lock(repository, locked=True): + # No rows exist yet. Might be 'db sync' was never run + db_version_control(INIT_VERSION) + _set_db_sync_lock(repository, locked=True) + + +def db_migrate(dryrun=False, database='main'): + context = _create_migration_context(as_sql=False) + expand, migrate, contract = _schedule_schema_changes(context) + + if expand: + msg = _('expand phase still has operations that need to be executed') + raise exception.DatabaseMigrationError(reason=msg) + + context = _create_migration_context(as_sql=dryrun) + ddlop = alembic.operations.Operations(context) + for op in migrate: + op.execute(ddlop) + + +def db_contract(dryrun=False, database='main'): + context = _create_migration_context(as_sql=False) + expand, migrate, contract = _schedule_schema_changes(context) + + if expand: + msg = _('expand phase still has operations that need to be executed') + raise exception.DatabaseMigrationError(reason=msg) + if migrate: + msg = _('migrate phase still has operations that need to be executed') + raise exception.DatabaseMigrationError(reason=msg) + + context = _create_migration_context(as_sql=dryrun) + ddlop = alembic.operations.Operations(context) + for op in contract: + op.execute(ddlop) + + repository = _find_migrate_repo(database) + _set_db_sync_lock(repository, locked=False) + _set_db_sync_version(repository, repository.latest) + + def _process_null_records(table, col_name, check_fkeys, delete=False): """Queries the database and optionally deletes the NULL records. diff --git a/nova/db/sqlalchemy/models.py b/nova/db/sqlalchemy/models.py index 0b671cb628..25ae21f667 100644 --- a/nova/db/sqlalchemy/models.py +++ b/nova/db/sqlalchemy/models.py @@ -716,9 +716,16 @@ class Migration(BASE, NovaBase): __table_args__ = ( Index('migrations_instance_uuid_and_status_idx', 'deleted', 'instance_uuid', 'status'), + # MySQL has a limit of 3072 bytes for an multi-column index. This + # index ends up being larger than that using the utf-8 encoding. + # Limiting the index to the prefixes will keep it under the limit. + # FIXME(johannes): Is it MySQL or InnoDB that imposes the limit? Index('migrations_by_host_nodes_and_status_idx', 'deleted', 'source_compute', 'dest_compute', 'source_node', 'dest_node', - 'status'), + 'status', mysql_length={'source_compute': 100, + 'dest_compute': 100, + 'source_node': 100, + 'dest_node': 100}), ) id = Column(Integer, primary_key=True, nullable=False) # NOTE(tr3buchet): the ____compute variables are instance['host'] diff --git a/nova/exception.py b/nova/exception.py index e9ce82fc81..9f282e8233 100644 --- a/nova/exception.py +++ b/nova/exception.py @@ -1878,3 +1878,7 @@ class InvalidImageFormat(Invalid): class UnsupportedImageModel(Invalid): msg_fmt = _("Image model '%(image)s' is not supported") + + +class DatabaseMigrationError(NovaException): + msg_fmt = _("Database migration failed: %(reason)s") diff --git a/nova/tests/unit/db/test_migrations.py b/nova/tests/unit/db/test_migrations.py index 948a8e5c45..548f466278 100644 --- a/nova/tests/unit/db/test_migrations.py +++ b/nova/tests/unit/db/test_migrations.py @@ -39,6 +39,7 @@ import glob import logging import os +import alembic from migrate import UniqueConstraint from migrate.versioning import repository import mock @@ -778,3 +779,290 @@ class ProjectTestCase(test.NoDBTestCase): "which is not supported:" "\n\t%s" % '\n\t'.join(sorted(includes_downgrade))) self.assertFalse(includes_downgrade, helpful_msg) + + +class SchemaChangeSchedulerTest(test.NoDBTestCase): + def test_add_fk_after_add_column(self): + exist_meta = sqlalchemy.MetaData() + sqlalchemy.Table('a', exist_meta, + sqlalchemy.Column('id', sqlalchemy.Integer)) + sqlalchemy.Table('b', exist_meta, + sqlalchemy.Column('id', sqlalchemy.Integer)) + + model_meta = sqlalchemy.MetaData() + sqlalchemy.Table('a', model_meta, + sqlalchemy.Column('id', sqlalchemy.Integer)) + column = sqlalchemy.Column('a_id', sqlalchemy.Integer, + sqlalchemy.ForeignKey('a.id')) + table = sqlalchemy.Table('b', model_meta, + sqlalchemy.Column('id', sqlalchemy.Integer), + column) + fkc = sqlalchemy.ForeignKeyConstraint(['a_id'], ['a.id'], + table=table) + + addcolumn = sa_migration.AddColumn('b', column, + desired_phase='migrate') + addfk = sa_migration.AddForeignKey(fkc) + + scheduler = sa_migration.Scheduler() + scheduler.add(addfk) + scheduler.add(addcolumn) + + expand, migrate, contract = scheduler.schedule() + self.assertEqual([], expand) + self.assertEqual([addcolumn, addfk], migrate) + self.assertEqual([], contract) + + def test_remove_index_after_add(self): + exist_meta = sqlalchemy.MetaData() + oldtbl = sqlalchemy.Table('a', exist_meta, + sqlalchemy.Column('id', sqlalchemy.Integer), + sqlalchemy.Column('foo', sqlalchemy.Integer)) + + model_meta = sqlalchemy.MetaData() + newtbl = sqlalchemy.Table('a', model_meta, + sqlalchemy.Column('id', sqlalchemy.Integer)) + + old_index = sqlalchemy.Index('a_id_idx', oldtbl.c.id, oldtbl.c.foo) + new_index = sqlalchemy.Index('a_id_idx', newtbl.c.id) + + dropidx = sa_migration.DropIndex(old_index) + addidx = sa_migration.AddIndex(new_index, {}) + + scheduler = sa_migration.Scheduler() + scheduler.add(addidx) + scheduler.add(dropidx) + + expand, migrate, contract = scheduler.schedule() + self.assertEqual([], expand) + self.assertEqual([dropidx, addidx], migrate) + self.assertEqual([], contract) + + +def _table(*args, **kwargs): + kwargs = kwargs.copy() + kwargs['mysql_engine'] = 'InnoDB' + return sqlalchemy.Table(*args, **kwargs) + + +class SchemaChangeDDLCheckers(object): + def setUp(self): + super(SchemaChangeDDLCheckers, self).setUp() + + context = alembic.migration.MigrationContext.configure(self.engine) + self.ddlop = alembic.operations.Operations(context) + + def test_add_table(self): + meta = sqlalchemy.MetaData() + table = _table('a', meta, + sqlalchemy.Column('id', sqlalchemy.Integer)) + meta.create_all(self.engine) + + table = oslodbutils.get_table(self.engine, 'a') + self.assertIn('id', table.c) + + def test_drop_table(self): + meta = sqlalchemy.MetaData() + table = _table('a', meta, + sqlalchemy.Column('id', sqlalchemy.Integer)) + meta.create_all(self.engine) + + # Will raise exception if table does not exist + oslodbutils.get_table(self.engine, 'a') + + op = sa_migration.DropTable(table) + op.execute(self.ddlop) + + self.assertRaises(sqlalchemy.exc.NoSuchTableError, + oslodbutils.get_table, self.engine, 'a') + + def test_add_column(self): + meta = sqlalchemy.MetaData() + table = _table('a', meta, + sqlalchemy.Column('id', sqlalchemy.Integer)) + meta.create_all(self.engine) + + column = sqlalchemy.Column('uuid', sqlalchemy.String(36)) + op = sa_migration.AddColumn('a', column) + op.execute(self.ddlop) + + table = oslodbutils.get_table(self.engine, 'a') + self.assertIn('id', table.c) + self.assertIn('uuid', table.c) + + def test_alter_column_nullable(self): + meta = sqlalchemy.MetaData() + column = sqlalchemy.Column('uuid', sqlalchemy.String(36)) + table = _table('a', meta, + sqlalchemy.Column('id', sqlalchemy.Integer), + column) + meta.create_all(self.engine) + + self.assertTrue(table.c.uuid.nullable) + + op = sa_migration.AlterColumn('a', 'uuid', + {'nullable': False, + 'existing_type': column.type}) + op.execute(self.ddlop) + + table = oslodbutils.get_table(self.engine, 'a') + self.assertFalse(table.c.uuid.nullable) + + def test_alter_column_type(self): + meta = sqlalchemy.MetaData() + column = sqlalchemy.Column('uuid', sqlalchemy.Text) + table = _table('a', meta, + sqlalchemy.Column('id', sqlalchemy.Integer), + column) + meta.create_all(self.engine) + + self.assertIsInstance(table.c.uuid.type, sqlalchemy.Text) + + new_type = sqlalchemy.String(36) + + op = sa_migration.AlterColumn('a', 'uuid', + {'nullable': True, + 'type_': new_type}) + op.execute(self.ddlop) + + table = oslodbutils.get_table(self.engine, 'a') + self.assertIsInstance(table.c.uuid.type, sqlalchemy.String) + # Text is a subclass of String, so the previous assert could pass + # if the column type didn't change + self.assertNotIsInstance(table.c.uuid.type, sqlalchemy.Text) + + def test_drop_column(self): + meta = sqlalchemy.MetaData() + column = sqlalchemy.Column('uuid', sqlalchemy.String(36)) + table = _table('a', meta, + sqlalchemy.Column('id', sqlalchemy.Integer), + column) + meta.create_all(self.engine) + + op = sa_migration.DropColumn('a', column) + op.execute(self.ddlop) + + table = oslodbutils.get_table(self.engine, 'a') + self.assertIn('id', table.c) + self.assertNotIn('uuid', table.c) + + def test_add_index(self): + meta = sqlalchemy.MetaData() + table = _table('a', meta, + sqlalchemy.Column('id', sqlalchemy.Integer)) + meta.create_all(self.engine) + + index = sqlalchemy.Index('a_id_idx', table.c.id) + + op = sa_migration.AddIndex(index, {}) + op.execute(self.ddlop) + + table = oslodbutils.get_table(self.engine, 'a') + self.assertIn('a_id_idx', [i.name for i in table.indexes]) + + def test_drop_index(self): + meta = sqlalchemy.MetaData() + index = sqlalchemy.Index('a_id_idx', 'id') + table = _table('a', meta, + sqlalchemy.Column('id', sqlalchemy.Integer), + index) + meta.create_all(self.engine) + + op = sa_migration.DropIndex(index) + op.execute(self.ddlop) + + table = oslodbutils.get_table(self.engine, 'a') + self.assertNotIn('a_id_idx', [i.name for i in table.indexes]) + + def test_add_unique_constraint(self): + meta = sqlalchemy.MetaData() + table = _table('a', meta, + sqlalchemy.Column('id', sqlalchemy.Integer)) + meta.create_all(self.engine) + + uc = sqlalchemy.UniqueConstraint(table.c.id, name='uniq_a_id') + + op = sa_migration.AddUniqueConstraint(uc) + op.execute(self.ddlop) + + table = oslodbutils.get_table(self.engine, 'a') + # Collect all unique indexes and constraints. MySQL will + # transparently create unique constraints as unique indexes + # (which is different than PostgreSQL). Also, older versions + # of SQLAlchemy will sometimes reflect these inconsistently. + uniques = {i.name for i in table.indexes if i.unique} + uniques.update(c.name for c in table.constraints + if isinstance(c, sqlalchemy.UniqueConstraint)) + self.assertIn('uniq_a_id', uniques) + + def test_drop_unique_constraint(self): + meta = sqlalchemy.MetaData() + uc = sqlalchemy.UniqueConstraint('id', name='uniq_a_id') + table = _table('a', meta, + sqlalchemy.Column('id', sqlalchemy.Integer), + uc) + meta.create_all(self.engine) + + op = sa_migration.DropUniqueConstraint(uc) + op.execute(self.ddlop) + + table = oslodbutils.get_table(self.engine, 'a') + # See comment for test_add_unique_constraint + uniques = {i.name for i in table.indexes if i.unique} + uniques.update(c.name for c in table.constraints + if isinstance(c, sqlalchemy.UniqueConstraint)) + self.assertNotIn('uniq_a_id', uniques) + + def test_add_foreign_key(self): + meta = sqlalchemy.MetaData() + a = _table('a', meta, + sqlalchemy.Column('id', sqlalchemy.Integer), + sqlalchemy.UniqueConstraint('id')) + b = _table('b', meta, + sqlalchemy.Column('a_id', sqlalchemy.Integer)) + meta.create_all(self.engine) + + fkc = sqlalchemy.ForeignKeyConstraint([b.c.a_id], [a.c.id], + name='b_a_id_fk') + + op = sa_migration.AddForeignKey(fkc) + op.execute(self.ddlop) + + table = oslodbutils.get_table(self.engine, 'b') + fkcs = {c.name: c for c in table.constraints + if isinstance(c, sqlalchemy.ForeignKeyConstraint)} + self.assertIn('b_a_id_fk', fkcs) + + columns = [(fk.parent.name, fk.column.name) + for fk in fkcs['b_a_id_fk'].elements] + self.assertEqual([('a_id', 'id')], columns) + + def test_drop_foreign_key(self): + meta = sqlalchemy.MetaData() + a = _table('a', meta, + sqlalchemy.Column('id', sqlalchemy.Integer), + sqlalchemy.UniqueConstraint('id')) + b = _table('b', meta, + sqlalchemy.Column('a_id', sqlalchemy.Integer)) + fkc = sqlalchemy.ForeignKeyConstraint([b.c.a_id], [a.c.id], + name='b_a_id_fk') + meta.create_all(self.engine) + + op = sa_migration.DropForeignKey(fkc) + op.execute(self.ddlop) + + table = oslodbutils.get_table(self.engine, 'b') + fkcs = {c.name: c for c in table.constraints} + self.assertNotIn('b_a_id_fk', fkcs) + + +class TestSchemaChangeDDLMySQL(SchemaChangeDDLCheckers, + test_base.MySQLOpportunisticTestCase, + test.NoDBTestCase): + pass + + +class TestSchemaChangeDDLPostgreSQL(SchemaChangeDDLCheckers, + test_base.PostgreSQLOpportunisticTestCase, + test.NoDBTestCase): + pass diff --git a/nova/tests/unit/db/test_sqlalchemy_migration.py b/nova/tests/unit/db/test_sqlalchemy_migration.py index d884f5ad97..52260b39b9 100644 --- a/nova/tests/unit/db/test_sqlalchemy_migration.py +++ b/nova/tests/unit/db/test_sqlalchemy_migration.py @@ -98,13 +98,14 @@ class TestNullInstanceUuidScanDB(test.TestCase): @mock.patch.object(migration, 'db_version', return_value=2) @mock.patch.object(migration, '_find_migrate_repo', return_value='repo') +@mock.patch.object(migration, '_db_sync_locked', return_value=False) @mock.patch.object(versioning_api, 'upgrade') @mock.patch.object(versioning_api, 'downgrade') @mock.patch.object(migration, 'get_engine', return_value='engine') class TestDbSync(test.NoDBTestCase): def test_version_none(self, mock_get_engine, mock_downgrade, mock_upgrade, - mock_find_repo, mock_version): + mock_sync_locked, mock_find_repo, mock_version): database = 'fake' migration.db_sync(database=database) mock_version.assert_called_once_with(database) @@ -114,7 +115,7 @@ class TestDbSync(test.NoDBTestCase): self.assertFalse(mock_downgrade.called) def test_downgrade(self, mock_get_engine, mock_downgrade, mock_upgrade, - mock_find_repo, mock_version): + mock_sync_locked, mock_find_repo, mock_version): database = 'fake' migration.db_sync(1, database=database) mock_version.assert_called_once_with(database) |