summaryrefslogtreecommitdiff
path: root/oslo_db/sqlalchemy/test_migrations.py
diff options
context:
space:
mode:
Diffstat (limited to 'oslo_db/sqlalchemy/test_migrations.py')
-rw-r--r--oslo_db/sqlalchemy/test_migrations.py217
1 files changed, 0 insertions, 217 deletions
diff --git a/oslo_db/sqlalchemy/test_migrations.py b/oslo_db/sqlalchemy/test_migrations.py
index a0b5591..22ace67 100644
--- a/oslo_db/sqlalchemy/test_migrations.py
+++ b/oslo_db/sqlalchemy/test_migrations.py
@@ -27,229 +27,12 @@ import sqlalchemy.exc
import sqlalchemy.sql.expression as expr
import sqlalchemy.types as types
-from oslo_db import exception as exc
from oslo_db.sqlalchemy import provision
from oslo_db.sqlalchemy import utils
LOG = logging.getLogger(__name__)
-class WalkVersionsMixin(object, metaclass=abc.ABCMeta):
- """Test mixin to check upgrade and downgrade ability of migration.
-
- This is only suitable for testing of migrate_ migration scripts. An
- abstract class mixin. `INIT_VERSION`, `REPOSITORY` and `migration_api`
- attributes must be implemented in subclasses.
-
- .. _auxiliary-dynamic-methods:
-
- Auxiliary Methods:
-
- `migrate_up` and `migrate_down` instance methods of the class can be
- used with auxiliary methods named `_pre_upgrade_<revision_id>`,
- `_check_<revision_id>`, `_post_downgrade_<revision_id>`. The methods
- intended to check applied changes for correctness of data operations.
- This methods should be implemented for every particular revision
- which you want to check with data. Implementation recommendations for
- `_pre_upgrade_<revision_id>`, `_check_<revision_id>`,
- `_post_downgrade_<revision_id>` implementation:
-
- * `_pre_upgrade_<revision_id>`: provide a data appropriate to
- a next revision. Should be used an id of revision which
- going to be applied.
-
- * `_check_<revision_id>`: Insert, select, delete operations
- with newly applied changes. The data provided by
- `_pre_upgrade_<revision_id>` will be used.
-
- * `_post_downgrade_<revision_id>`: check for absence
- (inability to use) changes provided by reverted revision.
-
- Execution order of auxiliary methods when revision is upgrading:
-
- `_pre_upgrade_###` => `upgrade` => `_check_###`
-
- Execution order of auxiliary methods when revision is downgrading:
-
- `downgrade` => `_post_downgrade_###`
-
- .. _migrate: https://sqlalchemy-migrate.readthedocs.org/en/latest/
-
- """
-
- @property
- @abc.abstractmethod
- def INIT_VERSION(self):
- """Initial version of a migration repository.
-
- Can be different from 0, if a migrations were squashed.
-
- :rtype: int
- """
- pass
-
- @property
- @abc.abstractmethod
- def REPOSITORY(self):
- """Allows basic manipulation with migration repository.
-
- :returns: `migrate.versioning.repository.Repository` subclass.
- """
- pass
-
- @property
- @abc.abstractmethod
- def migration_api(self):
- """Provides API for upgrading, downgrading and version manipulations.
-
- :returns: `migrate.api` or overloaded analog.
- """
- pass
-
- @property
- @abc.abstractmethod
- def migrate_engine(self):
- """Provides engine instance.
-
- Should be the same instance as used when migrations are applied. In
- most cases, the `engine` attribute provided by the test class in a
- `setUp` method will work.
-
- Example of implementation:
-
- def migrate_engine(self):
- return self.engine
-
- :returns: sqlalchemy engine instance
- """
- pass
-
- def walk_versions(self, snake_walk=False, downgrade=True):
- """Check if migration upgrades and downgrades successfully.
-
- Determine the latest version script from the repo, then
- upgrade from 1 through to the latest, with no data
- in the databases. This just checks that the schema itself
- upgrades successfully.
-
- `walk_versions` calls `migrate_up` and `migrate_down` with
- `with_data` argument to check changes with data, but these methods
- can be called without any extra check outside of `walk_versions`
- method.
-
- :param snake_walk: enables checking that each individual migration can
- be upgraded/downgraded by itself.
-
- If we have ordered migrations 123abc, 456def, 789ghi and we run
- upgrading with the `snake_walk` argument set to `True`, the
- migrations will be applied in the following order::
-
- `123abc => 456def => 123abc =>
- 456def => 789ghi => 456def => 789ghi`
-
- :type snake_walk: bool
- :param downgrade: Check downgrade behavior if True.
- :type downgrade: bool
- """
-
- # Place the database under version control
- self.migration_api.version_control(self.migrate_engine,
- self.REPOSITORY,
- self.INIT_VERSION)
- self.assertEqual(self.INIT_VERSION,
- self.migration_api.db_version(self.migrate_engine,
- self.REPOSITORY))
-
- LOG.debug('latest version is %s', self.REPOSITORY.latest)
- versions = range(int(self.INIT_VERSION) + 1,
- int(self.REPOSITORY.latest) + 1)
-
- for version in versions:
- # upgrade -> downgrade -> upgrade
- self.migrate_up(version, with_data=True)
- if snake_walk:
- downgraded = self.migrate_down(version - 1, with_data=True)
- if downgraded:
- self.migrate_up(version)
-
- if downgrade:
- # Now walk it back down to 0 from the latest, testing
- # the downgrade paths.
- for version in reversed(versions):
- # downgrade -> upgrade -> downgrade
- downgraded = self.migrate_down(version - 1)
-
- if snake_walk and downgraded:
- self.migrate_up(version)
- self.migrate_down(version - 1)
-
- def migrate_down(self, version, with_data=False):
- """Migrate down to a previous version of the db.
-
- :param version: id of revision to downgrade.
- :type version: str
- :keyword with_data: Whether to verify the absence of changes from
- migration(s) being downgraded, see
- :ref:`Auxiliary Methods <auxiliary-dynamic-methods>`.
- :type with_data: Bool
- """
-
- try:
- self.migration_api.downgrade(self.migrate_engine,
- self.REPOSITORY, version)
- except NotImplementedError:
- # NOTE(sirp): some migrations, namely release-level
- # migrations, don't support a downgrade.
- return False
-
- self.assertEqual(version, self.migration_api.db_version(
- self.migrate_engine, self.REPOSITORY))
-
- # NOTE(sirp): `version` is what we're downgrading to (i.e. the 'target'
- # version). So if we have any downgrade checks, they need to be run for
- # the previous (higher numbered) migration.
- if with_data:
- post_downgrade = getattr(
- self, "_post_downgrade_%03d" % (version + 1), None)
- if post_downgrade:
- post_downgrade(self.migrate_engine)
-
- return True
-
- def migrate_up(self, version, with_data=False):
- """Migrate up to a new version of the db.
-
- :param version: id of revision to upgrade.
- :type version: str
- :keyword with_data: Whether to verify the applied changes with data,
- see :ref:`Auxiliary Methods <auxiliary-dynamic-methods>`.
- :type with_data: Bool
- """
- # NOTE(sdague): try block is here because it's impossible to debug
- # where a failed data migration happens otherwise
- try:
- if with_data:
- data = None
- pre_upgrade = getattr(
- self, "_pre_upgrade_%03d" % version, None)
- if pre_upgrade:
- data = pre_upgrade(self.migrate_engine)
-
- self.migration_api.upgrade(self.migrate_engine,
- self.REPOSITORY, version)
- self.assertEqual(version,
- self.migration_api.db_version(self.migrate_engine,
- self.REPOSITORY))
- if with_data:
- check = getattr(self, "_check_%03d" % version, None)
- if check:
- check(self.migrate_engine, data)
- except exc.DBMigrationError:
- msg = "Failed to migrate to version %(ver)s on engine %(eng)s"
- LOG.error(msg, {"ver": version, "eng": self.migrate_engine})
- raise
-
-
class ModelsMigrationsSync(object, metaclass=abc.ABCMeta):
"""A helper class for comparison of DB migration scripts and models.