diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2010-11-28 15:34:41 -0500 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2010-11-28 15:34:41 -0500 |
| commit | 71083b6977c064de69df844dccc145742890ef35 (patch) | |
| tree | d2c81068694cfb7721ec6972c42fa9eb686bbc86 /test/sql/test_metadata.py | |
| parent | 2d9387139af453f855d6f20ec2e5f86023f24e2f (diff) | |
| download | sqlalchemy-71083b6977c064de69df844dccc145742890ef35.tar.gz | |
combine test/engine/test_metadata.py and test/sql/test_columns.py into new
test/sql/test_metadata.py, [ticket:1970]
Diffstat (limited to 'test/sql/test_metadata.py')
| -rw-r--r-- | test/sql/test_metadata.py | 573 |
1 files changed, 573 insertions, 0 deletions
diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py new file mode 100644 index 000000000..c5ef48154 --- /dev/null +++ b/test/sql/test_metadata.py @@ -0,0 +1,573 @@ +from test.lib.testing import assert_raises +from test.lib.testing import assert_raises_message +from test.lib.testing import emits_warning + +import pickle +from sqlalchemy import Integer, String, UniqueConstraint, \ + CheckConstraint, ForeignKey, MetaData, Sequence, \ + ForeignKeyConstraint, ColumnDefault, Index +from test.lib.schema import Table, Column +from sqlalchemy import schema, exc +import sqlalchemy as tsa +from test.lib import TestBase, ComparesTables, \ + AssertsCompiledSQL, testing, engines +from test.lib.testing import eq_ + +class MetaDataTest(TestBase, ComparesTables): + def test_metadata_connect(self): + metadata = MetaData() + t1 = Table('table1', metadata, + Column('col1', Integer, primary_key=True), + Column('col2', String(20))) + metadata.bind = testing.db + metadata.create_all() + try: + assert t1.count().scalar() == 0 + finally: + metadata.drop_all() + + def test_metadata_contains(self): + metadata = MetaData() + t1 = Table('t1', metadata, Column('x', Integer)) + t2 = Table('t2', metadata, Column('x', Integer), schema='foo') + t3 = Table('t2', MetaData(), Column('x', Integer)) + t4 = Table('t1', MetaData(), Column('x', Integer), schema='foo') + + assert "t1" in metadata + assert "foo.t2" in metadata + assert "t2" not in metadata + assert "foo.t1" not in metadata + assert t1 in metadata + assert t2 in metadata + assert t3 not in metadata + assert t4 not in metadata + + def test_uninitialized_column_copy(self): + for col in [ + Column('foo', String(), nullable=False), + Column('baz', String(), unique=True), + Column(Integer(), primary_key=True), + Column('bar', Integer(), Sequence('foo_seq'), primary_key=True, + key='bar'), + Column(Integer(), ForeignKey('bat.blah')), + Column('bar', Integer(), ForeignKey('bat.blah'), primary_key=True, + key='bar'), + Column('bar', Integer(), info={'foo':'bar'}), + ]: + c2 = col.copy() + for attr in ('name', 'type', 'nullable', + 'primary_key', 'key', 'unique', 'info'): + eq_(getattr(col, attr), getattr(c2, attr)) + eq_(len(col.foreign_keys), len(c2.foreign_keys)) + if col.default: + eq_(c2.default.name, 'foo_seq') + for a1, a2 in zip(col.foreign_keys, c2.foreign_keys): + assert a1 is not a2 + eq_(a2._colspec, 'bat.blah') + + def test_uninitialized_column_copy_events(self): + msgs = [] + def write(t, c): + msgs.append("attach %s.%s" % (t.name, c.name)) + c1 = Column('foo', String()) + c1._on_table_attach(write) + m = MetaData() + for i in xrange(3): + cx = c1.copy() + t = Table('foo%d' % i, m, cx) + eq_(msgs, ['attach foo0.foo', 'attach foo1.foo', 'attach foo2.foo']) + + def test_schema_collection_add(self): + metadata = MetaData() + + t1 = Table('t1', metadata, Column('x', Integer), schema='foo') + t2 = Table('t2', metadata, Column('x', Integer), schema='bar') + t3 = Table('t3', metadata, Column('x', Integer)) + + eq_(metadata._schemas, set(['foo', 'bar'])) + eq_(len(metadata.tables), 3) + + def test_schema_collection_remove(self): + metadata = MetaData() + + t1 = Table('t1', metadata, Column('x', Integer), schema='foo') + t2 = Table('t2', metadata, Column('x', Integer), schema='bar') + t3 = Table('t3', metadata, Column('x', Integer), schema='bar') + + metadata.remove(t3) + eq_(metadata._schemas, set(['foo', 'bar'])) + eq_(len(metadata.tables), 2) + + metadata.remove(t1) + eq_(metadata._schemas, set(['bar'])) + eq_(len(metadata.tables), 1) + + def test_schema_collection_remove_all(self): + metadata = MetaData() + + t1 = Table('t1', metadata, Column('x', Integer), schema='foo') + t2 = Table('t2', metadata, Column('x', Integer), schema='bar') + + metadata.clear() + eq_(metadata._schemas, set()) + eq_(len(metadata.tables), 0) + + def test_metadata_tables_immutable(self): + metadata = MetaData() + + t1 = Table('t1', metadata, Column('x', Integer)) + assert 't1' in metadata.tables + + assert_raises( + TypeError, + lambda: metadata.tables.pop('t1') + ) + + @testing.provide_metadata + def test_dupe_tables(self): + t1 = Table('table1', metadata, + Column('col1', Integer, primary_key=True), + Column('col2', String(20))) + + metadata.create_all() + t1 = Table('table1', metadata, autoload=True) + def go(): + t2 = Table('table1', metadata, + Column('col1', Integer, primary_key=True), + Column('col2', String(20))) + assert_raises_message( + tsa.exc.InvalidRequestError, + "Table 'table1' is already defined for this "\ + "MetaData instance. Specify 'useexisting=True' "\ + "to redefine options and columns on an existing "\ + "Table object.", + go + ) + + def test_fk_copy(self): + c1 = Column('foo', Integer) + c2 = Column('bar', Integer) + m = MetaData() + t1 = Table('t', m, c1, c2) + + kw = dict(onupdate="X", + ondelete="Y", use_alter=True, name='f1', + deferrable="Z", initially="Q", link_to_name=True) + + fk1 = ForeignKey(c1, **kw) + fk2 = ForeignKeyConstraint((c1,), (c2,), **kw) + + t1.append_constraint(fk2) + fk1c = fk1.copy() + fk2c = fk2.copy() + + for k in kw: + eq_(getattr(fk1c, k), kw[k]) + eq_(getattr(fk2c, k), kw[k]) + + def test_fk_construct(self): + c1 = Column('foo', Integer) + c2 = Column('bar', Integer) + m = MetaData() + t1 = Table('t', m, c1, c2) + fk1 = ForeignKeyConstraint(('foo', ), ('bar', ), table=t1) + assert fk1 in t1.constraints + + @testing.exclude('mysql', '<', (4, 1, 1), 'early types are squirrely') + def test_to_metadata(self): + meta = MetaData() + + table = Table('mytable', meta, + Column('myid', Integer, Sequence('foo_id_seq'), primary_key=True), + Column('name', String(40), nullable=True), + Column('foo', String(40), nullable=False, server_default='x', + server_onupdate='q'), + Column('bar', String(40), nullable=False, default='y', + onupdate='z'), + Column('description', String(30), + CheckConstraint("description='hi'")), + UniqueConstraint('name'), + test_needs_fk=True, + ) + + table2 = Table('othertable', meta, + Column('id', Integer, Sequence('foo_seq'), primary_key=True), + Column('myid', Integer, + ForeignKey('mytable.myid'), + ), + test_needs_fk=True, + ) + + def test_to_metadata(): + meta2 = MetaData() + table_c = table.tometadata(meta2) + table2_c = table2.tometadata(meta2) + return (table_c, table2_c) + + def test_pickle(): + meta.bind = testing.db + meta2 = pickle.loads(pickle.dumps(meta)) + assert meta2.bind is None + meta3 = pickle.loads(pickle.dumps(meta2)) + return (meta2.tables['mytable'], meta2.tables['othertable']) + + def test_pickle_via_reflect(): + # this is the most common use case, pickling the results of a + # database reflection + meta2 = MetaData(bind=testing.db) + t1 = Table('mytable', meta2, autoload=True) + t2 = Table('othertable', meta2, autoload=True) + meta3 = pickle.loads(pickle.dumps(meta2)) + assert meta3.bind is None + assert meta3.tables['mytable'] is not t1 + return (meta3.tables['mytable'], meta3.tables['othertable']) + + meta.create_all(testing.db) + try: + for test, has_constraints, reflect in (test_to_metadata, + True, False), (test_pickle, True, False), \ + (test_pickle_via_reflect, False, True): + table_c, table2_c = test() + self.assert_tables_equal(table, table_c) + self.assert_tables_equal(table2, table2_c) + assert table is not table_c + assert table.primary_key is not table_c.primary_key + assert list(table2_c.c.myid.foreign_keys)[0].column \ + is table_c.c.myid + assert list(table2_c.c.myid.foreign_keys)[0].column \ + is not table.c.myid + assert 'x' in str(table_c.c.foo.server_default.arg) + if not reflect: + assert isinstance(table_c.c.myid.default, Sequence) + assert str(table_c.c.foo.server_onupdate.arg) == 'q' + assert str(table_c.c.bar.default.arg) == 'y' + assert getattr(table_c.c.bar.onupdate.arg, 'arg', + table_c.c.bar.onupdate.arg) == 'z' + assert isinstance(table2_c.c.id.default, Sequence) + + # constraints dont get reflected for any dialect right + # now + + if has_constraints: + for c in table_c.c.description.constraints: + if isinstance(c, CheckConstraint): + break + else: + assert False + assert str(c.sqltext) == "description='hi'" + for c in table_c.constraints: + if isinstance(c, UniqueConstraint): + break + else: + assert False + assert c.columns.contains_column(table_c.c.name) + assert not c.columns.contains_column(table.c.name) + finally: + meta.drop_all(testing.db) + + def test_tometadata_with_schema(self): + meta = MetaData() + + table = Table('mytable', meta, + Column('myid', Integer, primary_key=True), + Column('name', String(40), nullable=True), + Column('description', String(30), + CheckConstraint("description='hi'")), + UniqueConstraint('name'), + test_needs_fk=True, + ) + + table2 = Table('othertable', meta, + Column('id', Integer, primary_key=True), + Column('myid', Integer, ForeignKey('mytable.myid')), + test_needs_fk=True, + ) + + meta2 = MetaData() + table_c = table.tometadata(meta2, schema='someschema') + table2_c = table2.tometadata(meta2, schema='someschema') + + eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid + == table2_c.c.myid)) + eq_(str(table_c.join(table2_c).onclause), + 'someschema.mytable.myid = someschema.othertable.myid') + + def test_tometadata_kwargs(self): + meta = MetaData() + + table = Table('mytable', meta, + Column('myid', Integer, primary_key=True), + mysql_engine='InnoDB', + ) + + meta2 = MetaData() + table_c = table.tometadata(meta2) + + eq_(table.kwargs,table_c.kwargs) + + def test_tometadata_indexes(self): + meta = MetaData() + + table = Table('mytable', meta, + Column('id', Integer, primary_key=True), + Column('data1', Integer, index=True), + Column('data2', Integer), + ) + Index('multi',table.c.data1,table.c.data2), + + meta2 = MetaData() + table_c = table.tometadata(meta2) + + def _get_key(i): + return [i.name,i.unique] + \ + sorted(i.kwargs.items()) + \ + i.columns.keys() + + eq_( + sorted([_get_key(i) for i in table.indexes]), + sorted([_get_key(i) for i in table_c.indexes]) + ) + + @emits_warning("Table '.+' already exists within the given MetaData") + def test_tometadata_already_there(self): + + meta1 = MetaData() + table1 = Table('mytable', meta1, + Column('myid', Integer, primary_key=True), + ) + meta2 = MetaData() + table2 = Table('mytable', meta2, + Column('yourid', Integer, primary_key=True), + ) + + meta3 = MetaData() + + table_c = table1.tometadata(meta2) + table_d = table2.tometadata(meta2) + + # d'oh! + assert table_c is table_d + + def test_tometadata_default_schema(self): + meta = MetaData() + + table = Table('mytable', meta, + Column('myid', Integer, primary_key=True), + Column('name', String(40), nullable=True), + Column('description', String(30), + CheckConstraint("description='hi'")), + UniqueConstraint('name'), + test_needs_fk=True, + schema='myschema', + ) + + table2 = Table('othertable', meta, + Column('id', Integer, primary_key=True), + Column('myid', Integer, ForeignKey('myschema.mytable.myid')), + test_needs_fk=True, + schema='myschema', + ) + + meta2 = MetaData() + table_c = table.tometadata(meta2) + table2_c = table2.tometadata(meta2) + + eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid + == table2_c.c.myid)) + eq_(str(table_c.join(table2_c).onclause), + 'myschema.mytable.myid = myschema.othertable.myid') + + def test_manual_dependencies(self): + meta = MetaData() + a = Table('a', meta, Column('foo', Integer)) + b = Table('b', meta, Column('foo', Integer)) + c = Table('c', meta, Column('foo', Integer)) + d = Table('d', meta, Column('foo', Integer)) + e = Table('e', meta, Column('foo', Integer)) + + e.add_is_dependent_on(c) + a.add_is_dependent_on(b) + b.add_is_dependent_on(d) + e.add_is_dependent_on(b) + c.add_is_dependent_on(a) + eq_( + meta.sorted_tables, + [d, b, a, c, e] + ) + + def test_tometadata_strip_schema(self): + meta = MetaData() + + table = Table('mytable', meta, + Column('myid', Integer, primary_key=True), + Column('name', String(40), nullable=True), + Column('description', String(30), + CheckConstraint("description='hi'")), + UniqueConstraint('name'), + test_needs_fk=True, + ) + + table2 = Table('othertable', meta, + Column('id', Integer, primary_key=True), + Column('myid', Integer, ForeignKey('mytable.myid')), + test_needs_fk=True, + ) + + meta2 = MetaData() + table_c = table.tometadata(meta2, schema=None) + table2_c = table2.tometadata(meta2, schema=None) + + eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid + == table2_c.c.myid)) + eq_(str(table_c.join(table2_c).onclause), + 'mytable.myid = othertable.myid') + + def test_nonexistent(self): + assert_raises(tsa.exc.NoSuchTableError, Table, + 'fake_table', + MetaData(testing.db), autoload=True) + + +class TableTest(TestBase, AssertsCompiledSQL): + def test_prefixes(self): + table1 = Table("temporary_table_1", MetaData(), + Column("col1", Integer), + prefixes = ["TEMPORARY"]) + + self.assert_compile( + schema.CreateTable(table1), + "CREATE TEMPORARY TABLE temporary_table_1 (col1 INTEGER)" + ) + + table2 = Table("temporary_table_2", MetaData(), + Column("col1", Integer), + prefixes = ["VIRTUAL"]) + self.assert_compile( + schema.CreateTable(table2), + "CREATE VIRTUAL TABLE temporary_table_2 (col1 INTEGER)" + ) + + def test_table_info(self): + metadata = MetaData() + t1 = Table('foo', metadata, info={'x':'y'}) + t2 = Table('bar', metadata, info={}) + t3 = Table('bat', metadata) + assert t1.info == {'x':'y'} + assert t2.info == {} + assert t3.info == {} + for t in (t1, t2, t3): + t.info['bar'] = 'zip' + assert t.info['bar'] == 'zip' + + def test_c_immutable(self): + m = MetaData() + t1 = Table('t', m, Column('x', Integer), Column('y', Integer)) + assert_raises( + TypeError, + t1.c.extend, [Column('z', Integer)] + ) + + def assign(): + t1.c['z'] = Column('z', Integer) + assert_raises( + TypeError, + assign + ) + + def assign(): + t1.c.z = Column('z', Integer) + assert_raises( + TypeError, + assign + ) + + +class ColumnDefinitionTest(TestBase): + """Test Column() construction.""" + + # flesh this out with explicit coverage... + + def columns(self): + return [ Column(Integer), + Column('b', Integer), + Column(Integer), + Column('d', Integer), + Column(Integer, name='e'), + Column(type_=Integer), + Column(Integer()), + Column('h', Integer()), + Column(type_=Integer()) ] + + def test_basic(self): + c = self.columns() + + for i, v in ((0, 'a'), (2, 'c'), (5, 'f'), (6, 'g'), (8, 'i')): + c[i].name = v + c[i].key = v + del i, v + + tbl = Table('table', MetaData(), *c) + + for i, col in enumerate(tbl.c): + assert col.name == c[i].name + + def test_incomplete(self): + c = self.columns() + + assert_raises(exc.ArgumentError, Table, 't', MetaData(), *c) + + def test_incomplete_key(self): + c = Column(Integer) + assert c.name is None + assert c.key is None + + c.name = 'named' + t = Table('t', MetaData(), c) + + assert c.name == 'named' + assert c.name == c.key + + + def test_bogus(self): + assert_raises(exc.ArgumentError, Column, 'foo', name='bar') + assert_raises(exc.ArgumentError, Column, 'foo', Integer, + type_=Integer()) + + +class ColumnOptionsTest(TestBase): + + def test_default_generators(self): + g1, g2 = Sequence('foo_id_seq'), ColumnDefault('f5') + assert Column(String, default=g1).default is g1 + assert Column(String, onupdate=g1).onupdate is g1 + assert Column(String, default=g2).default is g2 + assert Column(String, onupdate=g2).onupdate is g2 + + def test_type_required(self): + assert_raises(exc.ArgumentError, Column) + assert_raises(exc.ArgumentError, Column, "foo") + assert_raises(exc.ArgumentError, Column, default="foo") + assert_raises(exc.ArgumentError, Column, Sequence("a")) + assert_raises(exc.ArgumentError, Column, "foo", default="foo") + assert_raises(exc.ArgumentError, Column, "foo", Sequence("a")) + Column(ForeignKey('bar.id')) + Column("foo", ForeignKey('bar.id')) + Column(ForeignKey('bar.id'), default="foo") + Column(ForeignKey('bar.id'), Sequence("a")) + Column("foo", ForeignKey('bar.id'), default="foo") + Column("foo", ForeignKey('bar.id'), Sequence("a")) + + def test_column_info(self): + + c1 = Column('foo', String, info={'x':'y'}) + c2 = Column('bar', String, info={}) + c3 = Column('bat', String) + assert c1.info == {'x':'y'} + assert c2.info == {} + assert c3.info == {} + + for c in (c1, c2, c3): + c.info['bar'] = 'zip' + assert c.info['bar'] == 'zip' + +
\ No newline at end of file |
