diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-01-21 20:10:23 -0500 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-01-21 20:10:23 -0500 |
commit | 07fb90c6cc14de6d02cf4be592c57d56831f59f7 (patch) | |
tree | 050ef65db988559c60f7aa40f2d0bfe24947e548 /test/dialect/test_oracle.py | |
parent | 560fd1d5ed643a1b0f95296f3b840c1963bbe67f (diff) | |
parent | ee1f4d21037690ad996c5eacf7e1200e92f2fbaa (diff) | |
download | sqlalchemy-ticket_2501.tar.gz |
Merge branch 'master' into ticket_2501ticket_2501
Conflicts:
lib/sqlalchemy/orm/mapper.py
Diffstat (limited to 'test/dialect/test_oracle.py')
-rw-r--r-- | test/dialect/test_oracle.py | 635 |
1 files changed, 369 insertions, 266 deletions
diff --git a/test/dialect/test_oracle.py b/test/dialect/test_oracle.py index 71b2d96cb..8d0ff9776 100644 --- a/test/dialect/test_oracle.py +++ b/test/dialect/test_oracle.py @@ -18,7 +18,7 @@ from sqlalchemy.testing.schema import Table, Column import datetime import os from sqlalchemy import sql - +from sqlalchemy.testing.mock import Mock class OutParamTest(fixtures.TestBase, AssertsExecutionResults): __only_on__ = 'oracle+cx_oracle' @@ -26,31 +26,31 @@ class OutParamTest(fixtures.TestBase, AssertsExecutionResults): @classmethod def setup_class(cls): testing.db.execute(""" -create or replace procedure foo(x_in IN number, x_out OUT number, y_out OUT number, z_out OUT varchar) IS - retval number; - begin - retval := 6; - x_out := 10; - y_out := x_in * 15; - z_out := NULL; - end; + create or replace procedure foo(x_in IN number, x_out OUT number, + y_out OUT number, z_out OUT varchar) IS + retval number; + begin + retval := 6; + x_out := 10; + y_out := x_in * 15; + z_out := NULL; + end; """) def test_out_params(self): - result = \ - testing.db.execute(text('begin foo(:x_in, :x_out, :y_out, ' + result = testing.db.execute(text('begin foo(:x_in, :x_out, :y_out, ' ':z_out); end;', bindparams=[bindparam('x_in', Float), outparam('x_out', Integer), outparam('y_out', Float), outparam('z_out', String)]), x_in=5) - eq_(result.out_parameters, {'x_out': 10, 'y_out': 75, 'z_out' - : None}) + eq_(result.out_parameters, + {'x_out': 10, 'y_out': 75, 'z_out': None}) assert isinstance(result.out_parameters['x_out'], int) @classmethod def teardown_class(cls): - testing.db.execute("DROP PROCEDURE foo") + testing.db.execute("DROP PROCEDURE foo") class CXOracleArgsTest(fixtures.TestBase): __only_on__ = 'oracle+cx_oracle' @@ -92,7 +92,7 @@ class QuotedBindRoundTripTest(fixtures.TestBase): metadata.create_all() table.insert().execute( - {"option":1, "plain":1, "union":1} + {"option": 1, "plain": 1, "union": 1} ) eq_( testing.db.execute(table.select()).first(), @@ -106,8 +106,7 @@ class QuotedBindRoundTripTest(fixtures.TestBase): class CompileTest(fixtures.TestBase, AssertsCompiledSQL): - - __dialect__ = oracle.dialect() + __dialect__ = "oracle" #oracle.dialect() def test_true_false(self): self.assert_compile( @@ -218,6 +217,49 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): ':ROWNUM_1) WHERE ora_rn > :ora_rn_1 FOR ' 'UPDATE') + def test_for_update(self): + table1 = table('mytable', + column('myid'), column('name'), column('description')) + + self.assert_compile( + table1.select(table1.c.myid == 7).with_for_update(), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE") + + self.assert_compile( + table1.select(table1.c.myid == 7).with_for_update(of=table1.c.myid), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE OF mytable.myid") + + self.assert_compile( + table1.select(table1.c.myid == 7).with_for_update(nowait=True), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE NOWAIT") + + self.assert_compile( + table1.select(table1.c.myid == 7). + with_for_update(nowait=True, of=table1.c.myid), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 " + "FOR UPDATE OF mytable.myid NOWAIT") + + self.assert_compile( + table1.select(table1.c.myid == 7). + with_for_update(nowait=True, of=[table1.c.myid, table1.c.name]), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE OF " + "mytable.myid, mytable.name NOWAIT") + + ta = table1.alias() + self.assert_compile( + ta.select(ta.c.myid == 7). + with_for_update(of=[ta.c.myid, ta.c.name]), + "SELECT mytable_1.myid, mytable_1.name, mytable_1.description " + "FROM mytable mytable_1 " + "WHERE mytable_1.myid = :myid_1 FOR UPDATE OF " + "mytable_1.myid, mytable_1.name" + ) + def test_limit_preserves_typing_information(self): class MyType(TypeDecorator): impl = Integer @@ -250,7 +292,7 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): def test_use_binds_for_limits_enabled(self): t = table('sometable', column('col1'), column('col2')) - dialect = oracle.OracleDialect(use_binds_for_limits = True) + dialect = oracle.OracleDialect(use_binds_for_limits=True) self.assert_compile(select([t]).limit(10), "SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, " @@ -348,8 +390,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): ) query = select([table1, table2], or_(table1.c.name == 'fred', - table1.c.myid == 10, table2.c.othername != 'jack' - , 'EXISTS (select yay from foo where boo = lar)' + table1.c.myid == 10, table2.c.othername != 'jack', + 'EXISTS (select yay from foo where boo = lar)' ), from_obj=[outerjoin(table1, table2, table1.c.myid == table2.c.otherid)]) self.assert_compile(query, @@ -435,8 +477,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): 'mytable.description AS description FROM ' 'mytable LEFT OUTER JOIN myothertable ON ' 'mytable.myid = myothertable.otherid) ' - 'anon_1 ON thirdtable.userid = anon_1.myid' - , dialect=oracle.dialect(use_ansi=True)) + 'anon_1 ON thirdtable.userid = anon_1.myid', + dialect=oracle.dialect(use_ansi=True)) self.assert_compile(q, 'SELECT thirdtable.userid, ' @@ -549,7 +591,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): def test_returning_insert_labeled(self): t1 = table('t1', column('c1'), column('c2'), column('c3')) self.assert_compile( - t1.insert().values(c1=1).returning(t1.c.c2.label('c2_l'), t1.c.c3.label('c3_l')), + t1.insert().values(c1=1).returning( + t1.c.c2.label('c2_l'), t1.c.c3.label('c3_l')), "INSERT INTO t1 (c1) VALUES (:c1) RETURNING " "t1.c2, t1.c3 INTO :ret_0, :ret_1" ) @@ -587,33 +630,52 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): schema.CreateIndex(Index("bar", t1.c.x)), "CREATE INDEX alt_schema.bar ON alt_schema.foo (x)" ) + + def test_create_index_expr(self): + m = MetaData() + t1 = Table('foo', m, + Column('x', Integer) + ) + self.assert_compile( + schema.CreateIndex(Index("bar", t1.c.x > 5)), + "CREATE INDEX bar ON foo (x > 5)" + ) + class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL): - __only_on__ = 'oracle' - def test_ora8_flags(self): - def server_version_info(self): - return (8, 2, 5) + def _dialect(self, server_version, **kw): + def server_version_info(conn): + return server_version - dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi) + dialect = oracle.dialect( + dbapi=Mock(version="0.0.0", paramstyle="named"), + **kw) dialect._get_server_version_info = server_version_info + dialect._check_unicode_returns = Mock() + dialect._check_unicode_description = Mock() + dialect._get_default_schema_name = Mock() + return dialect + + + def test_ora8_flags(self): + dialect = self._dialect((8, 2, 5)) # before connect, assume modern DB assert dialect._supports_char_length assert dialect._supports_nchar assert dialect.use_ansi - dialect.initialize(testing.db.connect()) + dialect.initialize(Mock()) assert not dialect.implicit_returning assert not dialect._supports_char_length assert not dialect._supports_nchar assert not dialect.use_ansi - self.assert_compile(String(50),"VARCHAR2(50)",dialect=dialect) - self.assert_compile(Unicode(50),"VARCHAR2(50)",dialect=dialect) - self.assert_compile(UnicodeText(),"CLOB",dialect=dialect) + self.assert_compile(String(50), "VARCHAR2(50)", dialect=dialect) + self.assert_compile(Unicode(50), "VARCHAR2(50)", dialect=dialect) + self.assert_compile(UnicodeText(), "CLOB", dialect=dialect) - dialect = oracle.dialect(implicit_returning=True, - dbapi=testing.db.dialect.dbapi) - dialect._get_server_version_info = server_version_info + + dialect = self._dialect((8, 2, 5), implicit_returning=True) dialect.initialize(testing.db.connect()) assert dialect.implicit_returning @@ -621,26 +683,25 @@ class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL): def test_default_flags(self): """test with no initialization or server version info""" - dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi) + dialect = self._dialect(None) + assert dialect._supports_char_length assert dialect._supports_nchar assert dialect.use_ansi - self.assert_compile(String(50),"VARCHAR2(50 CHAR)",dialect=dialect) - self.assert_compile(Unicode(50),"NVARCHAR2(50)",dialect=dialect) - self.assert_compile(UnicodeText(),"NCLOB",dialect=dialect) + self.assert_compile(String(50), "VARCHAR2(50 CHAR)", dialect=dialect) + self.assert_compile(Unicode(50), "NVARCHAR2(50)", dialect=dialect) + self.assert_compile(UnicodeText(), "NCLOB", dialect=dialect) def test_ora10_flags(self): - def server_version_info(self): - return (10, 2, 5) - dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi) - dialect._get_server_version_info = server_version_info - dialect.initialize(testing.db.connect()) + dialect = self._dialect((10, 2, 5)) + + dialect.initialize(Mock()) assert dialect._supports_char_length assert dialect._supports_nchar assert dialect.use_ansi - self.assert_compile(String(50),"VARCHAR2(50 CHAR)",dialect=dialect) - self.assert_compile(Unicode(50),"NVARCHAR2(50)",dialect=dialect) - self.assert_compile(UnicodeText(),"NCLOB",dialect=dialect) + self.assert_compile(String(50), "VARCHAR2(50 CHAR)", dialect=dialect) + self.assert_compile(Unicode(50), "NVARCHAR2(50)", dialect=dialect) + self.assert_compile(UnicodeText(), "NCLOB", dialect=dialect) class MultiSchemaTest(fixtures.TestBase, AssertsCompiledSQL): @@ -664,9 +725,18 @@ create table test_schema.child( parent_id integer references test_schema.parent(id) ); +create table local_table( + id integer primary key, + data varchar2(50) +); + create synonym test_schema.ptable for test_schema.parent; create synonym test_schema.ctable for test_schema.child; +create synonym test_schema_ptable for test_schema.parent; + +create synonym test_schema.local_table for local_table; + -- can't make a ref from local schema to the -- remote schema's table without this, -- *and* cant give yourself a grant ! @@ -682,15 +752,20 @@ grant references on test_schema.child to public; for stmt in """ drop table test_schema.child; drop table test_schema.parent; +drop table local_table; drop synonym test_schema.ctable; drop synonym test_schema.ptable; +drop synonym test_schema_ptable; +drop synonym test_schema.local_table; + """.split(";"): if stmt.strip(): testing.db.execute(stmt) + @testing.provide_metadata def test_create_same_names_explicit_schema(self): schema = testing.db.dialect.default_schema_name - meta = MetaData(testing.db) + meta = self.metadata parent = Table('parent', meta, Column('pid', Integer, primary_key=True), schema=schema @@ -701,15 +776,31 @@ drop synonym test_schema.ptable; schema=schema ) meta.create_all() - try: - parent.insert().execute({'pid':1}) - child.insert().execute({'cid':1, 'pid':1}) - eq_(child.select().execute().fetchall(), [(1, 1)]) - finally: - meta.drop_all() + parent.insert().execute({'pid': 1}) + child.insert().execute({'cid': 1, 'pid': 1}) + eq_(child.select().execute().fetchall(), [(1, 1)]) - def test_create_same_names_implicit_schema(self): + def test_reflect_alt_table_owner_local_synonym(self): meta = MetaData(testing.db) + parent = Table('test_schema_ptable', meta, autoload=True, + oracle_resolve_synonyms=True) + self.assert_compile(parent.select(), + "SELECT test_schema_ptable.id, " + "test_schema_ptable.data FROM test_schema_ptable") + select([parent]).execute().fetchall() + + def test_reflect_alt_synonym_owner_local_table(self): + meta = MetaData(testing.db) + parent = Table('local_table', meta, autoload=True, + oracle_resolve_synonyms=True, schema="test_schema") + self.assert_compile(parent.select(), + "SELECT test_schema.local_table.id, " + "test_schema.local_table.data FROM test_schema.local_table") + select([parent]).execute().fetchall() + + @testing.provide_metadata + def test_create_same_names_implicit_schema(self): + meta = self.metadata parent = Table('parent', meta, Column('pid', Integer, primary_key=True), ) @@ -718,12 +809,9 @@ drop synonym test_schema.ptable; Column('pid', Integer, ForeignKey('parent.pid')), ) meta.create_all() - try: - parent.insert().execute({'pid':1}) - child.insert().execute({'cid':1, 'pid':1}) - eq_(child.select().execute().fetchall(), [(1, 1)]) - finally: - meta.drop_all() + parent.insert().execute({'pid': 1}) + child.insert().execute({'cid': 1, 'pid': 1}) + eq_(child.select().execute().fetchall(), [(1, 1)]) def test_reflect_alt_owner_explicit(self): @@ -911,10 +999,17 @@ class DialectTypesTest(fixtures.TestBase, AssertsCompiledSQL): dbapi = FakeDBAPI() b = bindparam("foo", "hello world!") - assert b.type.dialect_impl(dialect).get_dbapi_type(dbapi) == 'STRING' + eq_( + b.type.dialect_impl(dialect).get_dbapi_type(dbapi), + 'STRING' + ) b = bindparam("foo", "hello world!") - assert b.type.dialect_impl(dialect).get_dbapi_type(dbapi) == 'STRING' + eq_( + b.type.dialect_impl(dialect).get_dbapi_type(dbapi), + 'STRING' + ) + def test_long(self): self.assert_compile(oracle.LONG(), "LONG") @@ -943,14 +1038,14 @@ class DialectTypesTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile(oracle.RAW(35), "RAW(35)") def test_char_length(self): - self.assert_compile(VARCHAR(50),"VARCHAR(50 CHAR)") + self.assert_compile(VARCHAR(50), "VARCHAR(50 CHAR)") oracle8dialect = oracle.dialect() oracle8dialect.server_version_info = (8, 0) - self.assert_compile(VARCHAR(50),"VARCHAR(50)",dialect=oracle8dialect) + self.assert_compile(VARCHAR(50), "VARCHAR(50)", dialect=oracle8dialect) - self.assert_compile(NVARCHAR(50),"NVARCHAR2(50)") - self.assert_compile(CHAR(50),"CHAR(50)") + self.assert_compile(NVARCHAR(50), "NVARCHAR2(50)") + self.assert_compile(CHAR(50), "CHAR(50)") def test_varchar_types(self): dialect = oracle.dialect() @@ -961,6 +1056,12 @@ class DialectTypesTest(fixtures.TestBase, AssertsCompiledSQL): (VARCHAR(50), "VARCHAR(50 CHAR)"), (oracle.NVARCHAR2(50), "NVARCHAR2(50)"), (oracle.VARCHAR2(50), "VARCHAR2(50 CHAR)"), + (String(), "VARCHAR2"), + (Unicode(), "NVARCHAR2"), + (NVARCHAR(), "NVARCHAR2"), + (VARCHAR(), "VARCHAR"), + (oracle.NVARCHAR2(), "NVARCHAR2"), + (oracle.VARCHAR2(), "VARCHAR2"), ]: self.assert_compile(typ, exp, dialect=dialect) @@ -998,36 +1099,36 @@ class TypesTest(fixtures.TestBase): dict(id=3, data="value 3") ) - eq_(t.select().where(t.c.data=='value 2').execute().fetchall(), + eq_( + t.select().where(t.c.data == 'value 2').execute().fetchall(), [(2, 'value 2 ')] - ) + ) m2 = MetaData(testing.db) t2 = Table('t1', m2, autoload=True) assert type(t2.c.data.type) is CHAR - eq_(t2.select().where(t2.c.data=='value 2').execute().fetchall(), + eq_( + t2.select().where(t2.c.data == 'value 2').execute().fetchall(), [(2, 'value 2 ')] - ) + ) finally: t.drop() @testing.requires.returning + @testing.provide_metadata def test_int_not_float(self): - m = MetaData(testing.db) + m = self.metadata t1 = Table('t1', m, Column('foo', Integer)) t1.create() - try: - r = t1.insert().values(foo=5).returning(t1.c.foo).execute() - x = r.scalar() - assert x == 5 - assert isinstance(x, int) - - x = t1.select().scalar() - assert x == 5 - assert isinstance(x, int) - finally: - t1.drop() + r = t1.insert().values(foo=5).returning(t1.c.foo).execute() + x = r.scalar() + assert x == 5 + assert isinstance(x, int) + + x = t1.select().scalar() + assert x == 5 + assert isinstance(x, int) @testing.provide_metadata def test_rowid(self): @@ -1044,7 +1145,7 @@ class TypesTest(fixtures.TestBase): # the ROWID type is not really needed here, # as cx_oracle just treats it as a string, # but we want to make sure the ROWID works... - rowid_col= column('rowid', oracle.ROWID) + rowid_col = column('rowid', oracle.ROWID) s3 = select([t.c.x, rowid_col]).\ where(rowid_col == cast(rowid, oracle.ROWID)) eq_(s3.select().execute().fetchall(), @@ -1070,8 +1171,9 @@ class TypesTest(fixtures.TestBase): eq_(row['day_interval'], datetime.timedelta(days=35, seconds=5743)) + @testing.provide_metadata def test_numerics(self): - m = MetaData(testing.db) + m = self.metadata t1 = Table('t1', m, Column('intcol', Integer), Column('numericcol', Numeric(precision=9, scale=2)), @@ -1084,41 +1186,38 @@ class TypesTest(fixtures.TestBase): ) t1.create() - try: - t1.insert().execute( - intcol=1, - numericcol=5.2, - floatcol1=6.5, - floatcol2 = 8.5, - doubleprec = 9.5, - numbercol1=12, - numbercol2=14.85, - numbercol3=15.76 - ) - - m2 = MetaData(testing.db) - t2 = Table('t1', m2, autoload=True) + t1.insert().execute( + intcol=1, + numericcol=5.2, + floatcol1=6.5, + floatcol2=8.5, + doubleprec=9.5, + numbercol1=12, + numbercol2=14.85, + numbercol3=15.76 + ) - for row in ( - t1.select().execute().first(), - t2.select().execute().first() - ): - for i, (val, type_) in enumerate(( - (1, int), - (decimal.Decimal("5.2"), decimal.Decimal), - (6.5, float), - (8.5, float), - (9.5, float), - (12, int), - (decimal.Decimal("14.85"), decimal.Decimal), - (15.76, float), - )): - eq_(row[i], val) - assert isinstance(row[i], type_), '%r is not %r' \ - % (row[i], type_) + m2 = MetaData(testing.db) + t2 = Table('t1', m2, autoload=True) + + for row in ( + t1.select().execute().first(), + t2.select().execute().first() + ): + for i, (val, type_) in enumerate(( + (1, int), + (decimal.Decimal("5.2"), decimal.Decimal), + (6.5, float), + (8.5, float), + (9.5, float), + (12, int), + (decimal.Decimal("14.85"), decimal.Decimal), + (15.76, float), + )): + eq_(row[i], val) + assert isinstance(row[i], type_), '%r is not %r' \ + % (row[i], type_) - finally: - t1.drop() def test_numeric_no_decimal_mode(self): @@ -1150,28 +1249,26 @@ class TypesTest(fixtures.TestBase): ) foo.create() - foo.insert().execute( - {'idata':5, 'ndata':decimal.Decimal("45.6"), - 'ndata2':decimal.Decimal("45.0"), - 'nidata':decimal.Decimal('53'), 'fdata':45.68392}, - ) + foo.insert().execute({ + 'idata': 5, + 'ndata': decimal.Decimal("45.6"), + 'ndata2': decimal.Decimal("45.0"), + 'nidata': decimal.Decimal('53'), + 'fdata': 45.68392 + }) - stmt = """ - SELECT - idata, - ndata, - ndata2, - nidata, - fdata - FROM foo - """ + stmt = "SELECT idata, ndata, ndata2, nidata, fdata FROM foo" row = testing.db.execute(stmt).fetchall()[0] - eq_([type(x) for x in row], [int, decimal.Decimal, decimal.Decimal, int, float]) + eq_( + [type(x) for x in row], + [int, decimal.Decimal, decimal.Decimal, int, float] + ) eq_( row, - (5, decimal.Decimal('45.6'), decimal.Decimal('45'), 53, 45.683920000000001) + (5, decimal.Decimal('45.6'), decimal.Decimal('45'), + 53, 45.683920000000001) ) # with a nested subquery, @@ -1195,7 +1292,10 @@ class TypesTest(fixtures.TestBase): FROM dual """ row = testing.db.execute(stmt).fetchall()[0] - eq_([type(x) for x in row], [int, decimal.Decimal, int, int, decimal.Decimal]) + eq_( + [type(x) for x in row], + [int, decimal.Decimal, int, int, decimal.Decimal] + ) eq_( row, (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392')) @@ -1203,15 +1303,20 @@ class TypesTest(fixtures.TestBase): row = testing.db.execute(text(stmt, typemap={ - 'idata':Integer(), - 'ndata':Numeric(20, 2), - 'ndata2':Numeric(20, 2), - 'nidata':Numeric(5, 0), - 'fdata':Float() + 'idata': Integer(), + 'ndata': Numeric(20, 2), + 'ndata2': Numeric(20, 2), + 'nidata': Numeric(5, 0), + 'fdata': Float() })).fetchall()[0] - eq_([type(x) for x in row], [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float]) - eq_(row, - (5, decimal.Decimal('45.6'), decimal.Decimal('45'), decimal.Decimal('53'), 45.683920000000001) + eq_( + [type(x) for x in row], + [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float] + ) + eq_( + row, + (5, decimal.Decimal('45.6'), decimal.Decimal('45'), + decimal.Decimal('53'), 45.683920000000001) ) stmt = """ @@ -1237,39 +1342,55 @@ class TypesTest(fixtures.TestBase): ) WHERE ROWNUM >= 0) anon_1 """ - row =testing.db.execute(stmt).fetchall()[0] - eq_([type(x) for x in row], [int, decimal.Decimal, int, int, decimal.Decimal]) - eq_(row, (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392'))) + row = testing.db.execute(stmt).fetchall()[0] + eq_( + [type(x) for x in row], + [int, decimal.Decimal, int, int, decimal.Decimal] + ) + eq_( + row, + (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392')) + ) row = testing.db.execute(text(stmt, typemap={ - 'anon_1_idata':Integer(), - 'anon_1_ndata':Numeric(20, 2), - 'anon_1_ndata2':Numeric(20, 2), - 'anon_1_nidata':Numeric(5, 0), - 'anon_1_fdata':Float() + 'anon_1_idata': Integer(), + 'anon_1_ndata': Numeric(20, 2), + 'anon_1_ndata2': Numeric(20, 2), + 'anon_1_nidata': Numeric(5, 0), + 'anon_1_fdata': Float() })).fetchall()[0] - eq_([type(x) for x in row], [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float]) - eq_(row, - (5, decimal.Decimal('45.6'), decimal.Decimal('45'), decimal.Decimal('53'), 45.683920000000001) + eq_( + [type(x) for x in row], + [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float] + ) + eq_( + row, + (5, decimal.Decimal('45.6'), decimal.Decimal('45'), + decimal.Decimal('53'), 45.683920000000001) ) row = testing.db.execute(text(stmt, typemap={ - 'anon_1_idata':Integer(), - 'anon_1_ndata':Numeric(20, 2, asdecimal=False), - 'anon_1_ndata2':Numeric(20, 2, asdecimal=False), - 'anon_1_nidata':Numeric(5, 0, asdecimal=False), - 'anon_1_fdata':Float(asdecimal=True) + 'anon_1_idata': Integer(), + 'anon_1_ndata': Numeric(20, 2, asdecimal=False), + 'anon_1_ndata2': Numeric(20, 2, asdecimal=False), + 'anon_1_nidata': Numeric(5, 0, asdecimal=False), + 'anon_1_fdata': Float(asdecimal=True) })).fetchall()[0] - eq_([type(x) for x in row], [int, float, float, float, decimal.Decimal]) - eq_(row, + eq_( + [type(x) for x in row], + [int, float, float, float, decimal.Decimal] + ) + eq_( + row, (5, 45.6, 45, 53, decimal.Decimal('45.68392')) ) + @testing.provide_metadata def test_reflect_dates(self): - metadata = MetaData(testing.db) + metadata = self.metadata Table( "date_types", metadata, Column('d1', DATE), @@ -1278,20 +1399,16 @@ class TypesTest(fixtures.TestBase): Column('d4', oracle.INTERVAL(second_precision=5)), ) metadata.create_all() - try: - m = MetaData(testing.db) - t1 = Table( - "date_types", m, - autoload=True) - assert isinstance(t1.c.d1.type, DATE) - assert isinstance(t1.c.d2.type, TIMESTAMP) - assert not t1.c.d2.type.timezone - assert isinstance(t1.c.d3.type, TIMESTAMP) - assert t1.c.d3.type.timezone - assert isinstance(t1.c.d4.type, oracle.INTERVAL) - - finally: - metadata.drop_all() + m = MetaData(testing.db) + t1 = Table( + "date_types", m, + autoload=True) + assert isinstance(t1.c.d1.type, DATE) + assert isinstance(t1.c.d2.type, TIMESTAMP) + assert not t1.c.d2.type.timezone + assert isinstance(t1.c.d3.type, TIMESTAMP) + assert t1.c.d3.type.timezone + assert isinstance(t1.c.d4.type, oracle.INTERVAL) def test_reflect_all_types_schema(self): types_table = Table('all_types', MetaData(testing.db), @@ -1319,7 +1436,7 @@ class TypesTest(fixtures.TestBase): @testing.provide_metadata def test_reflect_nvarchar(self): metadata = self.metadata - t = Table('t', metadata, + Table('t', metadata, Column('data', sqltypes.NVARCHAR(255)) ) metadata.create_all() @@ -1341,22 +1458,20 @@ class TypesTest(fixtures.TestBase): assert isinstance(res, util.text_type) + @testing.provide_metadata def test_char_length(self): - metadata = MetaData(testing.db) + metadata = self.metadata t1 = Table('t1', metadata, Column("c1", VARCHAR(50)), Column("c2", NVARCHAR(250)), Column("c3", CHAR(200)) ) t1.create() - try: - m2 = MetaData(testing.db) - t2 = Table('t1', m2, autoload=True) - eq_(t2.c.c1.type.length, 50) - eq_(t2.c.c2.type.length, 250) - eq_(t2.c.c3.type.length, 200) - finally: - t1.drop() + m2 = MetaData(testing.db) + t2 = Table('t1', m2, autoload=True) + eq_(t2.c.c1.type.length, 50) + eq_(t2.c.c2.type.length, 250) + eq_(t2.c.c3.type.length, 200) @testing.provide_metadata def test_long_type(self): @@ -1372,8 +1487,6 @@ class TypesTest(fixtures.TestBase): "xyz" ) - - def test_longstring(self): metadata = MetaData(testing.db) testing.db.execute(""" @@ -1424,15 +1537,16 @@ class EuroNumericTest(fixtures.TestBase): del os.environ['NLS_LANG'] self.engine.dispose() - @testing.provide_metadata def test_output_type_handler(self): - metadata = self.metadata for stmt, exp, kw in [ ("SELECT 0.1 FROM DUAL", decimal.Decimal("0.1"), {}), ("SELECT 15 FROM DUAL", 15, {}), - ("SELECT CAST(15 AS NUMERIC(3, 1)) FROM DUAL", decimal.Decimal("15"), {}), - ("SELECT CAST(0.1 AS NUMERIC(5, 2)) FROM DUAL", decimal.Decimal("0.1"), {}), - ("SELECT :num FROM DUAL", decimal.Decimal("2.5"), {'num':decimal.Decimal("2.5")}) + ("SELECT CAST(15 AS NUMERIC(3, 1)) FROM DUAL", + decimal.Decimal("15"), {}), + ("SELECT CAST(0.1 AS NUMERIC(5, 2)) FROM DUAL", + decimal.Decimal("0.1"), {}), + ("SELECT :num FROM DUAL", decimal.Decimal("2.5"), + {'num': decimal.Decimal("2.5")}) ]: test_exp = self.engine.scalar(stmt, **kw) eq_( @@ -1513,97 +1627,86 @@ class BufferedColumnTest(fixtures.TestBase, AssertsCompiledSQL): class UnsupportedIndexReflectTest(fixtures.TestBase): __only_on__ = 'oracle' - def setup(self): - global metadata - metadata = MetaData(testing.db) - t1 = Table('test_index_reflect', metadata, + @testing.emits_warning("No column names") + @testing.provide_metadata + def test_reflect_functional_index(self): + metadata = self.metadata + Table('test_index_reflect', metadata, Column('data', String(20), primary_key=True) ) metadata.create_all() - def teardown(self): - metadata.drop_all() - - @testing.emits_warning("No column names") - def test_reflect_functional_index(self): testing.db.execute('CREATE INDEX DATA_IDX ON ' 'TEST_INDEX_REFLECT (UPPER(DATA))') m2 = MetaData(testing.db) - t2 = Table('test_index_reflect', m2, autoload=True) + Table('test_index_reflect', m2, autoload=True) class RoundTripIndexTest(fixtures.TestBase): __only_on__ = 'oracle' + @testing.provide_metadata def test_basic(self): - engine = testing.db - metadata = MetaData(engine) + metadata = self.metadata - table=Table("sometable", metadata, + table = Table("sometable", metadata, Column("id_a", Unicode(255), primary_key=True), Column("id_b", Unicode(255), primary_key=True, unique=True), Column("group", Unicode(255), primary_key=True), Column("col", Unicode(255)), - UniqueConstraint('col','group'), + UniqueConstraint('col', 'group'), ) # "group" is a keyword, so lower case normalind = Index('tableind', table.c.id_b, table.c.group) - # create metadata.create_all() - try: - # round trip, create from reflection - mirror = MetaData(engine) - mirror.reflect() - metadata.drop_all() - mirror.create_all() - - # inspect the reflected creation - inspect = MetaData(engine) - inspect.reflect() - - def obj_definition(obj): - return obj.__class__, tuple([c.name for c in - obj.columns]), getattr(obj, 'unique', None) - - # find what the primary k constraint name should be - primaryconsname = engine.execute( - text("""SELECT constraint_name - FROM all_constraints - WHERE table_name = :table_name - AND owner = :owner - AND constraint_type = 'P' """), - table_name=table.name.upper(), - owner=engine.url.username.upper()).fetchall()[0][0] - - reflectedtable = inspect.tables[table.name] - - # make a dictionary of the reflected objects: - - reflected = dict([(obj_definition(i), i) for i in - reflectedtable.indexes - | reflectedtable.constraints]) - - # assert we got primary key constraint and its name, Error - # if not in dict - - assert reflected[(PrimaryKeyConstraint, ('id_a', 'id_b', - 'group'), None)].name.upper() \ - == primaryconsname.upper() - - # Error if not in dict - - assert reflected[(Index, ('id_b', 'group'), False)].name \ - == normalind.name - assert (Index, ('id_b', ), True) in reflected - assert (Index, ('col', 'group'), True) in reflected - assert len(reflectedtable.constraints) == 1 - assert len(reflectedtable.indexes) == 3 + mirror = MetaData(testing.db) + mirror.reflect() + metadata.drop_all() + mirror.create_all() - finally: - metadata.drop_all() + inspect = MetaData(testing.db) + inspect.reflect() + def obj_definition(obj): + return obj.__class__, tuple([c.name for c in + obj.columns]), getattr(obj, 'unique', None) + # find what the primary k constraint name should be + primaryconsname = testing.db.execute( + text("""SELECT constraint_name + FROM all_constraints + WHERE table_name = :table_name + AND owner = :owner + AND constraint_type = 'P' """), + table_name=table.name.upper(), + owner=testing.db.url.username.upper()).fetchall()[0][0] + + reflectedtable = inspect.tables[table.name] + + # make a dictionary of the reflected objects: + + reflected = dict([(obj_definition(i), i) for i in + reflectedtable.indexes + | reflectedtable.constraints]) + + # assert we got primary key constraint and its name, Error + # if not in dict + + assert reflected[(PrimaryKeyConstraint, ('id_a', 'id_b', + 'group'), None)].name.upper() \ + == primaryconsname.upper() + + # Error if not in dict + + eq_( + reflected[(Index, ('id_b', 'group'), False)].name, + normalind.name + ) + assert (Index, ('id_b', ), True) in reflected + assert (Index, ('col', 'group'), True) in reflected + eq_(len(reflectedtable.constraints), 1) + eq_(len(reflectedtable.indexes), 3) class SequenceTest(fixtures.TestBase, AssertsCompiledSQL): @@ -1650,11 +1753,11 @@ class ExecuteTest(fixtures.TestBase): metadata.create_all() t.insert().execute( - {'id':1, 'data':1}, - {'id':2, 'data':7}, - {'id':3, 'data':12}, - {'id':4, 'data':15}, - {'id':5, 'data':32}, + {'id': 1, 'data': 1}, + {'id': 2, 'data': 7}, + {'id': 3, 'data': 12}, + {'id': 4, 'data': 15}, + {'id': 5, 'data': 32}, ) # here, we can't use ORDER BY. @@ -1679,7 +1782,7 @@ class UnicodeSchemaTest(fixtures.TestBase): @testing.provide_metadata def test_quoted_column_non_unicode(self): metadata = self.metadata - table=Table("atable", metadata, + table = Table("atable", metadata, Column("_underscorecolumn", Unicode(255), primary_key=True), ) metadata.create_all() @@ -1688,14 +1791,14 @@ class UnicodeSchemaTest(fixtures.TestBase): {'_underscorecolumn': u('’é')}, ) result = testing.db.execute( - table.select().where(table.c._underscorecolumn==u('’é')) + table.select().where(table.c._underscorecolumn == u('’é')) ).scalar() eq_(result, u('’é')) @testing.provide_metadata def test_quoted_column_unicode(self): metadata = self.metadata - table=Table("atable", metadata, + table = Table("atable", metadata, Column(u("méil"), Unicode(255), primary_key=True), ) metadata.create_all() |