diff options
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r-- | test/engine/test_execute.py | 146 |
1 files changed, 90 insertions, 56 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index 1d2aebf97..d3bd3c2cd 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -1,4 +1,4 @@ - +# coding: utf-8 from sqlalchemy.testing import eq_, assert_raises, assert_raises_message, \ config, is_ @@ -17,9 +17,9 @@ from sqlalchemy.testing.engines import testing_engine import logging.handlers from sqlalchemy.dialects.oracle.zxjdbc import ReturningParam from sqlalchemy.engine import result as _result, default -from sqlalchemy.engine.base import Connection, Engine +from sqlalchemy.engine.base import Engine from sqlalchemy.testing import fixtures -from sqlalchemy.testing.mock import Mock, call +from sqlalchemy.testing.mock import Mock, call, patch users, metadata, users_autoinc = None, None, None @@ -29,11 +29,11 @@ class ExecuteTest(fixtures.TestBase): global users, users_autoinc, metadata metadata = MetaData(testing.db) users = Table('users', metadata, - Column('user_id', INT, primary_key = True, autoincrement=False), + Column('user_id', INT, primary_key=True, autoincrement=False), Column('user_name', VARCHAR(20)), ) users_autoinc = Table('users_autoinc', metadata, - Column('user_id', INT, primary_key = True, + Column('user_id', INT, primary_key=True, test_needs_autoincrement=True), Column('user_name', VARCHAR(20)), ) @@ -59,10 +59,9 @@ class ExecuteTest(fixtures.TestBase): scalar(stmt) eq_(result, '%') - @testing.fails_on_everything_except('firebird', 'maxdb', + @testing.fails_on_everything_except('firebird', 'sqlite', '+pyodbc', - '+mxodbc', '+zxjdbc', 'mysql+oursql', - 'informix+informixdb') + '+mxodbc', '+zxjdbc', 'mysql+oursql') def test_raw_qmark(self): def go(conn): conn.execute('insert into users (user_id, user_name) ' @@ -182,7 +181,7 @@ class ExecuteTest(fixtures.TestBase): finally: conn.close() - @testing.fails_on_everything_except('sqlite', 'oracle+cx_oracle', 'informix+informixdb') + @testing.fails_on_everything_except('sqlite', 'oracle+cx_oracle') def test_raw_named(self): def go(conn): conn.execute('insert into users (user_id, user_name) ' @@ -204,19 +203,36 @@ class ExecuteTest(fixtures.TestBase): finally: conn.close() + @testing.engines.close_open_connections def test_exception_wrapping_dbapi(self): - def go(conn): + conn = testing.db.connect() + for _c in testing.db, conn: assert_raises_message( tsa.exc.DBAPIError, r"not_a_valid_statement", - conn.execute, 'not_a_valid_statement' + _c.execute, 'not_a_valid_statement' ) - go(testing.db) - conn = testing.db.connect() - try: - go(conn) - finally: - conn.close() + + @testing.requires.sqlite + def test_exception_wrapping_non_dbapi_error(self): + e = create_engine('sqlite://') + e.dialect.is_disconnect = is_disconnect = Mock() + + with e.connect() as c: + c.connection.cursor = Mock( + return_value=Mock( + execute=Mock( + side_effect=TypeError("I'm not a DBAPI error") + )) + ) + + assert_raises_message( + TypeError, + "I'm not a DBAPI error", + c.execute, "select " + ) + eq_(is_disconnect.call_count, 0) + def test_exception_wrapping_non_dbapi_statement(self): class MyType(TypeDecorator): @@ -227,7 +243,7 @@ class ExecuteTest(fixtures.TestBase): def _go(conn): assert_raises_message( tsa.exc.StatementError, - r"nope \(original cause: Exception: nope\) 'SELECT 1 ", + r"nope \(original cause: Exception: nope\) u?'SELECT 1 ", conn.execute, select([1]).\ where( @@ -241,6 +257,25 @@ class ExecuteTest(fixtures.TestBase): finally: conn.close() + def test_stmt_exception_non_ascii(self): + name = util.u('méil') + with testing.db.connect() as conn: + assert_raises_message( + tsa.exc.StatementError, + util.u( + "A value is required for bind parameter 'uname'" + r'.*SELECT users.user_name AS .m\\xe9il.') if util.py2k + else + util.u( + "A value is required for bind parameter 'uname'" + '.*SELECT users.user_name AS .méil.') + , + conn.execute, + select([users.c.user_name.label(name)]).where( + users.c.user_name == bindparam("uname")), + {'uname_incorrect': 'foo'} + ) + def test_stmt_exception_pickleable_no_dbapi(self): self._test_stmt_exception_pickleable(Exception("hello world")) @@ -326,17 +361,17 @@ class ExecuteTest(fixtures.TestBase): def test_engine_level_options(self): eng = engines.testing_engine(options={'execution_options': {'foo': 'bar'}}) - conn = eng.contextual_connect() - eq_(conn._execution_options['foo'], 'bar') - eq_(conn.execution_options(bat='hoho')._execution_options['foo' - ], 'bar') - eq_(conn.execution_options(bat='hoho')._execution_options['bat' - ], 'hoho') - eq_(conn.execution_options(foo='hoho')._execution_options['foo' - ], 'hoho') - eng.update_execution_options(foo='hoho') - conn = eng.contextual_connect() - eq_(conn._execution_options['foo'], 'hoho') + with eng.contextual_connect() as conn: + eq_(conn._execution_options['foo'], 'bar') + eq_(conn.execution_options(bat='hoho')._execution_options['foo' + ], 'bar') + eq_(conn.execution_options(bat='hoho')._execution_options['bat' + ], 'hoho') + eq_(conn.execution_options(foo='hoho')._execution_options['foo' + ], 'hoho') + eng.update_execution_options(foo='hoho') + conn = eng.contextual_connect() + eq_(conn._execution_options['foo'], 'hoho') @testing.requires.ad_hoc_engines def test_generative_engine_execution_options(self): @@ -383,8 +418,8 @@ class ExecuteTest(fixtures.TestBase): event.listen(eng, "before_execute", l2) event.listen(eng1, "before_execute", l3) - eng.execute(select([1])) - eng1.execute(select([1])) + eng.execute(select([1])).close() + eng1.execute(select([1])).close() eq_(canary, ["l1", "l2", "l3", "l1", "l2"]) @@ -892,45 +927,44 @@ class ResultProxyTest(fixtures.TestBase): def test_no_rowcount_on_selects_inserts(self): """assert that rowcount is only called on deletes and updates. - This because cursor.rowcount can be expensive on some dialects - such as Firebird. + This because cursor.rowcount may can be expensive on some dialects + such as Firebird, however many dialects require it be called + before the cursor is closed. """ metadata = self.metadata engine = engines.testing_engine() - metadata.bind = engine t = Table('t1', metadata, Column('data', String(10)) ) - metadata.create_all() + metadata.create_all(engine) - class BreakRowcountMixin(object): - @property - def rowcount(self): - assert False + with patch.object(engine.dialect.execution_ctx_cls, "rowcount") as mock_rowcount: + mock_rowcount.__get__ = Mock() + engine.execute(t.insert(), + {'data': 'd1'}, + {'data': 'd2'}, + {'data': 'd3'}) - execution_ctx_cls = engine.dialect.execution_ctx_cls - engine.dialect.execution_ctx_cls = type("FakeCtx", - (BreakRowcountMixin, - execution_ctx_cls), - {}) + eq_(len(mock_rowcount.__get__.mock_calls), 0) - try: - r = t.insert().execute({'data': 'd1'}, {'data': 'd2'}, - {'data': 'd3'}) - eq_(t.select().execute().fetchall(), [('d1', ), ('d2', ), - ('d3', )]) - assert_raises(AssertionError, t.update().execute, {'data' - : 'd4'}) - assert_raises(AssertionError, t.delete().execute) - finally: - engine.dialect.execution_ctx_cls = execution_ctx_cls + eq_( + engine.execute(t.select()).fetchall(), + [('d1', ), ('d2', ), ('d3', )] + ) + eq_(len(mock_rowcount.__get__.mock_calls), 0) + + engine.execute(t.update(), {'data': 'd4'}) + + eq_(len(mock_rowcount.__get__.mock_calls), 1) + + engine.execute(t.delete()) + eq_(len(mock_rowcount.__get__.mock_calls), 2) - @testing.requires.python26 def test_rowproxy_is_sequence(self): import collections from sqlalchemy.engine import RowProxy @@ -1016,7 +1050,7 @@ class ResultProxyTest(fixtures.TestBase): class ExecutionOptionsTest(fixtures.TestBase): def test_dialect_conn_options(self): - engine = testing_engine("sqlite://") + engine = testing_engine("sqlite://", options=dict(_initialize=False)) engine.dialect = Mock() conn = engine.connect() c2 = conn.execution_options(foo="bar") |