summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEric Streeper <eric.streeper@gmail.com>2015-03-20 00:32:05 -0700
committerEric Streeper <eric.streeper@gmail.com>2015-03-20 00:32:05 -0700
commit0f0e305d25b6da1d42259e53ebd48712dfae5f40 (patch)
tree6781c0be7b849a4fc3aa1eebf1fa2f7ba00168f9
parent45876612d2342250b73a9d3d9afd2bd6008db4c6 (diff)
downloadsqlalchemy-pr/163.tar.gz
PEP8 cleanup in /test/enginepr/163
-rw-r--r--test/engine/test_bind.py7
-rw-r--r--test/engine/test_ddlevents.py45
-rw-r--r--test/engine/test_pool.py76
-rw-r--r--test/engine/test_processors.py15
-rw-r--r--test/engine/test_reflection.py88
-rw-r--r--test/engine/test_transaction.py8
6 files changed, 160 insertions, 79 deletions
diff --git a/test/engine/test_bind.py b/test/engine/test_bind.py
index 8f6c547f1..69ab721c1 100644
--- a/test/engine/test_bind.py
+++ b/test/engine/test_bind.py
@@ -11,6 +11,7 @@ import sqlalchemy as sa
from sqlalchemy import testing
from sqlalchemy.testing import fixtures
+
class BindTest(fixtures.TestBase):
def test_bind_close_engine(self):
e = testing.db
@@ -76,7 +77,8 @@ class BindTest(fixtures.TestBase):
]:
assert_raises_message(
exc.UnboundExecutionError,
- "Table object 'test_table' is not bound to an Engine or Connection.",
+ ("Table object 'test_table' is not bound to an Engine or "
+ "Connection."),
meth
)
@@ -163,7 +165,6 @@ class BindTest(fixtures.TestBase):
finally:
metadata.drop_all(bind=conn)
-
def test_clauseelement(self):
metadata = MetaData()
table = Table('test_table', metadata,
@@ -198,5 +199,3 @@ class BindTest(fixtures.TestBase):
if isinstance(bind, engine.Connection):
bind.close()
metadata.drop_all(bind=testing.db)
-
-
diff --git a/test/engine/test_ddlevents.py b/test/engine/test_ddlevents.py
index 0d828b340..18300179c 100644
--- a/test/engine/test_ddlevents.py
+++ b/test/engine/test_ddlevents.py
@@ -266,16 +266,16 @@ class DDLExecutionTest(fixtures.TestBase):
metadata, users, engine = self.metadata, self.users, self.engine
canary = []
users.append_ddl_listener('before-create',
- lambda e, t, b:canary.append('mxyzptlk')
+ lambda e, t, b: canary.append('mxyzptlk')
)
users.append_ddl_listener('after-create',
- lambda e, t, b:canary.append('klptzyxm')
+ lambda e, t, b: canary.append('klptzyxm')
)
users.append_ddl_listener('before-drop',
- lambda e, t, b:canary.append('xyzzy')
+ lambda e, t, b: canary.append('xyzzy')
)
users.append_ddl_listener('after-drop',
- lambda e, t, b:canary.append('fnord')
+ lambda e, t, b: canary.append('fnord')
)
metadata.create_all()
@@ -295,16 +295,16 @@ class DDLExecutionTest(fixtures.TestBase):
metadata, users, engine = self.metadata, self.users, self.engine
canary = []
metadata.append_ddl_listener('before-create',
- lambda e, t, b, tables=None:canary.append('mxyzptlk')
+ lambda e, t, b, tables=None: canary.append('mxyzptlk')
)
metadata.append_ddl_listener('after-create',
- lambda e, t, b, tables=None:canary.append('klptzyxm')
+ lambda e, t, b, tables=None: canary.append('klptzyxm')
)
metadata.append_ddl_listener('before-drop',
- lambda e, t, b, tables=None:canary.append('xyzzy')
+ lambda e, t, b, tables=None: canary.append('xyzzy')
)
metadata.append_ddl_listener('after-drop',
- lambda e, t, b, tables=None:canary.append('fnord')
+ lambda e, t, b, tables=None: canary.append('fnord')
)
metadata.create_all()
@@ -369,8 +369,8 @@ class DDLExecutionTest(fixtures.TestBase):
metadata, users, engine = self.metadata, self.users, self.engine
nonpg_mock = engines.mock_engine(dialect_name='sqlite')
pg_mock = engines.mock_engine(dialect_name='postgresql')
- constraint = CheckConstraint('a < b', name='my_test_constraint'
- , table=users)
+ constraint = CheckConstraint('a < b', name='my_test_constraint',
+ table=users)
# by placing the constraint in an Add/Drop construct, the
# 'inline_ddl' flag is set to False
@@ -405,8 +405,8 @@ class DDLExecutionTest(fixtures.TestBase):
metadata, users, engine = self.metadata, self.users, self.engine
nonpg_mock = engines.mock_engine(dialect_name='sqlite')
pg_mock = engines.mock_engine(dialect_name='postgresql')
- constraint = CheckConstraint('a < b', name='my_test_constraint'
- , table=users)
+ constraint = CheckConstraint('a < b', name='my_test_constraint',
+ table=users)
# by placing the constraint in an Add/Drop construct, the
# 'inline_ddl' flag is set to False
@@ -489,8 +489,6 @@ class DDLExecutionTest(fixtures.TestBase):
)
-
-
class DDLTest(fixtures.TestBase, AssertsCompiledSQL):
def mock_engine(self):
executor = lambda *a, **kw: None
@@ -527,12 +525,11 @@ class DDLTest(fixtures.TestBase, AssertsCompiledSQL):
dialect=dialect)
self.assert_compile(ddl.against(sane_schema), 'S S-T T-s.t-b',
dialect=dialect)
- self.assert_compile(ddl.against(insane_alone), 'S S-T T-"t t"-b'
- , dialect=dialect)
+ self.assert_compile(ddl.against(insane_alone), 'S S-T T-"t t"-b',
+ dialect=dialect)
self.assert_compile(ddl.against(insane_schema),
'S S-T T-"s s"."t t"-b', dialect=dialect)
-
def test_filter(self):
cx = self.mock_engine()
@@ -543,10 +540,10 @@ class DDLTest(fixtures.TestBase, AssertsCompiledSQL):
assert DDL('').execute_if(dialect=target)._should_execute(tbl, cx)
assert not DDL('').execute_if(dialect='bogus').\
_should_execute(tbl, cx)
- assert DDL('').execute_if(callable_=lambda d, y,z, **kw: True).\
+ assert DDL('').execute_if(callable_=lambda d, y, z, **kw: True).\
_should_execute(tbl, cx)
assert(DDL('').execute_if(
- callable_=lambda d, y,z, **kw: z.engine.name
+ callable_=lambda d, y, z, **kw: z.engine.name
!= 'bogus').
_should_execute(tbl, cx))
@@ -561,16 +558,14 @@ class DDLTest(fixtures.TestBase, AssertsCompiledSQL):
assert DDL('', on=target)._should_execute_deprecated('x', tbl, cx)
assert not DDL('', on='bogus').\
_should_execute_deprecated('x', tbl, cx)
- assert DDL('', on=lambda d, x,y,z: True).\
+ assert DDL('', on=lambda d, x, y, z: True).\
_should_execute_deprecated('x', tbl, cx)
- assert(DDL('', on=lambda d, x,y,z: z.engine.name != 'bogus').
+ assert(DDL('', on=lambda d, x, y, z: z.engine.name != 'bogus').
_should_execute_deprecated('x', tbl, cx))
def test_repr(self):
assert repr(DDL('s'))
assert repr(DDL('s', on='engine'))
assert repr(DDL('s', on=lambda x: 1))
- assert repr(DDL('s', context={'a':1}))
- assert repr(DDL('s', on='engine', context={'a':1}))
-
-
+ assert repr(DDL('s', context={'a': 1}))
+ assert repr(DDL('s', on='engine', context={'a': 1}))
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py
index 0c4557d49..196e4e760 100644
--- a/test/engine/test_pool.py
+++ b/test/engine/test_pool.py
@@ -49,6 +49,7 @@ class PoolTestBase(fixtures.TestBase):
return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'),
**kw)
+
class PoolTest(PoolTestBase):
def test_manager(self):
manager = pool.manage(MockDBAPI(), use_threadlocal=True)
@@ -86,7 +87,6 @@ class PoolTest(PoolTestBase):
]
)
-
def test_bad_args(self):
manager = pool.manage(MockDBAPI())
manager.connect(None)
@@ -218,6 +218,7 @@ class PoolTest(PoolTestBase):
class PoolDialectTest(PoolTestBase):
def _dialect(self):
canary = []
+
class PoolDialect(object):
def do_rollback(self, dbapi_connection):
canary.append('R')
@@ -266,6 +267,7 @@ class PoolEventsTest(PoolTestBase):
def _first_connect_event_fixture(self):
p = self._queuepool_fixture()
canary = []
+
def first_connect(*arg, **kw):
canary.append('first_connect')
@@ -276,8 +278,10 @@ class PoolEventsTest(PoolTestBase):
def _connect_event_fixture(self):
p = self._queuepool_fixture()
canary = []
+
def connect(*arg, **kw):
canary.append('connect')
+
event.listen(p, 'connect', connect)
return p, canary
@@ -285,6 +289,7 @@ class PoolEventsTest(PoolTestBase):
def _checkout_event_fixture(self):
p = self._queuepool_fixture()
canary = []
+
def checkout(*arg, **kw):
canary.append('checkout')
event.listen(p, 'checkout', checkout)
@@ -294,6 +299,7 @@ class PoolEventsTest(PoolTestBase):
def _checkin_event_fixture(self):
p = self._queuepool_fixture()
canary = []
+
def checkin(*arg, **kw):
canary.append('checkin')
event.listen(p, 'checkin', checkin)
@@ -303,6 +309,7 @@ class PoolEventsTest(PoolTestBase):
def _reset_event_fixture(self):
p = self._queuepool_fixture()
canary = []
+
def reset(*arg, **kw):
canary.append('reset')
event.listen(p, 'reset', reset)
@@ -470,12 +477,16 @@ class PoolEventsTest(PoolTestBase):
def test_listen_targets_scope(self):
canary = []
+
def listen_one(*args):
canary.append("listen_one")
+
def listen_two(*args):
canary.append("listen_two")
+
def listen_three(*args):
canary.append("listen_three")
+
def listen_four(*args):
canary.append("listen_four")
@@ -492,13 +503,17 @@ class PoolEventsTest(PoolTestBase):
)
def test_listen_targets_per_subclass(self):
- """test that listen() called on a subclass remains specific to that subclass."""
+ """test that listen() called on a subclass remains specific to
+ that subclass."""
canary = []
+
def listen_one(*args):
canary.append("listen_one")
+
def listen_two(*args):
canary.append("listen_two")
+
def listen_three(*args):
canary.append("listen_three")
@@ -526,6 +541,7 @@ class PoolEventsTest(PoolTestBase):
# going
pool.Pool.dispatch._clear()
+
class PoolFirstConnectSyncTest(PoolTestBase):
# test [ticket:2964]
@@ -560,11 +576,14 @@ class PoolFirstConnectSyncTest(PoolTestBase):
th.join(join_timeout)
eq_(evt.mock_calls,
- [call.first_connect(), call.connect(), call.connect(), call.connect()]
+ [
+ call.first_connect(),
+ call.connect(),
+ call.connect(),
+ call.connect()]
)
-
class DeprecatedPoolListenerTest(PoolTestBase):
@testing.requires.predictable_gc
@testing.uses_deprecated(r".*Use event.listen")
@@ -580,38 +599,45 @@ class DeprecatedPoolListenerTest(PoolTestBase):
if hasattr(self, 'checkin'):
self.checkin = self.inst_checkin
self.clear()
+
def clear(self):
self.connected = []
self.first_connected = []
self.checked_out = []
self.checked_in = []
+
def assert_total(innerself, conn, fconn, cout, cin):
eq_(len(innerself.connected), conn)
eq_(len(innerself.first_connected), fconn)
eq_(len(innerself.checked_out), cout)
eq_(len(innerself.checked_in), cin)
+
def assert_in(innerself, item, in_conn, in_fconn,
in_cout, in_cin):
self.assert_((item in innerself.connected) == in_conn)
self.assert_((item in innerself.first_connected) == in_fconn)
self.assert_((item in innerself.checked_out) == in_cout)
self.assert_((item in innerself.checked_in) == in_cin)
+
def inst_connect(self, con, record):
print("connect(%s, %s)" % (con, record))
assert con is not None
assert record is not None
self.connected.append(con)
+
def inst_first_connect(self, con, record):
print("first_connect(%s, %s)" % (con, record))
assert con is not None
assert record is not None
self.first_connected.append(con)
+
def inst_checkout(self, con, record, proxy):
print("checkout(%s, %s, %s)" % (con, record, proxy))
assert con is not None
assert record is not None
assert proxy is not None
self.checked_out.append(con)
+
def inst_checkin(self, con, record):
print("checkin(%s, %s)" % (con, record))
# con can be None if invalidated
@@ -620,15 +646,19 @@ class DeprecatedPoolListenerTest(PoolTestBase):
class ListenAll(tsa.interfaces.PoolListener, InstrumentingListener):
pass
+
class ListenConnect(InstrumentingListener):
def connect(self, con, record):
pass
+
class ListenFirstConnect(InstrumentingListener):
def first_connect(self, con, record):
pass
+
class ListenCheckOut(InstrumentingListener):
def checkout(self, con, record, proxy, num):
pass
+
class ListenCheckIn(InstrumentingListener):
def checkin(self, con, record):
pass
@@ -746,8 +776,10 @@ class DeprecatedPoolListenerTest(PoolTestBase):
def test_listeners_callables(self):
def connect(dbapi_con, con_record):
counts[0] += 1
+
def checkout(dbapi_con, con_record, con_proxy):
counts[1] += 1
+
def checkin(dbapi_con, con_record):
counts[2] += 1
@@ -884,6 +916,7 @@ class QueuePoolTest(PoolTestBase):
pool_size=2,
max_overflow=1, use_threadlocal=False, timeout=3)
timeouts = []
+
def checkout():
for x in range(1):
now = time.time()
@@ -915,6 +948,7 @@ class QueuePoolTest(PoolTestBase):
dbapi = MockDBAPI()
mutex = threading.Lock()
+
def creator():
time.sleep(.05)
with mutex:
@@ -924,6 +958,7 @@ class QueuePoolTest(PoolTestBase):
pool_size=3, timeout=2,
max_overflow=max_overflow)
peaks = []
+
def whammy():
for i in range(10):
try:
@@ -947,7 +982,6 @@ class QueuePoolTest(PoolTestBase):
lazy_gc()
assert not pool._refs
-
def test_overflow_reset_on_failed_connect(self):
dbapi = Mock()
@@ -956,6 +990,7 @@ class QueuePoolTest(PoolTestBase):
raise Exception("connection failed")
creator = dbapi.connect
+
def create():
return creator()
@@ -1029,7 +1064,6 @@ class QueuePoolTest(PoolTestBase):
call("overflow_one")]
)
-
@testing.requires.threading_with_mock
@testing.requires.timing_intensive
def test_waiters_handled(self):
@@ -1039,6 +1073,7 @@ class QueuePoolTest(PoolTestBase):
"""
mutex = threading.Lock()
dbapi = MockDBAPI()
+
def creator():
mutex.acquire()
try:
@@ -1052,6 +1087,7 @@ class QueuePoolTest(PoolTestBase):
p = pool.QueuePool(creator=creator,
pool_size=2, timeout=timeout,
max_overflow=max_overflow)
+
def waiter(p, timeout, max_overflow):
success_key = (timeout, max_overflow)
conn = p.connect()
@@ -1088,12 +1124,14 @@ class QueuePoolTest(PoolTestBase):
dbapi = MockDBAPI()
canary = []
+
def creator():
canary.append(1)
return dbapi.connect()
p1 = pool.QueuePool(creator=creator,
pool_size=1, timeout=None,
max_overflow=0)
+
def waiter(p):
conn = p.connect()
canary.append(2)
@@ -1165,7 +1203,8 @@ class QueuePoolTest(PoolTestBase):
def test_mixed_close(self):
pool._refs.clear()
- p = self._queuepool_fixture(pool_size=3, max_overflow=-1, use_threadlocal=True)
+ p = self._queuepool_fixture(pool_size=3, max_overflow=-1,
+ use_threadlocal=True)
c1 = p.connect()
c2 = p.connect()
assert c1 is c2
@@ -1191,6 +1230,7 @@ class QueuePoolTest(PoolTestBase):
# disable weakref collection of the
# underlying connections
strong_refs = set()
+
def _conn():
c = p.connect()
strong_refs.add(c.connection)
@@ -1334,6 +1374,7 @@ class QueuePoolTest(PoolTestBase):
dialect.dbapi.Error = Error
pools = []
+
class TrackQueuePool(pool.QueuePool):
def __init__(self, *arg, **kw):
pools.append(self)
@@ -1357,11 +1398,13 @@ class QueuePoolTest(PoolTestBase):
def attempt(conn):
time.sleep(random.random())
try:
- conn._handle_dbapi_exception(Error(), "statement", {}, Mock(), Mock())
+ conn._handle_dbapi_exception(Error(), "statement", {},
+ Mock(), Mock())
except tsa.exc.DBAPIError:
pass
- # run an error + invalidate operation on the remaining 7 open connections
+ # run an error + invalidate operation on the remaining 7 open
+ #connections
threads = []
for conn in conns:
t = threading.Thread(target=attempt, args=(conn, ))
@@ -1399,7 +1442,8 @@ class QueuePoolTest(PoolTestBase):
assert c1.connection.id != c_id
def test_recreate(self):
- p = self._queuepool_fixture(reset_on_return=None, pool_size=1, max_overflow=0)
+ p = self._queuepool_fixture(reset_on_return=None, pool_size=1,
+ max_overflow=0)
p2 = p.recreate()
assert p2.size() == 1
assert p2._reset_on_return is pool.reset_none
@@ -1454,16 +1498,19 @@ class QueuePoolTest(PoolTestBase):
eq_(c2_con.close.call_count, 0)
def test_threadfairy(self):
- p = self._queuepool_fixture(pool_size=3, max_overflow=-1, use_threadlocal=True)
+ p = self._queuepool_fixture(pool_size=3, max_overflow=-1,
+ use_threadlocal=True)
c1 = p.connect()
c1.close()
c2 = p.connect()
assert c2.connection is not None
+
class ResetOnReturnTest(PoolTestBase):
def _fixture(self, **kw):
dbapi = Mock()
- return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'), **kw)
+ return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'),
+ **kw)
def test_plain_rollback(self):
dbapi, p = self._fixture(reset_on_return='rollback')
@@ -1550,6 +1597,7 @@ class ResetOnReturnTest(PoolTestBase):
assert not dbapi.connect().rollback.called
assert dbapi.connect().commit.called
+
class SingletonThreadPoolTest(PoolTestBase):
@testing.requires.threading_with_mock
@@ -1567,6 +1615,7 @@ class SingletonThreadPoolTest(PoolTestBase):
dbapi = MockDBAPI()
lock = threading.Lock()
+
def creator():
# the mock iterator isn't threadsafe...
with lock:
@@ -1575,6 +1624,7 @@ class SingletonThreadPoolTest(PoolTestBase):
if strong_refs:
sr = set()
+
def _conn():
c = p.connect()
sr.add(c.connection)
@@ -1604,6 +1654,7 @@ class SingletonThreadPoolTest(PoolTestBase):
still_opened = len([c for c in sr if not c.close.call_count])
eq_(still_opened, 3)
+
class AssertionPoolTest(PoolTestBase):
def test_connect_error(self):
dbapi = MockDBAPI()
@@ -1622,6 +1673,7 @@ class AssertionPoolTest(PoolTestBase):
c3 = p.connect()
assert_raises(AssertionError, p.connect)
+
class NullPoolTest(PoolTestBase):
def test_reconnect(self):
dbapi = MockDBAPI()
diff --git a/test/engine/test_processors.py b/test/engine/test_processors.py
index b1c482f09..f4df7827c 100644
--- a/test/engine/test_processors.py
+++ b/test/engine/test_processors.py
@@ -57,8 +57,10 @@ class PyDateProcessorTest(_DateProcessorTest):
)
)
+
class CDateProcessorTest(_DateProcessorTest):
__requires__ = ('cextensions',)
+
@classmethod
def setup_class(cls):
from sqlalchemy import cprocessors
@@ -104,7 +106,8 @@ class _DistillArgsTest(fixtures.TestBase):
def test_distill_single_list_tuples(self):
eq_(
- self.module._distill_params(([("foo", "bar"), ("bat", "hoho")],), {}),
+ self.module._distill_params(
+ ([("foo", "bar"), ("bat", "hoho")],), {}),
[('foo', 'bar'), ('bat', 'hoho')]
)
@@ -117,9 +120,7 @@ class _DistillArgsTest(fixtures.TestBase):
def test_distill_multi_list_tuple(self):
eq_(
self.module._distill_params(
- ([("foo", "bar")], [("bar", "bat")]),
- {}
- ),
+ ([("foo", "bar")], [("bar", "bat")]), {}),
([('foo', 'bar')], [('bar', 'bat')])
)
@@ -131,7 +132,8 @@ class _DistillArgsTest(fixtures.TestBase):
def test_distill_single_list_dicts(self):
eq_(
- self.module._distill_params(([{"foo": "bar"}, {"foo": "hoho"}],), {}),
+ self.module._distill_params(
+ ([{"foo": "bar"}, {"foo": "hoho"}],), {}),
[{'foo': 'bar'}, {'foo': 'hoho'}]
)
@@ -148,7 +150,6 @@ class _DistillArgsTest(fixtures.TestBase):
)
-
class PyDistillArgsTest(_DistillArgsTest):
@classmethod
def setup_class(cls):
@@ -160,8 +161,10 @@ class PyDistillArgsTest(_DistillArgsTest):
)
)
+
class CDistillArgsTest(_DistillArgsTest):
__requires__ = ('cextensions', )
+
@classmethod
def setup_class(cls):
from sqlalchemy import cutils as util
diff --git a/test/engine/test_reflection.py b/test/engine/test_reflection.py
index 087610333..83650609d 100644
--- a/test/engine/test_reflection.py
+++ b/test/engine/test_reflection.py
@@ -14,6 +14,7 @@ from sqlalchemy.util import ue
metadata, users = None, None
+
class ReflectionTest(fixtures.TestBase, ComparesTables):
__backend__ = True
@@ -253,7 +254,7 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
"""
Table('a', self.metadata, Column('id', Integer, primary_key=True))
Table('b', self.metadata, Column('id', Integer, primary_key=True),
- Column('a_id', Integer, sa.ForeignKey('a.id')))
+ Column('a_id', Integer, sa.ForeignKey('a.id')))
self.metadata.create_all()
m2 = MetaData()
@@ -275,7 +276,7 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
"""
Table('a', self.metadata, Column('id', Integer, primary_key=True))
Table('b', self.metadata, Column('id', Integer, primary_key=True),
- Column('a_id', Integer, sa.ForeignKey('a.id')))
+ Column('a_id', Integer, sa.ForeignKey('a.id')))
self.metadata.create_all()
m2 = MetaData()
@@ -404,7 +405,6 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
eq_(list(table.primary_key), [table.c.col1])
eq_(table.c.col1.primary_key, True)
-
@testing.provide_metadata
def test_override_pkfk(self):
"""test that you can override columns which contain foreign keys
@@ -419,7 +419,6 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
Column('id', sa.Integer, primary_key=True),
Column('street', sa.String(30)))
-
meta.create_all()
meta2 = MetaData(testing.db)
a2 = Table('addresses', meta2,
@@ -541,8 +540,6 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
assert f1 in b1.constraints
assert len(b1.constraints) == 2
-
-
@testing.provide_metadata
def test_override_keys(self):
"""test that columns can be overridden with a 'key',
@@ -654,12 +651,13 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
backends with {dialect}.get_foreign_keys() support)"""
if testing.against('postgresql'):
- test_attrs = ('match', 'onupdate', 'ondelete', 'deferrable', 'initially')
+ test_attrs = ('match', 'onupdate', 'ondelete',
+ 'deferrable', 'initially')
addresses_user_id_fkey = sa.ForeignKey(
# Each option is specifically not a Postgres default, or
# it won't be returned by PG's inspection
'users.id',
- name = 'addresses_user_id_fkey',
+ name='addresses_user_id_fkey',
match='FULL',
onupdate='RESTRICT',
ondelete='RESTRICT',
@@ -672,7 +670,7 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
# elided by MySQL's inspection
addresses_user_id_fkey = sa.ForeignKey(
'users.id',
- name = 'addresses_user_id_fkey',
+ name='addresses_user_id_fkey',
onupdate='CASCADE',
ondelete='CASCADE'
)
@@ -726,11 +724,12 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
Column('slot', sa.String(128)),
)
- assert_raises_message(sa.exc.InvalidRequestError,
- "Foreign key associated with column 'slots.pkg_id' "
- "could not find table 'pkgs' with which to generate "
- "a foreign key to target column 'pkg_id'",
- metadata.create_all)
+ assert_raises_message(
+ sa.exc.InvalidRequestError,
+ "Foreign key associated with column 'slots.pkg_id' "
+ "could not find table 'pkgs' with which to generate "
+ "a foreign key to target column 'pkg_id'",
+ metadata.create_all)
def test_composite_pks(self):
"""test reflection of a composite primary key"""
@@ -797,7 +796,6 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
table.c.multi_hoho
== table2.c.lala).compare(j.onclause))
-
@testing.crashes('oracle', 'FIXME: unknown, confirm not fails_on')
@testing.requires.check_constraints
@testing.provide_metadata
@@ -869,7 +867,6 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
def test_reflect_uses_bind_engine_reflect(self):
self._test_reflect_uses_bind(lambda e: MetaData().reflect(e))
-
@testing.provide_metadata
def test_reflect_all(self):
existing = testing.db.table_names()
@@ -1053,6 +1050,7 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
finally:
_drop_views(metadata.bind)
+
class CreateDropTest(fixtures.TestBase):
__backend__ = True
@@ -1101,7 +1099,6 @@ class CreateDropTest(fixtures.TestBase):
eq_(ua, ['users', 'email_addresses'])
eq_(oi, ['orders', 'items'])
-
def test_checkfirst(self):
try:
assert not users.exists(testing.db)
@@ -1141,6 +1138,7 @@ class CreateDropTest(fixtures.TestBase):
- set(testing.db.table_names()))
metadata.drop_all(bind=testing.db)
+
class SchemaManipulationTest(fixtures.TestBase):
__backend__ = True
@@ -1159,6 +1157,7 @@ class SchemaManipulationTest(fixtures.TestBase):
assert len(addresses.c.user_id.foreign_keys) == 1
assert addresses.constraints == set([addresses.primary_key, fk])
+
class UnicodeReflectionTest(fixtures.TestBase):
__backend__ = True
@@ -1170,16 +1169,40 @@ class UnicodeReflectionTest(fixtures.TestBase):
('plain', 'col_plain', 'ix_plain')
])
no_has_table = [
- ('no_has_table_1', ue('col_Unit\u00e9ble'), ue('ix_Unit\u00e9ble')),
- ('no_has_table_2', ue('col_\u6e2c\u8a66'), ue('ix_\u6e2c\u8a66')),
+ (
+ 'no_has_table_1',
+ ue('col_Unit\u00e9ble'),
+ ue('ix_Unit\u00e9ble')
+ ),
+ (
+ 'no_has_table_2',
+ ue('col_\u6e2c\u8a66'),
+ ue('ix_\u6e2c\u8a66')
+ ),
]
no_case_sensitivity = [
- (ue('\u6e2c\u8a66'), ue('col_\u6e2c\u8a66'), ue('ix_\u6e2c\u8a66')),
- (ue('unit\u00e9ble'), ue('col_unit\u00e9ble'), ue('ix_unit\u00e9ble')),
+ (
+ ue('\u6e2c\u8a66'),
+ ue('col_\u6e2c\u8a66'),
+ ue('ix_\u6e2c\u8a66')
+ ),
+ (
+ ue('unit\u00e9ble'),
+ ue('col_unit\u00e9ble'),
+ ue('ix_unit\u00e9ble')
+ ),
]
full = [
- (ue('Unit\u00e9ble'), ue('col_Unit\u00e9ble'), ue('ix_Unit\u00e9ble')),
- (ue('\u6e2c\u8a66'), ue('col_\u6e2c\u8a66'), ue('ix_\u6e2c\u8a66')),
+ (
+ ue('Unit\u00e9ble'),
+ ue('col_Unit\u00e9ble'),
+ ue('ix_Unit\u00e9ble')
+ ),
+ (
+ ue('\u6e2c\u8a66'),
+ ue('col_\u6e2c\u8a66'),
+ ue('ix_\u6e2c\u8a66')
+ ),
]
# as you can see, our options for this kind of thing
@@ -1268,6 +1291,7 @@ class UnicodeReflectionTest(fixtures.TestBase):
[(names[tname][1], names[tname][0])]
)
+
class SchemaTest(fixtures.TestBase):
__backend__ = True
@@ -1398,8 +1422,6 @@ class SchemaTest(fixtures.TestBase):
)
-
-
# Tests related to engine.reflection
@@ -1432,7 +1454,8 @@ def createTables(meta, schema=None):
dingalings = Table("dingalings", meta,
Column('dingaling_id', sa.Integer, primary_key=True),
Column('address_id', sa.Integer,
- sa.ForeignKey('%semail_addresses.address_id' % schema_prefix)),
+ sa.ForeignKey(
+ '%semail_addresses.address_id' % schema_prefix)),
Column('data', sa.String(30)),
schema=schema, test_needs_fk=True,
)
@@ -1448,6 +1471,7 @@ def createTables(meta, schema=None):
return (users, addresses, dingalings)
+
def createIndexes(con, schema=None):
fullname = 'users'
if schema:
@@ -1455,6 +1479,7 @@ def createIndexes(con, schema=None):
query = "CREATE INDEX users_t_idx ON %s (test1, test2)" % fullname
con.execute(sa.sql.text(query))
+
@testing.requires.views
def _create_views(con, schema=None):
for table_name in ('users', 'email_addresses'):
@@ -1462,10 +1487,10 @@ def _create_views(con, schema=None):
if schema:
fullname = "%s.%s" % (schema, table_name)
view_name = fullname + '_v'
- query = "CREATE VIEW %s AS SELECT * FROM %s" % (view_name,
- fullname)
+ query = "CREATE VIEW %s AS SELECT * FROM %s" % (view_name, fullname)
con.execute(sa.sql.text(query))
+
@testing.requires.views
def _drop_views(con, schema=None):
for table_name in ('email_addresses', 'users'):
@@ -1504,6 +1529,7 @@ class ReverseCasingReflectTest(fixtures.TestBase, AssertsCompiledSQL):
'weird_casing."Col2", weird_casing."col3" '
'FROM weird_casing')
+
class CaseSensitiveTest(fixtures.TablesTest):
"""Nail down case sensitive behaviors, mostly on MySQL."""
__backend__ = True
@@ -1539,7 +1565,8 @@ class CaseSensitiveTest(fixtures.TablesTest):
)
def test_reflect_via_fk(self):
m = MetaData()
- t2 = Table("SomeOtherTable", m, autoload=True, autoload_with=testing.db)
+ t2 = Table("SomeOtherTable", m, autoload=True,
+ autoload_with=testing.db)
eq_(t2.name, "SomeOtherTable")
assert "SomeTable" in m.tables
@@ -1551,7 +1578,6 @@ class CaseSensitiveTest(fixtures.TablesTest):
eq_(t2.name, "sOmEtAbLe")
-
class ColumnEventsTest(fixtures.RemovesEvents, fixtures.TestBase):
__backend__ = True
@@ -1584,6 +1610,7 @@ class ColumnEventsTest(fixtures.RemovesEvents, fixtures.TestBase):
from sqlalchemy.schema import Table
m = MetaData(testing.db)
+
def column_reflect(insp, table, column_info):
if column_info['name'] == col:
column_info.update(update)
@@ -1620,6 +1647,7 @@ class ColumnEventsTest(fixtures.RemovesEvents, fixtures.TestBase):
def test_override_key_fk(self):
m = MetaData(testing.db)
+
def column_reflect(insp, table, column_info):
if column_info['name'] == 'q':
diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py
index b662c7fcd..7f8a7c97c 100644
--- a/test/engine/test_transaction.py
+++ b/test/engine/test_transaction.py
@@ -12,6 +12,8 @@ from sqlalchemy.testing import fixtures
users, metadata = None, None
+
+
class TransactionTest(fixtures.TestBase):
__backend__ = True
@@ -20,7 +22,7 @@ class TransactionTest(fixtures.TestBase):
global users, metadata
metadata = MetaData()
users = Table('query_users', metadata,
- Column('user_id', INT, primary_key = True),
+ Column('user_id', INT, primary_key=True),
Column('user_name', VARCHAR(20)),
test_needs_acid=True,
)
@@ -497,6 +499,7 @@ class TransactionTest(fixtures.TestBase):
order_by(users.c.user_id))
eq_(result.fetchall(), [])
+
class ResetAgentTest(fixtures.TestBase):
__backend__ = True
@@ -600,6 +603,7 @@ class ResetAgentTest(fixtures.TestBase):
trans.rollback()
assert connection.connection._reset_agent is None
+
class AutoRollbackTest(fixtures.TestBase):
__backend__ = True
@@ -633,6 +637,7 @@ class AutoRollbackTest(fixtures.TestBase):
users.drop(conn2)
conn2.close()
+
class ExplicitAutoCommitTest(fixtures.TestBase):
"""test the 'autocommit' flag on select() and text() objects.
@@ -1440,4 +1445,3 @@ class IsolationLevelTest(fixtures.TestBase):
eq_(conn.get_isolation_level(),
self._non_default_isolation_level())
eq_(c2.get_isolation_level(), self._non_default_isolation_level())
-