summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--alembic/batch.py16
-rw-r--r--tests/test_batch.py155
2 files changed, 157 insertions, 14 deletions
diff --git a/alembic/batch.py b/alembic/batch.py
index 5f74511..a64d2bc 100644
--- a/alembic/batch.py
+++ b/alembic/batch.py
@@ -1,5 +1,5 @@
from sqlalchemy import Table, MetaData, Index, select, Column, \
- ForeignKeyConstraint
+ ForeignKeyConstraint, cast
from sqlalchemy import types as sqltypes
from sqlalchemy.util import OrderedDict
@@ -194,23 +194,23 @@ class ApplyBatchImpl(object):
def alter_column(self, table_name, column_name,
nullable=None,
server_default=False,
- new_column_name=None,
+ name=None,
type_=None,
autoincrement=None,
**kw
):
existing = self.columns[column_name]
existing_transfer = self.column_transfers[column_name]
- if new_column_name is not None and new_column_name != column_name:
+ if name is not None and name != column_name:
# note that we don't change '.key' - we keep referring
# to the renamed column by its old key in _create(). neat!
- existing.name = new_column_name
- existing_transfer["name"] = new_column_name
+ existing.name = name
+ existing_transfer["name"] = name
if type_ is not None:
type_ = sqltypes.to_instance(type_)
existing.type = type_
- existing_transfer["typecast"] = type_
+ existing_transfer["expr"] = cast(existing_transfer["expr"], type_)
if nullable is not None:
existing.nullable = nullable
if server_default is not False:
@@ -219,7 +219,9 @@ class ApplyBatchImpl(object):
existing.autoincrement = bool(autoincrement)
def add_column(self, table_name, column, **kw):
- self.columns[column.name] = column
+ # we copy the column because operations.add_column()
+ # gives us a Column that is part of a Table already.
+ self.columns[column.name] = column.copy(schema=self.table.schema)
self.column_transfers[column.name] = {}
def drop_column(self, table_name, column, **kw):
diff --git a/tests/test_batch.py b/tests/test_batch.py
index dfddbc5..4f2695b 100644
--- a/tests/test_batch.py
+++ b/tests/test_batch.py
@@ -1,11 +1,12 @@
from contextlib import contextmanager
import re
-from alembic.testing import TestBase, eq_
+from alembic.testing import TestBase, eq_, config
from alembic.testing.fixtures import op_fixture
from alembic.testing import mock
from alembic.operations import Operations
from alembic.batch import ApplyBatchImpl
+from alembic.migration import MigrationContext
from sqlalchemy import Integer, Table, Column, String, MetaData, ForeignKey, \
UniqueConstraint, ForeignKeyConstraint
@@ -97,10 +98,18 @@ class BatchApplyTest(TestBase):
'SELECT %(tname_colnames)s FROM tname' % {
"colnames": ", ".join([
impl.new_table.c[name].name
- for name in colnames if name in impl.table.c]),
+ for name in colnames
+ if name in impl.table.c]),
"tname_colnames":
- ", ".join("tname.%s" % name
- for name in colnames if name in impl.table.c)
+ ", ".join(
+ "CAST(tname.%s AS %s) AS anon_1" % (
+ name, impl.new_table.c[name].type)
+ if (
+ impl.new_table.c[name].type
+ is not impl.table.c[name].type)
+ else "tname.%s" % name
+ for name in colnames if name in impl.table.c
+ )
},
'DROP TABLE tname',
'ALTER TABLE _alembic_batch_temp RENAME TO tname'
@@ -115,13 +124,22 @@ class BatchApplyTest(TestBase):
def test_rename_col(self):
impl = self._simple_fixture()
- impl.alter_column('tname', 'x', new_column_name='q')
+ impl.alter_column('tname', 'x', name='q')
new_table = self._assert_impl(impl)
eq_(new_table.c.x.name, 'q')
+ def test_add_col(self):
+ impl = self._simple_fixture()
+ col = Column('g', Integer)
+ # operations.add_column produces a table
+ t = self.op._table('tname', col) # noqa
+ impl.add_column('tname', col)
+ new_table = self._assert_impl(impl, colnames=['id', 'x', 'y', 'g'])
+ eq_(new_table.c.g.name, 'g')
+
def test_rename_col_pk(self):
impl = self._simple_fixture()
- impl.alter_column('tname', 'id', new_column_name='foobar')
+ impl.alter_column('tname', 'id', name='foobar')
new_table = self._assert_impl(
impl, ddl_contains="PRIMARY KEY (foobar)")
eq_(new_table.c.id.name, 'foobar')
@@ -129,7 +147,7 @@ class BatchApplyTest(TestBase):
def test_rename_col_fk(self):
impl = self._fk_fixture()
- impl.alter_column('tname', 'user_id', new_column_name='foobar')
+ impl.alter_column('tname', 'user_id', name='foobar')
new_table = self._assert_impl(
impl, colnames=['id', 'email', 'user_id'],
ddl_contains='FOREIGN KEY(foobar) REFERENCES "user" (id)')
@@ -329,3 +347,126 @@ class BatchAPITest(TestBase):
batch.impl.operations.impl.mock_calls,
[mock.call.drop_constraint(self.mock_schema.Constraint())]
)
+
+
+class BatchRoundTripTest(TestBase):
+ __only_on__ = "sqlite"
+
+ def setUp(self):
+ self.conn = config.db.connect()
+ t1 = Table(
+ 'foo', MetaData(),
+ Column('id', Integer, primary_key=True),
+ Column('data', String(50)),
+ Column('x', Integer)
+ )
+ t1.create(self.conn)
+ self.conn.execute(
+ t1.insert(),
+ [
+ {"id": 1, "data": "d1", "x": 5},
+ {"id": 2, "data": "22", "x": 6},
+ {"id": 3, "data": "8.5", "x": 7},
+ {"id": 4, "data": "9.46", "x": 8},
+ {"id": 5, "data": "d5", "x": 9}
+ ]
+ )
+ context = MigrationContext.configure(self.conn)
+ self.op = Operations(context)
+
+ def tearDown(self):
+ self.conn.execute("drop table foo")
+ self.conn.close()
+
+ def _assert_data(self, data):
+ eq_(
+ [dict(row) for row in self.conn.execute("select * from foo")],
+ data
+ )
+
+ def test_change_type(self):
+ with self.op.batch_alter_table("foo") as batch_op:
+ batch_op.alter_column('data', type_=Integer)
+
+ self._assert_data([
+ {"id": 1, "data": 0, "x": 5},
+ {"id": 2, "data": 22, "x": 6},
+ {"id": 3, "data": 8, "x": 7},
+ {"id": 4, "data": 9, "x": 8},
+ {"id": 5, "data": 0, "x": 9}
+ ])
+
+ def test_drop_column(self):
+ with self.op.batch_alter_table("foo") as batch_op:
+ batch_op.drop_column('data')
+
+ self._assert_data([
+ {"id": 1, "x": 5},
+ {"id": 2, "x": 6},
+ {"id": 3, "x": 7},
+ {"id": 4, "x": 8},
+ {"id": 5, "x": 9}
+ ])
+
+ def test_rename_column(self):
+ with self.op.batch_alter_table("foo") as batch_op:
+ batch_op.alter_column('x', new_column_name='y')
+
+ self._assert_data([
+ {"id": 1, "data": "d1", "y": 5},
+ {"id": 2, "data": "22", "y": 6},
+ {"id": 3, "data": "8.5", "y": 7},
+ {"id": 4, "data": "9.46", "y": 8},
+ {"id": 5, "data": "d5", "y": 9}
+ ])
+
+ def test_drop_column_pk(self):
+ with self.op.batch_alter_table("foo") as batch_op:
+ batch_op.drop_column('id')
+
+ self._assert_data([
+ {"data": "d1", "x": 5},
+ {"data": "22", "x": 6},
+ {"data": "8.5", "x": 7},
+ {"data": "9.46", "x": 8},
+ {"data": "d5", "x": 9}
+ ])
+
+ def test_rename_column_pk(self):
+ with self.op.batch_alter_table("foo") as batch_op:
+ batch_op.alter_column('id', new_column_name='ident')
+
+ self._assert_data([
+ {"ident": 1, "data": "d1", "x": 5},
+ {"ident": 2, "data": "22", "x": 6},
+ {"ident": 3, "data": "8.5", "x": 7},
+ {"ident": 4, "data": "9.46", "x": 8},
+ {"ident": 5, "data": "d5", "x": 9}
+ ])
+
+ def test_add_column_auto(self):
+ # note this uses ALTER
+ with self.op.batch_alter_table("foo") as batch_op:
+ batch_op.add_column(
+ Column('data2', String(50), server_default='hi'))
+
+ self._assert_data([
+ {"id": 1, "data": "d1", "x": 5, 'data2': 'hi'},
+ {"id": 2, "data": "22", "x": 6, 'data2': 'hi'},
+ {"id": 3, "data": "8.5", "x": 7, 'data2': 'hi'},
+ {"id": 4, "data": "9.46", "x": 8, 'data2': 'hi'},
+ {"id": 5, "data": "d5", "x": 9, 'data2': 'hi'}
+ ])
+
+ def test_add_column_recreate(self):
+ with self.op.batch_alter_table("foo", recreate='always') as batch_op:
+ batch_op.add_column(
+ Column('data2', String(50), server_default='hi'))
+
+ self._assert_data([
+ {"id": 1, "data": "d1", "x": 5, 'data2': 'hi'},
+ {"id": 2, "data": "22", "x": 6, 'data2': 'hi'},
+ {"id": 3, "data": "8.5", "x": 7, 'data2': 'hi'},
+ {"id": 4, "data": "9.46", "x": 8, 'data2': 'hi'},
+ {"id": 5, "data": "d5", "x": 9, 'data2': 'hi'}
+ ])