summaryrefslogtreecommitdiff
path: root/test/sql/test_metadata.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2010-11-28 15:34:41 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2010-11-28 15:34:41 -0500
commit71083b6977c064de69df844dccc145742890ef35 (patch)
treed2c81068694cfb7721ec6972c42fa9eb686bbc86 /test/sql/test_metadata.py
parent2d9387139af453f855d6f20ec2e5f86023f24e2f (diff)
downloadsqlalchemy-71083b6977c064de69df844dccc145742890ef35.tar.gz
combine test/engine/test_metadata.py and test/sql/test_columns.py into new
test/sql/test_metadata.py, [ticket:1970]
Diffstat (limited to 'test/sql/test_metadata.py')
-rw-r--r--test/sql/test_metadata.py573
1 files changed, 573 insertions, 0 deletions
diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py
new file mode 100644
index 000000000..c5ef48154
--- /dev/null
+++ b/test/sql/test_metadata.py
@@ -0,0 +1,573 @@
+from test.lib.testing import assert_raises
+from test.lib.testing import assert_raises_message
+from test.lib.testing import emits_warning
+
+import pickle
+from sqlalchemy import Integer, String, UniqueConstraint, \
+ CheckConstraint, ForeignKey, MetaData, Sequence, \
+ ForeignKeyConstraint, ColumnDefault, Index
+from test.lib.schema import Table, Column
+from sqlalchemy import schema, exc
+import sqlalchemy as tsa
+from test.lib import TestBase, ComparesTables, \
+ AssertsCompiledSQL, testing, engines
+from test.lib.testing import eq_
+
+class MetaDataTest(TestBase, ComparesTables):
+ def test_metadata_connect(self):
+ metadata = MetaData()
+ t1 = Table('table1', metadata,
+ Column('col1', Integer, primary_key=True),
+ Column('col2', String(20)))
+ metadata.bind = testing.db
+ metadata.create_all()
+ try:
+ assert t1.count().scalar() == 0
+ finally:
+ metadata.drop_all()
+
+ def test_metadata_contains(self):
+ metadata = MetaData()
+ t1 = Table('t1', metadata, Column('x', Integer))
+ t2 = Table('t2', metadata, Column('x', Integer), schema='foo')
+ t3 = Table('t2', MetaData(), Column('x', Integer))
+ t4 = Table('t1', MetaData(), Column('x', Integer), schema='foo')
+
+ assert "t1" in metadata
+ assert "foo.t2" in metadata
+ assert "t2" not in metadata
+ assert "foo.t1" not in metadata
+ assert t1 in metadata
+ assert t2 in metadata
+ assert t3 not in metadata
+ assert t4 not in metadata
+
+ def test_uninitialized_column_copy(self):
+ for col in [
+ Column('foo', String(), nullable=False),
+ Column('baz', String(), unique=True),
+ Column(Integer(), primary_key=True),
+ Column('bar', Integer(), Sequence('foo_seq'), primary_key=True,
+ key='bar'),
+ Column(Integer(), ForeignKey('bat.blah')),
+ Column('bar', Integer(), ForeignKey('bat.blah'), primary_key=True,
+ key='bar'),
+ Column('bar', Integer(), info={'foo':'bar'}),
+ ]:
+ c2 = col.copy()
+ for attr in ('name', 'type', 'nullable',
+ 'primary_key', 'key', 'unique', 'info'):
+ eq_(getattr(col, attr), getattr(c2, attr))
+ eq_(len(col.foreign_keys), len(c2.foreign_keys))
+ if col.default:
+ eq_(c2.default.name, 'foo_seq')
+ for a1, a2 in zip(col.foreign_keys, c2.foreign_keys):
+ assert a1 is not a2
+ eq_(a2._colspec, 'bat.blah')
+
+ def test_uninitialized_column_copy_events(self):
+ msgs = []
+ def write(t, c):
+ msgs.append("attach %s.%s" % (t.name, c.name))
+ c1 = Column('foo', String())
+ c1._on_table_attach(write)
+ m = MetaData()
+ for i in xrange(3):
+ cx = c1.copy()
+ t = Table('foo%d' % i, m, cx)
+ eq_(msgs, ['attach foo0.foo', 'attach foo1.foo', 'attach foo2.foo'])
+
+ def test_schema_collection_add(self):
+ metadata = MetaData()
+
+ t1 = Table('t1', metadata, Column('x', Integer), schema='foo')
+ t2 = Table('t2', metadata, Column('x', Integer), schema='bar')
+ t3 = Table('t3', metadata, Column('x', Integer))
+
+ eq_(metadata._schemas, set(['foo', 'bar']))
+ eq_(len(metadata.tables), 3)
+
+ def test_schema_collection_remove(self):
+ metadata = MetaData()
+
+ t1 = Table('t1', metadata, Column('x', Integer), schema='foo')
+ t2 = Table('t2', metadata, Column('x', Integer), schema='bar')
+ t3 = Table('t3', metadata, Column('x', Integer), schema='bar')
+
+ metadata.remove(t3)
+ eq_(metadata._schemas, set(['foo', 'bar']))
+ eq_(len(metadata.tables), 2)
+
+ metadata.remove(t1)
+ eq_(metadata._schemas, set(['bar']))
+ eq_(len(metadata.tables), 1)
+
+ def test_schema_collection_remove_all(self):
+ metadata = MetaData()
+
+ t1 = Table('t1', metadata, Column('x', Integer), schema='foo')
+ t2 = Table('t2', metadata, Column('x', Integer), schema='bar')
+
+ metadata.clear()
+ eq_(metadata._schemas, set())
+ eq_(len(metadata.tables), 0)
+
+ def test_metadata_tables_immutable(self):
+ metadata = MetaData()
+
+ t1 = Table('t1', metadata, Column('x', Integer))
+ assert 't1' in metadata.tables
+
+ assert_raises(
+ TypeError,
+ lambda: metadata.tables.pop('t1')
+ )
+
+ @testing.provide_metadata
+ def test_dupe_tables(self):
+ t1 = Table('table1', metadata,
+ Column('col1', Integer, primary_key=True),
+ Column('col2', String(20)))
+
+ metadata.create_all()
+ t1 = Table('table1', metadata, autoload=True)
+ def go():
+ t2 = Table('table1', metadata,
+ Column('col1', Integer, primary_key=True),
+ Column('col2', String(20)))
+ assert_raises_message(
+ tsa.exc.InvalidRequestError,
+ "Table 'table1' is already defined for this "\
+ "MetaData instance. Specify 'useexisting=True' "\
+ "to redefine options and columns on an existing "\
+ "Table object.",
+ go
+ )
+
+ def test_fk_copy(self):
+ c1 = Column('foo', Integer)
+ c2 = Column('bar', Integer)
+ m = MetaData()
+ t1 = Table('t', m, c1, c2)
+
+ kw = dict(onupdate="X",
+ ondelete="Y", use_alter=True, name='f1',
+ deferrable="Z", initially="Q", link_to_name=True)
+
+ fk1 = ForeignKey(c1, **kw)
+ fk2 = ForeignKeyConstraint((c1,), (c2,), **kw)
+
+ t1.append_constraint(fk2)
+ fk1c = fk1.copy()
+ fk2c = fk2.copy()
+
+ for k in kw:
+ eq_(getattr(fk1c, k), kw[k])
+ eq_(getattr(fk2c, k), kw[k])
+
+ def test_fk_construct(self):
+ c1 = Column('foo', Integer)
+ c2 = Column('bar', Integer)
+ m = MetaData()
+ t1 = Table('t', m, c1, c2)
+ fk1 = ForeignKeyConstraint(('foo', ), ('bar', ), table=t1)
+ assert fk1 in t1.constraints
+
+ @testing.exclude('mysql', '<', (4, 1, 1), 'early types are squirrely')
+ def test_to_metadata(self):
+ meta = MetaData()
+
+ table = Table('mytable', meta,
+ Column('myid', Integer, Sequence('foo_id_seq'), primary_key=True),
+ Column('name', String(40), nullable=True),
+ Column('foo', String(40), nullable=False, server_default='x',
+ server_onupdate='q'),
+ Column('bar', String(40), nullable=False, default='y',
+ onupdate='z'),
+ Column('description', String(30),
+ CheckConstraint("description='hi'")),
+ UniqueConstraint('name'),
+ test_needs_fk=True,
+ )
+
+ table2 = Table('othertable', meta,
+ Column('id', Integer, Sequence('foo_seq'), primary_key=True),
+ Column('myid', Integer,
+ ForeignKey('mytable.myid'),
+ ),
+ test_needs_fk=True,
+ )
+
+ def test_to_metadata():
+ meta2 = MetaData()
+ table_c = table.tometadata(meta2)
+ table2_c = table2.tometadata(meta2)
+ return (table_c, table2_c)
+
+ def test_pickle():
+ meta.bind = testing.db
+ meta2 = pickle.loads(pickle.dumps(meta))
+ assert meta2.bind is None
+ meta3 = pickle.loads(pickle.dumps(meta2))
+ return (meta2.tables['mytable'], meta2.tables['othertable'])
+
+ def test_pickle_via_reflect():
+ # this is the most common use case, pickling the results of a
+ # database reflection
+ meta2 = MetaData(bind=testing.db)
+ t1 = Table('mytable', meta2, autoload=True)
+ t2 = Table('othertable', meta2, autoload=True)
+ meta3 = pickle.loads(pickle.dumps(meta2))
+ assert meta3.bind is None
+ assert meta3.tables['mytable'] is not t1
+ return (meta3.tables['mytable'], meta3.tables['othertable'])
+
+ meta.create_all(testing.db)
+ try:
+ for test, has_constraints, reflect in (test_to_metadata,
+ True, False), (test_pickle, True, False), \
+ (test_pickle_via_reflect, False, True):
+ table_c, table2_c = test()
+ self.assert_tables_equal(table, table_c)
+ self.assert_tables_equal(table2, table2_c)
+ assert table is not table_c
+ assert table.primary_key is not table_c.primary_key
+ assert list(table2_c.c.myid.foreign_keys)[0].column \
+ is table_c.c.myid
+ assert list(table2_c.c.myid.foreign_keys)[0].column \
+ is not table.c.myid
+ assert 'x' in str(table_c.c.foo.server_default.arg)
+ if not reflect:
+ assert isinstance(table_c.c.myid.default, Sequence)
+ assert str(table_c.c.foo.server_onupdate.arg) == 'q'
+ assert str(table_c.c.bar.default.arg) == 'y'
+ assert getattr(table_c.c.bar.onupdate.arg, 'arg',
+ table_c.c.bar.onupdate.arg) == 'z'
+ assert isinstance(table2_c.c.id.default, Sequence)
+
+ # constraints dont get reflected for any dialect right
+ # now
+
+ if has_constraints:
+ for c in table_c.c.description.constraints:
+ if isinstance(c, CheckConstraint):
+ break
+ else:
+ assert False
+ assert str(c.sqltext) == "description='hi'"
+ for c in table_c.constraints:
+ if isinstance(c, UniqueConstraint):
+ break
+ else:
+ assert False
+ assert c.columns.contains_column(table_c.c.name)
+ assert not c.columns.contains_column(table.c.name)
+ finally:
+ meta.drop_all(testing.db)
+
+ def test_tometadata_with_schema(self):
+ meta = MetaData()
+
+ table = Table('mytable', meta,
+ Column('myid', Integer, primary_key=True),
+ Column('name', String(40), nullable=True),
+ Column('description', String(30),
+ CheckConstraint("description='hi'")),
+ UniqueConstraint('name'),
+ test_needs_fk=True,
+ )
+
+ table2 = Table('othertable', meta,
+ Column('id', Integer, primary_key=True),
+ Column('myid', Integer, ForeignKey('mytable.myid')),
+ test_needs_fk=True,
+ )
+
+ meta2 = MetaData()
+ table_c = table.tometadata(meta2, schema='someschema')
+ table2_c = table2.tometadata(meta2, schema='someschema')
+
+ eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid
+ == table2_c.c.myid))
+ eq_(str(table_c.join(table2_c).onclause),
+ 'someschema.mytable.myid = someschema.othertable.myid')
+
+ def test_tometadata_kwargs(self):
+ meta = MetaData()
+
+ table = Table('mytable', meta,
+ Column('myid', Integer, primary_key=True),
+ mysql_engine='InnoDB',
+ )
+
+ meta2 = MetaData()
+ table_c = table.tometadata(meta2)
+
+ eq_(table.kwargs,table_c.kwargs)
+
+ def test_tometadata_indexes(self):
+ meta = MetaData()
+
+ table = Table('mytable', meta,
+ Column('id', Integer, primary_key=True),
+ Column('data1', Integer, index=True),
+ Column('data2', Integer),
+ )
+ Index('multi',table.c.data1,table.c.data2),
+
+ meta2 = MetaData()
+ table_c = table.tometadata(meta2)
+
+ def _get_key(i):
+ return [i.name,i.unique] + \
+ sorted(i.kwargs.items()) + \
+ i.columns.keys()
+
+ eq_(
+ sorted([_get_key(i) for i in table.indexes]),
+ sorted([_get_key(i) for i in table_c.indexes])
+ )
+
+ @emits_warning("Table '.+' already exists within the given MetaData")
+ def test_tometadata_already_there(self):
+
+ meta1 = MetaData()
+ table1 = Table('mytable', meta1,
+ Column('myid', Integer, primary_key=True),
+ )
+ meta2 = MetaData()
+ table2 = Table('mytable', meta2,
+ Column('yourid', Integer, primary_key=True),
+ )
+
+ meta3 = MetaData()
+
+ table_c = table1.tometadata(meta2)
+ table_d = table2.tometadata(meta2)
+
+ # d'oh!
+ assert table_c is table_d
+
+ def test_tometadata_default_schema(self):
+ meta = MetaData()
+
+ table = Table('mytable', meta,
+ Column('myid', Integer, primary_key=True),
+ Column('name', String(40), nullable=True),
+ Column('description', String(30),
+ CheckConstraint("description='hi'")),
+ UniqueConstraint('name'),
+ test_needs_fk=True,
+ schema='myschema',
+ )
+
+ table2 = Table('othertable', meta,
+ Column('id', Integer, primary_key=True),
+ Column('myid', Integer, ForeignKey('myschema.mytable.myid')),
+ test_needs_fk=True,
+ schema='myschema',
+ )
+
+ meta2 = MetaData()
+ table_c = table.tometadata(meta2)
+ table2_c = table2.tometadata(meta2)
+
+ eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid
+ == table2_c.c.myid))
+ eq_(str(table_c.join(table2_c).onclause),
+ 'myschema.mytable.myid = myschema.othertable.myid')
+
+ def test_manual_dependencies(self):
+ meta = MetaData()
+ a = Table('a', meta, Column('foo', Integer))
+ b = Table('b', meta, Column('foo', Integer))
+ c = Table('c', meta, Column('foo', Integer))
+ d = Table('d', meta, Column('foo', Integer))
+ e = Table('e', meta, Column('foo', Integer))
+
+ e.add_is_dependent_on(c)
+ a.add_is_dependent_on(b)
+ b.add_is_dependent_on(d)
+ e.add_is_dependent_on(b)
+ c.add_is_dependent_on(a)
+ eq_(
+ meta.sorted_tables,
+ [d, b, a, c, e]
+ )
+
+ def test_tometadata_strip_schema(self):
+ meta = MetaData()
+
+ table = Table('mytable', meta,
+ Column('myid', Integer, primary_key=True),
+ Column('name', String(40), nullable=True),
+ Column('description', String(30),
+ CheckConstraint("description='hi'")),
+ UniqueConstraint('name'),
+ test_needs_fk=True,
+ )
+
+ table2 = Table('othertable', meta,
+ Column('id', Integer, primary_key=True),
+ Column('myid', Integer, ForeignKey('mytable.myid')),
+ test_needs_fk=True,
+ )
+
+ meta2 = MetaData()
+ table_c = table.tometadata(meta2, schema=None)
+ table2_c = table2.tometadata(meta2, schema=None)
+
+ eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid
+ == table2_c.c.myid))
+ eq_(str(table_c.join(table2_c).onclause),
+ 'mytable.myid = othertable.myid')
+
+ def test_nonexistent(self):
+ assert_raises(tsa.exc.NoSuchTableError, Table,
+ 'fake_table',
+ MetaData(testing.db), autoload=True)
+
+
+class TableTest(TestBase, AssertsCompiledSQL):
+ def test_prefixes(self):
+ table1 = Table("temporary_table_1", MetaData(),
+ Column("col1", Integer),
+ prefixes = ["TEMPORARY"])
+
+ self.assert_compile(
+ schema.CreateTable(table1),
+ "CREATE TEMPORARY TABLE temporary_table_1 (col1 INTEGER)"
+ )
+
+ table2 = Table("temporary_table_2", MetaData(),
+ Column("col1", Integer),
+ prefixes = ["VIRTUAL"])
+ self.assert_compile(
+ schema.CreateTable(table2),
+ "CREATE VIRTUAL TABLE temporary_table_2 (col1 INTEGER)"
+ )
+
+ def test_table_info(self):
+ metadata = MetaData()
+ t1 = Table('foo', metadata, info={'x':'y'})
+ t2 = Table('bar', metadata, info={})
+ t3 = Table('bat', metadata)
+ assert t1.info == {'x':'y'}
+ assert t2.info == {}
+ assert t3.info == {}
+ for t in (t1, t2, t3):
+ t.info['bar'] = 'zip'
+ assert t.info['bar'] == 'zip'
+
+ def test_c_immutable(self):
+ m = MetaData()
+ t1 = Table('t', m, Column('x', Integer), Column('y', Integer))
+ assert_raises(
+ TypeError,
+ t1.c.extend, [Column('z', Integer)]
+ )
+
+ def assign():
+ t1.c['z'] = Column('z', Integer)
+ assert_raises(
+ TypeError,
+ assign
+ )
+
+ def assign():
+ t1.c.z = Column('z', Integer)
+ assert_raises(
+ TypeError,
+ assign
+ )
+
+
+class ColumnDefinitionTest(TestBase):
+ """Test Column() construction."""
+
+ # flesh this out with explicit coverage...
+
+ def columns(self):
+ return [ Column(Integer),
+ Column('b', Integer),
+ Column(Integer),
+ Column('d', Integer),
+ Column(Integer, name='e'),
+ Column(type_=Integer),
+ Column(Integer()),
+ Column('h', Integer()),
+ Column(type_=Integer()) ]
+
+ def test_basic(self):
+ c = self.columns()
+
+ for i, v in ((0, 'a'), (2, 'c'), (5, 'f'), (6, 'g'), (8, 'i')):
+ c[i].name = v
+ c[i].key = v
+ del i, v
+
+ tbl = Table('table', MetaData(), *c)
+
+ for i, col in enumerate(tbl.c):
+ assert col.name == c[i].name
+
+ def test_incomplete(self):
+ c = self.columns()
+
+ assert_raises(exc.ArgumentError, Table, 't', MetaData(), *c)
+
+ def test_incomplete_key(self):
+ c = Column(Integer)
+ assert c.name is None
+ assert c.key is None
+
+ c.name = 'named'
+ t = Table('t', MetaData(), c)
+
+ assert c.name == 'named'
+ assert c.name == c.key
+
+
+ def test_bogus(self):
+ assert_raises(exc.ArgumentError, Column, 'foo', name='bar')
+ assert_raises(exc.ArgumentError, Column, 'foo', Integer,
+ type_=Integer())
+
+
+class ColumnOptionsTest(TestBase):
+
+ def test_default_generators(self):
+ g1, g2 = Sequence('foo_id_seq'), ColumnDefault('f5')
+ assert Column(String, default=g1).default is g1
+ assert Column(String, onupdate=g1).onupdate is g1
+ assert Column(String, default=g2).default is g2
+ assert Column(String, onupdate=g2).onupdate is g2
+
+ def test_type_required(self):
+ assert_raises(exc.ArgumentError, Column)
+ assert_raises(exc.ArgumentError, Column, "foo")
+ assert_raises(exc.ArgumentError, Column, default="foo")
+ assert_raises(exc.ArgumentError, Column, Sequence("a"))
+ assert_raises(exc.ArgumentError, Column, "foo", default="foo")
+ assert_raises(exc.ArgumentError, Column, "foo", Sequence("a"))
+ Column(ForeignKey('bar.id'))
+ Column("foo", ForeignKey('bar.id'))
+ Column(ForeignKey('bar.id'), default="foo")
+ Column(ForeignKey('bar.id'), Sequence("a"))
+ Column("foo", ForeignKey('bar.id'), default="foo")
+ Column("foo", ForeignKey('bar.id'), Sequence("a"))
+
+ def test_column_info(self):
+
+ c1 = Column('foo', String, info={'x':'y'})
+ c2 = Column('bar', String, info={})
+ c3 = Column('bat', String)
+ assert c1.info == {'x':'y'}
+ assert c2.info == {}
+ assert c3.info == {}
+
+ for c in (c1, c2, c3):
+ c.info['bar'] = 'zip'
+ assert c.info['bar'] == 'zip'
+
+ \ No newline at end of file