summaryrefslogtreecommitdiff
path: root/test/engine/test_execute.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r--test/engine/test_execute.py146
1 files changed, 90 insertions, 56 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index 1d2aebf97..d3bd3c2cd 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -1,4 +1,4 @@
-
+# coding: utf-8
from sqlalchemy.testing import eq_, assert_raises, assert_raises_message, \
config, is_
@@ -17,9 +17,9 @@ from sqlalchemy.testing.engines import testing_engine
import logging.handlers
from sqlalchemy.dialects.oracle.zxjdbc import ReturningParam
from sqlalchemy.engine import result as _result, default
-from sqlalchemy.engine.base import Connection, Engine
+from sqlalchemy.engine.base import Engine
from sqlalchemy.testing import fixtures
-from sqlalchemy.testing.mock import Mock, call
+from sqlalchemy.testing.mock import Mock, call, patch
users, metadata, users_autoinc = None, None, None
@@ -29,11 +29,11 @@ class ExecuteTest(fixtures.TestBase):
global users, users_autoinc, metadata
metadata = MetaData(testing.db)
users = Table('users', metadata,
- Column('user_id', INT, primary_key = True, autoincrement=False),
+ Column('user_id', INT, primary_key=True, autoincrement=False),
Column('user_name', VARCHAR(20)),
)
users_autoinc = Table('users_autoinc', metadata,
- Column('user_id', INT, primary_key = True,
+ Column('user_id', INT, primary_key=True,
test_needs_autoincrement=True),
Column('user_name', VARCHAR(20)),
)
@@ -59,10 +59,9 @@ class ExecuteTest(fixtures.TestBase):
scalar(stmt)
eq_(result, '%')
- @testing.fails_on_everything_except('firebird', 'maxdb',
+ @testing.fails_on_everything_except('firebird',
'sqlite', '+pyodbc',
- '+mxodbc', '+zxjdbc', 'mysql+oursql',
- 'informix+informixdb')
+ '+mxodbc', '+zxjdbc', 'mysql+oursql')
def test_raw_qmark(self):
def go(conn):
conn.execute('insert into users (user_id, user_name) '
@@ -182,7 +181,7 @@ class ExecuteTest(fixtures.TestBase):
finally:
conn.close()
- @testing.fails_on_everything_except('sqlite', 'oracle+cx_oracle', 'informix+informixdb')
+ @testing.fails_on_everything_except('sqlite', 'oracle+cx_oracle')
def test_raw_named(self):
def go(conn):
conn.execute('insert into users (user_id, user_name) '
@@ -204,19 +203,36 @@ class ExecuteTest(fixtures.TestBase):
finally:
conn.close()
+ @testing.engines.close_open_connections
def test_exception_wrapping_dbapi(self):
- def go(conn):
+ conn = testing.db.connect()
+ for _c in testing.db, conn:
assert_raises_message(
tsa.exc.DBAPIError,
r"not_a_valid_statement",
- conn.execute, 'not_a_valid_statement'
+ _c.execute, 'not_a_valid_statement'
)
- go(testing.db)
- conn = testing.db.connect()
- try:
- go(conn)
- finally:
- conn.close()
+
+ @testing.requires.sqlite
+ def test_exception_wrapping_non_dbapi_error(self):
+ e = create_engine('sqlite://')
+ e.dialect.is_disconnect = is_disconnect = Mock()
+
+ with e.connect() as c:
+ c.connection.cursor = Mock(
+ return_value=Mock(
+ execute=Mock(
+ side_effect=TypeError("I'm not a DBAPI error")
+ ))
+ )
+
+ assert_raises_message(
+ TypeError,
+ "I'm not a DBAPI error",
+ c.execute, "select "
+ )
+ eq_(is_disconnect.call_count, 0)
+
def test_exception_wrapping_non_dbapi_statement(self):
class MyType(TypeDecorator):
@@ -227,7 +243,7 @@ class ExecuteTest(fixtures.TestBase):
def _go(conn):
assert_raises_message(
tsa.exc.StatementError,
- r"nope \(original cause: Exception: nope\) 'SELECT 1 ",
+ r"nope \(original cause: Exception: nope\) u?'SELECT 1 ",
conn.execute,
select([1]).\
where(
@@ -241,6 +257,25 @@ class ExecuteTest(fixtures.TestBase):
finally:
conn.close()
+ def test_stmt_exception_non_ascii(self):
+ name = util.u('méil')
+ with testing.db.connect() as conn:
+ assert_raises_message(
+ tsa.exc.StatementError,
+ util.u(
+ "A value is required for bind parameter 'uname'"
+ r'.*SELECT users.user_name AS .m\\xe9il.') if util.py2k
+ else
+ util.u(
+ "A value is required for bind parameter 'uname'"
+ '.*SELECT users.user_name AS .méil.')
+ ,
+ conn.execute,
+ select([users.c.user_name.label(name)]).where(
+ users.c.user_name == bindparam("uname")),
+ {'uname_incorrect': 'foo'}
+ )
+
def test_stmt_exception_pickleable_no_dbapi(self):
self._test_stmt_exception_pickleable(Exception("hello world"))
@@ -326,17 +361,17 @@ class ExecuteTest(fixtures.TestBase):
def test_engine_level_options(self):
eng = engines.testing_engine(options={'execution_options':
{'foo': 'bar'}})
- conn = eng.contextual_connect()
- eq_(conn._execution_options['foo'], 'bar')
- eq_(conn.execution_options(bat='hoho')._execution_options['foo'
- ], 'bar')
- eq_(conn.execution_options(bat='hoho')._execution_options['bat'
- ], 'hoho')
- eq_(conn.execution_options(foo='hoho')._execution_options['foo'
- ], 'hoho')
- eng.update_execution_options(foo='hoho')
- conn = eng.contextual_connect()
- eq_(conn._execution_options['foo'], 'hoho')
+ with eng.contextual_connect() as conn:
+ eq_(conn._execution_options['foo'], 'bar')
+ eq_(conn.execution_options(bat='hoho')._execution_options['foo'
+ ], 'bar')
+ eq_(conn.execution_options(bat='hoho')._execution_options['bat'
+ ], 'hoho')
+ eq_(conn.execution_options(foo='hoho')._execution_options['foo'
+ ], 'hoho')
+ eng.update_execution_options(foo='hoho')
+ conn = eng.contextual_connect()
+ eq_(conn._execution_options['foo'], 'hoho')
@testing.requires.ad_hoc_engines
def test_generative_engine_execution_options(self):
@@ -383,8 +418,8 @@ class ExecuteTest(fixtures.TestBase):
event.listen(eng, "before_execute", l2)
event.listen(eng1, "before_execute", l3)
- eng.execute(select([1]))
- eng1.execute(select([1]))
+ eng.execute(select([1])).close()
+ eng1.execute(select([1])).close()
eq_(canary, ["l1", "l2", "l3", "l1", "l2"])
@@ -892,45 +927,44 @@ class ResultProxyTest(fixtures.TestBase):
def test_no_rowcount_on_selects_inserts(self):
"""assert that rowcount is only called on deletes and updates.
- This because cursor.rowcount can be expensive on some dialects
- such as Firebird.
+ This because cursor.rowcount may can be expensive on some dialects
+ such as Firebird, however many dialects require it be called
+ before the cursor is closed.
"""
metadata = self.metadata
engine = engines.testing_engine()
- metadata.bind = engine
t = Table('t1', metadata,
Column('data', String(10))
)
- metadata.create_all()
+ metadata.create_all(engine)
- class BreakRowcountMixin(object):
- @property
- def rowcount(self):
- assert False
+ with patch.object(engine.dialect.execution_ctx_cls, "rowcount") as mock_rowcount:
+ mock_rowcount.__get__ = Mock()
+ engine.execute(t.insert(),
+ {'data': 'd1'},
+ {'data': 'd2'},
+ {'data': 'd3'})
- execution_ctx_cls = engine.dialect.execution_ctx_cls
- engine.dialect.execution_ctx_cls = type("FakeCtx",
- (BreakRowcountMixin,
- execution_ctx_cls),
- {})
+ eq_(len(mock_rowcount.__get__.mock_calls), 0)
- try:
- r = t.insert().execute({'data': 'd1'}, {'data': 'd2'},
- {'data': 'd3'})
- eq_(t.select().execute().fetchall(), [('d1', ), ('d2', ),
- ('d3', )])
- assert_raises(AssertionError, t.update().execute, {'data'
- : 'd4'})
- assert_raises(AssertionError, t.delete().execute)
- finally:
- engine.dialect.execution_ctx_cls = execution_ctx_cls
+ eq_(
+ engine.execute(t.select()).fetchall(),
+ [('d1', ), ('d2', ), ('d3', )]
+ )
+ eq_(len(mock_rowcount.__get__.mock_calls), 0)
+
+ engine.execute(t.update(), {'data': 'd4'})
+
+ eq_(len(mock_rowcount.__get__.mock_calls), 1)
+
+ engine.execute(t.delete())
+ eq_(len(mock_rowcount.__get__.mock_calls), 2)
- @testing.requires.python26
def test_rowproxy_is_sequence(self):
import collections
from sqlalchemy.engine import RowProxy
@@ -1016,7 +1050,7 @@ class ResultProxyTest(fixtures.TestBase):
class ExecutionOptionsTest(fixtures.TestBase):
def test_dialect_conn_options(self):
- engine = testing_engine("sqlite://")
+ engine = testing_engine("sqlite://", options=dict(_initialize=False))
engine.dialect = Mock()
conn = engine.connect()
c2 = conn.execution_options(foo="bar")