summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2020-04-07 14:15:43 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2020-04-16 13:35:55 -0400
commit2f617f56f2acdce00b88f746c403cf5ed66d4d27 (patch)
tree0962f2c43c1a361135ecdab933167fa0963ae58a /test
parentbd303b10e2bf69169f07447c7272fc71ac931f10 (diff)
downloadsqlalchemy-2f617f56f2acdce00b88f746c403cf5ed66d4d27.tar.gz
Create initial 2.0 engine implementation
Implemented the SQLAlchemy 2 :func:`.future.create_engine` function which is used for forwards compatibility with SQLAlchemy 2. This engine features always-transactional behavior with autobegin.

Allow execution options per statement execution. This includes that the before_execute() and after_execute() events now accept an additional dictionary with these options, empty if not passed; a legacy event decorator is added for backwards compatibility which now also emits a deprecation warning.

Add some basic tests for execution, transactions, and the new result object. Build out on a new testing fixture that swaps in the future engine completely to start with.

Change-Id: I70e7338bb3f0ce22d2f702537d94bb249bd9fb0a
Fixes: #4644
Diffstat (limited to 'test')
-rw-r--r--test/base/test_events.py46
-rw-r--r--test/base/test_tutorials.py4
-rw-r--r--test/dialect/mysql/test_reflection.py2
-rw-r--r--test/engine/test_deprecations.py92
-rw-r--r--test/engine/test_execute.py261
-rw-r--r--test/engine/test_logging.py4
-rw-r--r--test/engine/test_transaction.py564
-rw-r--r--test/ext/test_baked.py4
-rw-r--r--test/orm/test_deprecations.py68
-rw-r--r--test/orm/test_events.py18
-rw-r--r--test/orm/test_transaction.py145
-rw-r--r--test/requirements.py4
-rw-r--r--test/sql/test_defaults.py17
-rw-r--r--test/sql/test_resultset.py96
-rw-r--r--test/sql/test_sequences.py10
15 files changed, 1241 insertions, 94 deletions
diff --git a/test/base/test_events.py b/test/base/test_events.py
index f13137084..677cf80b0 100644
--- a/test/base/test_events.py
+++ b/test/base/test_events.py
@@ -6,6 +6,7 @@ from sqlalchemy import testing
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_deprecated
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_not_
@@ -408,7 +409,13 @@ class LegacySignatureTest(fixtures.TestBase):
def handler1(x, y):
canary(x, y)
- self.TargetOne().dispatch.event_three(4, 5, 6, 7)
+ with expect_deprecated(
+ 'The argument signature for the "TargetEventsOne.event_three" '
+ "event listener has changed as of version 0.9, and conversion "
+ "for the old argument signature will be removed in a future "
+ r'release. The new signature is "def event_three\(x, y, z, q\)"'
+ ):
+ self.TargetOne().dispatch.event_three(4, 5, 6, 7)
eq_(canary.mock_calls, [call(4, 5)])
@@ -451,7 +458,14 @@ class LegacySignatureTest(fixtures.TestBase):
eq_(canary.mock_calls, [call(5, 4, 5, foo="bar")])
def _test_legacy_accept_kw(self, target, canary):
- target.dispatch.event_four(4, 5, 6, 7, foo="bar")
+ with expect_deprecated(
+ 'The argument signature for the "TargetEventsOne.event_four" '
+ "event listener has changed as of version 0.9, and conversion "
+ "for the old argument signature will be removed in a future "
+ r"release. The new signature is "
+ r'"def event_four\(x, y, z, q, \*\*kw\)"'
+ ):
+ target.dispatch.event_four(4, 5, 6, 7, foo="bar")
eq_(canary.mock_calls, [call(4, 5, {"foo": "bar"})])
@@ -462,9 +476,27 @@ class LegacySignatureTest(fixtures.TestBase):
def handler1(x, y, z, q):
canary(x, y, z, q)
- self.TargetOne().dispatch.event_six(4, 5)
+ with expect_deprecated(
+ 'The argument signature for the "TargetEventsOne.event_six" '
+ "event listener has changed as of version 0.9, and "
+ "conversion for the old argument signature will be removed in "
+ "a future release. The new signature is "
+ r'"def event_six\(x, y\)'
+ ):
+ self.TargetOne().dispatch.event_six(4, 5)
eq_(canary.mock_calls, [call(4, 5, 9, 20)])
+ def test_complex_new_accept(self):
+ canary = Mock()
+
+ @event.listens_for(self.TargetOne, "event_six")
+ def handler1(x, y):
+ canary(x, y)
+
+ # new version does not emit a warning
+ self.TargetOne().dispatch.event_six(4, 5)
+ eq_(canary.mock_calls, [call(4, 5)])
+
def test_legacy_accept_from_method(self):
canary = Mock()
@@ -474,7 +506,13 @@ class LegacySignatureTest(fixtures.TestBase):
event.listen(self.TargetOne, "event_three", MyClass().handler1)
- self.TargetOne().dispatch.event_three(4, 5, 6, 7)
+ with expect_deprecated(
+ 'The argument signature for the "TargetEventsOne.event_three" '
+ "event listener has changed as of version 0.9, and conversion "
+ "for the old argument signature will be removed in a future "
+ r'release. The new signature is "def event_three\(x, y, z, q\)"'
+ ):
+ self.TargetOne().dispatch.event_three(4, 5, 6, 7)
eq_(canary.mock_calls, [call(4, 5)])
def test_standard_accept_has_legacies(self):
diff --git a/test/base/test_tutorials.py b/test/base/test_tutorials.py
index 2c1058b9a..97dca753d 100644
--- a/test/base/test_tutorials.py
+++ b/test/base/test_tutorials.py
@@ -13,7 +13,7 @@ from sqlalchemy.testing import fixtures
class DocTest(fixtures.TestBase):
def _setup_logger(self):
- rootlogger = logging.getLogger("sqlalchemy.engine.base.Engine")
+ rootlogger = logging.getLogger("sqlalchemy.engine.Engine")
class MyStream(object):
def write(self, string):
@@ -28,7 +28,7 @@ class DocTest(fixtures.TestBase):
rootlogger.addHandler(handler)
def _teardown_logger(self):
- rootlogger = logging.getLogger("sqlalchemy.engine.base.Engine")
+ rootlogger = logging.getLogger("sqlalchemy.engine.Engine")
rootlogger.removeHandler(self._handler)
def _setup_create_table_patcher(self):
diff --git a/test/dialect/mysql/test_reflection.py b/test/dialect/mysql/test_reflection.py
index 2de530607..b0d0e4151 100644
--- a/test/dialect/mysql/test_reflection.py
+++ b/test/dialect/mysql/test_reflection.py
@@ -853,7 +853,7 @@ class ReflectionTest(fixtures.TestBase, AssertsCompiledSQL):
dialect._casing = casing
dialect.default_schema_name = "Test"
connection = mock.Mock(
- dialect=dialect, execute=lambda stmt, **params: ischema
+ dialect=dialect, execute=lambda stmt, params: ischema
)
dialect._correct_for_mysql_bugs_88718_96365(fkeys, connection)
eq_(
diff --git a/test/engine/test_deprecations.py b/test/engine/test_deprecations.py
index 53df6c1a8..20f2b7d74 100644
--- a/test/engine/test_deprecations.py
+++ b/test/engine/test_deprecations.py
@@ -1,5 +1,8 @@
+import re
+
import sqlalchemy as tsa
from sqlalchemy import create_engine
+from sqlalchemy import event
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import inspect
@@ -13,9 +16,11 @@ from sqlalchemy import testing
from sqlalchemy import VARCHAR
from sqlalchemy.engine import reflection
from sqlalchemy.engine.base import Connection
+from sqlalchemy.engine.base import Engine
from sqlalchemy.engine.mock import MockConnection
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
+from sqlalchemy.testing import config
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
@@ -23,6 +28,7 @@ from sqlalchemy.testing import is_
from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_instance_of
from sqlalchemy.testing import is_true
+from sqlalchemy.testing.engines import testing_engine
from sqlalchemy.testing.mock import Mock
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
@@ -766,3 +772,89 @@ class RawExecuteTest(fixtures.TablesTest):
(3, "horse"),
(4, "sally"),
]
+
+
+class EngineEventsTest(fixtures.TestBase):
+ __requires__ = ("ad_hoc_engines",)
+ __backend__ = True
+
+ def tearDown(self):
+ Engine.dispatch._clear()
+ Engine._has_events = False
+
+ def _assert_stmts(self, expected, received):
+ list(received)
+ for stmt, params, posn in expected:
+ if not received:
+ assert False, "Nothing available for stmt: %s" % stmt
+ while received:
+ teststmt, testparams, testmultiparams = received.pop(0)
+ teststmt = (
+ re.compile(r"[\n\t ]+", re.M).sub(" ", teststmt).strip()
+ )
+ if teststmt.startswith(stmt) and (
+ testparams == params or testparams == posn
+ ):
+ break
+
+ def test_retval_flag(self):
+ canary = []
+
+ def tracker(name):
+ def go(conn, *args, **kw):
+ canary.append(name)
+
+ return go
+
+ def execute(conn, clauseelement, multiparams, params):
+ canary.append("execute")
+ return clauseelement, multiparams, params
+
+ def cursor_execute(
+ conn, cursor, statement, parameters, context, executemany
+ ):
+ canary.append("cursor_execute")
+ return statement, parameters
+
+ engine = engines.testing_engine()
+
+ assert_raises(
+ tsa.exc.ArgumentError,
+ event.listen,
+ engine,
+ "begin",
+ tracker("begin"),
+ retval=True,
+ )
+
+ event.listen(engine, "before_execute", execute, retval=True)
+ event.listen(
+ engine, "before_cursor_execute", cursor_execute, retval=True
+ )
+ with testing.expect_deprecated(
+ r"The argument signature for the "
+ r"\"ConnectionEvents.before_execute\" event listener",
+ ):
+ engine.execute(select([1]))
+ eq_(canary, ["execute", "cursor_execute"])
+
+ def test_argument_format_execute(self):
+ def before_execute(conn, clauseelement, multiparams, params):
+ assert isinstance(multiparams, (list, tuple))
+ assert isinstance(params, dict)
+
+ def after_execute(conn, clauseelement, multiparams, params, result):
+ assert isinstance(multiparams, (list, tuple))
+ assert isinstance(params, dict)
+
+ e1 = testing_engine(config.db_url)
+ event.listen(e1, "before_execute", before_execute)
+ event.listen(e1, "after_execute", after_execute)
+
+ with testing.expect_deprecated(
+ r"The argument signature for the "
+ r"\"ConnectionEvents.before_execute\" event listener",
+ r"The argument signature for the "
+ r"\"ConnectionEvents.after_execute\" event listener",
+ ):
+ e1.execute(select([1]))
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index e2c009e2d..f694a251c 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -24,6 +24,7 @@ from sqlalchemy import TypeDecorator
from sqlalchemy import util
from sqlalchemy import VARCHAR
from sqlalchemy.engine import default
+from sqlalchemy.engine.base import Connection
from sqlalchemy.engine.base import Engine
from sqlalchemy.sql import column
from sqlalchemy.sql import literal
@@ -112,6 +113,11 @@ class ExecuteTest(fixtures.TablesTest):
)
def test_raw_named_invalid(self, connection):
+ # this is awkward b.c. this is just testing if regular Python
+ # is raising TypeError if they happened to send arguments that
+ # look like the legacy ones which also happen to conflict with
+ # the positional signature for the method. some combinations
+ # can get through and fail differently
assert_raises(
TypeError,
connection.exec_driver_sql,
@@ -119,6 +125,7 @@ class ExecuteTest(fixtures.TablesTest):
"values (%(id)s, %(name)s)",
{"id": 2, "name": "ed"},
{"id": 3, "name": "horse"},
+ {"id": 4, "name": "horse"},
)
assert_raises(
TypeError,
@@ -1224,6 +1231,7 @@ class EngineEventsTest(fixtures.TestBase):
def _assert_stmts(self, expected, received):
list(received)
+
for stmt, params, posn in expected:
if not received:
assert False, "Nothing available for stmt: %s" % stmt
@@ -1245,12 +1253,20 @@ class EngineEventsTest(fixtures.TestBase):
event.listen(e1, "before_execute", canary)
s1 = select([1])
s2 = select([2])
- e1.execute(s1)
- e2.execute(s2)
+
+ with e1.connect() as conn:
+ conn.execute(s1)
+
+ with e2.connect() as conn:
+ conn.execute(s2)
eq_([arg[1][1] for arg in canary.mock_calls], [s1])
event.listen(e2, "before_execute", canary)
- e1.execute(s1)
- e2.execute(s2)
+
+ with e1.connect() as conn:
+ conn.execute(s1)
+
+ with e2.connect() as conn:
+ conn.execute(s2)
eq_([arg[1][1] for arg in canary.mock_calls], [s1, s1, s2])
def test_per_engine_plus_global(self):
@@ -1265,11 +1281,13 @@ class EngineEventsTest(fixtures.TestBase):
e1.connect()
e2.connect()
- e1.execute(select([1]))
+ with e1.connect() as conn:
+ conn.execute(select([1]))
eq_(canary.be1.call_count, 1)
eq_(canary.be2.call_count, 1)
- e2.execute(select([1]))
+ with e2.connect() as conn:
+ conn.execute(select([1]))
eq_(canary.be1.call_count, 2)
eq_(canary.be2.call_count, 1)
@@ -1288,9 +1306,10 @@ class EngineEventsTest(fixtures.TestBase):
eq_(canary.be1.call_count, 1)
eq_(canary.be2.call_count, 1)
- conn._branch().execute(select([1]))
- eq_(canary.be1.call_count, 2)
- eq_(canary.be2.call_count, 2)
+ if testing.requires.legacy_engine.enabled:
+ conn._branch().execute(select([1]))
+ eq_(canary.be1.call_count, 2)
+ eq_(canary.be2.call_count, 2)
def test_add_event_after_connect(self):
# new feature as of #2978
@@ -1339,7 +1358,7 @@ class EngineEventsTest(fixtures.TestBase):
dialect = conn.dialect
ctx = dialect.execution_ctx_cls._init_statement(
- dialect, conn, conn.connection, stmt, {}
+ dialect, conn, conn.connection, {}, stmt, {}
)
ctx._execute_scalar(stmt, Integer())
@@ -1377,11 +1396,15 @@ class EngineEventsTest(fixtures.TestBase):
)
def test_argument_format_execute(self):
- def before_execute(conn, clauseelement, multiparams, params):
+ def before_execute(
+ conn, clauseelement, multiparams, params, execution_options
+ ):
assert isinstance(multiparams, (list, tuple))
assert isinstance(params, dict)
- def after_execute(conn, clauseelement, multiparams, params, result):
+ def after_execute(
+ conn, clauseelement, multiparams, params, result, execution_options
+ ):
assert isinstance(multiparams, (list, tuple))
assert isinstance(params, dict)
@@ -1389,18 +1412,23 @@ class EngineEventsTest(fixtures.TestBase):
event.listen(e1, "before_execute", before_execute)
event.listen(e1, "after_execute", after_execute)
- e1.execute(select([1]))
- e1.execute(select([1]).compile(dialect=e1.dialect).statement)
- e1.execute(select([1]).compile(dialect=e1.dialect))
- e1._execute_compiled(select([1]).compile(dialect=e1.dialect), (), {})
+ with e1.connect() as conn:
+ conn.execute(select([1]))
+ conn.execute(select([1]).compile(dialect=e1.dialect).statement)
+ conn.execute(select([1]).compile(dialect=e1.dialect))
+
+ conn._execute_compiled(
+ select([1]).compile(dialect=e1.dialect), (), {}
+ )
- @testing.fails_on("firebird", "Data type unknown")
def test_execute_events(self):
stmts = []
cursor_stmts = []
- def execute(conn, clauseelement, multiparams, params):
+ def execute(
+ conn, clauseelement, multiparams, params, execution_options
+ ):
stmts.append((str(clauseelement), params, multiparams))
def cursor_execute(
@@ -1408,6 +1436,8 @@ class EngineEventsTest(fixtures.TestBase):
):
cursor_stmts.append((str(statement), parameters, None))
+ # TODO: this test is kind of a mess
+
for engine in [
engines.testing_engine(options=dict(implicit_returning=False)),
engines.testing_engine(
@@ -1428,28 +1458,57 @@ class EngineEventsTest(fixtures.TestBase):
primary_key=True,
),
)
- m.create_all()
+
+ if isinstance(engine, Connection) and engine._is_future:
+ ctx = None
+ conn = engine
+ elif engine._is_future:
+ ctx = conn = engine.connect()
+ else:
+ ctx = None
+ conn = engine
+
try:
- t1.insert().execute(c1=5, c2="some data")
- t1.insert().execute(c1=6)
- eq_(
- engine.execute(text("select * from t1")).fetchall(),
- [(5, "some data"), (6, "foo")],
- )
+ m.create_all(conn, checkfirst=False)
+ try:
+ conn.execute(t1.insert(), dict(c1=5, c2="some data"))
+ conn.execute(t1.insert(), dict(c1=6))
+ eq_(
+ conn.execute(text("select * from t1")).fetchall(),
+ [(5, "some data"), (6, "foo")],
+ )
+ finally:
+ m.drop_all(conn)
+ if engine._is_future:
+ conn.commit()
finally:
- m.drop_all()
-
- compiled = [
- ("CREATE TABLE t1", {}, None),
- (
- "INSERT INTO t1 (c1, c2)",
- {"c2": "some data", "c1": 5},
- None,
- ),
- ("INSERT INTO t1 (c1, c2)", {"c1": 6}, None),
- ("select * from t1", {}, None),
- ("DROP TABLE t1", {}, None),
- ]
+ if ctx:
+ ctx.close()
+
+ if engine._is_future:
+ compiled = [
+ ("CREATE TABLE t1", {}, None),
+ (
+ "INSERT INTO t1 (c1, c2)",
+ {"c2": "some data", "c1": 5},
+ None,
+ ),
+ ("INSERT INTO t1 (c1, c2)", {"c1": 6}, None),
+ ("select * from t1", {}, None),
+ ("DROP TABLE t1", {}, None),
+ ]
+ else:
+ compiled = [
+ ("CREATE TABLE t1", {}, None),
+ (
+ "INSERT INTO t1 (c1, c2)",
+ {},
+ ({"c2": "some data", "c1": 5},),
+ ),
+ ("INSERT INTO t1 (c1, c2)", {}, ({"c1": 6},)),
+ ("select * from t1", {}, None),
+ ("DROP TABLE t1", {}, None),
+ ]
cursor = [
("CREATE TABLE t1", {}, ()),
@@ -1512,11 +1571,13 @@ class EngineEventsTest(fixtures.TestBase):
event.listen(eng, "before_execute", l2)
event.listen(eng1, "before_execute", l3)
- eng.execute(select([1])).close()
+ with eng.connect() as conn:
+ conn.execute(select([1]))
eq_(canary, ["l1", "l2"])
- eng1.execute(select([1])).close()
+ with eng1.connect() as conn:
+ conn.execute(select([1]))
eq_(canary, ["l1", "l2", "l3", "l1", "l2"])
@@ -1547,11 +1608,13 @@ class EngineEventsTest(fixtures.TestBase):
event.listen(eng, "before_execute", l3)
event.listen(eng1, "before_execute", l4)
- eng.execute(select([1])).close()
+ with eng.connect() as conn:
+ conn.execute(select([1]))
eq_(canary, ["l1", "l2", "l3"])
- eng1.execute(select([1])).close()
+ with eng1.connect() as conn:
+ conn.execute(select([1]))
eq_(canary, ["l1", "l2", "l3", "l4", "l1", "l2", "l3"])
@@ -1561,7 +1624,8 @@ class EngineEventsTest(fixtures.TestBase):
event.remove(eng1, "before_execute", l4)
event.remove(eng, "before_execute", l3)
- eng1.execute(select([1])).close()
+ with eng1.connect() as conn:
+ conn.execute(select([1]))
eq_(canary, ["l2"])
@testing.requires.ad_hoc_engines
@@ -1609,7 +1673,9 @@ class EngineEventsTest(fixtures.TestBase):
return go
- def execute(conn, clauseelement, multiparams, params):
+ def execute(
+ conn, clauseelement, multiparams, params, execution_options
+ ):
canary.append("execute")
return clauseelement, multiparams, params
@@ -1634,9 +1700,11 @@ class EngineEventsTest(fixtures.TestBase):
event.listen(
engine, "before_cursor_execute", cursor_execute, retval=True
)
- engine.execute(select([1]))
+ with engine.connect() as conn:
+ conn.execute(select([1]))
eq_(canary, ["execute", "cursor_execute"])
+ @testing.requires.legacy_engine
def test_engine_connect(self):
engine = engines.testing_engine()
@@ -1781,7 +1849,15 @@ class EngineEventsTest(fixtures.TestBase):
("begin", set(["conn"])),
(
"execute",
- set(["conn", "clauseelement", "multiparams", "params"]),
+ set(
+ [
+ "conn",
+ "clauseelement",
+ "multiparams",
+ "params",
+ "execution_options",
+ ]
+ ),
),
(
"cursor_execute",
@@ -1800,7 +1876,15 @@ class EngineEventsTest(fixtures.TestBase):
("begin", set(["conn"])),
(
"execute",
- set(["conn", "clauseelement", "multiparams", "params"]),
+ set(
+ [
+ "conn",
+ "clauseelement",
+ "multiparams",
+ "params",
+ "execution_options",
+ ]
+ ),
),
(
"cursor_execute",
@@ -1908,6 +1992,10 @@ class EngineEventsTest(fixtures.TestBase):
)
+class FutureEngineEventsTest(fixtures.FutureEngineMixin, EngineEventsTest):
+ pass
+
+
class HandleErrorTest(fixtures.TestBase):
__requires__ = ("ad_hoc_engines",)
__backend__ = True
@@ -2649,7 +2737,7 @@ class DialectEventTest(fixtures.TestBase):
stmt = "insert into table foo"
params = {"foo": "bar"}
ctx = dialect.execution_ctx_cls._init_statement(
- dialect, conn, conn.connection, stmt, [params]
+ dialect, conn, conn.connection, {}, stmt, [params],
)
conn._cursor_execute(ctx.cursor, stmt, params, ctx)
@@ -2813,3 +2901,80 @@ class AutocommitTextTest(fixtures.TestBase):
def test_select(self):
self._test_keyword("SELECT foo FROM table", False)
+
+
+class FutureExecuteTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "users",
+ metadata,
+ Column("user_id", INT, primary_key=True, autoincrement=False),
+ Column("user_name", VARCHAR(20)),
+ test_needs_acid=True,
+ )
+ Table(
+ "users_autoinc",
+ metadata,
+ Column(
+ "user_id", INT, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("user_name", VARCHAR(20)),
+ test_needs_acid=True,
+ )
+
+ @testing.combinations(
+ ({}, {}, {}),
+ ({"a": "b"}, {}, {"a": "b"}),
+ ({"a": "b", "d": "e"}, {"a": "c"}, {"a": "c", "d": "e"}),
+ argnames="conn_opts, exec_opts, expected",
+ )
+ def test_execution_opts_per_invoke(
+ self, connection, conn_opts, exec_opts, expected
+ ):
+ opts = []
+
+ @event.listens_for(connection, "before_cursor_execute")
+ def before_cursor_execute(
+ conn, cursor, statement, parameters, context, executemany
+ ):
+ opts.append(context.execution_options)
+
+ if conn_opts:
+ connection = connection.execution_options(**conn_opts)
+
+ if exec_opts:
+ connection.execute(select([1]), execution_options=exec_opts)
+ else:
+ connection.execute(select([1]))
+
+ eq_(opts, [expected])
+
+ def test_execution_opts_invoke_illegal(self, connection):
+ assert_raises_message(
+ tsa.exc.InvalidRequestError,
+ "The 'isolation_level' execution option is not supported "
+ "at the per-statement level",
+ connection.execute,
+ select([1]),
+ execution_options={"isolation_level": "AUTOCOMMIT"},
+ )
+
+ assert_raises_message(
+ tsa.exc.InvalidRequestError,
+ "The 'schema_translate_map' execution option is not supported "
+ "at the per-statement level",
+ connection.execute,
+ select([1]),
+ execution_options={"schema_translate_map": {}},
+ )
+
+ def test_no_branching(self, connection):
+ assert_raises_message(
+ NotImplementedError,
+ "sqlalchemy.future.Connection does not support "
+ "'branching' of new connections.",
+ connection.connect,
+ )
diff --git a/test/engine/test_logging.py b/test/engine/test_logging.py
index 5d50a010d..b906b87be 100644
--- a/test/engine/test_logging.py
+++ b/test/engine/test_logging.py
@@ -491,7 +491,7 @@ class LoggingNameTest(fixtures.TestBase):
assert self.buf.buffer
for name in [b.name for b in self.buf.buffer]:
assert name in (
- "sqlalchemy.engine.base.Engine.%s" % eng_name,
+ "sqlalchemy.engine.Engine.%s" % eng_name,
"sqlalchemy.pool.impl.%s.%s"
% (eng.pool.__class__.__name__, pool_name),
)
@@ -501,7 +501,7 @@ class LoggingNameTest(fixtures.TestBase):
assert self.buf.buffer
for name in [b.name for b in self.buf.buffer]:
assert name in (
- "sqlalchemy.engine.base.Engine",
+ "sqlalchemy.engine.Engine",
"sqlalchemy.pool.impl.%s" % eng.pool.__class__.__name__,
)
diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py
index 13058fbe1..1836b2e74 100644
--- a/test/engine/test_transaction.py
+++ b/test/engine/test_transaction.py
@@ -12,16 +12,17 @@ from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import VARCHAR
+from sqlalchemy.future import select as future_select
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
+from sqlalchemy.testing import mock
from sqlalchemy.testing import ne_
from sqlalchemy.testing.engines import testing_engine
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
-
users, metadata = None, None
@@ -1102,3 +1103,564 @@ class IsolationLevelTest(fixtures.TestBase):
conn.get_isolation_level(), self._non_default_isolation_level()
)
eq_(c2.get_isolation_level(), self._non_default_isolation_level())
+
+
+class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
+ """The SQLAlchemy 2.0 Connection ensures its own transaction is rolled
+ back upon close. Therefore the whole "reset agent" thing can go away.
+ this suite runs through all the reset agent tests to ensure the state
+ of the transaction is maintained while the "reset agent" feature is not
+ needed at all.
+
+ """
+
+ __backend__ = True
+
+ def test_begin_close(self):
+ canary = mock.Mock()
+ with testing.db.connect() as connection:
+ event.listen(connection, "rollback", canary)
+ trans = connection.begin()
+ assert connection.connection._reset_agent is None
+ assert not trans.is_active
+ eq_(canary.mock_calls, [mock.call(connection)])
+
+ def test_begin_rollback(self):
+ canary = mock.Mock()
+ with testing.db.connect() as connection:
+ event.listen(connection, "rollback", canary)
+ trans = connection.begin()
+ assert connection.connection._reset_agent is None
+ trans.rollback()
+ assert connection.connection._reset_agent is None
+ assert not trans.is_active
+ eq_(canary.mock_calls, [mock.call(connection)])
+
+ def test_begin_commit(self):
+ canary = mock.Mock()
+ with testing.db.connect() as connection:
+ event.listen(connection, "rollback", canary.rollback)
+ event.listen(connection, "commit", canary.commit)
+ trans = connection.begin()
+ assert connection.connection._reset_agent is None
+ trans.commit()
+ assert connection.connection._reset_agent is None
+ assert not trans.is_active
+ eq_(canary.mock_calls, [mock.call.commit(connection)])
+
+ @testing.requires.savepoints
+ def test_begin_nested_close(self):
+ canary = mock.Mock()
+ with testing.db.connect() as connection:
+ event.listen(connection, "rollback", canary.rollback)
+ event.listen(connection, "commit", canary.commit)
+ trans = connection.begin_nested()
+ assert connection.connection._reset_agent is None
+ assert trans.is_active # it's a savepoint
+ eq_(canary.mock_calls, [mock.call.rollback(connection)])
+
+ @testing.requires.savepoints
+ def test_begin_begin_nested_close(self):
+ canary = mock.Mock()
+ with testing.db.connect() as connection:
+ event.listen(connection, "rollback", canary.rollback)
+ event.listen(connection, "commit", canary.commit)
+ trans = connection.begin()
+ trans2 = connection.begin_nested()
+ assert connection.connection._reset_agent is None
+ assert trans2.is_active # was never closed
+ assert not trans.is_active
+ eq_(canary.mock_calls, [mock.call.rollback(connection)])
+
+ @testing.requires.savepoints
+ def test_begin_begin_nested_rollback_commit(self):
+ canary = mock.Mock()
+ with testing.db.connect() as connection:
+ event.listen(
+ connection, "rollback_savepoint", canary.rollback_savepoint
+ )
+ event.listen(connection, "rollback", canary.rollback)
+ event.listen(connection, "commit", canary.commit)
+ trans = connection.begin()
+ trans2 = connection.begin_nested()
+ assert connection.connection._reset_agent is None
+ trans2.rollback() # this is not a connection level event
+ assert connection.connection._reset_agent is None
+ trans.commit()
+ assert connection.connection._reset_agent is None
+ eq_(
+ canary.mock_calls,
+ [
+ mock.call.rollback_savepoint(connection, mock.ANY, trans),
+ mock.call.commit(connection),
+ ],
+ )
+
+ @testing.requires.savepoints
+ def test_begin_begin_nested_rollback_rollback(self):
+ canary = mock.Mock()
+ with testing.db.connect() as connection:
+ event.listen(connection, "rollback", canary.rollback)
+ event.listen(connection, "commit", canary.commit)
+ trans = connection.begin()
+ trans2 = connection.begin_nested()
+ assert connection.connection._reset_agent is None
+ trans2.rollback()
+ assert connection.connection._reset_agent is None
+ trans.rollback()
+ assert connection.connection._reset_agent is None
+ eq_(canary.mock_calls, [mock.call.rollback(connection)])
+
+ @testing.requires.two_phase_transactions
+ def test_reset_via_agent_begin_twophase(self):
+ canary = mock.Mock()
+ with testing.db.connect() as connection:
+ event.listen(connection, "rollback", canary.rollback)
+ event.listen(
+ connection, "rollback_twophase", canary.rollback_twophase
+ )
+ event.listen(connection, "commit", canary.commit)
+ trans = connection.begin_twophase()
+ assert connection.connection._reset_agent is None
+ assert not trans.is_active
+ eq_(
+ canary.mock_calls,
+ [mock.call.rollback_twophase(connection, mock.ANY, False)],
+ )
+
+ @testing.requires.two_phase_transactions
+ def test_reset_via_agent_begin_twophase_commit(self):
+ canary = mock.Mock()
+ with testing.db.connect() as connection:
+ event.listen(connection, "rollback", canary.rollback)
+ event.listen(connection, "commit", canary.commit)
+ event.listen(connection, "commit_twophase", canary.commit_twophase)
+ trans = connection.begin_twophase()
+ assert connection.connection._reset_agent is None
+ trans.commit()
+ assert connection.connection._reset_agent is None
+ eq_(
+ canary.mock_calls,
+ [mock.call.commit_twophase(connection, mock.ANY, False)],
+ )
+
+ @testing.requires.two_phase_transactions
+ def test_reset_via_agent_begin_twophase_rollback(self):
+ canary = mock.Mock()
+ with testing.db.connect() as connection:
+ event.listen(connection, "rollback", canary.rollback)
+ event.listen(
+ connection, "rollback_twophase", canary.rollback_twophase
+ )
+ event.listen(connection, "commit", canary.commit)
+ trans = connection.begin_twophase()
+ assert connection.connection._reset_agent is None
+ trans.rollback()
+ assert connection.connection._reset_agent is None
+ eq_(
+ canary.mock_calls,
+ [mock.call.rollback_twophase(connection, mock.ANY, False)],
+ )
+
+
+class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "users",
+ metadata,
+ Column("user_id", INT, primary_key=True, autoincrement=False),
+ Column("user_name", VARCHAR(20)),
+ test_needs_acid=True,
+ )
+ Table(
+ "users_autoinc",
+ metadata,
+ Column(
+ "user_id", INT, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("user_name", VARCHAR(20)),
+ test_needs_acid=True,
+ )
+
+ def test_autobegin_rollback(self):
+ users = self.tables.users
+ with testing.db.connect() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+ conn.rollback()
+
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)), 0
+ )
+
+ @testing.requires.autocommit
+ def test_autocommit_isolation_level(self):
+ users = self.tables.users
+
+ with testing.db.connect().execution_options(
+ isolation_level="AUTOCOMMIT"
+ ) as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+ conn.rollback()
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ @testing.requires.autocommit
+ def test_no_autocommit_w_begin(self):
+
+ with testing.db.begin() as conn:
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "This connection has already begun a transaction; "
+ "isolation level may not be altered until transaction end",
+ conn.execution_options,
+ isolation_level="AUTOCOMMIT",
+ )
+
+ @testing.requires.autocommit
+ def test_no_autocommit_w_autobegin(self):
+
+ with testing.db.connect() as conn:
+ conn.execute(future_select(1))
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "This connection has already begun a transaction; "
+ "isolation level may not be altered until transaction end",
+ conn.execution_options,
+ isolation_level="AUTOCOMMIT",
+ )
+
+ conn.rollback()
+
+ conn.execution_options(isolation_level="AUTOCOMMIT")
+
+ def test_autobegin_commit(self):
+ users = self.tables.users
+
+ with testing.db.connect() as conn:
+
+ assert not conn.in_transaction()
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ assert conn.in_transaction()
+ conn.commit()
+
+ assert not conn.in_transaction()
+
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name 2"})
+
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 2,
+ )
+
+ assert conn.in_transaction()
+ conn.rollback()
+ assert not conn.in_transaction()
+
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ def test_rollback_on_close(self):
+ canary = mock.Mock()
+ with testing.db.connect() as conn:
+ event.listen(conn, "rollback", canary)
+ conn.execute(select([1]))
+ assert conn.in_transaction()
+
+ eq_(canary.mock_calls, [mock.call(conn)])
+
+ def test_no_on_close_no_transaction(self):
+ canary = mock.Mock()
+ with testing.db.connect() as conn:
+ event.listen(conn, "rollback", canary)
+ conn.execute(select([1]))
+ conn.rollback()
+ assert not conn.in_transaction()
+
+ eq_(canary.mock_calls, [mock.call(conn)])
+
+ def test_rollback_on_exception(self):
+ canary = mock.Mock()
+ try:
+ with testing.db.connect() as conn:
+ event.listen(conn, "rollback", canary)
+ conn.execute(select([1]))
+ assert conn.in_transaction()
+ raise Exception("some error")
+ assert False
+        except Exception:
+ pass
+
+ eq_(canary.mock_calls, [mock.call(conn)])
+
+ def test_rollback_on_exception_if_no_trans(self):
+ canary = mock.Mock()
+ try:
+ with testing.db.connect() as conn:
+ event.listen(conn, "rollback", canary)
+ assert not conn.in_transaction()
+ raise Exception("some error")
+ assert False
+        except Exception:
+ pass
+
+ eq_(canary.mock_calls, [])
+
+ def test_commit_no_begin(self):
+ with testing.db.connect() as conn:
+ assert not conn.in_transaction()
+ conn.commit()
+
+ @testing.requires.independent_connections
+ def test_commit_inactive(self):
+ with testing.db.connect() as conn:
+ conn.begin()
+ conn.invalidate()
+
+ assert_raises_message(
+ exc.InvalidRequestError, "Can't reconnect until", conn.commit
+ )
+
+ @testing.requires.independent_connections
+ def test_rollback_inactive(self):
+ users = self.tables.users
+ with testing.db.connect() as conn:
+
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+ conn.commit()
+
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ conn.invalidate()
+
+ assert_raises_message(
+ exc.StatementError,
+ "Can't reconnect",
+ conn.execute,
+ select([1]),
+ )
+
+ conn.rollback()
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ def test_rollback_no_begin(self):
+ with testing.db.connect() as conn:
+ assert not conn.in_transaction()
+ conn.rollback()
+
+ def test_rollback_end_ctx_manager(self):
+ with testing.db.begin() as conn:
+ assert conn.in_transaction()
+ conn.rollback()
+
+ def test_explicit_begin(self):
+ users = self.tables.users
+
+ with testing.db.connect() as conn:
+ assert not conn.in_transaction()
+ conn.begin()
+ assert conn.in_transaction()
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+ conn.commit()
+
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ def test_no_double_begin(self):
+ with testing.db.connect() as conn:
+ conn.begin()
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "a transaction is already begun for this connection",
+ conn.begin,
+ )
+
+ def test_no_autocommit(self):
+ users = self.tables.users
+
+ with testing.db.connect() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 0,
+ )
+
+ def test_begin_block(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_one(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ savepoint = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 2,
+ )
+ savepoint.rollback()
+
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_two(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ savepoint = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 2,
+ )
+ savepoint.commit()
+
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 2,
+ )
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 2,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_three(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ conn.rollback()
+
+ assert not conn.in_transaction()
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 0,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_four(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ sp2 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
+
+ sp2.rollback()
+
+ assert conn.in_transaction()
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 2,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_five(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ sp2 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
+
+ sp2.commit()
+
+ assert conn.in_transaction()
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 3,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_six(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ sp1 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ sp2 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
+
+ sp2.commit()
+
+ sp1.rollback()
+
+ assert conn.in_transaction()
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 1,
+ )
diff --git a/test/ext/test_baked.py b/test/ext/test_baked.py
index 9906339c2..cdf8414b4 100644
--- a/test/ext/test_baked.py
+++ b/test/ext/test_baked.py
@@ -385,7 +385,9 @@ class ResultPostCriteriaTest(BakedTest):
with testing.db.connect() as conn:
@event.listens_for(conn, "before_execute")
- def before_execute(conn, clauseelement, multiparams, params):
+ def before_execute(
+ conn, clauseelement, multiparams, params, execution_options
+ ):
assert "yes" in conn._execution_options
bq = self.bakery(lambda s: s.query(User.id).order_by(User.id))
diff --git a/test/orm/test_deprecations.py b/test/orm/test_deprecations.py
index c63379504..d1c7a08a9 100644
--- a/test/orm/test_deprecations.py
+++ b/test/orm/test_deprecations.py
@@ -1,6 +1,7 @@
import sqlalchemy as sa
from sqlalchemy import and_
from sqlalchemy import desc
+from sqlalchemy import event
from sqlalchemy import func
from sqlalchemy import Integer
from sqlalchemy import select
@@ -38,10 +39,13 @@ from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_true
+from sqlalchemy.testing.mock import call
+from sqlalchemy.testing.mock import Mock
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
from . import _fixtures
from .inheritance import _poly_fixtures
+from .test_events import _RemoveListeners
from .test_options import PathTest as OptionsPathTest
from .test_query import QueryTest
@@ -1604,3 +1608,67 @@ class DistinctOrderByImplicitTest(QueryTest, AssertsCompiledSQL):
"addresses_email_address FROM users, addresses "
"ORDER BY users.id, users.name, addresses.email_address",
)
+
+
+class SessionEventsTest(_RemoveListeners, _fixtures.FixtureTest):
+ run_inserts = None
+
+ def test_on_bulk_update_hook(self):
+ User, users = self.classes.User, self.tables.users
+
+ sess = Session()
+ canary = Mock()
+
+ event.listen(sess, "after_bulk_update", canary.after_bulk_update)
+
+ def legacy(ses, qry, ctx, res):
+ canary.after_bulk_update_legacy(ses, qry, ctx, res)
+
+ event.listen(sess, "after_bulk_update", legacy)
+
+ mapper(User, users)
+
+ with testing.expect_deprecated(
+ 'The argument signature for the "SessionEvents.after_bulk_update" '
+ "event listener"
+ ):
+ sess.query(User).update({"name": "foo"})
+
+ eq_(canary.after_bulk_update.call_count, 1)
+
+ upd = canary.after_bulk_update.mock_calls[0][1][0]
+ eq_(upd.session, sess)
+ eq_(
+ canary.after_bulk_update_legacy.mock_calls,
+ [call(sess, upd.query, upd.context, upd.result)],
+ )
+
+ def test_on_bulk_delete_hook(self):
+ User, users = self.classes.User, self.tables.users
+
+ sess = Session()
+ canary = Mock()
+
+ event.listen(sess, "after_bulk_delete", canary.after_bulk_delete)
+
+ def legacy(ses, qry, ctx, res):
+ canary.after_bulk_delete_legacy(ses, qry, ctx, res)
+
+ event.listen(sess, "after_bulk_delete", legacy)
+
+ mapper(User, users)
+
+ with testing.expect_deprecated(
+ 'The argument signature for the "SessionEvents.after_bulk_delete" '
+ "event listener"
+ ):
+ sess.query(User).delete()
+
+ eq_(canary.after_bulk_delete.call_count, 1)
+
+ upd = canary.after_bulk_delete.mock_calls[0][1][0]
+ eq_(upd.session, sess)
+ eq_(
+ canary.after_bulk_delete_legacy.mock_calls,
+ [call(sess, upd.query, upd.context, upd.result)],
+ )
diff --git a/test/orm/test_events.py b/test/orm/test_events.py
index 225468baa..c1457289a 100644
--- a/test/orm/test_events.py
+++ b/test/orm/test_events.py
@@ -1759,11 +1759,6 @@ class SessionEventsTest(_RemoveListeners, _fixtures.FixtureTest):
event.listen(sess, "after_begin", canary.after_begin)
event.listen(sess, "after_bulk_update", canary.after_bulk_update)
- def legacy(ses, qry, ctx, res):
- canary.after_bulk_update_legacy(ses, qry, ctx, res)
-
- event.listen(sess, "after_bulk_update", legacy)
-
mapper(User, users)
sess.query(User).update({"name": "foo"})
@@ -1773,10 +1768,6 @@ class SessionEventsTest(_RemoveListeners, _fixtures.FixtureTest):
upd = canary.after_bulk_update.mock_calls[0][1][0]
eq_(upd.session, sess)
- eq_(
- canary.after_bulk_update_legacy.mock_calls,
- [call(sess, upd.query, upd.context, upd.result)],
- )
def test_on_bulk_delete_hook(self):
User, users = self.classes.User, self.tables.users
@@ -1787,11 +1778,6 @@ class SessionEventsTest(_RemoveListeners, _fixtures.FixtureTest):
event.listen(sess, "after_begin", canary.after_begin)
event.listen(sess, "after_bulk_delete", canary.after_bulk_delete)
- def legacy(ses, qry, ctx, res):
- canary.after_bulk_delete_legacy(ses, qry, ctx, res)
-
- event.listen(sess, "after_bulk_delete", legacy)
-
mapper(User, users)
sess.query(User).delete()
@@ -1801,10 +1787,6 @@ class SessionEventsTest(_RemoveListeners, _fixtures.FixtureTest):
upd = canary.after_bulk_delete.mock_calls[0][1][0]
eq_(upd.session, sess)
- eq_(
- canary.after_bulk_delete_legacy.mock_calls,
- [call(sess, upd.query, upd.context, upd.result)],
- )
def test_connection_emits_after_begin(self):
sess, canary = self._listener_fixture(bind=testing.db)
diff --git a/test/orm/test_transaction.py b/test/orm/test_transaction.py
index 15244d9d2..2b32282ba 100644
--- a/test/orm/test_transaction.py
+++ b/test/orm/test_transaction.py
@@ -1,5 +1,3 @@
-from __future__ import with_statement
-
from sqlalchemy import Column
from sqlalchemy import event
from sqlalchemy import exc as sa_exc
@@ -9,6 +7,7 @@ from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import testing
+from sqlalchemy.future import Engine
from sqlalchemy.orm import attributes
from sqlalchemy.orm import create_session
from sqlalchemy.orm import exc as orm_exc
@@ -62,7 +61,7 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest):
c.close()
@engines.close_open_connections
- def test_subtransaction_on_external(self):
+ def test_subtransaction_on_external_subtrans(self):
users, User = self.tables.users, self.classes.User
mapper(User, users)
@@ -78,6 +77,22 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest):
assert len(sess.query(User).all()) == 0
sess.close()
+ @engines.close_open_connections
+ def test_subtransaction_on_external_no_begin(self):
+ users, User = self.tables.users, self.classes.User
+
+ mapper(User, users)
+ conn = testing.db.connect()
+ trans = conn.begin()
+ sess = create_session(bind=conn, autocommit=False, autoflush=True)
+ u = User(name="ed")
+ sess.add(u)
+ sess.flush()
+ sess.commit() # commit does nothing
+ trans.rollback() # rolls back
+ assert len(sess.query(User).all()) == 0
+ sess.close()
+
@testing.requires.savepoints
@engines.close_open_connections
def test_external_nested_transaction(self):
@@ -104,6 +119,71 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest):
conn.close()
raise
+ @engines.close_open_connections
+ def test_subtransaction_on_external_commit_future(self):
+ users, User = self.tables.users, self.classes.User
+
+ mapper(User, users)
+
+ engine = Engine._future_facade(testing.db)
+
+ conn = engine.connect()
+ conn.begin()
+
+ sess = create_session(bind=conn, autocommit=False, autoflush=True)
+ u = User(name="ed")
+ sess.add(u)
+ sess.flush()
+ sess.commit() # commit does nothing
+ conn.rollback() # rolls back
+ assert len(sess.query(User).all()) == 0
+ sess.close()
+
+ @engines.close_open_connections
+ def test_subtransaction_on_external_rollback_future(self):
+ users, User = self.tables.users, self.classes.User
+
+ mapper(User, users)
+
+ engine = Engine._future_facade(testing.db)
+
+ conn = engine.connect()
+ conn.begin()
+
+ sess = create_session(bind=conn, autocommit=False, autoflush=True)
+ u = User(name="ed")
+ sess.add(u)
+ sess.flush()
+ sess.rollback() # rolls back
+ conn.commit() # nothing to commit
+ assert len(sess.query(User).all()) == 0
+ sess.close()
+
+ @testing.requires.savepoints
+ @engines.close_open_connections
+ def test_savepoint_on_external_future(self):
+ users, User = self.tables.users, self.classes.User
+
+ mapper(User, users)
+
+ engine = Engine._future_facade(testing.db)
+
+ with engine.connect() as conn:
+ conn.begin()
+ sess = create_session(bind=conn, autocommit=False, autoflush=True)
+ u1 = User(name="u1")
+ sess.add(u1)
+ sess.flush()
+
+ sess.begin_nested()
+ u2 = User(name="u2")
+ sess.add(u2)
+ sess.flush()
+ sess.rollback()
+
+ conn.commit()
+ assert len(sess.query(User).all()) == 1
+
@testing.requires.savepoints
def test_nested_accounting_new_items_removed(self):
User, users = self.classes.User, self.tables.users
@@ -175,6 +255,40 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest):
)
@testing.requires.savepoints
+ def test_heavy_nesting_future(self):
+ users = self.tables.users
+
+ engine = Engine._future_facade(testing.db)
+ session = create_session(engine)
+
+ session.begin()
+ session.connection().execute(users.insert().values(name="user1"))
+ session.begin(subtransactions=True)
+ session.begin_nested()
+ session.connection().execute(users.insert().values(name="user2"))
+ assert (
+ session.connection()
+ .exec_driver_sql("select count(1) from users")
+ .scalar()
+ == 2
+ )
+ session.rollback()
+ assert (
+ session.connection()
+ .exec_driver_sql("select count(1) from users")
+ .scalar()
+ == 1
+ )
+ session.connection().execute(users.insert().values(name="user3"))
+ session.commit()
+ assert (
+ session.connection()
+ .exec_driver_sql("select count(1) from users")
+ .scalar()
+ == 2
+ )
+
+ @testing.requires.savepoints
def test_dirty_state_transferred_deep_nesting(self):
User, users = self.classes.User, self.tables.users
@@ -767,17 +881,24 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest):
return sess, u1
def test_execution_options_begin_transaction(self):
- bind = mock.Mock()
+ bind = mock.Mock(
+ connect=mock.Mock(
+ return_value=mock.Mock(
+ _is_future=False,
+ execution_options=mock.Mock(
+ return_value=mock.Mock(_is_future=False)
+ ),
+ )
+ )
+ )
sess = Session(bind=bind)
c1 = sess.connection(execution_options={"isolation_level": "FOO"})
+ eq_(bind.mock_calls, [mock.call.connect()])
eq_(
- bind.mock_calls,
- [
- mock.call.connect(),
- mock.call.connect().execution_options(isolation_level="FOO"),
- mock.call.connect().execution_options().begin(),
- ],
+ bind.connect().mock_calls,
+ [mock.call.execution_options(isolation_level="FOO")],
)
+ eq_(bind.connect().execution_options().mock_calls, [mock.call.begin()])
eq_(c1, bind.connect().execution_options())
def test_execution_options_ignored_mid_transaction(self):
@@ -914,9 +1035,7 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest):
with expect_warnings(".*during handling of a previous exception.*"):
session.begin_nested()
- savepoint = (
- session.connection()._Connection__transaction._savepoint
- )
+ savepoint = session.connection()._transaction._savepoint
# force the savepoint to disappear
session.connection().dialect.do_release_savepoint(
diff --git a/test/requirements.py b/test/requirements.py
index 669e0b7eb..aac376dba 100644
--- a/test/requirements.py
+++ b/test/requirements.py
@@ -1374,6 +1374,10 @@ class DefaultRequirements(SuiteRequirements):
return only_on(["mssql+pymssql"])
@property
+ def legacy_engine(self):
+ return exclusions.skip_if(lambda config: config.db._is_future)
+
+ @property
def ad_hoc_engines(self):
return exclusions.skip_if(
["oracle"],
diff --git a/test/sql/test_defaults.py b/test/sql/test_defaults.py
index fd5aec503..7352810ae 100644
--- a/test/sql/test_defaults.py
+++ b/test/sql/test_defaults.py
@@ -535,7 +535,7 @@ class DefaultRoundTripTest(fixtures.TablesTest):
result = connection.execute(t.select().order_by(t.c.col1))
today = datetime.date.today()
eq_(
- result.fetchall(),
+ list(result),
[
(
x,
@@ -715,9 +715,11 @@ class DefaultRoundTripTest(fixtures.TablesTest):
"group 1",
connection.execute,
t.insert(),
- {"col4": 7, "col7": 12, "col8": 19},
- {"col4": 7, "col8": 19},
- {"col4": 7, "col7": 12, "col8": 19},
+ [
+ {"col4": 7, "col7": 12, "col8": 19},
+ {"col4": 7, "col8": 19},
+ {"col4": 7, "col7": 12, "col8": 19},
+ ],
)
def test_insert_values(self, connection):
@@ -834,6 +836,13 @@ class DefaultRoundTripTest(fixtures.TablesTest):
eq_(55, row._mapping["col3"])
+class FutureDefaultRoundTripTest(
+ fixtures.FutureEngineMixin, DefaultRoundTripTest
+):
+
+ __backend__ = True
+
+
class CTEDefaultTest(fixtures.TablesTest):
__requires__ = ("ctes", "returning", "ctes_on_dml")
__backend__ = True
diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py
index 470417dd3..1611dc1ba 100644
--- a/test/sql/test_resultset.py
+++ b/test/sql/test_resultset.py
@@ -29,6 +29,7 @@ from sqlalchemy.engine import default
from sqlalchemy.engine import result as _result
from sqlalchemy.engine import Row
from sqlalchemy.ext.compiler import compiles
+from sqlalchemy.future import select as future_select
from sqlalchemy.sql import ColumnElement
from sqlalchemy.sql import expression
from sqlalchemy.sql.selectable import TextualSelect
@@ -2237,3 +2238,98 @@ class AlternateResultProxyTest(fixtures.TablesTest):
le_(len(result.cursor_strategy._rowbuffer), max_size)
eq_(checks, assertion)
+
+
+class FutureResultTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "users",
+ metadata,
+ Column("user_id", INT, primary_key=True, autoincrement=False),
+ Column("user_name", VARCHAR(20)),
+ Column("x", Integer),
+ Column("y", Integer),
+ test_needs_acid=True,
+ )
+ Table(
+ "users_autoinc",
+ metadata,
+ Column(
+ "user_id", INT, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("user_name", VARCHAR(20)),
+ test_needs_acid=True,
+ )
+
+ def test_fetchall(self, connection):
+ users = self.tables.users
+ connection.execute(
+ users.insert(),
+ [
+ {"user_id": 7, "user_name": "jack", "x": 1, "y": 2},
+ {"user_id": 8, "user_name": "ed", "x": 2, "y": 3},
+ {"user_id": 9, "user_name": "fred", "x": 15, "y": 20},
+ ],
+ )
+
+ result = connection.execute(
+ future_select(users).order_by(users.c.user_id)
+ )
+ eq_(
+ result.all(),
+ [(7, "jack", 1, 2), (8, "ed", 2, 3), (9, "fred", 15, 20)],
+ )
+
+ @testing.combinations(
+ ((1, 0), [("jack", 7), ("ed", 8), ("fred", 9)]),
+ ((3,), [(2,), (3,), (20,)]),
+ ((-2, -1), [(1, 2), (2, 3), (15, 20)]),
+ argnames="columns, expected",
+ )
+ def test_columns(self, connection, columns, expected):
+ users = self.tables.users
+ connection.execute(
+ users.insert(),
+ [
+ {"user_id": 7, "user_name": "jack", "x": 1, "y": 2},
+ {"user_id": 8, "user_name": "ed", "x": 2, "y": 3},
+ {"user_id": 9, "user_name": "fred", "x": 15, "y": 20},
+ ],
+ )
+
+ result = connection.execute(
+ future_select(users).order_by(users.c.user_id)
+ )
+ eq_(result.columns(*columns).all(), expected)
+
+ def test_partitions(self, connection):
+ users = self.tables.users
+ connection.execute(
+ users.insert(),
+ [
+ {
+ "user_id": i,
+ "user_name": "user %s" % i,
+ "x": i * 5,
+ "y": i * 20,
+ }
+ for i in range(500)
+ ],
+ )
+
+ result = connection.execute(
+ future_select(users).order_by(users.c.user_id)
+ )
+
+ start = 0
+ for partition in result.columns(0, 1).partitions(20):
+ eq_(
+ partition,
+ [(i, "user %s" % i) for i in range(start, start + 20)],
+ )
+ start += 20
+
+ assert result._soft_closed
diff --git a/test/sql/test_sequences.py b/test/sql/test_sequences.py
index 8beee514a..1d78c0904 100644
--- a/test/sql/test_sequences.py
+++ b/test/sql/test_sequences.py
@@ -279,6 +279,11 @@ class SequenceExecTest(fixtures.TestBase):
self._assert_seq_result(r.inserted_primary_key[0])
+class FutureSequenceExecTest(fixtures.FutureEngineMixin, SequenceExecTest):
+ __requires__ = ("sequences",)
+ __backend__ = True
+
+
class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL):
__requires__ = ("sequences",)
__backend__ = True
@@ -396,6 +401,11 @@ class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL):
eq_(result.inserted_primary_key, [1])
+class FutureSequenceTest(fixtures.FutureEngineMixin, SequenceTest):
+ __requires__ = ("sequences",)
+ __backend__ = True
+
+
class TableBoundSequenceTest(fixtures.TablesTest):
__requires__ = ("sequences",)
__backend__ = True