summaryrefslogtreecommitdiff
path: root/test/dialect/test_oracle.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2014-01-21 20:10:23 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2014-01-21 20:10:23 -0500
commit07fb90c6cc14de6d02cf4be592c57d56831f59f7 (patch)
tree050ef65db988559c60f7aa40f2d0bfe24947e548 /test/dialect/test_oracle.py
parent560fd1d5ed643a1b0f95296f3b840c1963bbe67f (diff)
parentee1f4d21037690ad996c5eacf7e1200e92f2fbaa (diff)
downloadsqlalchemy-ticket_2501.tar.gz
Merge branch 'master' into ticket_2501ticket_2501
Conflicts: lib/sqlalchemy/orm/mapper.py
Diffstat (limited to 'test/dialect/test_oracle.py')
-rw-r--r--test/dialect/test_oracle.py635
1 files changed, 369 insertions, 266 deletions
diff --git a/test/dialect/test_oracle.py b/test/dialect/test_oracle.py
index 71b2d96cb..8d0ff9776 100644
--- a/test/dialect/test_oracle.py
+++ b/test/dialect/test_oracle.py
@@ -18,7 +18,7 @@ from sqlalchemy.testing.schema import Table, Column
import datetime
import os
from sqlalchemy import sql
-
+from sqlalchemy.testing.mock import Mock
class OutParamTest(fixtures.TestBase, AssertsExecutionResults):
__only_on__ = 'oracle+cx_oracle'
@@ -26,31 +26,31 @@ class OutParamTest(fixtures.TestBase, AssertsExecutionResults):
@classmethod
def setup_class(cls):
testing.db.execute("""
-create or replace procedure foo(x_in IN number, x_out OUT number, y_out OUT number, z_out OUT varchar) IS
- retval number;
- begin
- retval := 6;
- x_out := 10;
- y_out := x_in * 15;
- z_out := NULL;
- end;
+ create or replace procedure foo(x_in IN number, x_out OUT number,
+ y_out OUT number, z_out OUT varchar) IS
+ retval number;
+ begin
+ retval := 6;
+ x_out := 10;
+ y_out := x_in * 15;
+ z_out := NULL;
+ end;
""")
def test_out_params(self):
- result = \
- testing.db.execute(text('begin foo(:x_in, :x_out, :y_out, '
+ result = testing.db.execute(text('begin foo(:x_in, :x_out, :y_out, '
':z_out); end;',
bindparams=[bindparam('x_in', Float),
outparam('x_out', Integer),
outparam('y_out', Float),
outparam('z_out', String)]), x_in=5)
- eq_(result.out_parameters, {'x_out': 10, 'y_out': 75, 'z_out'
- : None})
+ eq_(result.out_parameters,
+ {'x_out': 10, 'y_out': 75, 'z_out': None})
assert isinstance(result.out_parameters['x_out'], int)
@classmethod
def teardown_class(cls):
- testing.db.execute("DROP PROCEDURE foo")
+ testing.db.execute("DROP PROCEDURE foo")
class CXOracleArgsTest(fixtures.TestBase):
__only_on__ = 'oracle+cx_oracle'
@@ -92,7 +92,7 @@ class QuotedBindRoundTripTest(fixtures.TestBase):
metadata.create_all()
table.insert().execute(
- {"option":1, "plain":1, "union":1}
+ {"option": 1, "plain": 1, "union": 1}
)
eq_(
testing.db.execute(table.select()).first(),
@@ -106,8 +106,7 @@ class QuotedBindRoundTripTest(fixtures.TestBase):
class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
-
- __dialect__ = oracle.dialect()
+ __dialect__ = "oracle" #oracle.dialect()
def test_true_false(self):
self.assert_compile(
@@ -218,6 +217,49 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
':ROWNUM_1) WHERE ora_rn > :ora_rn_1 FOR '
'UPDATE')
+ def test_for_update(self):
+ table1 = table('mytable',
+ column('myid'), column('name'), column('description'))
+
+ self.assert_compile(
+ table1.select(table1.c.myid == 7).with_for_update(),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
+
+ self.assert_compile(
+ table1.select(table1.c.myid == 7).with_for_update(of=table1.c.myid),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE OF mytable.myid")
+
+ self.assert_compile(
+ table1.select(table1.c.myid == 7).with_for_update(nowait=True),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE NOWAIT")
+
+ self.assert_compile(
+ table1.select(table1.c.myid == 7).
+ with_for_update(nowait=True, of=table1.c.myid),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 "
+ "FOR UPDATE OF mytable.myid NOWAIT")
+
+ self.assert_compile(
+ table1.select(table1.c.myid == 7).
+ with_for_update(nowait=True, of=[table1.c.myid, table1.c.name]),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE OF "
+ "mytable.myid, mytable.name NOWAIT")
+
+ ta = table1.alias()
+ self.assert_compile(
+ ta.select(ta.c.myid == 7).
+ with_for_update(of=[ta.c.myid, ta.c.name]),
+ "SELECT mytable_1.myid, mytable_1.name, mytable_1.description "
+ "FROM mytable mytable_1 "
+ "WHERE mytable_1.myid = :myid_1 FOR UPDATE OF "
+ "mytable_1.myid, mytable_1.name"
+ )
+
def test_limit_preserves_typing_information(self):
class MyType(TypeDecorator):
impl = Integer
@@ -250,7 +292,7 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
def test_use_binds_for_limits_enabled(self):
t = table('sometable', column('col1'), column('col2'))
- dialect = oracle.OracleDialect(use_binds_for_limits = True)
+ dialect = oracle.OracleDialect(use_binds_for_limits=True)
self.assert_compile(select([t]).limit(10),
"SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, "
@@ -348,8 +390,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
)
query = select([table1, table2], or_(table1.c.name == 'fred',
- table1.c.myid == 10, table2.c.othername != 'jack'
- , 'EXISTS (select yay from foo where boo = lar)'
+ table1.c.myid == 10, table2.c.othername != 'jack',
+ 'EXISTS (select yay from foo where boo = lar)'
), from_obj=[outerjoin(table1, table2,
table1.c.myid == table2.c.otherid)])
self.assert_compile(query,
@@ -435,8 +477,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
'mytable.description AS description FROM '
'mytable LEFT OUTER JOIN myothertable ON '
'mytable.myid = myothertable.otherid) '
- 'anon_1 ON thirdtable.userid = anon_1.myid'
- , dialect=oracle.dialect(use_ansi=True))
+ 'anon_1 ON thirdtable.userid = anon_1.myid',
+ dialect=oracle.dialect(use_ansi=True))
self.assert_compile(q,
'SELECT thirdtable.userid, '
@@ -549,7 +591,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
def test_returning_insert_labeled(self):
t1 = table('t1', column('c1'), column('c2'), column('c3'))
self.assert_compile(
- t1.insert().values(c1=1).returning(t1.c.c2.label('c2_l'), t1.c.c3.label('c3_l')),
+ t1.insert().values(c1=1).returning(
+ t1.c.c2.label('c2_l'), t1.c.c3.label('c3_l')),
"INSERT INTO t1 (c1) VALUES (:c1) RETURNING "
"t1.c2, t1.c3 INTO :ret_0, :ret_1"
)
@@ -587,33 +630,52 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
schema.CreateIndex(Index("bar", t1.c.x)),
"CREATE INDEX alt_schema.bar ON alt_schema.foo (x)"
)
+
+ def test_create_index_expr(self):
+ m = MetaData()
+ t1 = Table('foo', m,
+ Column('x', Integer)
+ )
+ self.assert_compile(
+ schema.CreateIndex(Index("bar", t1.c.x > 5)),
+ "CREATE INDEX bar ON foo (x > 5)"
+ )
+
class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL):
- __only_on__ = 'oracle'
- def test_ora8_flags(self):
- def server_version_info(self):
- return (8, 2, 5)
+ def _dialect(self, server_version, **kw):
+ def server_version_info(conn):
+ return server_version
- dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi)
+ dialect = oracle.dialect(
+ dbapi=Mock(version="0.0.0", paramstyle="named"),
+ **kw)
dialect._get_server_version_info = server_version_info
+ dialect._check_unicode_returns = Mock()
+ dialect._check_unicode_description = Mock()
+ dialect._get_default_schema_name = Mock()
+ return dialect
+
+
+ def test_ora8_flags(self):
+ dialect = self._dialect((8, 2, 5))
# before connect, assume modern DB
assert dialect._supports_char_length
assert dialect._supports_nchar
assert dialect.use_ansi
- dialect.initialize(testing.db.connect())
+ dialect.initialize(Mock())
assert not dialect.implicit_returning
assert not dialect._supports_char_length
assert not dialect._supports_nchar
assert not dialect.use_ansi
- self.assert_compile(String(50),"VARCHAR2(50)",dialect=dialect)
- self.assert_compile(Unicode(50),"VARCHAR2(50)",dialect=dialect)
- self.assert_compile(UnicodeText(),"CLOB",dialect=dialect)
+ self.assert_compile(String(50), "VARCHAR2(50)", dialect=dialect)
+ self.assert_compile(Unicode(50), "VARCHAR2(50)", dialect=dialect)
+ self.assert_compile(UnicodeText(), "CLOB", dialect=dialect)
- dialect = oracle.dialect(implicit_returning=True,
- dbapi=testing.db.dialect.dbapi)
- dialect._get_server_version_info = server_version_info
+
+ dialect = self._dialect((8, 2, 5), implicit_returning=True)
dialect.initialize(testing.db.connect())
assert dialect.implicit_returning
@@ -621,26 +683,25 @@ class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL):
def test_default_flags(self):
"""test with no initialization or server version info"""
- dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi)
+ dialect = self._dialect(None)
+
assert dialect._supports_char_length
assert dialect._supports_nchar
assert dialect.use_ansi
- self.assert_compile(String(50),"VARCHAR2(50 CHAR)",dialect=dialect)
- self.assert_compile(Unicode(50),"NVARCHAR2(50)",dialect=dialect)
- self.assert_compile(UnicodeText(),"NCLOB",dialect=dialect)
+ self.assert_compile(String(50), "VARCHAR2(50 CHAR)", dialect=dialect)
+ self.assert_compile(Unicode(50), "NVARCHAR2(50)", dialect=dialect)
+ self.assert_compile(UnicodeText(), "NCLOB", dialect=dialect)
def test_ora10_flags(self):
- def server_version_info(self):
- return (10, 2, 5)
- dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi)
- dialect._get_server_version_info = server_version_info
- dialect.initialize(testing.db.connect())
+ dialect = self._dialect((10, 2, 5))
+
+ dialect.initialize(Mock())
assert dialect._supports_char_length
assert dialect._supports_nchar
assert dialect.use_ansi
- self.assert_compile(String(50),"VARCHAR2(50 CHAR)",dialect=dialect)
- self.assert_compile(Unicode(50),"NVARCHAR2(50)",dialect=dialect)
- self.assert_compile(UnicodeText(),"NCLOB",dialect=dialect)
+ self.assert_compile(String(50), "VARCHAR2(50 CHAR)", dialect=dialect)
+ self.assert_compile(Unicode(50), "NVARCHAR2(50)", dialect=dialect)
+ self.assert_compile(UnicodeText(), "NCLOB", dialect=dialect)
class MultiSchemaTest(fixtures.TestBase, AssertsCompiledSQL):
@@ -664,9 +725,18 @@ create table test_schema.child(
parent_id integer references test_schema.parent(id)
);
+create table local_table(
+ id integer primary key,
+ data varchar2(50)
+);
+
create synonym test_schema.ptable for test_schema.parent;
create synonym test_schema.ctable for test_schema.child;
+create synonym test_schema_ptable for test_schema.parent;
+
+create synonym test_schema.local_table for local_table;
+
-- can't make a ref from local schema to the
-- remote schema's table without this,
-- *and* cant give yourself a grant !
@@ -682,15 +752,20 @@ grant references on test_schema.child to public;
for stmt in """
drop table test_schema.child;
drop table test_schema.parent;
+drop table local_table;
drop synonym test_schema.ctable;
drop synonym test_schema.ptable;
+drop synonym test_schema_ptable;
+drop synonym test_schema.local_table;
+
""".split(";"):
if stmt.strip():
testing.db.execute(stmt)
+ @testing.provide_metadata
def test_create_same_names_explicit_schema(self):
schema = testing.db.dialect.default_schema_name
- meta = MetaData(testing.db)
+ meta = self.metadata
parent = Table('parent', meta,
Column('pid', Integer, primary_key=True),
schema=schema
@@ -701,15 +776,31 @@ drop synonym test_schema.ptable;
schema=schema
)
meta.create_all()
- try:
- parent.insert().execute({'pid':1})
- child.insert().execute({'cid':1, 'pid':1})
- eq_(child.select().execute().fetchall(), [(1, 1)])
- finally:
- meta.drop_all()
+ parent.insert().execute({'pid': 1})
+ child.insert().execute({'cid': 1, 'pid': 1})
+ eq_(child.select().execute().fetchall(), [(1, 1)])
- def test_create_same_names_implicit_schema(self):
+ def test_reflect_alt_table_owner_local_synonym(self):
meta = MetaData(testing.db)
+ parent = Table('test_schema_ptable', meta, autoload=True,
+ oracle_resolve_synonyms=True)
+ self.assert_compile(parent.select(),
+ "SELECT test_schema_ptable.id, "
+ "test_schema_ptable.data FROM test_schema_ptable")
+ select([parent]).execute().fetchall()
+
+ def test_reflect_alt_synonym_owner_local_table(self):
+ meta = MetaData(testing.db)
+ parent = Table('local_table', meta, autoload=True,
+ oracle_resolve_synonyms=True, schema="test_schema")
+ self.assert_compile(parent.select(),
+ "SELECT test_schema.local_table.id, "
+ "test_schema.local_table.data FROM test_schema.local_table")
+ select([parent]).execute().fetchall()
+
+ @testing.provide_metadata
+ def test_create_same_names_implicit_schema(self):
+ meta = self.metadata
parent = Table('parent', meta,
Column('pid', Integer, primary_key=True),
)
@@ -718,12 +809,9 @@ drop synonym test_schema.ptable;
Column('pid', Integer, ForeignKey('parent.pid')),
)
meta.create_all()
- try:
- parent.insert().execute({'pid':1})
- child.insert().execute({'cid':1, 'pid':1})
- eq_(child.select().execute().fetchall(), [(1, 1)])
- finally:
- meta.drop_all()
+ parent.insert().execute({'pid': 1})
+ child.insert().execute({'cid': 1, 'pid': 1})
+ eq_(child.select().execute().fetchall(), [(1, 1)])
def test_reflect_alt_owner_explicit(self):
@@ -911,10 +999,17 @@ class DialectTypesTest(fixtures.TestBase, AssertsCompiledSQL):
dbapi = FakeDBAPI()
b = bindparam("foo", "hello world!")
- assert b.type.dialect_impl(dialect).get_dbapi_type(dbapi) == 'STRING'
+ eq_(
+ b.type.dialect_impl(dialect).get_dbapi_type(dbapi),
+ 'STRING'
+ )
b = bindparam("foo", "hello world!")
- assert b.type.dialect_impl(dialect).get_dbapi_type(dbapi) == 'STRING'
+ eq_(
+ b.type.dialect_impl(dialect).get_dbapi_type(dbapi),
+ 'STRING'
+ )
+
def test_long(self):
self.assert_compile(oracle.LONG(), "LONG")
@@ -943,14 +1038,14 @@ class DialectTypesTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(oracle.RAW(35), "RAW(35)")
def test_char_length(self):
- self.assert_compile(VARCHAR(50),"VARCHAR(50 CHAR)")
+ self.assert_compile(VARCHAR(50), "VARCHAR(50 CHAR)")
oracle8dialect = oracle.dialect()
oracle8dialect.server_version_info = (8, 0)
- self.assert_compile(VARCHAR(50),"VARCHAR(50)",dialect=oracle8dialect)
+ self.assert_compile(VARCHAR(50), "VARCHAR(50)", dialect=oracle8dialect)
- self.assert_compile(NVARCHAR(50),"NVARCHAR2(50)")
- self.assert_compile(CHAR(50),"CHAR(50)")
+ self.assert_compile(NVARCHAR(50), "NVARCHAR2(50)")
+ self.assert_compile(CHAR(50), "CHAR(50)")
def test_varchar_types(self):
dialect = oracle.dialect()
@@ -961,6 +1056,12 @@ class DialectTypesTest(fixtures.TestBase, AssertsCompiledSQL):
(VARCHAR(50), "VARCHAR(50 CHAR)"),
(oracle.NVARCHAR2(50), "NVARCHAR2(50)"),
(oracle.VARCHAR2(50), "VARCHAR2(50 CHAR)"),
+ (String(), "VARCHAR2"),
+ (Unicode(), "NVARCHAR2"),
+ (NVARCHAR(), "NVARCHAR2"),
+ (VARCHAR(), "VARCHAR"),
+ (oracle.NVARCHAR2(), "NVARCHAR2"),
+ (oracle.VARCHAR2(), "VARCHAR2"),
]:
self.assert_compile(typ, exp, dialect=dialect)
@@ -998,36 +1099,36 @@ class TypesTest(fixtures.TestBase):
dict(id=3, data="value 3")
)
- eq_(t.select().where(t.c.data=='value 2').execute().fetchall(),
+ eq_(
+ t.select().where(t.c.data == 'value 2').execute().fetchall(),
[(2, 'value 2 ')]
- )
+ )
m2 = MetaData(testing.db)
t2 = Table('t1', m2, autoload=True)
assert type(t2.c.data.type) is CHAR
- eq_(t2.select().where(t2.c.data=='value 2').execute().fetchall(),
+ eq_(
+ t2.select().where(t2.c.data == 'value 2').execute().fetchall(),
[(2, 'value 2 ')]
- )
+ )
finally:
t.drop()
@testing.requires.returning
+ @testing.provide_metadata
def test_int_not_float(self):
- m = MetaData(testing.db)
+ m = self.metadata
t1 = Table('t1', m, Column('foo', Integer))
t1.create()
- try:
- r = t1.insert().values(foo=5).returning(t1.c.foo).execute()
- x = r.scalar()
- assert x == 5
- assert isinstance(x, int)
-
- x = t1.select().scalar()
- assert x == 5
- assert isinstance(x, int)
- finally:
- t1.drop()
+ r = t1.insert().values(foo=5).returning(t1.c.foo).execute()
+ x = r.scalar()
+ assert x == 5
+ assert isinstance(x, int)
+
+ x = t1.select().scalar()
+ assert x == 5
+ assert isinstance(x, int)
@testing.provide_metadata
def test_rowid(self):
@@ -1044,7 +1145,7 @@ class TypesTest(fixtures.TestBase):
# the ROWID type is not really needed here,
# as cx_oracle just treats it as a string,
# but we want to make sure the ROWID works...
- rowid_col= column('rowid', oracle.ROWID)
+ rowid_col = column('rowid', oracle.ROWID)
s3 = select([t.c.x, rowid_col]).\
where(rowid_col == cast(rowid, oracle.ROWID))
eq_(s3.select().execute().fetchall(),
@@ -1070,8 +1171,9 @@ class TypesTest(fixtures.TestBase):
eq_(row['day_interval'], datetime.timedelta(days=35,
seconds=5743))
+ @testing.provide_metadata
def test_numerics(self):
- m = MetaData(testing.db)
+ m = self.metadata
t1 = Table('t1', m,
Column('intcol', Integer),
Column('numericcol', Numeric(precision=9, scale=2)),
@@ -1084,41 +1186,38 @@ class TypesTest(fixtures.TestBase):
)
t1.create()
- try:
- t1.insert().execute(
- intcol=1,
- numericcol=5.2,
- floatcol1=6.5,
- floatcol2 = 8.5,
- doubleprec = 9.5,
- numbercol1=12,
- numbercol2=14.85,
- numbercol3=15.76
- )
-
- m2 = MetaData(testing.db)
- t2 = Table('t1', m2, autoload=True)
+ t1.insert().execute(
+ intcol=1,
+ numericcol=5.2,
+ floatcol1=6.5,
+ floatcol2=8.5,
+ doubleprec=9.5,
+ numbercol1=12,
+ numbercol2=14.85,
+ numbercol3=15.76
+ )
- for row in (
- t1.select().execute().first(),
- t2.select().execute().first()
- ):
- for i, (val, type_) in enumerate((
- (1, int),
- (decimal.Decimal("5.2"), decimal.Decimal),
- (6.5, float),
- (8.5, float),
- (9.5, float),
- (12, int),
- (decimal.Decimal("14.85"), decimal.Decimal),
- (15.76, float),
- )):
- eq_(row[i], val)
- assert isinstance(row[i], type_), '%r is not %r' \
- % (row[i], type_)
+ m2 = MetaData(testing.db)
+ t2 = Table('t1', m2, autoload=True)
+
+ for row in (
+ t1.select().execute().first(),
+ t2.select().execute().first()
+ ):
+ for i, (val, type_) in enumerate((
+ (1, int),
+ (decimal.Decimal("5.2"), decimal.Decimal),
+ (6.5, float),
+ (8.5, float),
+ (9.5, float),
+ (12, int),
+ (decimal.Decimal("14.85"), decimal.Decimal),
+ (15.76, float),
+ )):
+ eq_(row[i], val)
+ assert isinstance(row[i], type_), '%r is not %r' \
+ % (row[i], type_)
- finally:
- t1.drop()
def test_numeric_no_decimal_mode(self):
@@ -1150,28 +1249,26 @@ class TypesTest(fixtures.TestBase):
)
foo.create()
- foo.insert().execute(
- {'idata':5, 'ndata':decimal.Decimal("45.6"),
- 'ndata2':decimal.Decimal("45.0"),
- 'nidata':decimal.Decimal('53'), 'fdata':45.68392},
- )
+ foo.insert().execute({
+ 'idata': 5,
+ 'ndata': decimal.Decimal("45.6"),
+ 'ndata2': decimal.Decimal("45.0"),
+ 'nidata': decimal.Decimal('53'),
+ 'fdata': 45.68392
+ })
- stmt = """
- SELECT
- idata,
- ndata,
- ndata2,
- nidata,
- fdata
- FROM foo
- """
+ stmt = "SELECT idata, ndata, ndata2, nidata, fdata FROM foo"
row = testing.db.execute(stmt).fetchall()[0]
- eq_([type(x) for x in row], [int, decimal.Decimal, decimal.Decimal, int, float])
+ eq_(
+ [type(x) for x in row],
+ [int, decimal.Decimal, decimal.Decimal, int, float]
+ )
eq_(
row,
- (5, decimal.Decimal('45.6'), decimal.Decimal('45'), 53, 45.683920000000001)
+ (5, decimal.Decimal('45.6'), decimal.Decimal('45'),
+ 53, 45.683920000000001)
)
# with a nested subquery,
@@ -1195,7 +1292,10 @@ class TypesTest(fixtures.TestBase):
FROM dual
"""
row = testing.db.execute(stmt).fetchall()[0]
- eq_([type(x) for x in row], [int, decimal.Decimal, int, int, decimal.Decimal])
+ eq_(
+ [type(x) for x in row],
+ [int, decimal.Decimal, int, int, decimal.Decimal]
+ )
eq_(
row,
(5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392'))
@@ -1203,15 +1303,20 @@ class TypesTest(fixtures.TestBase):
row = testing.db.execute(text(stmt,
typemap={
- 'idata':Integer(),
- 'ndata':Numeric(20, 2),
- 'ndata2':Numeric(20, 2),
- 'nidata':Numeric(5, 0),
- 'fdata':Float()
+ 'idata': Integer(),
+ 'ndata': Numeric(20, 2),
+ 'ndata2': Numeric(20, 2),
+ 'nidata': Numeric(5, 0),
+ 'fdata': Float()
})).fetchall()[0]
- eq_([type(x) for x in row], [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float])
- eq_(row,
- (5, decimal.Decimal('45.6'), decimal.Decimal('45'), decimal.Decimal('53'), 45.683920000000001)
+ eq_(
+ [type(x) for x in row],
+ [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float]
+ )
+ eq_(
+ row,
+ (5, decimal.Decimal('45.6'), decimal.Decimal('45'),
+ decimal.Decimal('53'), 45.683920000000001)
)
stmt = """
@@ -1237,39 +1342,55 @@ class TypesTest(fixtures.TestBase):
)
WHERE ROWNUM >= 0) anon_1
"""
- row =testing.db.execute(stmt).fetchall()[0]
- eq_([type(x) for x in row], [int, decimal.Decimal, int, int, decimal.Decimal])
- eq_(row, (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392')))
+ row = testing.db.execute(stmt).fetchall()[0]
+ eq_(
+ [type(x) for x in row],
+ [int, decimal.Decimal, int, int, decimal.Decimal]
+ )
+ eq_(
+ row,
+ (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392'))
+ )
row = testing.db.execute(text(stmt,
typemap={
- 'anon_1_idata':Integer(),
- 'anon_1_ndata':Numeric(20, 2),
- 'anon_1_ndata2':Numeric(20, 2),
- 'anon_1_nidata':Numeric(5, 0),
- 'anon_1_fdata':Float()
+ 'anon_1_idata': Integer(),
+ 'anon_1_ndata': Numeric(20, 2),
+ 'anon_1_ndata2': Numeric(20, 2),
+ 'anon_1_nidata': Numeric(5, 0),
+ 'anon_1_fdata': Float()
})).fetchall()[0]
- eq_([type(x) for x in row], [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float])
- eq_(row,
- (5, decimal.Decimal('45.6'), decimal.Decimal('45'), decimal.Decimal('53'), 45.683920000000001)
+ eq_(
+ [type(x) for x in row],
+ [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float]
+ )
+ eq_(
+ row,
+ (5, decimal.Decimal('45.6'), decimal.Decimal('45'),
+ decimal.Decimal('53'), 45.683920000000001)
)
row = testing.db.execute(text(stmt,
typemap={
- 'anon_1_idata':Integer(),
- 'anon_1_ndata':Numeric(20, 2, asdecimal=False),
- 'anon_1_ndata2':Numeric(20, 2, asdecimal=False),
- 'anon_1_nidata':Numeric(5, 0, asdecimal=False),
- 'anon_1_fdata':Float(asdecimal=True)
+ 'anon_1_idata': Integer(),
+ 'anon_1_ndata': Numeric(20, 2, asdecimal=False),
+ 'anon_1_ndata2': Numeric(20, 2, asdecimal=False),
+ 'anon_1_nidata': Numeric(5, 0, asdecimal=False),
+ 'anon_1_fdata': Float(asdecimal=True)
})).fetchall()[0]
- eq_([type(x) for x in row], [int, float, float, float, decimal.Decimal])
- eq_(row,
+ eq_(
+ [type(x) for x in row],
+ [int, float, float, float, decimal.Decimal]
+ )
+ eq_(
+ row,
(5, 45.6, 45, 53, decimal.Decimal('45.68392'))
)
+ @testing.provide_metadata
def test_reflect_dates(self):
- metadata = MetaData(testing.db)
+ metadata = self.metadata
Table(
"date_types", metadata,
Column('d1', DATE),
@@ -1278,20 +1399,16 @@ class TypesTest(fixtures.TestBase):
Column('d4', oracle.INTERVAL(second_precision=5)),
)
metadata.create_all()
- try:
- m = MetaData(testing.db)
- t1 = Table(
- "date_types", m,
- autoload=True)
- assert isinstance(t1.c.d1.type, DATE)
- assert isinstance(t1.c.d2.type, TIMESTAMP)
- assert not t1.c.d2.type.timezone
- assert isinstance(t1.c.d3.type, TIMESTAMP)
- assert t1.c.d3.type.timezone
- assert isinstance(t1.c.d4.type, oracle.INTERVAL)
-
- finally:
- metadata.drop_all()
+ m = MetaData(testing.db)
+ t1 = Table(
+ "date_types", m,
+ autoload=True)
+ assert isinstance(t1.c.d1.type, DATE)
+ assert isinstance(t1.c.d2.type, TIMESTAMP)
+ assert not t1.c.d2.type.timezone
+ assert isinstance(t1.c.d3.type, TIMESTAMP)
+ assert t1.c.d3.type.timezone
+ assert isinstance(t1.c.d4.type, oracle.INTERVAL)
def test_reflect_all_types_schema(self):
types_table = Table('all_types', MetaData(testing.db),
@@ -1319,7 +1436,7 @@ class TypesTest(fixtures.TestBase):
@testing.provide_metadata
def test_reflect_nvarchar(self):
metadata = self.metadata
- t = Table('t', metadata,
+ Table('t', metadata,
Column('data', sqltypes.NVARCHAR(255))
)
metadata.create_all()
@@ -1341,22 +1458,20 @@ class TypesTest(fixtures.TestBase):
assert isinstance(res, util.text_type)
+ @testing.provide_metadata
def test_char_length(self):
- metadata = MetaData(testing.db)
+ metadata = self.metadata
t1 = Table('t1', metadata,
Column("c1", VARCHAR(50)),
Column("c2", NVARCHAR(250)),
Column("c3", CHAR(200))
)
t1.create()
- try:
- m2 = MetaData(testing.db)
- t2 = Table('t1', m2, autoload=True)
- eq_(t2.c.c1.type.length, 50)
- eq_(t2.c.c2.type.length, 250)
- eq_(t2.c.c3.type.length, 200)
- finally:
- t1.drop()
+ m2 = MetaData(testing.db)
+ t2 = Table('t1', m2, autoload=True)
+ eq_(t2.c.c1.type.length, 50)
+ eq_(t2.c.c2.type.length, 250)
+ eq_(t2.c.c3.type.length, 200)
@testing.provide_metadata
def test_long_type(self):
@@ -1372,8 +1487,6 @@ class TypesTest(fixtures.TestBase):
"xyz"
)
-
-
def test_longstring(self):
metadata = MetaData(testing.db)
testing.db.execute("""
@@ -1424,15 +1537,16 @@ class EuroNumericTest(fixtures.TestBase):
del os.environ['NLS_LANG']
self.engine.dispose()
- @testing.provide_metadata
def test_output_type_handler(self):
- metadata = self.metadata
for stmt, exp, kw in [
("SELECT 0.1 FROM DUAL", decimal.Decimal("0.1"), {}),
("SELECT 15 FROM DUAL", 15, {}),
- ("SELECT CAST(15 AS NUMERIC(3, 1)) FROM DUAL", decimal.Decimal("15"), {}),
- ("SELECT CAST(0.1 AS NUMERIC(5, 2)) FROM DUAL", decimal.Decimal("0.1"), {}),
- ("SELECT :num FROM DUAL", decimal.Decimal("2.5"), {'num':decimal.Decimal("2.5")})
+ ("SELECT CAST(15 AS NUMERIC(3, 1)) FROM DUAL",
+ decimal.Decimal("15"), {}),
+ ("SELECT CAST(0.1 AS NUMERIC(5, 2)) FROM DUAL",
+ decimal.Decimal("0.1"), {}),
+ ("SELECT :num FROM DUAL", decimal.Decimal("2.5"),
+ {'num': decimal.Decimal("2.5")})
]:
test_exp = self.engine.scalar(stmt, **kw)
eq_(
@@ -1513,97 +1627,86 @@ class BufferedColumnTest(fixtures.TestBase, AssertsCompiledSQL):
class UnsupportedIndexReflectTest(fixtures.TestBase):
__only_on__ = 'oracle'
- def setup(self):
- global metadata
- metadata = MetaData(testing.db)
- t1 = Table('test_index_reflect', metadata,
+ @testing.emits_warning("No column names")
+ @testing.provide_metadata
+ def test_reflect_functional_index(self):
+ metadata = self.metadata
+ Table('test_index_reflect', metadata,
Column('data', String(20), primary_key=True)
)
metadata.create_all()
- def teardown(self):
- metadata.drop_all()
-
- @testing.emits_warning("No column names")
- def test_reflect_functional_index(self):
testing.db.execute('CREATE INDEX DATA_IDX ON '
'TEST_INDEX_REFLECT (UPPER(DATA))')
m2 = MetaData(testing.db)
- t2 = Table('test_index_reflect', m2, autoload=True)
+ Table('test_index_reflect', m2, autoload=True)
class RoundTripIndexTest(fixtures.TestBase):
__only_on__ = 'oracle'
+ @testing.provide_metadata
def test_basic(self):
- engine = testing.db
- metadata = MetaData(engine)
+ metadata = self.metadata
- table=Table("sometable", metadata,
+ table = Table("sometable", metadata,
Column("id_a", Unicode(255), primary_key=True),
Column("id_b", Unicode(255), primary_key=True, unique=True),
Column("group", Unicode(255), primary_key=True),
Column("col", Unicode(255)),
- UniqueConstraint('col','group'),
+ UniqueConstraint('col', 'group'),
)
# "group" is a keyword, so lower case
normalind = Index('tableind', table.c.id_b, table.c.group)
- # create
metadata.create_all()
- try:
- # round trip, create from reflection
- mirror = MetaData(engine)
- mirror.reflect()
- metadata.drop_all()
- mirror.create_all()
-
- # inspect the reflected creation
- inspect = MetaData(engine)
- inspect.reflect()
-
- def obj_definition(obj):
- return obj.__class__, tuple([c.name for c in
- obj.columns]), getattr(obj, 'unique', None)
-
- # find what the primary k constraint name should be
- primaryconsname = engine.execute(
- text("""SELECT constraint_name
- FROM all_constraints
- WHERE table_name = :table_name
- AND owner = :owner
- AND constraint_type = 'P' """),
- table_name=table.name.upper(),
- owner=engine.url.username.upper()).fetchall()[0][0]
-
- reflectedtable = inspect.tables[table.name]
-
- # make a dictionary of the reflected objects:
-
- reflected = dict([(obj_definition(i), i) for i in
- reflectedtable.indexes
- | reflectedtable.constraints])
-
- # assert we got primary key constraint and its name, Error
- # if not in dict
-
- assert reflected[(PrimaryKeyConstraint, ('id_a', 'id_b',
- 'group'), None)].name.upper() \
- == primaryconsname.upper()
-
- # Error if not in dict
-
- assert reflected[(Index, ('id_b', 'group'), False)].name \
- == normalind.name
- assert (Index, ('id_b', ), True) in reflected
- assert (Index, ('col', 'group'), True) in reflected
- assert len(reflectedtable.constraints) == 1
- assert len(reflectedtable.indexes) == 3
+ mirror = MetaData(testing.db)
+ mirror.reflect()
+ metadata.drop_all()
+ mirror.create_all()
- finally:
- metadata.drop_all()
+ inspect = MetaData(testing.db)
+ inspect.reflect()
+ def obj_definition(obj):
+ return obj.__class__, tuple([c.name for c in
+ obj.columns]), getattr(obj, 'unique', None)
+ # find what the primary k constraint name should be
+ primaryconsname = testing.db.execute(
+ text("""SELECT constraint_name
+ FROM all_constraints
+ WHERE table_name = :table_name
+ AND owner = :owner
+ AND constraint_type = 'P' """),
+ table_name=table.name.upper(),
+ owner=testing.db.url.username.upper()).fetchall()[0][0]
+
+ reflectedtable = inspect.tables[table.name]
+
+ # make a dictionary of the reflected objects:
+
+ reflected = dict([(obj_definition(i), i) for i in
+ reflectedtable.indexes
+ | reflectedtable.constraints])
+
+ # assert we got primary key constraint and its name, Error
+ # if not in dict
+
+ assert reflected[(PrimaryKeyConstraint, ('id_a', 'id_b',
+ 'group'), None)].name.upper() \
+ == primaryconsname.upper()
+
+ # Error if not in dict
+
+ eq_(
+ reflected[(Index, ('id_b', 'group'), False)].name,
+ normalind.name
+ )
+ assert (Index, ('id_b', ), True) in reflected
+ assert (Index, ('col', 'group'), True) in reflected
+ eq_(len(reflectedtable.constraints), 1)
+ eq_(len(reflectedtable.indexes), 3)
class SequenceTest(fixtures.TestBase, AssertsCompiledSQL):
@@ -1650,11 +1753,11 @@ class ExecuteTest(fixtures.TestBase):
metadata.create_all()
t.insert().execute(
- {'id':1, 'data':1},
- {'id':2, 'data':7},
- {'id':3, 'data':12},
- {'id':4, 'data':15},
- {'id':5, 'data':32},
+ {'id': 1, 'data': 1},
+ {'id': 2, 'data': 7},
+ {'id': 3, 'data': 12},
+ {'id': 4, 'data': 15},
+ {'id': 5, 'data': 32},
)
# here, we can't use ORDER BY.
@@ -1679,7 +1782,7 @@ class UnicodeSchemaTest(fixtures.TestBase):
@testing.provide_metadata
def test_quoted_column_non_unicode(self):
metadata = self.metadata
- table=Table("atable", metadata,
+ table = Table("atable", metadata,
Column("_underscorecolumn", Unicode(255), primary_key=True),
)
metadata.create_all()
@@ -1688,14 +1791,14 @@ class UnicodeSchemaTest(fixtures.TestBase):
{'_underscorecolumn': u('’é')},
)
result = testing.db.execute(
- table.select().where(table.c._underscorecolumn==u('’é'))
+ table.select().where(table.c._underscorecolumn == u('’é'))
).scalar()
eq_(result, u('’é'))
@testing.provide_metadata
def test_quoted_column_unicode(self):
metadata = self.metadata
- table=Table("atable", metadata,
+ table = Table("atable", metadata,
Column(u("méil"), Unicode(255), primary_key=True),
)
metadata.create_all()