summaryrefslogtreecommitdiff
path: root/test/dialect/postgresql/test_reflection.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2013-06-28 22:30:11 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2013-06-28 22:30:11 -0400
commit1c23741b8e045d266d0ecbed975952547444a5fa (patch)
tree366b9619c81a271bb3f05a37867ddb2124467c1d /test/dialect/postgresql/test_reflection.py
parent83f3dbc83d1066216084a01b32cddcc090f697d5 (diff)
downloadsqlalchemy-1c23741b8e045d266d0ecbed975952547444a5fa.tar.gz
refactor test suites for postgresql, mssql, mysql into packages.
Diffstat (limited to 'test/dialect/postgresql/test_reflection.py')
-rw-r--r--test/dialect/postgresql/test_reflection.py460
1 files changed, 460 insertions, 0 deletions
diff --git a/test/dialect/postgresql/test_reflection.py b/test/dialect/postgresql/test_reflection.py
new file mode 100644
index 000000000..fb399b546
--- /dev/null
+++ b/test/dialect/postgresql/test_reflection.py
@@ -0,0 +1,460 @@
+# coding: utf-8
+
+from sqlalchemy.testing.assertions import eq_, assert_raises, \
+ assert_raises_message, is_, AssertsExecutionResults, \
+ AssertsCompiledSQL, ComparesTables
+from sqlalchemy.testing import engines, fixtures
+from sqlalchemy import testing
+from sqlalchemy import Table, Column, select, MetaData, text, Integer, \
+ String, Sequence, ForeignKey, join, Numeric, \
+ PrimaryKeyConstraint, DateTime, tuple_, Float, BigInteger, \
+ func, literal_column, literal, bindparam, cast, extract, \
+ SmallInteger, Enum, REAL, update, insert, Index, delete, \
+ and_, Date, TypeDecorator, Time, Unicode, Interval, or_, Text
+from sqlalchemy import exc
+from sqlalchemy.dialects.postgresql import base as postgresql
+import logging
+
+class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults):
+
+ """Test PostgreSQL domains"""
+
+ __only_on__ = 'postgresql'
+
+ @classmethod
+ def setup_class(cls):
+ con = testing.db.connect()
+ for ddl in \
+ 'CREATE DOMAIN testdomain INTEGER NOT NULL DEFAULT 42', \
+ 'CREATE DOMAIN test_schema.testdomain INTEGER DEFAULT 0', \
+ "CREATE TYPE testtype AS ENUM ('test')", \
+ 'CREATE DOMAIN enumdomain AS testtype'\
+ :
+ try:
+ con.execute(ddl)
+ except exc.DBAPIError as e:
+ if not 'already exists' in str(e):
+ raise e
+ con.execute('CREATE TABLE testtable (question integer, answer '
+ 'testdomain)')
+ con.execute('CREATE TABLE test_schema.testtable(question '
+ 'integer, answer test_schema.testdomain, anything '
+ 'integer)')
+ con.execute('CREATE TABLE crosschema (question integer, answer '
+ 'test_schema.testdomain)')
+
+ con.execute('CREATE TABLE enum_test (id integer, data enumdomain)')
+
+ @classmethod
+ def teardown_class(cls):
+ con = testing.db.connect()
+ con.execute('DROP TABLE testtable')
+ con.execute('DROP TABLE test_schema.testtable')
+ con.execute('DROP TABLE crosschema')
+ con.execute('DROP DOMAIN testdomain')
+ con.execute('DROP DOMAIN test_schema.testdomain')
+ con.execute("DROP TABLE enum_test")
+ con.execute("DROP DOMAIN enumdomain")
+ con.execute("DROP TYPE testtype")
+
+ def test_table_is_reflected(self):
+ metadata = MetaData(testing.db)
+ table = Table('testtable', metadata, autoload=True)
+ eq_(set(table.columns.keys()), set(['question', 'answer']),
+ "Columns of reflected table didn't equal expected columns")
+ assert isinstance(table.c.answer.type, Integer)
+
+ def test_domain_is_reflected(self):
+ metadata = MetaData(testing.db)
+ table = Table('testtable', metadata, autoload=True)
+ eq_(str(table.columns.answer.server_default.arg), '42',
+ "Reflected default value didn't equal expected value")
+ assert not table.columns.answer.nullable, \
+ 'Expected reflected column to not be nullable.'
+
+ def test_enum_domain_is_reflected(self):
+ metadata = MetaData(testing.db)
+ table = Table('enum_test', metadata, autoload=True)
+ eq_(
+ table.c.data.type.enums,
+ ('test', )
+ )
+
+ def test_table_is_reflected_test_schema(self):
+ metadata = MetaData(testing.db)
+ table = Table('testtable', metadata, autoload=True,
+ schema='test_schema')
+ eq_(set(table.columns.keys()), set(['question', 'answer',
+ 'anything']),
+ "Columns of reflected table didn't equal expected columns")
+ assert isinstance(table.c.anything.type, Integer)
+
+ def test_schema_domain_is_reflected(self):
+ metadata = MetaData(testing.db)
+ table = Table('testtable', metadata, autoload=True,
+ schema='test_schema')
+ eq_(str(table.columns.answer.server_default.arg), '0',
+ "Reflected default value didn't equal expected value")
+ assert table.columns.answer.nullable, \
+ 'Expected reflected column to be nullable.'
+
+ def test_crosschema_domain_is_reflected(self):
+ metadata = MetaData(testing.db)
+ table = Table('crosschema', metadata, autoload=True)
+ eq_(str(table.columns.answer.server_default.arg), '0',
+ "Reflected default value didn't equal expected value")
+ assert table.columns.answer.nullable, \
+ 'Expected reflected column to be nullable.'
+
+ def test_unknown_types(self):
+ from sqlalchemy.databases import postgresql
+ ischema_names = postgresql.PGDialect.ischema_names
+ postgresql.PGDialect.ischema_names = {}
+ try:
+ m2 = MetaData(testing.db)
+ assert_raises(exc.SAWarning, Table, 'testtable', m2,
+ autoload=True)
+
+ @testing.emits_warning('Did not recognize type')
+ def warns():
+ m3 = MetaData(testing.db)
+ t3 = Table('testtable', m3, autoload=True)
+ assert t3.c.answer.type.__class__ == sa.types.NullType
+ finally:
+ postgresql.PGDialect.ischema_names = ischema_names
+
+
+class ReflectionTest(fixtures.TestBase):
+ __only_on__ = 'postgresql'
+
+ @testing.fails_if(('postgresql', '<', (8, 4)),
+ "newer query is bypassed due to unsupported SQL functions")
+ @testing.provide_metadata
+ def test_reflected_primary_key_order(self):
+ meta1 = self.metadata
+ subject = Table('subject', meta1,
+ Column('p1', Integer, primary_key=True),
+ Column('p2', Integer, primary_key=True),
+ PrimaryKeyConstraint('p2', 'p1')
+ )
+ meta1.create_all()
+ meta2 = MetaData(testing.db)
+ subject = Table('subject', meta2, autoload=True)
+ eq_(subject.primary_key.columns.keys(), ['p2', 'p1'])
+
+ @testing.provide_metadata
+ def test_pg_weirdchar_reflection(self):
+ meta1 = self.metadata
+ subject = Table('subject', meta1, Column('id$', Integer,
+ primary_key=True))
+ referer = Table('referer', meta1, Column('id', Integer,
+ primary_key=True), Column('ref', Integer,
+ ForeignKey('subject.id$')))
+ meta1.create_all()
+ meta2 = MetaData(testing.db)
+ subject = Table('subject', meta2, autoload=True)
+ referer = Table('referer', meta2, autoload=True)
+ self.assert_((subject.c['id$']
+ == referer.c.ref).compare(
+ subject.join(referer).onclause))
+
+ @testing.provide_metadata
+ def test_renamed_sequence_reflection(self):
+ metadata = self.metadata
+ t = Table('t', metadata, Column('id', Integer, primary_key=True))
+ metadata.create_all()
+ m2 = MetaData(testing.db)
+ t2 = Table('t', m2, autoload=True, implicit_returning=False)
+ eq_(t2.c.id.server_default.arg.text,
+ "nextval('t_id_seq'::regclass)")
+ r = t2.insert().execute()
+ eq_(r.inserted_primary_key, [1])
+ testing.db.connect().execution_options(autocommit=True).\
+ execute('alter table t_id_seq rename to foobar_id_seq'
+ )
+ m3 = MetaData(testing.db)
+ t3 = Table('t', m3, autoload=True, implicit_returning=False)
+ eq_(t3.c.id.server_default.arg.text,
+ "nextval('foobar_id_seq'::regclass)")
+ r = t3.insert().execute()
+ eq_(r.inserted_primary_key, [2])
+
+ @testing.provide_metadata
+ def test_renamed_pk_reflection(self):
+ metadata = self.metadata
+ t = Table('t', metadata, Column('id', Integer, primary_key=True))
+ metadata.create_all()
+ testing.db.connect().execution_options(autocommit=True).\
+ execute('alter table t rename id to t_id')
+ m2 = MetaData(testing.db)
+ t2 = Table('t', m2, autoload=True)
+ eq_([c.name for c in t2.primary_key], ['t_id'])
+
+ @testing.provide_metadata
+ def test_schema_reflection(self):
+ """note: this test requires that the 'test_schema' schema be
+ separate and accessible by the test user"""
+
+ meta1 = self.metadata
+
+ users = Table('users', meta1, Column('user_id', Integer,
+ primary_key=True), Column('user_name',
+ String(30), nullable=False), schema='test_schema')
+ addresses = Table(
+ 'email_addresses',
+ meta1,
+ Column('address_id', Integer, primary_key=True),
+ Column('remote_user_id', Integer,
+ ForeignKey(users.c.user_id)),
+ Column('email_address', String(20)),
+ schema='test_schema',
+ )
+ meta1.create_all()
+ meta2 = MetaData(testing.db)
+ addresses = Table('email_addresses', meta2, autoload=True,
+ schema='test_schema')
+ users = Table('users', meta2, mustexist=True,
+ schema='test_schema')
+ j = join(users, addresses)
+ self.assert_((users.c.user_id
+ == addresses.c.remote_user_id).compare(j.onclause))
+
+ @testing.provide_metadata
+ def test_schema_reflection_2(self):
+ meta1 = self.metadata
+ subject = Table('subject', meta1, Column('id', Integer,
+ primary_key=True))
+ referer = Table('referer', meta1, Column('id', Integer,
+ primary_key=True), Column('ref', Integer,
+ ForeignKey('subject.id')), schema='test_schema')
+ meta1.create_all()
+ meta2 = MetaData(testing.db)
+ subject = Table('subject', meta2, autoload=True)
+ referer = Table('referer', meta2, schema='test_schema',
+ autoload=True)
+ self.assert_((subject.c.id
+ == referer.c.ref).compare(
+ subject.join(referer).onclause))
+
+ @testing.provide_metadata
+ def test_schema_reflection_3(self):
+ meta1 = self.metadata
+ subject = Table('subject', meta1, Column('id', Integer,
+ primary_key=True), schema='test_schema_2')
+ referer = Table('referer', meta1, Column('id', Integer,
+ primary_key=True), Column('ref', Integer,
+ ForeignKey('test_schema_2.subject.id')),
+ schema='test_schema')
+ meta1.create_all()
+ meta2 = MetaData(testing.db)
+ subject = Table('subject', meta2, autoload=True,
+ schema='test_schema_2')
+ referer = Table('referer', meta2, schema='test_schema',
+ autoload=True)
+ self.assert_((subject.c.id
+ == referer.c.ref).compare(
+ subject.join(referer).onclause))
+
+ @testing.provide_metadata
+ def test_uppercase_lowercase_table(self):
+ metadata = self.metadata
+
+ a_table = Table('a', metadata, Column('x', Integer))
+ A_table = Table('A', metadata, Column('x', Integer))
+
+ a_table.create()
+ assert testing.db.has_table("a")
+ assert not testing.db.has_table("A")
+ A_table.create(checkfirst=True)
+ assert testing.db.has_table("A")
+
+ def test_uppercase_lowercase_sequence(self):
+
+ a_seq = Sequence('a')
+ A_seq = Sequence('A')
+
+ a_seq.create(testing.db)
+ assert testing.db.dialect.has_sequence(testing.db, "a")
+ assert not testing.db.dialect.has_sequence(testing.db, "A")
+ A_seq.create(testing.db, checkfirst=True)
+ assert testing.db.dialect.has_sequence(testing.db, "A")
+
+ a_seq.drop(testing.db)
+ A_seq.drop(testing.db)
+
+ def test_schema_reflection_multi_search_path(self):
+ """test the 'set the same schema' rule when
+ multiple schemas/search paths are in effect."""
+
+ db = engines.testing_engine()
+ conn = db.connect()
+ trans = conn.begin()
+ try:
+ conn.execute("set search_path to test_schema_2, "
+ "test_schema, public")
+ conn.dialect.default_schema_name = "test_schema_2"
+
+ conn.execute("""
+ CREATE TABLE test_schema.some_table (
+ id SERIAL not null primary key
+ )
+ """)
+
+ conn.execute("""
+ CREATE TABLE test_schema_2.some_other_table (
+ id SERIAL not null primary key,
+ sid INTEGER REFERENCES test_schema.some_table(id)
+ )
+ """)
+
+ m1 = MetaData()
+
+ t2_schema = Table('some_other_table',
+ m1,
+ schema="test_schema_2",
+ autoload=True,
+ autoload_with=conn)
+ t1_schema = Table('some_table',
+ m1,
+ schema="test_schema",
+ autoload=True,
+ autoload_with=conn)
+
+ t2_no_schema = Table('some_other_table',
+ m1,
+ autoload=True,
+ autoload_with=conn)
+
+ t1_no_schema = Table('some_table',
+ m1,
+ autoload=True,
+ autoload_with=conn)
+
+ # OK, this because, "test_schema" is
+ # in the search path, and might as well be
+ # the default too. why would we assign
+ # a "schema" to the Table ?
+ assert t2_schema.c.sid.references(
+ t1_no_schema.c.id)
+
+ assert t2_no_schema.c.sid.references(
+ t1_no_schema.c.id)
+
+ finally:
+ trans.rollback()
+ conn.close()
+ db.dispose()
+
+ @testing.provide_metadata
+ def test_index_reflection(self):
+ """ Reflecting partial & expression-based indexes should warn
+ """
+
+ metadata = self.metadata
+
+ t1 = Table('party', metadata, Column('id', String(10),
+ nullable=False), Column('name', String(20),
+ index=True), Column('aname', String(20)))
+ metadata.create_all()
+ testing.db.execute("""
+ create index idx1 on party ((id || name))
+ """)
+ testing.db.execute("""
+ create unique index idx2 on party (id) where name = 'test'
+ """)
+ testing.db.execute("""
+ create index idx3 on party using btree
+ (lower(name::text), lower(aname::text))
+ """)
+
+ def go():
+ m2 = MetaData(testing.db)
+ t2 = Table('party', m2, autoload=True)
+ assert len(t2.indexes) == 2
+
+ # Make sure indexes are in the order we expect them in
+
+ tmp = [(idx.name, idx) for idx in t2.indexes]
+ tmp.sort()
+ r1, r2 = [idx[1] for idx in tmp]
+ assert r1.name == 'idx2'
+ assert r1.unique == True
+ assert r2.unique == False
+ assert [t2.c.id] == r1.columns
+ assert [t2.c.name] == r2.columns
+
+ testing.assert_warnings(go,
+ [
+ 'Skipped unsupported reflection of '
+ 'expression-based index idx1',
+ 'Predicate of partial index idx2 ignored during '
+ 'reflection',
+ 'Skipped unsupported reflection of '
+ 'expression-based index idx3'
+ ])
+
+ @testing.provide_metadata
+ def test_index_reflection_modified(self):
+ """reflect indexes when a column name has changed - PG 9
+ does not update the name of the column in the index def.
+ [ticket:2141]
+
+ """
+
+ metadata = self.metadata
+
+ t1 = Table('t', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('x', Integer)
+ )
+ metadata.create_all()
+ conn = testing.db.connect().execution_options(autocommit=True)
+ conn.execute("CREATE INDEX idx1 ON t (x)")
+ conn.execute("ALTER TABLE t RENAME COLUMN x to y")
+
+ ind = testing.db.dialect.get_indexes(conn, "t", None)
+ eq_(ind, [{'unique': False, 'column_names': ['y'], 'name': 'idx1'}])
+ conn.close()
+
+class CustomTypeReflectionTest(fixtures.TestBase):
+
+ class CustomType(object):
+ def __init__(self, arg1=None, arg2=None):
+ self.arg1 = arg1
+ self.arg2 = arg2
+
+ ischema_names = None
+
+ def setup(self):
+ ischema_names = postgresql.PGDialect.ischema_names
+ postgresql.PGDialect.ischema_names = ischema_names.copy()
+ self.ischema_names = ischema_names
+
+ def teardown(self):
+ postgresql.PGDialect.ischema_names = self.ischema_names
+ self.ischema_names = None
+
+ def _assert_reflected(self, dialect):
+ for sch, args in [
+ ('my_custom_type', (None, None)),
+ ('my_custom_type()', (None, None)),
+ ('my_custom_type(ARG1)', ('ARG1', None)),
+ ('my_custom_type(ARG1, ARG2)', ('ARG1', 'ARG2')),
+ ]:
+ column_info = dialect._get_column_info(
+ 'colname', sch, None, False,
+ {}, {}, 'public')
+ assert isinstance(column_info['type'], self.CustomType)
+ eq_(column_info['type'].arg1, args[0])
+ eq_(column_info['type'].arg2, args[1])
+
+ def test_clslevel(self):
+ postgresql.PGDialect.ischema_names['my_custom_type'] = self.CustomType
+ dialect = postgresql.PGDialect()
+ self._assert_reflected(dialect)
+
+ def test_instancelevel(self):
+ dialect = postgresql.PGDialect()
+ dialect.ischema_names = dialect.ischema_names.copy()
+ dialect.ischema_names['my_custom_type'] = self.CustomType
+ self._assert_reflected(dialect)