diff options
Diffstat (limited to 'migrate/tests')
31 files changed, 0 insertions, 4252 deletions
diff --git a/migrate/tests/__init__.py b/migrate/tests/__init__.py deleted file mode 100644 index c03fbf4..0000000 --- a/migrate/tests/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# make this package available during imports as long as we support <python2.5 -import sys -import os -sys.path.append(os.path.dirname(os.path.abspath(__file__))) - - -from unittest import TestCase -import migrate -import six - - -class TestVersionDefined(TestCase): - def test_version(self): - """Test for migrate.__version__""" - self.assertTrue(isinstance(migrate.__version__, six.string_types)) - self.assertTrue(len(migrate.__version__) > 0) diff --git a/migrate/tests/changeset/__init__.py b/migrate/tests/changeset/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/migrate/tests/changeset/__init__.py +++ /dev/null diff --git a/migrate/tests/changeset/databases/__init__.py b/migrate/tests/changeset/databases/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/migrate/tests/changeset/databases/__init__.py +++ /dev/null diff --git a/migrate/tests/changeset/databases/test_ibmdb2.py b/migrate/tests/changeset/databases/test_ibmdb2.py deleted file mode 100644 index 4b3f983..0000000 --- a/migrate/tests/changeset/databases/test_ibmdb2.py +++ /dev/null @@ -1,32 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import mock - -import six - -from migrate.changeset.databases import ibmdb2 -from migrate.tests import fixture - - -class TestIBMDBDialect(fixture.Base): - """ - Test class for ibmdb2 dialect unit tests which do not require - a live backend database connection. - """ - - def test_is_unique_constraint_with_null_cols_supported(self): - test_values = { - '10.1': False, - '10.4.99': False, - '10.5': True, - '10.5.1': True - } - for version, supported in six.iteritems(test_values): - mock_dialect = mock.MagicMock() - mock_dialect.dbms_ver = version - self.assertEqual( - supported, - ibmdb2.is_unique_constraint_with_null_columns_supported( - mock_dialect), - 'Assertion failed on version: %s' % version) diff --git a/migrate/tests/changeset/test_changeset.py b/migrate/tests/changeset/test_changeset.py deleted file mode 100644 index c870c52..0000000 --- a/migrate/tests/changeset/test_changeset.py +++ /dev/null @@ -1,976 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -import sqlalchemy -import warnings - -from sqlalchemy import * - -from migrate import changeset, exceptions -from migrate.changeset import * -from migrate.changeset import constraint -from migrate.changeset.schema import ColumnDelta -from migrate.tests import fixture -from migrate.tests.fixture.warnings import catch_warnings -import six - -class TestAddDropColumn(fixture.DB): - """Test add/drop column through all possible interfaces - also test for constraints - """ - level = fixture.DB.CONNECT - table_name = 'tmp_adddropcol' - table_name_idx = 'tmp_adddropcol_idx' - table_int = 0 - - def _setup(self, url): - super(TestAddDropColumn, self)._setup(url) - self.meta = MetaData() - self.table = Table(self.table_name, self.meta, - Column('id', Integer, unique=True), - ) - self.table_idx = Table( - self.table_name_idx, - self.meta, - Column('id', Integer, primary_key=True), - Column('a', Integer), - Column('b', Integer), - Index('test_idx', 'a', 'b') - ) - self.meta.bind = self.engine - if self.engine.has_table(self.table.name): - self.table.drop() - if self.engine.has_table(self.table_idx.name): - self.table_idx.drop() - self.table.create() - self.table_idx.create() - - def _teardown(self): - if self.engine.has_table(self.table.name): - self.table.drop() - if self.engine.has_table(self.table_idx.name): - self.table_idx.drop() - self.meta.clear() - super(TestAddDropColumn,self)._teardown() - - def run_(self, create_column_func, drop_column_func, *col_p, **col_k): - col_name = 'data' - - def assert_numcols(num_of_expected_cols): - # number of cols should be correct in table object and in database - self.refresh_table(self.table_name) - result = len(self.table.c) - - self.assertEqual(result, num_of_expected_cols), - if col_k.get('primary_key', None): - # new primary key: check its length too - result = len(self.table.primary_key) - self.assertEqual(result, num_of_expected_cols) - - # we have 1 columns and there is no data column - assert_numcols(1) - self.assertTrue(getattr(self.table.c, 'data', None) is None) - if len(col_p) == 0: - col_p = [String(40)] - col = Column(col_name, *col_p, **col_k) - create_column_func(col) - assert_numcols(2) - # data column exists - self.assertTrue(self.table.c.data.type.length, 40) - - col2 = self.table.c.data - drop_column_func(col2) - assert_numcols(1) - - @fixture.usedb() - def test_undefined(self): - """Add/drop columns not yet defined in the table""" - def add_func(col): - return create_column(col, self.table) - def drop_func(col): - return drop_column(col, self.table) - return self.run_(add_func, drop_func) - - @fixture.usedb() - def test_defined(self): - """Add/drop columns already defined in the table""" - def add_func(col): - self.meta.clear() - self.table = Table(self.table_name, self.meta, - Column('id', Integer, primary_key=True), - col, - ) - return create_column(col) - def drop_func(col): - return drop_column(col) - return self.run_(add_func, drop_func) - - @fixture.usedb() - def test_method_bound(self): - """Add/drop columns via column methods; columns bound to a table - ie. no table parameter passed to function - """ - def add_func(col): - self.assertTrue(col.table is None, col.table) - self.table.append_column(col) - return col.create() - def drop_func(col): - #self.assertTrue(col.table is None,col.table) - #self.table.append_column(col) - return col.drop() - return self.run_(add_func, drop_func) - - @fixture.usedb() - def test_method_notbound(self): - """Add/drop columns via column methods; columns not bound to a table""" - def add_func(col): - return col.create(self.table) - def drop_func(col): - return col.drop(self.table) - return self.run_(add_func, drop_func) - - @fixture.usedb() - def test_tablemethod_obj(self): - """Add/drop columns via table methods; by column object""" - def add_func(col): - return self.table.create_column(col) - def drop_func(col): - return self.table.drop_column(col) - return self.run_(add_func, drop_func) - - @fixture.usedb() - def test_tablemethod_name(self): - """Add/drop columns via table methods; by column name""" - def add_func(col): - # must be bound to table - self.table.append_column(col) - return self.table.create_column(col.name) - def drop_func(col): - # Not necessarily bound to table - return self.table.drop_column(col.name) - return self.run_(add_func, drop_func) - - @fixture.usedb() - def test_byname(self): - """Add/drop columns via functions; by table object and column name""" - def add_func(col): - self.table.append_column(col) - return create_column(col.name, self.table) - def drop_func(col): - return drop_column(col.name, self.table) - return self.run_(add_func, drop_func) - - @fixture.usedb() - def test_drop_column_not_in_table(self): - """Drop column by name""" - def add_func(col): - return self.table.create_column(col) - def drop_func(col): - if SQLA_07: - self.table._columns.remove(col) - else: - self.table.c.remove(col) - return self.table.drop_column(col.name) - self.run_(add_func, drop_func) - - @fixture.usedb() - def test_fk(self): - """Can create columns with foreign keys""" - # create FK's target - reftable = Table('tmp_ref', self.meta, - Column('id', Integer, primary_key=True), - ) - if self.engine.has_table(reftable.name): - reftable.drop() - reftable.create() - - # create column with fk - col = Column('data', Integer, ForeignKey(reftable.c.id, name='testfk')) - col.create(self.table) - - # check if constraint is added - for cons in self.table.constraints: - if isinstance(cons, sqlalchemy.schema.ForeignKeyConstraint): - break - else: - self.fail('No constraint found') - - # TODO: test on db level if constraints work - - if SQLA_07: - self.assertEqual(reftable.c.id.name, - list(col.foreign_keys)[0].column.name) - else: - self.assertEqual(reftable.c.id.name, - col.foreign_keys[0].column.name) - - if self.engine.name == 'mysql': - constraint.ForeignKeyConstraint([self.table.c.data], - [reftable.c.id], - name='testfk').drop() - col.drop(self.table) - - if self.engine.has_table(reftable.name): - reftable.drop() - - @fixture.usedb(not_supported='sqlite') - def test_pk(self): - """Can create columns with primary key""" - col = Column('data', Integer, nullable=False) - self.assertRaises(exceptions.InvalidConstraintError, - col.create, self.table, primary_key_name=True) - col.create(self.table, primary_key_name='data_pkey') - - # check if constraint was added (cannot test on objects) - self.table.insert(values={'data': 4}).execute() - try: - self.table.insert(values={'data': 4}).execute() - except (sqlalchemy.exc.IntegrityError, - sqlalchemy.exc.ProgrammingError): - pass - else: - self.fail() - - col.drop() - - @fixture.usedb(not_supported=['mysql']) - def test_check(self): - """Can create columns with check constraint""" - col = Column('foo', - Integer, - sqlalchemy.schema.CheckConstraint('foo > 4')) - col.create(self.table) - - # check if constraint was added (cannot test on objects) - self.table.insert(values={'foo': 5}).execute() - try: - self.table.insert(values={'foo': 3}).execute() - except (sqlalchemy.exc.IntegrityError, - sqlalchemy.exc.ProgrammingError): - pass - else: - self.fail() - - col.drop() - - @fixture.usedb() - def test_unique_constraint(self): - self.assertRaises(exceptions.InvalidConstraintError, - Column('data', Integer, unique=True).create, self.table) - - col = Column('data', Integer) - col.create(self.table, unique_name='data_unique') - - # check if constraint was added (cannot test on objects) - self.table.insert(values={'data': 5}).execute() - try: - self.table.insert(values={'data': 5}).execute() - except (sqlalchemy.exc.IntegrityError, - sqlalchemy.exc.ProgrammingError): - pass - else: - self.fail() - - col.drop(self.table) - -# TODO: remove already attached columns with uniques, pks, fks .. - @fixture.usedb(not_supported=['ibm_db_sa', 'postgresql']) - def test_drop_column_of_composite_index(self): - # NOTE(rpodolyaka): postgresql automatically drops a composite index - # if one of its columns is dropped - # NOTE(mriedem): DB2 does the same. - self.table_idx.c.b.drop() - - reflected = Table(self.table_idx.name, MetaData(), autoload=True, - autoload_with=self.engine) - index = next(iter(reflected.indexes)) - self.assertEquals(['a'], [c.name for c in index.columns]) - - @fixture.usedb() - def test_drop_all_columns_of_composite_index(self): - self.table_idx.c.a.drop() - self.table_idx.c.b.drop() - - reflected = Table(self.table_idx.name, MetaData(), autoload=True, - autoload_with=self.engine) - self.assertEquals(0, len(reflected.indexes)) - - def _check_index(self,expected): - if 'mysql' in self.engine.name or 'postgres' in self.engine.name: - for index in tuple( - Table(self.table.name, MetaData(), - autoload=True, autoload_with=self.engine).indexes - ): - if index.name=='ix_data': - break - self.assertEqual(expected,index.unique) - - @fixture.usedb() - def test_index(self): - col = Column('data', Integer) - col.create(self.table, index_name='ix_data') - - self._check_index(False) - - col.drop() - - @fixture.usedb() - def test_index_unique(self): - # shows how to create a unique index - col = Column('data', Integer) - col.create(self.table) - Index('ix_data', col, unique=True).create(bind=self.engine) - - # check if index was added - self.table.insert(values={'data': 5}).execute() - try: - self.table.insert(values={'data': 5}).execute() - except (sqlalchemy.exc.IntegrityError, - sqlalchemy.exc.ProgrammingError): - pass - else: - self.fail() - - self._check_index(True) - - col.drop() - - @fixture.usedb() - def test_server_defaults(self): - """Can create columns with server_default values""" - col = Column('data', String(244), server_default='foobar') - col.create(self.table) - - self.table.insert(values={'id': 10}).execute() - row = self._select_row() - self.assertEqual(u'foobar', row['data']) - - col.drop() - - @fixture.usedb() - def test_populate_default(self): - """Test populate_default=True""" - def default(): - return 'foobar' - col = Column('data', String(244), default=default) - col.create(self.table, populate_default=True) - - self.table.insert(values={'id': 10}).execute() - row = self._select_row() - self.assertEqual(u'foobar', row['data']) - - col.drop() - - # TODO: test sequence - # TODO: test quoting - # TODO: test non-autoname constraints - - @fixture.usedb() - def test_drop_doesnt_delete_other_indexes(self): - # add two indexed columns - self.table.drop() - self.meta.clear() - self.table = Table( - self.table_name, self.meta, - Column('id', Integer, primary_key=True), - Column('d1', String(10), index=True), - Column('d2', String(10), index=True), - ) - self.table.create() - - # paranoid check - self.refresh_table() - self.assertEqual( - sorted([i.name for i in self.table.indexes]), - [u'ix_tmp_adddropcol_d1', u'ix_tmp_adddropcol_d2'] - ) - - # delete one - self.table.c.d2.drop() - - # ensure the other index is still there - self.refresh_table() - self.assertEqual( - sorted([i.name for i in self.table.indexes]), - [u'ix_tmp_adddropcol_d1'] - ) - - def _actual_foreign_keys(self): - from sqlalchemy.schema import ForeignKeyConstraint - result = [] - for cons in self.table.constraints: - if isinstance(cons,ForeignKeyConstraint): - col_names = [] - for col_name in cons.columns: - if not isinstance(col_name,six.string_types): - col_name = col_name.name - col_names.append(col_name) - result.append(col_names) - result.sort() - return result - - @fixture.usedb() - def test_drop_with_foreign_keys(self): - self.table.drop() - self.meta.clear() - - # create FK's target - reftable = Table('tmp_ref', self.meta, - Column('id', Integer, primary_key=True), - ) - if self.engine.has_table(reftable.name): - reftable.drop() - reftable.create() - - # add a table with two foreign key columns - self.table = Table( - self.table_name, self.meta, - Column('id', Integer, primary_key=True), - Column('r1', Integer, ForeignKey('tmp_ref.id', name='test_fk1')), - Column('r2', Integer, ForeignKey('tmp_ref.id', name='test_fk2')), - ) - self.table.create() - - # paranoid check - self.assertEqual([['r1'],['r2']], - self._actual_foreign_keys()) - - # delete one - if self.engine.name == 'mysql': - constraint.ForeignKeyConstraint([self.table.c.r2], [reftable.c.id], - name='test_fk2').drop() - self.table.c.r2.drop() - - # check remaining foreign key is there - self.assertEqual([['r1']], - self._actual_foreign_keys()) - - @fixture.usedb() - def test_drop_with_complex_foreign_keys(self): - from sqlalchemy.schema import ForeignKeyConstraint - from sqlalchemy.schema import UniqueConstraint - - self.table.drop() - self.meta.clear() - - # NOTE(mriedem): DB2 does not currently support unique constraints - # on nullable columns, so the columns that are used to create the - # foreign keys here need to be non-nullable for testing with DB2 - # to work. - - # create FK's target - reftable = Table('tmp_ref', self.meta, - Column('id', Integer, primary_key=True), - Column('jd', Integer, nullable=False), - UniqueConstraint('id','jd') - ) - if self.engine.has_table(reftable.name): - reftable.drop() - reftable.create() - - # add a table with a complex foreign key constraint - self.table = Table( - self.table_name, self.meta, - Column('id', Integer, primary_key=True), - Column('r1', Integer, nullable=False), - Column('r2', Integer, nullable=False), - ForeignKeyConstraint(['r1','r2'], - [reftable.c.id,reftable.c.jd], - name='test_fk') - ) - self.table.create() - - # paranoid check - self.assertEqual([['r1','r2']], - self._actual_foreign_keys()) - - # delete one - if self.engine.name == 'mysql': - constraint.ForeignKeyConstraint([self.table.c.r1, self.table.c.r2], - [reftable.c.id, reftable.c.jd], - name='test_fk').drop() - self.table.c.r2.drop() - - # check the constraint is gone, since part of it - # is no longer there - if people hit this, - # they may be confused, maybe we should raise an error - # and insist that the constraint is deleted first, separately? - self.assertEqual([], - self._actual_foreign_keys()) - -class TestRename(fixture.DB): - """Tests for table and index rename methods""" - level = fixture.DB.CONNECT - meta = MetaData() - - def _setup(self, url): - super(TestRename, self)._setup(url) - self.meta.bind = self.engine - - @fixture.usedb(not_supported='firebird') - def test_rename_table(self): - """Tables can be renamed""" - c_name = 'col_1' - table_name1 = 'name_one' - table_name2 = 'name_two' - index_name1 = 'x' + table_name1 - index_name2 = 'x' + table_name2 - - self.meta.clear() - self.column = Column(c_name, Integer) - self.table = Table(table_name1, self.meta, self.column) - self.index = Index(index_name1, self.column, unique=False) - - if self.engine.has_table(self.table.name): - self.table.drop() - if self.engine.has_table(table_name2): - tmp = Table(table_name2, self.meta, autoload=True) - tmp.drop() - tmp.deregister() - del tmp - self.table.create() - - def assert_table_name(expected, skip_object_check=False): - """Refresh a table via autoload - SA has changed some since this test was written; we now need to do - meta.clear() upon reloading a table - clear all rather than a - select few. So, this works only if we're working with one table at - a time (else, others will vanish too). - """ - if not skip_object_check: - # Table object check - self.assertEqual(self.table.name,expected) - newname = self.table.name - else: - # we know the object's name isn't consistent: just assign it - newname = expected - # Table DB check - self.meta.clear() - self.table = Table(newname, self.meta, autoload=True) - self.assertEqual(self.table.name, expected) - - def assert_index_name(expected, skip_object_check=False): - if not skip_object_check: - # Index object check - self.assertEqual(self.index.name, expected) - else: - # object is inconsistent - self.index.name = expected - # TODO: Index DB check - - def add_table_to_meta(name): - # trigger the case where table_name2 needs to be - # removed from the metadata in ChangesetTable.deregister() - tmp = Table(name, self.meta, Column(c_name, Integer)) - tmp.create() - tmp.drop() - - try: - # Table renames - assert_table_name(table_name1) - add_table_to_meta(table_name2) - rename_table(self.table, table_name2) - assert_table_name(table_name2) - self.table.rename(table_name1) - assert_table_name(table_name1) - - # test by just the string - rename_table(table_name1, table_name2, engine=self.engine) - assert_table_name(table_name2, True) # object not updated - - # Index renames - if self.url.startswith('sqlite') or self.url.startswith('mysql'): - self.assertRaises(exceptions.NotSupportedError, - self.index.rename, index_name2) - else: - assert_index_name(index_name1) - rename_index(self.index, index_name2, engine=self.engine) - assert_index_name(index_name2) - self.index.rename(index_name1) - assert_index_name(index_name1) - - # test by just the string - rename_index(index_name1, index_name2, engine=self.engine) - assert_index_name(index_name2, True) - - finally: - if self.table.exists(): - self.table.drop() - - -class TestColumnChange(fixture.DB): - level = fixture.DB.CONNECT - table_name = 'tmp_colchange' - - def _setup(self, url): - super(TestColumnChange, self)._setup(url) - self.meta = MetaData(self.engine) - self.table = Table(self.table_name, self.meta, - Column('id', Integer, primary_key=True), - Column('data', String(40), server_default=DefaultClause("tluafed"), - nullable=True), - ) - if self.table.exists(): - self.table.drop() - try: - self.table.create() - except sqlalchemy.exc.SQLError: - # SQLite: database schema has changed - if not self.url.startswith('sqlite://'): - raise - - def _teardown(self): - if self.table.exists(): - try: - self.table.drop(self.engine) - except sqlalchemy.exc.SQLError: - # SQLite: database schema has changed - if not self.url.startswith('sqlite://'): - raise - super(TestColumnChange, self)._teardown() - - @fixture.usedb() - def test_rename(self): - """Can rename a column""" - def num_rows(col, content): - return len(list(self.table.select(col == content).execute())) - # Table content should be preserved in changed columns - content = "fgsfds" - self.engine.execute(self.table.insert(), data=content, id=42) - self.assertEqual(num_rows(self.table.c.data, content), 1) - - # ...as a function, given a column object and the new name - alter_column('data', name='data2', table=self.table) - self.refresh_table() - alter_column(self.table.c.data2, name='atad') - self.refresh_table(self.table.name) - self.assertTrue('data' not in self.table.c.keys()) - self.assertTrue('atad' in self.table.c.keys()) - self.assertEqual(num_rows(self.table.c.atad, content), 1) - - # ...as a method, given a new name - self.table.c.atad.alter(name='data') - self.refresh_table(self.table.name) - self.assertTrue('atad' not in self.table.c.keys()) - self.table.c.data # Should not raise exception - self.assertEqual(num_rows(self.table.c.data, content), 1) - - # ...as a function, given a new object - alter_column(self.table.c.data, - name = 'atad', type=String(40), - server_default=self.table.c.data.server_default) - self.refresh_table(self.table.name) - self.assertTrue('data' not in self.table.c.keys()) - self.table.c.atad # Should not raise exception - self.assertEqual(num_rows(self.table.c.atad, content), 1) - - # ...as a method, given a new object - self.table.c.atad.alter( - name='data',type=String(40), - server_default=self.table.c.atad.server_default - ) - self.refresh_table(self.table.name) - self.assertTrue('atad' not in self.table.c.keys()) - self.table.c.data # Should not raise exception - self.assertEqual(num_rows(self.table.c.data,content), 1) - - @fixture.usedb() - def test_type(self): - # Test we can change a column's type - - # Just the new type - self.table.c.data.alter(type=String(43)) - self.refresh_table(self.table.name) - self.assertTrue(isinstance(self.table.c.data.type, String)) - self.assertEqual(self.table.c.data.type.length, 43) - - # Different type - self.assertTrue(isinstance(self.table.c.id.type, Integer)) - self.assertEqual(self.table.c.id.nullable, False) - - # SQLAlchemy 1.1 adds a third state to "autoincrement" called - # "auto". - self.assertTrue(self.table.c.id.autoincrement in ('auto', True)) - - if not self.engine.name == 'firebird': - self.table.c.id.alter(type=String(20)) - self.assertEqual(self.table.c.id.nullable, False) - - # a rule makes sure that autoincrement is set to False - # when we change off of Integer - self.assertEqual(self.table.c.id.autoincrement, False) - self.refresh_table(self.table.name) - self.assertTrue(isinstance(self.table.c.id.type, String)) - - # note that after reflection, "autoincrement" is likely - # to change back to a database-generated value. Should be - # False or "auto". if True, it's a bug; at least one of these - # exists prior to SQLAlchemy 1.1.3 - - @fixture.usedb() - def test_default(self): - """Can change a column's server_default value (DefaultClauses only) - Only DefaultClauses are changed here: others are managed by the - application / by SA - """ - self.assertEqual(self.table.c.data.server_default.arg, 'tluafed') - - # Just the new default - default = 'my_default' - self.table.c.data.alter(server_default=DefaultClause(default)) - self.refresh_table(self.table.name) - #self.assertEqual(self.table.c.data.server_default.arg,default) - # TextClause returned by autoload - self.assertTrue(default in str(self.table.c.data.server_default.arg)) - self.engine.execute(self.table.insert(), id=12) - row = self._select_row() - self.assertEqual(row['data'], default) - - # Column object - default = 'your_default' - self.table.c.data.alter(type=String(40), server_default=DefaultClause(default)) - self.refresh_table(self.table.name) - self.assertTrue(default in str(self.table.c.data.server_default.arg)) - - # Drop/remove default - self.table.c.data.alter(server_default=None) - self.assertEqual(self.table.c.data.server_default, None) - - self.refresh_table(self.table.name) - # server_default isn't necessarily None for Oracle - #self.assertTrue(self.table.c.data.server_default is None,self.table.c.data.server_default) - self.engine.execute(self.table.insert(), id=11) - row = self.table.select(self.table.c.id == 11).execution_options(autocommit=True).execute().fetchone() - self.assertTrue(row['data'] is None, row['data']) - - @fixture.usedb(not_supported='firebird') - def test_null(self): - """Can change a column's null constraint""" - self.assertEqual(self.table.c.data.nullable, True) - - # Full column - self.table.c.data.alter(type=String(40), nullable=False) - self.table.nullable = None - self.refresh_table(self.table.name) - self.assertEqual(self.table.c.data.nullable, False) - - # Just the new status - self.table.c.data.alter(nullable=True) - self.refresh_table(self.table.name) - self.assertEqual(self.table.c.data.nullable, True) - - @fixture.usedb() - def test_alter_deprecated(self): - try: - # py 2.4 compatibility :-/ - cw = catch_warnings(record=True) - w = cw.__enter__() - - warnings.simplefilter("always") - self.table.c.data.alter(Column('data', String(100))) - - self.assertEqual(len(w),1) - self.assertTrue(issubclass(w[-1].category, - MigrateDeprecationWarning)) - self.assertEqual( - 'Passing a Column object to alter_column is deprecated. ' - 'Just pass in keyword parameters instead.', - str(w[-1].message)) - finally: - cw.__exit__() - - @fixture.usedb() - def test_alter_returns_delta(self): - """Test if alter constructs return delta""" - - delta = self.table.c.data.alter(type=String(100)) - self.assertTrue('type' in delta) - - @fixture.usedb() - def test_alter_all(self): - """Tests all alter changes at one time""" - # test for each db separately - # since currently some dont support everything - - # test pre settings - self.assertEqual(self.table.c.data.nullable, True) - self.assertEqual(self.table.c.data.server_default.arg, 'tluafed') - self.assertEqual(self.table.c.data.name, 'data') - self.assertTrue(isinstance(self.table.c.data.type, String)) - self.assertTrue(self.table.c.data.type.length, 40) - - kw = dict(nullable=False, - server_default='foobar', - name='data_new', - type=String(50)) - if self.engine.name == 'firebird': - del kw['nullable'] - self.table.c.data.alter(**kw) - - # test altered objects - self.assertEqual(self.table.c.data.server_default.arg, 'foobar') - if not self.engine.name == 'firebird': - self.assertEqual(self.table.c.data.nullable, False) - self.assertEqual(self.table.c.data.name, 'data_new') - self.assertEqual(self.table.c.data.type.length, 50) - - self.refresh_table(self.table.name) - - # test post settings - if not self.engine.name == 'firebird': - self.assertEqual(self.table.c.data_new.nullable, False) - self.assertEqual(self.table.c.data_new.name, 'data_new') - self.assertTrue(isinstance(self.table.c.data_new.type, String)) - self.assertTrue(self.table.c.data_new.type.length, 50) - - # insert data and assert default - self.table.insert(values={'id': 10}).execute() - row = self._select_row() - self.assertEqual(u'foobar', row['data_new']) - - -class TestColumnDelta(fixture.DB): - """Tests ColumnDelta class""" - - level = fixture.DB.CONNECT - table_name = 'tmp_coldelta' - table_int = 0 - - def _setup(self, url): - super(TestColumnDelta, self)._setup(url) - self.meta = MetaData() - self.table = Table(self.table_name, self.meta, - Column('ids', String(10)), - ) - self.meta.bind = self.engine - if self.engine.has_table(self.table.name): - self.table.drop() - self.table.create() - - def _teardown(self): - if self.engine.has_table(self.table.name): - self.table.drop() - self.meta.clear() - super(TestColumnDelta,self)._teardown() - - def mkcol(self, name='id', type=String, *p, **k): - return Column(name, type, *p, **k) - - def verify(self, expected, original, *p, **k): - self.delta = ColumnDelta(original, *p, **k) - result = list(self.delta.keys()) - result.sort() - self.assertEqual(expected, result) - return self.delta - - def test_deltas_two_columns(self): - """Testing ColumnDelta with two columns""" - col_orig = self.mkcol(primary_key=True) - col_new = self.mkcol(name='ids', primary_key=True) - self.verify([], col_orig, col_orig) - self.verify(['name'], col_orig, col_orig, 'ids') - self.verify(['name'], col_orig, col_orig, name='ids') - self.verify(['name'], col_orig, col_new) - self.verify(['name', 'type'], col_orig, col_new, type=String) - - # Type comparisons - self.verify([], self.mkcol(type=String), self.mkcol(type=String)) - self.verify(['type'], self.mkcol(type=String), self.mkcol(type=Integer)) - self.verify(['type'], self.mkcol(type=String), self.mkcol(type=String(42))) - self.verify([], self.mkcol(type=String(42)), self.mkcol(type=String(42))) - self.verify(['type'], self.mkcol(type=String(24)), self.mkcol(type=String(42))) - self.verify(['type'], self.mkcol(type=String(24)), self.mkcol(type=Text(24))) - - # Other comparisons - self.verify(['primary_key'], self.mkcol(nullable=False), self.mkcol(primary_key=True)) - - # PK implies nullable=False - self.verify(['nullable', 'primary_key'], self.mkcol(nullable=True), self.mkcol(primary_key=True)) - self.verify([], self.mkcol(primary_key=True), self.mkcol(primary_key=True)) - self.verify(['nullable'], self.mkcol(nullable=True), self.mkcol(nullable=False)) - self.verify([], self.mkcol(nullable=True), self.mkcol(nullable=True)) - self.verify([], self.mkcol(server_default=None), self.mkcol(server_default=None)) - self.verify([], self.mkcol(server_default='42'), self.mkcol(server_default='42')) - - # test server default - delta = self.verify(['server_default'], self.mkcol(), self.mkcol('id', String, DefaultClause('foobar'))) - self.assertEqual(delta['server_default'].arg, 'foobar') - - self.verify([], self.mkcol(server_default='foobar'), self.mkcol('id', String, DefaultClause('foobar'))) - self.verify(['type'], self.mkcol(server_default='foobar'), self.mkcol('id', Text, DefaultClause('foobar'))) - - col = self.mkcol(server_default='foobar') - self.verify(['type'], col, self.mkcol('id', Text, DefaultClause('foobar')), alter_metadata=True) - self.assertTrue(isinstance(col.type, Text)) - - col = self.mkcol() - self.verify(['name', 'server_default', 'type'], col, self.mkcol('beep', Text, DefaultClause('foobar')), - alter_metadata=True) - self.assertTrue(isinstance(col.type, Text)) - self.assertEqual(col.name, 'beep') - self.assertEqual(col.server_default.arg, 'foobar') - - @fixture.usedb() - def test_deltas_zero_columns(self): - """Testing ColumnDelta with zero columns""" - - self.verify(['name'], 'ids', table=self.table, name='hey') - - # test reflection - self.verify(['type'], 'ids', table=self.table.name, type=String(80), engine=self.engine) - self.verify(['type'], 'ids', table=self.table.name, type=String(80), metadata=self.meta) - - self.meta.clear() - delta = self.verify(['type'], 'ids', table=self.table.name, type=String(80), metadata=self.meta, - alter_metadata=True) - self.assertTrue(self.table.name in self.meta) - self.assertEqual(delta.result_column.type.length, 80) - self.assertEqual(self.meta.tables.get(self.table.name).c.ids.type.length, 80) - - # test defaults - self.meta.clear() - self.verify(['server_default'], 'ids', table=self.table.name, server_default='foobar', - metadata=self.meta, - alter_metadata=True) - self.meta.tables.get(self.table.name).c.ids.server_default.arg == 'foobar' - - # test missing parameters - self.assertRaises(ValueError, ColumnDelta, table=self.table.name) - self.assertRaises(ValueError, ColumnDelta, 'ids', table=self.table.name, alter_metadata=True) - self.assertRaises(ValueError, ColumnDelta, 'ids', table=self.table.name, alter_metadata=False) - - def test_deltas_one_column(self): - """Testing ColumnDelta with one column""" - col_orig = self.mkcol(primary_key=True) - - self.verify([], col_orig) - self.verify(['name'], col_orig, 'ids') - # Parameters are always executed, even if they're 'unchanged' - # (We can't assume given column is up-to-date) - self.verify(['name', 'primary_key', 'type'], col_orig, 'id', Integer, primary_key=True) - self.verify(['name', 'primary_key', 'type'], col_orig, name='id', type=Integer, primary_key=True) - - # Change name, given an up-to-date definition and the current name - delta = self.verify(['name'], col_orig, name='blah') - self.assertEqual(delta.get('name'), 'blah') - self.assertEqual(delta.current_name, 'id') - - col_orig = self.mkcol(primary_key=True) - self.verify(['name', 'type'], col_orig, name='id12', type=Text, alter_metadata=True) - self.assertTrue(isinstance(col_orig.type, Text)) - self.assertEqual(col_orig.name, 'id12') - - # test server default - col_orig = self.mkcol(primary_key=True) - delta = self.verify(['server_default'], col_orig, DefaultClause('foobar')) - self.assertEqual(delta['server_default'].arg, 'foobar') - - delta = self.verify(['server_default'], col_orig, server_default=DefaultClause('foobar')) - self.assertEqual(delta['server_default'].arg, 'foobar') - - # no change - col_orig = self.mkcol(server_default=DefaultClause('foobar')) - delta = self.verify(['type'], col_orig, DefaultClause('foobar'), type=PickleType) - self.assertTrue(isinstance(delta.result_column.type, PickleType)) - - # TODO: test server on update - # TODO: test bind metadata diff --git a/migrate/tests/changeset/test_constraint.py b/migrate/tests/changeset/test_constraint.py deleted file mode 100644 index 325b3c0..0000000 --- a/migrate/tests/changeset/test_constraint.py +++ /dev/null @@ -1,299 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -from sqlalchemy import * -from sqlalchemy.util import * -from sqlalchemy.exc import * - -from migrate.changeset.util import fk_column_names -from migrate.exceptions import * -from migrate.changeset import * - -from migrate.tests import fixture - - -class CommonTestConstraint(fixture.DB): - """helper functions to test constraints. - - we just create a fresh new table and make sure everything is - as required. - """ - - def _setup(self, url): - super(CommonTestConstraint, self)._setup(url) - self._create_table() - - def _teardown(self): - if hasattr(self, 'table') and self.engine.has_table(self.table.name): - self.table.drop() - super(CommonTestConstraint, self)._teardown() - - def _create_table(self): - self._connect(self.url) - self.meta = MetaData(self.engine) - self.tablename = 'mytable' - self.table = Table(self.tablename, self.meta, - Column(u'id', Integer, nullable=False), - Column(u'fkey', Integer, nullable=False), - mysql_engine='InnoDB') - if self.engine.has_table(self.table.name): - self.table.drop() - self.table.create() - - # make sure we start at zero - self.assertEqual(len(self.table.primary_key), 0) - self.assertTrue(isinstance(self.table.primary_key, - schema.PrimaryKeyConstraint), self.table.primary_key.__class__) - - -class TestConstraint(CommonTestConstraint): - level = fixture.DB.CONNECT - - def _define_pk(self, *cols): - # Add a pk by creating a PK constraint - if (self.engine.name in ('oracle', 'firebird')): - # Can't drop Oracle PKs without an explicit name - pk = PrimaryKeyConstraint(table=self.table, name='temp_pk_key', *cols) - else: - pk = PrimaryKeyConstraint(table=self.table, *cols) - self.compare_columns_equal(pk.columns, cols) - pk.create() - self.refresh_table() - if not self.url.startswith('sqlite'): - self.compare_columns_equal(self.table.primary_key, cols, ['type', 'autoincrement']) - - # Drop the PK constraint - #if (self.engine.name in ('oracle', 'firebird')): - # # Apparently Oracle PK names aren't introspected - # pk.name = self.table.primary_key.name - pk.drop() - self.refresh_table() - self.assertEqual(len(self.table.primary_key), 0) - self.assertTrue(isinstance(self.table.primary_key, schema.PrimaryKeyConstraint)) - return pk - - @fixture.usedb() - def test_define_fk(self): - """FK constraints can be defined, created, and dropped""" - # FK target must be unique - pk = PrimaryKeyConstraint(self.table.c.id, table=self.table, name="pkid") - pk.create() - - # Add a FK by creating a FK constraint - if SQLA_07: - self.assertEqual(list(self.table.c.fkey.foreign_keys), []) - else: - self.assertEqual(self.table.c.fkey.foreign_keys._list, []) - fk = ForeignKeyConstraint([self.table.c.fkey], - [self.table.c.id], - name="fk_id_fkey", - ondelete="CASCADE") - if SQLA_07: - self.assertTrue(list(self.table.c.fkey.foreign_keys) is not []) - else: - self.assertTrue(self.table.c.fkey.foreign_keys._list is not []) - for key in fk_column_names(fk): - self.assertEqual(key, self.table.c.fkey.name) - self.assertEqual([e.column for e in fk.elements], [self.table.c.id]) - self.assertEqual(list(fk.referenced), [self.table.c.id]) - - if self.url.startswith('mysql'): - # MySQL FKs need an index - index = Index('index_name', self.table.c.fkey) - index.create() - fk.create() - - # test for ondelete/onupdate - if SQLA_07: - fkey = list(self.table.c.fkey.foreign_keys)[0] - else: - fkey = self.table.c.fkey.foreign_keys._list[0] - self.assertEqual(fkey.ondelete, "CASCADE") - # TODO: test on real db if it was set - - self.refresh_table() - if SQLA_07: - self.assertTrue(list(self.table.c.fkey.foreign_keys) is not []) - else: - self.assertTrue(self.table.c.fkey.foreign_keys._list is not []) - - fk.drop() - self.refresh_table() - if SQLA_07: - self.assertEqual(list(self.table.c.fkey.foreign_keys), []) - else: - self.assertEqual(self.table.c.fkey.foreign_keys._list, []) - - @fixture.usedb() - def test_define_pk(self): - """PK constraints can be defined, created, and dropped""" - self._define_pk(self.table.c.fkey) - - @fixture.usedb() - def test_define_pk_multi(self): - """Multicolumn PK constraints can be defined, created, and dropped""" - self._define_pk(self.table.c.id, self.table.c.fkey) - - @fixture.usedb(not_supported=['firebird']) - def test_drop_cascade(self): - """Drop constraint cascaded""" - pk = PrimaryKeyConstraint('fkey', table=self.table, name="id_pkey") - pk.create() - self.refresh_table() - - # Drop the PK constraint forcing cascade - pk.drop(cascade=True) - - # TODO: add real assertion if it was added - - @fixture.usedb(supported=['mysql']) - def test_fail_mysql_check_constraints(self): - """Check constraints raise NotSupported for mysql on drop""" - cons = CheckConstraint('id > 3', name="id_check", table=self.table) - cons.create() - self.refresh_table() - - try: - cons.drop() - except NotSupportedError: - pass - else: - self.fail() - - @fixture.usedb(not_supported=['sqlite', 'mysql']) - def test_named_check_constraints(self): - """Check constraints can be defined, created, and dropped""" - self.assertRaises(InvalidConstraintError, CheckConstraint, 'id > 3') - cons = CheckConstraint('id > 3', name="id_check", table=self.table) - cons.create() - self.refresh_table() - - self.table.insert(values={'id': 4, 'fkey': 1}).execute() - try: - self.table.insert(values={'id': 1, 'fkey': 1}).execute() - except (IntegrityError, ProgrammingError): - pass - else: - self.fail() - - # Remove the name, drop the constraint; it should succeed - cons.drop() - self.refresh_table() - self.table.insert(values={'id': 2, 'fkey': 2}).execute() - self.table.insert(values={'id': 1, 'fkey': 2}).execute() - - -class TestAutoname(CommonTestConstraint): - """Every method tests for a type of constraint wether it can autoname - itself and if you can pass object instance and names to classes. - """ - level = fixture.DB.CONNECT - - @fixture.usedb(not_supported=['oracle', 'firebird']) - def test_autoname_pk(self): - """PrimaryKeyConstraints can guess their name if None is given""" - # Don't supply a name; it should create one - cons = PrimaryKeyConstraint(self.table.c.id) - cons.create() - self.refresh_table() - if not self.url.startswith('sqlite'): - # TODO: test for index for sqlite - self.compare_columns_equal(cons.columns, self.table.primary_key, ['autoincrement', 'type']) - - # Remove the name, drop the constraint; it should succeed - cons.name = None - cons.drop() - self.refresh_table() - self.assertEqual(list(), list(self.table.primary_key)) - - # test string names - cons = PrimaryKeyConstraint('id', table=self.table) - cons.create() - self.refresh_table() - if not self.url.startswith('sqlite'): - # TODO: test for index for sqlite - self.compare_columns_equal(cons.columns, self.table.primary_key) - cons.name = None - cons.drop() - - @fixture.usedb(not_supported=['oracle', 'sqlite', 'firebird']) - def test_autoname_fk(self): - """ForeignKeyConstraints can guess their name if None is given""" - cons = PrimaryKeyConstraint(self.table.c.id) - cons.create() - - cons = ForeignKeyConstraint([self.table.c.fkey], [self.table.c.id]) - cons.create() - self.refresh_table() - if SQLA_07: - list(self.table.c.fkey.foreign_keys)[0].column is self.table.c.id - else: - self.table.c.fkey.foreign_keys[0].column is self.table.c.id - - # Remove the name, drop the constraint; it should succeed - cons.name = None - cons.drop() - self.refresh_table() - if SQLA_07: - self.assertEqual(list(self.table.c.fkey.foreign_keys), list()) - else: - self.assertEqual(self.table.c.fkey.foreign_keys._list, list()) - - # test string names - cons = ForeignKeyConstraint(['fkey'], ['%s.id' % self.tablename], table=self.table) - cons.create() - self.refresh_table() - if SQLA_07: - list(self.table.c.fkey.foreign_keys)[0].column is self.table.c.id - else: - self.table.c.fkey.foreign_keys[0].column is self.table.c.id - - # Remove the name, drop the constraint; it should succeed - cons.name = None - cons.drop() - - @fixture.usedb(not_supported=['oracle', 'sqlite', 'mysql']) - def test_autoname_check(self): - """CheckConstraints can guess their name if None is given""" - cons = CheckConstraint('id > 3', columns=[self.table.c.id]) - cons.create() - self.refresh_table() - - if not self.engine.name == 'mysql': - self.table.insert(values={'id': 4, 'fkey': 1}).execute() - try: - self.table.insert(values={'id': 1, 'fkey': 2}).execute() - except (IntegrityError, ProgrammingError): - pass - else: - self.fail() - - # Remove the name, drop the constraint; it should succeed - cons.name = None - cons.drop() - self.refresh_table() - self.table.insert(values={'id': 2, 'fkey': 2}).execute() - self.table.insert(values={'id': 1, 'fkey': 3}).execute() - - @fixture.usedb(not_supported=['oracle']) - def test_autoname_unique(self): - """UniqueConstraints can guess their name if None is given""" - cons = UniqueConstraint(self.table.c.fkey) - cons.create() - self.refresh_table() - - self.table.insert(values={'fkey': 4, 'id': 1}).execute() - try: - self.table.insert(values={'fkey': 4, 'id': 2}).execute() - except (sqlalchemy.exc.IntegrityError, - sqlalchemy.exc.ProgrammingError): - pass - else: - self.fail() - - # Remove the name, drop the constraint; it should succeed - cons.name = None - cons.drop() - self.refresh_table() - self.table.insert(values={'fkey': 4, 'id': 2}).execute() - self.table.insert(values={'fkey': 4, 'id': 1}).execute() diff --git a/migrate/tests/fixture/__init__.py b/migrate/tests/fixture/__init__.py deleted file mode 100644 index 6b8bc48..0000000 --- a/migrate/tests/fixture/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import testtools - -def main(imports=None): - if imports: - global suite - suite = suite(imports) - defaultTest='fixture.suite' - else: - defaultTest=None - return testtools.TestProgram(defaultTest=defaultTest) - -from .base import Base -from .pathed import Pathed -from .shell import Shell -from .database import DB,usedb diff --git a/migrate/tests/fixture/base.py b/migrate/tests/fixture/base.py deleted file mode 100644 index 38c91af..0000000 --- a/migrate/tests/fixture/base.py +++ /dev/null @@ -1,26 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import re -import testtools - -class Base(testtools.TestCase): - - def assertEqualIgnoreWhitespace(self, v1, v2): - """Compares two strings that should be\ - identical except for whitespace - """ - def strip_whitespace(s): - return re.sub(r'\s', '', s) - - line1 = strip_whitespace(v1) - line2 = strip_whitespace(v2) - - self.assertEqual(line1, line2, "%s != %s" % (v1, v2)) - - def ignoreErrors(self, func, *p,**k): - """Call a function, ignoring any exceptions""" - try: - func(*p,**k) - except: - pass diff --git a/migrate/tests/fixture/database.py b/migrate/tests/fixture/database.py deleted file mode 100644 index 93bd69b..0000000 --- a/migrate/tests/fixture/database.py +++ /dev/null @@ -1,203 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import os -import logging -import sys - -import six -from decorator import decorator - -from sqlalchemy import create_engine, Table, MetaData -from sqlalchemy import exc as sa_exc -from sqlalchemy.orm import create_session -from sqlalchemy.pool import StaticPool - -from migrate.changeset.schema import ColumnDelta -from migrate.versioning.util import Memoize - -from migrate.tests.fixture.base import Base -from migrate.tests.fixture.pathed import Pathed - - -log = logging.getLogger(__name__) - -@Memoize -def readurls(): - """read URLs from config file return a list""" - # TODO: remove tmpfile since sqlite can store db in memory - filename = 'test_db.cfg' if six.PY2 else "test_db_py3.cfg" - ret = list() - tmpfile = Pathed.tmp() - fullpath = os.path.join(os.curdir, filename) - - try: - fd = open(fullpath) - except IOError: - raise IOError("""You must specify the databases to use for testing! -Copy %(filename)s.tmpl to %(filename)s and edit your database URLs.""" % locals()) - - for line in fd: - if line.startswith('#'): - continue - line = line.replace('__tmp__', tmpfile).strip() - ret.append(line) - fd.close() - return ret - -def is_supported(url, supported, not_supported): - db = url.split(':', 1)[0] - - if supported is not None: - if isinstance(supported, six.string_types): - return supported == db - else: - return db in supported - elif not_supported is not None: - if isinstance(not_supported, six.string_types): - return not_supported != db - else: - return not (db in not_supported) - return True - - -def usedb(supported=None, not_supported=None): - """Decorates tests to be run with a database connection - These tests are run once for each available database - - @param supported: run tests for ONLY these databases - @param not_supported: run tests for all databases EXCEPT these - - If both supported and not_supported are empty, all dbs are assumed - to be supported - """ - if supported is not None and not_supported is not None: - raise AssertionError("Can't specify both supported and not_supported in fixture.db()") - - urls = readurls() - my_urls = [url for url in urls if is_supported(url, supported, not_supported)] - - @decorator - def dec(f, self, *a, **kw): - failed_for = [] - fail = False - for url in my_urls: - try: - log.debug("Running test with engine %s", url) - try: - self._setup(url) - except sa_exc.OperationalError: - log.info('Backend %s is not available, skip it', url) - continue - except Exception as e: - raise RuntimeError('Exception during _setup(): %r' % e) - - try: - f(self, *a, **kw) - finally: - try: - self._teardown() - except Exception as e: - raise RuntimeError('Exception during _teardown(): %r' % e) - except Exception: - failed_for.append(url) - fail = sys.exc_info() - for url in failed_for: - log.error('Failed for %s', url) - if fail: - # cause the failure :-) - six.reraise(*fail) - return dec - - -class DB(Base): - # Constants: connection level - NONE = 0 # No connection; just set self.url - CONNECT = 1 # Connect; no transaction - TXN = 2 # Everything in a transaction - - level = TXN - - def _engineInfo(self, url=None): - if url is None: - url = self.url - return url - - def _setup(self, url): - self._connect(url) - # make sure there are no tables lying around - meta = MetaData(self.engine) - meta.reflect() - meta.drop_all() - - def _teardown(self): - self._disconnect() - - def _connect(self, url): - self.url = url - # TODO: seems like 0.5.x branch does not work with engine.dispose and staticpool - #self.engine = create_engine(url, echo=True, poolclass=StaticPool) - self.engine = create_engine(url, echo=True) - # silence the logger added by SA, nose adds its own! - logging.getLogger('sqlalchemy').handlers=[] - self.meta = MetaData(bind=self.engine) - if self.level < self.CONNECT: - return - #self.session = create_session(bind=self.engine) - if self.level < self.TXN: - return - #self.txn = self.session.begin() - - def _disconnect(self): - if hasattr(self, 'txn'): - self.txn.rollback() - if hasattr(self, 'session'): - self.session.close() - #if hasattr(self,'conn'): - # self.conn.close() - self.engine.dispose() - - def _supported(self, url): - db = url.split(':',1)[0] - func = getattr(self, self._TestCase__testMethodName) - if hasattr(func, 'supported'): - return db in func.supported - if hasattr(func, 'not_supported'): - return not (db in func.not_supported) - # Neither list assigned; assume all are supported - return True - - def _not_supported(self, url): - return not self._supported(url) - - def _select_row(self): - """Select rows, used in multiple tests""" - return self.table.select().execution_options( - autocommit=True).execute().fetchone() - - def refresh_table(self, name=None): - """Reload the table from the database - Assumes we're working with only a single table, self.table, and - metadata self.meta - - Working w/ multiple tables is not possible, as tables can only be - reloaded with meta.clear() - """ - if name is None: - name = self.table.name - self.meta.clear() - self.table = Table(name, self.meta, autoload=True) - - def compare_columns_equal(self, columns1, columns2, ignore=None): - """Loop through all columns and compare them""" - def key(column): - return column.name - for c1, c2 in zip(sorted(columns1, key=key), sorted(columns2, key=key)): - diffs = ColumnDelta(c1, c2).diffs - if ignore: - for key in ignore: - diffs.pop(key, None) - if diffs: - self.fail("Comparing %s to %s failed: %s" % (columns1, columns2, diffs)) - -# TODO: document engine.dispose and write tests diff --git a/migrate/tests/fixture/models.py b/migrate/tests/fixture/models.py deleted file mode 100644 index ee76429..0000000 --- a/migrate/tests/fixture/models.py +++ /dev/null @@ -1,14 +0,0 @@ -from sqlalchemy import * - -# test rundiffs in shell -meta_old_rundiffs = MetaData() -meta_rundiffs = MetaData() -meta = MetaData() - -tmp_account_rundiffs = Table('tmp_account_rundiffs', meta_rundiffs, - Column('id', Integer, primary_key=True), - Column('login', Text()), - Column('passwd', Text()), -) - -tmp_sql_table = Table('tmp_sql_table', meta, Column('id', Integer)) diff --git a/migrate/tests/fixture/pathed.py b/migrate/tests/fixture/pathed.py deleted file mode 100644 index 78cf4cd..0000000 --- a/migrate/tests/fixture/pathed.py +++ /dev/null @@ -1,77 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import os -import sys -import shutil -import tempfile - -from migrate.tests.fixture import base - - -class Pathed(base.Base): - # Temporary files - - _tmpdir = tempfile.mkdtemp() - - def setUp(self): - super(Pathed, self).setUp() - self.temp_usable_dir = tempfile.mkdtemp() - sys.path.append(self.temp_usable_dir) - - def tearDown(self): - super(Pathed, self).tearDown() - try: - sys.path.remove(self.temp_usable_dir) - except: - pass # w00t? - Pathed.purge(self.temp_usable_dir) - - @classmethod - def _tmp(cls, prefix='', suffix=''): - """Generate a temporary file name that doesn't exist - All filenames are generated inside a temporary directory created by - tempfile.mkdtemp(); only the creating user has access to this directory. - It should be secure to return a nonexistant temp filename in this - directory, unless the user is messing with their own files. - """ - file, ret = tempfile.mkstemp(suffix,prefix,cls._tmpdir) - os.close(file) - os.remove(ret) - return ret - - @classmethod - def tmp(cls, *p, **k): - return cls._tmp(*p, **k) - - @classmethod - def tmp_py(cls, *p, **k): - return cls._tmp(suffix='.py', *p, **k) - - @classmethod - def tmp_sql(cls, *p, **k): - return cls._tmp(suffix='.sql', *p, **k) - - @classmethod - def tmp_named(cls, name): - return os.path.join(cls._tmpdir, name) - - @classmethod - def tmp_repos(cls, *p, **k): - return cls._tmp(*p, **k) - - @classmethod - def purge(cls, path): - """Removes this path if it exists, in preparation for tests - Careful - all tests should take place in /tmp. - We don't want to accidentally wipe stuff out... - """ - if os.path.exists(path): - if os.path.isdir(path): - shutil.rmtree(path) - else: - os.remove(path) - if path.endswith('.py'): - pyc = path + 'c' - if os.path.exists(pyc): - os.remove(pyc) diff --git a/migrate/tests/fixture/shell.py b/migrate/tests/fixture/shell.py deleted file mode 100644 index 566d250..0000000 --- a/migrate/tests/fixture/shell.py +++ /dev/null @@ -1,33 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import os -import sys -import logging - -from scripttest import TestFileEnvironment - -from migrate.tests.fixture.pathed import * - - -log = logging.getLogger(__name__) - -class Shell(Pathed): - """Base class for command line tests""" - - def setUp(self): - super(Shell, self).setUp() - migrate_path = os.path.dirname(sys.executable) - # PATH to migrate development script folder - log.debug('PATH for ScriptTest: %s', migrate_path) - self.env = TestFileEnvironment( - base_path=os.path.join(self.temp_usable_dir, 'env'), - ) - - def run_version(self, repos_path): - result = self.env.run('migrate version %s' % repos_path) - return int(result.stdout.strip()) - - def run_db_version(self, url, repos_path): - result = self.env.run('migrate db_version %s %s' % (url, repos_path)) - return int(result.stdout.strip()) diff --git a/migrate/tests/fixture/warnings.py b/migrate/tests/fixture/warnings.py deleted file mode 100644 index 8d99c0f..0000000 --- a/migrate/tests/fixture/warnings.py +++ /dev/null @@ -1,88 +0,0 @@ -# lifted from Python 2.6, so we can use it in Python 2.5 -import sys - -class WarningMessage(object): - - """Holds the result of a single showwarning() call.""" - - _WARNING_DETAILS = ("message", "category", "filename", "lineno", "file", - "line") - - def __init__(self, message, category, filename, lineno, file=None, - line=None): - local_values = locals() - for attr in self._WARNING_DETAILS: - setattr(self, attr, local_values[attr]) - if category: - self._category_name = category.__name__ - else: - self._category_name = None - - def __str__(self): - return ("{message : %r, category : %r, filename : %r, lineno : %s, " - "line : %r}" % (self.message, self._category_name, - self.filename, self.lineno, self.line)) - - -class catch_warnings(object): - - """A context manager that copies and restores the warnings filter upon - exiting the context. - - The 'record' argument specifies whether warnings should be captured by a - custom implementation of warnings.showwarning() and be appended to a list - returned by the context manager. Otherwise None is returned by the context - manager. The objects appended to the list are arguments whose attributes - mirror the arguments to showwarning(). - - The 'module' argument is to specify an alternative module to the module - named 'warnings' and imported under that name. This argument is only useful - when testing the warnings module itself. - - """ - - def __init__(self, record=False, module=None): - """Specify whether to record warnings and if an alternative module - should be used other than sys.modules['warnings']. - - For compatibility with Python 3.0, please consider all arguments to be - keyword-only. - - """ - self._record = record - if module is None: - self._module = sys.modules['warnings'] - else: - self._module = module - self._entered = False - - def __repr__(self): - args = [] - if self._record: - args.append("record=True") - if self._module is not sys.modules['warnings']: - args.append("module=%r" % self._module) - name = type(self).__name__ - return "%s(%s)" % (name, ", ".join(args)) - - def __enter__(self): - if self._entered: - raise RuntimeError("Cannot enter %r twice" % self) - self._entered = True - self._filters = self._module.filters - self._module.filters = self._filters[:] - self._showwarning = self._module.showwarning - if self._record: - log = [] - def showwarning(*args, **kwargs): - log.append(WarningMessage(*args, **kwargs)) - self._module.showwarning = showwarning - return log - else: - return None - - def __exit__(self, *exc_info): - if not self._entered: - raise RuntimeError("Cannot exit %r without entering first" % self) - self._module.filters = self._filters - self._module.showwarning = self._showwarning diff --git a/migrate/tests/integrated/__init__.py b/migrate/tests/integrated/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/migrate/tests/integrated/__init__.py +++ /dev/null diff --git a/migrate/tests/integrated/test_docs.py b/migrate/tests/integrated/test_docs.py deleted file mode 100644 index 8e35427..0000000 --- a/migrate/tests/integrated/test_docs.py +++ /dev/null @@ -1,18 +0,0 @@ -import doctest -import os - - -from migrate.tests import fixture - -# Collect tests for all handwritten docs: doc/*.rst - -dir = ('..','..','..','doc','source') -absdir = (os.path.dirname(os.path.abspath(__file__)),)+dir -dirpath = os.path.join(*absdir) -files = [f for f in os.listdir(dirpath) if f.endswith('.rst')] -paths = [os.path.join(*(dir+(f,))) for f in files] -assert len(paths) > 0 -suite = doctest.DocFileSuite(*paths) - -def test_docs(): - suite.debug() diff --git a/migrate/tests/versioning/__init__.py b/migrate/tests/versioning/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/migrate/tests/versioning/__init__.py +++ /dev/null diff --git a/migrate/tests/versioning/test_api.py b/migrate/tests/versioning/test_api.py deleted file mode 100644 index bc4b29d..0000000 --- a/migrate/tests/versioning/test_api.py +++ /dev/null @@ -1,128 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -import six - -from migrate.exceptions import * -from migrate.versioning import api - -from migrate.tests.fixture.pathed import * -from migrate.tests.fixture import models -from migrate.tests import fixture - - -class TestAPI(Pathed): - - def test_help(self): - self.assertTrue(isinstance(api.help('help'), six.string_types)) - self.assertRaises(UsageError, api.help) - self.assertRaises(UsageError, api.help, 'foobar') - self.assertTrue(isinstance(api.help('create'), str)) - - # test that all commands return some text - for cmd in api.__all__: - content = api.help(cmd) - self.assertTrue(content) - - def test_create(self): - tmprepo = self.tmp_repos() - api.create(tmprepo, 'temp') - - # repository already exists - self.assertRaises(KnownError, api.create, tmprepo, 'temp') - - def test_script(self): - repo = self.tmp_repos() - api.create(repo, 'temp') - api.script('first version', repo) - - def test_script_sql(self): - repo = self.tmp_repos() - api.create(repo, 'temp') - api.script_sql('postgres', 'desc', repo) - - def test_version(self): - repo = self.tmp_repos() - api.create(repo, 'temp') - api.version(repo) - - def test_version_control(self): - repo = self.tmp_repos() - api.create(repo, 'temp') - api.version_control('sqlite:///', repo) - api.version_control('sqlite:///', six.text_type(repo)) - - def test_source(self): - repo = self.tmp_repos() - api.create(repo, 'temp') - api.script('first version', repo) - api.script_sql('default', 'desc', repo) - - # no repository - self.assertRaises(UsageError, api.source, 1) - - # stdout - out = api.source(1, dest=None, repository=repo) - self.assertTrue(out) - - # file - out = api.source(1, dest=self.tmp_repos(), repository=repo) - self.assertFalse(out) - - def test_manage(self): - output = api.manage(os.path.join(self.temp_usable_dir, 'manage.py')) - - -class TestSchemaAPI(fixture.DB, Pathed): - - def _setup(self, url): - super(TestSchemaAPI, self)._setup(url) - self.repo = self.tmp_repos() - api.create(self.repo, 'temp') - self.schema = api.version_control(url, self.repo) - - def _teardown(self): - self.schema = api.drop_version_control(self.url, self.repo) - super(TestSchemaAPI, self)._teardown() - - @fixture.usedb() - def test_workflow(self): - self.assertEqual(api.db_version(self.url, self.repo), 0) - api.script('First Version', self.repo) - self.assertEqual(api.db_version(self.url, self.repo), 0) - api.upgrade(self.url, self.repo, 1) - self.assertEqual(api.db_version(self.url, self.repo), 1) - api.downgrade(self.url, self.repo, 0) - self.assertEqual(api.db_version(self.url, self.repo), 0) - api.test(self.url, self.repo) - self.assertEqual(api.db_version(self.url, self.repo), 0) - - # preview - # TODO: test output - out = api.upgrade(self.url, self.repo, preview_py=True) - out = api.upgrade(self.url, self.repo, preview_sql=True) - - api.upgrade(self.url, self.repo, 1) - api.script_sql('default', 'desc', self.repo) - self.assertRaises(UsageError, api.upgrade, self.url, self.repo, 2, preview_py=True) - out = api.upgrade(self.url, self.repo, 2, preview_sql=True) - - # cant upgrade to version 1, already at version 1 - self.assertEqual(api.db_version(self.url, self.repo), 1) - self.assertRaises(KnownError, api.upgrade, self.url, self.repo, 0) - - @fixture.usedb() - def test_compare_model_to_db(self): - diff = api.compare_model_to_db(self.url, self.repo, models.meta) - - @fixture.usedb() - def test_create_model(self): - model = api.create_model(self.url, self.repo) - - @fixture.usedb() - def test_make_update_script_for_model(self): - model = api.make_update_script_for_model(self.url, self.repo, models.meta_old_rundiffs, models.meta_rundiffs) - - @fixture.usedb() - def test_update_db_from_model(self): - model = api.update_db_from_model(self.url, self.repo, models.meta_rundiffs) diff --git a/migrate/tests/versioning/test_cfgparse.py b/migrate/tests/versioning/test_cfgparse.py deleted file mode 100644 index a31273e..0000000 --- a/migrate/tests/versioning/test_cfgparse.py +++ /dev/null @@ -1,27 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -from migrate.versioning import cfgparse -from migrate.versioning.repository import * -from migrate.versioning.template import Template -from migrate.tests import fixture - - -class TestConfigParser(fixture.Base): - - def test_to_dict(self): - """Correctly interpret config results as dictionaries""" - parser = cfgparse.Parser(dict(default_value=42)) - self.assertTrue(len(parser.sections()) == 0) - parser.add_section('section') - parser.set('section','option','value') - self.assertEqual(parser.get('section', 'option'), 'value') - self.assertEqual(parser.to_dict()['section']['option'], 'value') - - def test_table_config(self): - """We should be able to specify the table to be used with a repository""" - default_text = Repository.prepare_config(Template().get_repository(), - 'repository_name', {}) - specified_text = Repository.prepare_config(Template().get_repository(), - 'repository_name', {'version_table': '_other_table'}) - self.assertNotEqual(default_text, specified_text) diff --git a/migrate/tests/versioning/test_database.py b/migrate/tests/versioning/test_database.py deleted file mode 100644 index 8291c6b..0000000 --- a/migrate/tests/versioning/test_database.py +++ /dev/null @@ -1,13 +0,0 @@ -from sqlalchemy import select, text -from migrate.tests import fixture - -class TestConnect(fixture.DB): - level=fixture.DB.TXN - - @fixture.usedb() - def test_connect(self): - """Connect to the database successfully""" - # Connection is done in fixture.DB setup; make sure we can do stuff - self.engine.execute( - select([text('42')]) - ) diff --git a/migrate/tests/versioning/test_genmodel.py b/migrate/tests/versioning/test_genmodel.py deleted file mode 100644 index f800826..0000000 --- a/migrate/tests/versioning/test_genmodel.py +++ /dev/null @@ -1,214 +0,0 @@ -# -*- coding: utf-8 -*- - -import os - -import six -import sqlalchemy -from sqlalchemy import * - -from migrate.versioning import genmodel, schemadiff -from migrate.changeset import schema - -from migrate.tests import fixture - - -class TestSchemaDiff(fixture.DB): - table_name = 'tmp_schemadiff' - level = fixture.DB.CONNECT - - def _setup(self, url): - super(TestSchemaDiff, self)._setup(url) - self.meta = MetaData(self.engine) - self.meta.reflect() - self.meta.drop_all() # in case junk tables are lying around in the test database - self.meta = MetaData(self.engine) - self.meta.reflect() # needed if we just deleted some tables - self.table = Table(self.table_name, self.meta, - Column('id',Integer(), primary_key=True), - Column('name', UnicodeText()), - Column('data', UnicodeText()), - ) - - def _teardown(self): - if self.table.exists(): - self.meta = MetaData(self.engine) - self.meta.reflect() - self.meta.drop_all() - super(TestSchemaDiff, self)._teardown() - - def _applyLatestModel(self): - diff = schemadiff.getDiffOfModelAgainstDatabase(self.meta, self.engine, excludeTables=['migrate_version']) - genmodel.ModelGenerator(diff,self.engine).runB2A() - - # NOTE(mriedem): DB2 handles UnicodeText as LONG VARGRAPHIC - # so the schema diffs on the columns don't work with this test. - @fixture.usedb(not_supported='ibm_db_sa') - def test_functional(self): - def assertDiff(isDiff, tablesMissingInDatabase, tablesMissingInModel, tablesWithDiff): - diff = schemadiff.getDiffOfModelAgainstDatabase(self.meta, self.engine, excludeTables=['migrate_version']) - self.assertEqual( - (diff.tables_missing_from_B, - diff.tables_missing_from_A, - list(diff.tables_different.keys()), - bool(diff)), - (tablesMissingInDatabase, - tablesMissingInModel, - tablesWithDiff, - isDiff) - ) - - # Model is defined but database is empty. - assertDiff(True, [self.table_name], [], []) - - # Check Python upgrade and downgrade of database from updated model. - diff = schemadiff.getDiffOfModelAgainstDatabase(self.meta, self.engine, excludeTables=['migrate_version']) - decls, upgradeCommands, downgradeCommands = genmodel.ModelGenerator(diff,self.engine).genB2AMigration() - - # Feature test for a recent SQLa feature; - # expect different output in that case. - if repr(String()) == 'String()': - self.assertEqualIgnoreWhitespace(decls, ''' - from migrate.changeset import schema - pre_meta = MetaData() - post_meta = MetaData() - tmp_schemadiff = Table('tmp_schemadiff', post_meta, - Column('id', Integer, primary_key=True, nullable=False), - Column('name', UnicodeText), - Column('data', UnicodeText), - ) - ''') - else: - self.assertEqualIgnoreWhitespace(decls, ''' - from migrate.changeset import schema - pre_meta = MetaData() - post_meta = MetaData() - tmp_schemadiff = Table('tmp_schemadiff', post_meta, - Column('id', Integer, primary_key=True, nullable=False), - Column('name', UnicodeText(length=None)), - Column('data', UnicodeText(length=None)), - ) - ''') - - # Create table in database, now model should match database. - self._applyLatestModel() - assertDiff(False, [], [], []) - - # Check Python code gen from database. - diff = schemadiff.getDiffOfModelAgainstDatabase(MetaData(), self.engine, excludeTables=['migrate_version']) - src = genmodel.ModelGenerator(diff,self.engine).genBDefinition() - - namespace = {} - six.exec_(src, namespace) - - c1 = Table('tmp_schemadiff', self.meta, autoload=True).c - c2 = namespace['tmp_schemadiff'].c - self.compare_columns_equal(c1, c2, ['type']) - # TODO: get rid of ignoring type - - if not self.engine.name == 'oracle': - # Add data, later we'll make sure it's still present. - result = self.engine.execute(self.table.insert(), id=1, name=u'mydata') - dataId = result.inserted_primary_key[0] - - # Modify table in model (by removing it and adding it back to model) - # Drop column data, add columns data2 and data3. - self.meta.remove(self.table) - self.table = Table(self.table_name,self.meta, - Column('id',Integer(),primary_key=True), - Column('name',UnicodeText(length=None)), - Column('data2',Integer(),nullable=True), - Column('data3',Integer(),nullable=True), - ) - assertDiff(True, [], [], [self.table_name]) - - # Apply latest model changes and find no more diffs. - self._applyLatestModel() - assertDiff(False, [], [], []) - - # Drop column data3, add data4 - self.meta.remove(self.table) - self.table = Table(self.table_name,self.meta, - Column('id',Integer(),primary_key=True), - Column('name',UnicodeText(length=None)), - Column('data2',Integer(),nullable=True), - Column('data4',Float(),nullable=True), - ) - assertDiff(True, [], [], [self.table_name]) - - diff = schemadiff.getDiffOfModelAgainstDatabase( - self.meta, self.engine, excludeTables=['migrate_version']) - decls, upgradeCommands, downgradeCommands = genmodel.ModelGenerator(diff,self.engine).genB2AMigration(indent='') - - # decls have changed since genBDefinition - six.exec_(decls, namespace) - # migration commands expect a namespace containing migrate_engine - namespace['migrate_engine'] = self.engine - # run the migration up and down - six.exec_(upgradeCommands, namespace) - assertDiff(False, [], [], []) - - six.exec_(decls, namespace) - six.exec_(downgradeCommands, namespace) - assertDiff(True, [], [], [self.table_name]) - - six.exec_(decls, namespace) - six.exec_(upgradeCommands, namespace) - assertDiff(False, [], [], []) - - if not self.engine.name == 'oracle': - # Make sure data is still present. - result = self.engine.execute(self.table.select(self.table.c.id==dataId)) - rows = result.fetchall() - self.assertEqual(len(rows), 1) - self.assertEqual(rows[0].name, 'mydata') - - # Add data, later we'll make sure it's still present. - result = self.engine.execute(self.table.insert(), id=2, name=u'mydata2', data2=123) - dataId2 = result.inserted_primary_key[0] - - # Change column type in model. - self.meta.remove(self.table) - self.table = Table(self.table_name,self.meta, - Column('id',Integer(),primary_key=True), - Column('name',UnicodeText(length=None)), - Column('data2',String(255),nullable=True), - ) - - # XXX test type diff - return - - assertDiff(True, [], [], [self.table_name]) - - # Apply latest model changes and find no more diffs. - self._applyLatestModel() - assertDiff(False, [], [], []) - - if not self.engine.name == 'oracle': - # Make sure data is still present. - result = self.engine.execute(self.table.select(self.table.c.id==dataId2)) - rows = result.fetchall() - self.assertEqual(len(rows), 1) - self.assertEqual(rows[0].name, 'mydata2') - self.assertEqual(rows[0].data2, '123') - - # Delete data, since we're about to make a required column. - # Not even using sqlalchemy.PassiveDefault helps because we're doing explicit column select. - self.engine.execute(self.table.delete(), id=dataId) - - if not self.engine.name == 'firebird': - # Change column nullable in model. - self.meta.remove(self.table) - self.table = Table(self.table_name,self.meta, - Column('id',Integer(),primary_key=True), - Column('name',UnicodeText(length=None)), - Column('data2',String(255),nullable=False), - ) - assertDiff(True, [], [], [self.table_name]) # TODO test nullable diff - - # Apply latest model changes and find no more diffs. - self._applyLatestModel() - assertDiff(False, [], [], []) - - # Remove table from model. - self.meta.remove(self.table) - assertDiff(True, [], [self.table_name], []) diff --git a/migrate/tests/versioning/test_keyedinstance.py b/migrate/tests/versioning/test_keyedinstance.py deleted file mode 100644 index 485cbbb..0000000 --- a/migrate/tests/versioning/test_keyedinstance.py +++ /dev/null @@ -1,45 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -from migrate.tests import fixture -from migrate.versioning.util.keyedinstance import * - -class TestKeydInstance(fixture.Base): - def test_unique(self): - """UniqueInstance should produce unique object instances""" - class Uniq1(KeyedInstance): - @classmethod - def _key(cls,key): - return str(key) - def __init__(self,value): - self.value=value - class Uniq2(KeyedInstance): - @classmethod - def _key(cls,key): - return str(key) - def __init__(self,value): - self.value=value - - a10 = Uniq1('a') - - # Different key: different instance - b10 = Uniq1('b') - self.assertTrue(a10 is not b10) - - # Different class: different instance - a20 = Uniq2('a') - self.assertTrue(a10 is not a20) - - # Same key/class: same instance - a11 = Uniq1('a') - self.assertTrue(a10 is a11) - - # __init__ is called - self.assertEqual(a10.value,'a') - - # clear() causes us to forget all existing instances - Uniq1.clear() - a12 = Uniq1('a') - self.assertTrue(a10 is not a12) - - self.assertRaises(NotImplementedError, KeyedInstance._key) diff --git a/migrate/tests/versioning/test_pathed.py b/migrate/tests/versioning/test_pathed.py deleted file mode 100644 index 53f0b47..0000000 --- a/migrate/tests/versioning/test_pathed.py +++ /dev/null @@ -1,51 +0,0 @@ -from migrate.tests import fixture -from migrate.versioning.pathed import * - -class TestPathed(fixture.Base): - def test_parent_path(self): - """Default parent_path should behave correctly""" - filepath='/fgsfds/moot.py' - dirpath='/fgsfds/moot' - sdirpath='/fgsfds/moot/' - - result='/fgsfds' - self.assertTrue(result==Pathed._parent_path(filepath)) - self.assertTrue(result==Pathed._parent_path(dirpath)) - self.assertTrue(result==Pathed._parent_path(sdirpath)) - - def test_new(self): - """Pathed(path) shouldn't create duplicate objects of the same path""" - path='/fgsfds' - class Test(Pathed): - attr=None - o1=Test(path) - o2=Test(path) - self.assertTrue(isinstance(o1,Test)) - self.assertTrue(o1.path==path) - self.assertTrue(o1 is o2) - o1.attr='herring' - self.assertTrue(o2.attr=='herring') - o2.attr='shrubbery' - self.assertTrue(o1.attr=='shrubbery') - - def test_parent(self): - """Parents should be fetched correctly""" - class Parent(Pathed): - parent=None - children=0 - def _init_child(self,child,path): - """Keep a tally of children. - (A real class might do something more interesting here) - """ - self.__class__.children+=1 - - class Child(Pathed): - parent=Parent - - path='/fgsfds/moot.py' - parent_path='/fgsfds' - object=Child(path) - self.assertTrue(isinstance(object,Child)) - self.assertTrue(isinstance(object.parent,Parent)) - self.assertTrue(object.path==path) - self.assertTrue(object.parent.path==parent_path) diff --git a/migrate/tests/versioning/test_repository.py b/migrate/tests/versioning/test_repository.py deleted file mode 100644 index 6e87c02..0000000 --- a/migrate/tests/versioning/test_repository.py +++ /dev/null @@ -1,216 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import os -import shutil - -from migrate import exceptions -from migrate.versioning.repository import * -from migrate.versioning.script import * - -from migrate.tests import fixture -from datetime import datetime - - -class TestRepository(fixture.Pathed): - def test_create(self): - """Repositories are created successfully""" - path = self.tmp_repos() - name = 'repository_name' - - # Creating a repository that doesn't exist should succeed - repo = Repository.create(path, name) - config_path = repo.config.path - manage_path = os.path.join(repo.path, 'manage.py') - self.assertTrue(repo) - - # Files should actually be created - self.assertTrue(os.path.exists(path)) - self.assertTrue(os.path.exists(config_path)) - self.assertTrue(os.path.exists(manage_path)) - - # Can't create it again: it already exists - self.assertRaises(exceptions.PathFoundError, Repository.create, path, name) - return path - - def test_load(self): - """We should be able to load information about an existing repository""" - # Create a repository to load - path = self.test_create() - repos = Repository(path) - - self.assertTrue(repos) - self.assertTrue(repos.config) - self.assertTrue(repos.config.get('db_settings', 'version_table')) - - # version_table's default isn't none - self.assertNotEqual(repos.config.get('db_settings', 'version_table'), 'None') - - def test_load_notfound(self): - """Nonexistant repositories shouldn't be loaded""" - path = self.tmp_repos() - self.assertTrue(not os.path.exists(path)) - self.assertRaises(exceptions.InvalidRepositoryError, Repository, path) - - def test_load_invalid(self): - """Invalid repos shouldn't be loaded""" - # Here, invalid=empty directory. There may be other conditions too, - # but we shouldn't need to test all of them - path = self.tmp_repos() - os.mkdir(path) - self.assertRaises(exceptions.InvalidRepositoryError, Repository, path) - - -class TestVersionedRepository(fixture.Pathed): - """Tests on an existing repository with a single python script""" - - def setUp(self): - super(TestVersionedRepository, self).setUp() - Repository.clear() - self.path_repos = self.tmp_repos() - Repository.create(self.path_repos, 'repository_name') - - def test_version(self): - """We should correctly detect the version of a repository""" - repos = Repository(self.path_repos) - - # Get latest version, or detect if a specified version exists - self.assertEqual(repos.latest, 0) - # repos.latest isn't an integer, but a VerNum - # (so we can't just assume the following tests are correct) - self.assertTrue(repos.latest >= 0) - self.assertTrue(repos.latest < 1) - - # Create a script and test again - repos.create_script('') - self.assertEqual(repos.latest, 1) - self.assertTrue(repos.latest >= 0) - self.assertTrue(repos.latest >= 1) - self.assertTrue(repos.latest < 2) - - # Create a new script and test again - repos.create_script('') - self.assertEqual(repos.latest, 2) - self.assertTrue(repos.latest >= 0) - self.assertTrue(repos.latest >= 1) - self.assertTrue(repos.latest >= 2) - self.assertTrue(repos.latest < 3) - - - def test_timestmap_numbering_version(self): - repos = Repository(self.path_repos) - repos.config.set('db_settings', 'use_timestamp_numbering', 'True') - - # Get latest version, or detect if a specified version exists - self.assertEqual(repos.latest, 0) - # repos.latest isn't an integer, but a VerNum - # (so we can't just assume the following tests are correct) - self.assertTrue(repos.latest >= 0) - self.assertTrue(repos.latest < 1) - - # Create a script and test again - now = int(datetime.utcnow().strftime('%Y%m%d%H%M%S')) - repos.create_script('') - self.assertEqual(repos.latest, now) - - def test_source(self): - """Get a script object by version number and view its source""" - # Load repository and commit script - repo = Repository(self.path_repos) - repo.create_script('') - repo.create_script_sql('postgres', 'foo bar') - - # Source is valid: script must have an upgrade function - # (not a very thorough test, but should be plenty) - source = repo.version(1).script().source() - self.assertTrue(source.find('def upgrade') >= 0) - - import pprint; pprint.pprint(repo.version(2).sql) - source = repo.version(2).script('postgres', 'upgrade').source() - self.assertEqual(source.strip(), '') - - def test_latestversion(self): - """Repository.version() (no params) returns the latest version""" - repos = Repository(self.path_repos) - repos.create_script('') - self.assertTrue(repos.version(repos.latest) is repos.version()) - self.assertTrue(repos.version() is not None) - - def test_changeset(self): - """Repositories can create changesets properly""" - # Create a nonzero-version repository of empty scripts - repos = Repository(self.path_repos) - for i in range(10): - repos.create_script('') - - def check_changeset(params, length): - """Creates and verifies a changeset""" - changeset = repos.changeset('postgres', *params) - self.assertEqual(len(changeset), length) - self.assertTrue(isinstance(changeset, Changeset)) - uniq = list() - # Changesets are iterable - for version, change in changeset: - self.assertTrue(isinstance(change, BaseScript)) - # Changes aren't identical - self.assertTrue(id(change) not in uniq) - uniq.append(id(change)) - return changeset - - # Upgrade to a specified version... - cs = check_changeset((0, 10), 10) - self.assertEqual(cs.keys().pop(0),0 ) # 0 -> 1: index is starting version - self.assertEqual(cs.keys().pop(), 9) # 9 -> 10: index is starting version - self.assertEqual(cs.start, 0) # starting version - self.assertEqual(cs.end, 10) # ending version - check_changeset((0, 1), 1) - check_changeset((0, 5), 5) - check_changeset((0, 0), 0) - check_changeset((5, 5), 0) - check_changeset((10, 10), 0) - check_changeset((5, 10), 5) - - # Can't request a changeset of higher version than this repository - self.assertRaises(Exception, repos.changeset, 'postgres', 5, 11) - self.assertRaises(Exception, repos.changeset, 'postgres', -1, 5) - - # Upgrade to the latest version... - cs = check_changeset((0,), 10) - self.assertEqual(cs.keys().pop(0), 0) - self.assertEqual(cs.keys().pop(), 9) - self.assertEqual(cs.start, 0) - self.assertEqual(cs.end, 10) - check_changeset((1,), 9) - check_changeset((5,), 5) - check_changeset((9,), 1) - check_changeset((10,), 0) - - # run changes - cs.run('postgres', 'upgrade') - - # Can't request a changeset of higher/lower version than this repository - self.assertRaises(Exception, repos.changeset, 'postgres', 11) - self.assertRaises(Exception, repos.changeset, 'postgres', -1) - - # Downgrade - cs = check_changeset((10, 0),10) - self.assertEqual(cs.keys().pop(0), 10) # 10 -> 9 - self.assertEqual(cs.keys().pop(), 1) # 1 -> 0 - self.assertEqual(cs.start, 10) - self.assertEqual(cs.end, 0) - check_changeset((10, 5), 5) - check_changeset((5, 0), 5) - - def test_many_versions(self): - """Test what happens when lots of versions are created""" - repos = Repository(self.path_repos) - for i in range(1001): - repos.create_script('') - - # since we normally create 3 digit ones, let's see if we blow up - self.assertTrue(os.path.exists('%s/versions/1000.py' % self.path_repos)) - self.assertTrue(os.path.exists('%s/versions/1001.py' % self.path_repos)) - - -# TODO: test manage file -# TODO: test changeset diff --git a/migrate/tests/versioning/test_runchangeset.py b/migrate/tests/versioning/test_runchangeset.py deleted file mode 100644 index 12bc77c..0000000 --- a/migrate/tests/versioning/test_runchangeset.py +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import os,shutil - -from migrate.tests import fixture -from migrate.versioning.schema import * -from migrate.versioning import script - - -class TestRunChangeset(fixture.Pathed,fixture.DB): - level=fixture.DB.CONNECT - def _setup(self, url): - super(TestRunChangeset, self)._setup(url) - Repository.clear() - self.path_repos=self.tmp_repos() - # Create repository, script - Repository.create(self.path_repos,'repository_name') - - @fixture.usedb() - def test_changeset_run(self): - """Running a changeset against a repository gives expected results""" - repos=Repository(self.path_repos) - for i in range(10): - repos.create_script('') - try: - ControlledSchema(self.engine,repos).drop() - except: - pass - db=ControlledSchema.create(self.engine,repos) - - # Scripts are empty; we'll check version # correctness. - # (Correct application of their content is checked elsewhere) - self.assertEqual(db.version,0) - db.upgrade(1) - self.assertEqual(db.version,1) - db.upgrade(5) - self.assertEqual(db.version,5) - db.upgrade(5) - self.assertEqual(db.version,5) - db.upgrade(None) # Latest is implied - self.assertEqual(db.version,10) - self.assertRaises(Exception,db.upgrade,11) - self.assertEqual(db.version,10) - db.upgrade(9) - self.assertEqual(db.version,9) - db.upgrade(0) - self.assertEqual(db.version,0) - self.assertRaises(Exception,db.upgrade,-1) - self.assertEqual(db.version,0) - #changeset = repos.changeset(self.url,0) - db.drop() diff --git a/migrate/tests/versioning/test_schema.py b/migrate/tests/versioning/test_schema.py deleted file mode 100644 index 5396d9d..0000000 --- a/migrate/tests/versioning/test_schema.py +++ /dev/null @@ -1,205 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import os -import shutil - -import six - -from migrate import exceptions -from migrate.versioning.schema import * -from migrate.versioning import script, schemadiff - -from sqlalchemy import * - -from migrate.tests import fixture - - -class TestControlledSchema(fixture.Pathed, fixture.DB): - # Transactions break postgres in this test; we'll clean up after ourselves - level = fixture.DB.CONNECT - - def setUp(self): - super(TestControlledSchema, self).setUp() - self.path_repos = self.temp_usable_dir + '/repo/' - self.repos = Repository.create(self.path_repos, 'repo_name') - - def _setup(self, url): - self.setUp() - super(TestControlledSchema, self)._setup(url) - self.cleanup() - - def _teardown(self): - super(TestControlledSchema, self)._teardown() - self.cleanup() - self.tearDown() - - def cleanup(self): - # drop existing version table if necessary - try: - ControlledSchema(self.engine, self.repos).drop() - except: - # No table to drop; that's fine, be silent - pass - - def tearDown(self): - self.cleanup() - super(TestControlledSchema, self).tearDown() - - @fixture.usedb() - def test_version_control(self): - """Establish version control on a particular database""" - # Establish version control on this database - dbcontrol = ControlledSchema.create(self.engine, self.repos) - - # Trying to create another DB this way fails: table exists - self.assertRaises(exceptions.DatabaseAlreadyControlledError, - ControlledSchema.create, self.engine, self.repos) - - # We can load a controlled DB this way, too - dbcontrol0 = ControlledSchema(self.engine, self.repos) - self.assertEqual(dbcontrol, dbcontrol0) - - # We can also use a repository path, instead of a repository - dbcontrol0 = ControlledSchema(self.engine, self.repos.path) - self.assertEqual(dbcontrol, dbcontrol0) - - # We don't have to use the same connection - engine = create_engine(self.url) - dbcontrol0 = ControlledSchema(engine, self.repos.path) - self.assertEqual(dbcontrol, dbcontrol0) - - # Clean up: - dbcontrol.drop() - - # Attempting to drop vc from a db without it should fail - self.assertRaises(exceptions.DatabaseNotControlledError, dbcontrol.drop) - - # No table defined should raise error - self.assertRaises(exceptions.DatabaseNotControlledError, - ControlledSchema, self.engine, self.repos) - - @fixture.usedb() - def test_version_control_specified(self): - """Establish version control with a specified version""" - # Establish version control on this database - version = 0 - dbcontrol = ControlledSchema.create(self.engine, self.repos, version) - self.assertEqual(dbcontrol.version, version) - - # Correct when we load it, too - dbcontrol = ControlledSchema(self.engine, self.repos) - self.assertEqual(dbcontrol.version, version) - - dbcontrol.drop() - - # Now try it with a nonzero value - version = 10 - for i in range(version): - self.repos.create_script('') - self.assertEqual(self.repos.latest, version) - - # Test with some mid-range value - dbcontrol = ControlledSchema.create(self.engine,self.repos, 5) - self.assertEqual(dbcontrol.version, 5) - dbcontrol.drop() - - # Test with max value - dbcontrol = ControlledSchema.create(self.engine, self.repos, version) - self.assertEqual(dbcontrol.version, version) - dbcontrol.drop() - - @fixture.usedb() - def test_version_control_invalid(self): - """Try to establish version control with an invalid version""" - versions = ('Thirteen', '-1', -1, '' , 13) - # A fresh repository doesn't go up to version 13 yet - for version in versions: - #self.assertRaises(ControlledSchema.InvalidVersionError, - # Can't have custom errors with assertRaises... - try: - ControlledSchema.create(self.engine, self.repos, version) - self.assertTrue(False, repr(version)) - except exceptions.InvalidVersionError: - pass - - @fixture.usedb() - def test_changeset(self): - """Create changeset from controlled schema""" - dbschema = ControlledSchema.create(self.engine, self.repos) - - # empty schema doesn't have changesets - cs = dbschema.changeset() - self.assertEqual(cs, {}) - - for i in range(5): - self.repos.create_script('') - self.assertEqual(self.repos.latest, 5) - - cs = dbschema.changeset(5) - self.assertEqual(len(cs), 5) - - # cleanup - dbschema.drop() - - @fixture.usedb() - def test_upgrade_runchange(self): - dbschema = ControlledSchema.create(self.engine, self.repos) - - for i in range(10): - self.repos.create_script('') - - self.assertEqual(self.repos.latest, 10) - - dbschema.upgrade(10) - - self.assertRaises(ValueError, dbschema.upgrade, 'a') - self.assertRaises(exceptions.InvalidVersionError, dbschema.runchange, 20, '', 1) - - # TODO: test for table version in db - - # cleanup - dbschema.drop() - - @fixture.usedb() - def test_create_model(self): - """Test workflow to generate create_model""" - model = ControlledSchema.create_model(self.engine, self.repos, declarative=False) - self.assertTrue(isinstance(model, six.string_types)) - - model = ControlledSchema.create_model(self.engine, self.repos.path, declarative=True) - self.assertTrue(isinstance(model, six.string_types)) - - @fixture.usedb() - def test_compare_model_to_db(self): - meta = self.construct_model() - - diff = ControlledSchema.compare_model_to_db(self.engine, meta, self.repos) - self.assertTrue(isinstance(diff, schemadiff.SchemaDiff)) - - diff = ControlledSchema.compare_model_to_db(self.engine, meta, self.repos.path) - self.assertTrue(isinstance(diff, schemadiff.SchemaDiff)) - meta.drop_all(self.engine) - - @fixture.usedb() - def test_update_db_from_model(self): - dbschema = ControlledSchema.create(self.engine, self.repos) - - meta = self.construct_model() - - dbschema.update_db_from_model(meta) - - # TODO: test for table version in db - - # cleanup - dbschema.drop() - meta.drop_all(self.engine) - - def construct_model(self): - meta = MetaData() - - user = Table('temp_model_schema', meta, Column('id', Integer), Column('user', String(245))) - - return meta - - # TODO: test how are tables populated in db diff --git a/migrate/tests/versioning/test_schemadiff.py b/migrate/tests/versioning/test_schemadiff.py deleted file mode 100644 index f45a012..0000000 --- a/migrate/tests/versioning/test_schemadiff.py +++ /dev/null @@ -1,227 +0,0 @@ -# -*- coding: utf-8 -*- - -import os - -from sqlalchemy import * - -from migrate.versioning import schemadiff - -from migrate.tests import fixture - -class SchemaDiffBase(fixture.DB): - - level = fixture.DB.CONNECT - def _make_table(self,*cols,**kw): - self.table = Table('xtable', self.meta, - Column('id',Integer(), primary_key=True), - *cols - ) - if kw.get('create',True): - self.table.create() - - def _assert_diff(self,col_A,col_B): - self._make_table(col_A) - self.meta.clear() - self._make_table(col_B,create=False) - diff = self._run_diff() - # print diff - self.assertTrue(diff) - self.assertEqual(1,len(diff.tables_different)) - td = list(diff.tables_different.values())[0] - self.assertEqual(1,len(td.columns_different)) - cd = list(td.columns_different.values())[0] - label_width = max(len(self.name1), len(self.name2)) - self.assertEqual(('Schema diffs:\n' - ' table with differences: xtable\n' - ' column with differences: data\n' - ' %*s: %r\n' - ' %*s: %r')%( - label_width, - self.name1, - cd.col_A, - label_width, - self.name2, - cd.col_B - ),str(diff)) - -class Test_getDiffOfModelAgainstDatabase(SchemaDiffBase): - name1 = 'model' - name2 = 'database' - - def _run_diff(self,**kw): - return schemadiff.getDiffOfModelAgainstDatabase( - self.meta, self.engine, **kw - ) - - @fixture.usedb() - def test_table_missing_in_db(self): - self._make_table(create=False) - diff = self._run_diff() - self.assertTrue(diff) - self.assertEqual('Schema diffs:\n tables missing from %s: xtable' % self.name2, - str(diff)) - - @fixture.usedb() - def test_table_missing_in_model(self): - self._make_table() - self.meta.clear() - diff = self._run_diff() - self.assertTrue(diff) - self.assertEqual('Schema diffs:\n tables missing from %s: xtable' % self.name1, - str(diff)) - - @fixture.usedb() - def test_column_missing_in_db(self): - # db - Table('xtable', self.meta, - Column('id',Integer(), primary_key=True), - ).create() - self.meta.clear() - # model - self._make_table( - Column('xcol',Integer()), - create=False - ) - # run diff - diff = self._run_diff() - self.assertTrue(diff) - self.assertEqual('Schema diffs:\n' - ' table with differences: xtable\n' - ' %s missing these columns: xcol' % self.name2, - str(diff)) - - @fixture.usedb() - def test_column_missing_in_model(self): - # db - self._make_table( - Column('xcol',Integer()), - ) - self.meta.clear() - # model - self._make_table( - create=False - ) - # run diff - diff = self._run_diff() - self.assertTrue(diff) - self.assertEqual('Schema diffs:\n' - ' table with differences: xtable\n' - ' %s missing these columns: xcol' % self.name1, - str(diff)) - - @fixture.usedb() - def test_exclude_tables(self): - # db - Table('ytable', self.meta, - Column('id',Integer(), primary_key=True), - ).create() - Table('ztable', self.meta, - Column('id',Integer(), primary_key=True), - ).create() - self.meta.clear() - # model - self._make_table( - create=False - ) - Table('ztable', self.meta, - Column('id',Integer(), primary_key=True), - ) - # run diff - diff = self._run_diff(excludeTables=('xtable','ytable')) - # ytable only in database - # xtable only in model - # ztable identical on both - # ...so we expect no diff! - self.assertFalse(diff) - self.assertEqual('No schema diffs',str(diff)) - - @fixture.usedb() - def test_identical_just_pk(self): - self._make_table() - diff = self._run_diff() - self.assertFalse(diff) - self.assertEqual('No schema diffs',str(diff)) - - - @fixture.usedb() - def test_different_type(self): - self._assert_diff( - Column('data', String(10)), - Column('data', Integer()), - ) - - @fixture.usedb() - def test_int_vs_float(self): - self._assert_diff( - Column('data', Integer()), - Column('data', Float()), - ) - - # NOTE(mriedem): The ibm_db_sa driver handles the Float() as a DOUBLE() - # which extends Numeric() but isn't defined in sqlalchemy.types, so we - # can't check for it as a special case like is done in schemadiff.ColDiff. - @fixture.usedb(not_supported='ibm_db_sa') - def test_float_vs_numeric(self): - self._assert_diff( - Column('data', Float()), - Column('data', Numeric()), - ) - - @fixture.usedb() - def test_numeric_precision(self): - self._assert_diff( - Column('data', Numeric(precision=5)), - Column('data', Numeric(precision=6)), - ) - - @fixture.usedb() - def test_numeric_scale(self): - self._assert_diff( - Column('data', Numeric(precision=6,scale=0)), - Column('data', Numeric(precision=6,scale=1)), - ) - - @fixture.usedb() - def test_string_length(self): - self._assert_diff( - Column('data', String(10)), - Column('data', String(20)), - ) - - @fixture.usedb() - def test_integer_identical(self): - self._make_table( - Column('data', Integer()), - ) - diff = self._run_diff() - self.assertEqual('No schema diffs',str(diff)) - self.assertFalse(diff) - - @fixture.usedb() - def test_string_identical(self): - self._make_table( - Column('data', String(10)), - ) - diff = self._run_diff() - self.assertEqual('No schema diffs',str(diff)) - self.assertFalse(diff) - - @fixture.usedb() - def test_text_identical(self): - self._make_table( - Column('data', Text), - ) - diff = self._run_diff() - self.assertEqual('No schema diffs',str(diff)) - self.assertFalse(diff) - -class Test_getDiffOfModelAgainstModel(Test_getDiffOfModelAgainstDatabase): - name1 = 'metadataA' - name2 = 'metadataB' - - def _run_diff(self,**kw): - db_meta= MetaData() - db_meta.reflect(self.engine) - return schemadiff.getDiffOfModelAgainstModel( - self.meta, db_meta, **kw - ) diff --git a/migrate/tests/versioning/test_script.py b/migrate/tests/versioning/test_script.py deleted file mode 100644 index 20e6af0..0000000 --- a/migrate/tests/versioning/test_script.py +++ /dev/null @@ -1,305 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import imp -import os -import sys -import shutil - -import six -from migrate import exceptions -from migrate.versioning import version, repository -from migrate.versioning.script import * -from migrate.versioning.util import * - -from migrate.tests import fixture -from migrate.tests.fixture.models import tmp_sql_table - - -class TestBaseScript(fixture.Pathed): - - def test_all(self): - """Testing all basic BaseScript operations""" - # verify / source / run - src = self.tmp() - open(src, 'w').close() - bscript = BaseScript(src) - BaseScript.verify(src) - self.assertEqual(bscript.source(), '') - self.assertRaises(NotImplementedError, bscript.run, 'foobar') - - -class TestPyScript(fixture.Pathed, fixture.DB): - cls = PythonScript - def test_create(self): - """We can create a migration script""" - path = self.tmp_py() - # Creating a file that doesn't exist should succeed - self.cls.create(path) - self.assertTrue(os.path.exists(path)) - # Created file should be a valid script (If not, raises an error) - self.cls.verify(path) - # Can't create it again: it already exists - self.assertRaises(exceptions.PathFoundError,self.cls.create,path) - - @fixture.usedb(supported='sqlite') - def test_run(self): - script_path = self.tmp_py() - pyscript = PythonScript.create(script_path) - pyscript.run(self.engine, 1) - pyscript.run(self.engine, -1) - - self.assertRaises(exceptions.ScriptError, pyscript.run, self.engine, 0) - self.assertRaises(exceptions.ScriptError, pyscript._func, 'foobar') - - # clean pyc file - if six.PY3: - os.remove(imp.cache_from_source(script_path)) - else: - os.remove(script_path + 'c') - - # test deprecated upgrade/downgrade with no arguments - contents = open(script_path, 'r').read() - f = open(script_path, 'w') - f.write(contents.replace("upgrade(migrate_engine)", "upgrade()")) - f.close() - - pyscript = PythonScript(script_path) - pyscript._module = None - try: - pyscript.run(self.engine, 1) - pyscript.run(self.engine, -1) - except exceptions.ScriptError: - pass - else: - self.fail() - - def test_verify_notfound(self): - """Correctly verify a python migration script: nonexistant file""" - path = self.tmp_py() - self.assertFalse(os.path.exists(path)) - # Fails on empty path - self.assertRaises(exceptions.InvalidScriptError,self.cls.verify,path) - self.assertRaises(exceptions.InvalidScriptError,self.cls,path) - - def test_verify_invalidpy(self): - """Correctly verify a python migration script: invalid python file""" - path=self.tmp_py() - # Create empty file - f = open(path,'w') - f.write("def fail") - f.close() - self.assertRaises(Exception,self.cls.verify_module,path) - # script isn't verified on creation, but on module reference - py = self.cls(path) - self.assertRaises(Exception,(lambda x: x.module),py) - - def test_verify_nofuncs(self): - """Correctly verify a python migration script: valid python file; no upgrade func""" - path = self.tmp_py() - # Create empty file - f = open(path, 'w') - f.write("def zergling():\n\tprint('rush')") - f.close() - self.assertRaises(exceptions.InvalidScriptError, self.cls.verify_module, path) - # script isn't verified on creation, but on module reference - py = self.cls(path) - self.assertRaises(exceptions.InvalidScriptError,(lambda x: x.module),py) - - @fixture.usedb(supported='sqlite') - def test_preview_sql(self): - """Preview SQL abstract from ORM layer (sqlite)""" - path = self.tmp_py() - - f = open(path, 'w') - content = ''' -from migrate import * -from sqlalchemy import * - -metadata = MetaData() - -UserGroup = Table('Link', metadata, - Column('link1ID', Integer), - Column('link2ID', Integer), - UniqueConstraint('link1ID', 'link2ID')) - -def upgrade(migrate_engine): - metadata.create_all(migrate_engine) - ''' - f.write(content) - f.close() - - pyscript = self.cls(path) - SQL = pyscript.preview_sql(self.url, 1) - self.assertEqualIgnoreWhitespace(""" - CREATE TABLE "Link" - ("link1ID" INTEGER, - "link2ID" INTEGER, - UNIQUE ("link1ID", "link2ID")) - """, SQL) - # TODO: test: No SQL should be executed! - - def test_verify_success(self): - """Correctly verify a python migration script: success""" - path = self.tmp_py() - # Succeeds after creating - self.cls.create(path) - self.cls.verify(path) - - # test for PythonScript.make_update_script_for_model - - @fixture.usedb() - def test_make_update_script_for_model(self): - """Construct script source from differences of two models""" - - self.setup_model_params() - self.write_file(self.first_model_path, self.base_source) - self.write_file(self.second_model_path, self.base_source + self.model_source) - - source_script = self.pyscript.make_update_script_for_model( - engine=self.engine, - oldmodel=load_model('testmodel_first:meta'), - model=load_model('testmodel_second:meta'), - repository=self.repo_path, - ) - - self.assertTrue("['User'].create()" in source_script) - self.assertTrue("['User'].drop()" in source_script) - - @fixture.usedb() - def test_make_update_script_for_equal_models(self): - """Try to make update script from two identical models""" - - self.setup_model_params() - self.write_file(self.first_model_path, self.base_source + self.model_source) - self.write_file(self.second_model_path, self.base_source + self.model_source) - - source_script = self.pyscript.make_update_script_for_model( - engine=self.engine, - oldmodel=load_model('testmodel_first:meta'), - model=load_model('testmodel_second:meta'), - repository=self.repo_path, - ) - - self.assertFalse('User.create()' in source_script) - self.assertFalse('User.drop()' in source_script) - - @fixture.usedb() - def test_make_update_script_direction(self): - """Check update scripts go in the right direction""" - - self.setup_model_params() - self.write_file(self.first_model_path, self.base_source) - self.write_file(self.second_model_path, self.base_source + self.model_source) - - source_script = self.pyscript.make_update_script_for_model( - engine=self.engine, - oldmodel=load_model('testmodel_first:meta'), - model=load_model('testmodel_second:meta'), - repository=self.repo_path, - ) - - self.assertTrue(0 - < source_script.find('upgrade') - < source_script.find("['User'].create()") - < source_script.find('downgrade') - < source_script.find("['User'].drop()")) - - def setup_model_params(self): - self.script_path = self.tmp_py() - self.repo_path = self.tmp() - self.first_model_path = os.path.join(self.temp_usable_dir, 'testmodel_first.py') - self.second_model_path = os.path.join(self.temp_usable_dir, 'testmodel_second.py') - - self.base_source = """from sqlalchemy import *\nmeta = MetaData()\n""" - self.model_source = """ -User = Table('User', meta, - Column('id', Integer, primary_key=True), - Column('login', Unicode(40)), - Column('passwd', String(40)), -)""" - - self.repo = repository.Repository.create(self.repo_path, 'repo') - self.pyscript = PythonScript.create(self.script_path) - sys.modules.pop('testmodel_first', None) - sys.modules.pop('testmodel_second', None) - - def write_file(self, path, contents): - f = open(path, 'w') - f.write(contents) - f.close() - - -class TestSqlScript(fixture.Pathed, fixture.DB): - - @fixture.usedb() - def test_error(self): - """Test if exception is raised on wrong script source""" - src = self.tmp() - - f = open(src, 'w') - f.write("""foobar""") - f.close() - - sqls = SqlScript(src) - self.assertRaises(Exception, sqls.run, self.engine) - - @fixture.usedb() - def test_success(self): - """Test sucessful SQL execution""" - # cleanup and prepare python script - tmp_sql_table.metadata.drop_all(self.engine, checkfirst=True) - script_path = self.tmp_py() - pyscript = PythonScript.create(script_path) - - # populate python script - contents = open(script_path, 'r').read() - contents = contents.replace("pass", "tmp_sql_table.create(migrate_engine)") - contents = 'from migrate.tests.fixture.models import tmp_sql_table\n' + contents - f = open(script_path, 'w') - f.write(contents) - f.close() - - # write SQL script from python script preview - pyscript = PythonScript(script_path) - src = self.tmp() - f = open(src, 'w') - f.write(pyscript.preview_sql(self.url, 1)) - f.close() - - # run the change - sqls = SqlScript(src) - sqls.run(self.engine) - tmp_sql_table.metadata.drop_all(self.engine, checkfirst=True) - - @fixture.usedb() - def test_transaction_management_statements(self): - """ - Test that we can successfully execute SQL scripts with transaction - management statements. - """ - for script_pattern in ( - "BEGIN TRANSACTION; %s; COMMIT;", - "BEGIN; %s; END TRANSACTION;", - "/* comment */BEGIN TRANSACTION; %s; /* comment */COMMIT;", - "/* comment */ BEGIN TRANSACTION; %s; /* comment */ COMMIT;", - """ --- comment -BEGIN TRANSACTION; - -%s; - --- comment -COMMIT;""", - ): - - test_statement = ("CREATE TABLE TEST1 (field1 int); " - "DROP TABLE TEST1") - script = script_pattern % test_statement - src = self.tmp() - - with open(src, 'wt') as f: - f.write(script) - - sqls = SqlScript(src) - sqls.run(self.engine) diff --git a/migrate/tests/versioning/test_shell.py b/migrate/tests/versioning/test_shell.py deleted file mode 100644 index 001efcf..0000000 --- a/migrate/tests/versioning/test_shell.py +++ /dev/null @@ -1,574 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import os -import sys -import tempfile - -import six -from six.moves import cStringIO -from sqlalchemy import MetaData, Table - -from migrate.exceptions import * -from migrate.versioning.repository import Repository -from migrate.versioning import genmodel, shell, api -from migrate.tests.fixture import Shell, DB, usedb -from migrate.tests.fixture import models - - -class TestShellCommands(Shell): - """Tests migrate.py commands""" - - def test_help(self): - """Displays default help dialog""" - self.assertEqual(self.env.run('migrate -h').returncode, 0) - self.assertEqual(self.env.run('migrate --help').returncode, 0) - self.assertEqual(self.env.run('migrate help').returncode, 0) - - def test_help_commands(self): - """Display help on a specific command""" - # we can only test that we get some output - for cmd in api.__all__: - result = self.env.run('migrate help %s' % cmd) - self.assertTrue(isinstance(result.stdout, six.string_types)) - self.assertTrue(result.stdout) - self.assertFalse(result.stderr) - - def test_shutdown_logging(self): - """Try to shutdown logging output""" - repos = self.tmp_repos() - result = self.env.run('migrate create %s repository_name' % repos) - result = self.env.run('migrate version %s --disable_logging' % repos) - self.assertEqual(result.stdout, '') - result = self.env.run('migrate version %s -q' % repos) - self.assertEqual(result.stdout, '') - - # TODO: assert logging messages to 0 - shell.main(['version', repos], logging=False) - - def test_main_with_runpy(self): - if sys.version_info[:2] == (2, 4): - self.skipTest("runpy is not part of python2.4") - from runpy import run_module - try: - original = sys.argv - sys.argv=['X','--help'] - - run_module('migrate.versioning.shell', run_name='__main__') - - finally: - sys.argv = original - - def _check_error(self,args,code,expected,**kw): - original = sys.stderr - try: - actual = cStringIO() - sys.stderr = actual - try: - shell.main(args,**kw) - except SystemExit as e: - self.assertEqual(code,e.args[0]) - else: - self.fail('No exception raised') - finally: - sys.stderr = original - actual = actual.getvalue() - self.assertTrue(expected in actual,'%r not in:\n"""\n%s\n"""'%(expected,actual)) - - def test_main(self): - """Test main() function""" - repos = self.tmp_repos() - shell.main(['help']) - shell.main(['help', 'create']) - shell.main(['create', 'repo_name', '--preview_sql'], repository=repos) - shell.main(['version', '--', '--repository=%s' % repos]) - shell.main(['version', '-d', '--repository=%s' % repos, '--version=2']) - - self._check_error(['foobar'],2,'error: Invalid command foobar') - self._check_error(['create', 'f', 'o', 'o'],2,'error: Too many arguments for command create: o') - self._check_error(['create'],2,'error: Not enough arguments for command create: name, repository not specified') - self._check_error(['create', 'repo_name'],2,'already exists', repository=repos) - - def test_create(self): - """Repositories are created successfully""" - repos = self.tmp_repos() - - # Creating a file that doesn't exist should succeed - result = self.env.run('migrate create %s repository_name' % repos) - - # Files should actually be created - self.assertTrue(os.path.exists(repos)) - - # The default table should not be None - repos_ = Repository(repos) - self.assertNotEqual(repos_.config.get('db_settings', 'version_table'), 'None') - - # Can't create it again: it already exists - result = self.env.run('migrate create %s repository_name' % repos, - expect_error=True) - self.assertEqual(result.returncode, 2) - - def test_script(self): - """We can create a migration script via the command line""" - repos = self.tmp_repos() - result = self.env.run('migrate create %s repository_name' % repos) - - result = self.env.run('migrate script --repository=%s Desc' % repos) - self.assertTrue(os.path.exists('%s/versions/001_Desc.py' % repos)) - - result = self.env.run('migrate script More %s' % repos) - self.assertTrue(os.path.exists('%s/versions/002_More.py' % repos)) - - result = self.env.run('migrate script "Some Random name" %s' % repos) - self.assertTrue(os.path.exists('%s/versions/003_Some_Random_name.py' % repos)) - - def test_script_sql(self): - """We can create a migration sql script via the command line""" - repos = self.tmp_repos() - result = self.env.run('migrate create %s repository_name' % repos) - - result = self.env.run('migrate script_sql mydb foo %s' % repos) - self.assertTrue(os.path.exists('%s/versions/001_foo_mydb_upgrade.sql' % repos)) - self.assertTrue(os.path.exists('%s/versions/001_foo_mydb_downgrade.sql' % repos)) - - # Test creating a second - result = self.env.run('migrate script_sql postgres foo --repository=%s' % repos) - self.assertTrue(os.path.exists('%s/versions/002_foo_postgres_upgrade.sql' % repos)) - self.assertTrue(os.path.exists('%s/versions/002_foo_postgres_downgrade.sql' % repos)) - - # TODO: test --previews - - def test_manage(self): - """Create a project management script""" - script = self.tmp_py() - self.assertTrue(not os.path.exists(script)) - - # No attempt is made to verify correctness of the repository path here - result = self.env.run('migrate manage %s --repository=/bla/' % script) - self.assertTrue(os.path.exists(script)) - - -class TestShellRepository(Shell): - """Shell commands on an existing repository/python script""" - - def setUp(self): - """Create repository, python change script""" - super(TestShellRepository, self).setUp() - self.path_repos = self.tmp_repos() - result = self.env.run('migrate create %s repository_name' % self.path_repos) - - def test_version(self): - """Correctly detect repository version""" - # Version: 0 (no scripts yet); successful execution - result = self.env.run('migrate version --repository=%s' % self.path_repos) - self.assertEqual(result.stdout.strip(), "0") - - # Also works as a positional param - result = self.env.run('migrate version %s' % self.path_repos) - self.assertEqual(result.stdout.strip(), "0") - - # Create a script and version should increment - result = self.env.run('migrate script Desc %s' % self.path_repos) - result = self.env.run('migrate version %s' % self.path_repos) - self.assertEqual(result.stdout.strip(), "1") - - def test_source(self): - """Correctly fetch a script's source""" - result = self.env.run('migrate script Desc --repository=%s' % self.path_repos) - - filename = '%s/versions/001_Desc.py' % self.path_repos - source = open(filename).read() - self.assertTrue(source.find('def upgrade') >= 0) - - # Version is now 1 - result = self.env.run('migrate version %s' % self.path_repos) - self.assertEqual(result.stdout.strip(), "1") - - # Output/verify the source of version 1 - result = self.env.run('migrate source 1 --repository=%s' % self.path_repos) - self.assertEqual(result.stdout.strip(), source.strip()) - - # We can also send the source to a file... test that too - result = self.env.run('migrate source 1 %s --repository=%s' % - (filename, self.path_repos)) - self.assertTrue(os.path.exists(filename)) - fd = open(filename) - result = fd.read() - self.assertTrue(result.strip() == source.strip()) - - -class TestShellDatabase(Shell, DB): - """Commands associated with a particular database""" - # We'll need to clean up after ourself, since the shell creates its own txn; - # we need to connect to the DB to see if things worked - - level = DB.CONNECT - - @usedb() - def test_version_control(self): - """Ensure we can set version control on a database""" - path_repos = repos = self.tmp_repos() - url = self.url - result = self.env.run('migrate create %s repository_name' % repos) - - result = self.env.run('migrate drop_version_control %(url)s %(repos)s'\ - % locals(), expect_error=True) - self.assertEqual(result.returncode, 1) - result = self.env.run('migrate version_control %(url)s %(repos)s' % locals()) - - # Clean up - result = self.env.run('migrate drop_version_control %(url)s %(repos)s' % locals()) - # Attempting to drop vc from a database without it should fail - result = self.env.run('migrate drop_version_control %(url)s %(repos)s'\ - % locals(), expect_error=True) - self.assertEqual(result.returncode, 1) - - @usedb() - def test_wrapped_kwargs(self): - """Commands with default arguments set by manage.py""" - path_repos = repos = self.tmp_repos() - url = self.url - result = self.env.run('migrate create --name=repository_name %s' % repos) - result = self.env.run('migrate drop_version_control %(url)s %(repos)s' % locals(), expect_error=True) - self.assertEqual(result.returncode, 1) - result = self.env.run('migrate version_control %(url)s %(repos)s' % locals()) - - result = self.env.run('migrate drop_version_control %(url)s %(repos)s' % locals()) - - @usedb() - def test_version_control_specified(self): - """Ensure we can set version control to a particular version""" - path_repos = self.tmp_repos() - url = self.url - result = self.env.run('migrate create --name=repository_name %s' % path_repos) - result = self.env.run('migrate drop_version_control %(url)s %(path_repos)s' % locals(), expect_error=True) - self.assertEqual(result.returncode, 1) - - # Fill the repository - path_script = self.tmp_py() - version = 2 - for i in range(version): - result = self.env.run('migrate script Desc --repository=%s' % path_repos) - - # Repository version is correct - result = self.env.run('migrate version %s' % path_repos) - self.assertEqual(result.stdout.strip(), str(version)) - - # Apply versioning to DB - result = self.env.run('migrate version_control %(url)s %(path_repos)s %(version)s' % locals()) - - # Test db version number (should start at 2) - result = self.env.run('migrate db_version %(url)s %(path_repos)s' % locals()) - self.assertEqual(result.stdout.strip(), str(version)) - - # Clean up - result = self.env.run('migrate drop_version_control %(url)s %(path_repos)s' % locals()) - - @usedb() - def test_upgrade(self): - """Can upgrade a versioned database""" - # Create a repository - repos_name = 'repos_name' - repos_path = self.tmp() - result = self.env.run('migrate create %(repos_path)s %(repos_name)s' % locals()) - self.assertEqual(self.run_version(repos_path), 0) - - # Version the DB - result = self.env.run('migrate drop_version_control %s %s' % (self.url, repos_path), expect_error=True) - result = self.env.run('migrate version_control %s %s' % (self.url, repos_path)) - - # Upgrades with latest version == 0 - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - result = self.env.run('migrate upgrade %s %s' % (self.url, repos_path)) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - result = self.env.run('migrate upgrade %s %s' % (self.url, repos_path)) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - result = self.env.run('migrate upgrade %s %s 1' % (self.url, repos_path), expect_error=True) - self.assertEqual(result.returncode, 1) - result = self.env.run('migrate upgrade %s %s -1' % (self.url, repos_path), expect_error=True) - self.assertEqual(result.returncode, 2) - - # Add a script to the repository; upgrade the db - result = self.env.run('migrate script Desc --repository=%s' % (repos_path)) - self.assertEqual(self.run_version(repos_path), 1) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - - # Test preview - result = self.env.run('migrate upgrade %s %s 0 --preview_sql' % (self.url, repos_path)) - result = self.env.run('migrate upgrade %s %s 0 --preview_py' % (self.url, repos_path)) - - result = self.env.run('migrate upgrade %s %s' % (self.url, repos_path)) - self.assertEqual(self.run_db_version(self.url, repos_path), 1) - - # Downgrade must have a valid version specified - result = self.env.run('migrate downgrade %s %s' % (self.url, repos_path), expect_error=True) - self.assertEqual(result.returncode, 2) - result = self.env.run('migrate downgrade %s %s -1' % (self.url, repos_path), expect_error=True) - self.assertEqual(result.returncode, 2) - result = self.env.run('migrate downgrade %s %s 2' % (self.url, repos_path), expect_error=True) - self.assertEqual(result.returncode, 2) - self.assertEqual(self.run_db_version(self.url, repos_path), 1) - - result = self.env.run('migrate downgrade %s %s 0' % (self.url, repos_path)) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - - result = self.env.run('migrate downgrade %s %s 1' % (self.url, repos_path), expect_error=True) - self.assertEqual(result.returncode, 2) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - - result = self.env.run('migrate drop_version_control %s %s' % (self.url, repos_path)) - - def _run_test_sqlfile(self, upgrade_script, downgrade_script): - # TODO: add test script that checks if db really changed - repos_path = self.tmp() - repos_name = 'repos' - - result = self.env.run('migrate create %s %s' % (repos_path, repos_name)) - result = self.env.run('migrate drop_version_control %s %s' % (self.url, repos_path), expect_error=True) - result = self.env.run('migrate version_control %s %s' % (self.url, repos_path)) - self.assertEqual(self.run_version(repos_path), 0) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - - beforeCount = len(os.listdir(os.path.join(repos_path, 'versions'))) # hmm, this number changes sometimes based on running from svn - result = self.env.run('migrate script_sql %s --repository=%s' % ('postgres', repos_path)) - self.assertEqual(self.run_version(repos_path), 1) - self.assertEqual(len(os.listdir(os.path.join(repos_path, 'versions'))), beforeCount + 2) - - open('%s/versions/001_postgres_upgrade.sql' % repos_path, 'a').write(upgrade_script) - open('%s/versions/001_postgres_downgrade.sql' % repos_path, 'a').write(downgrade_script) - - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - self.assertRaises(Exception, self.engine.text('select * from t_table').execute) - - result = self.env.run('migrate upgrade %s %s' % (self.url, repos_path)) - self.assertEqual(self.run_db_version(self.url, repos_path), 1) - self.engine.text('select * from t_table').execute() - - result = self.env.run('migrate downgrade %s %s 0' % (self.url, repos_path)) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - self.assertRaises(Exception, self.engine.text('select * from t_table').execute) - - # The tests below are written with some postgres syntax, but the stuff - # being tested (.sql files) ought to work with any db. - @usedb(supported='postgres') - def test_sqlfile(self): - upgrade_script = """ - create table t_table ( - id serial, - primary key(id) - ); - """ - downgrade_script = """ - drop table t_table; - """ - self.meta.drop_all() - self._run_test_sqlfile(upgrade_script, downgrade_script) - - @usedb(supported='postgres') - def test_sqlfile_comment(self): - upgrade_script = """ - -- Comments in SQL break postgres autocommit - create table t_table ( - id serial, - primary key(id) - ); - """ - downgrade_script = """ - -- Comments in SQL break postgres autocommit - drop table t_table; - """ - self._run_test_sqlfile(upgrade_script, downgrade_script) - - @usedb() - def test_command_test(self): - repos_name = 'repos_name' - repos_path = self.tmp() - - result = self.env.run('migrate create repository_name --repository=%s' % repos_path) - result = self.env.run('migrate drop_version_control %s %s' % (self.url, repos_path), expect_error=True) - result = self.env.run('migrate version_control %s %s' % (self.url, repos_path)) - self.assertEqual(self.run_version(repos_path), 0) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - - # Empty script should succeed - result = self.env.run('migrate script Desc %s' % repos_path) - result = self.env.run('migrate test %s %s' % (self.url, repos_path)) - self.assertEqual(self.run_version(repos_path), 1) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - - # Error script should fail - script_path = self.tmp_py() - script_text=''' - from sqlalchemy import * - from migrate import * - - def upgrade(): - print 'fgsfds' - raise Exception() - - def downgrade(): - print 'sdfsgf' - raise Exception() - '''.replace("\n ", "\n") - file = open(script_path, 'w') - file.write(script_text) - file.close() - - result = self.env.run('migrate test %s %s bla' % (self.url, repos_path), expect_error=True) - self.assertEqual(result.returncode, 2) - self.assertEqual(self.run_version(repos_path), 1) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - - # Nonempty script using migrate_engine should succeed - script_path = self.tmp_py() - script_text = ''' - from sqlalchemy import * - from migrate import * - - from migrate.changeset import schema - - meta = MetaData(migrate_engine) - account = Table('account', meta, - Column('id', Integer, primary_key=True), - Column('login', Text), - Column('passwd', Text), - ) - def upgrade(): - # Upgrade operations go here. Don't create your own engine; use the engine - # named 'migrate_engine' imported from migrate. - meta.create_all() - - def downgrade(): - # Operations to reverse the above upgrade go here. - meta.drop_all() - '''.replace("\n ", "\n") - file = open(script_path, 'w') - file.write(script_text) - file.close() - result = self.env.run('migrate test %s %s' % (self.url, repos_path)) - self.assertEqual(self.run_version(repos_path), 1) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - - @usedb() - def test_rundiffs_in_shell(self): - # This is a variant of the test_schemadiff tests but run through the shell level. - # These shell tests are hard to debug (since they keep forking processes) - # so they shouldn't replace the lower-level tests. - repos_name = 'repos_name' - repos_path = self.tmp() - script_path = self.tmp_py() - model_module = 'migrate.tests.fixture.models:meta_rundiffs' - old_model_module = 'migrate.tests.fixture.models:meta_old_rundiffs' - - # Create empty repository. - self.meta = MetaData(self.engine) - self.meta.reflect() - self.meta.drop_all() # in case junk tables are lying around in the test database - - result = self.env.run( - 'migrate create %s %s' % (repos_path, repos_name), - expect_stderr=True) - result = self.env.run( - 'migrate drop_version_control %s %s' % (self.url, repos_path), - expect_stderr=True, expect_error=True) - result = self.env.run( - 'migrate version_control %s %s' % (self.url, repos_path), - expect_stderr=True) - self.assertEqual(self.run_version(repos_path), 0) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) - - # Setup helper script. - result = self.env.run( - 'migrate manage %s --repository=%s --url=%s --model=%s'\ - % (script_path, repos_path, self.url, model_module), - expect_stderr=True) - self.assertTrue(os.path.exists(script_path)) - - # Model is defined but database is empty. - result = self.env.run('migrate compare_model_to_db %s %s --model=%s' \ - % (self.url, repos_path, model_module), expect_stderr=True) - self.assertTrue( - "tables missing from database: tmp_account_rundiffs" - in result.stdout) - - # Test Deprecation - result = self.env.run('migrate compare_model_to_db %s %s --model=%s' \ - % (self.url, repos_path, model_module.replace(":", ".")), - expect_stderr=True, expect_error=True) - self.assertEqual(result.returncode, 0) - self.assertTrue( - "tables missing from database: tmp_account_rundiffs" - in result.stdout) - - # Update db to latest model. - result = self.env.run('migrate update_db_from_model %s %s %s'\ - % (self.url, repos_path, model_module), expect_stderr=True) - self.assertEqual(self.run_version(repos_path), 0) - self.assertEqual(self.run_db_version(self.url, repos_path), 0) # version did not get bumped yet because new version not yet created - - result = self.env.run('migrate compare_model_to_db %s %s %s'\ - % (self.url, repos_path, model_module), expect_stderr=True) - self.assertTrue("No schema diffs" in result.stdout) - - result = self.env.run( - 'migrate drop_version_control %s %s' % (self.url, repos_path), - expect_stderr=True, expect_error=True) - result = self.env.run( - 'migrate version_control %s %s' % (self.url, repos_path), - expect_stderr=True) - - result = self.env.run( - 'migrate create_model %s %s' % (self.url, repos_path), - expect_stderr=True) - temp_dict = dict() - six.exec_(result.stdout, temp_dict) - - # TODO: breaks on SA06 and SA05 - in need of total refactor - use different approach - - # TODO: compare whole table - self.compare_columns_equal(models.tmp_account_rundiffs.c, temp_dict['tmp_account_rundiffs'].c, ['type']) - ##self.assertTrue("""tmp_account_rundiffs = Table('tmp_account_rundiffs', meta, - ##Column('id', Integer(), primary_key=True, nullable=False), - ##Column('login', String(length=None, convert_unicode=False, assert_unicode=None)), - ##Column('passwd', String(length=None, convert_unicode=False, assert_unicode=None))""" in result.stdout) - - ## We're happy with db changes, make first db upgrade script to go from version 0 -> 1. - #result = self.env.run('migrate make_update_script_for_model', expect_error=True, expect_stderr=True) - #self.assertTrue('Not enough arguments' in result.stderr) - - #result_script = self.env.run('migrate make_update_script_for_model %s %s %s %s'\ - #% (self.url, repos_path, old_model_module, model_module)) - #self.assertEqualIgnoreWhitespace(result_script.stdout, - #'''from sqlalchemy import * - #from migrate import * - - #from migrate.changeset import schema - - #meta = MetaData() - #tmp_account_rundiffs = Table('tmp_account_rundiffs', meta, - #Column('id', Integer(), primary_key=True, nullable=False), - #Column('login', Text(length=None, convert_unicode=False, assert_unicode=None, unicode_error=None, _warn_on_bytestring=False)), - #Column('passwd', Text(length=None, convert_unicode=False, assert_unicode=None, unicode_error=None, _warn_on_bytestring=False)), - #) - - #def upgrade(migrate_engine): - ## Upgrade operations go here. Don't create your own engine; bind migrate_engine - ## to your metadata - #meta.bind = migrate_engine - #tmp_account_rundiffs.create() - - #def downgrade(migrate_engine): - ## Operations to reverse the above upgrade go here. - #meta.bind = migrate_engine - #tmp_account_rundiffs.drop()''') - - ## Save the upgrade script. - #result = self.env.run('migrate script Desc %s' % repos_path) - #upgrade_script_path = '%s/versions/001_Desc.py' % repos_path - #open(upgrade_script_path, 'w').write(result_script.stdout) - - #result = self.env.run('migrate compare_model_to_db %s %s %s'\ - #% (self.url, repos_path, model_module)) - #self.assertTrue("No schema diffs" in result.stdout) - - self.meta.drop_all() # in case junk tables are lying around in the test database diff --git a/migrate/tests/versioning/test_template.py b/migrate/tests/versioning/test_template.py deleted file mode 100644 index a079d8b..0000000 --- a/migrate/tests/versioning/test_template.py +++ /dev/null @@ -1,70 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -import os -import shutil - -import migrate.versioning.templates -from migrate.versioning.template import * -from migrate.versioning import api - -from migrate.tests import fixture - - -class TestTemplate(fixture.Pathed): - def test_templates(self): - """We can find the path to all repository templates""" - path = str(Template()) - self.assertTrue(os.path.exists(path)) - - def test_repository(self): - """We can find the path to the default repository""" - path = Template().get_repository() - self.assertTrue(os.path.exists(path)) - - def test_script(self): - """We can find the path to the default migration script""" - path = Template().get_script() - self.assertTrue(os.path.exists(path)) - - def test_custom_templates_and_themes(self): - """Users can define their own templates with themes""" - new_templates_dir = os.path.join(self.temp_usable_dir, 'templates') - manage_tmpl_file = os.path.join(new_templates_dir, 'manage/custom.py_tmpl') - repository_tmpl_file = os.path.join(new_templates_dir, 'repository/custom/README') - script_tmpl_file = os.path.join(new_templates_dir, 'script/custom.py_tmpl') - sql_script_tmpl_file = os.path.join(new_templates_dir, 'sql_script/custom.py_tmpl') - - MANAGE_CONTENTS = 'print "manage.py"' - README_CONTENTS = 'MIGRATE README!' - SCRIPT_FILE_CONTENTS = 'print "script.py"' - new_repo_dest = self.tmp_repos() - new_manage_dest = self.tmp_py() - - # make new templates dir - shutil.copytree(migrate.versioning.templates.__path__[0], new_templates_dir) - shutil.copytree(os.path.join(new_templates_dir, 'repository/default'), - os.path.join(new_templates_dir, 'repository/custom')) - - # edit templates - f = open(manage_tmpl_file, 'w').write(MANAGE_CONTENTS) - f = open(repository_tmpl_file, 'w').write(README_CONTENTS) - f = open(script_tmpl_file, 'w').write(SCRIPT_FILE_CONTENTS) - f = open(sql_script_tmpl_file, 'w').write(SCRIPT_FILE_CONTENTS) - - # create repository, manage file and python script - kw = {} - kw['templates_path'] = new_templates_dir - kw['templates_theme'] = 'custom' - api.create(new_repo_dest, 'repo_name', **kw) - api.script('test', new_repo_dest, **kw) - api.script_sql('postgres', 'foo', new_repo_dest, **kw) - api.manage(new_manage_dest, **kw) - - # assert changes - self.assertEqual(open(new_manage_dest).read(), MANAGE_CONTENTS) - self.assertEqual(open(os.path.join(new_repo_dest, 'manage.py')).read(), MANAGE_CONTENTS) - self.assertEqual(open(os.path.join(new_repo_dest, 'README')).read(), README_CONTENTS) - self.assertEqual(open(os.path.join(new_repo_dest, 'versions/001_test.py')).read(), SCRIPT_FILE_CONTENTS) - self.assertEqual(open(os.path.join(new_repo_dest, 'versions/002_foo_postgres_downgrade.sql')).read(), SCRIPT_FILE_CONTENTS) - self.assertEqual(open(os.path.join(new_repo_dest, 'versions/002_foo_postgres_upgrade.sql')).read(), SCRIPT_FILE_CONTENTS) diff --git a/migrate/tests/versioning/test_util.py b/migrate/tests/versioning/test_util.py deleted file mode 100644 index 21e3f27..0000000 --- a/migrate/tests/versioning/test_util.py +++ /dev/null @@ -1,139 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import os - -from sqlalchemy import * - -from migrate.exceptions import MigrateDeprecationWarning -from migrate.tests import fixture -from migrate.tests.fixture.warnings import catch_warnings -from migrate.versioning.util import * -from migrate.versioning import api - -import warnings - -class TestUtil(fixture.Pathed): - - def test_construct_engine(self): - """Construct engine the smart way""" - url = 'sqlite://' - - engine = construct_engine(url) - self.assertTrue(engine.name == 'sqlite') - - # keyword arg - engine = construct_engine(url, engine_arg_encoding='utf-8') - self.assertEqual(engine.dialect.encoding, 'utf-8') - - # dict - engine = construct_engine(url, engine_dict={'encoding': 'utf-8'}) - self.assertEqual(engine.dialect.encoding, 'utf-8') - - # engine parameter - engine_orig = create_engine('sqlite://') - engine = construct_engine(engine_orig) - self.assertEqual(engine, engine_orig) - - # test precedance - engine = construct_engine(url, engine_dict={'encoding': 'iso-8859-1'}, - engine_arg_encoding='utf-8') - self.assertEqual(engine.dialect.encoding, 'utf-8') - - # deprecated echo=True parameter - try: - # py 2.4 compatibility :-/ - cw = catch_warnings(record=True) - w = cw.__enter__() - - warnings.simplefilter("always") - engine = construct_engine(url, echo='True') - self.assertTrue(engine.echo) - - self.assertEqual(len(w),1) - self.assertTrue(issubclass(w[-1].category, - MigrateDeprecationWarning)) - self.assertEqual( - 'echo=True parameter is deprecated, pass ' - 'engine_arg_echo=True or engine_dict={"echo": True}', - str(w[-1].message)) - - finally: - cw.__exit__() - - # unsupported argument - self.assertRaises(ValueError, construct_engine, 1) - - def test_passing_engine(self): - repo = self.tmp_repos() - api.create(repo, 'temp') - api.script('First Version', repo) - engine = construct_engine('sqlite:///:memory:') - - api.version_control(engine, repo) - api.upgrade(engine, repo) - - def test_asbool(self): - """test asbool parsing""" - result = asbool(True) - self.assertEqual(result, True) - - result = asbool(False) - self.assertEqual(result, False) - - result = asbool('y') - self.assertEqual(result, True) - - result = asbool('n') - self.assertEqual(result, False) - - self.assertRaises(ValueError, asbool, 'test') - self.assertRaises(ValueError, asbool, object) - - - def test_load_model(self): - """load model from dotted name""" - model_path = os.path.join(self.temp_usable_dir, 'test_load_model.py') - - f = open(model_path, 'w') - f.write("class FakeFloat(int): pass") - f.close() - - try: - # py 2.4 compatibility :-/ - cw = catch_warnings(record=True) - w = cw.__enter__() - - warnings.simplefilter("always") - - # deprecated spelling - FakeFloat = load_model('test_load_model.FakeFloat') - self.assertTrue(isinstance(FakeFloat(), int)) - - self.assertEqual(len(w),1) - self.assertTrue(issubclass(w[-1].category, - MigrateDeprecationWarning)) - self.assertEqual( - 'model should be in form of module.model:User ' - 'and not module.model.User', - str(w[-1].message)) - - finally: - cw.__exit__() - - FakeFloat = load_model('test_load_model:FakeFloat') - self.assertTrue(isinstance(FakeFloat(), int)) - - FakeFloat = load_model(FakeFloat) - self.assertTrue(isinstance(FakeFloat(), int)) - - def test_guess_obj_type(self): - """guess object type from string""" - result = guess_obj_type('7') - self.assertEqual(result, 7) - - result = guess_obj_type('y') - self.assertEqual(result, True) - - result = guess_obj_type('test') - self.assertEqual(result, 'test') diff --git a/migrate/tests/versioning/test_version.py b/migrate/tests/versioning/test_version.py deleted file mode 100644 index 286dd59..0000000 --- a/migrate/tests/versioning/test_version.py +++ /dev/null @@ -1,186 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -from migrate.exceptions import * -from migrate.versioning.version import * - -from migrate.tests import fixture - - -class TestVerNum(fixture.Base): - def test_invalid(self): - """Disallow invalid version numbers""" - versions = ('-1', -1, 'Thirteen', '') - for version in versions: - self.assertRaises(ValueError, VerNum, version) - - def test_str(self): - """Test str and repr version numbers""" - self.assertEqual(str(VerNum(2)), '2') - self.assertEqual(repr(VerNum(2)), '<VerNum(2)>') - - def test_is(self): - """Two version with the same number should be equal""" - a = VerNum(1) - b = VerNum(1) - self.assertTrue(a is b) - - self.assertEqual(VerNum(VerNum(2)), VerNum(2)) - - def test_add(self): - self.assertEqual(VerNum(1) + VerNum(1), VerNum(2)) - self.assertEqual(VerNum(1) + 1, 2) - self.assertEqual(VerNum(1) + 1, '2') - self.assertTrue(isinstance(VerNum(1) + 1, VerNum)) - - def test_sub(self): - self.assertEqual(VerNum(1) - 1, 0) - self.assertTrue(isinstance(VerNum(1) - 1, VerNum)) - self.assertRaises(ValueError, lambda: VerNum(0) - 1) - - def test_eq(self): - """Two versions are equal""" - self.assertEqual(VerNum(1), VerNum('1')) - self.assertEqual(VerNum(1), 1) - self.assertEqual(VerNum(1), '1') - self.assertNotEqual(VerNum(1), 2) - - def test_ne(self): - self.assertTrue(VerNum(1) != 2) - self.assertFalse(VerNum(1) != 1) - - def test_lt(self): - self.assertFalse(VerNum(1) < 1) - self.assertTrue(VerNum(1) < 2) - self.assertFalse(VerNum(2) < 1) - - def test_le(self): - self.assertTrue(VerNum(1) <= 1) - self.assertTrue(VerNum(1) <= 2) - self.assertFalse(VerNum(2) <= 1) - - def test_gt(self): - self.assertFalse(VerNum(1) > 1) - self.assertFalse(VerNum(1) > 2) - self.assertTrue(VerNum(2) > 1) - - def test_ge(self): - self.assertTrue(VerNum(1) >= 1) - self.assertTrue(VerNum(2) >= 1) - self.assertFalse(VerNum(1) >= 2) - - def test_int_cast(self): - ver = VerNum(3) - # test __int__ - self.assertEqual(int(ver), 3) - # test __index__: range() doesn't call __int__ - self.assertEqual(list(range(ver, ver)), []) - - -class TestVersion(fixture.Pathed): - - def setUp(self): - super(TestVersion, self).setUp() - - def test_str_to_filename(self): - self.assertEqual(str_to_filename(''), '') - self.assertEqual(str_to_filename('__'), '_') - self.assertEqual(str_to_filename('a'), 'a') - self.assertEqual(str_to_filename('Abc Def'), 'Abc_Def') - self.assertEqual(str_to_filename('Abc "D" Ef'), 'Abc_D_Ef') - self.assertEqual(str_to_filename("Abc's Stuff"), 'Abc_s_Stuff') - self.assertEqual(str_to_filename("a b"), 'a_b') - self.assertEqual(str_to_filename("a.b to c"), 'a_b_to_c') - - def test_collection(self): - """Let's see how we handle versions collection""" - coll = Collection(self.temp_usable_dir) - coll.create_new_python_version("foo bar") - coll.create_new_sql_version("postgres", "foo bar") - coll.create_new_sql_version("sqlite", "foo bar") - coll.create_new_python_version("") - - self.assertEqual(coll.latest, 4) - self.assertEqual(len(coll.versions), 4) - self.assertEqual(coll.version(4), coll.version(coll.latest)) - # Check for non-existing version - self.assertRaises(VersionNotFoundError, coll.version, 5) - # Check for the current version - self.assertEqual('4', coll.version(4).version) - - coll2 = Collection(self.temp_usable_dir) - self.assertEqual(coll.versions, coll2.versions) - - Collection.clear() - - def test_old_repository(self): - open(os.path.join(self.temp_usable_dir, '1'), 'w') - self.assertRaises(Exception, Collection, self.temp_usable_dir) - - #TODO: def test_collection_unicode(self): - # pass - - def test_create_new_python_version(self): - coll = Collection(self.temp_usable_dir) - coll.create_new_python_version("'") - - ver = coll.version() - self.assertTrue(ver.script().source()) - - def test_create_new_sql_version(self): - coll = Collection(self.temp_usable_dir) - coll.create_new_sql_version("sqlite", "foo bar") - - ver = coll.version() - ver_up = ver.script('sqlite', 'upgrade') - ver_down = ver.script('sqlite', 'downgrade') - ver_up.source() - ver_down.source() - - def test_selection(self): - """Verify right sql script is selected""" - - # Create empty directory. - path = self.tmp_repos() - os.mkdir(path) - - # Create files -- files must be present or you'll get an exception later. - python_file = '001_initial_.py' - sqlite_upgrade_file = '001_sqlite_upgrade.sql' - default_upgrade_file = '001_default_upgrade.sql' - for file_ in [sqlite_upgrade_file, default_upgrade_file, python_file]: - filepath = '%s/%s' % (path, file_) - open(filepath, 'w').close() - - ver = Version(1, path, [sqlite_upgrade_file]) - self.assertEqual(os.path.basename(ver.script('sqlite', 'upgrade').path), sqlite_upgrade_file) - - ver = Version(1, path, [default_upgrade_file]) - self.assertEqual(os.path.basename(ver.script('default', 'upgrade').path), default_upgrade_file) - - ver = Version(1, path, [sqlite_upgrade_file, default_upgrade_file]) - self.assertEqual(os.path.basename(ver.script('sqlite', 'upgrade').path), sqlite_upgrade_file) - - ver = Version(1, path, [sqlite_upgrade_file, default_upgrade_file, python_file]) - self.assertEqual(os.path.basename(ver.script('postgres', 'upgrade').path), default_upgrade_file) - - ver = Version(1, path, [sqlite_upgrade_file, python_file]) - self.assertEqual(os.path.basename(ver.script('postgres', 'upgrade').path), python_file) - - def test_bad_version(self): - ver = Version(1, self.temp_usable_dir, []) - self.assertRaises(ScriptError, ver.add_script, '123.sql') - - # tests bad ibm_db_sa filename - ver = Version(123, self.temp_usable_dir, []) - self.assertRaises(ScriptError, ver.add_script, - '123_ibm_db_sa_upgrade.sql') - - # tests that the name is ok but the script doesn't exist - self.assertRaises(InvalidScriptError, ver.add_script, - '123_test_ibm_db_sa_upgrade.sql') - - pyscript = os.path.join(self.temp_usable_dir, 'bla.py') - open(pyscript, 'w') - ver.add_script(pyscript) - self.assertRaises(ScriptError, ver.add_script, 'bla.py') |