From 9bc9d5c1068be878118202259add3c2e1bcec0cb Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Sat, 12 Oct 2013 20:04:55 -0400 Subject: - Fixed bug in default compiler plus those of postgresql, mysql, and mssql to ensure that any literal SQL expression values are rendered directly as literals, instead of as bound parameters, within a CREATE INDEX statement. [ticket:2742] - don't need expression_as_ddl(); literal_binds and include_table take care of this functionality. --- test/dialect/test_oracle.py | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'test/dialect/test_oracle.py') diff --git a/test/dialect/test_oracle.py b/test/dialect/test_oracle.py index 71b2d96cb..1fde1c198 100644 --- a/test/dialect/test_oracle.py +++ b/test/dialect/test_oracle.py @@ -587,6 +587,17 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): schema.CreateIndex(Index("bar", t1.c.x)), "CREATE INDEX alt_schema.bar ON alt_schema.foo (x)" ) + + def test_create_index_expr(self): + m = MetaData() + t1 = Table('foo', m, + Column('x', Integer) + ) + self.assert_compile( + schema.CreateIndex(Index("bar", t1.c.x > 5)), + "CREATE INDEX bar ON foo (x > 5)" + ) + class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL): __only_on__ = 'oracle' -- cgit v1.2.1 From 5070c81ab963c1432bbbecf38d4cad7ac7b81652 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Fri, 25 Oct 2013 19:11:53 -0400 Subject: - Fixed bug where Oracle table reflection using synonyms would fail if the synonym and the table were in different remote schemas. Patch to fix courtesy Kyle Derr. [ticket:2853] --- test/dialect/test_oracle.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) (limited to 'test/dialect/test_oracle.py') diff --git a/test/dialect/test_oracle.py b/test/dialect/test_oracle.py index 1fde1c198..d843a2d9e 100644 --- a/test/dialect/test_oracle.py +++ b/test/dialect/test_oracle.py @@ -675,9 +675,18 @@ create table test_schema.child( parent_id integer references test_schema.parent(id) ); +create table local_table( + id integer primary key, + data varchar2(50) +); + create synonym test_schema.ptable for test_schema.parent; create synonym test_schema.ctable for test_schema.child; +create synonym test_schema_ptable for test_schema.parent; + +create synonym test_schema.local_table for local_table; + -- can't make a ref from local schema to the -- remote schema's table without this, -- *and* cant give yourself a grant ! @@ -693,8 +702,12 @@ grant references on test_schema.child to public; for stmt in """ drop table test_schema.child; drop table test_schema.parent; +drop table local_table; drop synonym test_schema.ctable; drop synonym test_schema.ptable; +drop synonym test_schema_ptable; +drop synonym test_schema.local_table; + """.split(";"): if stmt.strip(): testing.db.execute(stmt) @@ -719,6 +732,22 @@ drop synonym test_schema.ptable; finally: meta.drop_all() + def test_reflect_alt_table_owner_local_synonym(self): + meta = MetaData(testing.db) + parent = Table('test_schema_ptable', meta, autoload=True, oracle_resolve_synonyms=True) + self.assert_compile(parent.select(), + "SELECT test_schema_ptable.id, " + "test_schema_ptable.data FROM test_schema_ptable") + select([parent]).execute().fetchall() + + def test_reflect_alt_synonym_owner_local_table(self): + meta = MetaData(testing.db) + parent = Table('local_table', meta, autoload=True, oracle_resolve_synonyms=True, schema="test_schema") + self.assert_compile(parent.select(), + "SELECT test_schema.local_table.id, " + "test_schema.local_table.data FROM test_schema.local_table") + select([parent]).execute().fetchall() + def test_create_same_names_implicit_schema(self): meta = MetaData(testing.db) parent = Table('parent', meta, -- cgit v1.2.1 From 6661cba88d14ecba88ae905e1852bac3f084966d Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Fri, 22 Nov 2013 17:48:55 -0500 Subject: - cleanup --- test/dialect/test_oracle.py | 548 +++++++++++++++++++++++--------------------- 1 file changed, 281 insertions(+), 267 deletions(-) (limited to 'test/dialect/test_oracle.py') diff --git a/test/dialect/test_oracle.py b/test/dialect/test_oracle.py index d843a2d9e..36dae9e90 100644 --- a/test/dialect/test_oracle.py +++ b/test/dialect/test_oracle.py @@ -18,7 +18,7 @@ from sqlalchemy.testing.schema import Table, Column import datetime import os from sqlalchemy import sql - +from sqlalchemy.testing.mock import Mock class OutParamTest(fixtures.TestBase, AssertsExecutionResults): __only_on__ = 'oracle+cx_oracle' @@ -26,31 +26,31 @@ class OutParamTest(fixtures.TestBase, AssertsExecutionResults): @classmethod def setup_class(cls): testing.db.execute(""" -create or replace procedure foo(x_in IN number, x_out OUT number, y_out OUT number, z_out OUT varchar) IS - retval number; - begin - retval := 6; - x_out := 10; - y_out := x_in * 15; - z_out := NULL; - end; + create or replace procedure foo(x_in IN number, x_out OUT number, + y_out OUT number, z_out OUT varchar) IS + retval number; + begin + retval := 6; + x_out := 10; + y_out := x_in * 15; + z_out := NULL; + end; """) def test_out_params(self): - result = \ - testing.db.execute(text('begin foo(:x_in, :x_out, :y_out, ' + result = testing.db.execute(text('begin foo(:x_in, :x_out, :y_out, ' ':z_out); end;', bindparams=[bindparam('x_in', Float), outparam('x_out', Integer), outparam('y_out', Float), outparam('z_out', String)]), x_in=5) - eq_(result.out_parameters, {'x_out': 10, 'y_out': 75, 'z_out' - : None}) + eq_(result.out_parameters, + {'x_out': 10, 'y_out': 75, 'z_out': None}) assert isinstance(result.out_parameters['x_out'], int) @classmethod def teardown_class(cls): - testing.db.execute("DROP PROCEDURE foo") + testing.db.execute("DROP PROCEDURE foo") class CXOracleArgsTest(fixtures.TestBase): __only_on__ = 'oracle+cx_oracle' @@ -92,7 +92,7 @@ class QuotedBindRoundTripTest(fixtures.TestBase): metadata.create_all() table.insert().execute( - {"option":1, "plain":1, "union":1} + {"option": 1, "plain": 1, "union": 1} ) eq_( testing.db.execute(table.select()).first(), @@ -106,7 +106,6 @@ class QuotedBindRoundTripTest(fixtures.TestBase): class CompileTest(fixtures.TestBase, AssertsCompiledSQL): - __dialect__ = oracle.dialect() def test_true_false(self): @@ -250,7 +249,7 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): def test_use_binds_for_limits_enabled(self): t = table('sometable', column('col1'), column('col2')) - dialect = oracle.OracleDialect(use_binds_for_limits = True) + dialect = oracle.OracleDialect(use_binds_for_limits=True) self.assert_compile(select([t]).limit(10), "SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, " @@ -348,8 +347,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): ) query = select([table1, table2], or_(table1.c.name == 'fred', - table1.c.myid == 10, table2.c.othername != 'jack' - , 'EXISTS (select yay from foo where boo = lar)' + table1.c.myid == 10, table2.c.othername != 'jack', + 'EXISTS (select yay from foo where boo = lar)' ), from_obj=[outerjoin(table1, table2, table1.c.myid == table2.c.otherid)]) self.assert_compile(query, @@ -435,8 +434,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): 'mytable.description AS description FROM ' 'mytable LEFT OUTER JOIN myothertable ON ' 'mytable.myid = myothertable.otherid) ' - 'anon_1 ON thirdtable.userid = anon_1.myid' - , dialect=oracle.dialect(use_ansi=True)) + 'anon_1 ON thirdtable.userid = anon_1.myid', + dialect=oracle.dialect(use_ansi=True)) self.assert_compile(q, 'SELECT thirdtable.userid, ' @@ -549,7 +548,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): def test_returning_insert_labeled(self): t1 = table('t1', column('c1'), column('c2'), column('c3')) self.assert_compile( - t1.insert().values(c1=1).returning(t1.c.c2.label('c2_l'), t1.c.c3.label('c3_l')), + t1.insert().values(c1=1).returning( + t1.c.c2.label('c2_l'), t1.c.c3.label('c3_l')), "INSERT INTO t1 (c1) VALUES (:c1) RETURNING " "t1.c2, t1.c3 INTO :ret_0, :ret_1" ) @@ -599,32 +599,40 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): ) class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL): - __only_on__ = 'oracle' - def test_ora8_flags(self): - def server_version_info(self): - return (8, 2, 5) + def _dialect(self, server_version, **kw): + def server_version_info(conn): + return server_version - dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi) + dialect = oracle.dialect( + dbapi=Mock(version="0.0.0", paramstyle="named"), + **kw) dialect._get_server_version_info = server_version_info + dialect._check_unicode_returns = Mock() + dialect._check_unicode_description = Mock() + dialect._get_default_schema_name = Mock() + return dialect + + + def test_ora8_flags(self): + dialect = self._dialect((8, 2, 5)) # before connect, assume modern DB assert dialect._supports_char_length assert dialect._supports_nchar assert dialect.use_ansi - dialect.initialize(testing.db.connect()) + dialect.initialize(Mock()) assert not dialect.implicit_returning assert not dialect._supports_char_length assert not dialect._supports_nchar assert not dialect.use_ansi - self.assert_compile(String(50),"VARCHAR2(50)",dialect=dialect) - self.assert_compile(Unicode(50),"VARCHAR2(50)",dialect=dialect) - self.assert_compile(UnicodeText(),"CLOB",dialect=dialect) + self.assert_compile(String(50), "VARCHAR2(50)", dialect=dialect) + self.assert_compile(Unicode(50), "VARCHAR2(50)", dialect=dialect) + self.assert_compile(UnicodeText(), "CLOB", dialect=dialect) - dialect = oracle.dialect(implicit_returning=True, - dbapi=testing.db.dialect.dbapi) - dialect._get_server_version_info = server_version_info + + dialect = self._dialect((8, 2, 5), implicit_returning=True) dialect.initialize(testing.db.connect()) assert dialect.implicit_returning @@ -632,26 +640,25 @@ class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL): def test_default_flags(self): """test with no initialization or server version info""" - dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi) + dialect = self._dialect(None) + assert dialect._supports_char_length assert dialect._supports_nchar assert dialect.use_ansi - self.assert_compile(String(50),"VARCHAR2(50 CHAR)",dialect=dialect) - self.assert_compile(Unicode(50),"NVARCHAR2(50)",dialect=dialect) - self.assert_compile(UnicodeText(),"NCLOB",dialect=dialect) + self.assert_compile(String(50), "VARCHAR2(50 CHAR)", dialect=dialect) + self.assert_compile(Unicode(50), "NVARCHAR2(50)", dialect=dialect) + self.assert_compile(UnicodeText(), "NCLOB", dialect=dialect) def test_ora10_flags(self): - def server_version_info(self): - return (10, 2, 5) - dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi) - dialect._get_server_version_info = server_version_info - dialect.initialize(testing.db.connect()) + dialect = self._dialect((10, 2, 5)) + + dialect.initialize(Mock()) assert dialect._supports_char_length assert dialect._supports_nchar assert dialect.use_ansi - self.assert_compile(String(50),"VARCHAR2(50 CHAR)",dialect=dialect) - self.assert_compile(Unicode(50),"NVARCHAR2(50)",dialect=dialect) - self.assert_compile(UnicodeText(),"NCLOB",dialect=dialect) + self.assert_compile(String(50), "VARCHAR2(50 CHAR)", dialect=dialect) + self.assert_compile(Unicode(50), "NVARCHAR2(50)", dialect=dialect) + self.assert_compile(UnicodeText(), "NCLOB", dialect=dialect) class MultiSchemaTest(fixtures.TestBase, AssertsCompiledSQL): @@ -712,9 +719,10 @@ drop synonym test_schema.local_table; if stmt.strip(): testing.db.execute(stmt) + @testing.provide_metadata def test_create_same_names_explicit_schema(self): schema = testing.db.dialect.default_schema_name - meta = MetaData(testing.db) + meta = self.metadata parent = Table('parent', meta, Column('pid', Integer, primary_key=True), schema=schema @@ -725,16 +733,14 @@ drop synonym test_schema.local_table; schema=schema ) meta.create_all() - try: - parent.insert().execute({'pid':1}) - child.insert().execute({'cid':1, 'pid':1}) - eq_(child.select().execute().fetchall(), [(1, 1)]) - finally: - meta.drop_all() + parent.insert().execute({'pid': 1}) + child.insert().execute({'cid': 1, 'pid': 1}) + eq_(child.select().execute().fetchall(), [(1, 1)]) def test_reflect_alt_table_owner_local_synonym(self): meta = MetaData(testing.db) - parent = Table('test_schema_ptable', meta, autoload=True, oracle_resolve_synonyms=True) + parent = Table('test_schema_ptable', meta, autoload=True, + oracle_resolve_synonyms=True) self.assert_compile(parent.select(), "SELECT test_schema_ptable.id, " "test_schema_ptable.data FROM test_schema_ptable") @@ -742,14 +748,16 @@ drop synonym test_schema.local_table; def test_reflect_alt_synonym_owner_local_table(self): meta = MetaData(testing.db) - parent = Table('local_table', meta, autoload=True, oracle_resolve_synonyms=True, schema="test_schema") + parent = Table('local_table', meta, autoload=True, + oracle_resolve_synonyms=True, schema="test_schema") self.assert_compile(parent.select(), "SELECT test_schema.local_table.id, " "test_schema.local_table.data FROM test_schema.local_table") select([parent]).execute().fetchall() + @testing.provide_metadata def test_create_same_names_implicit_schema(self): - meta = MetaData(testing.db) + meta = self.metadata parent = Table('parent', meta, Column('pid', Integer, primary_key=True), ) @@ -758,12 +766,9 @@ drop synonym test_schema.local_table; Column('pid', Integer, ForeignKey('parent.pid')), ) meta.create_all() - try: - parent.insert().execute({'pid':1}) - child.insert().execute({'cid':1, 'pid':1}) - eq_(child.select().execute().fetchall(), [(1, 1)]) - finally: - meta.drop_all() + parent.insert().execute({'pid': 1}) + child.insert().execute({'cid': 1, 'pid': 1}) + eq_(child.select().execute().fetchall(), [(1, 1)]) def test_reflect_alt_owner_explicit(self): @@ -951,10 +956,17 @@ class DialectTypesTest(fixtures.TestBase, AssertsCompiledSQL): dbapi = FakeDBAPI() b = bindparam("foo", "hello world!") - assert b.type.dialect_impl(dialect).get_dbapi_type(dbapi) == 'STRING' + eq_( + b.type.dialect_impl(dialect).get_dbapi_type(dbapi), + 'STRING' + ) b = bindparam("foo", "hello world!") - assert b.type.dialect_impl(dialect).get_dbapi_type(dbapi) == 'STRING' + eq_( + b.type.dialect_impl(dialect).get_dbapi_type(dbapi), + 'STRING' + ) + def test_long(self): self.assert_compile(oracle.LONG(), "LONG") @@ -983,14 +995,14 @@ class DialectTypesTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile(oracle.RAW(35), "RAW(35)") def test_char_length(self): - self.assert_compile(VARCHAR(50),"VARCHAR(50 CHAR)") + self.assert_compile(VARCHAR(50), "VARCHAR(50 CHAR)") oracle8dialect = oracle.dialect() oracle8dialect.server_version_info = (8, 0) - self.assert_compile(VARCHAR(50),"VARCHAR(50)",dialect=oracle8dialect) + self.assert_compile(VARCHAR(50), "VARCHAR(50)", dialect=oracle8dialect) - self.assert_compile(NVARCHAR(50),"NVARCHAR2(50)") - self.assert_compile(CHAR(50),"CHAR(50)") + self.assert_compile(NVARCHAR(50), "NVARCHAR2(50)") + self.assert_compile(CHAR(50), "CHAR(50)") def test_varchar_types(self): dialect = oracle.dialect() @@ -1038,36 +1050,36 @@ class TypesTest(fixtures.TestBase): dict(id=3, data="value 3") ) - eq_(t.select().where(t.c.data=='value 2').execute().fetchall(), + eq_( + t.select().where(t.c.data == 'value 2').execute().fetchall(), [(2, 'value 2 ')] - ) + ) m2 = MetaData(testing.db) t2 = Table('t1', m2, autoload=True) assert type(t2.c.data.type) is CHAR - eq_(t2.select().where(t2.c.data=='value 2').execute().fetchall(), + eq_( + t2.select().where(t2.c.data == 'value 2').execute().fetchall(), [(2, 'value 2 ')] - ) + ) finally: t.drop() @testing.requires.returning + @testing.provide_metadata def test_int_not_float(self): - m = MetaData(testing.db) + m = self.metadata t1 = Table('t1', m, Column('foo', Integer)) t1.create() - try: - r = t1.insert().values(foo=5).returning(t1.c.foo).execute() - x = r.scalar() - assert x == 5 - assert isinstance(x, int) - - x = t1.select().scalar() - assert x == 5 - assert isinstance(x, int) - finally: - t1.drop() + r = t1.insert().values(foo=5).returning(t1.c.foo).execute() + x = r.scalar() + assert x == 5 + assert isinstance(x, int) + + x = t1.select().scalar() + assert x == 5 + assert isinstance(x, int) @testing.provide_metadata def test_rowid(self): @@ -1084,7 +1096,7 @@ class TypesTest(fixtures.TestBase): # the ROWID type is not really needed here, # as cx_oracle just treats it as a string, # but we want to make sure the ROWID works... - rowid_col= column('rowid', oracle.ROWID) + rowid_col = column('rowid', oracle.ROWID) s3 = select([t.c.x, rowid_col]).\ where(rowid_col == cast(rowid, oracle.ROWID)) eq_(s3.select().execute().fetchall(), @@ -1110,8 +1122,9 @@ class TypesTest(fixtures.TestBase): eq_(row['day_interval'], datetime.timedelta(days=35, seconds=5743)) + @testing.provide_metadata def test_numerics(self): - m = MetaData(testing.db) + m = self.metadata t1 = Table('t1', m, Column('intcol', Integer), Column('numericcol', Numeric(precision=9, scale=2)), @@ -1124,41 +1137,38 @@ class TypesTest(fixtures.TestBase): ) t1.create() - try: - t1.insert().execute( - intcol=1, - numericcol=5.2, - floatcol1=6.5, - floatcol2 = 8.5, - doubleprec = 9.5, - numbercol1=12, - numbercol2=14.85, - numbercol3=15.76 - ) - - m2 = MetaData(testing.db) - t2 = Table('t1', m2, autoload=True) + t1.insert().execute( + intcol=1, + numericcol=5.2, + floatcol1=6.5, + floatcol2=8.5, + doubleprec=9.5, + numbercol1=12, + numbercol2=14.85, + numbercol3=15.76 + ) - for row in ( - t1.select().execute().first(), - t2.select().execute().first() - ): - for i, (val, type_) in enumerate(( - (1, int), - (decimal.Decimal("5.2"), decimal.Decimal), - (6.5, float), - (8.5, float), - (9.5, float), - (12, int), - (decimal.Decimal("14.85"), decimal.Decimal), - (15.76, float), - )): - eq_(row[i], val) - assert isinstance(row[i], type_), '%r is not %r' \ - % (row[i], type_) + m2 = MetaData(testing.db) + t2 = Table('t1', m2, autoload=True) + + for row in ( + t1.select().execute().first(), + t2.select().execute().first() + ): + for i, (val, type_) in enumerate(( + (1, int), + (decimal.Decimal("5.2"), decimal.Decimal), + (6.5, float), + (8.5, float), + (9.5, float), + (12, int), + (decimal.Decimal("14.85"), decimal.Decimal), + (15.76, float), + )): + eq_(row[i], val) + assert isinstance(row[i], type_), '%r is not %r' \ + % (row[i], type_) - finally: - t1.drop() def test_numeric_no_decimal_mode(self): @@ -1190,28 +1200,26 @@ class TypesTest(fixtures.TestBase): ) foo.create() - foo.insert().execute( - {'idata':5, 'ndata':decimal.Decimal("45.6"), - 'ndata2':decimal.Decimal("45.0"), - 'nidata':decimal.Decimal('53'), 'fdata':45.68392}, - ) + foo.insert().execute({ + 'idata': 5, + 'ndata': decimal.Decimal("45.6"), + 'ndata2': decimal.Decimal("45.0"), + 'nidata': decimal.Decimal('53'), + 'fdata': 45.68392 + }) - stmt = """ - SELECT - idata, - ndata, - ndata2, - nidata, - fdata - FROM foo - """ + stmt = "SELECT idata, ndata, ndata2, nidata, fdata FROM foo" row = testing.db.execute(stmt).fetchall()[0] - eq_([type(x) for x in row], [int, decimal.Decimal, decimal.Decimal, int, float]) + eq_( + [type(x) for x in row], + [int, decimal.Decimal, decimal.Decimal, int, float] + ) eq_( row, - (5, decimal.Decimal('45.6'), decimal.Decimal('45'), 53, 45.683920000000001) + (5, decimal.Decimal('45.6'), decimal.Decimal('45'), + 53, 45.683920000000001) ) # with a nested subquery, @@ -1235,7 +1243,10 @@ class TypesTest(fixtures.TestBase): FROM dual """ row = testing.db.execute(stmt).fetchall()[0] - eq_([type(x) for x in row], [int, decimal.Decimal, int, int, decimal.Decimal]) + eq_( + [type(x) for x in row], + [int, decimal.Decimal, int, int, decimal.Decimal] + ) eq_( row, (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392')) @@ -1243,15 +1254,20 @@ class TypesTest(fixtures.TestBase): row = testing.db.execute(text(stmt, typemap={ - 'idata':Integer(), - 'ndata':Numeric(20, 2), - 'ndata2':Numeric(20, 2), - 'nidata':Numeric(5, 0), - 'fdata':Float() + 'idata': Integer(), + 'ndata': Numeric(20, 2), + 'ndata2': Numeric(20, 2), + 'nidata': Numeric(5, 0), + 'fdata': Float() })).fetchall()[0] - eq_([type(x) for x in row], [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float]) - eq_(row, - (5, decimal.Decimal('45.6'), decimal.Decimal('45'), decimal.Decimal('53'), 45.683920000000001) + eq_( + [type(x) for x in row], + [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float] + ) + eq_( + row, + (5, decimal.Decimal('45.6'), decimal.Decimal('45'), + decimal.Decimal('53'), 45.683920000000001) ) stmt = """ @@ -1277,39 +1293,55 @@ class TypesTest(fixtures.TestBase): ) WHERE ROWNUM >= 0) anon_1 """ - row =testing.db.execute(stmt).fetchall()[0] - eq_([type(x) for x in row], [int, decimal.Decimal, int, int, decimal.Decimal]) - eq_(row, (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392'))) + row = testing.db.execute(stmt).fetchall()[0] + eq_( + [type(x) for x in row], + [int, decimal.Decimal, int, int, decimal.Decimal] + ) + eq_( + row, + (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392')) + ) row = testing.db.execute(text(stmt, typemap={ - 'anon_1_idata':Integer(), - 'anon_1_ndata':Numeric(20, 2), - 'anon_1_ndata2':Numeric(20, 2), - 'anon_1_nidata':Numeric(5, 0), - 'anon_1_fdata':Float() + 'anon_1_idata': Integer(), + 'anon_1_ndata': Numeric(20, 2), + 'anon_1_ndata2': Numeric(20, 2), + 'anon_1_nidata': Numeric(5, 0), + 'anon_1_fdata': Float() })).fetchall()[0] - eq_([type(x) for x in row], [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float]) - eq_(row, - (5, decimal.Decimal('45.6'), decimal.Decimal('45'), decimal.Decimal('53'), 45.683920000000001) + eq_( + [type(x) for x in row], + [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float] + ) + eq_( + row, + (5, decimal.Decimal('45.6'), decimal.Decimal('45'), + decimal.Decimal('53'), 45.683920000000001) ) row = testing.db.execute(text(stmt, typemap={ - 'anon_1_idata':Integer(), - 'anon_1_ndata':Numeric(20, 2, asdecimal=False), - 'anon_1_ndata2':Numeric(20, 2, asdecimal=False), - 'anon_1_nidata':Numeric(5, 0, asdecimal=False), - 'anon_1_fdata':Float(asdecimal=True) + 'anon_1_idata': Integer(), + 'anon_1_ndata': Numeric(20, 2, asdecimal=False), + 'anon_1_ndata2': Numeric(20, 2, asdecimal=False), + 'anon_1_nidata': Numeric(5, 0, asdecimal=False), + 'anon_1_fdata': Float(asdecimal=True) })).fetchall()[0] - eq_([type(x) for x in row], [int, float, float, float, decimal.Decimal]) - eq_(row, + eq_( + [type(x) for x in row], + [int, float, float, float, decimal.Decimal] + ) + eq_( + row, (5, 45.6, 45, 53, decimal.Decimal('45.68392')) ) + @testing.provide_metadata def test_reflect_dates(self): - metadata = MetaData(testing.db) + metadata = self.metadata Table( "date_types", metadata, Column('d1', DATE), @@ -1318,20 +1350,16 @@ class TypesTest(fixtures.TestBase): Column('d4', oracle.INTERVAL(second_precision=5)), ) metadata.create_all() - try: - m = MetaData(testing.db) - t1 = Table( - "date_types", m, - autoload=True) - assert isinstance(t1.c.d1.type, DATE) - assert isinstance(t1.c.d2.type, TIMESTAMP) - assert not t1.c.d2.type.timezone - assert isinstance(t1.c.d3.type, TIMESTAMP) - assert t1.c.d3.type.timezone - assert isinstance(t1.c.d4.type, oracle.INTERVAL) - - finally: - metadata.drop_all() + m = MetaData(testing.db) + t1 = Table( + "date_types", m, + autoload=True) + assert isinstance(t1.c.d1.type, DATE) + assert isinstance(t1.c.d2.type, TIMESTAMP) + assert not t1.c.d2.type.timezone + assert isinstance(t1.c.d3.type, TIMESTAMP) + assert t1.c.d3.type.timezone + assert isinstance(t1.c.d4.type, oracle.INTERVAL) def test_reflect_all_types_schema(self): types_table = Table('all_types', MetaData(testing.db), @@ -1359,7 +1387,7 @@ class TypesTest(fixtures.TestBase): @testing.provide_metadata def test_reflect_nvarchar(self): metadata = self.metadata - t = Table('t', metadata, + Table('t', metadata, Column('data', sqltypes.NVARCHAR(255)) ) metadata.create_all() @@ -1381,22 +1409,20 @@ class TypesTest(fixtures.TestBase): assert isinstance(res, util.text_type) + @testing.provide_metadata def test_char_length(self): - metadata = MetaData(testing.db) + metadata = self.metadata t1 = Table('t1', metadata, Column("c1", VARCHAR(50)), Column("c2", NVARCHAR(250)), Column("c3", CHAR(200)) ) t1.create() - try: - m2 = MetaData(testing.db) - t2 = Table('t1', m2, autoload=True) - eq_(t2.c.c1.type.length, 50) - eq_(t2.c.c2.type.length, 250) - eq_(t2.c.c3.type.length, 200) - finally: - t1.drop() + m2 = MetaData(testing.db) + t2 = Table('t1', m2, autoload=True) + eq_(t2.c.c1.type.length, 50) + eq_(t2.c.c2.type.length, 250) + eq_(t2.c.c3.type.length, 200) @testing.provide_metadata def test_long_type(self): @@ -1412,8 +1438,6 @@ class TypesTest(fixtures.TestBase): "xyz" ) - - def test_longstring(self): metadata = MetaData(testing.db) testing.db.execute(""" @@ -1464,15 +1488,16 @@ class EuroNumericTest(fixtures.TestBase): del os.environ['NLS_LANG'] self.engine.dispose() - @testing.provide_metadata def test_output_type_handler(self): - metadata = self.metadata for stmt, exp, kw in [ ("SELECT 0.1 FROM DUAL", decimal.Decimal("0.1"), {}), ("SELECT 15 FROM DUAL", 15, {}), - ("SELECT CAST(15 AS NUMERIC(3, 1)) FROM DUAL", decimal.Decimal("15"), {}), - ("SELECT CAST(0.1 AS NUMERIC(5, 2)) FROM DUAL", decimal.Decimal("0.1"), {}), - ("SELECT :num FROM DUAL", decimal.Decimal("2.5"), {'num':decimal.Decimal("2.5")}) + ("SELECT CAST(15 AS NUMERIC(3, 1)) FROM DUAL", + decimal.Decimal("15"), {}), + ("SELECT CAST(0.1 AS NUMERIC(5, 2)) FROM DUAL", + decimal.Decimal("0.1"), {}), + ("SELECT :num FROM DUAL", decimal.Decimal("2.5"), + {'num': decimal.Decimal("2.5")}) ]: test_exp = self.engine.scalar(stmt, **kw) eq_( @@ -1553,97 +1578,86 @@ class BufferedColumnTest(fixtures.TestBase, AssertsCompiledSQL): class UnsupportedIndexReflectTest(fixtures.TestBase): __only_on__ = 'oracle' - def setup(self): - global metadata - metadata = MetaData(testing.db) - t1 = Table('test_index_reflect', metadata, + @testing.emits_warning("No column names") + @testing.provide_metadata + def test_reflect_functional_index(self): + metadata = self.metadata + Table('test_index_reflect', metadata, Column('data', String(20), primary_key=True) ) metadata.create_all() - def teardown(self): - metadata.drop_all() - - @testing.emits_warning("No column names") - def test_reflect_functional_index(self): testing.db.execute('CREATE INDEX DATA_IDX ON ' 'TEST_INDEX_REFLECT (UPPER(DATA))') m2 = MetaData(testing.db) - t2 = Table('test_index_reflect', m2, autoload=True) + Table('test_index_reflect', m2, autoload=True) class RoundTripIndexTest(fixtures.TestBase): __only_on__ = 'oracle' + @testing.provide_metadata def test_basic(self): - engine = testing.db - metadata = MetaData(engine) + metadata = self.metadata - table=Table("sometable", metadata, + table = Table("sometable", metadata, Column("id_a", Unicode(255), primary_key=True), Column("id_b", Unicode(255), primary_key=True, unique=True), Column("group", Unicode(255), primary_key=True), Column("col", Unicode(255)), - UniqueConstraint('col','group'), + UniqueConstraint('col', 'group'), ) # "group" is a keyword, so lower case normalind = Index('tableind', table.c.id_b, table.c.group) - # create metadata.create_all() - try: - # round trip, create from reflection - mirror = MetaData(engine) - mirror.reflect() - metadata.drop_all() - mirror.create_all() - - # inspect the reflected creation - inspect = MetaData(engine) - inspect.reflect() - - def obj_definition(obj): - return obj.__class__, tuple([c.name for c in - obj.columns]), getattr(obj, 'unique', None) - - # find what the primary k constraint name should be - primaryconsname = engine.execute( - text("""SELECT constraint_name - FROM all_constraints - WHERE table_name = :table_name - AND owner = :owner - AND constraint_type = 'P' """), - table_name=table.name.upper(), - owner=engine.url.username.upper()).fetchall()[0][0] - - reflectedtable = inspect.tables[table.name] - - # make a dictionary of the reflected objects: - - reflected = dict([(obj_definition(i), i) for i in - reflectedtable.indexes - | reflectedtable.constraints]) - - # assert we got primary key constraint and its name, Error - # if not in dict - - assert reflected[(PrimaryKeyConstraint, ('id_a', 'id_b', - 'group'), None)].name.upper() \ - == primaryconsname.upper() - - # Error if not in dict - - assert reflected[(Index, ('id_b', 'group'), False)].name \ - == normalind.name - assert (Index, ('id_b', ), True) in reflected - assert (Index, ('col', 'group'), True) in reflected - assert len(reflectedtable.constraints) == 1 - assert len(reflectedtable.indexes) == 3 + mirror = MetaData(testing.db) + mirror.reflect() + metadata.drop_all() + mirror.create_all() - finally: - metadata.drop_all() + inspect = MetaData(testing.db) + inspect.reflect() + + def obj_definition(obj): + return obj.__class__, tuple([c.name for c in + obj.columns]), getattr(obj, 'unique', None) + + # find what the primary k constraint name should be + primaryconsname = testing.db.execute( + text("""SELECT constraint_name + FROM all_constraints + WHERE table_name = :table_name + AND owner = :owner + AND constraint_type = 'P' """), + table_name=table.name.upper(), + owner=testing.db.url.username.upper()).fetchall()[0][0] + reflectedtable = inspect.tables[table.name] + # make a dictionary of the reflected objects: + + reflected = dict([(obj_definition(i), i) for i in + reflectedtable.indexes + | reflectedtable.constraints]) + + # assert we got primary key constraint and its name, Error + # if not in dict + + assert reflected[(PrimaryKeyConstraint, ('id_a', 'id_b', + 'group'), None)].name.upper() \ + == primaryconsname.upper() + + # Error if not in dict + + eq_( + reflected[(Index, ('id_b', 'group'), False)].name, + normalind.name + ) + assert (Index, ('id_b', ), True) in reflected + assert (Index, ('col', 'group'), True) in reflected + eq_(len(reflectedtable.constraints), 1) + eq_(len(reflectedtable.indexes), 3) class SequenceTest(fixtures.TestBase, AssertsCompiledSQL): @@ -1690,11 +1704,11 @@ class ExecuteTest(fixtures.TestBase): metadata.create_all() t.insert().execute( - {'id':1, 'data':1}, - {'id':2, 'data':7}, - {'id':3, 'data':12}, - {'id':4, 'data':15}, - {'id':5, 'data':32}, + {'id': 1, 'data': 1}, + {'id': 2, 'data': 7}, + {'id': 3, 'data': 12}, + {'id': 4, 'data': 15}, + {'id': 5, 'data': 32}, ) # here, we can't use ORDER BY. @@ -1719,7 +1733,7 @@ class UnicodeSchemaTest(fixtures.TestBase): @testing.provide_metadata def test_quoted_column_non_unicode(self): metadata = self.metadata - table=Table("atable", metadata, + table = Table("atable", metadata, Column("_underscorecolumn", Unicode(255), primary_key=True), ) metadata.create_all() @@ -1728,14 +1742,14 @@ class UnicodeSchemaTest(fixtures.TestBase): {'_underscorecolumn': u('’é')}, ) result = testing.db.execute( - table.select().where(table.c._underscorecolumn==u('’é')) + table.select().where(table.c._underscorecolumn == u('’é')) ).scalar() eq_(result, u('’é')) @testing.provide_metadata def test_quoted_column_unicode(self): metadata = self.metadata - table=Table("atable", metadata, + table = Table("atable", metadata, Column(u("méil"), Unicode(255), primary_key=True), ) metadata.create_all() -- cgit v1.2.1 From 467784e89c0817a74df32db4b12bd8b3e28a05df Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Fri, 22 Nov 2013 17:56:35 -0500 Subject: Fixed bug where Oracle ``VARCHAR`` types given with no length (e.g. for a ``CAST`` or similar) would incorrectly render ``None CHAR`` or similar. [ticket:2870] --- test/dialect/test_oracle.py | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'test/dialect/test_oracle.py') diff --git a/test/dialect/test_oracle.py b/test/dialect/test_oracle.py index 36dae9e90..185bfb883 100644 --- a/test/dialect/test_oracle.py +++ b/test/dialect/test_oracle.py @@ -1013,6 +1013,12 @@ class DialectTypesTest(fixtures.TestBase, AssertsCompiledSQL): (VARCHAR(50), "VARCHAR(50 CHAR)"), (oracle.NVARCHAR2(50), "NVARCHAR2(50)"), (oracle.VARCHAR2(50), "VARCHAR2(50 CHAR)"), + (String(), "VARCHAR2"), + (Unicode(), "NVARCHAR2"), + (NVARCHAR(), "NVARCHAR2"), + (VARCHAR(), "VARCHAR"), + (oracle.NVARCHAR2(), "NVARCHAR2"), + (oracle.VARCHAR2(), "VARCHAR2"), ]: self.assert_compile(typ, exp, dialect=dialect) -- cgit v1.2.1 From 4aaf3753d75c68050c136e734c29aae5ff9504b4 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Thu, 28 Nov 2013 22:25:09 -0500 Subject: - fix up rendering of "of" - move out tests, dialect specific out of compiler, compiler tests use new API, legacy API tests in test_selecatble - add support for adaptation of ForUpdateArg, alias support in compilers --- test/dialect/test_oracle.py | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) (limited to 'test/dialect/test_oracle.py') diff --git a/test/dialect/test_oracle.py b/test/dialect/test_oracle.py index 185bfb883..3af57c50b 100644 --- a/test/dialect/test_oracle.py +++ b/test/dialect/test_oracle.py @@ -217,6 +217,49 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): ':ROWNUM_1) WHERE ora_rn > :ora_rn_1 FOR ' 'UPDATE') + def test_for_update(self): + table1 = table('mytable', + column('myid'), column('name'), column('description')) + + self.assert_compile( + table1.select(table1.c.myid == 7).with_for_update(), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE") + + self.assert_compile( + table1.select(table1.c.myid == 7).with_for_update(of=table1.c.myid), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE OF mytable.myid") + + self.assert_compile( + table1.select(table1.c.myid == 7).with_for_update(nowait=True), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE NOWAIT") + + self.assert_compile( + table1.select(table1.c.myid == 7). + with_for_update(nowait=True, of=table1.c.myid), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 " + "FOR UPDATE OF mytable.myid NOWAIT") + + self.assert_compile( + table1.select(table1.c.myid == 7). + with_for_update(nowait=True, of=[table1.c.myid, table1.c.name]), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE OF " + "mytable.myid, mytable.name NOWAIT") + + ta = table1.alias() + self.assert_compile( + ta.select(ta.c.myid == 7). + with_for_update(of=[ta.c.myid, ta.c.name]), + "SELECT mytable_1.myid, mytable_1.name, mytable_1.description " + "FROM mytable mytable_1 " + "WHERE mytable_1.myid = :myid_1 FOR UPDATE OF " + "mytable_1.myid, mytable_1.name" + ) + def test_limit_preserves_typing_information(self): class MyType(TypeDecorator): impl = Integer -- cgit v1.2.1 From 31cecebd4831fbf58310509c1486244a532d96b9 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Thu, 28 Nov 2013 23:23:27 -0500 Subject: - add support for specifying tables or entities for "of" - implement Query with_for_update() - rework docs and tests --- test/dialect/test_oracle.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'test/dialect/test_oracle.py') diff --git a/test/dialect/test_oracle.py b/test/dialect/test_oracle.py index 3af57c50b..8d0ff9776 100644 --- a/test/dialect/test_oracle.py +++ b/test/dialect/test_oracle.py @@ -106,7 +106,7 @@ class QuotedBindRoundTripTest(fixtures.TestBase): class CompileTest(fixtures.TestBase, AssertsCompiledSQL): - __dialect__ = oracle.dialect() + __dialect__ = "oracle" #oracle.dialect() def test_true_false(self): self.assert_compile( -- cgit v1.2.1