summaryrefslogtreecommitdiff
path: root/test/engine/test_execute.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r--test/engine/test_execute.py296
1 files changed, 249 insertions, 47 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index b0256d325..fbb1878dc 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -1,7 +1,7 @@
# coding: utf-8
from sqlalchemy.testing import eq_, assert_raises, assert_raises_message, \
- config, is_
+ config, is_, is_not_, le_
import re
from sqlalchemy.testing.util import picklers
from sqlalchemy.interfaces import ConnectionProxy
@@ -484,6 +484,32 @@ class ExecuteTest(fixtures.TestBase):
eq_(canary, ["l1", "l2", "l3", "l1", "l2"])
@testing.requires.ad_hoc_engines
+ def test_dispose_event(self):
+ canary = Mock()
+ eng = create_engine(testing.db.url)
+ event.listen(eng, "engine_disposed", canary)
+
+ conn = eng.connect()
+ conn.close()
+ eng.dispose()
+
+
+ conn = eng.connect()
+ conn.close()
+
+ eq_(
+ canary.mock_calls,
+ [call(eng)]
+ )
+
+ eng.dispose()
+
+ eq_(
+ canary.mock_calls,
+ [call(eng), call(eng)]
+ )
+
+ @testing.requires.ad_hoc_engines
def test_autocommit_option_no_issue_first_connect(self):
eng = create_engine(testing.db.url)
eng.update_execution_options(autocommit=True)
@@ -1021,76 +1047,91 @@ class ExecutionOptionsTest(fixtures.TestBase):
)
-class AlternateResultProxyTest(fixtures.TestBase):
+class AlternateResultProxyTest(fixtures.TablesTest):
__requires__ = ('sqlite', )
@classmethod
- def setup_class(cls):
+ def setup_bind(cls):
cls.engine = engine = testing_engine('sqlite://')
- m = MetaData()
- cls.table = t = Table('test', m,
- Column('x', Integer, primary_key=True),
- Column('y', String(50, convert_unicode='force'))
- )
- m.create_all(engine)
- engine.execute(t.insert(), [
+ return engine
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ 'test', metadata,
+ Column('x', Integer, primary_key=True),
+ Column('y', String(50, convert_unicode='force'))
+ )
+
+ @classmethod
+ def insert_data(cls):
+ cls.engine.execute(cls.tables.test.insert(), [
{'x': i, 'y': "t_%d" % i} for i in range(1, 12)
])
- def _test_proxy(self, cls):
+ @contextmanager
+ def _proxy_fixture(self, cls):
+ self.table = self.tables.test
+
class ExcCtx(default.DefaultExecutionContext):
def get_result_proxy(self):
return cls(self)
- self.engine.dialect.execution_ctx_cls = ExcCtx
- rows = []
- r = self.engine.execute(select([self.table]))
- assert isinstance(r, cls)
- for i in range(5):
- rows.append(r.fetchone())
- eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
+ self.patcher = patch.object(
+ self.engine.dialect, "execution_ctx_cls", ExcCtx)
+ with self.patcher:
+ yield
- rows = r.fetchmany(3)
- eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)])
+ def _test_proxy(self, cls):
+ with self._proxy_fixture(cls):
+ rows = []
+ r = self.engine.execute(select([self.table]))
+ assert isinstance(r, cls)
+ for i in range(5):
+ rows.append(r.fetchone())
+ eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
+
+ rows = r.fetchmany(3)
+ eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)])
- rows = r.fetchall()
- eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)])
+ rows = r.fetchall()
+ eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)])
- r = self.engine.execute(select([self.table]))
- rows = r.fetchmany(None)
- eq_(rows[0], (1, "t_1"))
- # number of rows here could be one, or the whole thing
- assert len(rows) == 1 or len(rows) == 11
+ r = self.engine.execute(select([self.table]))
+ rows = r.fetchmany(None)
+ eq_(rows[0], (1, "t_1"))
+ # number of rows here could be one, or the whole thing
+ assert len(rows) == 1 or len(rows) == 11
- r = self.engine.execute(select([self.table]).limit(1))
- r.fetchone()
- eq_(r.fetchone(), None)
+ r = self.engine.execute(select([self.table]).limit(1))
+ r.fetchone()
+ eq_(r.fetchone(), None)
- r = self.engine.execute(select([self.table]).limit(5))
- rows = r.fetchmany(6)
- eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
+ r = self.engine.execute(select([self.table]).limit(5))
+ rows = r.fetchmany(6)
+ eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
- # result keeps going just fine with blank results...
- eq_(r.fetchmany(2), [])
+ # result keeps going just fine with blank results...
+ eq_(r.fetchmany(2), [])
- eq_(r.fetchmany(2), [])
+ eq_(r.fetchmany(2), [])
- eq_(r.fetchall(), [])
+ eq_(r.fetchall(), [])
- eq_(r.fetchone(), None)
+ eq_(r.fetchone(), None)
- # until we close
- r.close()
+ # until we close
+ r.close()
- self._assert_result_closed(r)
+ self._assert_result_closed(r)
- r = self.engine.execute(select([self.table]).limit(5))
- eq_(r.first(), (1, "t_1"))
- self._assert_result_closed(r)
+ r = self.engine.execute(select([self.table]).limit(5))
+ eq_(r.first(), (1, "t_1"))
+ self._assert_result_closed(r)
- r = self.engine.execute(select([self.table]).limit(5))
- eq_(r.scalar(), 1)
- self._assert_result_closed(r)
+ r = self.engine.execute(select([self.table]).limit(5))
+ eq_(r.scalar(), 1)
+ self._assert_result_closed(r)
def _assert_result_closed(self, r):
assert_raises_message(
@@ -1123,6 +1164,42 @@ class AlternateResultProxyTest(fixtures.TestBase):
def test_buffered_column_result_proxy(self):
self._test_proxy(_result.BufferedColumnResultProxy)
+ def test_buffered_row_growth(self):
+ with self._proxy_fixture(_result.BufferedRowResultProxy):
+ with self.engine.connect() as conn:
+ conn.execute(self.table.insert(), [
+ {'x': i, 'y': "t_%d" % i} for i in range(15, 1200)
+ ])
+ result = conn.execute(self.table.select())
+ checks = {
+ 0: 5, 1: 10, 9: 20, 135: 250, 274: 500,
+ 1351: 1000
+ }
+ for idx, row in enumerate(result, 0):
+ if idx in checks:
+ eq_(result._bufsize, checks[idx])
+ le_(
+ len(result._BufferedRowResultProxy__rowbuffer),
+ 1000
+ )
+
+ def test_max_row_buffer_option(self):
+ with self._proxy_fixture(_result.BufferedRowResultProxy):
+ with self.engine.connect() as conn:
+ conn.execute(self.table.insert(), [
+ {'x': i, 'y': "t_%d" % i} for i in range(15, 1200)
+ ])
+ result = conn.execution_options(max_row_buffer=27).execute(
+ self.table.select()
+ )
+ for idx, row in enumerate(result, 0):
+ if idx in (16, 70, 150, 250):
+ eq_(result._bufsize, 27)
+ le_(
+ len(result._BufferedRowResultProxy__rowbuffer),
+ 27
+ )
+
class EngineEventsTest(fixtures.TestBase):
__requires__ = 'ad_hoc_engines',
@@ -1943,6 +2020,47 @@ class HandleErrorTest(fixtures.TestBase):
self._test_alter_disconnect(True, False)
self._test_alter_disconnect(False, False)
+ @testing.requires.independent_connections
+ def _test_alter_invalidate_pool_to_false(self, set_to_false):
+ orig_error = True
+
+ engine = engines.testing_engine()
+
+ @event.listens_for(engine, "handle_error")
+ def evt(ctx):
+ if set_to_false:
+ ctx.invalidate_pool_on_disconnect = False
+
+ c1, c2, c3 = engine.pool.connect(), \
+ engine.pool.connect(), engine.pool.connect()
+ crecs = [conn._connection_record for conn in (c1, c2, c3)]
+ c1.close()
+ c2.close()
+ c3.close()
+
+ with patch.object(engine.dialect, "is_disconnect",
+ Mock(return_value=orig_error)):
+
+ with engine.connect() as c:
+ target_crec = c.connection._connection_record
+ try:
+ c.execute("SELECT x FROM nonexistent")
+ assert False
+ except tsa.exc.StatementError as st:
+ eq_(st.connection_invalidated, True)
+
+ for crec in crecs:
+ if crec is target_crec or not set_to_false:
+ is_not_(crec.connection, crec.get_connection())
+ else:
+ is_(crec.connection, crec.get_connection())
+
+ def test_alter_invalidate_pool_to_false(self):
+ self._test_alter_invalidate_pool_to_false(True)
+
+ def test_alter_invalidate_pool_stays_true(self):
+ self._test_alter_invalidate_pool_to_false(False)
+
def test_handle_error_event_connect_isolation_level(self):
engine = engines.testing_engine()
@@ -2532,3 +2650,87 @@ class DialectEventTest(fixtures.TestBase):
def test_cursor_execute_wo_replace(self):
self._test_cursor_execute(False)
+
+ def test_connect_replace_params(self):
+ e = engines.testing_engine(options={"_initialize": False})
+
+ @event.listens_for(e, "do_connect")
+ def evt(dialect, conn_rec, cargs, cparams):
+ cargs[:] = ['foo', 'hoho']
+ cparams.clear()
+ cparams['bar'] = 'bat'
+ conn_rec.info['boom'] = "bap"
+
+ m1 = Mock()
+ e.dialect.connect = m1.real_connect
+
+ with e.connect() as conn:
+ eq_(m1.mock_calls, [call.real_connect('foo', 'hoho', bar='bat')])
+ eq_(conn.info['boom'], 'bap')
+
+ def test_connect_do_connect(self):
+ e = engines.testing_engine(options={"_initialize": False})
+
+ m1 = Mock()
+
+ @event.listens_for(e, "do_connect")
+ def evt1(dialect, conn_rec, cargs, cparams):
+ cargs[:] = ['foo', 'hoho']
+ cparams.clear()
+ cparams['bar'] = 'bat'
+ conn_rec.info['boom'] = "one"
+
+ @event.listens_for(e, "do_connect")
+ def evt2(dialect, conn_rec, cargs, cparams):
+ conn_rec.info['bap'] = "two"
+ return m1.our_connect(cargs, cparams)
+
+ with e.connect() as conn:
+ # called with args
+ eq_(
+ m1.mock_calls,
+ [call.our_connect(['foo', 'hoho'], {'bar': 'bat'})])
+
+ eq_(conn.info['boom'], "one")
+ eq_(conn.info['bap'], "two")
+
+ # returned our mock connection
+ is_(conn.connection.connection, m1.our_connect())
+
+ def test_connect_do_connect_info_there_after_recycle(self):
+ # test that info is maintained after the do_connect()
+ # event for a soft invalidation.
+
+ e = engines.testing_engine(options={"_initialize": False})
+
+ @event.listens_for(e, "do_connect")
+ def evt1(dialect, conn_rec, cargs, cparams):
+ conn_rec.info['boom'] = "one"
+
+ conn = e.connect()
+ eq_(conn.info['boom'], "one")
+
+ conn.connection.invalidate(soft=True)
+ conn.close()
+ conn = e.connect()
+ eq_(conn.info['boom'], "one")
+
+ def test_connect_do_connect_info_there_after_invalidate(self):
+ # test that info is maintained after the do_connect()
+ # event for a hard invalidation.
+
+ e = engines.testing_engine(options={"_initialize": False})
+
+ @event.listens_for(e, "do_connect")
+ def evt1(dialect, conn_rec, cargs, cparams):
+ assert not conn_rec.info
+ conn_rec.info['boom'] = "one"
+
+ conn = e.connect()
+ eq_(conn.info['boom'], "one")
+
+ conn.connection.invalidate()
+ conn = e.connect()
+ eq_(conn.info['boom'], "one")
+
+