diff options
Diffstat (limited to 'migrate/tests/versioning')
-rw-r--r-- | migrate/tests/versioning/__init__.py | 0 | ||||
-rw-r--r-- | migrate/tests/versioning/test_api.py | 128 | ||||
-rw-r--r-- | migrate/tests/versioning/test_cfgparse.py | 27 | ||||
-rw-r--r-- | migrate/tests/versioning/test_database.py | 13 | ||||
-rw-r--r-- | migrate/tests/versioning/test_genmodel.py | 214 | ||||
-rw-r--r-- | migrate/tests/versioning/test_keyedinstance.py | 45 | ||||
-rw-r--r-- | migrate/tests/versioning/test_pathed.py | 51 | ||||
-rw-r--r-- | migrate/tests/versioning/test_repository.py | 216 | ||||
-rw-r--r-- | migrate/tests/versioning/test_runchangeset.py | 52 | ||||
-rw-r--r-- | migrate/tests/versioning/test_schema.py | 205 | ||||
-rw-r--r-- | migrate/tests/versioning/test_schemadiff.py | 227 | ||||
-rw-r--r-- | migrate/tests/versioning/test_script.py | 305 | ||||
-rw-r--r-- | migrate/tests/versioning/test_shell.py | 574 | ||||
-rw-r--r-- | migrate/tests/versioning/test_template.py | 70 | ||||
-rw-r--r-- | migrate/tests/versioning/test_util.py | 139 | ||||
-rw-r--r-- | migrate/tests/versioning/test_version.py | 186 |
16 files changed, 0 insertions, 2452 deletions
diff --git a/migrate/tests/versioning/__init__.py b/migrate/tests/versioning/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/migrate/tests/versioning/__init__.py +++ /dev/null diff --git a/migrate/tests/versioning/test_api.py b/migrate/tests/versioning/test_api.py deleted file mode 100644 index bc4b29d..0000000 --- a/migrate/tests/versioning/test_api.py +++ /dev/null @@ -1,128 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -import six - -from migrate.exceptions import * -from migrate.versioning import api - -from migrate.tests.fixture.pathed import * -from migrate.tests.fixture import models -from migrate.tests import fixture - - -class TestAPI(Pathed): - - def test_help(self): - self.assertTrue(isinstance(api.help('help'), six.string_types)) - self.assertRaises(UsageError, api.help) - self.assertRaises(UsageError, api.help, 'foobar') - self.assertTrue(isinstance(api.help('create'), str)) - - # test that all commands return some text - for cmd in api.__all__: - content = api.help(cmd) - self.assertTrue(content) - - def test_create(self): - tmprepo = self.tmp_repos() - api.create(tmprepo, 'temp') - - # repository already exists - self.assertRaises(KnownError, api.create, tmprepo, 'temp') - - def test_script(self): - repo = self.tmp_repos() - api.create(repo, 'temp') - api.script('first version', repo) - - def test_script_sql(self): - repo = self.tmp_repos() - api.create(repo, 'temp') - api.script_sql('postgres', 'desc', repo) - - def test_version(self): - repo = self.tmp_repos() - api.create(repo, 'temp') - api.version(repo) - - def test_version_control(self): - repo = self.tmp_repos() - api.create(repo, 'temp') - api.version_control('sqlite:///', repo) - api.version_control('sqlite:///', six.text_type(repo)) - - def test_source(self): - repo = self.tmp_repos() - api.create(repo, 'temp') - api.script('first version', repo) - api.script_sql('default', 'desc', repo) - - # no repository - self.assertRaises(UsageError, api.source, 1) - - # stdout - out = api.source(1, dest=None, repository=repo) - self.assertTrue(out) - - # file - out = api.source(1, dest=self.tmp_repos(), repository=repo) - self.assertFalse(out) - - def test_manage(self): - output = api.manage(os.path.join(self.temp_usable_dir, 'manage.py')) - - -class TestSchemaAPI(fixture.DB, Pathed): - - def _setup(self, url): - super(TestSchemaAPI, self)._setup(url) - self.repo = self.tmp_repos() - api.create(self.repo, 'temp') - self.schema = api.version_control(url, self.repo) - - def _teardown(self): - self.schema = api.drop_version_control(self.url, self.repo) - super(TestSchemaAPI, self)._teardown() - - @fixture.usedb() - def test_workflow(self): - self.assertEqual(api.db_version(self.url, self.repo), 0) - api.script('First Version', self.repo) - self.assertEqual(api.db_version(self.url, self.repo), 0) - api.upgrade(self.url, self.repo, 1) - self.assertEqual(api.db_version(self.url, self.repo), 1) - api.downgrade(self.url, self.repo, 0) - self.assertEqual(api.db_version(self.url, self.repo), 0) - api.test(self.url, self.repo) - self.assertEqual(api.db_version(self.url, self.repo), 0) - - # preview - # TODO: test output - out = api.upgrade(self.url, self.repo, preview_py=True) - out = api.upgrade(self.url, self.repo, preview_sql=True) - - api.upgrade(self.url, self.repo, 1) - api.script_sql('default', 'desc', self.repo) - self.assertRaises(UsageError, api.upgrade, self.url, self.repo, 2, preview_py=True) - out = api.upgrade(self.url, self.repo, 2, preview_sql=True) - - # cant upgrade to version 1, already at version 1 - self.assertEqual(api.db_version(self.url, self.repo), 1) - self.assertRaises(KnownError, api.upgrade, self.url, self.repo, 0) - - @fixture.usedb() - def test_compare_model_to_db(self): - diff = api.compare_model_to_db(self.url, self.repo, models.meta) - - @fixture.usedb() - def test_create_model(self): - model = api.create_model(self.url, self.repo) - - @fixture.usedb() - def test_make_update_script_for_model(self): - model = api.make_update_script_for_model(self.url, self.repo, models.meta_old_rundiffs, models.meta_rundiffs) - - @fixture.usedb() - def test_update_db_from_model(self): - model = api.update_db_from_model(self.url, self.repo, models.meta_rundiffs) diff --git a/migrate/tests/versioning/test_cfgparse.py b/migrate/tests/versioning/test_cfgparse.py deleted file mode 100644 index a31273e..0000000 --- a/migrate/tests/versioning/test_cfgparse.py +++ /dev/null @@ -1,27 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -from migrate.versioning import cfgparse -from migrate.versioning.repository import * -from migrate.versioning.template import Template -from migrate.tests import fixture - - -class TestConfigParser(fixture.Base): - - def test_to_dict(self): - """Correctly interpret config results as dictionaries""" - parser = cfgparse.Parser(dict(default_value=42)) - self.assertTrue(len(parser.sections()) == 0) - parser.add_section('section') - parser.set('section','option','value') - self.assertEqual(parser.get('section', 'option'), 'value') - self.assertEqual(parser.to_dict()['section']['option'], 'value') - - def test_table_config(self): - """We should be able to specify the table to be used with a repository""" - default_text = Repository.prepare_config(Template().get_repository(), - 'repository_name', {}) - specified_text = Repository.prepare_config(Template().get_repository(), - 'repository_name', {'version_table': '_other_table'}) - self.assertNotEqual(default_text, specified_text) diff --git a/migrate/tests/versioning/test_database.py b/migrate/tests/versioning/test_database.py deleted file mode 100644 index 8291c6b..0000000 --- a/migrate/tests/versioning/test_database.py +++ /dev/null @@ -1,13 +0,0 @@ -from sqlalchemy import select, text -from migrate.tests import fixture - -class TestConnect(fixture.DB): - level=fixture.DB.TXN - - @fixture.usedb() - def test_connect(self): - """Connect to the database successfully""" - # Connection is done in fixture.DB setup; make sure we can do stuff - self.engine.execute( - select([text('42')]) - ) diff --git a/migrate/tests/versioning/test_genmodel.py b/migrate/tests/versioning/test_genmodel.py deleted file mode 100644 index f800826..0000000 --- a/migrate/tests/versioning/test_genmodel.py +++ /dev/null @@ -1,214 +0,0 @@ -# -*- coding: utf-8 -*- - -import os - -import six -import sqlalchemy -from sqlalchemy import * - -from migrate.versioning import genmodel, schemadiff -from migrate.changeset import schema - -from migrate.tests import fixture - - -class TestSchemaDiff(fixture.DB): - table_name = 'tmp_schemadiff' - level = fixture.DB.CONNECT - - def _setup(self, url): - super(TestSchemaDiff, self)._setup(url) - self.meta = MetaData(self.engine) - self.meta.reflect() - self.meta.drop_all() # in case junk tables are lying around in the test database - self.meta = MetaData(self.engine) - self.meta.reflect() # needed if we just deleted some tables - self.table = Table(self.table_name, self.meta, - Column('id',Integer(), primary_key=True), - Column('name', UnicodeText()), - Column('data', UnicodeText()), - ) - - def _teardown(self): - if self.table.exists(): - self.meta = MetaData(self.engine) - self.meta.reflect() - self.meta.drop_all() - super(TestSchemaDiff, self)._teardown() - - def _applyLatestModel(self): - diff = schemadiff.getDiffOfModelAgainstDatabase(self.meta, self.engine, excludeTables=['migrate_version']) - genmodel.ModelGenerator(diff,self.engine).runB2A() - - # NOTE(mriedem): DB2 handles UnicodeText as LONG VARGRAPHIC - # so the schema diffs on the columns don't work with this test. - @fixture.usedb(not_supported='ibm_db_sa') - def test_functional(self): - def assertDiff(isDiff, tablesMissingInDatabase, tablesMissingInModel, tablesWithDiff): - diff = schemadiff.getDiffOfModelAgainstDatabase(self.meta, self.engine, excludeTables=['migrate_version']) - self.assertEqual( - (diff.tables_missing_from_B, - diff.tables_missing_from_A, - list(diff.tables_different.keys()), - bool(diff)), - (tablesMissingInDatabase, - tablesMissingInModel, - tablesWithDiff, - isDiff) - ) - - # Model is defined but database is empty. - assertDiff(True, [self.table_name], [], []) - - # Check Python upgrade and downgrade of database from updated model. - diff = schemadiff.getDiffOfModelAgainstDatabase(self.meta, self.engine, excludeTables=['migrate_version']) - decls, upgradeCommands, downgradeCommands = genmodel.ModelGenerator(diff,self.engine).genB2AMigration() - - # Feature test for a recent SQLa feature; - # expect different output in that case. - if repr(String()) == 'String()': - self.assertEqualIgnoreWhitespace(decls, ''' - from migrate.changeset import schema - pre_meta = MetaData() - post_meta = MetaData() - tmp_schemadiff = Table('tmp_schemadiff', post_meta, - Column('id', Integer, primary_key=True, nullable=False), - Column('name', UnicodeText), - Column('data', UnicodeText), - ) - ''') - else: - self.assertEqualIgnoreWhitespace(decls, ''' - from migrate.changeset import schema - pre_meta = MetaData() - post_meta = MetaData() - tmp_schemadiff = Table('tmp_schemadiff', post_meta, - Column('id', Integer, primary_key=True, nullable=False), - Column('name', UnicodeText(length=None)), - Column('data', UnicodeText(length=None)), - ) - ''') - - # Create table in database, now model should match database. - self._applyLatestModel() - assertDiff(False, [], [], []) - - # Check Python code gen from database. - diff = schemadiff.getDiffOfModelAgainstDatabase(MetaData(), self.engine, excludeTables=['migrate_version']) - src = genmodel.ModelGenerator(diff,self.engine).genBDefinition() - - namespace = {} - six.exec_(src, namespace) - - c1 = Table('tmp_schemadiff', self.meta, autoload=True).c - c2 = namespace['tmp_schemadiff'].c - self.compare_columns_equal(c1, c2, ['type']) - # TODO: get rid of ignoring type - - if not self.engine.name == 'oracle': - # Add data, later we'll make sure it's still present. - result = self.engine.execute(self.table.insert(), id=1, name=u'mydata') - dataId = result.inserted_primary_key[0] - - # Modify table in model (by removing it and adding it back to model) - # Drop column data, add columns data2 and data3. - self.meta.remove(self.table) - self.table = Table(self.table_name,self.meta, - Column('id',Integer(),primary_key=True), - Column('name',UnicodeText(length=None)), - Column('data2',Integer(),nullable=True), - Column('data3',Integer(),nullable=True), - ) - assertDiff(True, [], [], [self.table_name]) - - # Apply latest model changes and find no more diffs. - self._applyLatestModel() - assertDiff(False, [], [], []) - - # Drop column data3, add data4 - self.meta.remove(self.table) - self.table = Table(self.table_name,self.meta, - Column('id',Integer(),primary_key=True), - Column('name',UnicodeText(length=None)), - Column('data2',Integer(),nullable=True), - Column('data4',Float(),nullable=True), - ) - assertDiff(True, [], [], [self.table_name]) - - diff = schemadiff.getDiffOfModelAgainstDatabase( - self.meta, self.engine, excludeTables=['migrate_version']) - decls, upgradeCommands, downgradeCommands = genmodel.ModelGenerator(diff,self.engine).genB2AMigration(indent='') - - # decls have changed since genBDefinition - six.exec_(decls, namespace) - # migration commands expect a namespace containing migrate_engine - namespace['migrate_engine'] = self.engine - # run the migration up and down - six.exec_(upgradeCommands, namespace) - assertDiff(False, [], [], []) - - six.exec_(decls, namespace) - six.exec_(downgradeCommands, namespace) - assertDiff(True, [], [], [self.table_name]) - - six.exec_(decls, namespace) - six.exec_(upgradeCommands, namespace) - assertDiff(False, [], [], []) - - if not self.engine.name == 'oracle': - # Make sure data is still present. - result = self.engine.execute(self.table.select(self.table.c.id==dataId)) - rows = result.fetchall() - self.assertEqual(len(rows), 1) - self.assertEqual(rows[0].name, 'mydata') - - # Add data, later we'll make sure it's still present. - result = self.engine.execute(self.table.insert(), id=2, name=u'mydata2', data2=123) - dataId2 = result.inserted_primary_key[0] - - # Change column type in model. - self.meta.remove(self.table) - self.table = Table(self.table_name,self.meta, - Column('id',Integer(),primary_key=True), - Column('name',UnicodeText(length=None)), - Column('data2',String(255),nullable=True), - ) - - # XXX test type diff - return - - assertDiff(True, [], [], [self.table_name]) - - # Apply latest model changes and find no more diffs. - self._applyLatestModel() - assertDiff(False, [], [], []) - - if not self.engine.name == 'oracle': - # Make sure data is still present. - result = self.engine.execute(self.table.select(self.table.c.id==dataId2)) - rows = result.fetchall() - self.assertEqual(len(rows), 1) - self.assertEqual(rows[0].name, 'mydata2') - self.assertEqual(rows[0].data2, '123') - - # Delete data, since we're about to make a required column. - # Not even using sqlalchemy.PassiveDefault helps because we're doing explicit column select. - self.engine.execute(self.table.delete(), id=dataId) - - if not self.engine.name == 'firebird': - # Change column nullable in model. - self.meta.remove(self.table) - self.table = Table(self.table_name,self.meta, - Column('id',Integer(),primary_key=True), - Column('name',UnicodeText(length=None)), - Column('data2',String(255),nullable=False), - ) - assertDiff(True, [], [], [self.table_name]) # TODO test nullable diff - - # Apply latest model changes and find no more diffs. - self._applyLatestModel() - assertDiff(False, [], [], []) - - # Remove table from model. - self.meta.remove(self.table) - assertDiff(True, [], [self.table_name], []) diff --git a/migrate/tests/versioning/test_keyedinstance.py b/migrate/tests/versioning/test_keyedinstance.py deleted file mode 100644 index 485cbbb..0000000 --- a/migrate/tests/versioning/test_keyedinstance.py +++ /dev/null @@ -1,45 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -from migrate.tests import fixture -from migrate.versioning.util.keyedinstance import * - -class TestKeydInstance(fixture.Base): - def test_unique(self): - """UniqueInstance should produce unique object instances""" - class Uniq1(KeyedInstance): - @classmethod - def _key(cls,key): - return str(key) - def __init__(self,value): - self.value=value - class Uniq2(KeyedInstance): - @classmethod - def _key(cls,key): - return str(key) - def __init__(self,value): - self.value=value - - a10 = Uniq1('a') - - # Different key: different instance - b10 = Uniq1('b') - self.assertTrue(a10 is not b10) - - # Different class: different instance - a20 = Uniq2('a') - self.assertTrue(a10 is not a20) - - # Same key/class: same instance - a11 = Uniq1('a') - self.assertTrue(a10 is a11) - - # __init__ is called - self.assertEqual(a10.value,'a') - - # clear() causes us to forget all existing instances - Uniq1.clear() - a12 = Uniq1('a') - self.assertTrue(a10 is not a12) - - self.assertRaises(NotImplementedError, KeyedInstance._key) diff --git a/migrate/tests/versioning/test_pathed.py b/migrate/tests/versioning/test_pathed.py deleted file mode 100644 index 53f0b47..0000000 --- a/migrate/tests/versioning/test_pathed.py +++ /dev/null @@ -1,51 +0,0 @@ -from migrate.tests import fixture -from migrate.versioning.pathed import * - -class TestPathed(fixture.Base): - def test_parent_path(self): - """Default parent_path should behave correctly""" - filepath='/fgsfds/moot.py' - dirpath='/fgsfds/moot' - sdirpath='/fgsfds/moot/' - - result='/fgsfds' - self.assertTrue(result==Pathed._parent_path(filepath)) - self.assertTrue(result==Pathed._parent_path(dirpath)) - self.assertTrue(result==Pathed._parent_path(sdirpath)) - - def test_new(self): - """Pathed(path) shouldn't create duplicate objects of the same path""" - path='/fgsfds' - class Test(Pathed): - attr=None - o1=Test(path) - o2=Test(path) - self.assertTrue(isinstance(o1,Test)) - self.assertTrue(o1.path==path) - self.assertTrue(o1 is o2) - o1.attr='herring' - self.assertTrue(o2.attr=='herring') - o2.attr='shrubbery' - self.assertTrue(o1.attr=='shrubbery') - - def test_parent(self): - """Parents should be fetched correctly""" - class Parent(Pathed): - parent=None - children=0 - def _init_child(self,child,path): - """Keep a tally of children. - (A real class might do something more interesting here) - """ - self.__class__.children+=1 - - class Child(Pathed): - parent=Parent - - path='/fgsfds/moot.py' - parent_path='/fgsfds' - object=Child(path) - self.assertTrue(isinstance(object,Child)) - self.assertTrue(isinstance(object.parent,Parent)) - self.assertTrue(object.path==path) - self.assertTrue(object.parent.path==parent_path) diff --git a/migrate/tests/versioning/test_repository.py b/migrate/tests/versioning/test_repository.py deleted file mode 100644 index 6e87c02..0000000 --- a/migrate/tests/versioning/test_repository.py +++ /dev/null @@ -1,216 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import os -import shutil - -from migrate import exceptions -from migrate.versioning.repository import * -from migrate.versioning.script import * - -from migrate.tests import fixture -from datetime import datetime - - -class TestRepository(fixture.Pathed): - def test_create(self): - """Repositories are created successfully""" - path = self.tmp_repos() - name = 'repository_name' - - # Creating a repository that doesn't exist should succeed - repo = Repository.create(path, name) - config_path = repo.config.path - manage_path = os.path.join(repo.path, 'manage.py') - self.assertTrue(repo) - - # Files should actually be created - self.assertTrue(os.path.exists(path)) - self.assertTrue(os.path.exists(config_path)) - self.assertTrue(os.path.exists(manage_path)) - - # Can't create it again: it already exists - self.assertRaises(exceptions.PathFoundError, Repository.create, path, name) - return path - - def test_load(self): - """We should be able to load information about an existing repository""" - # Create a repository to load - path = self.test_create() - repos = Repository(path) - - self.assertTrue(repos) - self.assertTrue(repos.config) - self.assertTrue(repos.config.get('db_settings', 'version_table')) - - # version_table's default isn't none - self.assertNotEqual(repos.config.get('db_settings', 'version_table'), 'None') - - def test_load_notfound(self): - """Nonexistant repositories shouldn't be loaded""" - path = self.tmp_repos() - self.assertTrue(not os.path.exists(path)) - self.assertRaises(exceptions.InvalidRepositoryError, Repository, path) - - def test_load_invalid(self): - """Invalid repos shouldn't be loaded""" - # Here, invalid=empty directory. There may be other conditions too, - # but we shouldn't need to test all of them - path = self.tmp_repos() - os.mkdir(path) - self.assertRaises(exceptions.InvalidRepositoryError, Repository, path) - - -class TestVersionedRepository(fixture.Pathed): - """Tests on an existing repository with a single python script""" - - def setUp(self): - super(TestVersionedRepository, self).setUp() - Repository.clear() - self.path_repos = self.tmp_repos() - Repository.create(self.path_repos, 'repository_name') - - def test_version(self): - """We should correctly detect the version of a repository""" - repos = Repository(self.path_repos) - - # Get latest version, or detect if a specified version exists - self.assertEqual(repos.latest, 0) - # repos.latest isn't an integer, but a VerNum - # (so we can't just assume the following tests are correct) - self.assertTrue(repos.latest >= 0) - self.assertTrue(repos.latest < 1) - - # Create a script and test again - repos.create_script('') - self.assertEqual(repos.latest, 1) - self.assertTrue(repos.latest >= 0) - self.assertTrue(repos.latest >= 1) - self.assertTrue(repos.latest < 2) - - # Create a new script and test again - repos.create_script('') - self.assertEqual(repos.latest, 2) - self.assertTrue(repos.latest >= 0) - self.assertTrue(repos.latest >= 1) - self.assertTrue(repos.latest >= 2) - self.assertTrue(repos.latest < 3) - - - def test_timestmap_numbering_version(self): - repos = Repository(self.path_repos) - repos.config.set('db_settings', 'use_timestamp_numbering', 'True') - - # Get latest version, or detect if a specified version exists - self.assertEqual(repos.latest, 0) - # repos.latest isn't an integer, but a VerNum - # (so we can't just assume the following tests are correct) - self.assertTrue(repos.latest >= 0) - self.assertTrue(repos.latest < 1) - - # Create a script and test again - now = int(datetime.utcnow().strftime('%Y%m%d%H%M%S')) - repos.create_script('') - self.assertEqual(repos.latest, now) - - def test_source(self): - """Get a script object by version number and view its source""" - # Load repository and commit script - repo = Repository(self.path_repos) - repo.create_script('') - repo.create_script_sql('postgres', 'foo bar') - - # Source is valid: script must have an upgrade function - # (not a very thorough test, but should be plenty) - source = repo.version(1).script().source() - self.assertTrue(source.find('def upgrade') >= 0) - - import pprint; pprint.pprint(repo.version(2).sql) - source = repo.version(2).script('postgres', 'upgrade').source() - self.assertEqual(source.strip(), '') - - def test_latestversion(self): - """Repository.version() (no params) returns the latest version""" - repos = Repository(self.path_repos) - repos.create_script('') - self.assertTrue(repos.version(repos.latest) is repos.version()) - self.assertTrue(repos.version() is not None) - - def test_changeset(self): - """Repositories can create changesets properly""" - # Create a nonzero-version repository of empty scripts - repos = Repository(self.path_repos) - for i in range(10): - repos.create_script('') - - def check_changeset(params, length): - """Creates and verifies a changeset""" - changeset = repos.changeset('postgres', *params) - self.assertEqual(len(changeset), length) - self.assertTrue(isinstance(changeset, Changeset)) - uniq = list() - # Changesets are iterable - for version, change in changeset: - self.assertTrue(isinstance(change, BaseScript)) - # Changes aren't identical - self.assertTrue(id(change) not in uniq) - uniq.append(id(change)) - return changeset - - # Upgrade to a specified version... - cs = check_changeset((0, 10), 10) - self.assertEqual(cs.keys().pop(0),0 ) # 0 -> 1: index is starting version - self.assertEqual(cs.keys().pop(), 9) # 9 -> 10: index is starting version - self.assertEqual(cs.start, 0) # starting version - self.assertEqual(cs.end, 10) # ending version - check_changeset((0, 1), 1) - check_changeset((0, 5), 5) - check_changeset((0, 0), 0) - check_changeset((5, 5), 0) - check_changeset((10, 10), 0) - check_changeset((5, 10), 5) - - # Can't request a changeset of higher version than this repository - self.assertRaises(Exception, repos.changeset, 'postgres', 5, 11) - self.assertRaises(Exception, repos.changeset, 'postgres', -1, 5) - - # Upgrade to the latest version... - cs = check_changeset((0,), 10) - self.assertEqual(cs.keys().pop(0), 0) - self.assertEqual(cs.keys().pop(), 9) - self.assertEqual(cs.start, 0) - self.assertEqual(cs.end, 10) - check_changeset((1,), 9) - check_changeset((5,), 5) - check_changeset((9,), 1) - check_changeset((10,), 0) - - # run changes - cs.run('postgres', 'upgrade') - - # Can't request a changeset of higher/lower version than this repository - self.assertRaises(Exception, repos.changeset, 'postgres', 11) - self.assertRaises(Exception, repos.changeset, 'postgres', -1) - - # Downgrade - cs = check_changeset((10, 0),10) - self.assertEqual(cs.keys().pop(0), 10) # 10 -> 9 - self.assertEqual(cs.keys().pop(), 1) # 1 -> 0 - self.assertEqual(cs.start, 10) - self.assertEqual(cs.end, 0) - check_changeset((10, 5), 5) - check_changeset((5, 0), 5) - - def test_many_versions(self): - """Test what happens when lots of versions are created""" - repos = Repository(self.path_repos) - for i in range(1001): - repos.create_script('') - - # since we normally create 3 digit ones, let's see if we blow up - self.assertTrue(os.path.exists('%s/versions/1000.py' % self.path_repos)) - self.assertTrue(os.path.exists('%s/versions/1001.py' % self.path_repos)) - - -# TODO: test manage file -# TODO: test changeset diff --git a/migrate/tests/versioning/test_runchangeset.py b/migrate/tests/versioning/test_runchangeset.py deleted file mode 100644 index 12bc77c..0000000 --- a/migrate/tests/versioning/test_runchangeset.py +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import os,shutil - -from migrate.tests import fixture -from migrate.versioning.schema import * -from migrate.versioning import script - - -class TestRunChangeset(fixture.Pathed,fixture.DB): - level=fixture.DB.CONNECT - def _setup(self, url): - super(TestRunChangeset, self)._setup(url) - Repository.clear() - self.path_repos=self.tmp_repos() - # Create repository, script - Repository.create(self.path_repos,'repository_name') - - @fixture.usedb() - def test_changeset_run(self): - """Running a changeset against a repository gives expected results""" - repos=Repository(self.path_repos) - for i in range(10): - repos.create_script('') - try: - ControlledSchema(self.engine,repos).drop() - except: - pass - db=ControlledSchema.create(self.engine,repos) - - # Scripts are empty; we'll check version # correctness. - # (Correct application of their content is checked elsewhere) - self.assertEqual(db.version,0) - db.upgrade(1) - self.assertEqual(db.version,1) - db.upgrade(5) - self.assertEqual(db.version,5) - db.upgrade(5) - self.assertEqual(db.version,5) - db.upgrade(None) # Latest is implied - self.assertEqual(db.version,10) - self.assertRaises(Exception,db.upgrade,11) - self.assertEqual(db.version,10) - db.upgrade(9) - self.assertEqual(db.version,9) - db.upgrade(0) - self.assertEqual(db.version,0) - self.assertRaises(Exception,db.upgrade,-1) - self.assertEqual(db.version,0) - #changeset = repos.changeset(self.url,0) - db.drop() diff --git a/migrate/tests/versioning/test_schema.py b/migrate/tests/versioning/test_schema.py deleted file mode 100644 index 5396d9d..0000000 --- a/migrate/tests/versioning/test_schema.py +++ /dev/null @@ -1,205 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import os -import shutil - -import six - -from migrate import exceptions -from migrate.versioning.schema import * -from migrate.versioning import script, schemadiff - -from sqlalchemy import * - -from migrate.tests import fixture - - -class TestControlledSchema(fixture.Pathed, fixture.DB): - # Transactions break postgres in this test; we'll clean up after ourselves - level = fixture.DB.CONNECT - - def setUp(self): - super(TestControlledSchema, self).setUp() - self.path_repos = self.temp_usable_dir + '/repo/' - self.repos = Repository.create(self.path_repos, 'repo_name') - - def _setup(self, url): - self.setUp() - super(TestControlledSchema, self)._setup(url) - self.cleanup() - - def _teardown(self): - super(TestControlledSchema, self)._teardown() - self.cleanup() - self.tearDown() - - def cleanup(self): - # drop existing version table if necessary - try: - ControlledSchema(self.engine, self.repos).drop() - except: - # No table to drop; that's fine, be silent - pass - - def tearDown(self): - self.cleanup() - super(TestControlledSchema, self).tearDown() - - @fixture.usedb() - def test_version_control(self): - """Establish version control on a particular database""" - # Establish version control on this database - dbcontrol = ControlledSchema.create(self.engine, self.repos) - - # Trying to create another DB this way fails: table exists - self.assertRaises(exceptions.DatabaseAlreadyControlledError, - ControlledSchema.create, self.engine, self.repos) - - # We can load a controlled DB this way, too - dbcontrol0 = ControlledSchema(self.engine, self.repos) - self.assertEqual(dbcontrol, dbcontrol0) - - # We can also use a repository path, instead of a repository - dbcontrol0 = ControlledSchema(self.engine, self.repos.path) - self.assertEqual(dbcontrol, dbcontrol0) - - # We don't have to use the same connection - engine = create_engine(self.url) - dbcontrol0 = ControlledSchema(engine, self.repos.path) - self.assertEqual(dbcontrol, dbcontrol0) - - # Clean up: - dbcontrol.drop() - - # Attempting to drop vc from a db without it should fail - self.assertRaises(exceptions.DatabaseNotControlledError, dbcontrol.drop) - - # No table defined should raise error - self.assertRaises(exceptions.DatabaseNotControlledError, - ControlledSchema, self.engine, self.repos) - - @fixture.usedb() - def test_version_control_specified(self): - """Establish version control with a specified version""" - # Establish version control on this database - version = 0 - dbcontrol = ControlledSchema.create(self.engine, self.repos, version) - self.assertEqual(dbcontrol.version, version) - - # Correct when we load it, too - dbcontrol = ControlledSchema(self.engine, self.repos) - self.assertEqual(dbcontrol.version, version) - - dbcontrol.drop() - - # Now try it with a nonzero value - version = 10 - for i in range(version): - self.repos.create_script('') - self.assertEqual(self.repos.latest, version) - - # Test with some mid-range value - dbcontrol = ControlledSchema.create(self.engine,self.repos, 5) - self.assertEqual(dbcontrol.version, 5) - dbcontrol.drop() - - # Test with max value - dbcontrol = ControlledSchema.create(self.engine, self.repos, version) - self.assertEqual(dbcontrol.version, version) - dbcontrol.drop() - - @fixture.usedb() - def test_version_control_invalid(self): - """Try to establish version control with an invalid version""" - versions = ('Thirteen', '-1', -1, '' , 13) - # A fresh repository doesn't go up to version 13 yet - for version in versions: - #self.assertRaises(ControlledSchema.InvalidVersionError, - # Can't have custom errors with assertRaises... - try: - ControlledSchema.create(self.engine, self.repos, version) - self.assertTrue(False, repr(version)) - except exceptions.InvalidVersionError: - pass - - @fixture.usedb() - def test_changeset(self): - """Create changeset from controlled schema""" - dbschema = ControlledSchema.create(self.engine, self.repos) - - # empty schema doesn't have changesets - cs = dbschema.changeset() - self.assertEqual(cs, {}) - - for i in range(5): - self.repos.create_script('') - self.assertEqual(self.repos.latest, 5) - - cs = dbschema.changeset(5) - self.assertEqual(len(cs), 5) - - # cleanup - dbschema.drop() - - @fixture.usedb() - def test_upgrade_runchange(self): - dbschema = ControlledSchema.create(self.engine, self.repos) - - for i in range(10): - self.repos.create_script('') - - self.assertEqual(self.repos.latest, 10) - - dbschema.upgrade(10) - - self.assertRaises(ValueError, dbschema.upgrade, 'a') - self.assertRaises(exceptions.InvalidVersionError, dbschema.runchange, 20, '', 1) - - # TODO: test for table version in db - - # cleanup - dbschema.drop() - - @fixture.usedb() - def test_create_model(self): - """Test workflow to generate create_model""" - model = ControlledSchema.create_model(self.engine, self.repos, declarative=False) - self.assertTrue(isinstance(model, six.string_types)) - - model = ControlledSchema.create_model(self.engine, self.repos.path, declarative=True) - self.assertTrue(isinstance(model, six.string_types)) - - @fixture.usedb() - def test_compare_model_to_db(self): - meta = self.construct_model() - - diff = ControlledSchema.compare_model_to_db(self.engine, meta, self.repos) - self.assertTrue(isinstance(diff, schemadiff.SchemaDiff)) - - diff = ControlledSchema.compare_model_to_db(self.engine, meta, self.repos.path) - self.assertTrue(isinstance(diff, schemadiff.SchemaDiff)) - meta.drop_all(self.engine) - - @fixture.usedb() - def test_update_db_from_model(self): - dbschema = ControlledSchema.create(self.engine, self.repos) - - meta = self.construct_model() - - dbschema.update_db_from_model(meta) - - # TODO: test for table version in db - - # cleanup - dbschema.drop() - meta.drop_all(self.engine) - - def construct_model(self): - meta = MetaData() - - user = Table('temp_model_schema', meta, Column('id', Integer), Column('user', String(245))) - - return meta - - # TODO: test how are tables populated in db diff --git a/migrate/tests/versioning/test_schemadiff.py b/migrate/tests/versioning/test_schemadiff.py deleted file mode 100644 index f45a012..0000000 --- a/migrate/tests/versioning/test_schemadiff.py +++ /dev/null @@ -1,227 +0,0 @@ -# -*- coding: utf-8 -*- - -import os - -from sqlalchemy import * - -from migrate.versioning import schemadiff - -from migrate.tests import fixture - -class SchemaDiffBase(fixture.DB): - - level = fixture.DB.CONNECT - def _make_table(self,*cols,**kw): - self.table = Table('xtable', self.meta, - Column('id',Integer(), primary_key=True), - *cols - ) - if kw.get('create',True): - self.table.create() - - def _assert_diff(self,col_A,col_B): - self._make_table(col_A) - self.meta.clear() - self._make_table(col_B,create=False) - diff = self._run_diff() - # print diff - self.assertTrue(diff) - self.assertEqual(1,len(diff.tables_different)) - td = list(diff.tables_different.values())[0] - self.assertEqual(1,len(td.columns_different)) - cd = list(td.columns_different.values())[0] - label_width = max(len(self.name1), len(self.name2)) - self.assertEqual(('Schema diffs:\n' - ' table with differences: xtable\n' - ' column with differences: data\n' - ' %*s: %r\n' - ' %*s: %r')%( - label_width, - self.name1, - cd.col_A, - label_width, - self.name2, - cd.col_B - ),str(diff)) - -class Test_getDiffOfModelAgainstDatabase(SchemaDiffBase): - name1 = 'model' - name2 = 'database' - - def _run_diff(self,**kw): - return schemadiff.getDiffOfModelAgainstDatabase( - self.meta, self.engine, **kw - ) - - @fixture.usedb() - def test_table_missing_in_db(self): - self._make_table(create=False) - diff = self._run_diff() - self.assertTrue(diff) - self.assertEqual('Schema diffs:\n tables missing from %s: xtable' % self.name2, - str(diff)) - - @fixture.usedb() - def test_table_missing_in_model(self): - self._make_table() - self.meta.clear() - diff = self._run_diff() - self.assertTrue(diff) - self.assertEqual('Schema diffs:\n tables missing from %s: xtable' % self.name1, - str(diff)) - - @fixture.usedb() - def test_column_missing_in_db(self): - # db - Table('xtable', self.meta, - Column('id',Integer(), primary_key=True), - ).create() - self.meta.clear() - # model - self._make_table( - Column('xcol',Integer()), - create=False - ) - # run diff - diff = self._run_diff() - self.assertTrue(diff) - self.assertEqual('Schema diffs:\n' - ' table with differences: xtable\n' - ' %s missing these columns: xcol' % self.name2, - str(diff)) - - @fixture.usedb() - def test_column_missing_in_model(self): - # db - self._make_table( - Column('xcol',Integer()), - ) - self.meta.clear() - # model - self._make_table( - create=False - ) - # run diff - diff = self._run_diff() - self.assertTrue(diff) - self.assertEqual('Schema diffs:\n' - ' table with differences: xtable\n' - ' %s missing these columns: xcol' % self.name1, - str(diff)) - - @fixture.usedb() - def test_exclude_tables(self): - # db - Table('ytable', self.meta, - Column('id',Integer(), primary_key=True), - ).create() - Table('ztable', self.meta, - Column('id',Integer(), primary_key=True), - ).create() - self.meta.clear() - # model - self._make_table( - create=False - ) - Table('ztable', self.meta, - Column('id',Integer(), primary_key=True), - ) - # run diff - diff = self._run_diff(excludeTables=('xtable','ytable')) - # ytable only in database - # xtable only in model - # ztable identical on both - # ...so we expect no diff! - self.assertFalse(diff) - self.assertEqual('No schema diffs',str(diff)) - - @fixture.usedb() - def test_identical_just_pk(self): - self._make_table() - diff = self._run_diff() - self.assertFalse(diff) - self.assertEqual('No schema diffs',str(diff)) - - - @fixture.usedb() - def test_different_type(self): - self._assert_diff( - Column('data', String(10)), - Column('data', Integer()), - ) - - @fixture.usedb() - def test_int_vs_float(self): - self._assert_diff( - Column('data', Integer()), - Column('data', Float()), - ) - - # NOTE(mriedem): The ibm_db_sa driver handles the Float() as a DOUBLE() - # which extends Numeric() but isn't defined in sqlalchemy.types, so we - # can't check for it as a special case like is done in schemadiff.ColDiff. - @fixture.usedb(not_supported='ibm_db_sa') - def test_float_vs_numeric(self): - self._assert_diff( - Column('data', Float()), - Column('data', Numeric()), - ) - - @fixture.usedb() - def test_numeric_precision(self): - self._assert_diff( - Column('data', Numeric(precision=5)), - Column('data', Numeric(precision=6)), - ) - - @fixture.usedb() - def test_numeric_scale(self): - self._assert_diff( - Column('data', Numeric(precision=6,scale=0)), - Column('data', Numeric(precision=6,scale=1)), - ) - - @fixture.usedb() - def test_string_length(self): - self._assert_diff( - Column('data', String(10)), - Column('data', String(20)), - ) - - @fixture.usedb() - def test_integer_identical(self): - self._make_table( - Column('data', Integer()), - ) - diff = self._run_diff() - self.assertEqual('No schema diffs',str(diff)) - self.assertFalse(diff) - - @fixture.usedb() - def test_string_identical(self): - self._make_table( - Column('data', String(10)), - ) - diff = self._run_diff() - self.assertEqual('No schema diffs',str(diff)) - self.assertFalse(diff) - - @fixture.usedb() - def test_text_identical(self): - self._make_table( - Column('data', Text), - ) - diff = self._run_diff() - self.assertEqual('No schema diffs',str(diff)) - self.assertFalse(diff) - -class Test_getDiffOfModelAgainstModel(Test_getDiffOfModelAgainstDatabase): - name1 = 'metadataA' - name2 = 'metadataB' - - def _run_diff(self,**kw): - db_meta= MetaData() - db_meta.reflect(self.engine) - return schemadiff.getDiffOfModelAgainstModel( - self.meta, db_meta, **kw - ) diff --git a/migrate/tests/versioning/test_script.py b/migrate/tests/versioning/test_script.py deleted file mode 100644 index 20e6af0..0000000 --- a/migrate/tests/versioning/test_script.py +++ /dev/null @@ -1,305 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import imp -import os -import sys -import shutil - -import six -from migrate import exceptions -from migrate.versioning import version, repository -from migrate.versioning.script import * -from migrate.versioning.util import * - -from migrate.tests import fixture -from migrate.tests.fixture.models import tmp_sql_table - - -class TestBaseScript(fixture.Pathed): - - def test_all(self): - """Testing all basic BaseScript operations""" - # verify / source / run - src = self.tmp() - open(src, 'w').close() - bscript = BaseScript(src) - BaseScript.verify(src) - self.assertEqual(bscript.source(), '') - self.assertRaises(NotImplementedError, bscript.run, 'foobar') - - -class TestPyScript(fixture.Pathed, fixture.DB): - cls = PythonScript - def test_create(self): - """We can create a migration script""" - path = self.tmp_py() - # Creating a file that doesn't exist should succeed - self.cls.create(path) - self.assertTrue(os.path.exists(path)) - # Created file should be a valid script (If not, raises an error) - self.cls.verify(path) - # Can't create it again: it already exists - self.assertRaises(exceptions.PathFoundError,self.cls.create,path) - - @fixture.usedb(supported='sqlite') - def test_run(self): - script_path = self.tmp_py() - pyscript = PythonScript.create(script_path) - pyscript.run(self.engine, 1) - pyscript.run(self.engine, -1) - - self.assertRaises(exceptions.ScriptError, pyscript.run, self.engine, 0) - self.assertRaises(exceptions.ScriptError, pyscript._func, 'foobar') - - # clean pyc file - if six.PY3: - os.remove(imp.cache_from_source(script_path)) - else: - os.remove(script_path + 'c') - - # test deprecated upgrade/downgrade with no arguments - contents = open(script_path, 'r').read() - f = open(script_path, 'w') - f.write(contents.replace("upgrade(migrate_engine)", "upgrade()")) - f.close() - - pyscript = PythonScript(script_path) - pyscript._module = None - try: - pyscript.run(self.engine, 1) - pyscript.run(self.engine, -1) - except exceptions.ScriptError: - pass - else: - self.fail() - - def test_verify_notfound(self): - """Correctly verify a python migration script: nonexistant file""" - path = self.tmp_py() - self.assertFalse(os.path.exists(path)) - # Fails on empty path - self.assertRaises(exceptions.InvalidScriptError,self.cls.verify,path) - self.assertRaises(exceptions.InvalidScriptError,self.cls,path) - - def test_verify_invalidpy(self): - """Correctly verify a python migration script: invalid python file""" - path=self.tmp_py() - # Create empty file - f = open(path,'w') - f.write("def fail") - f.close() - self.assertRaises(Exception,self.cls.verify_module,path) - # script isn't verified on creation, but on module reference - py = self.cls(path) - self.assertRaises(Exception,(lambda x: x.module),py) - - def test_verify_nofuncs(self): - """Correctly verify a python migration script: valid python file; no upgrade func""" - path = self.tmp_py() - # Create empty file - f = open(path, 'w') - f.write("def zergling():\n\tprint('rush')") - f.close() - self.assertRaises(exceptions.InvalidScriptError, self.cls.verify_module, path) - # script isn't verified on creation, but on module reference - py = self.cls(path) - self.assertRaises(exceptions.InvalidScriptError,(lambda x: x.module),py) - - @fixture.usedb(supported='sqlite') - def test_preview_sql(self): - """Preview SQL abstract from ORM layer (sqlite)""" - path = self.tmp_py() - - f = open(path, 'w') - content = ''' -from migrate import * -from sqlalchemy import * - -metadata = MetaData() - -UserGroup = Table('Link', metadata, - Column('link1ID', Integer), - Column('link2ID', Integer), - UniqueConstraint('link1ID', 'link2ID')) - -def upgrade(migrate_engine): - metadata.create_all(migrate_engine) - ''' - f.write(content) - f.close() - - pyscript = self.cls(path) - SQL = pyscript.preview_sql(self.url, 1) - self.assertEqualIgnoreWhitespace(""" - CREATE TABLE "Link" - ("link1ID" INTEGER, - "link2ID" INTEGER, - UNIQUE ("link1ID", "link2ID")) - """, SQL) - # TODO: test: No SQL should be executed! - - def test_verify_success(self): - """Correctly verify a python migration script: success""" - path = self.tmp_py() - # Succeeds after creating - self.cls.create(path) - self.cls.verify(path) - - # test for PythonScript.make_update_script_for_model - - @fixture.usedb() - def test_make_update_script_for_model(self): - """Construct script source from differences of two models""" - - self.setup_model_params() - self.write_file(self.first_model_path, self.base_source) - self.write_file(self.second_model_path, self.base_source + self.model_source) - - source_script = self.pyscript.make_update_script_for_model( - engine=self.engine, - oldmodel=load_model('testmodel_first:meta'), - model=load_model('testmodel_second:meta'), - repository=self.repo_path, - ) - - self.assertTrue("['User'].create()" in source_script) - self.assertTrue("['User'].drop()" in source_script) - - @fixture.usedb() - def test_make_update_script_for_equal_models(self): - """Try to make update script from two identical models""" - - self.setup_model_params() - self.write_file(self.first_model_path, self.base_source + self.model_source) - self.write_file(self.second_model_path, self.base_source + self.model_source) - - source_script = self.pyscript.make_update_script_for_model( - engine=self.engine, - oldmodel=load_model('testmodel_first:meta'), - model=load_model('testmodel_second:meta'), - repository=self.repo_path, - ) - - self.assertFalse('User.create()' in source_script) - self.assertFalse('User.drop()' in source_script) - - @fixture.usedb() - def test_make_update_script_direction(self): - """Check update scripts go in the right direction""" - - self.setup_model_params() - self.write_file(self.first_model_path, self.base_source) - self.write_file(self.second_model_path, self.base_source + self.model_source) - - source_script = self.pyscript.make_update_script_for_model( - engine=self.engine, - oldmodel=load_model('testmodel_first:meta'), - model=load_model('testmodel_second:meta'), - repository=self.repo_path, - ) - - self.assertTrue(0 - < source_script.find('upgrade') - < source_script.find("['User'].create()") - < source_script.find('downgrade') - < source_script.find("['User'].drop()")) - - def setup_model_params(self): - self.script_path = self.tmp_py() - self.repo_path = self.tmp() - self.first_model_path = os.path.join(self.temp_usable_dir, 'testmodel_first.py') - self.second_model_path = os.path.join(self.temp_usable_dir, 'testmodel_second.py') - - self.base_source = """from sqlalchemy import *\nmeta = MetaData()\n""" - self.model_source = """ -User = Table('User', meta, - Column('id', Integer, primary_key=True), - Column('login', Unicode(40)), - Column('passwd', String(40)), -)""" - - self.repo = repository.Repository.create(self.repo_path, 'repo') - self.pyscript = PythonScript.create(self.script_path) - sys.modules.pop('testmodel_first', None) - sys.modules.pop('testmodel_second', None) - - def write_file(self, path, contents): - f = open(path, 'w') - f.write(contents) - f.close() - - -class TestSqlScript(fixture.Pathed, fixture.DB): - - @fixture.usedb() - def test_error(self): - """Test if exception is raised on wrong script source""" - src = self.tmp() - - f = open(src, 'w') - f.write("""foobar""") - f.close() - - sqls = SqlScript(src) - self.assertRaises(Exception, sqls.run, self.engine) - - @fixture.usedb() - def test_success(self): - """Test sucessful SQL execution""" - # cleanup and prepare python script - tmp_sql_table.metadata.drop_all(self.engine, checkfirst=True) - script_path = self.tmp_py() - pyscript = PythonScript.create(script_path) - - # populate python script - contents = open(script_path, 'r').read() - contents = contents.replace("pass", "tmp_sql_table.create(migrate_engine)") - contents = 'from migrate.tests.fixture.models import tmp_sql_table\n' + contents - f = open(script_path, 'w') - f.write(contents) - f.close() - - # write SQL script from python script preview - pyscript = PythonScript(script_path) - src = self.tmp() - f = open(src, 'w') - f.write(pyscript.preview_sql(self.url, 1)) - f.close() - - # run the change - sqls = SqlScript(src) - sqls.run(self.engine) - tmp_sql_table.metadata.drop_all(self.engine, checkfirst=True) - - @fixture.usedb() - def test_transaction_management_statements(self): - """ - Test that we can successfully execute SQL scripts with transaction - management statements. - """ - for script_pattern in ( - "BEGIN TRANSACTION; %s; COMMIT;", - "BEGIN; %s; END TRANSACTION;", - "/* comment */BEGIN TRANSACTION; %s; /* comment */COMMIT;", - "/* comment */ BEGIN TRANSACTION; %s; /* comment */ COMMIT;", - """ --- comment -BEGIN TRANSACTION; - -%s; - --- comment -COMMIT;""", - ): - - test_statement = ("CREATE TABLE TEST1 (field1 int); " - "DROP TABLE TEST1") - script = script_pattern % test_statement - src = self.tmp() - - with open(src, 'wt') as f: - f.write(script) - - sqls = SqlScript(src) - sqls.run(self.engine) diff --git a/migrate/tests/versioning/test_shell.py b/migrate/tests/versioning/test_shell.py deleted file mode 100644 index 001efcf..0000000 --- a/migrate/tests/versioning/test_shell.py +++ /dev/null @@ -1,574 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import os -import sys -import tempfile - -import six -from six.moves import cStringIO -from sqlalchemy import MetaData, Table - -from migrate.exceptions import * -from migrate.versioning.repository import Repository -from migrate.versioning import genmodel, shell, api -from migrate.tests.fixture import Shell, DB, usedb -from migrate.tests.fixture import models - - -class TestShellCommands(Shell): - """Tests migrate.py commands""" - - def test_help(self): - """Displays default help dialog""" - self.assertEqual(self.env.run('migrate -h').returncode, 0) - self.assertEqual(self.env.run('migrate --help').returncode, 0) - self.assertEqual(self.env.run('migrate help').returncode, 0) - - def test_help_commands(self): - """Display help on a specific command""" - # we can only test that we get some output - for cmd in api.__all__: - result = self.env.run('migrate help %s' % cmd) - self.assertTrue(isinstance(result.stdout, six.string_types)) - self.assertTrue(result.stdout) - self.assertFalse(result.stderr) - - def test_shutdown_logging(self): - """Try to shutdown logging output""" - repos = self.tmp_repos() - result = self.env.run('migrate create %s repository_name' % repos) - result = self.env.run('migrate version %s --disable_logging' % repos) - self.assertEqual(result.stdout, '') - result = self.env.run('migrate version %s -q' % repos) - self.assertEqual(result.stdout, '') - - # TODO: assert logging messages to 0 - shell.main(['version', repos], logging=False) - - def test_main_with_runpy(self): - if sys.version_info[:2] == (2, 4): - self.skipTest("runpy is not part of python2.4") - from runpy import run_module - try: - original = sys.argv - sys.argv=['X','--help'] - - run_module('migrate.versioning.shell', run_name='__main__') - - finally: - sys.argv = original - - def _check_error(self,args,code,expected,**kw): - original = sys.stderr - try: - actual = cStringIO() - sys.stderr = actual - try: - shell.main(args,**kw) - except SystemExit as e: - self.assertEqual(code,e.args[0]) - else: - self.fail('No exception raised') - finally: - sys.stderr = original - actual = actual.getvalue() - self.assertTrue(expected in actual,'%r not in:\n"""\n%s\n"""'%(expected,actual)) - - def test_main(self): - """Test main() function""" - repos = self.tmp_repos() - shell.main(['help']) - shell.main(['help', 'create']) - shell.main(['create', 'repo_name', '--preview_sql'], repository=repos) - shell.main(['version', '--', '--repository=%s' % repos]) - shell.main(['version', '-d', '--repository=%s' % repos, '--version=2']) - - self._check_error(['foobar'],2,'error: Invalid command foobar') - self._check_error(['create', 'f', 'o', 'o'],2,'error: Too many arguments for command create: o') - self._check_error(['create'],2,'error: Not enough arguments for command create: name, repository not specified') - self._check_error(['create', 'repo_name'],2,'already exists', repository=repos) - - def test_create(self): - """Repositories are created successfully""" - repos = self.tmp_repos() - - # Creating a file that doesn't exist should succeed - result = self.env.run('migrate create %s repository_name' % repos) - - # Files should actually be created - self.assertTrue(os.path.exists(repos)) - - # The default table should not be None - repos_ = Repository(repos) - self.assertNotEqual(repos_.config.get('db_settings', 'version_table'), 'None') - - # Can't create it again: it already exists - result = self.env.run('migrate create %s repository_name' % repos, - expect_error=True) - self.assertEqual(result.returncode, 2) - - def test_script(self): - """We can create a migration script via the command line""" - repos = self.tmp_repos() - result = self.env.run('migrate create %s repository_name' % repos) - - result = self.env.run('migrate script --repository=%s Desc' % repos) - self.assertTrue(os.path.exists('%s/versions/001_Desc.py' % repos)) - - result = self.env.run('migrate script More %s' % repos) - self.assertTrue(os.path.exists('%s/versions/002_More.py' % repos)) - - result = self.env.run('migrate script "Some Random name" %s' % repos) - self.assertTrue(os.path.exists('%s/versions/003_Some_Random_name.py' % repos)) - - def test_script_sql(self): - """We can create a migration sql script via the command line""" - repos = self.tmp_repos() - result = self.env.run('migrate create %s repository_name' % repos) - - result = self.env.run('migrate script_sql mydb foo %s' % repos) - self.assertTrue(os.path.exists('%s/versions/001_foo_mydb_upgrade.sql' % repos)) - self.assertTrue(os.path.exists('%s/versions/001_foo_mydb_downgrade.sql' % repos)) - - # Test creating a second - result = self.env.run('migrate script_sql postgres foo --repository=%s' % repos) - self.assertTrue(os.path.exists('%s/versions/002_foo_postgres_upgrade.sql' % repos)) - self.assertTrue(os.path.exists('%s/versions/002_foo_postgres_downgrade.sql' % repos)) - - # TODO: test --previews - - def test_manage(self): - """Create a project management script""" - script = self.tmp_py() - self.assertTrue(not os.path.exists(script)) - - # No attempt is made to verify correctness of the repository path here - result = self.env.run('migrate manage %s --repository=/bla/' % script) - self.assertTrue(os.path.exists(script)) - - -class TestShellRepository(Shell): - """Shell commands on an existing repository/python script""" - - def setUp(self): - """Create repository, python change script""" - super(TestShellRepository, self).setUp() - self.path_repos = self.tmp_repos() - result = self.env.run('migrate create %s repository_name' % self.path_repos) - - def test_version(self): - """Correctly detect repository version""" - # Version: 0 (no scripts yet); successful execution - result = self.env.run('migrate version --repository=%s' % self.path_repos) - self.assertEqual(result.stdout.strip(), "0") - - # Also works as a positional param - result = self.env.run('migrate version %s' % self.path_repos) - self.assertEqual(result.stdout.strip(), "0") - - # Create a script and version should increment - result = self.env.run('migrate script Desc %s' % self.path_repos) - result = self.env.run('migrate version %s' % self.path_repos) - self.assertEqual(result.stdout.strip(), "1") - - def test_source(self): - """Correctly fetch a script's source""" - result = self.env.run('migrate script Desc --repository=%s' % self.path_repos) - - filename = '%s/versions/001_Desc.py' % self.path_repos - source = open(filename).read() - self.assertTrue(source.find('def upgrade') >= 0) - - # Version is now 1 - result = self.env.run('migrate version %s' % self.path_repos) - self.assertEqual(result.stdout.strip(), "1") - - # Output/verify the source of version 1 - result = self.env.run('migrate source 1 --repository=%s' % self.path_repos) - self.assertEqual(result.stdout.strip(), source.strip()) - - # We can also send the source to a file... test that too - result = self.env.run('migrate source 1 %s --repository=%s' % - (filename, self.path_repos)) - self.assertTrue(os.path.exists(filename)) - fd = open(filename) - result = fd.read() - self.assertTrue(result.strip() == source.strip()) - - -class TestShellDatabase(Shell, DB): - """Commands associated with a particular database""" - # We'll need to clean up after ourself, since the shell creates its own txn; - # we need to connect to the DB to see if things worked - - level = DB.CONNECT - - @usedb() - def test_version_control(self): - """Ensure we can set version control on a database""" - path_repos = repos = self.tmp_repos() - url = self.url - result = self.env.run('migrate create %s repository_name' % repos) - - result = self.env.run('migrate drop_version_control %(url)s %(repos)s'\ - % locals(), expect_error=True) - self.assertEqual(result.returncode, 1) - result = self.env.run('migrate version_control %(url)s %(repos)s' % locals()) - - # Clean up - result = self.env.run('migrate drop_version_control %(url)s %(repos)s' % locals()) - # Attempting to drop vc from a database without it should fail - result = self.env.run('migrate drop_version_control %(url)s %(repos)s'\ - % locals(), expect_error=True) - self.assertEqual(result.returncode, 1) - - @usedb() - def test_wrapped_kwargs(self): - """Commands with default arguments set by manage.py""" - path_repos = repos = self.tmp_repos() - url = self.url - result = self.env.run('migrate create --name=repository_name %s' % repos) - result = self.env.run('migrate drop_version_control %(url)s %(repos)s' % locals(), expect_error=True) - self.assertEqual(result.returncode, 1) - result = self.env.run('migrate version_control %(url)s %(repos)s' % locals()) - - result = self.env.run('migrate drop_version_control %(url)s %(repos)s' % locals()) - - @usedb() - def test_version_control_specified(self): - """Ensure we can set version control to a particular version""" - path_repos = self.tmp_repos() - url = self.url - result = self.env.run('migrate create --name=repository_name %s' % path_repos) - result = self.env.run('migrate drop_version_control %(url)s %(path_repos)s' % locals(), expect_error=True) - self.assertEqual(result.returncode, 1) - - # Fill the repository - path_script = self.tmp_py() - version = 2 - for i in range(version): - result = self.env.run('migrate script Desc --repository=%s' % path_repos) - - # Repository version is correct - result = self.env.run('migrate version %s' % path_repos) - self.assertEqual(result.stdout.strip(), str(version)) - - # Apply versioning to DB - result = self.env.run('migrate version_control %(url)s %(path_repos)s %(version)s' % locals()) - - # Test db version number (should start at 2) - result = self.env.run('migrate db_version %(url)s %(path_repos)s' % locals()) - self.assertEqual(result.stdout.strip(), str(version)) - - # Clean up - result = self.env.run('migrate drop_version_control %(url)s %(path_repos)s' % locals()) - - @usedb() - def test_upgrade(self): - """Can upgrade a versioned database""" - # Create a repository - repos_name = 'repos_name' - repos_path = self.tmp() - result = self.env.run('migrate create %(repos_path)s %(repos_name)s' % locals()) - self.assertEqual(self.run_version(repos_path), 0) - - # Version the DB - result = self.env.run('migrate drop_version_control %s %s' % (self.url, repos_path), expect_error=True) - result = self.env.run('migrate version_control %s %s' % (self.url, repos_path)) - - # Upgrades with latest version == 0 - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - result = self.env.run('migrate upgrade %s %s' % (self.url, repos_path)) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - result = self.env.run('migrate upgrade %s %s' % (self.url, repos_path)) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - result = self.env.run('migrate upgrade %s %s 1' % (self.url, repos_path), expect_error=True) - self.assertEqual(result.returncode, 1) - result = self.env.run('migrate upgrade %s %s -1' % (self.url, repos_path), expect_error=True) - self.assertEqual(result.returncode, 2) - - # Add a script to the repository; upgrade the db - result = self.env.run('migrate script Desc --repository=%s' % (repos_path)) - self.assertEqual(self.run_version(repos_path), 1) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - - # Test preview - result = self.env.run('migrate upgrade %s %s 0 --preview_sql' % (self.url, repos_path)) - result = self.env.run('migrate upgrade %s %s 0 --preview_py' % (self.url, repos_path)) - - result = self.env.run('migrate upgrade %s %s' % (self.url, repos_path)) - self.assertEqual(self.run_db_version(self.url, repos_path), 1) - - # Downgrade must have a valid version specified - result = self.env.run('migrate downgrade %s %s' % (self.url, repos_path), expect_error=True) - self.assertEqual(result.returncode, 2) - result = self.env.run('migrate downgrade %s %s -1' % (self.url, repos_path), expect_error=True) - self.assertEqual(result.returncode, 2) - result = self.env.run('migrate downgrade %s %s 2' % (self.url, repos_path), expect_error=True) - self.assertEqual(result.returncode, 2) - self.assertEqual(self.run_db_version(self.url, repos_path), 1) - - result = self.env.run('migrate downgrade %s %s 0' % (self.url, repos_path)) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - - result = self.env.run('migrate downgrade %s %s 1' % (self.url, repos_path), expect_error=True) - self.assertEqual(result.returncode, 2) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - - result = self.env.run('migrate drop_version_control %s %s' % (self.url, repos_path)) - - def _run_test_sqlfile(self, upgrade_script, downgrade_script): - # TODO: add test script that checks if db really changed - repos_path = self.tmp() - repos_name = 'repos' - - result = self.env.run('migrate create %s %s' % (repos_path, repos_name)) - result = self.env.run('migrate drop_version_control %s %s' % (self.url, repos_path), expect_error=True) - result = self.env.run('migrate version_control %s %s' % (self.url, repos_path)) - self.assertEqual(self.run_version(repos_path), 0) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - - beforeCount = len(os.listdir(os.path.join(repos_path, 'versions'))) # hmm, this number changes sometimes based on running from svn - result = self.env.run('migrate script_sql %s --repository=%s' % ('postgres', repos_path)) - self.assertEqual(self.run_version(repos_path), 1) - self.assertEqual(len(os.listdir(os.path.join(repos_path, 'versions'))), beforeCount + 2) - - open('%s/versions/001_postgres_upgrade.sql' % repos_path, 'a').write(upgrade_script) - open('%s/versions/001_postgres_downgrade.sql' % repos_path, 'a').write(downgrade_script) - - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - self.assertRaises(Exception, self.engine.text('select * from t_table').execute) - - result = self.env.run('migrate upgrade %s %s' % (self.url, repos_path)) - self.assertEqual(self.run_db_version(self.url, repos_path), 1) - self.engine.text('select * from t_table').execute() - - result = self.env.run('migrate downgrade %s %s 0' % (self.url, repos_path)) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - self.assertRaises(Exception, self.engine.text('select * from t_table').execute) - - # The tests below are written with some postgres syntax, but the stuff - # being tested (.sql files) ought to work with any db. - @usedb(supported='postgres') - def test_sqlfile(self): - upgrade_script = """ - create table t_table ( - id serial, - primary key(id) - ); - """ - downgrade_script = """ - drop table t_table; - """ - self.meta.drop_all() - self._run_test_sqlfile(upgrade_script, downgrade_script) - - @usedb(supported='postgres') - def test_sqlfile_comment(self): - upgrade_script = """ - -- Comments in SQL break postgres autocommit - create table t_table ( - id serial, - primary key(id) - ); - """ - downgrade_script = """ - -- Comments in SQL break postgres autocommit - drop table t_table; - """ - self._run_test_sqlfile(upgrade_script, downgrade_script) - - @usedb() - def test_command_test(self): - repos_name = 'repos_name' - repos_path = self.tmp() - - result = self.env.run('migrate create repository_name --repository=%s' % repos_path) - result = self.env.run('migrate drop_version_control %s %s' % (self.url, repos_path), expect_error=True) - result = self.env.run('migrate version_control %s %s' % (self.url, repos_path)) - self.assertEqual(self.run_version(repos_path), 0) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - - # Empty script should succeed - result = self.env.run('migrate script Desc %s' % repos_path) - result = self.env.run('migrate test %s %s' % (self.url, repos_path)) - self.assertEqual(self.run_version(repos_path), 1) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - - # Error script should fail - script_path = self.tmp_py() - script_text=''' - from sqlalchemy import * - from migrate import * - - def upgrade(): - print 'fgsfds' - raise Exception() - - def downgrade(): - print 'sdfsgf' - raise Exception() - '''.replace("\n ", "\n") - file = open(script_path, 'w') - file.write(script_text) - file.close() - - result = self.env.run('migrate test %s %s bla' % (self.url, repos_path), expect_error=True) - self.assertEqual(result.returncode, 2) - self.assertEqual(self.run_version(repos_path), 1) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - - # Nonempty script using migrate_engine should succeed - script_path = self.tmp_py() - script_text = ''' - from sqlalchemy import * - from migrate import * - - from migrate.changeset import schema - - meta = MetaData(migrate_engine) - account = Table('account', meta, - Column('id', Integer, primary_key=True), - Column('login', Text), - Column('passwd', Text), - ) - def upgrade(): - # Upgrade operations go here. Don't create your own engine; use the engine - # named 'migrate_engine' imported from migrate. - meta.create_all() - - def downgrade(): - # Operations to reverse the above upgrade go here. - meta.drop_all() - '''.replace("\n ", "\n") - file = open(script_path, 'w') - file.write(script_text) - file.close() - result = self.env.run('migrate test %s %s' % (self.url, repos_path)) - self.assertEqual(self.run_version(repos_path), 1) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - - @usedb() - def test_rundiffs_in_shell(self): - # This is a variant of the test_schemadiff tests but run through the shell level. - # These shell tests are hard to debug (since they keep forking processes) - # so they shouldn't replace the lower-level tests. - repos_name = 'repos_name' - repos_path = self.tmp() - script_path = self.tmp_py() - model_module = 'migrate.tests.fixture.models:meta_rundiffs' - old_model_module = 'migrate.tests.fixture.models:meta_old_rundiffs' - - # Create empty repository. - self.meta = MetaData(self.engine) - self.meta.reflect() - self.meta.drop_all() # in case junk tables are lying around in the test database - - result = self.env.run( - 'migrate create %s %s' % (repos_path, repos_name), - expect_stderr=True) - result = self.env.run( - 'migrate drop_version_control %s %s' % (self.url, repos_path), - expect_stderr=True, expect_error=True) - result = self.env.run( - 'migrate version_control %s %s' % (self.url, repos_path), - expect_stderr=True) - self.assertEqual(self.run_version(repos_path), 0) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - - # Setup helper script. - result = self.env.run( - 'migrate manage %s --repository=%s --url=%s --model=%s'\ - % (script_path, repos_path, self.url, model_module), - expect_stderr=True) - self.assertTrue(os.path.exists(script_path)) - - # Model is defined but database is empty. - result = self.env.run('migrate compare_model_to_db %s %s --model=%s' \ - % (self.url, repos_path, model_module), expect_stderr=True) - self.assertTrue( - "tables missing from database: tmp_account_rundiffs" - in result.stdout) - - # Test Deprecation - result = self.env.run('migrate compare_model_to_db %s %s --model=%s' \ - % (self.url, repos_path, model_module.replace(":", ".")), - expect_stderr=True, expect_error=True) - self.assertEqual(result.returncode, 0) - self.assertTrue( - "tables missing from database: tmp_account_rundiffs" - in result.stdout) - - # Update db to latest model. - result = self.env.run('migrate update_db_from_model %s %s %s'\ - % (self.url, repos_path, model_module), expect_stderr=True) - self.assertEqual(self.run_version(repos_path), 0) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) # version did not get bumped yet because new version not yet created - - result = self.env.run('migrate compare_model_to_db %s %s %s'\ - % (self.url, repos_path, model_module), expect_stderr=True) - self.assertTrue("No schema diffs" in result.stdout) - - result = self.env.run( - 'migrate drop_version_control %s %s' % (self.url, repos_path), - expect_stderr=True, expect_error=True) - result = self.env.run( - 'migrate version_control %s %s' % (self.url, repos_path), - expect_stderr=True) - - result = self.env.run( - 'migrate create_model %s %s' % (self.url, repos_path), - expect_stderr=True) - temp_dict = dict() - six.exec_(result.stdout, temp_dict) - - # TODO: breaks on SA06 and SA05 - in need of total refactor - use different approach - - # TODO: compare whole table - self.compare_columns_equal(models.tmp_account_rundiffs.c, temp_dict['tmp_account_rundiffs'].c, ['type']) - ##self.assertTrue("""tmp_account_rundiffs = Table('tmp_account_rundiffs', meta, - ##Column('id', Integer(), primary_key=True, nullable=False), - ##Column('login', String(length=None, convert_unicode=False, assert_unicode=None)), - ##Column('passwd', String(length=None, convert_unicode=False, assert_unicode=None))""" in result.stdout) - - ## We're happy with db changes, make first db upgrade script to go from version 0 -> 1. - #result = self.env.run('migrate make_update_script_for_model', expect_error=True, expect_stderr=True) - #self.assertTrue('Not enough arguments' in result.stderr) - - #result_script = self.env.run('migrate make_update_script_for_model %s %s %s %s'\ - #% (self.url, repos_path, old_model_module, model_module)) - #self.assertEqualIgnoreWhitespace(result_script.stdout, - #'''from sqlalchemy import * - #from migrate import * - - #from migrate.changeset import schema - - #meta = MetaData() - #tmp_account_rundiffs = Table('tmp_account_rundiffs', meta, - #Column('id', Integer(), primary_key=True, nullable=False), - #Column('login', Text(length=None, convert_unicode=False, assert_unicode=None, unicode_error=None, _warn_on_bytestring=False)), - #Column('passwd', Text(length=None, convert_unicode=False, assert_unicode=None, unicode_error=None, _warn_on_bytestring=False)), - #) - - #def upgrade(migrate_engine): - ## Upgrade operations go here. Don't create your own engine; bind migrate_engine - ## to your metadata - #meta.bind = migrate_engine - #tmp_account_rundiffs.create() - - #def downgrade(migrate_engine): - ## Operations to reverse the above upgrade go here. - #meta.bind = migrate_engine - #tmp_account_rundiffs.drop()''') - - ## Save the upgrade script. - #result = self.env.run('migrate script Desc %s' % repos_path) - #upgrade_script_path = '%s/versions/001_Desc.py' % repos_path - #open(upgrade_script_path, 'w').write(result_script.stdout) - - #result = self.env.run('migrate compare_model_to_db %s %s %s'\ - #% (self.url, repos_path, model_module)) - #self.assertTrue("No schema diffs" in result.stdout) - - self.meta.drop_all() # in case junk tables are lying around in the test database diff --git a/migrate/tests/versioning/test_template.py b/migrate/tests/versioning/test_template.py deleted file mode 100644 index a079d8b..0000000 --- a/migrate/tests/versioning/test_template.py +++ /dev/null @@ -1,70 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -import os -import shutil - -import migrate.versioning.templates -from migrate.versioning.template import * -from migrate.versioning import api - -from migrate.tests import fixture - - -class TestTemplate(fixture.Pathed): - def test_templates(self): - """We can find the path to all repository templates""" - path = str(Template()) - self.assertTrue(os.path.exists(path)) - - def test_repository(self): - """We can find the path to the default repository""" - path = Template().get_repository() - self.assertTrue(os.path.exists(path)) - - def test_script(self): - """We can find the path to the default migration script""" - path = Template().get_script() - self.assertTrue(os.path.exists(path)) - - def test_custom_templates_and_themes(self): - """Users can define their own templates with themes""" - new_templates_dir = os.path.join(self.temp_usable_dir, 'templates') - manage_tmpl_file = os.path.join(new_templates_dir, 'manage/custom.py_tmpl') - repository_tmpl_file = os.path.join(new_templates_dir, 'repository/custom/README') - script_tmpl_file = os.path.join(new_templates_dir, 'script/custom.py_tmpl') - sql_script_tmpl_file = os.path.join(new_templates_dir, 'sql_script/custom.py_tmpl') - - MANAGE_CONTENTS = 'print "manage.py"' - README_CONTENTS = 'MIGRATE README!' - SCRIPT_FILE_CONTENTS = 'print "script.py"' - new_repo_dest = self.tmp_repos() - new_manage_dest = self.tmp_py() - - # make new templates dir - shutil.copytree(migrate.versioning.templates.__path__[0], new_templates_dir) - shutil.copytree(os.path.join(new_templates_dir, 'repository/default'), - os.path.join(new_templates_dir, 'repository/custom')) - - # edit templates - f = open(manage_tmpl_file, 'w').write(MANAGE_CONTENTS) - f = open(repository_tmpl_file, 'w').write(README_CONTENTS) - f = open(script_tmpl_file, 'w').write(SCRIPT_FILE_CONTENTS) - f = open(sql_script_tmpl_file, 'w').write(SCRIPT_FILE_CONTENTS) - - # create repository, manage file and python script - kw = {} - kw['templates_path'] = new_templates_dir - kw['templates_theme'] = 'custom' - api.create(new_repo_dest, 'repo_name', **kw) - api.script('test', new_repo_dest, **kw) - api.script_sql('postgres', 'foo', new_repo_dest, **kw) - api.manage(new_manage_dest, **kw) - - # assert changes - self.assertEqual(open(new_manage_dest).read(), MANAGE_CONTENTS) - self.assertEqual(open(os.path.join(new_repo_dest, 'manage.py')).read(), MANAGE_CONTENTS) - self.assertEqual(open(os.path.join(new_repo_dest, 'README')).read(), README_CONTENTS) - self.assertEqual(open(os.path.join(new_repo_dest, 'versions/001_test.py')).read(), SCRIPT_FILE_CONTENTS) - self.assertEqual(open(os.path.join(new_repo_dest, 'versions/002_foo_postgres_downgrade.sql')).read(), SCRIPT_FILE_CONTENTS) - self.assertEqual(open(os.path.join(new_repo_dest, 'versions/002_foo_postgres_upgrade.sql')).read(), SCRIPT_FILE_CONTENTS) diff --git a/migrate/tests/versioning/test_util.py b/migrate/tests/versioning/test_util.py deleted file mode 100644 index 21e3f27..0000000 --- a/migrate/tests/versioning/test_util.py +++ /dev/null @@ -1,139 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import os - -from sqlalchemy import * - -from migrate.exceptions import MigrateDeprecationWarning -from migrate.tests import fixture -from migrate.tests.fixture.warnings import catch_warnings -from migrate.versioning.util import * -from migrate.versioning import api - -import warnings - -class TestUtil(fixture.Pathed): - - def test_construct_engine(self): - """Construct engine the smart way""" - url = 'sqlite://' - - engine = construct_engine(url) - self.assertTrue(engine.name == 'sqlite') - - # keyword arg - engine = construct_engine(url, engine_arg_encoding='utf-8') - self.assertEqual(engine.dialect.encoding, 'utf-8') - - # dict - engine = construct_engine(url, engine_dict={'encoding': 'utf-8'}) - self.assertEqual(engine.dialect.encoding, 'utf-8') - - # engine parameter - engine_orig = create_engine('sqlite://') - engine = construct_engine(engine_orig) - self.assertEqual(engine, engine_orig) - - # test precedance - engine = construct_engine(url, engine_dict={'encoding': 'iso-8859-1'}, - engine_arg_encoding='utf-8') - self.assertEqual(engine.dialect.encoding, 'utf-8') - - # deprecated echo=True parameter - try: - # py 2.4 compatibility :-/ - cw = catch_warnings(record=True) - w = cw.__enter__() - - warnings.simplefilter("always") - engine = construct_engine(url, echo='True') - self.assertTrue(engine.echo) - - self.assertEqual(len(w),1) - self.assertTrue(issubclass(w[-1].category, - MigrateDeprecationWarning)) - self.assertEqual( - 'echo=True parameter is deprecated, pass ' - 'engine_arg_echo=True or engine_dict={"echo": True}', - str(w[-1].message)) - - finally: - cw.__exit__() - - # unsupported argument - self.assertRaises(ValueError, construct_engine, 1) - - def test_passing_engine(self): - repo = self.tmp_repos() - api.create(repo, 'temp') - api.script('First Version', repo) - engine = construct_engine('sqlite:///:memory:') - - api.version_control(engine, repo) - api.upgrade(engine, repo) - - def test_asbool(self): - """test asbool parsing""" - result = asbool(True) - self.assertEqual(result, True) - - result = asbool(False) - self.assertEqual(result, False) - - result = asbool('y') - self.assertEqual(result, True) - - result = asbool('n') - self.assertEqual(result, False) - - self.assertRaises(ValueError, asbool, 'test') - self.assertRaises(ValueError, asbool, object) - - - def test_load_model(self): - """load model from dotted name""" - model_path = os.path.join(self.temp_usable_dir, 'test_load_model.py') - - f = open(model_path, 'w') - f.write("class FakeFloat(int): pass") - f.close() - - try: - # py 2.4 compatibility :-/ - cw = catch_warnings(record=True) - w = cw.__enter__() - - warnings.simplefilter("always") - - # deprecated spelling - FakeFloat = load_model('test_load_model.FakeFloat') - self.assertTrue(isinstance(FakeFloat(), int)) - - self.assertEqual(len(w),1) - self.assertTrue(issubclass(w[-1].category, - MigrateDeprecationWarning)) - self.assertEqual( - 'model should be in form of module.model:User ' - 'and not module.model.User', - str(w[-1].message)) - - finally: - cw.__exit__() - - FakeFloat = load_model('test_load_model:FakeFloat') - self.assertTrue(isinstance(FakeFloat(), int)) - - FakeFloat = load_model(FakeFloat) - self.assertTrue(isinstance(FakeFloat(), int)) - - def test_guess_obj_type(self): - """guess object type from string""" - result = guess_obj_type('7') - self.assertEqual(result, 7) - - result = guess_obj_type('y') - self.assertEqual(result, True) - - result = guess_obj_type('test') - self.assertEqual(result, 'test') diff --git a/migrate/tests/versioning/test_version.py b/migrate/tests/versioning/test_version.py deleted file mode 100644 index 286dd59..0000000 --- a/migrate/tests/versioning/test_version.py +++ /dev/null @@ -1,186 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -from migrate.exceptions import * -from migrate.versioning.version import * - -from migrate.tests import fixture - - -class TestVerNum(fixture.Base): - def test_invalid(self): - """Disallow invalid version numbers""" - versions = ('-1', -1, 'Thirteen', '') - for version in versions: - self.assertRaises(ValueError, VerNum, version) - - def test_str(self): - """Test str and repr version numbers""" - self.assertEqual(str(VerNum(2)), '2') - self.assertEqual(repr(VerNum(2)), '<VerNum(2)>') - - def test_is(self): - """Two version with the same number should be equal""" - a = VerNum(1) - b = VerNum(1) - self.assertTrue(a is b) - - self.assertEqual(VerNum(VerNum(2)), VerNum(2)) - - def test_add(self): - self.assertEqual(VerNum(1) + VerNum(1), VerNum(2)) - self.assertEqual(VerNum(1) + 1, 2) - self.assertEqual(VerNum(1) + 1, '2') - self.assertTrue(isinstance(VerNum(1) + 1, VerNum)) - - def test_sub(self): - self.assertEqual(VerNum(1) - 1, 0) - self.assertTrue(isinstance(VerNum(1) - 1, VerNum)) - self.assertRaises(ValueError, lambda: VerNum(0) - 1) - - def test_eq(self): - """Two versions are equal""" - self.assertEqual(VerNum(1), VerNum('1')) - self.assertEqual(VerNum(1), 1) - self.assertEqual(VerNum(1), '1') - self.assertNotEqual(VerNum(1), 2) - - def test_ne(self): - self.assertTrue(VerNum(1) != 2) - self.assertFalse(VerNum(1) != 1) - - def test_lt(self): - self.assertFalse(VerNum(1) < 1) - self.assertTrue(VerNum(1) < 2) - self.assertFalse(VerNum(2) < 1) - - def test_le(self): - self.assertTrue(VerNum(1) <= 1) - self.assertTrue(VerNum(1) <= 2) - self.assertFalse(VerNum(2) <= 1) - - def test_gt(self): - self.assertFalse(VerNum(1) > 1) - self.assertFalse(VerNum(1) > 2) - self.assertTrue(VerNum(2) > 1) - - def test_ge(self): - self.assertTrue(VerNum(1) >= 1) - self.assertTrue(VerNum(2) >= 1) - self.assertFalse(VerNum(1) >= 2) - - def test_int_cast(self): - ver = VerNum(3) - # test __int__ - self.assertEqual(int(ver), 3) - # test __index__: range() doesn't call __int__ - self.assertEqual(list(range(ver, ver)), []) - - -class TestVersion(fixture.Pathed): - - def setUp(self): - super(TestVersion, self).setUp() - - def test_str_to_filename(self): - self.assertEqual(str_to_filename(''), '') - self.assertEqual(str_to_filename('__'), '_') - self.assertEqual(str_to_filename('a'), 'a') - self.assertEqual(str_to_filename('Abc Def'), 'Abc_Def') - self.assertEqual(str_to_filename('Abc "D" Ef'), 'Abc_D_Ef') - self.assertEqual(str_to_filename("Abc's Stuff"), 'Abc_s_Stuff') - self.assertEqual(str_to_filename("a b"), 'a_b') - self.assertEqual(str_to_filename("a.b to c"), 'a_b_to_c') - - def test_collection(self): - """Let's see how we handle versions collection""" - coll = Collection(self.temp_usable_dir) - coll.create_new_python_version("foo bar") - coll.create_new_sql_version("postgres", "foo bar") - coll.create_new_sql_version("sqlite", "foo bar") - coll.create_new_python_version("") - - self.assertEqual(coll.latest, 4) - self.assertEqual(len(coll.versions), 4) - self.assertEqual(coll.version(4), coll.version(coll.latest)) - # Check for non-existing version - self.assertRaises(VersionNotFoundError, coll.version, 5) - # Check for the current version - self.assertEqual('4', coll.version(4).version) - - coll2 = Collection(self.temp_usable_dir) - self.assertEqual(coll.versions, coll2.versions) - - Collection.clear() - - def test_old_repository(self): - open(os.path.join(self.temp_usable_dir, '1'), 'w') - self.assertRaises(Exception, Collection, self.temp_usable_dir) - - #TODO: def test_collection_unicode(self): - # pass - - def test_create_new_python_version(self): - coll = Collection(self.temp_usable_dir) - coll.create_new_python_version("'") - - ver = coll.version() - self.assertTrue(ver.script().source()) - - def test_create_new_sql_version(self): - coll = Collection(self.temp_usable_dir) - coll.create_new_sql_version("sqlite", "foo bar") - - ver = coll.version() - ver_up = ver.script('sqlite', 'upgrade') - ver_down = ver.script('sqlite', 'downgrade') - ver_up.source() - ver_down.source() - - def test_selection(self): - """Verify right sql script is selected""" - - # Create empty directory. - path = self.tmp_repos() - os.mkdir(path) - - # Create files -- files must be present or you'll get an exception later. - python_file = '001_initial_.py' - sqlite_upgrade_file = '001_sqlite_upgrade.sql' - default_upgrade_file = '001_default_upgrade.sql' - for file_ in [sqlite_upgrade_file, default_upgrade_file, python_file]: - filepath = '%s/%s' % (path, file_) - open(filepath, 'w').close() - - ver = Version(1, path, [sqlite_upgrade_file]) - self.assertEqual(os.path.basename(ver.script('sqlite', 'upgrade').path), sqlite_upgrade_file) - - ver = Version(1, path, [default_upgrade_file]) - self.assertEqual(os.path.basename(ver.script('default', 'upgrade').path), default_upgrade_file) - - ver = Version(1, path, [sqlite_upgrade_file, default_upgrade_file]) - self.assertEqual(os.path.basename(ver.script('sqlite', 'upgrade').path), sqlite_upgrade_file) - - ver = Version(1, path, [sqlite_upgrade_file, default_upgrade_file, python_file]) - self.assertEqual(os.path.basename(ver.script('postgres', 'upgrade').path), default_upgrade_file) - - ver = Version(1, path, [sqlite_upgrade_file, python_file]) - self.assertEqual(os.path.basename(ver.script('postgres', 'upgrade').path), python_file) - - def test_bad_version(self): - ver = Version(1, self.temp_usable_dir, []) - self.assertRaises(ScriptError, ver.add_script, '123.sql') - - # tests bad ibm_db_sa filename - ver = Version(123, self.temp_usable_dir, []) - self.assertRaises(ScriptError, ver.add_script, - '123_ibm_db_sa_upgrade.sql') - - # tests that the name is ok but the script doesn't exist - self.assertRaises(InvalidScriptError, ver.add_script, - '123_test_ibm_db_sa_upgrade.sql') - - pyscript = os.path.join(self.temp_usable_dir, 'bla.py') - open(pyscript, 'w') - ver.add_script(pyscript) - self.assertRaises(ScriptError, ver.add_script, 'bla.py') |