diff options
| -rw-r--r-- | doc/build/changelog/changelog_10.rst | 12 | ||||
| -rw-r--r-- | lib/sqlalchemy/sql/schema.py | 54 | ||||
| -rw-r--r-- | test/ext/declarative/test_inheritance.py | 35 | ||||
| -rw-r--r-- | test/sql/test_constraints.py | 97 |
4 files changed, 181 insertions, 17 deletions
diff --git a/doc/build/changelog/changelog_10.rst b/doc/build/changelog/changelog_10.rst index 2caa48a5e..3bcb4d6cf 100644 --- a/doc/build/changelog/changelog_10.rst +++ b/doc/build/changelog/changelog_10.rst @@ -19,6 +19,18 @@ :version: 1.0.0b4 .. change:: + :tags: feature, schema + :tickets: 3341 + + The "auto-attach" feature of constraints such as :class:`.UniqueConstraint` + and :class:`.CheckConstraint` has been further enhanced such that + when the constraint is associated with non-table-bound :class:`.Column` + objects, the constraint will set up event listeners with the + columns themselves such that the constraint auto attaches at the + same time the columns are associated with the table. This in particular + helps in some edge cases in declarative but is also of general use. + + .. change:: :tags: bug, sql :tickets: 3340 diff --git a/lib/sqlalchemy/sql/schema.py b/lib/sqlalchemy/sql/schema.py index e022c5768..3aeba9804 100644 --- a/lib/sqlalchemy/sql/schema.py +++ b/lib/sqlalchemy/sql/schema.py @@ -2390,24 +2390,44 @@ class ColumnCollectionMixin(object): self._pending_colargs = [_to_schema_column_or_string(c) for c in columns] if _autoattach and self._pending_colargs: - columns = [ - c for c in self._pending_colargs - if isinstance(c, Column) and - isinstance(c.table, Table) - ] + self._check_attach() - tables = set([c.table for c in columns]) - if len(tables) == 1: - self._set_parent_with_dispatch(tables.pop()) - elif len(tables) > 1 and not self._allow_multiple_tables: - table = columns[0].table - others = [c for c in columns[1:] if c.table is not table] - if others: - raise exc.ArgumentError( - "Column(s) %s are not part of table '%s'." % - (", ".join("'%s'" % c for c in others), - table.description) - ) + def _check_attach(self, evt=False): + col_objs = [ + c for c in self._pending_colargs + if isinstance(c, Column) + ] + cols_w_table = [ + c for c in col_objs if isinstance(c.table, Table) + ] + cols_wo_table = set(col_objs).difference(cols_w_table) + + if cols_wo_table: + assert not evt, "Should not reach here on event call" + + def _col_attached(column, table): + cols_wo_table.discard(column) + if not cols_wo_table: + self._check_attach(evt=True) + self._cols_wo_table = cols_wo_table + for col in cols_wo_table: + col._on_table_attach(_col_attached) + return + + columns = cols_w_table + + tables = set([c.table for c in columns]) + if len(tables) == 1: + self._set_parent_with_dispatch(tables.pop()) + elif len(tables) > 1 and not self._allow_multiple_tables: + table = columns[0].table + others = [c for c in columns[1:] if c.table is not table] + if others: + raise exc.ArgumentError( + "Column(s) %s are not part of table '%s'." % + (", ".join("'%s'" % c for c in others), + table.description) + ) def _set_parent(self, table): for col in self._pending_colargs: diff --git a/test/ext/declarative/test_inheritance.py b/test/ext/declarative/test_inheritance.py index 6ea37e4d3..2ecee99fd 100644 --- a/test/ext/declarative/test_inheritance.py +++ b/test/ext/declarative/test_inheritance.py @@ -485,6 +485,41 @@ class DeclarativeInheritanceTest(DeclarativeTestBase): ).one(), Engineer(name='vlad', primary_language='cobol')) + def test_single_constraint_on_sub(self): + """test the somewhat unusual case of [ticket:3341]""" + + class Person(Base, fixtures.ComparableEntity): + + __tablename__ = 'people' + id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + name = Column(String(50)) + discriminator = Column('type', String(50)) + __mapper_args__ = {'polymorphic_on': discriminator} + + class Engineer(Person): + + __mapper_args__ = {'polymorphic_identity': 'engineer'} + primary_language = Column(String(50)) + + __hack_args_one__ = sa.UniqueConstraint( + Person.name, primary_language) + __hack_args_two__ = sa.CheckConstraint( + Person.name != primary_language) + + uq = [c for c in Person.__table__.constraints + if isinstance(c, sa.UniqueConstraint)][0] + ck = [c for c in Person.__table__.constraints + if isinstance(c, sa.CheckConstraint)][0] + eq_( + list(uq.columns), + [Person.__table__.c.name, Person.__table__.c.primary_language] + ) + eq_( + list(ck.columns), + [Person.__table__.c.name, Person.__table__.c.primary_language] + ) + @testing.skip_if(lambda: testing.against('oracle'), "Test has an empty insert in it at the moment") def test_columns_single_inheritance_conflict_resolution(self): diff --git a/test/sql/test_constraints.py b/test/sql/test_constraints.py index eb558fc95..d024e1a27 100644 --- a/test/sql/test_constraints.py +++ b/test/sql/test_constraints.py @@ -1052,6 +1052,103 @@ class ConstraintAPITest(fixtures.TestBase): assert c not in t.constraints assert c not in t2.constraints + def test_auto_append_ck_on_col_attach_one(self): + m = MetaData() + + a = Column('a', Integer) + b = Column('b', Integer) + ck = CheckConstraint(a > b) + + t = Table('tbl', m, a, b) + assert ck in t.constraints + + def test_auto_append_ck_on_col_attach_two(self): + m = MetaData() + + a = Column('a', Integer) + b = Column('b', Integer) + c = Column('c', Integer) + ck = CheckConstraint(a > b + c) + + t = Table('tbl', m, a) + assert ck not in t.constraints + + t.append_column(b) + assert ck not in t.constraints + + t.append_column(c) + assert ck in t.constraints + + def test_auto_append_ck_on_col_attach_three(self): + m = MetaData() + + a = Column('a', Integer) + b = Column('b', Integer) + c = Column('c', Integer) + ck = CheckConstraint(a > b + c) + + t = Table('tbl', m, a) + assert ck not in t.constraints + + t.append_column(b) + assert ck not in t.constraints + + t2 = Table('t2', m) + t2.append_column(c) + + # two different tables, so CheckConstraint does nothing. + assert ck not in t.constraints + + def test_auto_append_uq_on_col_attach_one(self): + m = MetaData() + + a = Column('a', Integer) + b = Column('b', Integer) + uq = UniqueConstraint(a, b) + + t = Table('tbl', m, a, b) + assert uq in t.constraints + + def test_auto_append_uq_on_col_attach_two(self): + m = MetaData() + + a = Column('a', Integer) + b = Column('b', Integer) + c = Column('c', Integer) + uq = UniqueConstraint(a, b, c) + + t = Table('tbl', m, a) + assert uq not in t.constraints + + t.append_column(b) + assert uq not in t.constraints + + t.append_column(c) + assert uq in t.constraints + + def test_auto_append_uq_on_col_attach_three(self): + m = MetaData() + + a = Column('a', Integer) + b = Column('b', Integer) + c = Column('c', Integer) + uq = UniqueConstraint(a, b, c) + + t = Table('tbl', m, a) + assert uq not in t.constraints + + t.append_column(b) + assert uq not in t.constraints + + t2 = Table('t2', m) + + # two different tables, so UniqueConstraint raises + assert_raises_message( + exc.ArgumentError, + r"Column\(s\) 't2\.c' are not part of table 'tbl'\.", + t2.append_column, c + ) + def test_index_asserts_cols_standalone(self): metadata = MetaData() |
