diff options
Diffstat (limited to 'test/sql/test_metadata.py')
-rw-r--r-- | test/sql/test_metadata.py | 520 |
1 files changed, 489 insertions, 31 deletions
diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py index 851e9b920..f933a2494 100644 --- a/test/sql/test_metadata.py +++ b/test/sql/test_metadata.py @@ -5,15 +5,16 @@ from sqlalchemy.testing import emits_warning import pickle from sqlalchemy import Integer, String, UniqueConstraint, \ CheckConstraint, ForeignKey, MetaData, Sequence, \ - ForeignKeyConstraint, ColumnDefault, Index, event,\ - events, Unicode, types as sqltypes -from sqlalchemy.testing.schema import Table, Column + ForeignKeyConstraint, PrimaryKeyConstraint, ColumnDefault, Index, event,\ + events, Unicode, types as sqltypes, bindparam, \ + Table, Column from sqlalchemy import schema, exc import sqlalchemy as tsa from sqlalchemy.testing import fixtures from sqlalchemy import testing from sqlalchemy.testing import ComparesTables, AssertsCompiledSQL -from sqlalchemy.testing import eq_, is_ +from sqlalchemy.testing import eq_, is_, mock +from contextlib import contextmanager class MetaDataTest(fixtures.TestBase, ComparesTables): def test_metadata_connect(self): @@ -236,6 +237,45 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): go ) + def test_fk_given_non_col(self): + not_a_col = bindparam('x') + assert_raises_message( + exc.ArgumentError, + "String, Column, or Column-bound argument expected, got Bind", + ForeignKey, not_a_col + ) + + def test_fk_given_non_col_clauseelem(self): + class Foo(object): + def __clause_element__(self): + return bindparam('x') + assert_raises_message( + exc.ArgumentError, + "String, Column, or Column-bound argument expected, got Bind", + ForeignKey, Foo() + ) + + def test_fk_given_col_non_table(self): + t = Table('t', MetaData(), Column('x', Integer)) + xa = t.alias().c.x + assert_raises_message( + exc.ArgumentError, + "ForeignKey received Column not bound to a Table, got: .*Alias", + ForeignKey, xa + ) + + def test_fk_given_col_non_table_clauseelem(self): + t = Table('t', MetaData(), Column('x', Integer)) + class Foo(object): + def __clause_element__(self): + return t.alias().c.x + + assert_raises_message( + exc.ArgumentError, + "ForeignKey received Column not bound to a Table, got: .*Alias", + ForeignKey, Foo() + ) + def test_fk_no_such_target_col_error_upfront(self): meta = MetaData() a = Table('a', meta, Column('a', Integer)) @@ -268,6 +308,7 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): @testing.exclude('mysql', '<', (4, 1, 1), 'early types are squirrely') def test_to_metadata(self): + from sqlalchemy.testing.schema import Table meta = MetaData() table = Table('mytable', meta, @@ -280,7 +321,7 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): Column('description', String(30), CheckConstraint("description='hi'")), UniqueConstraint('name'), - test_needs_fk=True, + test_needs_fk=True ) table2 = Table('othertable', meta, @@ -288,7 +329,7 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): Column('myid', Integer, ForeignKey('mytable.myid'), ), - test_needs_fk=True, + test_needs_fk=True ) def test_to_metadata(): @@ -447,13 +488,11 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): Column('description', String(30), CheckConstraint("description='hi'")), UniqueConstraint('name'), - test_needs_fk=True, ) table2 = Table('othertable', meta, Column('id', Integer, primary_key=True), Column('myid', Integer, ForeignKey('mytable.myid')), - test_needs_fk=True, ) meta2 = MetaData() @@ -474,14 +513,12 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): Column('description', String(30), CheckConstraint("description='hi'")), UniqueConstraint('name'), - test_needs_fk=True, schema='myschema', ) table2 = Table('othertable', meta, Column('id', Integer, primary_key=True), Column('myid', Integer, ForeignKey('myschema.mytable.myid')), - test_needs_fk=True, schema='myschema', ) @@ -494,6 +531,47 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): eq_(str(table_c.join(table2_c).onclause), 'myschema.mytable.myid = myschema.othertable.myid') + def test_tometadata_copy_info(self): + m = MetaData() + fk = ForeignKey('t2.id') + c = Column('c', Integer, fk) + ck = CheckConstraint('c > 5') + t = Table('t', m, c, ck) + + m.info['minfo'] = True + fk.info['fkinfo'] = True + c.info['cinfo'] = True + ck.info['ckinfo'] = True + t.info['tinfo'] = True + t.primary_key.info['pkinfo'] = True + fkc = [const for const in t.constraints if + isinstance(const, ForeignKeyConstraint)][0] + fkc.info['fkcinfo'] = True + + m2 = MetaData() + t2 = t.tometadata(m2) + + m.info['minfo'] = False + fk.info['fkinfo'] = False + c.info['cinfo'] = False + ck.info['ckinfo'] = False + t.primary_key.info['pkinfo'] = False + fkc.info['fkcinfo'] = False + + eq_(m2.info, {}) + eq_(t2.info, {"tinfo": True}) + eq_(t2.c.c.info, {"cinfo": True}) + eq_(list(t2.c.c.foreign_keys)[0].info, {"fkinfo": True}) + eq_(t2.primary_key.info, {"pkinfo": True}) + + fkc2 = [const for const in t2.constraints + if isinstance(const, ForeignKeyConstraint)][0] + eq_(fkc2.info, {"fkcinfo": True}) + + ck2 = [const for const in + t2.constraints if isinstance(const, CheckConstraint)][0] + eq_(ck2.info, {"ckinfo": True}) + def test_tometadata_kwargs(self): meta = MetaData() @@ -506,6 +584,8 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): meta2 = MetaData() table_c = table.tometadata(meta2) + eq_(table.kwargs, {"mysql_engine": "InnoDB"}) + eq_(table.kwargs, table_c.kwargs) def test_tometadata_indexes(self): @@ -581,11 +661,13 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): kw['quote_schema'] = quote_schema t = Table(name, metadata, **kw) eq_(t.schema, exp_schema, "test %d, table schema" % i) - eq_(t.quote_schema, exp_quote_schema, + eq_(t.schema.quote if t.schema is not None else None, + exp_quote_schema, "test %d, table quote_schema" % i) seq = Sequence(name, metadata=metadata, **kw) eq_(seq.schema, exp_schema, "test %d, seq schema" % i) - eq_(seq.quote_schema, exp_quote_schema, + eq_(seq.schema.quote if seq.schema is not None else None, + exp_quote_schema, "test %d, seq quote_schema" % i) def test_manual_dependencies(self): @@ -614,13 +696,11 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): Column('name', String(40), nullable=True), Column('description', String(30), CheckConstraint("description='hi'")), UniqueConstraint('name'), - test_needs_fk=True ) table2 = Table('othertable', meta, Column('id', Integer, primary_key=True), Column('myid', Integer, ForeignKey('myschema.mytable.myid')), - test_needs_fk=True ) meta2 = MetaData(schema='someschema') @@ -641,13 +721,11 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): Column('description', String(30), CheckConstraint("description='hi'")), UniqueConstraint('name'), - test_needs_fk=True, ) table2 = Table('othertable', meta, Column('id', Integer, primary_key=True), Column('myid', Integer, ForeignKey('mytable.myid')), - test_needs_fk=True, ) meta2 = MetaData() @@ -764,6 +842,77 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL): ) is_(t._autoincrement_column, t.c.id) + def test_pk_args_standalone(self): + m = MetaData() + t = Table('t', m, + Column('x', Integer, primary_key=True), + PrimaryKeyConstraint(mssql_clustered=True) + ) + eq_( + list(t.primary_key), [t.c.x] + ) + eq_( + t.primary_key.dialect_kwargs, {"mssql_clustered": True} + ) + + def test_pk_cols_sets_flags(self): + m = MetaData() + t = Table('t', m, + Column('x', Integer), + Column('y', Integer), + Column('z', Integer), + PrimaryKeyConstraint('x', 'y') + ) + eq_(t.c.x.primary_key, True) + eq_(t.c.y.primary_key, True) + eq_(t.c.z.primary_key, False) + + def test_pk_col_mismatch_one(self): + m = MetaData() + assert_raises_message( + exc.SAWarning, + "Table 't' specifies columns 'x' as primary_key=True, " + "not matching locally specified columns 'q'", + Table, 't', m, + Column('x', Integer, primary_key=True), + Column('q', Integer), + PrimaryKeyConstraint('q') + ) + + def test_pk_col_mismatch_two(self): + m = MetaData() + assert_raises_message( + exc.SAWarning, + "Table 't' specifies columns 'a', 'b', 'c' as primary_key=True, " + "not matching locally specified columns 'b', 'c'", + Table, 't', m, + Column('a', Integer, primary_key=True), + Column('b', Integer, primary_key=True), + Column('c', Integer, primary_key=True), + PrimaryKeyConstraint('b', 'c') + ) + + @testing.emits_warning("Table 't'") + def test_pk_col_mismatch_three(self): + m = MetaData() + t = Table('t', m, + Column('x', Integer, primary_key=True), + Column('q', Integer), + PrimaryKeyConstraint('q') + ) + eq_(list(t.primary_key), [t.c.q]) + + @testing.emits_warning("Table 't'") + def test_pk_col_mismatch_four(self): + m = MetaData() + t = Table('t', m, + Column('a', Integer, primary_key=True), + Column('b', Integer, primary_key=True), + Column('c', Integer, primary_key=True), + PrimaryKeyConstraint('b', 'c') + ) + eq_(list(t.primary_key), [t.c.b, t.c.c]) + class SchemaTypeTest(fixtures.TestBase): class MyType(sqltypes.SchemaType, sqltypes.TypeEngine): column = None @@ -1039,7 +1188,7 @@ class UseExistingTest(fixtures.TablesTest): meta2 = self._useexisting_fixture() users = Table('users', meta2, quote=True, autoload=True, keep_existing=True) - assert not users.quote + assert not users.name.quote def test_keep_existing_add_column(self): meta2 = self._useexisting_fixture() @@ -1055,12 +1204,15 @@ class UseExistingTest(fixtures.TablesTest): autoload=True, keep_existing=True) assert isinstance(users.c.name.type, Unicode) + @testing.skip_if( + lambda: testing.db.dialect.requires_name_normalize, + "test depends on lowercase as case insensitive") def test_keep_existing_quote_no_orig(self): meta2 = self._notexisting_fixture() users = Table('users', meta2, quote=True, autoload=True, keep_existing=True) - assert users.quote + assert users.name.quote def test_keep_existing_add_column_no_orig(self): meta2 = self._notexisting_fixture() @@ -1080,7 +1232,7 @@ class UseExistingTest(fixtures.TablesTest): meta2 = self._useexisting_fixture() users = Table('users', meta2, quote=True, keep_existing=True) - assert not users.quote + assert not users.name.quote def test_keep_existing_add_column_no_reflection(self): meta2 = self._useexisting_fixture() @@ -1097,9 +1249,12 @@ class UseExistingTest(fixtures.TablesTest): def test_extend_existing_quote(self): meta2 = self._useexisting_fixture() - users = Table('users', meta2, quote=True, autoload=True, - extend_existing=True) - assert users.quote + assert_raises_message( + tsa.exc.ArgumentError, + "Can't redefine 'quote' or 'quote_schema' arguments", + Table, 'users', meta2, quote=True, autoload=True, + extend_existing=True + ) def test_extend_existing_add_column(self): meta2 = self._useexisting_fixture() @@ -1115,12 +1270,15 @@ class UseExistingTest(fixtures.TablesTest): autoload=True, extend_existing=True) assert isinstance(users.c.name.type, Unicode) + @testing.skip_if( + lambda: testing.db.dialect.requires_name_normalize, + "test depends on lowercase as case insensitive") def test_extend_existing_quote_no_orig(self): meta2 = self._notexisting_fixture() users = Table('users', meta2, quote=True, autoload=True, extend_existing=True) - assert users.quote + assert users.name.quote def test_extend_existing_add_column_no_orig(self): meta2 = self._notexisting_fixture() @@ -1138,9 +1296,12 @@ class UseExistingTest(fixtures.TablesTest): def test_extend_existing_quote_no_reflection(self): meta2 = self._useexisting_fixture() - users = Table('users', meta2, quote=True, - extend_existing=True) - assert users.quote + assert_raises_message( + tsa.exc.ArgumentError, + "Can't redefine 'quote' or 'quote_schema' arguments", + Table, 'users', meta2, quote=True, + extend_existing=True + ) def test_extend_existing_add_column_no_reflection(self): meta2 = self._useexisting_fixture() @@ -1546,6 +1707,28 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase): assert c.name == 'named' assert c.name == c.key + def test_unique_index_flags_default_to_none(self): + c = Column(Integer) + eq_(c.unique, None) + eq_(c.index, None) + + c = Column('c', Integer, index=True) + eq_(c.unique, None) + eq_(c.index, True) + + t = Table('t', MetaData(), c) + eq_(list(t.indexes)[0].unique, False) + + c = Column(Integer, unique=True) + eq_(c.unique, True) + eq_(c.index, None) + + c = Column('c', Integer, index=True, unique=True) + eq_(c.unique, True) + eq_(c.index, True) + + t = Table('t', MetaData(), c) + eq_(list(t.indexes)[0].unique, True) def test_bogus(self): assert_raises(exc.ArgumentError, Column, 'foo', name='bar') @@ -1841,7 +2024,6 @@ class ColumnOptionsTest(fixtures.TestBase): c.info['bar'] = 'zip' assert c.info['bar'] == 'zip' - class CatchAllEventsTest(fixtures.TestBase): def teardown(self): @@ -1890,6 +2072,7 @@ class CatchAllEventsTest(fixtures.TestBase): parent.__class__.__name__)) def after_attach(obj, parent): + assert hasattr(obj, 'name') # so we can change it canary.append("%s->%s" % (target.__name__, parent)) event.listen(target, "before_parent_attach", before_attach) event.listen(target, "after_parent_attach", after_attach) @@ -1897,14 +2080,15 @@ class CatchAllEventsTest(fixtures.TestBase): for target in [ schema.ForeignKeyConstraint, schema.PrimaryKeyConstraint, schema.UniqueConstraint, - schema.CheckConstraint + schema.CheckConstraint, + schema.Index ]: evt(target) m = MetaData() Table('t1', m, Column('id', Integer, Sequence('foo_id'), primary_key=True), - Column('bar', String, ForeignKey('t2.id')), + Column('bar', String, ForeignKey('t2.id'), index=True), Column('bat', Integer, unique=True), ) Table('t2', m, @@ -1912,17 +2096,291 @@ class CatchAllEventsTest(fixtures.TestBase): Column('bar', Integer), Column('bat', Integer), CheckConstraint("bar>5"), - UniqueConstraint('bar', 'bat') + UniqueConstraint('bar', 'bat'), + Index(None, 'bar', 'bat') ) eq_( canary, [ 'PrimaryKeyConstraint->Table', 'PrimaryKeyConstraint->t1', + 'Index->Table', 'Index->t1', 'ForeignKeyConstraint->Table', 'ForeignKeyConstraint->t1', 'UniqueConstraint->Table', 'UniqueConstraint->t1', 'PrimaryKeyConstraint->Table', 'PrimaryKeyConstraint->t2', 'CheckConstraint->Table', 'CheckConstraint->t2', - 'UniqueConstraint->Table', 'UniqueConstraint->t2' + 'UniqueConstraint->Table', 'UniqueConstraint->t2', + 'Index->Table', 'Index->t2' ] ) +class DialectKWArgTest(fixtures.TestBase): + @contextmanager + def _fixture(self): + from sqlalchemy.engine.default import DefaultDialect + class ParticipatingDialect(DefaultDialect): + construct_arguments = [ + (schema.Index, { + "x": 5, + "y": False, + "z_one": None + }), + (schema.ForeignKeyConstraint, { + "foobar": False + }) + ] + + class ParticipatingDialect2(DefaultDialect): + construct_arguments = [ + (schema.Index, { + "x": 9, + "y": True, + "pp": "default" + }), + (schema.Table, { + "*": None + }) + ] + + class NonParticipatingDialect(DefaultDialect): + construct_arguments = None + + def load(dialect_name): + if dialect_name == "participating": + return ParticipatingDialect + elif dialect_name == "participating2": + return ParticipatingDialect2 + elif dialect_name == "nonparticipating": + return NonParticipatingDialect + else: + raise exc.NoSuchModuleError("no dialect %r" % dialect_name) + with mock.patch("sqlalchemy.dialects.registry.load", load): + yield + + def test_participating(self): + with self._fixture(): + idx = Index('a', 'b', 'c', participating_y=True) + eq_( + idx.dialect_options, + {"participating": {"x": 5, "y": True, "z_one": None}} + ) + eq_( + idx.dialect_kwargs, + { + 'participating_y': True, + } + ) + + def test_nonparticipating(self): + with self._fixture(): + idx = Index('a', 'b', 'c', nonparticipating_y=True, nonparticipating_q=5) + eq_( + idx.dialect_kwargs, + { + 'nonparticipating_y': True, + 'nonparticipating_q': 5 + } + ) + + def test_unknown_dialect_warning(self): + with self._fixture(): + assert_raises_message( + exc.SAWarning, + "Can't validate argument 'unknown_y'; can't locate " + "any SQLAlchemy dialect named 'unknown'", + Index, 'a', 'b', 'c', unknown_y=True + ) + + def test_participating_bad_kw(self): + with self._fixture(): + assert_raises_message( + exc.ArgumentError, + "Argument 'participating_q_p_x' is not accepted by dialect " + "'participating' on behalf of " + "<class 'sqlalchemy.sql.schema.Index'>", + Index, 'a', 'b', 'c', participating_q_p_x=8 + ) + + def test_participating_unknown_schema_item(self): + with self._fixture(): + # the dialect doesn't include UniqueConstraint in + # its registry at all. + assert_raises_message( + exc.ArgumentError, + "Argument 'participating_q_p_x' is not accepted by dialect " + "'participating' on behalf of " + "<class 'sqlalchemy.sql.schema.UniqueConstraint'>", + UniqueConstraint, 'a', 'b', participating_q_p_x=8 + ) + + @testing.emits_warning("Can't validate") + def test_unknown_dialect_warning_still_populates(self): + with self._fixture(): + idx = Index('a', 'b', 'c', unknown_y=True) + eq_(idx.dialect_kwargs, {"unknown_y": True}) # still populates + + @testing.emits_warning("Can't validate") + def test_unknown_dialect_warning_still_populates_multiple(self): + with self._fixture(): + idx = Index('a', 'b', 'c', unknown_y=True, unknown_z=5, + otherunknown_foo='bar', participating_y=8) + eq_( + idx.dialect_options, + { + "unknown": {'y': True, 'z': 5, '*': None}, + "otherunknown": {'foo': 'bar', '*': None}, + "participating": {'x': 5, 'y': 8, 'z_one': None} + } + ) + eq_(idx.dialect_kwargs, + {'unknown_z': 5, 'participating_y': 8, + 'unknown_y': True, + 'otherunknown_foo': 'bar'} + ) # still populates + + def test_combined(self): + with self._fixture(): + idx = Index('a', 'b', 'c', participating_x=7, + nonparticipating_y=True) + + eq_( + idx.dialect_options, + { + 'participating': {'y': False, 'x': 7, 'z_one': None}, + 'nonparticipating': {'y': True, '*': None} + } + ) + eq_( + idx.dialect_kwargs, + { + 'participating_x': 7, + 'nonparticipating_y': True, + } + ) + + def test_multiple_participating(self): + with self._fixture(): + idx = Index('a', 'b', 'c', + participating_x=7, + participating2_x=15, + participating2_y="lazy" + ) + eq_( + idx.dialect_options, + { + "participating": {'x': 7, 'y': False, 'z_one': None}, + "participating2": {'x': 15, 'y': 'lazy', 'pp': 'default'}, + } + ) + eq_( + idx.dialect_kwargs, + { + 'participating_x': 7, + 'participating2_x': 15, + 'participating2_y': 'lazy' + } + ) + + def test_foreign_key_propagate(self): + with self._fixture(): + m = MetaData() + fk = ForeignKey('t2.id', participating_foobar=True) + t = Table('t', m, Column('id', Integer, fk)) + fkc = [c for c in t.constraints if isinstance(c, ForeignKeyConstraint)][0] + eq_( + fkc.dialect_kwargs, + {'participating_foobar': True} + ) + + def test_foreign_key_propagate_exceptions_delayed(self): + with self._fixture(): + m = MetaData() + fk = ForeignKey('t2.id', participating_fake=True) + c1 = Column('id', Integer, fk) + assert_raises_message( + exc.ArgumentError, + "Argument 'participating_fake' is not accepted by " + "dialect 'participating' on behalf of " + "<class 'sqlalchemy.sql.schema.ForeignKeyConstraint'>", + Table, 't', m, c1 + ) + + def test_wildcard(self): + with self._fixture(): + m = MetaData() + t = Table('x', m, Column('x', Integer), + participating2_xyz='foo', + participating2_engine='InnoDB', + ) + eq_( + t.dialect_kwargs, + { + 'participating2_xyz': 'foo', + 'participating2_engine': 'InnoDB' + } + ) + + def test_uninit_wildcard(self): + with self._fixture(): + m = MetaData() + t = Table('x', m, Column('x', Integer)) + eq_( + t.dialect_options['participating2'], {'*': None} + ) + eq_( + t.dialect_kwargs, {} + ) + + def test_not_contains_wildcard(self): + with self._fixture(): + m = MetaData() + t = Table('x', m, Column('x', Integer)) + assert 'foobar' not in t.dialect_options['participating2'] + + def test_contains_wildcard(self): + with self._fixture(): + m = MetaData() + t = Table('x', m, Column('x', Integer), participating2_foobar=5) + assert 'foobar' in t.dialect_options['participating2'] + + + def test_update(self): + with self._fixture(): + idx = Index('a', 'b', 'c', participating_x=20) + eq_(idx.dialect_kwargs, { + "participating_x": 20, + }) + idx._validate_dialect_kwargs({ + "participating_x": 25, + "participating_z_one": "default"}) + eq_(idx.dialect_options, { + "participating": {"x": 25, "y": False, "z_one": "default"} + }) + eq_(idx.dialect_kwargs, { + "participating_x": 25, + 'participating_z_one': "default" + }) + + idx._validate_dialect_kwargs({ + "participating_x": 25, + "participating_z_one": "default"}) + + eq_(idx.dialect_options, { + "participating": {"x": 25, "y": False, "z_one": "default"} + }) + eq_(idx.dialect_kwargs, { + "participating_x": 25, + 'participating_z_one': "default" + }) + + idx._validate_dialect_kwargs({ + "participating_y": True, + 'participating2_y': "p2y"}) + eq_(idx.dialect_options, { + "participating": {"x": 25, "y": True, "z_one": "default"}, + "participating2": {"y": "p2y", "pp": "default", "x": 9} + }) + eq_(idx.dialect_kwargs, { + "participating_x": 25, + "participating_y": True, + 'participating2_y': "p2y", + "participating_z_one": "default"}) |