Diffstat (limited to 'migrate/tests')
-rw-r--r--  migrate/tests/__init__.py | 16
-rw-r--r--  migrate/tests/changeset/__init__.py | 0
-rw-r--r--  migrate/tests/changeset/databases/__init__.py | 0
-rw-r--r--  migrate/tests/changeset/databases/test_ibmdb2.py | 32
-rw-r--r--  migrate/tests/changeset/test_changeset.py | 976
-rw-r--r--  migrate/tests/changeset/test_constraint.py | 299
-rw-r--r--  migrate/tests/fixture/__init__.py | 18
-rw-r--r--  migrate/tests/fixture/base.py | 26
-rw-r--r--  migrate/tests/fixture/database.py | 203
-rw-r--r--  migrate/tests/fixture/models.py | 14
-rw-r--r--  migrate/tests/fixture/pathed.py | 77
-rw-r--r--  migrate/tests/fixture/shell.py | 33
-rw-r--r--  migrate/tests/fixture/warnings.py | 88
-rw-r--r--  migrate/tests/integrated/__init__.py | 0
-rw-r--r--  migrate/tests/integrated/test_docs.py | 18
-rw-r--r--  migrate/tests/versioning/__init__.py | 0
-rw-r--r--  migrate/tests/versioning/test_api.py | 128
-rw-r--r--  migrate/tests/versioning/test_cfgparse.py | 27
-rw-r--r--  migrate/tests/versioning/test_database.py | 13
-rw-r--r--  migrate/tests/versioning/test_genmodel.py | 214
-rw-r--r--  migrate/tests/versioning/test_keyedinstance.py | 45
-rw-r--r--  migrate/tests/versioning/test_pathed.py | 51
-rw-r--r--  migrate/tests/versioning/test_repository.py | 216
-rw-r--r--  migrate/tests/versioning/test_runchangeset.py | 52
-rw-r--r--  migrate/tests/versioning/test_schema.py | 205
-rw-r--r--  migrate/tests/versioning/test_schemadiff.py | 227
-rw-r--r--  migrate/tests/versioning/test_script.py | 305
-rw-r--r--  migrate/tests/versioning/test_shell.py | 574
-rw-r--r--  migrate/tests/versioning/test_template.py | 70
-rw-r--r--  migrate/tests/versioning/test_util.py | 139
-rw-r--r--  migrate/tests/versioning/test_version.py | 186
31 files changed, 0 insertions, 4252 deletions
diff --git a/migrate/tests/__init__.py b/migrate/tests/__init__.py
deleted file mode 100644
index c03fbf4..0000000
--- a/migrate/tests/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-# make this package available during imports as long as we support <python2.5
-import sys
-import os
-sys.path.append(os.path.dirname(os.path.abspath(__file__)))
-
-
-from unittest import TestCase
-import migrate
-import six
-
-
-class TestVersionDefined(TestCase):
- def test_version(self):
- """Test for migrate.__version__"""
- self.assertTrue(isinstance(migrate.__version__, six.string_types))
- self.assertTrue(len(migrate.__version__) > 0)
diff --git a/migrate/tests/changeset/__init__.py b/migrate/tests/changeset/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/migrate/tests/changeset/__init__.py
+++ /dev/null
diff --git a/migrate/tests/changeset/databases/__init__.py b/migrate/tests/changeset/databases/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/migrate/tests/changeset/databases/__init__.py
+++ /dev/null
diff --git a/migrate/tests/changeset/databases/test_ibmdb2.py b/migrate/tests/changeset/databases/test_ibmdb2.py
deleted file mode 100644
index 4b3f983..0000000
--- a/migrate/tests/changeset/databases/test_ibmdb2.py
+++ /dev/null
@@ -1,32 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import mock
-
-import six
-
-from migrate.changeset.databases import ibmdb2
-from migrate.tests import fixture
-
-
-class TestIBMDBDialect(fixture.Base):
- """
- Test class for ibmdb2 dialect unit tests which do not require
- a live backend database connection.
- """
-
- def test_is_unique_constraint_with_null_cols_supported(self):
- test_values = {
- '10.1': False,
- '10.4.99': False,
- '10.5': True,
- '10.5.1': True
- }
- for version, supported in six.iteritems(test_values):
- mock_dialect = mock.MagicMock()
- mock_dialect.dbms_ver = version
- self.assertEqual(
- supported,
- ibmdb2.is_unique_constraint_with_null_columns_supported(
- mock_dialect),
- 'Assertion failed on version: %s' % version)
diff --git a/migrate/tests/changeset/test_changeset.py b/migrate/tests/changeset/test_changeset.py
deleted file mode 100644
index c870c52..0000000
--- a/migrate/tests/changeset/test_changeset.py
+++ /dev/null
@@ -1,976 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-import sqlalchemy
-import warnings
-
-from sqlalchemy import *
-
-from migrate import changeset, exceptions
-from migrate.changeset import *
-from migrate.changeset import constraint
-from migrate.changeset.schema import ColumnDelta
-from migrate.tests import fixture
-from migrate.tests.fixture.warnings import catch_warnings
-import six
-
-class TestAddDropColumn(fixture.DB):
- """Test add/drop column through all possible interfaces
- also test for constraints
- """
- level = fixture.DB.CONNECT
- table_name = 'tmp_adddropcol'
- table_name_idx = 'tmp_adddropcol_idx'
- table_int = 0
-
- def _setup(self, url):
- super(TestAddDropColumn, self)._setup(url)
- self.meta = MetaData()
- self.table = Table(self.table_name, self.meta,
- Column('id', Integer, unique=True),
- )
- self.table_idx = Table(
- self.table_name_idx,
- self.meta,
- Column('id', Integer, primary_key=True),
- Column('a', Integer),
- Column('b', Integer),
- Index('test_idx', 'a', 'b')
- )
- self.meta.bind = self.engine
- if self.engine.has_table(self.table.name):
- self.table.drop()
- if self.engine.has_table(self.table_idx.name):
- self.table_idx.drop()
- self.table.create()
- self.table_idx.create()
-
- def _teardown(self):
- if self.engine.has_table(self.table.name):
- self.table.drop()
- if self.engine.has_table(self.table_idx.name):
- self.table_idx.drop()
- self.meta.clear()
- super(TestAddDropColumn,self)._teardown()
-
- def run_(self, create_column_func, drop_column_func, *col_p, **col_k):
- col_name = 'data'
-
- def assert_numcols(num_of_expected_cols):
- # number of cols should be correct in table object and in database
- self.refresh_table(self.table_name)
- result = len(self.table.c)
-
- self.assertEqual(result, num_of_expected_cols),
- if col_k.get('primary_key', None):
- # new primary key: check its length too
- result = len(self.table.primary_key)
- self.assertEqual(result, num_of_expected_cols)
-
- # we have 1 column and there is no data column
- assert_numcols(1)
- self.assertTrue(getattr(self.table.c, 'data', None) is None)
- if len(col_p) == 0:
- col_p = [String(40)]
- col = Column(col_name, *col_p, **col_k)
- create_column_func(col)
- assert_numcols(2)
- # data column exists
- self.assertTrue(self.table.c.data.type.length, 40)
-
- col2 = self.table.c.data
- drop_column_func(col2)
- assert_numcols(1)
-
- @fixture.usedb()
- def test_undefined(self):
- """Add/drop columns not yet defined in the table"""
- def add_func(col):
- return create_column(col, self.table)
- def drop_func(col):
- return drop_column(col, self.table)
- return self.run_(add_func, drop_func)
-
- @fixture.usedb()
- def test_defined(self):
- """Add/drop columns already defined in the table"""
- def add_func(col):
- self.meta.clear()
- self.table = Table(self.table_name, self.meta,
- Column('id', Integer, primary_key=True),
- col,
- )
- return create_column(col)
- def drop_func(col):
- return drop_column(col)
- return self.run_(add_func, drop_func)
-
- @fixture.usedb()
- def test_method_bound(self):
- """Add/drop columns via column methods; columns bound to a table
- ie. no table parameter passed to function
- """
- def add_func(col):
- self.assertTrue(col.table is None, col.table)
- self.table.append_column(col)
- return col.create()
- def drop_func(col):
- #self.assertTrue(col.table is None,col.table)
- #self.table.append_column(col)
- return col.drop()
- return self.run_(add_func, drop_func)
-
- @fixture.usedb()
- def test_method_notbound(self):
- """Add/drop columns via column methods; columns not bound to a table"""
- def add_func(col):
- return col.create(self.table)
- def drop_func(col):
- return col.drop(self.table)
- return self.run_(add_func, drop_func)
-
- @fixture.usedb()
- def test_tablemethod_obj(self):
- """Add/drop columns via table methods; by column object"""
- def add_func(col):
- return self.table.create_column(col)
- def drop_func(col):
- return self.table.drop_column(col)
- return self.run_(add_func, drop_func)
-
- @fixture.usedb()
- def test_tablemethod_name(self):
- """Add/drop columns via table methods; by column name"""
- def add_func(col):
- # must be bound to table
- self.table.append_column(col)
- return self.table.create_column(col.name)
- def drop_func(col):
- # Not necessarily bound to table
- return self.table.drop_column(col.name)
- return self.run_(add_func, drop_func)
-
- @fixture.usedb()
- def test_byname(self):
- """Add/drop columns via functions; by table object and column name"""
- def add_func(col):
- self.table.append_column(col)
- return create_column(col.name, self.table)
- def drop_func(col):
- return drop_column(col.name, self.table)
- return self.run_(add_func, drop_func)
-
- @fixture.usedb()
- def test_drop_column_not_in_table(self):
- """Drop column by name"""
- def add_func(col):
- return self.table.create_column(col)
- def drop_func(col):
- if SQLA_07:
- self.table._columns.remove(col)
- else:
- self.table.c.remove(col)
- return self.table.drop_column(col.name)
- self.run_(add_func, drop_func)
-
- @fixture.usedb()
- def test_fk(self):
- """Can create columns with foreign keys"""
- # create FK's target
- reftable = Table('tmp_ref', self.meta,
- Column('id', Integer, primary_key=True),
- )
- if self.engine.has_table(reftable.name):
- reftable.drop()
- reftable.create()
-
- # create column with fk
- col = Column('data', Integer, ForeignKey(reftable.c.id, name='testfk'))
- col.create(self.table)
-
- # check if constraint is added
- for cons in self.table.constraints:
- if isinstance(cons, sqlalchemy.schema.ForeignKeyConstraint):
- break
- else:
- self.fail('No constraint found')
-
- # TODO: test on db level if constraints work
-
- if SQLA_07:
- self.assertEqual(reftable.c.id.name,
- list(col.foreign_keys)[0].column.name)
- else:
- self.assertEqual(reftable.c.id.name,
- col.foreign_keys[0].column.name)
-
- if self.engine.name == 'mysql':
- constraint.ForeignKeyConstraint([self.table.c.data],
- [reftable.c.id],
- name='testfk').drop()
- col.drop(self.table)
-
- if self.engine.has_table(reftable.name):
- reftable.drop()
-
- @fixture.usedb(not_supported='sqlite')
- def test_pk(self):
- """Can create columns with primary key"""
- col = Column('data', Integer, nullable=False)
- self.assertRaises(exceptions.InvalidConstraintError,
- col.create, self.table, primary_key_name=True)
- col.create(self.table, primary_key_name='data_pkey')
-
- # check if constraint was added (cannot test on objects)
- self.table.insert(values={'data': 4}).execute()
- try:
- self.table.insert(values={'data': 4}).execute()
- except (sqlalchemy.exc.IntegrityError,
- sqlalchemy.exc.ProgrammingError):
- pass
- else:
- self.fail()
-
- col.drop()
-
- @fixture.usedb(not_supported=['mysql'])
- def test_check(self):
- """Can create columns with check constraint"""
- col = Column('foo',
- Integer,
- sqlalchemy.schema.CheckConstraint('foo > 4'))
- col.create(self.table)
-
- # check if constraint was added (cannot test on objects)
- self.table.insert(values={'foo': 5}).execute()
- try:
- self.table.insert(values={'foo': 3}).execute()
- except (sqlalchemy.exc.IntegrityError,
- sqlalchemy.exc.ProgrammingError):
- pass
- else:
- self.fail()
-
- col.drop()
-
- @fixture.usedb()
- def test_unique_constraint(self):
- self.assertRaises(exceptions.InvalidConstraintError,
- Column('data', Integer, unique=True).create, self.table)
-
- col = Column('data', Integer)
- col.create(self.table, unique_name='data_unique')
-
- # check if constraint was added (cannot test on objects)
- self.table.insert(values={'data': 5}).execute()
- try:
- self.table.insert(values={'data': 5}).execute()
- except (sqlalchemy.exc.IntegrityError,
- sqlalchemy.exc.ProgrammingError):
- pass
- else:
- self.fail()
-
- col.drop(self.table)
-
-# TODO: remove already attached columns with uniques, pks, fks ..
- @fixture.usedb(not_supported=['ibm_db_sa', 'postgresql'])
- def test_drop_column_of_composite_index(self):
- # NOTE(rpodolyaka): postgresql automatically drops a composite index
- # if one of its columns is dropped
- # NOTE(mriedem): DB2 does the same.
- self.table_idx.c.b.drop()
-
- reflected = Table(self.table_idx.name, MetaData(), autoload=True,
- autoload_with=self.engine)
- index = next(iter(reflected.indexes))
- self.assertEquals(['a'], [c.name for c in index.columns])
-
- @fixture.usedb()
- def test_drop_all_columns_of_composite_index(self):
- self.table_idx.c.a.drop()
- self.table_idx.c.b.drop()
-
- reflected = Table(self.table_idx.name, MetaData(), autoload=True,
- autoload_with=self.engine)
- self.assertEquals(0, len(reflected.indexes))
-
- def _check_index(self,expected):
- if 'mysql' in self.engine.name or 'postgres' in self.engine.name:
- for index in tuple(
- Table(self.table.name, MetaData(),
- autoload=True, autoload_with=self.engine).indexes
- ):
- if index.name=='ix_data':
- break
- self.assertEqual(expected,index.unique)
-
- @fixture.usedb()
- def test_index(self):
- col = Column('data', Integer)
- col.create(self.table, index_name='ix_data')
-
- self._check_index(False)
-
- col.drop()
-
- @fixture.usedb()
- def test_index_unique(self):
- # shows how to create a unique index
- col = Column('data', Integer)
- col.create(self.table)
- Index('ix_data', col, unique=True).create(bind=self.engine)
-
- # check if index was added
- self.table.insert(values={'data': 5}).execute()
- try:
- self.table.insert(values={'data': 5}).execute()
- except (sqlalchemy.exc.IntegrityError,
- sqlalchemy.exc.ProgrammingError):
- pass
- else:
- self.fail()
-
- self._check_index(True)
-
- col.drop()
-
- @fixture.usedb()
- def test_server_defaults(self):
- """Can create columns with server_default values"""
- col = Column('data', String(244), server_default='foobar')
- col.create(self.table)
-
- self.table.insert(values={'id': 10}).execute()
- row = self._select_row()
- self.assertEqual(u'foobar', row['data'])
-
- col.drop()
-
- @fixture.usedb()
- def test_populate_default(self):
- """Test populate_default=True"""
- def default():
- return 'foobar'
- col = Column('data', String(244), default=default)
- col.create(self.table, populate_default=True)
-
- self.table.insert(values={'id': 10}).execute()
- row = self._select_row()
- self.assertEqual(u'foobar', row['data'])
-
- col.drop()
-
- # TODO: test sequence
- # TODO: test quoting
- # TODO: test non-autoname constraints
-
- @fixture.usedb()
- def test_drop_doesnt_delete_other_indexes(self):
- # add two indexed columns
- self.table.drop()
- self.meta.clear()
- self.table = Table(
- self.table_name, self.meta,
- Column('id', Integer, primary_key=True),
- Column('d1', String(10), index=True),
- Column('d2', String(10), index=True),
- )
- self.table.create()
-
- # paranoid check
- self.refresh_table()
- self.assertEqual(
- sorted([i.name for i in self.table.indexes]),
- [u'ix_tmp_adddropcol_d1', u'ix_tmp_adddropcol_d2']
- )
-
- # delete one
- self.table.c.d2.drop()
-
- # ensure the other index is still there
- self.refresh_table()
- self.assertEqual(
- sorted([i.name for i in self.table.indexes]),
- [u'ix_tmp_adddropcol_d1']
- )
-
- def _actual_foreign_keys(self):
- from sqlalchemy.schema import ForeignKeyConstraint
- result = []
- for cons in self.table.constraints:
- if isinstance(cons,ForeignKeyConstraint):
- col_names = []
- for col_name in cons.columns:
- if not isinstance(col_name,six.string_types):
- col_name = col_name.name
- col_names.append(col_name)
- result.append(col_names)
- result.sort()
- return result
-
- @fixture.usedb()
- def test_drop_with_foreign_keys(self):
- self.table.drop()
- self.meta.clear()
-
- # create FK's target
- reftable = Table('tmp_ref', self.meta,
- Column('id', Integer, primary_key=True),
- )
- if self.engine.has_table(reftable.name):
- reftable.drop()
- reftable.create()
-
- # add a table with two foreign key columns
- self.table = Table(
- self.table_name, self.meta,
- Column('id', Integer, primary_key=True),
- Column('r1', Integer, ForeignKey('tmp_ref.id', name='test_fk1')),
- Column('r2', Integer, ForeignKey('tmp_ref.id', name='test_fk2')),
- )
- self.table.create()
-
- # paranoid check
- self.assertEqual([['r1'],['r2']],
- self._actual_foreign_keys())
-
- # delete one
- if self.engine.name == 'mysql':
- constraint.ForeignKeyConstraint([self.table.c.r2], [reftable.c.id],
- name='test_fk2').drop()
- self.table.c.r2.drop()
-
- # check remaining foreign key is there
- self.assertEqual([['r1']],
- self._actual_foreign_keys())
-
- @fixture.usedb()
- def test_drop_with_complex_foreign_keys(self):
- from sqlalchemy.schema import ForeignKeyConstraint
- from sqlalchemy.schema import UniqueConstraint
-
- self.table.drop()
- self.meta.clear()
-
- # NOTE(mriedem): DB2 does not currently support unique constraints
- # on nullable columns, so the columns that are used to create the
- # foreign keys here need to be non-nullable for testing with DB2
- # to work.
-
- # create FK's target
- reftable = Table('tmp_ref', self.meta,
- Column('id', Integer, primary_key=True),
- Column('jd', Integer, nullable=False),
- UniqueConstraint('id','jd')
- )
- if self.engine.has_table(reftable.name):
- reftable.drop()
- reftable.create()
-
- # add a table with a complex foreign key constraint
- self.table = Table(
- self.table_name, self.meta,
- Column('id', Integer, primary_key=True),
- Column('r1', Integer, nullable=False),
- Column('r2', Integer, nullable=False),
- ForeignKeyConstraint(['r1','r2'],
- [reftable.c.id,reftable.c.jd],
- name='test_fk')
- )
- self.table.create()
-
- # paranoid check
- self.assertEqual([['r1','r2']],
- self._actual_foreign_keys())
-
- # delete one
- if self.engine.name == 'mysql':
- constraint.ForeignKeyConstraint([self.table.c.r1, self.table.c.r2],
- [reftable.c.id, reftable.c.jd],
- name='test_fk').drop()
- self.table.c.r2.drop()
-
- # check the constraint is gone, since part of it
- # is no longer there - if people hit this,
- # they may be confused, maybe we should raise an error
- # and insist that the constraint is deleted first, separately?
- self.assertEqual([],
- self._actual_foreign_keys())
-
-class TestRename(fixture.DB):
- """Tests for table and index rename methods"""
- level = fixture.DB.CONNECT
- meta = MetaData()
-
- def _setup(self, url):
- super(TestRename, self)._setup(url)
- self.meta.bind = self.engine
-
- @fixture.usedb(not_supported='firebird')
- def test_rename_table(self):
- """Tables can be renamed"""
- c_name = 'col_1'
- table_name1 = 'name_one'
- table_name2 = 'name_two'
- index_name1 = 'x' + table_name1
- index_name2 = 'x' + table_name2
-
- self.meta.clear()
- self.column = Column(c_name, Integer)
- self.table = Table(table_name1, self.meta, self.column)
- self.index = Index(index_name1, self.column, unique=False)
-
- if self.engine.has_table(self.table.name):
- self.table.drop()
- if self.engine.has_table(table_name2):
- tmp = Table(table_name2, self.meta, autoload=True)
- tmp.drop()
- tmp.deregister()
- del tmp
- self.table.create()
-
- def assert_table_name(expected, skip_object_check=False):
- """Refresh a table via autoload
- SA has changed some since this test was written; we now need to do
- meta.clear() upon reloading a table - clear all rather than a
- select few. So, this works only if we're working with one table at
- a time (else, others will vanish too).
- """
- if not skip_object_check:
- # Table object check
- self.assertEqual(self.table.name,expected)
- newname = self.table.name
- else:
- # we know the object's name isn't consistent: just assign it
- newname = expected
- # Table DB check
- self.meta.clear()
- self.table = Table(newname, self.meta, autoload=True)
- self.assertEqual(self.table.name, expected)
-
- def assert_index_name(expected, skip_object_check=False):
- if not skip_object_check:
- # Index object check
- self.assertEqual(self.index.name, expected)
- else:
- # object is inconsistent
- self.index.name = expected
- # TODO: Index DB check
-
- def add_table_to_meta(name):
- # trigger the case where table_name2 needs to be
- # removed from the metadata in ChangesetTable.deregister()
- tmp = Table(name, self.meta, Column(c_name, Integer))
- tmp.create()
- tmp.drop()
-
- try:
- # Table renames
- assert_table_name(table_name1)
- add_table_to_meta(table_name2)
- rename_table(self.table, table_name2)
- assert_table_name(table_name2)
- self.table.rename(table_name1)
- assert_table_name(table_name1)
-
- # test by just the string
- rename_table(table_name1, table_name2, engine=self.engine)
- assert_table_name(table_name2, True) # object not updated
-
- # Index renames
- if self.url.startswith('sqlite') or self.url.startswith('mysql'):
- self.assertRaises(exceptions.NotSupportedError,
- self.index.rename, index_name2)
- else:
- assert_index_name(index_name1)
- rename_index(self.index, index_name2, engine=self.engine)
- assert_index_name(index_name2)
- self.index.rename(index_name1)
- assert_index_name(index_name1)
-
- # test by just the string
- rename_index(index_name1, index_name2, engine=self.engine)
- assert_index_name(index_name2, True)
-
- finally:
- if self.table.exists():
- self.table.drop()
-
-
-class TestColumnChange(fixture.DB):
- level = fixture.DB.CONNECT
- table_name = 'tmp_colchange'
-
- def _setup(self, url):
- super(TestColumnChange, self)._setup(url)
- self.meta = MetaData(self.engine)
- self.table = Table(self.table_name, self.meta,
- Column('id', Integer, primary_key=True),
- Column('data', String(40), server_default=DefaultClause("tluafed"),
- nullable=True),
- )
- if self.table.exists():
- self.table.drop()
- try:
- self.table.create()
- except sqlalchemy.exc.SQLError:
- # SQLite: database schema has changed
- if not self.url.startswith('sqlite://'):
- raise
-
- def _teardown(self):
- if self.table.exists():
- try:
- self.table.drop(self.engine)
- except sqlalchemy.exc.SQLError:
- # SQLite: database schema has changed
- if not self.url.startswith('sqlite://'):
- raise
- super(TestColumnChange, self)._teardown()
-
- @fixture.usedb()
- def test_rename(self):
- """Can rename a column"""
- def num_rows(col, content):
- return len(list(self.table.select(col == content).execute()))
- # Table content should be preserved in changed columns
- content = "fgsfds"
- self.engine.execute(self.table.insert(), data=content, id=42)
- self.assertEqual(num_rows(self.table.c.data, content), 1)
-
- # ...as a function, given a column object and the new name
- alter_column('data', name='data2', table=self.table)
- self.refresh_table()
- alter_column(self.table.c.data2, name='atad')
- self.refresh_table(self.table.name)
- self.assertTrue('data' not in self.table.c.keys())
- self.assertTrue('atad' in self.table.c.keys())
- self.assertEqual(num_rows(self.table.c.atad, content), 1)
-
- # ...as a method, given a new name
- self.table.c.atad.alter(name='data')
- self.refresh_table(self.table.name)
- self.assertTrue('atad' not in self.table.c.keys())
- self.table.c.data # Should not raise exception
- self.assertEqual(num_rows(self.table.c.data, content), 1)
-
- # ...as a function, given a new object
- alter_column(self.table.c.data,
- name = 'atad', type=String(40),
- server_default=self.table.c.data.server_default)
- self.refresh_table(self.table.name)
- self.assertTrue('data' not in self.table.c.keys())
- self.table.c.atad # Should not raise exception
- self.assertEqual(num_rows(self.table.c.atad, content), 1)
-
- # ...as a method, given a new object
- self.table.c.atad.alter(
- name='data',type=String(40),
- server_default=self.table.c.atad.server_default
- )
- self.refresh_table(self.table.name)
- self.assertTrue('atad' not in self.table.c.keys())
- self.table.c.data # Should not raise exception
- self.assertEqual(num_rows(self.table.c.data,content), 1)
-
- @fixture.usedb()
- def test_type(self):
- # Test we can change a column's type
-
- # Just the new type
- self.table.c.data.alter(type=String(43))
- self.refresh_table(self.table.name)
- self.assertTrue(isinstance(self.table.c.data.type, String))
- self.assertEqual(self.table.c.data.type.length, 43)
-
- # Different type
- self.assertTrue(isinstance(self.table.c.id.type, Integer))
- self.assertEqual(self.table.c.id.nullable, False)
-
- # SQLAlchemy 1.1 adds a third state to "autoincrement" called
- # "auto".
- self.assertTrue(self.table.c.id.autoincrement in ('auto', True))
-
- if not self.engine.name == 'firebird':
- self.table.c.id.alter(type=String(20))
- self.assertEqual(self.table.c.id.nullable, False)
-
- # a rule makes sure that autoincrement is set to False
- # when we change off of Integer
- self.assertEqual(self.table.c.id.autoincrement, False)
- self.refresh_table(self.table.name)
- self.assertTrue(isinstance(self.table.c.id.type, String))
-
- # note that after reflection, "autoincrement" is likely
- # to change back to a database-generated value. Should be
- # False or "auto". if True, it's a bug; at least one of these
- # exists prior to SQLAlchemy 1.1.3
-
- @fixture.usedb()
- def test_default(self):
- """Can change a column's server_default value (DefaultClauses only)
- Only DefaultClauses are changed here: others are managed by the
- application / by SA
- """
- self.assertEqual(self.table.c.data.server_default.arg, 'tluafed')
-
- # Just the new default
- default = 'my_default'
- self.table.c.data.alter(server_default=DefaultClause(default))
- self.refresh_table(self.table.name)
- #self.assertEqual(self.table.c.data.server_default.arg,default)
- # TextClause returned by autoload
- self.assertTrue(default in str(self.table.c.data.server_default.arg))
- self.engine.execute(self.table.insert(), id=12)
- row = self._select_row()
- self.assertEqual(row['data'], default)
-
- # Column object
- default = 'your_default'
- self.table.c.data.alter(type=String(40), server_default=DefaultClause(default))
- self.refresh_table(self.table.name)
- self.assertTrue(default in str(self.table.c.data.server_default.arg))
-
- # Drop/remove default
- self.table.c.data.alter(server_default=None)
- self.assertEqual(self.table.c.data.server_default, None)
-
- self.refresh_table(self.table.name)
- # server_default isn't necessarily None for Oracle
- #self.assertTrue(self.table.c.data.server_default is None,self.table.c.data.server_default)
- self.engine.execute(self.table.insert(), id=11)
- row = self.table.select(self.table.c.id == 11).execution_options(autocommit=True).execute().fetchone()
- self.assertTrue(row['data'] is None, row['data'])
-
- @fixture.usedb(not_supported='firebird')
- def test_null(self):
- """Can change a column's null constraint"""
- self.assertEqual(self.table.c.data.nullable, True)
-
- # Full column
- self.table.c.data.alter(type=String(40), nullable=False)
- self.table.nullable = None
- self.refresh_table(self.table.name)
- self.assertEqual(self.table.c.data.nullable, False)
-
- # Just the new status
- self.table.c.data.alter(nullable=True)
- self.refresh_table(self.table.name)
- self.assertEqual(self.table.c.data.nullable, True)
-
- @fixture.usedb()
- def test_alter_deprecated(self):
- try:
- # py 2.4 compatibility :-/
- cw = catch_warnings(record=True)
- w = cw.__enter__()
-
- warnings.simplefilter("always")
- self.table.c.data.alter(Column('data', String(100)))
-
- self.assertEqual(len(w),1)
- self.assertTrue(issubclass(w[-1].category,
- MigrateDeprecationWarning))
- self.assertEqual(
- 'Passing a Column object to alter_column is deprecated. '
- 'Just pass in keyword parameters instead.',
- str(w[-1].message))
- finally:
- cw.__exit__()
-
- @fixture.usedb()
- def test_alter_returns_delta(self):
- """Test if alter constructs return delta"""
-
- delta = self.table.c.data.alter(type=String(100))
- self.assertTrue('type' in delta)
-
- @fixture.usedb()
- def test_alter_all(self):
- """Tests all alter changes at one time"""
- # test for each db separately
- # since currently some don't support everything
-
- # test pre settings
- self.assertEqual(self.table.c.data.nullable, True)
- self.assertEqual(self.table.c.data.server_default.arg, 'tluafed')
- self.assertEqual(self.table.c.data.name, 'data')
- self.assertTrue(isinstance(self.table.c.data.type, String))
- self.assertTrue(self.table.c.data.type.length, 40)
-
- kw = dict(nullable=False,
- server_default='foobar',
- name='data_new',
- type=String(50))
- if self.engine.name == 'firebird':
- del kw['nullable']
- self.table.c.data.alter(**kw)
-
- # test altered objects
- self.assertEqual(self.table.c.data.server_default.arg, 'foobar')
- if not self.engine.name == 'firebird':
- self.assertEqual(self.table.c.data.nullable, False)
- self.assertEqual(self.table.c.data.name, 'data_new')
- self.assertEqual(self.table.c.data.type.length, 50)
-
- self.refresh_table(self.table.name)
-
- # test post settings
- if not self.engine.name == 'firebird':
- self.assertEqual(self.table.c.data_new.nullable, False)
- self.assertEqual(self.table.c.data_new.name, 'data_new')
- self.assertTrue(isinstance(self.table.c.data_new.type, String))
- self.assertTrue(self.table.c.data_new.type.length, 50)
-
- # insert data and assert default
- self.table.insert(values={'id': 10}).execute()
- row = self._select_row()
- self.assertEqual(u'foobar', row['data_new'])
-
-
-class TestColumnDelta(fixture.DB):
- """Tests ColumnDelta class"""
-
- level = fixture.DB.CONNECT
- table_name = 'tmp_coldelta'
- table_int = 0
-
- def _setup(self, url):
- super(TestColumnDelta, self)._setup(url)
- self.meta = MetaData()
- self.table = Table(self.table_name, self.meta,
- Column('ids', String(10)),
- )
- self.meta.bind = self.engine
- if self.engine.has_table(self.table.name):
- self.table.drop()
- self.table.create()
-
- def _teardown(self):
- if self.engine.has_table(self.table.name):
- self.table.drop()
- self.meta.clear()
- super(TestColumnDelta,self)._teardown()
-
- def mkcol(self, name='id', type=String, *p, **k):
- return Column(name, type, *p, **k)
-
- def verify(self, expected, original, *p, **k):
- self.delta = ColumnDelta(original, *p, **k)
- result = list(self.delta.keys())
- result.sort()
- self.assertEqual(expected, result)
- return self.delta
-
- def test_deltas_two_columns(self):
- """Testing ColumnDelta with two columns"""
- col_orig = self.mkcol(primary_key=True)
- col_new = self.mkcol(name='ids', primary_key=True)
- self.verify([], col_orig, col_orig)
- self.verify(['name'], col_orig, col_orig, 'ids')
- self.verify(['name'], col_orig, col_orig, name='ids')
- self.verify(['name'], col_orig, col_new)
- self.verify(['name', 'type'], col_orig, col_new, type=String)
-
- # Type comparisons
- self.verify([], self.mkcol(type=String), self.mkcol(type=String))
- self.verify(['type'], self.mkcol(type=String), self.mkcol(type=Integer))
- self.verify(['type'], self.mkcol(type=String), self.mkcol(type=String(42)))
- self.verify([], self.mkcol(type=String(42)), self.mkcol(type=String(42)))
- self.verify(['type'], self.mkcol(type=String(24)), self.mkcol(type=String(42)))
- self.verify(['type'], self.mkcol(type=String(24)), self.mkcol(type=Text(24)))
-
- # Other comparisons
- self.verify(['primary_key'], self.mkcol(nullable=False), self.mkcol(primary_key=True))
-
- # PK implies nullable=False
- self.verify(['nullable', 'primary_key'], self.mkcol(nullable=True), self.mkcol(primary_key=True))
- self.verify([], self.mkcol(primary_key=True), self.mkcol(primary_key=True))
- self.verify(['nullable'], self.mkcol(nullable=True), self.mkcol(nullable=False))
- self.verify([], self.mkcol(nullable=True), self.mkcol(nullable=True))
- self.verify([], self.mkcol(server_default=None), self.mkcol(server_default=None))
- self.verify([], self.mkcol(server_default='42'), self.mkcol(server_default='42'))
-
- # test server default
- delta = self.verify(['server_default'], self.mkcol(), self.mkcol('id', String, DefaultClause('foobar')))
- self.assertEqual(delta['server_default'].arg, 'foobar')
-
- self.verify([], self.mkcol(server_default='foobar'), self.mkcol('id', String, DefaultClause('foobar')))
- self.verify(['type'], self.mkcol(server_default='foobar'), self.mkcol('id', Text, DefaultClause('foobar')))
-
- col = self.mkcol(server_default='foobar')
- self.verify(['type'], col, self.mkcol('id', Text, DefaultClause('foobar')), alter_metadata=True)
- self.assertTrue(isinstance(col.type, Text))
-
- col = self.mkcol()
- self.verify(['name', 'server_default', 'type'], col, self.mkcol('beep', Text, DefaultClause('foobar')),
- alter_metadata=True)
- self.assertTrue(isinstance(col.type, Text))
- self.assertEqual(col.name, 'beep')
- self.assertEqual(col.server_default.arg, 'foobar')
-
- @fixture.usedb()
- def test_deltas_zero_columns(self):
- """Testing ColumnDelta with zero columns"""
-
- self.verify(['name'], 'ids', table=self.table, name='hey')
-
- # test reflection
- self.verify(['type'], 'ids', table=self.table.name, type=String(80), engine=self.engine)
- self.verify(['type'], 'ids', table=self.table.name, type=String(80), metadata=self.meta)
-
- self.meta.clear()
- delta = self.verify(['type'], 'ids', table=self.table.name, type=String(80), metadata=self.meta,
- alter_metadata=True)
- self.assertTrue(self.table.name in self.meta)
- self.assertEqual(delta.result_column.type.length, 80)
- self.assertEqual(self.meta.tables.get(self.table.name).c.ids.type.length, 80)
-
- # test defaults
- self.meta.clear()
- self.verify(['server_default'], 'ids', table=self.table.name, server_default='foobar',
- metadata=self.meta,
- alter_metadata=True)
- self.meta.tables.get(self.table.name).c.ids.server_default.arg == 'foobar'
-
- # test missing parameters
- self.assertRaises(ValueError, ColumnDelta, table=self.table.name)
- self.assertRaises(ValueError, ColumnDelta, 'ids', table=self.table.name, alter_metadata=True)
- self.assertRaises(ValueError, ColumnDelta, 'ids', table=self.table.name, alter_metadata=False)
-
- def test_deltas_one_column(self):
- """Testing ColumnDelta with one column"""
- col_orig = self.mkcol(primary_key=True)
-
- self.verify([], col_orig)
- self.verify(['name'], col_orig, 'ids')
- # Parameters are always executed, even if they're 'unchanged'
- # (We can't assume given column is up-to-date)
- self.verify(['name', 'primary_key', 'type'], col_orig, 'id', Integer, primary_key=True)
- self.verify(['name', 'primary_key', 'type'], col_orig, name='id', type=Integer, primary_key=True)
-
- # Change name, given an up-to-date definition and the current name
- delta = self.verify(['name'], col_orig, name='blah')
- self.assertEqual(delta.get('name'), 'blah')
- self.assertEqual(delta.current_name, 'id')
-
- col_orig = self.mkcol(primary_key=True)
- self.verify(['name', 'type'], col_orig, name='id12', type=Text, alter_metadata=True)
- self.assertTrue(isinstance(col_orig.type, Text))
- self.assertEqual(col_orig.name, 'id12')
-
- # test server default
- col_orig = self.mkcol(primary_key=True)
- delta = self.verify(['server_default'], col_orig, DefaultClause('foobar'))
- self.assertEqual(delta['server_default'].arg, 'foobar')
-
- delta = self.verify(['server_default'], col_orig, server_default=DefaultClause('foobar'))
- self.assertEqual(delta['server_default'].arg, 'foobar')
-
- # no change
- col_orig = self.mkcol(server_default=DefaultClause('foobar'))
- delta = self.verify(['type'], col_orig, DefaultClause('foobar'), type=PickleType)
- self.assertTrue(isinstance(delta.result_column.type, PickleType))
-
- # TODO: test server on update
- # TODO: test bind metadata
diff --git a/migrate/tests/changeset/test_constraint.py b/migrate/tests/changeset/test_constraint.py
deleted file mode 100644
index 325b3c0..0000000
--- a/migrate/tests/changeset/test_constraint.py
+++ /dev/null
@@ -1,299 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-from sqlalchemy import *
-from sqlalchemy.util import *
-from sqlalchemy.exc import *
-
-from migrate.changeset.util import fk_column_names
-from migrate.exceptions import *
-from migrate.changeset import *
-
-from migrate.tests import fixture
-
-
-class CommonTestConstraint(fixture.DB):
- """helper functions to test constraints.
-
- we just create a fresh new table and make sure everything is
- as required.
- """
-
- def _setup(self, url):
- super(CommonTestConstraint, self)._setup(url)
- self._create_table()
-
- def _teardown(self):
- if hasattr(self, 'table') and self.engine.has_table(self.table.name):
- self.table.drop()
- super(CommonTestConstraint, self)._teardown()
-
- def _create_table(self):
- self._connect(self.url)
- self.meta = MetaData(self.engine)
- self.tablename = 'mytable'
- self.table = Table(self.tablename, self.meta,
- Column(u'id', Integer, nullable=False),
- Column(u'fkey', Integer, nullable=False),
- mysql_engine='InnoDB')
- if self.engine.has_table(self.table.name):
- self.table.drop()
- self.table.create()
-
- # make sure we start at zero
- self.assertEqual(len(self.table.primary_key), 0)
- self.assertTrue(isinstance(self.table.primary_key,
- schema.PrimaryKeyConstraint), self.table.primary_key.__class__)
-
-
-class TestConstraint(CommonTestConstraint):
- level = fixture.DB.CONNECT
-
- def _define_pk(self, *cols):
- # Add a pk by creating a PK constraint
- if (self.engine.name in ('oracle', 'firebird')):
- # Can't drop Oracle PKs without an explicit name
- pk = PrimaryKeyConstraint(table=self.table, name='temp_pk_key', *cols)
- else:
- pk = PrimaryKeyConstraint(table=self.table, *cols)
- self.compare_columns_equal(pk.columns, cols)
- pk.create()
- self.refresh_table()
- if not self.url.startswith('sqlite'):
- self.compare_columns_equal(self.table.primary_key, cols, ['type', 'autoincrement'])
-
- # Drop the PK constraint
- #if (self.engine.name in ('oracle', 'firebird')):
- # # Apparently Oracle PK names aren't introspected
- # pk.name = self.table.primary_key.name
- pk.drop()
- self.refresh_table()
- self.assertEqual(len(self.table.primary_key), 0)
- self.assertTrue(isinstance(self.table.primary_key, schema.PrimaryKeyConstraint))
- return pk
-
- @fixture.usedb()
- def test_define_fk(self):
- """FK constraints can be defined, created, and dropped"""
- # FK target must be unique
- pk = PrimaryKeyConstraint(self.table.c.id, table=self.table, name="pkid")
- pk.create()
-
- # Add a FK by creating a FK constraint
- if SQLA_07:
- self.assertEqual(list(self.table.c.fkey.foreign_keys), [])
- else:
- self.assertEqual(self.table.c.fkey.foreign_keys._list, [])
- fk = ForeignKeyConstraint([self.table.c.fkey],
- [self.table.c.id],
- name="fk_id_fkey",
- ondelete="CASCADE")
- if SQLA_07:
- self.assertTrue(list(self.table.c.fkey.foreign_keys) is not [])
- else:
- self.assertTrue(self.table.c.fkey.foreign_keys._list is not [])
- for key in fk_column_names(fk):
- self.assertEqual(key, self.table.c.fkey.name)
- self.assertEqual([e.column for e in fk.elements], [self.table.c.id])
- self.assertEqual(list(fk.referenced), [self.table.c.id])
-
- if self.url.startswith('mysql'):
- # MySQL FKs need an index
- index = Index('index_name', self.table.c.fkey)
- index.create()
- fk.create()
-
- # test for ondelete/onupdate
- if SQLA_07:
- fkey = list(self.table.c.fkey.foreign_keys)[0]
- else:
- fkey = self.table.c.fkey.foreign_keys._list[0]
- self.assertEqual(fkey.ondelete, "CASCADE")
- # TODO: test on real db if it was set
-
- self.refresh_table()
- if SQLA_07:
- self.assertTrue(list(self.table.c.fkey.foreign_keys) is not [])
- else:
- self.assertTrue(self.table.c.fkey.foreign_keys._list is not [])
-
- fk.drop()
- self.refresh_table()
- if SQLA_07:
- self.assertEqual(list(self.table.c.fkey.foreign_keys), [])
- else:
- self.assertEqual(self.table.c.fkey.foreign_keys._list, [])
-
- @fixture.usedb()
- def test_define_pk(self):
- """PK constraints can be defined, created, and dropped"""
- self._define_pk(self.table.c.fkey)
-
- @fixture.usedb()
- def test_define_pk_multi(self):
- """Multicolumn PK constraints can be defined, created, and dropped"""
- self._define_pk(self.table.c.id, self.table.c.fkey)
-
- @fixture.usedb(not_supported=['firebird'])
- def test_drop_cascade(self):
- """Drop constraint cascaded"""
- pk = PrimaryKeyConstraint('fkey', table=self.table, name="id_pkey")
- pk.create()
- self.refresh_table()
-
- # Drop the PK constraint forcing cascade
- pk.drop(cascade=True)
-
- # TODO: add real assertion if it was added
-
- @fixture.usedb(supported=['mysql'])
- def test_fail_mysql_check_constraints(self):
- """Check constraints raise NotSupported for mysql on drop"""
- cons = CheckConstraint('id > 3', name="id_check", table=self.table)
- cons.create()
- self.refresh_table()
-
- try:
- cons.drop()
- except NotSupportedError:
- pass
- else:
- self.fail()
-
- @fixture.usedb(not_supported=['sqlite', 'mysql'])
- def test_named_check_constraints(self):
- """Check constraints can be defined, created, and dropped"""
- self.assertRaises(InvalidConstraintError, CheckConstraint, 'id > 3')
- cons = CheckConstraint('id > 3', name="id_check", table=self.table)
- cons.create()
- self.refresh_table()
-
- self.table.insert(values={'id': 4, 'fkey': 1}).execute()
- try:
- self.table.insert(values={'id': 1, 'fkey': 1}).execute()
- except (IntegrityError, ProgrammingError):
- pass
- else:
- self.fail()
-
- # Remove the name, drop the constraint; it should succeed
- cons.drop()
- self.refresh_table()
- self.table.insert(values={'id': 2, 'fkey': 2}).execute()
- self.table.insert(values={'id': 1, 'fkey': 2}).execute()
-
-
-class TestAutoname(CommonTestConstraint):
- """Every method tests for a type of constraint whether it can autoname
- itself and if you can pass object instance and names to classes.
- """
- level = fixture.DB.CONNECT
-
- @fixture.usedb(not_supported=['oracle', 'firebird'])
- def test_autoname_pk(self):
- """PrimaryKeyConstraints can guess their name if None is given"""
- # Don't supply a name; it should create one
- cons = PrimaryKeyConstraint(self.table.c.id)
- cons.create()
- self.refresh_table()
- if not self.url.startswith('sqlite'):
- # TODO: test for index for sqlite
- self.compare_columns_equal(cons.columns, self.table.primary_key, ['autoincrement', 'type'])
-
- # Remove the name, drop the constraint; it should succeed
- cons.name = None
- cons.drop()
- self.refresh_table()
- self.assertEqual(list(), list(self.table.primary_key))
-
- # test string names
- cons = PrimaryKeyConstraint('id', table=self.table)
- cons.create()
- self.refresh_table()
- if not self.url.startswith('sqlite'):
- # TODO: test for index for sqlite
- self.compare_columns_equal(cons.columns, self.table.primary_key)
- cons.name = None
- cons.drop()
-
- @fixture.usedb(not_supported=['oracle', 'sqlite', 'firebird'])
- def test_autoname_fk(self):
- """ForeignKeyConstraints can guess their name if None is given"""
- cons = PrimaryKeyConstraint(self.table.c.id)
- cons.create()
-
- cons = ForeignKeyConstraint([self.table.c.fkey], [self.table.c.id])
- cons.create()
- self.refresh_table()
- if SQLA_07:
- list(self.table.c.fkey.foreign_keys)[0].column is self.table.c.id
- else:
- self.table.c.fkey.foreign_keys[0].column is self.table.c.id
-
- # Remove the name, drop the constraint; it should succeed
- cons.name = None
- cons.drop()
- self.refresh_table()
- if SQLA_07:
- self.assertEqual(list(self.table.c.fkey.foreign_keys), list())
- else:
- self.assertEqual(self.table.c.fkey.foreign_keys._list, list())
-
- # test string names
- cons = ForeignKeyConstraint(['fkey'], ['%s.id' % self.tablename], table=self.table)
- cons.create()
- self.refresh_table()
- if SQLA_07:
- list(self.table.c.fkey.foreign_keys)[0].column is self.table.c.id
- else:
- self.table.c.fkey.foreign_keys[0].column is self.table.c.id
-
- # Remove the name, drop the constraint; it should succeed
- cons.name = None
- cons.drop()
-
- @fixture.usedb(not_supported=['oracle', 'sqlite', 'mysql'])
- def test_autoname_check(self):
- """CheckConstraints can guess their name if None is given"""
- cons = CheckConstraint('id > 3', columns=[self.table.c.id])
- cons.create()
- self.refresh_table()
-
- if not self.engine.name == 'mysql':
- self.table.insert(values={'id': 4, 'fkey': 1}).execute()
- try:
- self.table.insert(values={'id': 1, 'fkey': 2}).execute()
- except (IntegrityError, ProgrammingError):
- pass
- else:
- self.fail()
-
- # Remove the name, drop the constraint; it should succeed
- cons.name = None
- cons.drop()
- self.refresh_table()
- self.table.insert(values={'id': 2, 'fkey': 2}).execute()
- self.table.insert(values={'id': 1, 'fkey': 3}).execute()
-
- @fixture.usedb(not_supported=['oracle'])
- def test_autoname_unique(self):
- """UniqueConstraints can guess their name if None is given"""
- cons = UniqueConstraint(self.table.c.fkey)
- cons.create()
- self.refresh_table()
-
- self.table.insert(values={'fkey': 4, 'id': 1}).execute()
- try:
- self.table.insert(values={'fkey': 4, 'id': 2}).execute()
- except (sqlalchemy.exc.IntegrityError,
- sqlalchemy.exc.ProgrammingError):
- pass
- else:
- self.fail()
-
- # Remove the name, drop the constraint; it should succeed
- cons.name = None
- cons.drop()
- self.refresh_table()
- self.table.insert(values={'fkey': 4, 'id': 2}).execute()
- self.table.insert(values={'fkey': 4, 'id': 1}).execute()
diff --git a/migrate/tests/fixture/__init__.py b/migrate/tests/fixture/__init__.py
deleted file mode 100644
index 6b8bc48..0000000
--- a/migrate/tests/fixture/__init__.py
+++ /dev/null
@@ -1,18 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import testtools
-
-def main(imports=None):
- if imports:
- global suite
- suite = suite(imports)
- defaultTest='fixture.suite'
- else:
- defaultTest=None
- return testtools.TestProgram(defaultTest=defaultTest)
-
-from .base import Base
-from .pathed import Pathed
-from .shell import Shell
-from .database import DB,usedb
diff --git a/migrate/tests/fixture/base.py b/migrate/tests/fixture/base.py
deleted file mode 100644
index 38c91af..0000000
--- a/migrate/tests/fixture/base.py
+++ /dev/null
@@ -1,26 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import re
-import testtools
-
-class Base(testtools.TestCase):
-
- def assertEqualIgnoreWhitespace(self, v1, v2):
- """Compares two strings that should be\
- identical except for whitespace
- """
- def strip_whitespace(s):
- return re.sub(r'\s', '', s)
-
- line1 = strip_whitespace(v1)
- line2 = strip_whitespace(v2)
-
- self.assertEqual(line1, line2, "%s != %s" % (v1, v2))
-
- def ignoreErrors(self, func, *p,**k):
- """Call a function, ignoring any exceptions"""
- try:
- func(*p,**k)
- except:
- pass
diff --git a/migrate/tests/fixture/database.py b/migrate/tests/fixture/database.py
deleted file mode 100644
index 93bd69b..0000000
--- a/migrate/tests/fixture/database.py
+++ /dev/null
@@ -1,203 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import os
-import logging
-import sys
-
-import six
-from decorator import decorator
-
-from sqlalchemy import create_engine, Table, MetaData
-from sqlalchemy import exc as sa_exc
-from sqlalchemy.orm import create_session
-from sqlalchemy.pool import StaticPool
-
-from migrate.changeset.schema import ColumnDelta
-from migrate.versioning.util import Memoize
-
-from migrate.tests.fixture.base import Base
-from migrate.tests.fixture.pathed import Pathed
-
-
-log = logging.getLogger(__name__)
-
-@Memoize
-def readurls():
- """read URLs from config file return a list"""
- # TODO: remove tmpfile since sqlite can store db in memory
- filename = 'test_db.cfg' if six.PY2 else "test_db_py3.cfg"
- ret = list()
- tmpfile = Pathed.tmp()
- fullpath = os.path.join(os.curdir, filename)
-
- try:
- fd = open(fullpath)
- except IOError:
- raise IOError("""You must specify the databases to use for testing!
-Copy %(filename)s.tmpl to %(filename)s and edit your database URLs.""" % locals())
-
- for line in fd:
- if line.startswith('#'):
- continue
- line = line.replace('__tmp__', tmpfile).strip()
- ret.append(line)
- fd.close()
- return ret
-
-def is_supported(url, supported, not_supported):
- db = url.split(':', 1)[0]
-
- if supported is not None:
- if isinstance(supported, six.string_types):
- return supported == db
- else:
- return db in supported
- elif not_supported is not None:
- if isinstance(not_supported, six.string_types):
- return not_supported != db
- else:
- return not (db in not_supported)
- return True
-
-
-def usedb(supported=None, not_supported=None):
- """Decorates tests to be run with a database connection
- These tests are run once for each available database
-
- @param supported: run tests for ONLY these databases
- @param not_supported: run tests for all databases EXCEPT these
-
- If both supported and not_supported are empty, all dbs are assumed
- to be supported
- """
- if supported is not None and not_supported is not None:
- raise AssertionError("Can't specify both supported and not_supported in fixture.db()")
-
- urls = readurls()
- my_urls = [url for url in urls if is_supported(url, supported, not_supported)]
-
- @decorator
- def dec(f, self, *a, **kw):
- failed_for = []
- fail = False
- for url in my_urls:
- try:
- log.debug("Running test with engine %s", url)
- try:
- self._setup(url)
- except sa_exc.OperationalError:
- log.info('Backend %s is not available, skip it', url)
- continue
- except Exception as e:
- raise RuntimeError('Exception during _setup(): %r' % e)
-
- try:
- f(self, *a, **kw)
- finally:
- try:
- self._teardown()
- except Exception as e:
- raise RuntimeError('Exception during _teardown(): %r' % e)
- except Exception:
- failed_for.append(url)
- fail = sys.exc_info()
- for url in failed_for:
- log.error('Failed for %s', url)
- if fail:
- # cause the failure :-)
- six.reraise(*fail)
- return dec
-
-
-class DB(Base):
- # Constants: connection level
- NONE = 0 # No connection; just set self.url
- CONNECT = 1 # Connect; no transaction
- TXN = 2 # Everything in a transaction
-
- level = TXN
-
- def _engineInfo(self, url=None):
- if url is None:
- url = self.url
- return url
-
- def _setup(self, url):
- self._connect(url)
- # make sure there are no tables lying around
- meta = MetaData(self.engine)
- meta.reflect()
- meta.drop_all()
-
- def _teardown(self):
- self._disconnect()
-
- def _connect(self, url):
- self.url = url
- # TODO: seems like 0.5.x branch does not work with engine.dispose and staticpool
- #self.engine = create_engine(url, echo=True, poolclass=StaticPool)
- self.engine = create_engine(url, echo=True)
- # silence the logger added by SA, nose adds its own!
- logging.getLogger('sqlalchemy').handlers=[]
- self.meta = MetaData(bind=self.engine)
- if self.level < self.CONNECT:
- return
- #self.session = create_session(bind=self.engine)
- if self.level < self.TXN:
- return
- #self.txn = self.session.begin()
-
- def _disconnect(self):
- if hasattr(self, 'txn'):
- self.txn.rollback()
- if hasattr(self, 'session'):
- self.session.close()
- #if hasattr(self,'conn'):
- # self.conn.close()
- self.engine.dispose()
-
- def _supported(self, url):
- db = url.split(':',1)[0]
- func = getattr(self, self._TestCase__testMethodName)
- if hasattr(func, 'supported'):
- return db in func.supported
- if hasattr(func, 'not_supported'):
- return not (db in func.not_supported)
- # Neither list assigned; assume all are supported
- return True
-
- def _not_supported(self, url):
- return not self._supported(url)
-
- def _select_row(self):
- """Select rows, used in multiple tests"""
- return self.table.select().execution_options(
- autocommit=True).execute().fetchone()
-
- def refresh_table(self, name=None):
- """Reload the table from the database
- Assumes we're working with only a single table, self.table, and
- metadata self.meta
-
- Working w/ multiple tables is not possible, as tables can only be
- reloaded with meta.clear()
- """
- if name is None:
- name = self.table.name
- self.meta.clear()
- self.table = Table(name, self.meta, autoload=True)
-
- def compare_columns_equal(self, columns1, columns2, ignore=None):
- """Loop through all columns and compare them"""
- def key(column):
- return column.name
- for c1, c2 in zip(sorted(columns1, key=key), sorted(columns2, key=key)):
- diffs = ColumnDelta(c1, c2).diffs
- if ignore:
- for key in ignore:
- diffs.pop(key, None)
- if diffs:
- self.fail("Comparing %s to %s failed: %s" % (columns1, columns2, diffs))
-
-# TODO: document engine.dispose and write tests
diff --git a/migrate/tests/fixture/models.py b/migrate/tests/fixture/models.py
deleted file mode 100644
index ee76429..0000000
--- a/migrate/tests/fixture/models.py
+++ /dev/null
@@ -1,14 +0,0 @@
-from sqlalchemy import *
-
-# test rundiffs in shell
-meta_old_rundiffs = MetaData()
-meta_rundiffs = MetaData()
-meta = MetaData()
-
-tmp_account_rundiffs = Table('tmp_account_rundiffs', meta_rundiffs,
- Column('id', Integer, primary_key=True),
- Column('login', Text()),
- Column('passwd', Text()),
-)
-
-tmp_sql_table = Table('tmp_sql_table', meta, Column('id', Integer))
diff --git a/migrate/tests/fixture/pathed.py b/migrate/tests/fixture/pathed.py
deleted file mode 100644
index 78cf4cd..0000000
--- a/migrate/tests/fixture/pathed.py
+++ /dev/null
@@ -1,77 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import os
-import sys
-import shutil
-import tempfile
-
-from migrate.tests.fixture import base
-
-
-class Pathed(base.Base):
- # Temporary files
-
- _tmpdir = tempfile.mkdtemp()
-
- def setUp(self):
- super(Pathed, self).setUp()
- self.temp_usable_dir = tempfile.mkdtemp()
- sys.path.append(self.temp_usable_dir)
-
- def tearDown(self):
- super(Pathed, self).tearDown()
- try:
- sys.path.remove(self.temp_usable_dir)
- except:
- pass # w00t?
- Pathed.purge(self.temp_usable_dir)
-
- @classmethod
- def _tmp(cls, prefix='', suffix=''):
- """Generate a temporary file name that doesn't exist
- All filenames are generated inside a temporary directory created by
- tempfile.mkdtemp(); only the creating user has access to this directory.
- It should be secure to return a nonexistent temp filename in this
- directory, unless the user is messing with their own files.
- """
- file, ret = tempfile.mkstemp(suffix,prefix,cls._tmpdir)
- os.close(file)
- os.remove(ret)
- return ret
-
- @classmethod
- def tmp(cls, *p, **k):
- return cls._tmp(*p, **k)
-
- @classmethod
- def tmp_py(cls, *p, **k):
- return cls._tmp(suffix='.py', *p, **k)
-
- @classmethod
- def tmp_sql(cls, *p, **k):
- return cls._tmp(suffix='.sql', *p, **k)
-
- @classmethod
- def tmp_named(cls, name):
- return os.path.join(cls._tmpdir, name)
-
- @classmethod
- def tmp_repos(cls, *p, **k):
- return cls._tmp(*p, **k)
-
- @classmethod
- def purge(cls, path):
- """Removes this path if it exists, in preparation for tests
- Careful - all tests should take place in /tmp.
- We don't want to accidentally wipe stuff out...
- """
- if os.path.exists(path):
- if os.path.isdir(path):
- shutil.rmtree(path)
- else:
- os.remove(path)
- if path.endswith('.py'):
- pyc = path + 'c'
- if os.path.exists(pyc):
- os.remove(pyc)
diff --git a/migrate/tests/fixture/shell.py b/migrate/tests/fixture/shell.py
deleted file mode 100644
index 566d250..0000000
--- a/migrate/tests/fixture/shell.py
+++ /dev/null
@@ -1,33 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import os
-import sys
-import logging
-
-from scripttest import TestFileEnvironment
-
-from migrate.tests.fixture.pathed import *
-
-
-log = logging.getLogger(__name__)
-
-class Shell(Pathed):
- """Base class for command line tests"""
-
- def setUp(self):
- super(Shell, self).setUp()
- migrate_path = os.path.dirname(sys.executable)
- # PATH to migrate development script folder
- log.debug('PATH for ScriptTest: %s', migrate_path)
- self.env = TestFileEnvironment(
- base_path=os.path.join(self.temp_usable_dir, 'env'),
- )
-
- def run_version(self, repos_path):
- result = self.env.run('migrate version %s' % repos_path)
- return int(result.stdout.strip())
-
- def run_db_version(self, url, repos_path):
- result = self.env.run('migrate db_version %s %s' % (url, repos_path))
- return int(result.stdout.strip())
diff --git a/migrate/tests/fixture/warnings.py b/migrate/tests/fixture/warnings.py
deleted file mode 100644
index 8d99c0f..0000000
--- a/migrate/tests/fixture/warnings.py
+++ /dev/null
@@ -1,88 +0,0 @@
-# lifted from Python 2.6, so we can use it in Python 2.5
-import sys
-
-class WarningMessage(object):
-
- """Holds the result of a single showwarning() call."""
-
- _WARNING_DETAILS = ("message", "category", "filename", "lineno", "file",
- "line")
-
- def __init__(self, message, category, filename, lineno, file=None,
- line=None):
- local_values = locals()
- for attr in self._WARNING_DETAILS:
- setattr(self, attr, local_values[attr])
- if category:
- self._category_name = category.__name__
- else:
- self._category_name = None
-
- def __str__(self):
- return ("{message : %r, category : %r, filename : %r, lineno : %s, "
- "line : %r}" % (self.message, self._category_name,
- self.filename, self.lineno, self.line))
-
-
-class catch_warnings(object):
-
- """A context manager that copies and restores the warnings filter upon
- exiting the context.
-
- The 'record' argument specifies whether warnings should be captured by a
- custom implementation of warnings.showwarning() and be appended to a list
- returned by the context manager. Otherwise None is returned by the context
- manager. The objects appended to the list are arguments whose attributes
- mirror the arguments to showwarning().
-
- The 'module' argument is to specify an alternative module to the module
- named 'warnings' and imported under that name. This argument is only useful
- when testing the warnings module itself.
-
- """
-
- def __init__(self, record=False, module=None):
- """Specify whether to record warnings and if an alternative module
- should be used other than sys.modules['warnings'].
-
- For compatibility with Python 3.0, please consider all arguments to be
- keyword-only.
-
- """
- self._record = record
- if module is None:
- self._module = sys.modules['warnings']
- else:
- self._module = module
- self._entered = False
-
- def __repr__(self):
- args = []
- if self._record:
- args.append("record=True")
- if self._module is not sys.modules['warnings']:
- args.append("module=%r" % self._module)
- name = type(self).__name__
- return "%s(%s)" % (name, ", ".join(args))
-
- def __enter__(self):
- if self._entered:
- raise RuntimeError("Cannot enter %r twice" % self)
- self._entered = True
- self._filters = self._module.filters
- self._module.filters = self._filters[:]
- self._showwarning = self._module.showwarning
- if self._record:
- log = []
- def showwarning(*args, **kwargs):
- log.append(WarningMessage(*args, **kwargs))
- self._module.showwarning = showwarning
- return log
- else:
- return None
-
- def __exit__(self, *exc_info):
- if not self._entered:
- raise RuntimeError("Cannot exit %r without entering first" % self)
- self._module.filters = self._filters
- self._module.showwarning = self._showwarning
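On Python 2.6+ (and all Python 3 versions) this behaviour is available directly from the standard library, which is why the backport above existed only for Python 2.5. A short usage sketch of the stdlib equivalent:

import warnings

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')  # make sure nothing is filtered out
    warnings.warn('something worth noting', UserWarning)

# Each recorded entry mirrors the arguments that showwarning() receives.
assert len(caught) == 1
assert issubclass(caught[0].category, UserWarning)
assert 'worth noting' in str(caught[0].message)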
diff --git a/migrate/tests/integrated/__init__.py b/migrate/tests/integrated/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/migrate/tests/integrated/__init__.py
+++ /dev/null
diff --git a/migrate/tests/integrated/test_docs.py b/migrate/tests/integrated/test_docs.py
deleted file mode 100644
index 8e35427..0000000
--- a/migrate/tests/integrated/test_docs.py
+++ /dev/null
@@ -1,18 +0,0 @@
-import doctest
-import os
-
-
-from migrate.tests import fixture
-
-# Collect tests for all handwritten docs: doc/*.rst
-
-dir = ('..','..','..','doc','source')
-absdir = (os.path.dirname(os.path.abspath(__file__)),)+dir
-dirpath = os.path.join(*absdir)
-files = [f for f in os.listdir(dirpath) if f.endswith('.rst')]
-paths = [os.path.join(*(dir+(f,))) for f in files]
-assert len(paths) > 0
-suite = doctest.DocFileSuite(*paths)
-
-def test_docs():
- suite.debug()
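doctest.DocFileSuite turns each text file into a unittest suite, which is how the deleted module collected doctests from the handwritten docs. A minimal sketch of the same idea (the documentation directory name is illustrative):

import doctest
import os
import unittest

doc_dir = 'doc/source'  # hypothetical directory of .rst files
rst_files = sorted(f for f in os.listdir(doc_dir) if f.endswith('.rst'))
paths = [os.path.join(doc_dir, f) for f in rst_files]

# module_relative=False lets us pass plain filesystem paths.
suite = doctest.DocFileSuite(*paths, module_relative=False)
unittest.TextTestRunner().run(suite)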
diff --git a/migrate/tests/versioning/__init__.py b/migrate/tests/versioning/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/migrate/tests/versioning/__init__.py
+++ /dev/null
diff --git a/migrate/tests/versioning/test_api.py b/migrate/tests/versioning/test_api.py
deleted file mode 100644
index bc4b29d..0000000
--- a/migrate/tests/versioning/test_api.py
+++ /dev/null
@@ -1,128 +0,0 @@
-#!/usr/bin/python
-# -*- coding: utf-8 -*-
-
-import six
-
-from migrate.exceptions import *
-from migrate.versioning import api
-
-from migrate.tests.fixture.pathed import *
-from migrate.tests.fixture import models
-from migrate.tests import fixture
-
-
-class TestAPI(Pathed):
-
- def test_help(self):
- self.assertTrue(isinstance(api.help('help'), six.string_types))
- self.assertRaises(UsageError, api.help)
- self.assertRaises(UsageError, api.help, 'foobar')
- self.assertTrue(isinstance(api.help('create'), str))
-
- # test that all commands return some text
- for cmd in api.__all__:
- content = api.help(cmd)
- self.assertTrue(content)
-
- def test_create(self):
- tmprepo = self.tmp_repos()
- api.create(tmprepo, 'temp')
-
- # repository already exists
- self.assertRaises(KnownError, api.create, tmprepo, 'temp')
-
- def test_script(self):
- repo = self.tmp_repos()
- api.create(repo, 'temp')
- api.script('first version', repo)
-
- def test_script_sql(self):
- repo = self.tmp_repos()
- api.create(repo, 'temp')
- api.script_sql('postgres', 'desc', repo)
-
- def test_version(self):
- repo = self.tmp_repos()
- api.create(repo, 'temp')
- api.version(repo)
-
- def test_version_control(self):
- repo = self.tmp_repos()
- api.create(repo, 'temp')
- api.version_control('sqlite:///', repo)
- api.version_control('sqlite:///', six.text_type(repo))
-
- def test_source(self):
- repo = self.tmp_repos()
- api.create(repo, 'temp')
- api.script('first version', repo)
- api.script_sql('default', 'desc', repo)
-
- # no repository
- self.assertRaises(UsageError, api.source, 1)
-
- # stdout
- out = api.source(1, dest=None, repository=repo)
- self.assertTrue(out)
-
- # file
- out = api.source(1, dest=self.tmp_repos(), repository=repo)
- self.assertFalse(out)
-
- def test_manage(self):
- output = api.manage(os.path.join(self.temp_usable_dir, 'manage.py'))
-
-
-class TestSchemaAPI(fixture.DB, Pathed):
-
- def _setup(self, url):
- super(TestSchemaAPI, self)._setup(url)
- self.repo = self.tmp_repos()
- api.create(self.repo, 'temp')
- self.schema = api.version_control(url, self.repo)
-
- def _teardown(self):
- self.schema = api.drop_version_control(self.url, self.repo)
- super(TestSchemaAPI, self)._teardown()
-
- @fixture.usedb()
- def test_workflow(self):
- self.assertEqual(api.db_version(self.url, self.repo), 0)
- api.script('First Version', self.repo)
- self.assertEqual(api.db_version(self.url, self.repo), 0)
- api.upgrade(self.url, self.repo, 1)
- self.assertEqual(api.db_version(self.url, self.repo), 1)
- api.downgrade(self.url, self.repo, 0)
- self.assertEqual(api.db_version(self.url, self.repo), 0)
- api.test(self.url, self.repo)
- self.assertEqual(api.db_version(self.url, self.repo), 0)
-
- # preview
- # TODO: test output
- out = api.upgrade(self.url, self.repo, preview_py=True)
- out = api.upgrade(self.url, self.repo, preview_sql=True)
-
- api.upgrade(self.url, self.repo, 1)
- api.script_sql('default', 'desc', self.repo)
- self.assertRaises(UsageError, api.upgrade, self.url, self.repo, 2, preview_py=True)
- out = api.upgrade(self.url, self.repo, 2, preview_sql=True)
-
- # can't upgrade to a lower version: still at version 1, so upgrading to 0 raises
- self.assertEqual(api.db_version(self.url, self.repo), 1)
- self.assertRaises(KnownError, api.upgrade, self.url, self.repo, 0)
-
- @fixture.usedb()
- def test_compare_model_to_db(self):
- diff = api.compare_model_to_db(self.url, self.repo, models.meta)
-
- @fixture.usedb()
- def test_create_model(self):
- model = api.create_model(self.url, self.repo)
-
- @fixture.usedb()
- def test_make_update_script_for_model(self):
- model = api.make_update_script_for_model(self.url, self.repo, models.meta_old_rundiffs, models.meta_rundiffs)
-
- @fixture.usedb()
- def test_update_db_from_model(self):
- model = api.update_db_from_model(self.url, self.repo, models.meta_rundiffs)
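Taken together, these tests document the basic migrate.versioning.api workflow: create a repository, add a change script, put a database under version control, then upgrade it. A condensed sketch of that flow (the repository path and database URL are placeholders):

from migrate.versioning import api

repo = '/tmp/example_repository'    # placeholder path
url = 'sqlite:////tmp/example.db'   # placeholder database URL

api.create(repo, 'example')         # new repository
api.script('add user table', repo)  # change script for version 1
api.version_control(url, repo)      # stamp the database at version 0
api.upgrade(url, repo, 1)           # run the script
print(api.db_version(url, repo))    # -> 1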
diff --git a/migrate/tests/versioning/test_cfgparse.py b/migrate/tests/versioning/test_cfgparse.py
deleted file mode 100644
index a31273e..0000000
--- a/migrate/tests/versioning/test_cfgparse.py
+++ /dev/null
@@ -1,27 +0,0 @@
-#!/usr/bin/python
-# -*- coding: utf-8 -*-
-
-from migrate.versioning import cfgparse
-from migrate.versioning.repository import *
-from migrate.versioning.template import Template
-from migrate.tests import fixture
-
-
-class TestConfigParser(fixture.Base):
-
- def test_to_dict(self):
- """Correctly interpret config results as dictionaries"""
- parser = cfgparse.Parser(dict(default_value=42))
- self.assertTrue(len(parser.sections()) == 0)
- parser.add_section('section')
- parser.set('section','option','value')
- self.assertEqual(parser.get('section', 'option'), 'value')
- self.assertEqual(parser.to_dict()['section']['option'], 'value')
-
- def test_table_config(self):
- """We should be able to specify the table to be used with a repository"""
- default_text = Repository.prepare_config(Template().get_repository(),
- 'repository_name', {})
- specified_text = Repository.prepare_config(Template().get_repository(),
- 'repository_name', {'version_table': '_other_table'})
- self.assertNotEqual(default_text, specified_text)
diff --git a/migrate/tests/versioning/test_database.py b/migrate/tests/versioning/test_database.py
deleted file mode 100644
index 8291c6b..0000000
--- a/migrate/tests/versioning/test_database.py
+++ /dev/null
@@ -1,13 +0,0 @@
-from sqlalchemy import select, text
-from migrate.tests import fixture
-
-class TestConnect(fixture.DB):
- level=fixture.DB.TXN
-
- @fixture.usedb()
- def test_connect(self):
- """Connect to the database successfully"""
- # Connection is done in fixture.DB setup; make sure we can do stuff
- self.engine.execute(
- select([text('42')])
- )
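The Engine.execute() call used here was removed in SQLAlchemy 2.0; a sketch of the equivalent connectivity check written against a connection, which works on SQLAlchemy 1.4 and later (in-memory SQLite used purely for illustration):

from sqlalchemy import create_engine, select, text

engine = create_engine('sqlite://')
with engine.connect() as conn:
    value = conn.execute(select(text('42'))).scalar()
    assert value == 42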
diff --git a/migrate/tests/versioning/test_genmodel.py b/migrate/tests/versioning/test_genmodel.py
deleted file mode 100644
index f800826..0000000
--- a/migrate/tests/versioning/test_genmodel.py
+++ /dev/null
@@ -1,214 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import os
-
-import six
-import sqlalchemy
-from sqlalchemy import *
-
-from migrate.versioning import genmodel, schemadiff
-from migrate.changeset import schema
-
-from migrate.tests import fixture
-
-
-class TestSchemaDiff(fixture.DB):
- table_name = 'tmp_schemadiff'
- level = fixture.DB.CONNECT
-
- def _setup(self, url):
- super(TestSchemaDiff, self)._setup(url)
- self.meta = MetaData(self.engine)
- self.meta.reflect()
- self.meta.drop_all() # in case junk tables are lying around in the test database
- self.meta = MetaData(self.engine)
- self.meta.reflect() # needed if we just deleted some tables
- self.table = Table(self.table_name, self.meta,
- Column('id',Integer(), primary_key=True),
- Column('name', UnicodeText()),
- Column('data', UnicodeText()),
- )
-
- def _teardown(self):
- if self.table.exists():
- self.meta = MetaData(self.engine)
- self.meta.reflect()
- self.meta.drop_all()
- super(TestSchemaDiff, self)._teardown()
-
- def _applyLatestModel(self):
- diff = schemadiff.getDiffOfModelAgainstDatabase(self.meta, self.engine, excludeTables=['migrate_version'])
- genmodel.ModelGenerator(diff,self.engine).runB2A()
-
- # NOTE(mriedem): DB2 handles UnicodeText as LONG VARGRAPHIC
- # so the schema diffs on the columns don't work with this test.
- @fixture.usedb(not_supported='ibm_db_sa')
- def test_functional(self):
- def assertDiff(isDiff, tablesMissingInDatabase, tablesMissingInModel, tablesWithDiff):
- diff = schemadiff.getDiffOfModelAgainstDatabase(self.meta, self.engine, excludeTables=['migrate_version'])
- self.assertEqual(
- (diff.tables_missing_from_B,
- diff.tables_missing_from_A,
- list(diff.tables_different.keys()),
- bool(diff)),
- (tablesMissingInDatabase,
- tablesMissingInModel,
- tablesWithDiff,
- isDiff)
- )
-
- # Model is defined but database is empty.
- assertDiff(True, [self.table_name], [], [])
-
- # Check Python upgrade and downgrade of database from updated model.
- diff = schemadiff.getDiffOfModelAgainstDatabase(self.meta, self.engine, excludeTables=['migrate_version'])
- decls, upgradeCommands, downgradeCommands = genmodel.ModelGenerator(diff,self.engine).genB2AMigration()
-
- # Feature test for a recent SQLa feature;
- # expect different output in that case.
- if repr(String()) == 'String()':
- self.assertEqualIgnoreWhitespace(decls, '''
- from migrate.changeset import schema
- pre_meta = MetaData()
- post_meta = MetaData()
- tmp_schemadiff = Table('tmp_schemadiff', post_meta,
- Column('id', Integer, primary_key=True, nullable=False),
- Column('name', UnicodeText),
- Column('data', UnicodeText),
- )
- ''')
- else:
- self.assertEqualIgnoreWhitespace(decls, '''
- from migrate.changeset import schema
- pre_meta = MetaData()
- post_meta = MetaData()
- tmp_schemadiff = Table('tmp_schemadiff', post_meta,
- Column('id', Integer, primary_key=True, nullable=False),
- Column('name', UnicodeText(length=None)),
- Column('data', UnicodeText(length=None)),
- )
- ''')
-
- # Create table in database, now model should match database.
- self._applyLatestModel()
- assertDiff(False, [], [], [])
-
- # Check Python code gen from database.
- diff = schemadiff.getDiffOfModelAgainstDatabase(MetaData(), self.engine, excludeTables=['migrate_version'])
- src = genmodel.ModelGenerator(diff,self.engine).genBDefinition()
-
- namespace = {}
- six.exec_(src, namespace)
-
- c1 = Table('tmp_schemadiff', self.meta, autoload=True).c
- c2 = namespace['tmp_schemadiff'].c
- self.compare_columns_equal(c1, c2, ['type'])
- # TODO: get rid of ignoring type
-
- if not self.engine.name == 'oracle':
- # Add data, later we'll make sure it's still present.
- result = self.engine.execute(self.table.insert(), id=1, name=u'mydata')
- dataId = result.inserted_primary_key[0]
-
- # Modify table in model (by removing it and adding it back to model)
- # Drop column data, add columns data2 and data3.
- self.meta.remove(self.table)
- self.table = Table(self.table_name,self.meta,
- Column('id',Integer(),primary_key=True),
- Column('name',UnicodeText(length=None)),
- Column('data2',Integer(),nullable=True),
- Column('data3',Integer(),nullable=True),
- )
- assertDiff(True, [], [], [self.table_name])
-
- # Apply latest model changes and find no more diffs.
- self._applyLatestModel()
- assertDiff(False, [], [], [])
-
- # Drop column data3, add data4
- self.meta.remove(self.table)
- self.table = Table(self.table_name,self.meta,
- Column('id',Integer(),primary_key=True),
- Column('name',UnicodeText(length=None)),
- Column('data2',Integer(),nullable=True),
- Column('data4',Float(),nullable=True),
- )
- assertDiff(True, [], [], [self.table_name])
-
- diff = schemadiff.getDiffOfModelAgainstDatabase(
- self.meta, self.engine, excludeTables=['migrate_version'])
- decls, upgradeCommands, downgradeCommands = genmodel.ModelGenerator(diff,self.engine).genB2AMigration(indent='')
-
- # decls have changed since genBDefinition
- six.exec_(decls, namespace)
- # migration commands expect a namespace containing migrate_engine
- namespace['migrate_engine'] = self.engine
- # run the migration up and down
- six.exec_(upgradeCommands, namespace)
- assertDiff(False, [], [], [])
-
- six.exec_(decls, namespace)
- six.exec_(downgradeCommands, namespace)
- assertDiff(True, [], [], [self.table_name])
-
- six.exec_(decls, namespace)
- six.exec_(upgradeCommands, namespace)
- assertDiff(False, [], [], [])
-
- if not self.engine.name == 'oracle':
- # Make sure data is still present.
- result = self.engine.execute(self.table.select(self.table.c.id==dataId))
- rows = result.fetchall()
- self.assertEqual(len(rows), 1)
- self.assertEqual(rows[0].name, 'mydata')
-
- # Add data, later we'll make sure it's still present.
- result = self.engine.execute(self.table.insert(), id=2, name=u'mydata2', data2=123)
- dataId2 = result.inserted_primary_key[0]
-
- # Change column type in model.
- self.meta.remove(self.table)
- self.table = Table(self.table_name,self.meta,
- Column('id',Integer(),primary_key=True),
- Column('name',UnicodeText(length=None)),
- Column('data2',String(255),nullable=True),
- )
-
- # XXX test type diff
- return
-
- assertDiff(True, [], [], [self.table_name])
-
- # Apply latest model changes and find no more diffs.
- self._applyLatestModel()
- assertDiff(False, [], [], [])
-
- if not self.engine.name == 'oracle':
- # Make sure data is still present.
- result = self.engine.execute(self.table.select(self.table.c.id==dataId2))
- rows = result.fetchall()
- self.assertEqual(len(rows), 1)
- self.assertEqual(rows[0].name, 'mydata2')
- self.assertEqual(rows[0].data2, '123')
-
- # Delete data, since we're about to make a required column.
- # Not even using sqlalchemy.PassiveDefault helps because we're doing explicit column select.
- self.engine.execute(self.table.delete(), id=dataId)
-
- if not self.engine.name == 'firebird':
- # Change column nullable in model.
- self.meta.remove(self.table)
- self.table = Table(self.table_name,self.meta,
- Column('id',Integer(),primary_key=True),
- Column('name',UnicodeText(length=None)),
- Column('data2',String(255),nullable=False),
- )
- assertDiff(True, [], [], [self.table_name]) # TODO test nullable diff
-
- # Apply latest model changes and find no more diffs.
- self._applyLatestModel()
- assertDiff(False, [], [], [])
-
- # Remove table from model.
- self.meta.remove(self.table)
- assertDiff(True, [], [self.table_name], [])
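The pattern exercised above is: diff the model against the database, then either generate migration source or apply the model directly. A compact sketch using the same functions the test relies on, assuming the older SQLAlchemy release this test suite targets (table and engine are illustrative):

from sqlalchemy import Column, Integer, MetaData, Table, UnicodeText, create_engine
from migrate.versioning import genmodel, schemadiff

engine = create_engine('sqlite://')  # illustrative in-memory database
meta = MetaData()
Table('example', meta,
      Column('id', Integer, primary_key=True),
      Column('name', UnicodeText))

diff = schemadiff.getDiffOfModelAgainstDatabase(
    meta, engine, excludeTables=['migrate_version'])
if diff:
    # Generate Python migration source (declarations, upgrade, downgrade)...
    decls, upgrade_src, downgrade_src = genmodel.ModelGenerator(diff, engine).genB2AMigration()
    # ...or apply the model to the database so both sides match again.
    genmodel.ModelGenerator(diff, engine).runB2A()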
diff --git a/migrate/tests/versioning/test_keyedinstance.py b/migrate/tests/versioning/test_keyedinstance.py
deleted file mode 100644
index 485cbbb..0000000
--- a/migrate/tests/versioning/test_keyedinstance.py
+++ /dev/null
@@ -1,45 +0,0 @@
-#!/usr/bin/python
-# -*- coding: utf-8 -*-
-
-from migrate.tests import fixture
-from migrate.versioning.util.keyedinstance import *
-
-class TestKeydInstance(fixture.Base):
- def test_unique(self):
- """UniqueInstance should produce unique object instances"""
- class Uniq1(KeyedInstance):
- @classmethod
- def _key(cls,key):
- return str(key)
- def __init__(self,value):
- self.value=value
- class Uniq2(KeyedInstance):
- @classmethod
- def _key(cls,key):
- return str(key)
- def __init__(self,value):
- self.value=value
-
- a10 = Uniq1('a')
-
- # Different key: different instance
- b10 = Uniq1('b')
- self.assertTrue(a10 is not b10)
-
- # Different class: different instance
- a20 = Uniq2('a')
- self.assertTrue(a10 is not a20)
-
- # Same key/class: same instance
- a11 = Uniq1('a')
- self.assertTrue(a10 is a11)
-
- # __init__ is called
- self.assertEqual(a10.value,'a')
-
- # clear() causes us to forget all existing instances
- Uniq1.clear()
- a12 = Uniq1('a')
- self.assertTrue(a10 is not a12)
-
- self.assertRaises(NotImplementedError, KeyedInstance._key)
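KeyedInstance memoizes instances by a key derived from the constructor arguments, so constructing the same key twice returns the same object. A small sketch mirroring what the test checks (the subclass is illustrative):

from migrate.versioning.util.keyedinstance import KeyedInstance

class Connection(KeyedInstance):
    """One instance per distinct name (illustrative subclass)."""

    @classmethod
    def _key(cls, name):
        return str(name)

    def __init__(self, name):
        self.name = name

a = Connection('primary')
b = Connection('primary')
c = Connection('replica')
assert a is b      # same key -> same instance
assert a is not c  # different key -> different instance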
diff --git a/migrate/tests/versioning/test_pathed.py b/migrate/tests/versioning/test_pathed.py
deleted file mode 100644
index 53f0b47..0000000
--- a/migrate/tests/versioning/test_pathed.py
+++ /dev/null
@@ -1,51 +0,0 @@
-from migrate.tests import fixture
-from migrate.versioning.pathed import *
-
-class TestPathed(fixture.Base):
- def test_parent_path(self):
- """Default parent_path should behave correctly"""
- filepath='/fgsfds/moot.py'
- dirpath='/fgsfds/moot'
- sdirpath='/fgsfds/moot/'
-
- result='/fgsfds'
- self.assertTrue(result==Pathed._parent_path(filepath))
- self.assertTrue(result==Pathed._parent_path(dirpath))
- self.assertTrue(result==Pathed._parent_path(sdirpath))
-
- def test_new(self):
- """Pathed(path) shouldn't create duplicate objects of the same path"""
- path='/fgsfds'
- class Test(Pathed):
- attr=None
- o1=Test(path)
- o2=Test(path)
- self.assertTrue(isinstance(o1,Test))
- self.assertTrue(o1.path==path)
- self.assertTrue(o1 is o2)
- o1.attr='herring'
- self.assertTrue(o2.attr=='herring')
- o2.attr='shrubbery'
- self.assertTrue(o1.attr=='shrubbery')
-
- def test_parent(self):
- """Parents should be fetched correctly"""
- class Parent(Pathed):
- parent=None
- children=0
- def _init_child(self,child,path):
- """Keep a tally of children.
- (A real class might do something more interesting here)
- """
- self.__class__.children+=1
-
- class Child(Pathed):
- parent=Parent
-
- path='/fgsfds/moot.py'
- parent_path='/fgsfds'
- object=Child(path)
- self.assertTrue(isinstance(object,Child))
- self.assertTrue(isinstance(object.parent,Parent))
- self.assertTrue(object.path==path)
- self.assertTrue(object.parent.path==parent_path)
diff --git a/migrate/tests/versioning/test_repository.py b/migrate/tests/versioning/test_repository.py
deleted file mode 100644
index 6e87c02..0000000
--- a/migrate/tests/versioning/test_repository.py
+++ /dev/null
@@ -1,216 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import os
-import shutil
-
-from migrate import exceptions
-from migrate.versioning.repository import *
-from migrate.versioning.script import *
-
-from migrate.tests import fixture
-from datetime import datetime
-
-
-class TestRepository(fixture.Pathed):
- def test_create(self):
- """Repositories are created successfully"""
- path = self.tmp_repos()
- name = 'repository_name'
-
- # Creating a repository that doesn't exist should succeed
- repo = Repository.create(path, name)
- config_path = repo.config.path
- manage_path = os.path.join(repo.path, 'manage.py')
- self.assertTrue(repo)
-
- # Files should actually be created
- self.assertTrue(os.path.exists(path))
- self.assertTrue(os.path.exists(config_path))
- self.assertTrue(os.path.exists(manage_path))
-
- # Can't create it again: it already exists
- self.assertRaises(exceptions.PathFoundError, Repository.create, path, name)
- return path
-
- def test_load(self):
- """We should be able to load information about an existing repository"""
- # Create a repository to load
- path = self.test_create()
- repos = Repository(path)
-
- self.assertTrue(repos)
- self.assertTrue(repos.config)
- self.assertTrue(repos.config.get('db_settings', 'version_table'))
-
- # version_table's default isn't none
- self.assertNotEqual(repos.config.get('db_settings', 'version_table'), 'None')
-
- def test_load_notfound(self):
- """Nonexistant repositories shouldn't be loaded"""
- path = self.tmp_repos()
- self.assertTrue(not os.path.exists(path))
- self.assertRaises(exceptions.InvalidRepositoryError, Repository, path)
-
- def test_load_invalid(self):
- """Invalid repos shouldn't be loaded"""
- # Here, invalid=empty directory. There may be other conditions too,
- # but we shouldn't need to test all of them
- path = self.tmp_repos()
- os.mkdir(path)
- self.assertRaises(exceptions.InvalidRepositoryError, Repository, path)
-
-
-class TestVersionedRepository(fixture.Pathed):
- """Tests on an existing repository with a single python script"""
-
- def setUp(self):
- super(TestVersionedRepository, self).setUp()
- Repository.clear()
- self.path_repos = self.tmp_repos()
- Repository.create(self.path_repos, 'repository_name')
-
- def test_version(self):
- """We should correctly detect the version of a repository"""
- repos = Repository(self.path_repos)
-
- # Get latest version, or detect if a specified version exists
- self.assertEqual(repos.latest, 0)
- # repos.latest isn't an integer, but a VerNum
- # (so we can't just assume the following tests are correct)
- self.assertTrue(repos.latest >= 0)
- self.assertTrue(repos.latest < 1)
-
- # Create a script and test again
- repos.create_script('')
- self.assertEqual(repos.latest, 1)
- self.assertTrue(repos.latest >= 0)
- self.assertTrue(repos.latest >= 1)
- self.assertTrue(repos.latest < 2)
-
- # Create a new script and test again
- repos.create_script('')
- self.assertEqual(repos.latest, 2)
- self.assertTrue(repos.latest >= 0)
- self.assertTrue(repos.latest >= 1)
- self.assertTrue(repos.latest >= 2)
- self.assertTrue(repos.latest < 3)
-
-
- def test_timestmap_numbering_version(self):
- repos = Repository(self.path_repos)
- repos.config.set('db_settings', 'use_timestamp_numbering', 'True')
-
- # Get latest version, or detect if a specified version exists
- self.assertEqual(repos.latest, 0)
- # repos.latest isn't an integer, but a VerNum
- # (so we can't just assume the following tests are correct)
- self.assertTrue(repos.latest >= 0)
- self.assertTrue(repos.latest < 1)
-
- # Create a script and test again
- now = int(datetime.utcnow().strftime('%Y%m%d%H%M%S'))
- repos.create_script('')
- self.assertEqual(repos.latest, now)
-
- def test_source(self):
- """Get a script object by version number and view its source"""
- # Load repository and commit script
- repo = Repository(self.path_repos)
- repo.create_script('')
- repo.create_script_sql('postgres', 'foo bar')
-
- # Source is valid: script must have an upgrade function
- # (not a very thorough test, but should be plenty)
- source = repo.version(1).script().source()
- self.assertTrue(source.find('def upgrade') >= 0)
-
- import pprint; pprint.pprint(repo.version(2).sql)
- source = repo.version(2).script('postgres', 'upgrade').source()
- self.assertEqual(source.strip(), '')
-
- def test_latestversion(self):
- """Repository.version() (no params) returns the latest version"""
- repos = Repository(self.path_repos)
- repos.create_script('')
- self.assertTrue(repos.version(repos.latest) is repos.version())
- self.assertTrue(repos.version() is not None)
-
- def test_changeset(self):
- """Repositories can create changesets properly"""
- # Create a nonzero-version repository of empty scripts
- repos = Repository(self.path_repos)
- for i in range(10):
- repos.create_script('')
-
- def check_changeset(params, length):
- """Creates and verifies a changeset"""
- changeset = repos.changeset('postgres', *params)
- self.assertEqual(len(changeset), length)
- self.assertTrue(isinstance(changeset, Changeset))
- uniq = list()
- # Changesets are iterable
- for version, change in changeset:
- self.assertTrue(isinstance(change, BaseScript))
- # Changes aren't identical
- self.assertTrue(id(change) not in uniq)
- uniq.append(id(change))
- return changeset
-
- # Upgrade to a specified version...
- cs = check_changeset((0, 10), 10)
- self.assertEqual(cs.keys().pop(0),0 ) # 0 -> 1: index is starting version
- self.assertEqual(cs.keys().pop(), 9) # 9 -> 10: index is starting version
- self.assertEqual(cs.start, 0) # starting version
- self.assertEqual(cs.end, 10) # ending version
- check_changeset((0, 1), 1)
- check_changeset((0, 5), 5)
- check_changeset((0, 0), 0)
- check_changeset((5, 5), 0)
- check_changeset((10, 10), 0)
- check_changeset((5, 10), 5)
-
- # Can't request a changeset of higher version than this repository
- self.assertRaises(Exception, repos.changeset, 'postgres', 5, 11)
- self.assertRaises(Exception, repos.changeset, 'postgres', -1, 5)
-
- # Upgrade to the latest version...
- cs = check_changeset((0,), 10)
- self.assertEqual(cs.keys().pop(0), 0)
- self.assertEqual(cs.keys().pop(), 9)
- self.assertEqual(cs.start, 0)
- self.assertEqual(cs.end, 10)
- check_changeset((1,), 9)
- check_changeset((5,), 5)
- check_changeset((9,), 1)
- check_changeset((10,), 0)
-
- # run changes
- cs.run('postgres', 'upgrade')
-
- # Can't request a changeset of higher/lower version than this repository
- self.assertRaises(Exception, repos.changeset, 'postgres', 11)
- self.assertRaises(Exception, repos.changeset, 'postgres', -1)
-
- # Downgrade
- cs = check_changeset((10, 0),10)
- self.assertEqual(cs.keys().pop(0), 10) # 10 -> 9
- self.assertEqual(cs.keys().pop(), 1) # 1 -> 0
- self.assertEqual(cs.start, 10)
- self.assertEqual(cs.end, 0)
- check_changeset((10, 5), 5)
- check_changeset((5, 0), 5)
-
- def test_many_versions(self):
- """Test what happens when lots of versions are created"""
- repos = Repository(self.path_repos)
- for i in range(1001):
- repos.create_script('')
-
- # version filenames are normally zero-padded to three digits; make sure four-digit versions work too
- self.assertTrue(os.path.exists('%s/versions/1000.py' % self.path_repos))
- self.assertTrue(os.path.exists('%s/versions/1001.py' % self.path_repos))
-
-
-# TODO: test manage file
-# TODO: test changeset
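In outline, the Repository API used by these tests: create a repository on disk, add change scripts (each one bumps .latest), and build a Changeset describing how to move between two versions. A brief sketch with a placeholder path:

from migrate.versioning.repository import Repository

path = '/tmp/example_repository'           # placeholder
repo = Repository.create(path, 'example')  # lays out migrate.cfg, manage.py, versions/

for _ in range(3):
    repo.create_script('')                 # empty scripts still advance the version

print(repo.latest)                         # -> 3
changes = repo.changeset('postgres', 0, 3) # steps needed to go from version 0 to 3
print(len(changes))                        # -> 3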
diff --git a/migrate/tests/versioning/test_runchangeset.py b/migrate/tests/versioning/test_runchangeset.py
deleted file mode 100644
index 12bc77c..0000000
--- a/migrate/tests/versioning/test_runchangeset.py
+++ /dev/null
@@ -1,52 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import os,shutil
-
-from migrate.tests import fixture
-from migrate.versioning.schema import *
-from migrate.versioning import script
-
-
-class TestRunChangeset(fixture.Pathed,fixture.DB):
- level=fixture.DB.CONNECT
- def _setup(self, url):
- super(TestRunChangeset, self)._setup(url)
- Repository.clear()
- self.path_repos=self.tmp_repos()
- # Create repository, script
- Repository.create(self.path_repos,'repository_name')
-
- @fixture.usedb()
- def test_changeset_run(self):
- """Running a changeset against a repository gives expected results"""
- repos=Repository(self.path_repos)
- for i in range(10):
- repos.create_script('')
- try:
- ControlledSchema(self.engine,repos).drop()
- except:
- pass
- db=ControlledSchema.create(self.engine,repos)
-
- # Scripts are empty; we'll check version # correctness.
- # (Correct application of their content is checked elsewhere)
- self.assertEqual(db.version,0)
- db.upgrade(1)
- self.assertEqual(db.version,1)
- db.upgrade(5)
- self.assertEqual(db.version,5)
- db.upgrade(5)
- self.assertEqual(db.version,5)
- db.upgrade(None) # Latest is implied
- self.assertEqual(db.version,10)
- self.assertRaises(Exception,db.upgrade,11)
- self.assertEqual(db.version,10)
- db.upgrade(9)
- self.assertEqual(db.version,9)
- db.upgrade(0)
- self.assertEqual(db.version,0)
- self.assertRaises(Exception,db.upgrade,-1)
- self.assertEqual(db.version,0)
- #changeset = repos.changeset(self.url,0)
- db.drop()
diff --git a/migrate/tests/versioning/test_schema.py b/migrate/tests/versioning/test_schema.py
deleted file mode 100644
index 5396d9d..0000000
--- a/migrate/tests/versioning/test_schema.py
+++ /dev/null
@@ -1,205 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import os
-import shutil
-
-import six
-
-from migrate import exceptions
-from migrate.versioning.schema import *
-from migrate.versioning import script, schemadiff
-
-from sqlalchemy import *
-
-from migrate.tests import fixture
-
-
-class TestControlledSchema(fixture.Pathed, fixture.DB):
- # Transactions break postgres in this test; we'll clean up after ourselves
- level = fixture.DB.CONNECT
-
- def setUp(self):
- super(TestControlledSchema, self).setUp()
- self.path_repos = self.temp_usable_dir + '/repo/'
- self.repos = Repository.create(self.path_repos, 'repo_name')
-
- def _setup(self, url):
- self.setUp()
- super(TestControlledSchema, self)._setup(url)
- self.cleanup()
-
- def _teardown(self):
- super(TestControlledSchema, self)._teardown()
- self.cleanup()
- self.tearDown()
-
- def cleanup(self):
- # drop existing version table if necessary
- try:
- ControlledSchema(self.engine, self.repos).drop()
- except:
- # No table to drop; that's fine, be silent
- pass
-
- def tearDown(self):
- self.cleanup()
- super(TestControlledSchema, self).tearDown()
-
- @fixture.usedb()
- def test_version_control(self):
- """Establish version control on a particular database"""
- # Establish version control on this database
- dbcontrol = ControlledSchema.create(self.engine, self.repos)
-
- # Trying to create another DB this way fails: table exists
- self.assertRaises(exceptions.DatabaseAlreadyControlledError,
- ControlledSchema.create, self.engine, self.repos)
-
- # We can load a controlled DB this way, too
- dbcontrol0 = ControlledSchema(self.engine, self.repos)
- self.assertEqual(dbcontrol, dbcontrol0)
-
- # We can also use a repository path, instead of a repository
- dbcontrol0 = ControlledSchema(self.engine, self.repos.path)
- self.assertEqual(dbcontrol, dbcontrol0)
-
- # We don't have to use the same connection
- engine = create_engine(self.url)
- dbcontrol0 = ControlledSchema(engine, self.repos.path)
- self.assertEqual(dbcontrol, dbcontrol0)
-
- # Clean up:
- dbcontrol.drop()
-
- # Attempting to drop vc from a db without it should fail
- self.assertRaises(exceptions.DatabaseNotControlledError, dbcontrol.drop)
-
- # No table defined should raise error
- self.assertRaises(exceptions.DatabaseNotControlledError,
- ControlledSchema, self.engine, self.repos)
-
- @fixture.usedb()
- def test_version_control_specified(self):
- """Establish version control with a specified version"""
- # Establish version control on this database
- version = 0
- dbcontrol = ControlledSchema.create(self.engine, self.repos, version)
- self.assertEqual(dbcontrol.version, version)
-
- # Correct when we load it, too
- dbcontrol = ControlledSchema(self.engine, self.repos)
- self.assertEqual(dbcontrol.version, version)
-
- dbcontrol.drop()
-
- # Now try it with a nonzero value
- version = 10
- for i in range(version):
- self.repos.create_script('')
- self.assertEqual(self.repos.latest, version)
-
- # Test with some mid-range value
- dbcontrol = ControlledSchema.create(self.engine,self.repos, 5)
- self.assertEqual(dbcontrol.version, 5)
- dbcontrol.drop()
-
- # Test with max value
- dbcontrol = ControlledSchema.create(self.engine, self.repos, version)
- self.assertEqual(dbcontrol.version, version)
- dbcontrol.drop()
-
- @fixture.usedb()
- def test_version_control_invalid(self):
- """Try to establish version control with an invalid version"""
- versions = ('Thirteen', '-1', -1, '' , 13)
- # A fresh repository doesn't go up to version 13 yet
- for version in versions:
- #self.assertRaises(ControlledSchema.InvalidVersionError,
- # Can't have custom errors with assertRaises...
- try:
- ControlledSchema.create(self.engine, self.repos, version)
- self.assertTrue(False, repr(version))
- except exceptions.InvalidVersionError:
- pass
-
- @fixture.usedb()
- def test_changeset(self):
- """Create changeset from controlled schema"""
- dbschema = ControlledSchema.create(self.engine, self.repos)
-
- # empty schema doesn't have changesets
- cs = dbschema.changeset()
- self.assertEqual(cs, {})
-
- for i in range(5):
- self.repos.create_script('')
- self.assertEqual(self.repos.latest, 5)
-
- cs = dbschema.changeset(5)
- self.assertEqual(len(cs), 5)
-
- # cleanup
- dbschema.drop()
-
- @fixture.usedb()
- def test_upgrade_runchange(self):
- dbschema = ControlledSchema.create(self.engine, self.repos)
-
- for i in range(10):
- self.repos.create_script('')
-
- self.assertEqual(self.repos.latest, 10)
-
- dbschema.upgrade(10)
-
- self.assertRaises(ValueError, dbschema.upgrade, 'a')
- self.assertRaises(exceptions.InvalidVersionError, dbschema.runchange, 20, '', 1)
-
- # TODO: test for table version in db
-
- # cleanup
- dbschema.drop()
-
- @fixture.usedb()
- def test_create_model(self):
- """Test workflow to generate create_model"""
- model = ControlledSchema.create_model(self.engine, self.repos, declarative=False)
- self.assertTrue(isinstance(model, six.string_types))
-
- model = ControlledSchema.create_model(self.engine, self.repos.path, declarative=True)
- self.assertTrue(isinstance(model, six.string_types))
-
- @fixture.usedb()
- def test_compare_model_to_db(self):
- meta = self.construct_model()
-
- diff = ControlledSchema.compare_model_to_db(self.engine, meta, self.repos)
- self.assertTrue(isinstance(diff, schemadiff.SchemaDiff))
-
- diff = ControlledSchema.compare_model_to_db(self.engine, meta, self.repos.path)
- self.assertTrue(isinstance(diff, schemadiff.SchemaDiff))
- meta.drop_all(self.engine)
-
- @fixture.usedb()
- def test_update_db_from_model(self):
- dbschema = ControlledSchema.create(self.engine, self.repos)
-
- meta = self.construct_model()
-
- dbschema.update_db_from_model(meta)
-
- # TODO: test for table version in db
-
- # cleanup
- dbschema.drop()
- meta.drop_all(self.engine)
-
- def construct_model(self):
- meta = MetaData()
-
- user = Table('temp_model_schema', meta, Column('id', Integer), Column('user', String(245)))
-
- return meta
-
- # TODO: test how are tables populated in db
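ControlledSchema ties an engine to a repository: create() stamps the database with a version table, upgrade() walks the change scripts, and drop() removes version control again. A compact sketch of that life cycle (the repository path is a placeholder, and an in-memory SQLite engine is used purely for illustration):

from sqlalchemy import create_engine
from migrate.versioning.repository import Repository
from migrate.versioning.schema import ControlledSchema

engine = create_engine('sqlite://')
repo = Repository.create('/tmp/example_repo', 'example')  # placeholder path
repo.create_script('')                           # one empty change script

schema = ControlledSchema.create(engine, repo)   # version table created, version 0
schema.upgrade(1)                                # run scripts up to version 1
print(schema.version)                            # -> 1
schema.drop()                                    # remove version control again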
diff --git a/migrate/tests/versioning/test_schemadiff.py b/migrate/tests/versioning/test_schemadiff.py
deleted file mode 100644
index f45a012..0000000
--- a/migrate/tests/versioning/test_schemadiff.py
+++ /dev/null
@@ -1,227 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import os
-
-from sqlalchemy import *
-
-from migrate.versioning import schemadiff
-
-from migrate.tests import fixture
-
-class SchemaDiffBase(fixture.DB):
-
- level = fixture.DB.CONNECT
- def _make_table(self,*cols,**kw):
- self.table = Table('xtable', self.meta,
- Column('id',Integer(), primary_key=True),
- *cols
- )
- if kw.get('create',True):
- self.table.create()
-
- def _assert_diff(self,col_A,col_B):
- self._make_table(col_A)
- self.meta.clear()
- self._make_table(col_B,create=False)
- diff = self._run_diff()
- # print diff
- self.assertTrue(diff)
- self.assertEqual(1,len(diff.tables_different))
- td = list(diff.tables_different.values())[0]
- self.assertEqual(1,len(td.columns_different))
- cd = list(td.columns_different.values())[0]
- label_width = max(len(self.name1), len(self.name2))
- self.assertEqual(('Schema diffs:\n'
- ' table with differences: xtable\n'
- ' column with differences: data\n'
- ' %*s: %r\n'
- ' %*s: %r')%(
- label_width,
- self.name1,
- cd.col_A,
- label_width,
- self.name2,
- cd.col_B
- ),str(diff))
-
-class Test_getDiffOfModelAgainstDatabase(SchemaDiffBase):
- name1 = 'model'
- name2 = 'database'
-
- def _run_diff(self,**kw):
- return schemadiff.getDiffOfModelAgainstDatabase(
- self.meta, self.engine, **kw
- )
-
- @fixture.usedb()
- def test_table_missing_in_db(self):
- self._make_table(create=False)
- diff = self._run_diff()
- self.assertTrue(diff)
- self.assertEqual('Schema diffs:\n tables missing from %s: xtable' % self.name2,
- str(diff))
-
- @fixture.usedb()
- def test_table_missing_in_model(self):
- self._make_table()
- self.meta.clear()
- diff = self._run_diff()
- self.assertTrue(diff)
- self.assertEqual('Schema diffs:\n tables missing from %s: xtable' % self.name1,
- str(diff))
-
- @fixture.usedb()
- def test_column_missing_in_db(self):
- # db
- Table('xtable', self.meta,
- Column('id',Integer(), primary_key=True),
- ).create()
- self.meta.clear()
- # model
- self._make_table(
- Column('xcol',Integer()),
- create=False
- )
- # run diff
- diff = self._run_diff()
- self.assertTrue(diff)
- self.assertEqual('Schema diffs:\n'
- ' table with differences: xtable\n'
- ' %s missing these columns: xcol' % self.name2,
- str(diff))
-
- @fixture.usedb()
- def test_column_missing_in_model(self):
- # db
- self._make_table(
- Column('xcol',Integer()),
- )
- self.meta.clear()
- # model
- self._make_table(
- create=False
- )
- # run diff
- diff = self._run_diff()
- self.assertTrue(diff)
- self.assertEqual('Schema diffs:\n'
- ' table with differences: xtable\n'
- ' %s missing these columns: xcol' % self.name1,
- str(diff))
-
- @fixture.usedb()
- def test_exclude_tables(self):
- # db
- Table('ytable', self.meta,
- Column('id',Integer(), primary_key=True),
- ).create()
- Table('ztable', self.meta,
- Column('id',Integer(), primary_key=True),
- ).create()
- self.meta.clear()
- # model
- self._make_table(
- create=False
- )
- Table('ztable', self.meta,
- Column('id',Integer(), primary_key=True),
- )
- # run diff
- diff = self._run_diff(excludeTables=('xtable','ytable'))
- # ytable only in database
- # xtable only in model
- # ztable identical on both
- # ...so we expect no diff!
- self.assertFalse(diff)
- self.assertEqual('No schema diffs',str(diff))
-
- @fixture.usedb()
- def test_identical_just_pk(self):
- self._make_table()
- diff = self._run_diff()
- self.assertFalse(diff)
- self.assertEqual('No schema diffs',str(diff))
-
-
- @fixture.usedb()
- def test_different_type(self):
- self._assert_diff(
- Column('data', String(10)),
- Column('data', Integer()),
- )
-
- @fixture.usedb()
- def test_int_vs_float(self):
- self._assert_diff(
- Column('data', Integer()),
- Column('data', Float()),
- )
-
- # NOTE(mriedem): The ibm_db_sa driver handles the Float() as a DOUBLE()
- # which extends Numeric() but isn't defined in sqlalchemy.types, so we
- # can't check for it as a special case like is done in schemadiff.ColDiff.
- @fixture.usedb(not_supported='ibm_db_sa')
- def test_float_vs_numeric(self):
- self._assert_diff(
- Column('data', Float()),
- Column('data', Numeric()),
- )
-
- @fixture.usedb()
- def test_numeric_precision(self):
- self._assert_diff(
- Column('data', Numeric(precision=5)),
- Column('data', Numeric(precision=6)),
- )
-
- @fixture.usedb()
- def test_numeric_scale(self):
- self._assert_diff(
- Column('data', Numeric(precision=6,scale=0)),
- Column('data', Numeric(precision=6,scale=1)),
- )
-
- @fixture.usedb()
- def test_string_length(self):
- self._assert_diff(
- Column('data', String(10)),
- Column('data', String(20)),
- )
-
- @fixture.usedb()
- def test_integer_identical(self):
- self._make_table(
- Column('data', Integer()),
- )
- diff = self._run_diff()
- self.assertEqual('No schema diffs',str(diff))
- self.assertFalse(diff)
-
- @fixture.usedb()
- def test_string_identical(self):
- self._make_table(
- Column('data', String(10)),
- )
- diff = self._run_diff()
- self.assertEqual('No schema diffs',str(diff))
- self.assertFalse(diff)
-
- @fixture.usedb()
- def test_text_identical(self):
- self._make_table(
- Column('data', Text),
- )
- diff = self._run_diff()
- self.assertEqual('No schema diffs',str(diff))
- self.assertFalse(diff)
-
-class Test_getDiffOfModelAgainstModel(Test_getDiffOfModelAgainstDatabase):
- name1 = 'metadataA'
- name2 = 'metadataB'
-
- def _run_diff(self,**kw):
- db_meta= MetaData()
- db_meta.reflect(self.engine)
- return schemadiff.getDiffOfModelAgainstModel(
- self.meta, db_meta, **kw
- )
diff --git a/migrate/tests/versioning/test_script.py b/migrate/tests/versioning/test_script.py
deleted file mode 100644
index 20e6af0..0000000
--- a/migrate/tests/versioning/test_script.py
+++ /dev/null
@@ -1,305 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import imp
-import os
-import sys
-import shutil
-
-import six
-from migrate import exceptions
-from migrate.versioning import version, repository
-from migrate.versioning.script import *
-from migrate.versioning.util import *
-
-from migrate.tests import fixture
-from migrate.tests.fixture.models import tmp_sql_table
-
-
-class TestBaseScript(fixture.Pathed):
-
- def test_all(self):
- """Testing all basic BaseScript operations"""
- # verify / source / run
- src = self.tmp()
- open(src, 'w').close()
- bscript = BaseScript(src)
- BaseScript.verify(src)
- self.assertEqual(bscript.source(), '')
- self.assertRaises(NotImplementedError, bscript.run, 'foobar')
-
-
-class TestPyScript(fixture.Pathed, fixture.DB):
- cls = PythonScript
- def test_create(self):
- """We can create a migration script"""
- path = self.tmp_py()
- # Creating a file that doesn't exist should succeed
- self.cls.create(path)
- self.assertTrue(os.path.exists(path))
- # Created file should be a valid script (If not, raises an error)
- self.cls.verify(path)
- # Can't create it again: it already exists
- self.assertRaises(exceptions.PathFoundError,self.cls.create,path)
-
- @fixture.usedb(supported='sqlite')
- def test_run(self):
- script_path = self.tmp_py()
- pyscript = PythonScript.create(script_path)
- pyscript.run(self.engine, 1)
- pyscript.run(self.engine, -1)
-
- self.assertRaises(exceptions.ScriptError, pyscript.run, self.engine, 0)
- self.assertRaises(exceptions.ScriptError, pyscript._func, 'foobar')
-
- # clean pyc file
- if six.PY3:
- os.remove(imp.cache_from_source(script_path))
- else:
- os.remove(script_path + 'c')
-
- # test deprecated upgrade/downgrade with no arguments
- contents = open(script_path, 'r').read()
- f = open(script_path, 'w')
- f.write(contents.replace("upgrade(migrate_engine)", "upgrade()"))
- f.close()
-
- pyscript = PythonScript(script_path)
- pyscript._module = None
- try:
- pyscript.run(self.engine, 1)
- pyscript.run(self.engine, -1)
- except exceptions.ScriptError:
- pass
- else:
- self.fail()
-
- def test_verify_notfound(self):
- """Correctly verify a python migration script: nonexistant file"""
- path = self.tmp_py()
- self.assertFalse(os.path.exists(path))
- # Fails on empty path
- self.assertRaises(exceptions.InvalidScriptError,self.cls.verify,path)
- self.assertRaises(exceptions.InvalidScriptError,self.cls,path)
-
- def test_verify_invalidpy(self):
- """Correctly verify a python migration script: invalid python file"""
- path=self.tmp_py()
- # Create a file containing invalid Python
- f = open(path,'w')
- f.write("def fail")
- f.close()
- self.assertRaises(Exception,self.cls.verify_module,path)
- # script isn't verified on creation, but on module reference
- py = self.cls(path)
- self.assertRaises(Exception,(lambda x: x.module),py)
-
- def test_verify_nofuncs(self):
- """Correctly verify a python migration script: valid python file; no upgrade func"""
- path = self.tmp_py()
- # Create a valid Python file with no upgrade/downgrade functions
- f = open(path, 'w')
- f.write("def zergling():\n\tprint('rush')")
- f.close()
- self.assertRaises(exceptions.InvalidScriptError, self.cls.verify_module, path)
- # script isn't verified on creation, but on module reference
- py = self.cls(path)
- self.assertRaises(exceptions.InvalidScriptError,(lambda x: x.module),py)
-
- @fixture.usedb(supported='sqlite')
- def test_preview_sql(self):
- """Preview SQL abstract from ORM layer (sqlite)"""
- path = self.tmp_py()
-
- f = open(path, 'w')
- content = '''
-from migrate import *
-from sqlalchemy import *
-
-metadata = MetaData()
-
-UserGroup = Table('Link', metadata,
- Column('link1ID', Integer),
- Column('link2ID', Integer),
- UniqueConstraint('link1ID', 'link2ID'))
-
-def upgrade(migrate_engine):
- metadata.create_all(migrate_engine)
- '''
- f.write(content)
- f.close()
-
- pyscript = self.cls(path)
- SQL = pyscript.preview_sql(self.url, 1)
- self.assertEqualIgnoreWhitespace("""
- CREATE TABLE "Link"
- ("link1ID" INTEGER,
- "link2ID" INTEGER,
- UNIQUE ("link1ID", "link2ID"))
- """, SQL)
- # TODO: test: No SQL should be executed!
-
- def test_verify_success(self):
- """Correctly verify a python migration script: success"""
- path = self.tmp_py()
- # Succeeds after creating
- self.cls.create(path)
- self.cls.verify(path)
-
- # test for PythonScript.make_update_script_for_model
-
- @fixture.usedb()
- def test_make_update_script_for_model(self):
- """Construct script source from differences of two models"""
-
- self.setup_model_params()
- self.write_file(self.first_model_path, self.base_source)
- self.write_file(self.second_model_path, self.base_source + self.model_source)
-
- source_script = self.pyscript.make_update_script_for_model(
- engine=self.engine,
- oldmodel=load_model('testmodel_first:meta'),
- model=load_model('testmodel_second:meta'),
- repository=self.repo_path,
- )
-
- self.assertTrue("['User'].create()" in source_script)
- self.assertTrue("['User'].drop()" in source_script)
-
- @fixture.usedb()
- def test_make_update_script_for_equal_models(self):
- """Try to make update script from two identical models"""
-
- self.setup_model_params()
- self.write_file(self.first_model_path, self.base_source + self.model_source)
- self.write_file(self.second_model_path, self.base_source + self.model_source)
-
- source_script = self.pyscript.make_update_script_for_model(
- engine=self.engine,
- oldmodel=load_model('testmodel_first:meta'),
- model=load_model('testmodel_second:meta'),
- repository=self.repo_path,
- )
-
- self.assertFalse('User.create()' in source_script)
- self.assertFalse('User.drop()' in source_script)
-
- @fixture.usedb()
- def test_make_update_script_direction(self):
- """Check update scripts go in the right direction"""
-
- self.setup_model_params()
- self.write_file(self.first_model_path, self.base_source)
- self.write_file(self.second_model_path, self.base_source + self.model_source)
-
- source_script = self.pyscript.make_update_script_for_model(
- engine=self.engine,
- oldmodel=load_model('testmodel_first:meta'),
- model=load_model('testmodel_second:meta'),
- repository=self.repo_path,
- )
-
- self.assertTrue(0
- < source_script.find('upgrade')
- < source_script.find("['User'].create()")
- < source_script.find('downgrade')
- < source_script.find("['User'].drop()"))
-
- def setup_model_params(self):
- self.script_path = self.tmp_py()
- self.repo_path = self.tmp()
- self.first_model_path = os.path.join(self.temp_usable_dir, 'testmodel_first.py')
- self.second_model_path = os.path.join(self.temp_usable_dir, 'testmodel_second.py')
-
- self.base_source = """from sqlalchemy import *\nmeta = MetaData()\n"""
- self.model_source = """
-User = Table('User', meta,
- Column('id', Integer, primary_key=True),
- Column('login', Unicode(40)),
- Column('passwd', String(40)),
-)"""
-
- self.repo = repository.Repository.create(self.repo_path, 'repo')
- self.pyscript = PythonScript.create(self.script_path)
- sys.modules.pop('testmodel_first', None)
- sys.modules.pop('testmodel_second', None)
-
- def write_file(self, path, contents):
- f = open(path, 'w')
- f.write(contents)
- f.close()
-
-
-class TestSqlScript(fixture.Pathed, fixture.DB):
-
- @fixture.usedb()
- def test_error(self):
- """Test if exception is raised on wrong script source"""
- src = self.tmp()
-
- f = open(src, 'w')
- f.write("""foobar""")
- f.close()
-
- sqls = SqlScript(src)
- self.assertRaises(Exception, sqls.run, self.engine)
-
- @fixture.usedb()
- def test_success(self):
- """Test sucessful SQL execution"""
- # cleanup and prepare python script
- tmp_sql_table.metadata.drop_all(self.engine, checkfirst=True)
- script_path = self.tmp_py()
- pyscript = PythonScript.create(script_path)
-
- # populate python script
- contents = open(script_path, 'r').read()
- contents = contents.replace("pass", "tmp_sql_table.create(migrate_engine)")
- contents = 'from migrate.tests.fixture.models import tmp_sql_table\n' + contents
- f = open(script_path, 'w')
- f.write(contents)
- f.close()
-
- # write SQL script from python script preview
- pyscript = PythonScript(script_path)
- src = self.tmp()
- f = open(src, 'w')
- f.write(pyscript.preview_sql(self.url, 1))
- f.close()
-
- # run the change
- sqls = SqlScript(src)
- sqls.run(self.engine)
- tmp_sql_table.metadata.drop_all(self.engine, checkfirst=True)
-
- @fixture.usedb()
- def test_transaction_management_statements(self):
- """
- Test that we can successfully execute SQL scripts with transaction
- management statements.
- """
- for script_pattern in (
- "BEGIN TRANSACTION; %s; COMMIT;",
- "BEGIN; %s; END TRANSACTION;",
- "/* comment */BEGIN TRANSACTION; %s; /* comment */COMMIT;",
- "/* comment */ BEGIN TRANSACTION; %s; /* comment */ COMMIT;",
- """
--- comment
-BEGIN TRANSACTION;
-
-%s;
-
--- comment
-COMMIT;""",
- ):
-
- test_statement = ("CREATE TABLE TEST1 (field1 int); "
- "DROP TABLE TEST1")
- script = script_pattern % test_statement
- src = self.tmp()
-
- with open(src, 'wt') as f:
- f.write(script)
-
- sqls = SqlScript(src)
- sqls.run(self.engine)
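PythonScript.create() writes a change-script skeleton with upgrade()/downgrade() stubs, and preview_sql() renders the SQL a script would emit against a given URL without executing it. A small sketch of that pair (the script path is a placeholder):

from migrate.versioning.script import PythonScript

script_path = '/tmp/001_example.py'          # placeholder
pyscript = PythonScript.create(script_path)  # writes upgrade()/downgrade() stubs

# Render the SQL for an upgrade (step=1) against SQLite without running it.
print(pyscript.preview_sql('sqlite://', 1))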
diff --git a/migrate/tests/versioning/test_shell.py b/migrate/tests/versioning/test_shell.py
deleted file mode 100644
index 001efcf..0000000
--- a/migrate/tests/versioning/test_shell.py
+++ /dev/null
@@ -1,574 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import os
-import sys
-import tempfile
-
-import six
-from six.moves import cStringIO
-from sqlalchemy import MetaData, Table
-
-from migrate.exceptions import *
-from migrate.versioning.repository import Repository
-from migrate.versioning import genmodel, shell, api
-from migrate.tests.fixture import Shell, DB, usedb
-from migrate.tests.fixture import models
-
-
-class TestShellCommands(Shell):
- """Tests migrate.py commands"""
-
- def test_help(self):
- """Displays default help dialog"""
- self.assertEqual(self.env.run('migrate -h').returncode, 0)
- self.assertEqual(self.env.run('migrate --help').returncode, 0)
- self.assertEqual(self.env.run('migrate help').returncode, 0)
-
- def test_help_commands(self):
- """Display help on a specific command"""
- # we can only test that we get some output
- for cmd in api.__all__:
- result = self.env.run('migrate help %s' % cmd)
- self.assertTrue(isinstance(result.stdout, six.string_types))
- self.assertTrue(result.stdout)
- self.assertFalse(result.stderr)
-
- def test_shutdown_logging(self):
- """Try to shutdown logging output"""
- repos = self.tmp_repos()
- result = self.env.run('migrate create %s repository_name' % repos)
- result = self.env.run('migrate version %s --disable_logging' % repos)
- self.assertEqual(result.stdout, '')
- result = self.env.run('migrate version %s -q' % repos)
- self.assertEqual(result.stdout, '')
-
- # TODO: assert that no logging messages are emitted
- shell.main(['version', repos], logging=False)
-
- def test_main_with_runpy(self):
- if sys.version_info[:2] == (2, 4):
- self.skipTest("runpy is not part of python2.4")
- from runpy import run_module
- try:
- original = sys.argv
- sys.argv=['X','--help']
-
- run_module('migrate.versioning.shell', run_name='__main__')
-
- finally:
- sys.argv = original
-
- def _check_error(self,args,code,expected,**kw):
- original = sys.stderr
- try:
- actual = cStringIO()
- sys.stderr = actual
- try:
- shell.main(args,**kw)
- except SystemExit as e:
- self.assertEqual(code,e.args[0])
- else:
- self.fail('No exception raised')
- finally:
- sys.stderr = original
- actual = actual.getvalue()
- self.assertTrue(expected in actual,'%r not in:\n"""\n%s\n"""'%(expected,actual))
-
- def test_main(self):
- """Test main() function"""
- repos = self.tmp_repos()
- shell.main(['help'])
- shell.main(['help', 'create'])
- shell.main(['create', 'repo_name', '--preview_sql'], repository=repos)
- shell.main(['version', '--', '--repository=%s' % repos])
- shell.main(['version', '-d', '--repository=%s' % repos, '--version=2'])
-
- self._check_error(['foobar'],2,'error: Invalid command foobar')
- self._check_error(['create', 'f', 'o', 'o'],2,'error: Too many arguments for command create: o')
- self._check_error(['create'],2,'error: Not enough arguments for command create: name, repository not specified')
- self._check_error(['create', 'repo_name'],2,'already exists', repository=repos)
-
- def test_create(self):
- """Repositories are created successfully"""
- repos = self.tmp_repos()
-
- # Creating a file that doesn't exist should succeed
- result = self.env.run('migrate create %s repository_name' % repos)
-
- # Files should actually be created
- self.assertTrue(os.path.exists(repos))
-
- # The default table should not be None
- repos_ = Repository(repos)
- self.assertNotEqual(repos_.config.get('db_settings', 'version_table'), 'None')
-
- # Can't create it again: it already exists
- result = self.env.run('migrate create %s repository_name' % repos,
- expect_error=True)
- self.assertEqual(result.returncode, 2)
-
- def test_script(self):
- """We can create a migration script via the command line"""
- repos = self.tmp_repos()
- result = self.env.run('migrate create %s repository_name' % repos)
-
- result = self.env.run('migrate script --repository=%s Desc' % repos)
- self.assertTrue(os.path.exists('%s/versions/001_Desc.py' % repos))
-
- result = self.env.run('migrate script More %s' % repos)
- self.assertTrue(os.path.exists('%s/versions/002_More.py' % repos))
-
- result = self.env.run('migrate script "Some Random name" %s' % repos)
- self.assertTrue(os.path.exists('%s/versions/003_Some_Random_name.py' % repos))
-
- def test_script_sql(self):
- """We can create a migration sql script via the command line"""
- repos = self.tmp_repos()
- result = self.env.run('migrate create %s repository_name' % repos)
-
- result = self.env.run('migrate script_sql mydb foo %s' % repos)
- self.assertTrue(os.path.exists('%s/versions/001_foo_mydb_upgrade.sql' % repos))
- self.assertTrue(os.path.exists('%s/versions/001_foo_mydb_downgrade.sql' % repos))
-
- # Test creating a second
- result = self.env.run('migrate script_sql postgres foo --repository=%s' % repos)
- self.assertTrue(os.path.exists('%s/versions/002_foo_postgres_upgrade.sql' % repos))
- self.assertTrue(os.path.exists('%s/versions/002_foo_postgres_downgrade.sql' % repos))
-
- # TODO: test --previews
-
- def test_manage(self):
- """Create a project management script"""
- script = self.tmp_py()
- self.assertTrue(not os.path.exists(script))
-
- # No attempt is made to verify correctness of the repository path here
- result = self.env.run('migrate manage %s --repository=/bla/' % script)
- self.assertTrue(os.path.exists(script))
-
-
-class TestShellRepository(Shell):
- """Shell commands on an existing repository/python script"""
-
- def setUp(self):
- """Create repository, python change script"""
- super(TestShellRepository, self).setUp()
- self.path_repos = self.tmp_repos()
- result = self.env.run('migrate create %s repository_name' % self.path_repos)
-
- def test_version(self):
- """Correctly detect repository version"""
- # Version: 0 (no scripts yet); successful execution
- result = self.env.run('migrate version --repository=%s' % self.path_repos)
- self.assertEqual(result.stdout.strip(), "0")
-
- # Also works as a positional param
- result = self.env.run('migrate version %s' % self.path_repos)
- self.assertEqual(result.stdout.strip(), "0")
-
- # Create a script and version should increment
- result = self.env.run('migrate script Desc %s' % self.path_repos)
- result = self.env.run('migrate version %s' % self.path_repos)
- self.assertEqual(result.stdout.strip(), "1")
-
- def test_source(self):
- """Correctly fetch a script's source"""
- result = self.env.run('migrate script Desc --repository=%s' % self.path_repos)
-
- filename = '%s/versions/001_Desc.py' % self.path_repos
- source = open(filename).read()
- self.assertTrue(source.find('def upgrade') >= 0)
-
- # Version is now 1
- result = self.env.run('migrate version %s' % self.path_repos)
- self.assertEqual(result.stdout.strip(), "1")
-
- # Output/verify the source of version 1
- result = self.env.run('migrate source 1 --repository=%s' % self.path_repos)
- self.assertEqual(result.stdout.strip(), source.strip())
-
- # We can also send the source to a file... test that too
- result = self.env.run('migrate source 1 %s --repository=%s' %
- (filename, self.path_repos))
- self.assertTrue(os.path.exists(filename))
- fd = open(filename)
- result = fd.read()
- self.assertTrue(result.strip() == source.strip())
-
-
-class TestShellDatabase(Shell, DB):
- """Commands associated with a particular database"""
- # We'll need to clean up after ourselves, since the shell creates its own txn;
- # we need to connect to the DB to see whether things worked
-
- level = DB.CONNECT
-
- @usedb()
- def test_version_control(self):
- """Ensure we can set version control on a database"""
- path_repos = repos = self.tmp_repos()
- url = self.url
- result = self.env.run('migrate create %s repository_name' % repos)
-
- result = self.env.run('migrate drop_version_control %(url)s %(repos)s'\
- % locals(), expect_error=True)
- self.assertEqual(result.returncode, 1)
- result = self.env.run('migrate version_control %(url)s %(repos)s' % locals())
-
- # Clean up
- result = self.env.run('migrate drop_version_control %(url)s %(repos)s' % locals())
- # Attempting to drop vc from a database without it should fail
- result = self.env.run('migrate drop_version_control %(url)s %(repos)s'\
- % locals(), expect_error=True)
- self.assertEqual(result.returncode, 1)
-
- @usedb()
- def test_wrapped_kwargs(self):
- """Commands with default arguments set by manage.py"""
- path_repos = repos = self.tmp_repos()
- url = self.url
- result = self.env.run('migrate create --name=repository_name %s' % repos)
- result = self.env.run('migrate drop_version_control %(url)s %(repos)s' % locals(), expect_error=True)
- self.assertEqual(result.returncode, 1)
- result = self.env.run('migrate version_control %(url)s %(repos)s' % locals())
-
- result = self.env.run('migrate drop_version_control %(url)s %(repos)s' % locals())
-
- @usedb()
- def test_version_control_specified(self):
- """Ensure we can set version control to a particular version"""
- path_repos = self.tmp_repos()
- url = self.url
- result = self.env.run('migrate create --name=repository_name %s' % path_repos)
- result = self.env.run('migrate drop_version_control %(url)s %(path_repos)s' % locals(), expect_error=True)
- self.assertEqual(result.returncode, 1)
-
- # Fill the repository
- path_script = self.tmp_py()
- version = 2
- for i in range(version):
- result = self.env.run('migrate script Desc --repository=%s' % path_repos)
-
- # Repository version is correct
- result = self.env.run('migrate version %s' % path_repos)
- self.assertEqual(result.stdout.strip(), str(version))
-
- # Apply versioning to DB
- result = self.env.run('migrate version_control %(url)s %(path_repos)s %(version)s' % locals())
-
- # Test db version number (should start at 2)
- result = self.env.run('migrate db_version %(url)s %(path_repos)s' % locals())
- self.assertEqual(result.stdout.strip(), str(version))
-
- # Clean up
- result = self.env.run('migrate drop_version_control %(url)s %(path_repos)s' % locals())
-
- @usedb()
- def test_upgrade(self):
- """Can upgrade a versioned database"""
- # Create a repository
- repos_name = 'repos_name'
- repos_path = self.tmp()
- result = self.env.run('migrate create %(repos_path)s %(repos_name)s' % locals())
- self.assertEqual(self.run_version(repos_path), 0)
-
- # Version the DB
- result = self.env.run('migrate drop_version_control %s %s' % (self.url, repos_path), expect_error=True)
- result = self.env.run('migrate version_control %s %s' % (self.url, repos_path))
-
- # Upgrades with latest version == 0
- self.assertEqual(self.run_db_version(self.url, repos_path), 0)
- result = self.env.run('migrate upgrade %s %s' % (self.url, repos_path))
- self.assertEqual(self.run_db_version(self.url, repos_path), 0)
- result = self.env.run('migrate upgrade %s %s' % (self.url, repos_path))
- self.assertEqual(self.run_db_version(self.url, repos_path), 0)
- result = self.env.run('migrate upgrade %s %s 1' % (self.url, repos_path), expect_error=True)
- self.assertEqual(result.returncode, 1)
- result = self.env.run('migrate upgrade %s %s -1' % (self.url, repos_path), expect_error=True)
- self.assertEqual(result.returncode, 2)
-
- # Add a script to the repository; upgrade the db
- result = self.env.run('migrate script Desc --repository=%s' % (repos_path))
- self.assertEqual(self.run_version(repos_path), 1)
- self.assertEqual(self.run_db_version(self.url, repos_path), 0)
-
- # Test preview
- result = self.env.run('migrate upgrade %s %s 0 --preview_sql' % (self.url, repos_path))
- result = self.env.run('migrate upgrade %s %s 0 --preview_py' % (self.url, repos_path))
-
- result = self.env.run('migrate upgrade %s %s' % (self.url, repos_path))
- self.assertEqual(self.run_db_version(self.url, repos_path), 1)
-
- # Downgrade must have a valid version specified
- result = self.env.run('migrate downgrade %s %s' % (self.url, repos_path), expect_error=True)
- self.assertEqual(result.returncode, 2)
- result = self.env.run('migrate downgrade %s %s -1' % (self.url, repos_path), expect_error=True)
- self.assertEqual(result.returncode, 2)
- result = self.env.run('migrate downgrade %s %s 2' % (self.url, repos_path), expect_error=True)
- self.assertEqual(result.returncode, 2)
- self.assertEqual(self.run_db_version(self.url, repos_path), 1)
-
- result = self.env.run('migrate downgrade %s %s 0' % (self.url, repos_path))
- self.assertEqual(self.run_db_version(self.url, repos_path), 0)
-
- result = self.env.run('migrate downgrade %s %s 1' % (self.url, repos_path), expect_error=True)
- self.assertEqual(result.returncode, 2)
- self.assertEqual(self.run_db_version(self.url, repos_path), 0)
-
- result = self.env.run('migrate drop_version_control %s %s' % (self.url, repos_path))
-
- def _run_test_sqlfile(self, upgrade_script, downgrade_script):
- # TODO: add test script that checks if db really changed
- repos_path = self.tmp()
- repos_name = 'repos'
-
- result = self.env.run('migrate create %s %s' % (repos_path, repos_name))
- result = self.env.run('migrate drop_version_control %s %s' % (self.url, repos_path), expect_error=True)
- result = self.env.run('migrate version_control %s %s' % (self.url, repos_path))
- self.assertEqual(self.run_version(repos_path), 0)
- self.assertEqual(self.run_db_version(self.url, repos_path), 0)
-
- beforeCount = len(os.listdir(os.path.join(repos_path, 'versions'))) # this count can vary when running from an svn checkout
- result = self.env.run('migrate script_sql %s --repository=%s' % ('postgres', repos_path))
- self.assertEqual(self.run_version(repos_path), 1)
- self.assertEqual(len(os.listdir(os.path.join(repos_path, 'versions'))), beforeCount + 2)
-
- open('%s/versions/001_postgres_upgrade.sql' % repos_path, 'a').write(upgrade_script)
- open('%s/versions/001_postgres_downgrade.sql' % repos_path, 'a').write(downgrade_script)
-
- self.assertEqual(self.run_db_version(self.url, repos_path), 0)
- self.assertRaises(Exception, self.engine.text('select * from t_table').execute)
-
- result = self.env.run('migrate upgrade %s %s' % (self.url, repos_path))
- self.assertEqual(self.run_db_version(self.url, repos_path), 1)
- self.engine.text('select * from t_table').execute()
-
- result = self.env.run('migrate downgrade %s %s 0' % (self.url, repos_path))
- self.assertEqual(self.run_db_version(self.url, repos_path), 0)
- self.assertRaises(Exception, self.engine.text('select * from t_table').execute)
-
- # The tests below are written with some postgres syntax, but the feature
- # being tested (.sql files) ought to work with any db.
- @usedb(supported='postgres')
- def test_sqlfile(self):
- upgrade_script = """
- create table t_table (
- id serial,
- primary key(id)
- );
- """
- downgrade_script = """
- drop table t_table;
- """
- self.meta.drop_all()
- self._run_test_sqlfile(upgrade_script, downgrade_script)
-
- @usedb(supported='postgres')
- def test_sqlfile_comment(self):
- upgrade_script = """
- -- Comments in SQL break postgres autocommit
- create table t_table (
- id serial,
- primary key(id)
- );
- """
- downgrade_script = """
- -- Comments in SQL break postgres autocommit
- drop table t_table;
- """
- self._run_test_sqlfile(upgrade_script, downgrade_script)
-
- @usedb()
- def test_command_test(self):
- repos_name = 'repos_name'
- repos_path = self.tmp()
-
- result = self.env.run('migrate create repository_name --repository=%s' % repos_path)
- result = self.env.run('migrate drop_version_control %s %s' % (self.url, repos_path), expect_error=True)
- result = self.env.run('migrate version_control %s %s' % (self.url, repos_path))
- self.assertEqual(self.run_version(repos_path), 0)
- self.assertEqual(self.run_db_version(self.url, repos_path), 0)
-
- # Empty script should succeed
- result = self.env.run('migrate script Desc %s' % repos_path)
- result = self.env.run('migrate test %s %s' % (self.url, repos_path))
- self.assertEqual(self.run_version(repos_path), 1)
- self.assertEqual(self.run_db_version(self.url, repos_path), 0)
-
- # Error script should fail
- script_path = self.tmp_py()
- script_text = '''
- from sqlalchemy import *
- from migrate import *
-
- def upgrade():
- print 'fgsfds'
- raise Exception()
-
- def downgrade():
- print 'sdfsgf'
- raise Exception()
- '''.replace("\n ", "\n")
- file = open(script_path, 'w')
- file.write(script_text)
- file.close()
-
- result = self.env.run('migrate test %s %s bla' % (self.url, repos_path), expect_error=True)
- self.assertEqual(result.returncode, 2)
- self.assertEqual(self.run_version(repos_path), 1)
- self.assertEqual(self.run_db_version(self.url, repos_path), 0)
-
- # Nonempty script using migrate_engine should succeed
- script_path = self.tmp_py()
- script_text = '''
- from sqlalchemy import *
- from migrate import *
-
- from migrate.changeset import schema
-
- meta = MetaData(migrate_engine)
- account = Table('account', meta,
- Column('id', Integer, primary_key=True),
- Column('login', Text),
- Column('passwd', Text),
- )
- def upgrade():
- # Upgrade operations go here. Don't create your own engine; use the engine
- # named 'migrate_engine' imported from migrate.
- meta.create_all()
-
- def downgrade():
- # Operations to reverse the above upgrade go here.
- meta.drop_all()
- '''.replace("\n ", "\n")
- file = open(script_path, 'w')
- file.write(script_text)
- file.close()
- result = self.env.run('migrate test %s %s' % (self.url, repos_path))
- self.assertEqual(self.run_version(repos_path), 1)
- self.assertEqual(self.run_db_version(self.url, repos_path), 0)
-
- @usedb()
- def test_rundiffs_in_shell(self):
- # This is a variant of the test_schemadiff tests, run through the shell layer.
- # These shell tests are hard to debug (they keep forking processes),
- # so they shouldn't replace the lower-level tests.
- repos_name = 'repos_name'
- repos_path = self.tmp()
- script_path = self.tmp_py()
- model_module = 'migrate.tests.fixture.models:meta_rundiffs'
- old_model_module = 'migrate.tests.fixture.models:meta_old_rundiffs'
-
- # Create empty repository.
- self.meta = MetaData(self.engine)
- self.meta.reflect()
- self.meta.drop_all() # in case junk tables are lying around in the test database
-
- result = self.env.run(
- 'migrate create %s %s' % (repos_path, repos_name),
- expect_stderr=True)
- result = self.env.run(
- 'migrate drop_version_control %s %s' % (self.url, repos_path),
- expect_stderr=True, expect_error=True)
- result = self.env.run(
- 'migrate version_control %s %s' % (self.url, repos_path),
- expect_stderr=True)
- self.assertEqual(self.run_version(repos_path), 0)
- self.assertEqual(self.run_db_version(self.url, repos_path), 0)
-
- # Set up the helper script.
- result = self.env.run(
- 'migrate manage %s --repository=%s --url=%s --model=%s'\
- % (script_path, repos_path, self.url, model_module),
- expect_stderr=True)
- self.assertTrue(os.path.exists(script_path))
-
- # Model is defined but database is empty.
- result = self.env.run('migrate compare_model_to_db %s %s --model=%s' \
- % (self.url, repos_path, model_module), expect_stderr=True)
- self.assertTrue(
- "tables missing from database: tmp_account_rundiffs"
- in result.stdout)
-
- # Test deprecated dotted model path (module.model.User instead of module.model:User)
- result = self.env.run('migrate compare_model_to_db %s %s --model=%s' \
- % (self.url, repos_path, model_module.replace(":", ".")),
- expect_stderr=True, expect_error=True)
- self.assertEqual(result.returncode, 0)
- self.assertTrue(
- "tables missing from database: tmp_account_rundiffs"
- in result.stdout)
-
- # Update db to latest model.
- result = self.env.run('migrate update_db_from_model %s %s %s'\
- % (self.url, repos_path, model_module), expect_stderr=True)
- self.assertEqual(self.run_version(repos_path), 0)
- self.assertEqual(self.run_db_version(self.url, repos_path), 0) # version not bumped yet because no new migration script has been created
-
- result = self.env.run('migrate compare_model_to_db %s %s %s'\
- % (self.url, repos_path, model_module), expect_stderr=True)
- self.assertTrue("No schema diffs" in result.stdout)
-
- result = self.env.run(
- 'migrate drop_version_control %s %s' % (self.url, repos_path),
- expect_stderr=True, expect_error=True)
- result = self.env.run(
- 'migrate version_control %s %s' % (self.url, repos_path),
- expect_stderr=True)
-
- result = self.env.run(
- 'migrate create_model %s %s' % (self.url, repos_path),
- expect_stderr=True)
- temp_dict = dict()
- six.exec_(result.stdout, temp_dict)
-
- # TODO: breaks on SA06 and SA05 - in need of total refactor - use different approach
-
- # TODO: compare whole table
- self.compare_columns_equal(models.tmp_account_rundiffs.c, temp_dict['tmp_account_rundiffs'].c, ['type'])
- ##self.assertTrue("""tmp_account_rundiffs = Table('tmp_account_rundiffs', meta,
- ##Column('id', Integer(), primary_key=True, nullable=False),
- ##Column('login', String(length=None, convert_unicode=False, assert_unicode=None)),
- ##Column('passwd', String(length=None, convert_unicode=False, assert_unicode=None))""" in result.stdout)
-
- ## We're happy with db changes, make first db upgrade script to go from version 0 -> 1.
- #result = self.env.run('migrate make_update_script_for_model', expect_error=True, expect_stderr=True)
- #self.assertTrue('Not enough arguments' in result.stderr)
-
- #result_script = self.env.run('migrate make_update_script_for_model %s %s %s %s'\
- #% (self.url, repos_path, old_model_module, model_module))
- #self.assertEqualIgnoreWhitespace(result_script.stdout,
- #'''from sqlalchemy import *
- #from migrate import *
-
- #from migrate.changeset import schema
-
- #meta = MetaData()
- #tmp_account_rundiffs = Table('tmp_account_rundiffs', meta,
- #Column('id', Integer(), primary_key=True, nullable=False),
- #Column('login', Text(length=None, convert_unicode=False, assert_unicode=None, unicode_error=None, _warn_on_bytestring=False)),
- #Column('passwd', Text(length=None, convert_unicode=False, assert_unicode=None, unicode_error=None, _warn_on_bytestring=False)),
- #)
-
- #def upgrade(migrate_engine):
- ## Upgrade operations go here. Don't create your own engine; bind migrate_engine
- ## to your metadata
- #meta.bind = migrate_engine
- #tmp_account_rundiffs.create()
-
- #def downgrade(migrate_engine):
- ## Operations to reverse the above upgrade go here.
- #meta.bind = migrate_engine
- #tmp_account_rundiffs.drop()''')
-
- ## Save the upgrade script.
- #result = self.env.run('migrate script Desc %s' % repos_path)
- #upgrade_script_path = '%s/versions/001_Desc.py' % repos_path
- #open(upgrade_script_path, 'w').write(result_script.stdout)
-
- #result = self.env.run('migrate compare_model_to_db %s %s %s'\
- #% (self.url, repos_path, model_module))
- #self.assertTrue("No schema diffs" in result.stdout)
-
- self.meta.drop_all() # in case junk tables are lying around in the test database
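For reference, the shell commands these tests drive map directly onto functions in migrate.versioning.api; a minimal sketch of the same create / script / version_control / upgrade cycle from Python, using only calls that also appear in the tests below (the temporary paths and repository name are placeholders), might look like:

# Sketch only: the Python-API equivalent of the `migrate create`, `script`,
# `version_control` and `upgrade` shell commands exercised above.
# Paths and the repository name are placeholders.
import os
import tempfile

from migrate.versioning import api
from migrate.versioning.util import construct_engine

repo = os.path.join(tempfile.mkdtemp(), 'repo')
api.create(repo, 'repository_name')       # migrate create <repo> <name>
api.script('Desc', repo)                  # migrate script <description> <repo>

engine = construct_engine('sqlite:///:memory:')
api.version_control(engine, repo)         # migrate version_control <url> <repo>
api.upgrade(engine, repo)                 # migrate upgrade <url> <repo>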
diff --git a/migrate/tests/versioning/test_template.py b/migrate/tests/versioning/test_template.py
deleted file mode 100644
index a079d8b..0000000
--- a/migrate/tests/versioning/test_template.py
+++ /dev/null
@@ -1,70 +0,0 @@
-#!/usr/bin/python
-# -*- coding: utf-8 -*-
-
-import os
-import shutil
-
-import migrate.versioning.templates
-from migrate.versioning.template import *
-from migrate.versioning import api
-
-from migrate.tests import fixture
-
-
-class TestTemplate(fixture.Pathed):
- def test_templates(self):
- """We can find the path to all repository templates"""
- path = str(Template())
- self.assertTrue(os.path.exists(path))
-
- def test_repository(self):
- """We can find the path to the default repository"""
- path = Template().get_repository()
- self.assertTrue(os.path.exists(path))
-
- def test_script(self):
- """We can find the path to the default migration script"""
- path = Template().get_script()
- self.assertTrue(os.path.exists(path))
-
- def test_custom_templates_and_themes(self):
- """Users can define their own templates with themes"""
- new_templates_dir = os.path.join(self.temp_usable_dir, 'templates')
- manage_tmpl_file = os.path.join(new_templates_dir, 'manage/custom.py_tmpl')
- repository_tmpl_file = os.path.join(new_templates_dir, 'repository/custom/README')
- script_tmpl_file = os.path.join(new_templates_dir, 'script/custom.py_tmpl')
- sql_script_tmpl_file = os.path.join(new_templates_dir, 'sql_script/custom.py_tmpl')
-
- MANAGE_CONTENTS = 'print "manage.py"'
- README_CONTENTS = 'MIGRATE README!'
- SCRIPT_FILE_CONTENTS = 'print "script.py"'
- new_repo_dest = self.tmp_repos()
- new_manage_dest = self.tmp_py()
-
- # make new templates dir
- shutil.copytree(migrate.versioning.templates.__path__[0], new_templates_dir)
- shutil.copytree(os.path.join(new_templates_dir, 'repository/default'),
- os.path.join(new_templates_dir, 'repository/custom'))
-
- # edit templates
- f = open(manage_tmpl_file, 'w').write(MANAGE_CONTENTS)
- f = open(repository_tmpl_file, 'w').write(README_CONTENTS)
- f = open(script_tmpl_file, 'w').write(SCRIPT_FILE_CONTENTS)
- f = open(sql_script_tmpl_file, 'w').write(SCRIPT_FILE_CONTENTS)
-
- # create repository, manage file and python script
- kw = {}
- kw['templates_path'] = new_templates_dir
- kw['templates_theme'] = 'custom'
- api.create(new_repo_dest, 'repo_name', **kw)
- api.script('test', new_repo_dest, **kw)
- api.script_sql('postgres', 'foo', new_repo_dest, **kw)
- api.manage(new_manage_dest, **kw)
-
- # assert changes
- self.assertEqual(open(new_manage_dest).read(), MANAGE_CONTENTS)
- self.assertEqual(open(os.path.join(new_repo_dest, 'manage.py')).read(), MANAGE_CONTENTS)
- self.assertEqual(open(os.path.join(new_repo_dest, 'README')).read(), README_CONTENTS)
- self.assertEqual(open(os.path.join(new_repo_dest, 'versions/001_test.py')).read(), SCRIPT_FILE_CONTENTS)
- self.assertEqual(open(os.path.join(new_repo_dest, 'versions/002_foo_postgres_downgrade.sql')).read(), SCRIPT_FILE_CONTENTS)
- self.assertEqual(open(os.path.join(new_repo_dest, 'versions/002_foo_postgres_upgrade.sql')).read(), SCRIPT_FILE_CONTENTS)
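The custom-template test above reduces to two keyword arguments; a condensed sketch of the same pattern, assuming a templates directory laid out like migrate.versioning.templates with a 'custom' entry under each of manage/, repository/, script/ and sql_script/ (all destination paths are placeholders):

# Sketch: select a custom templates directory and theme per API call,
# mirroring the kwargs used in the test above. Paths are placeholders.
from migrate.versioning import api

kw = {'templates_path': '/tmp/my_templates', 'templates_theme': 'custom'}
api.create('/tmp/my_repo', 'repo_name', **kw)             # repository/custom
api.script('initial', '/tmp/my_repo', **kw)               # script/custom.py_tmpl
api.script_sql('postgres', 'foo', '/tmp/my_repo', **kw)   # sql_script/custom.py_tmpl
api.manage('/tmp/my_manage.py', **kw)                     # manage/custom.py_tmpl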
diff --git a/migrate/tests/versioning/test_util.py b/migrate/tests/versioning/test_util.py
deleted file mode 100644
index 21e3f27..0000000
--- a/migrate/tests/versioning/test_util.py
+++ /dev/null
@@ -1,139 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import os
-
-from sqlalchemy import *
-
-from migrate.exceptions import MigrateDeprecationWarning
-from migrate.tests import fixture
-from migrate.tests.fixture.warnings import catch_warnings
-from migrate.versioning.util import *
-from migrate.versioning import api
-
-import warnings
-
-class TestUtil(fixture.Pathed):
-
- def test_construct_engine(self):
- """Construct engine the smart way"""
- url = 'sqlite://'
-
- engine = construct_engine(url)
- self.assertTrue(engine.name == 'sqlite')
-
- # keyword arg
- engine = construct_engine(url, engine_arg_encoding='utf-8')
- self.assertEqual(engine.dialect.encoding, 'utf-8')
-
- # dict
- engine = construct_engine(url, engine_dict={'encoding': 'utf-8'})
- self.assertEqual(engine.dialect.encoding, 'utf-8')
-
- # engine parameter
- engine_orig = create_engine('sqlite://')
- engine = construct_engine(engine_orig)
- self.assertEqual(engine, engine_orig)
-
- # test precedence (engine_arg_* wins over engine_dict)
- engine = construct_engine(url, engine_dict={'encoding': 'iso-8859-1'},
- engine_arg_encoding='utf-8')
- self.assertEqual(engine.dialect.encoding, 'utf-8')
-
- # deprecated echo=True parameter
- try:
- # py 2.4 compatibility :-/
- cw = catch_warnings(record=True)
- w = cw.__enter__()
-
- warnings.simplefilter("always")
- engine = construct_engine(url, echo='True')
- self.assertTrue(engine.echo)
-
- self.assertEqual(len(w),1)
- self.assertTrue(issubclass(w[-1].category,
- MigrateDeprecationWarning))
- self.assertEqual(
- 'echo=True parameter is deprecated, pass '
- 'engine_arg_echo=True or engine_dict={"echo": True}',
- str(w[-1].message))
-
- finally:
- cw.__exit__()
-
- # unsupported argument
- self.assertRaises(ValueError, construct_engine, 1)
-
- def test_passing_engine(self):
- repo = self.tmp_repos()
- api.create(repo, 'temp')
- api.script('First Version', repo)
- engine = construct_engine('sqlite:///:memory:')
-
- api.version_control(engine, repo)
- api.upgrade(engine, repo)
-
- def test_asbool(self):
- """test asbool parsing"""
- result = asbool(True)
- self.assertEqual(result, True)
-
- result = asbool(False)
- self.assertEqual(result, False)
-
- result = asbool('y')
- self.assertEqual(result, True)
-
- result = asbool('n')
- self.assertEqual(result, False)
-
- self.assertRaises(ValueError, asbool, 'test')
- self.assertRaises(ValueError, asbool, object)
-
-
- def test_load_model(self):
- """load model from dotted name"""
- model_path = os.path.join(self.temp_usable_dir, 'test_load_model.py')
-
- f = open(model_path, 'w')
- f.write("class FakeFloat(int): pass")
- f.close()
-
- try:
- # py 2.4 compatibility :-/
- cw = catch_warnings(record=True)
- w = cw.__enter__()
-
- warnings.simplefilter("always")
-
- # deprecated spelling
- FakeFloat = load_model('test_load_model.FakeFloat')
- self.assertTrue(isinstance(FakeFloat(), int))
-
- self.assertEqual(len(w),1)
- self.assertTrue(issubclass(w[-1].category,
- MigrateDeprecationWarning))
- self.assertEqual(
- 'model should be in form of module.model:User '
- 'and not module.model.User',
- str(w[-1].message))
-
- finally:
- cw.__exit__()
-
- FakeFloat = load_model('test_load_model:FakeFloat')
- self.assertTrue(isinstance(FakeFloat(), int))
-
- FakeFloat = load_model(FakeFloat)
- self.assertTrue(isinstance(FakeFloat(), int))
-
- def test_guess_obj_type(self):
- """guess object type from string"""
- result = guess_obj_type('7')
- self.assertEqual(result, 7)
-
- result = guess_obj_type('y')
- self.assertEqual(result, True)
-
- result = guess_obj_type('test')
- self.assertEqual(result, 'test')
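Condensing what the tests above assert about construct_engine(): it accepts a URL or an existing Engine, engine options may be passed either as engine_arg_* keywords or via engine_dict, and the keyword form takes precedence when both are given. A short sketch (the encoding values are only illustrative):

# Sketch of the construct_engine() behaviour asserted by the tests above.
from sqlalchemy import create_engine
from migrate.versioning.util import asbool, construct_engine, guess_obj_type

engine = construct_engine('sqlite://', engine_arg_encoding='utf-8')        # keyword form
engine = construct_engine('sqlite://', engine_dict={'encoding': 'utf-8'})  # dict form
engine = construct_engine(create_engine('sqlite://'))                      # engines pass through

# engine_arg_* keywords win over engine_dict when both are supplied
engine = construct_engine('sqlite://',
                          engine_dict={'encoding': 'iso-8859-1'},
                          engine_arg_encoding='utf-8')
assert engine.dialect.encoding == 'utf-8'

assert asbool('y') and not asbool('n')
assert guess_obj_type('7') == 7 and guess_obj_type('test') == 'test'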
diff --git a/migrate/tests/versioning/test_version.py b/migrate/tests/versioning/test_version.py
deleted file mode 100644
index 286dd59..0000000
--- a/migrate/tests/versioning/test_version.py
+++ /dev/null
@@ -1,186 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-from migrate.exceptions import *
-from migrate.versioning.version import *
-
-from migrate.tests import fixture
-
-
-class TestVerNum(fixture.Base):
- def test_invalid(self):
- """Disallow invalid version numbers"""
- versions = ('-1', -1, 'Thirteen', '')
- for version in versions:
- self.assertRaises(ValueError, VerNum, version)
-
- def test_str(self):
- """Test str and repr version numbers"""
- self.assertEqual(str(VerNum(2)), '2')
- self.assertEqual(repr(VerNum(2)), '<VerNum(2)>')
-
- def test_is(self):
- """Two version with the same number should be equal"""
- a = VerNum(1)
- b = VerNum(1)
- self.assertTrue(a is b)
-
- self.assertEqual(VerNum(VerNum(2)), VerNum(2))
-
- def test_add(self):
- self.assertEqual(VerNum(1) + VerNum(1), VerNum(2))
- self.assertEqual(VerNum(1) + 1, 2)
- self.assertEqual(VerNum(1) + 1, '2')
- self.assertTrue(isinstance(VerNum(1) + 1, VerNum))
-
- def test_sub(self):
- self.assertEqual(VerNum(1) - 1, 0)
- self.assertTrue(isinstance(VerNum(1) - 1, VerNum))
- self.assertRaises(ValueError, lambda: VerNum(0) - 1)
-
- def test_eq(self):
- """Two versions are equal"""
- self.assertEqual(VerNum(1), VerNum('1'))
- self.assertEqual(VerNum(1), 1)
- self.assertEqual(VerNum(1), '1')
- self.assertNotEqual(VerNum(1), 2)
-
- def test_ne(self):
- self.assertTrue(VerNum(1) != 2)
- self.assertFalse(VerNum(1) != 1)
-
- def test_lt(self):
- self.assertFalse(VerNum(1) < 1)
- self.assertTrue(VerNum(1) < 2)
- self.assertFalse(VerNum(2) < 1)
-
- def test_le(self):
- self.assertTrue(VerNum(1) <= 1)
- self.assertTrue(VerNum(1) <= 2)
- self.assertFalse(VerNum(2) <= 1)
-
- def test_gt(self):
- self.assertFalse(VerNum(1) > 1)
- self.assertFalse(VerNum(1) > 2)
- self.assertTrue(VerNum(2) > 1)
-
- def test_ge(self):
- self.assertTrue(VerNum(1) >= 1)
- self.assertTrue(VerNum(2) >= 1)
- self.assertFalse(VerNum(1) >= 2)
-
- def test_int_cast(self):
- ver = VerNum(3)
- # test __int__
- self.assertEqual(int(ver), 3)
- # test __index__: range() doesn't call __int__
- self.assertEqual(list(range(ver, ver)), [])
-
-
-class TestVersion(fixture.Pathed):
-
- def setUp(self):
- super(TestVersion, self).setUp()
-
- def test_str_to_filename(self):
- self.assertEqual(str_to_filename(''), '')
- self.assertEqual(str_to_filename('__'), '_')
- self.assertEqual(str_to_filename('a'), 'a')
- self.assertEqual(str_to_filename('Abc Def'), 'Abc_Def')
- self.assertEqual(str_to_filename('Abc "D" Ef'), 'Abc_D_Ef')
- self.assertEqual(str_to_filename("Abc's Stuff"), 'Abc_s_Stuff')
- self.assertEqual(str_to_filename("a b"), 'a_b')
- self.assertEqual(str_to_filename("a.b to c"), 'a_b_to_c')
-
- def test_collection(self):
- """Let's see how we handle versions collection"""
- coll = Collection(self.temp_usable_dir)
- coll.create_new_python_version("foo bar")
- coll.create_new_sql_version("postgres", "foo bar")
- coll.create_new_sql_version("sqlite", "foo bar")
- coll.create_new_python_version("")
-
- self.assertEqual(coll.latest, 4)
- self.assertEqual(len(coll.versions), 4)
- self.assertEqual(coll.version(4), coll.version(coll.latest))
- # Check for a non-existent version
- self.assertRaises(VersionNotFoundError, coll.version, 5)
- # Check for the current version
- self.assertEqual('4', coll.version(4).version)
-
- coll2 = Collection(self.temp_usable_dir)
- self.assertEqual(coll.versions, coll2.versions)
-
- Collection.clear()
-
- def test_old_repository(self):
- open(os.path.join(self.temp_usable_dir, '1'), 'w')
- self.assertRaises(Exception, Collection, self.temp_usable_dir)
-
- #TODO: def test_collection_unicode(self):
- # pass
-
- def test_create_new_python_version(self):
- coll = Collection(self.temp_usable_dir)
- coll.create_new_python_version("'")
-
- ver = coll.version()
- self.assertTrue(ver.script().source())
-
- def test_create_new_sql_version(self):
- coll = Collection(self.temp_usable_dir)
- coll.create_new_sql_version("sqlite", "foo bar")
-
- ver = coll.version()
- ver_up = ver.script('sqlite', 'upgrade')
- ver_down = ver.script('sqlite', 'downgrade')
- ver_up.source()
- ver_down.source()
-
- def test_selection(self):
- """Verify right sql script is selected"""
-
- # Create empty directory.
- path = self.tmp_repos()
- os.mkdir(path)
-
- # Create files -- files must be present or you'll get an exception later.
- python_file = '001_initial_.py'
- sqlite_upgrade_file = '001_sqlite_upgrade.sql'
- default_upgrade_file = '001_default_upgrade.sql'
- for file_ in [sqlite_upgrade_file, default_upgrade_file, python_file]:
- filepath = '%s/%s' % (path, file_)
- open(filepath, 'w').close()
-
- ver = Version(1, path, [sqlite_upgrade_file])
- self.assertEqual(os.path.basename(ver.script('sqlite', 'upgrade').path), sqlite_upgrade_file)
-
- ver = Version(1, path, [default_upgrade_file])
- self.assertEqual(os.path.basename(ver.script('default', 'upgrade').path), default_upgrade_file)
-
- ver = Version(1, path, [sqlite_upgrade_file, default_upgrade_file])
- self.assertEqual(os.path.basename(ver.script('sqlite', 'upgrade').path), sqlite_upgrade_file)
-
- ver = Version(1, path, [sqlite_upgrade_file, default_upgrade_file, python_file])
- self.assertEqual(os.path.basename(ver.script('postgres', 'upgrade').path), default_upgrade_file)
-
- ver = Version(1, path, [sqlite_upgrade_file, python_file])
- self.assertEqual(os.path.basename(ver.script('postgres', 'upgrade').path), python_file)
-
- def test_bad_version(self):
- ver = Version(1, self.temp_usable_dir, [])
- self.assertRaises(ScriptError, ver.add_script, '123.sql')
-
- # test a bad ibm_db_sa filename
- ver = Version(123, self.temp_usable_dir, [])
- self.assertRaises(ScriptError, ver.add_script,
- '123_ibm_db_sa_upgrade.sql')
-
- # tests that the name is ok but the script doesn't exist
- self.assertRaises(InvalidScriptError, ver.add_script,
- '123_test_ibm_db_sa_upgrade.sql')
-
- pyscript = os.path.join(self.temp_usable_dir, 'bla.py')
- open(pyscript, 'w')
- ver.add_script(pyscript)
- self.assertRaises(ScriptError, ver.add_script, 'bla.py')
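Taken together, the VerNum tests above pin down a small contract: equal version numbers are the same cached object, arithmetic and comparisons behave like ints but stay within VerNum, and versions never go below zero. A compact sketch of that contract:

# Sketch of the VerNum contract documented by the tests above.
from migrate.versioning.version import VerNum

a, b = VerNum(1), VerNum(1)
assert a is b                                  # same number -> same cached object
assert VerNum(1) == 1 and VerNum(1) == '1'     # compares equal to ints and strings

nxt = VerNum(1) + 1
assert isinstance(nxt, VerNum) and nxt == 2    # arithmetic stays a VerNum
assert int(VerNum(3)) == 3                     # usable where an int is expected

try:
    VerNum(0) - 1                              # going below zero raises
except ValueError:
    pass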