diff options
-rw-r--r-- | alembic/batch.py | 16 | ||||
-rw-r--r-- | tests/test_batch.py | 155 |
2 files changed, 157 insertions, 14 deletions
diff --git a/alembic/batch.py b/alembic/batch.py index 5f74511..a64d2bc 100644 --- a/alembic/batch.py +++ b/alembic/batch.py @@ -1,5 +1,5 @@ from sqlalchemy import Table, MetaData, Index, select, Column, \ - ForeignKeyConstraint + ForeignKeyConstraint, cast from sqlalchemy import types as sqltypes from sqlalchemy.util import OrderedDict @@ -194,23 +194,23 @@ class ApplyBatchImpl(object): def alter_column(self, table_name, column_name, nullable=None, server_default=False, - new_column_name=None, + name=None, type_=None, autoincrement=None, **kw ): existing = self.columns[column_name] existing_transfer = self.column_transfers[column_name] - if new_column_name is not None and new_column_name != column_name: + if name is not None and name != column_name: # note that we don't change '.key' - we keep referring # to the renamed column by its old key in _create(). neat! - existing.name = new_column_name - existing_transfer["name"] = new_column_name + existing.name = name + existing_transfer["name"] = name if type_ is not None: type_ = sqltypes.to_instance(type_) existing.type = type_ - existing_transfer["typecast"] = type_ + existing_transfer["expr"] = cast(existing_transfer["expr"], type_) if nullable is not None: existing.nullable = nullable if server_default is not False: @@ -219,7 +219,9 @@ class ApplyBatchImpl(object): existing.autoincrement = bool(autoincrement) def add_column(self, table_name, column, **kw): - self.columns[column.name] = column + # we copy the column because operations.add_column() + # gives us a Column that is part of a Table already. + self.columns[column.name] = column.copy(schema=self.table.schema) self.column_transfers[column.name] = {} def drop_column(self, table_name, column, **kw): diff --git a/tests/test_batch.py b/tests/test_batch.py index dfddbc5..4f2695b 100644 --- a/tests/test_batch.py +++ b/tests/test_batch.py @@ -1,11 +1,12 @@ from contextlib import contextmanager import re -from alembic.testing import TestBase, eq_ +from alembic.testing import TestBase, eq_, config from alembic.testing.fixtures import op_fixture from alembic.testing import mock from alembic.operations import Operations from alembic.batch import ApplyBatchImpl +from alembic.migration import MigrationContext from sqlalchemy import Integer, Table, Column, String, MetaData, ForeignKey, \ UniqueConstraint, ForeignKeyConstraint @@ -97,10 +98,18 @@ class BatchApplyTest(TestBase): 'SELECT %(tname_colnames)s FROM tname' % { "colnames": ", ".join([ impl.new_table.c[name].name - for name in colnames if name in impl.table.c]), + for name in colnames + if name in impl.table.c]), "tname_colnames": - ", ".join("tname.%s" % name - for name in colnames if name in impl.table.c) + ", ".join( + "CAST(tname.%s AS %s) AS anon_1" % ( + name, impl.new_table.c[name].type) + if ( + impl.new_table.c[name].type + is not impl.table.c[name].type) + else "tname.%s" % name + for name in colnames if name in impl.table.c + ) }, 'DROP TABLE tname', 'ALTER TABLE _alembic_batch_temp RENAME TO tname' @@ -115,13 +124,22 @@ class BatchApplyTest(TestBase): def test_rename_col(self): impl = self._simple_fixture() - impl.alter_column('tname', 'x', new_column_name='q') + impl.alter_column('tname', 'x', name='q') new_table = self._assert_impl(impl) eq_(new_table.c.x.name, 'q') + def test_add_col(self): + impl = self._simple_fixture() + col = Column('g', Integer) + # operations.add_column produces a table + t = self.op._table('tname', col) # noqa + impl.add_column('tname', col) + new_table = self._assert_impl(impl, colnames=['id', 'x', 'y', 'g']) + eq_(new_table.c.g.name, 'g') + def test_rename_col_pk(self): impl = self._simple_fixture() - impl.alter_column('tname', 'id', new_column_name='foobar') + impl.alter_column('tname', 'id', name='foobar') new_table = self._assert_impl( impl, ddl_contains="PRIMARY KEY (foobar)") eq_(new_table.c.id.name, 'foobar') @@ -129,7 +147,7 @@ class BatchApplyTest(TestBase): def test_rename_col_fk(self): impl = self._fk_fixture() - impl.alter_column('tname', 'user_id', new_column_name='foobar') + impl.alter_column('tname', 'user_id', name='foobar') new_table = self._assert_impl( impl, colnames=['id', 'email', 'user_id'], ddl_contains='FOREIGN KEY(foobar) REFERENCES "user" (id)') @@ -329,3 +347,126 @@ class BatchAPITest(TestBase): batch.impl.operations.impl.mock_calls, [mock.call.drop_constraint(self.mock_schema.Constraint())] ) + + +class BatchRoundTripTest(TestBase): + __only_on__ = "sqlite" + + def setUp(self): + self.conn = config.db.connect() + t1 = Table( + 'foo', MetaData(), + Column('id', Integer, primary_key=True), + Column('data', String(50)), + Column('x', Integer) + ) + t1.create(self.conn) + self.conn.execute( + t1.insert(), + [ + {"id": 1, "data": "d1", "x": 5}, + {"id": 2, "data": "22", "x": 6}, + {"id": 3, "data": "8.5", "x": 7}, + {"id": 4, "data": "9.46", "x": 8}, + {"id": 5, "data": "d5", "x": 9} + ] + ) + context = MigrationContext.configure(self.conn) + self.op = Operations(context) + + def tearDown(self): + self.conn.execute("drop table foo") + self.conn.close() + + def _assert_data(self, data): + eq_( + [dict(row) for row in self.conn.execute("select * from foo")], + data + ) + + def test_change_type(self): + with self.op.batch_alter_table("foo") as batch_op: + batch_op.alter_column('data', type_=Integer) + + self._assert_data([ + {"id": 1, "data": 0, "x": 5}, + {"id": 2, "data": 22, "x": 6}, + {"id": 3, "data": 8, "x": 7}, + {"id": 4, "data": 9, "x": 8}, + {"id": 5, "data": 0, "x": 9} + ]) + + def test_drop_column(self): + with self.op.batch_alter_table("foo") as batch_op: + batch_op.drop_column('data') + + self._assert_data([ + {"id": 1, "x": 5}, + {"id": 2, "x": 6}, + {"id": 3, "x": 7}, + {"id": 4, "x": 8}, + {"id": 5, "x": 9} + ]) + + def test_rename_column(self): + with self.op.batch_alter_table("foo") as batch_op: + batch_op.alter_column('x', new_column_name='y') + + self._assert_data([ + {"id": 1, "data": "d1", "y": 5}, + {"id": 2, "data": "22", "y": 6}, + {"id": 3, "data": "8.5", "y": 7}, + {"id": 4, "data": "9.46", "y": 8}, + {"id": 5, "data": "d5", "y": 9} + ]) + + def test_drop_column_pk(self): + with self.op.batch_alter_table("foo") as batch_op: + batch_op.drop_column('id') + + self._assert_data([ + {"data": "d1", "x": 5}, + {"data": "22", "x": 6}, + {"data": "8.5", "x": 7}, + {"data": "9.46", "x": 8}, + {"data": "d5", "x": 9} + ]) + + def test_rename_column_pk(self): + with self.op.batch_alter_table("foo") as batch_op: + batch_op.alter_column('id', new_column_name='ident') + + self._assert_data([ + {"ident": 1, "data": "d1", "x": 5}, + {"ident": 2, "data": "22", "x": 6}, + {"ident": 3, "data": "8.5", "x": 7}, + {"ident": 4, "data": "9.46", "x": 8}, + {"ident": 5, "data": "d5", "x": 9} + ]) + + def test_add_column_auto(self): + # note this uses ALTER + with self.op.batch_alter_table("foo") as batch_op: + batch_op.add_column( + Column('data2', String(50), server_default='hi')) + + self._assert_data([ + {"id": 1, "data": "d1", "x": 5, 'data2': 'hi'}, + {"id": 2, "data": "22", "x": 6, 'data2': 'hi'}, + {"id": 3, "data": "8.5", "x": 7, 'data2': 'hi'}, + {"id": 4, "data": "9.46", "x": 8, 'data2': 'hi'}, + {"id": 5, "data": "d5", "x": 9, 'data2': 'hi'} + ]) + + def test_add_column_recreate(self): + with self.op.batch_alter_table("foo", recreate='always') as batch_op: + batch_op.add_column( + Column('data2', String(50), server_default='hi')) + + self._assert_data([ + {"id": 1, "data": "d1", "x": 5, 'data2': 'hi'}, + {"id": 2, "data": "22", "x": 6, 'data2': 'hi'}, + {"id": 3, "data": "8.5", "x": 7, 'data2': 'hi'}, + {"id": 4, "data": "9.46", "x": 8, 'data2': 'hi'}, + {"id": 5, "data": "d5", "x": 9, 'data2': 'hi'} + ]) |