diff options
Diffstat (limited to 'test/engine/test_execute.py')
| -rw-r--r-- | test/engine/test_execute.py | 296 |
1 files changed, 249 insertions, 47 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index b0256d325..fbb1878dc 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -1,7 +1,7 @@ # coding: utf-8 from sqlalchemy.testing import eq_, assert_raises, assert_raises_message, \ - config, is_ + config, is_, is_not_, le_ import re from sqlalchemy.testing.util import picklers from sqlalchemy.interfaces import ConnectionProxy @@ -484,6 +484,32 @@ class ExecuteTest(fixtures.TestBase): eq_(canary, ["l1", "l2", "l3", "l1", "l2"]) @testing.requires.ad_hoc_engines + def test_dispose_event(self): + canary = Mock() + eng = create_engine(testing.db.url) + event.listen(eng, "engine_disposed", canary) + + conn = eng.connect() + conn.close() + eng.dispose() + + + conn = eng.connect() + conn.close() + + eq_( + canary.mock_calls, + [call(eng)] + ) + + eng.dispose() + + eq_( + canary.mock_calls, + [call(eng), call(eng)] + ) + + @testing.requires.ad_hoc_engines def test_autocommit_option_no_issue_first_connect(self): eng = create_engine(testing.db.url) eng.update_execution_options(autocommit=True) @@ -1021,76 +1047,91 @@ class ExecutionOptionsTest(fixtures.TestBase): ) -class AlternateResultProxyTest(fixtures.TestBase): +class AlternateResultProxyTest(fixtures.TablesTest): __requires__ = ('sqlite', ) @classmethod - def setup_class(cls): + def setup_bind(cls): cls.engine = engine = testing_engine('sqlite://') - m = MetaData() - cls.table = t = Table('test', m, - Column('x', Integer, primary_key=True), - Column('y', String(50, convert_unicode='force')) - ) - m.create_all(engine) - engine.execute(t.insert(), [ + return engine + + @classmethod + def define_tables(cls, metadata): + Table( + 'test', metadata, + Column('x', Integer, primary_key=True), + Column('y', String(50, convert_unicode='force')) + ) + + @classmethod + def insert_data(cls): + cls.engine.execute(cls.tables.test.insert(), [ {'x': i, 'y': "t_%d" % i} for i in range(1, 12) ]) - def _test_proxy(self, cls): + @contextmanager + def _proxy_fixture(self, cls): + self.table = self.tables.test + class ExcCtx(default.DefaultExecutionContext): def get_result_proxy(self): return cls(self) - self.engine.dialect.execution_ctx_cls = ExcCtx - rows = [] - r = self.engine.execute(select([self.table])) - assert isinstance(r, cls) - for i in range(5): - rows.append(r.fetchone()) - eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) + self.patcher = patch.object( + self.engine.dialect, "execution_ctx_cls", ExcCtx) + with self.patcher: + yield - rows = r.fetchmany(3) - eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)]) + def _test_proxy(self, cls): + with self._proxy_fixture(cls): + rows = [] + r = self.engine.execute(select([self.table])) + assert isinstance(r, cls) + for i in range(5): + rows.append(r.fetchone()) + eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) + + rows = r.fetchmany(3) + eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)]) - rows = r.fetchall() - eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)]) + rows = r.fetchall() + eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)]) - r = self.engine.execute(select([self.table])) - rows = r.fetchmany(None) - eq_(rows[0], (1, "t_1")) - # number of rows here could be one, or the whole thing - assert len(rows) == 1 or len(rows) == 11 + r = self.engine.execute(select([self.table])) + rows = r.fetchmany(None) + eq_(rows[0], (1, "t_1")) + # number of rows here could be one, or the whole thing + assert len(rows) == 1 or len(rows) == 11 - r = self.engine.execute(select([self.table]).limit(1)) - r.fetchone() - eq_(r.fetchone(), None) + r = self.engine.execute(select([self.table]).limit(1)) + r.fetchone() + eq_(r.fetchone(), None) - r = self.engine.execute(select([self.table]).limit(5)) - rows = r.fetchmany(6) - eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) + r = self.engine.execute(select([self.table]).limit(5)) + rows = r.fetchmany(6) + eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) - # result keeps going just fine with blank results... - eq_(r.fetchmany(2), []) + # result keeps going just fine with blank results... + eq_(r.fetchmany(2), []) - eq_(r.fetchmany(2), []) + eq_(r.fetchmany(2), []) - eq_(r.fetchall(), []) + eq_(r.fetchall(), []) - eq_(r.fetchone(), None) + eq_(r.fetchone(), None) - # until we close - r.close() + # until we close + r.close() - self._assert_result_closed(r) + self._assert_result_closed(r) - r = self.engine.execute(select([self.table]).limit(5)) - eq_(r.first(), (1, "t_1")) - self._assert_result_closed(r) + r = self.engine.execute(select([self.table]).limit(5)) + eq_(r.first(), (1, "t_1")) + self._assert_result_closed(r) - r = self.engine.execute(select([self.table]).limit(5)) - eq_(r.scalar(), 1) - self._assert_result_closed(r) + r = self.engine.execute(select([self.table]).limit(5)) + eq_(r.scalar(), 1) + self._assert_result_closed(r) def _assert_result_closed(self, r): assert_raises_message( @@ -1123,6 +1164,42 @@ class AlternateResultProxyTest(fixtures.TestBase): def test_buffered_column_result_proxy(self): self._test_proxy(_result.BufferedColumnResultProxy) + def test_buffered_row_growth(self): + with self._proxy_fixture(_result.BufferedRowResultProxy): + with self.engine.connect() as conn: + conn.execute(self.table.insert(), [ + {'x': i, 'y': "t_%d" % i} for i in range(15, 1200) + ]) + result = conn.execute(self.table.select()) + checks = { + 0: 5, 1: 10, 9: 20, 135: 250, 274: 500, + 1351: 1000 + } + for idx, row in enumerate(result, 0): + if idx in checks: + eq_(result._bufsize, checks[idx]) + le_( + len(result._BufferedRowResultProxy__rowbuffer), + 1000 + ) + + def test_max_row_buffer_option(self): + with self._proxy_fixture(_result.BufferedRowResultProxy): + with self.engine.connect() as conn: + conn.execute(self.table.insert(), [ + {'x': i, 'y': "t_%d" % i} for i in range(15, 1200) + ]) + result = conn.execution_options(max_row_buffer=27).execute( + self.table.select() + ) + for idx, row in enumerate(result, 0): + if idx in (16, 70, 150, 250): + eq_(result._bufsize, 27) + le_( + len(result._BufferedRowResultProxy__rowbuffer), + 27 + ) + class EngineEventsTest(fixtures.TestBase): __requires__ = 'ad_hoc_engines', @@ -1943,6 +2020,47 @@ class HandleErrorTest(fixtures.TestBase): self._test_alter_disconnect(True, False) self._test_alter_disconnect(False, False) + @testing.requires.independent_connections + def _test_alter_invalidate_pool_to_false(self, set_to_false): + orig_error = True + + engine = engines.testing_engine() + + @event.listens_for(engine, "handle_error") + def evt(ctx): + if set_to_false: + ctx.invalidate_pool_on_disconnect = False + + c1, c2, c3 = engine.pool.connect(), \ + engine.pool.connect(), engine.pool.connect() + crecs = [conn._connection_record for conn in (c1, c2, c3)] + c1.close() + c2.close() + c3.close() + + with patch.object(engine.dialect, "is_disconnect", + Mock(return_value=orig_error)): + + with engine.connect() as c: + target_crec = c.connection._connection_record + try: + c.execute("SELECT x FROM nonexistent") + assert False + except tsa.exc.StatementError as st: + eq_(st.connection_invalidated, True) + + for crec in crecs: + if crec is target_crec or not set_to_false: + is_not_(crec.connection, crec.get_connection()) + else: + is_(crec.connection, crec.get_connection()) + + def test_alter_invalidate_pool_to_false(self): + self._test_alter_invalidate_pool_to_false(True) + + def test_alter_invalidate_pool_stays_true(self): + self._test_alter_invalidate_pool_to_false(False) + def test_handle_error_event_connect_isolation_level(self): engine = engines.testing_engine() @@ -2532,3 +2650,87 @@ class DialectEventTest(fixtures.TestBase): def test_cursor_execute_wo_replace(self): self._test_cursor_execute(False) + + def test_connect_replace_params(self): + e = engines.testing_engine(options={"_initialize": False}) + + @event.listens_for(e, "do_connect") + def evt(dialect, conn_rec, cargs, cparams): + cargs[:] = ['foo', 'hoho'] + cparams.clear() + cparams['bar'] = 'bat' + conn_rec.info['boom'] = "bap" + + m1 = Mock() + e.dialect.connect = m1.real_connect + + with e.connect() as conn: + eq_(m1.mock_calls, [call.real_connect('foo', 'hoho', bar='bat')]) + eq_(conn.info['boom'], 'bap') + + def test_connect_do_connect(self): + e = engines.testing_engine(options={"_initialize": False}) + + m1 = Mock() + + @event.listens_for(e, "do_connect") + def evt1(dialect, conn_rec, cargs, cparams): + cargs[:] = ['foo', 'hoho'] + cparams.clear() + cparams['bar'] = 'bat' + conn_rec.info['boom'] = "one" + + @event.listens_for(e, "do_connect") + def evt2(dialect, conn_rec, cargs, cparams): + conn_rec.info['bap'] = "two" + return m1.our_connect(cargs, cparams) + + with e.connect() as conn: + # called with args + eq_( + m1.mock_calls, + [call.our_connect(['foo', 'hoho'], {'bar': 'bat'})]) + + eq_(conn.info['boom'], "one") + eq_(conn.info['bap'], "two") + + # returned our mock connection + is_(conn.connection.connection, m1.our_connect()) + + def test_connect_do_connect_info_there_after_recycle(self): + # test that info is maintained after the do_connect() + # event for a soft invalidation. + + e = engines.testing_engine(options={"_initialize": False}) + + @event.listens_for(e, "do_connect") + def evt1(dialect, conn_rec, cargs, cparams): + conn_rec.info['boom'] = "one" + + conn = e.connect() + eq_(conn.info['boom'], "one") + + conn.connection.invalidate(soft=True) + conn.close() + conn = e.connect() + eq_(conn.info['boom'], "one") + + def test_connect_do_connect_info_there_after_invalidate(self): + # test that info is maintained after the do_connect() + # event for a hard invalidation. + + e = engines.testing_engine(options={"_initialize": False}) + + @event.listens_for(e, "do_connect") + def evt1(dialect, conn_rec, cargs, cparams): + assert not conn_rec.info + conn_rec.info['boom'] = "one" + + conn = e.connect() + eq_(conn.info['boom'], "one") + + conn.connection.invalidate() + conn = e.connect() + eq_(conn.info['boom'], "one") + + |
