summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2020-04-07 14:15:43 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2020-04-16 13:35:55 -0400
commit2f617f56f2acdce00b88f746c403cf5ed66d4d27 (patch)
tree0962f2c43c1a361135ecdab933167fa0963ae58a /lib/sqlalchemy
parentbd303b10e2bf69169f07447c7272fc71ac931f10 (diff)
downloadsqlalchemy-2f617f56f2acdce00b88f746c403cf5ed66d4d27.tar.gz
Create initial 2.0 engine implementation
Implemented the SQLAlchemy 2 :func:`.future.create_engine` function which is used for forwards compatibility with SQLAlchemy 2. This engine features always-transactional behavior with autobegin. Allow execution options per statement execution. This includes that the before_execute() and after_execute() events now accept an additional dictionary with these options, empty if not passed; a legacy event decorator is added for backwards compatibility which now also emits a deprecation warning. Add some basic tests for execution, transactions, and the new result object. Build out on a new testing fixture that swaps in the future engine completely to start with. Change-Id: I70e7338bb3f0ce22d2f702537d94bb249bd9fb0a Fixes: #4644
Diffstat (limited to 'lib/sqlalchemy')
-rw-r--r--lib/sqlalchemy/cextension/utils.c7
-rw-r--r--lib/sqlalchemy/dialects/mssql/base.py32
-rw-r--r--lib/sqlalchemy/dialects/mysql/base.py17
-rw-r--r--lib/sqlalchemy/dialects/oracle/base.py35
-rw-r--r--lib/sqlalchemy/dialects/postgresql/base.py6
-rw-r--r--lib/sqlalchemy/engine/base.py316
-rw-r--r--lib/sqlalchemy/engine/create.py3
-rw-r--r--lib/sqlalchemy/engine/default.py127
-rw-r--r--lib/sqlalchemy/engine/events.py68
-rw-r--r--lib/sqlalchemy/engine/result.py4
-rw-r--r--lib/sqlalchemy/engine/util.py32
-rw-r--r--lib/sqlalchemy/event/attr.py2
-rw-r--r--lib/sqlalchemy/event/legacy.py20
-rw-r--r--lib/sqlalchemy/future/__init__.py4
-rw-r--r--lib/sqlalchemy/future/engine.py434
-rw-r--r--lib/sqlalchemy/future/result.py181
-rw-r--r--lib/sqlalchemy/log.py19
-rw-r--r--lib/sqlalchemy/orm/session.py19
-rw-r--r--lib/sqlalchemy/sql/base.py11
-rw-r--r--lib/sqlalchemy/sql/compiler.py8
-rw-r--r--lib/sqlalchemy/sql/ddl.py8
-rw-r--r--lib/sqlalchemy/sql/elements.py8
-rw-r--r--lib/sqlalchemy/sql/functions.py8
-rw-r--r--lib/sqlalchemy/sql/schema.py8
-rw-r--r--lib/sqlalchemy/testing/assertsql.py4
-rw-r--r--lib/sqlalchemy/testing/config.py8
-rw-r--r--lib/sqlalchemy/testing/engines.py7
-rw-r--r--lib/sqlalchemy/testing/fixtures.py23
-rw-r--r--lib/sqlalchemy/testing/suite/test_ddl.py6
-rw-r--r--lib/sqlalchemy/testing/warnings.py7
-rw-r--r--lib/sqlalchemy/util/deprecations.py2
31 files changed, 1225 insertions, 209 deletions
diff --git a/lib/sqlalchemy/cextension/utils.c b/lib/sqlalchemy/cextension/utils.c
index a437adc70..fb7fbe4e6 100644
--- a/lib/sqlalchemy/cextension/utils.c
+++ b/lib/sqlalchemy/cextension/utils.c
@@ -23,6 +23,9 @@ the MIT License: http://www.opensource.org/licenses/mit-license.php
static PyObject *
distill_params(PyObject *self, PyObject *args)
{
+ // TODO: pass the Connection in so that there can be a standard
+ // method for warning on parameter format
+
PyObject *multiparams, *params;
PyObject *enclosing_list, *double_enclosing_list;
PyObject *zero_element, *zero_element_item;
@@ -44,6 +47,8 @@ distill_params(PyObject *self, PyObject *args)
if (multiparam_size == 0) {
if (params != Py_None && PyDict_Size(params) != 0) {
+ // TODO: this is keyword parameters, emit parameter format
+ // deprecation warning
enclosing_list = PyList_New(1);
if (enclosing_list == NULL) {
return NULL;
@@ -152,6 +157,8 @@ distill_params(PyObject *self, PyObject *args)
}
}
else {
+ // TODO: this is multiple positional params, emit parameter format
+ // deprecation warning
zero_element = PyTuple_GetItem(multiparams, 0);
if (PyObject_HasAttrString(zero_element, "__iter__") &&
!PyObject_HasAttrString(zero_element, "strip")
diff --git a/lib/sqlalchemy/dialects/mssql/base.py b/lib/sqlalchemy/dialects/mssql/base.py
index b0021e873..df6196bae 100644
--- a/lib/sqlalchemy/dialects/mssql/base.py
+++ b/lib/sqlalchemy/dialects/mssql/base.py
@@ -2544,16 +2544,20 @@ class MSDialect(default.DefaultDialect):
@_db_plus_owner
def has_table(self, connection, tablename, dbname, owner, schema):
- columns = ischema.columns
+ tables = ischema.tables
- whereclause = columns.c.table_name == tablename
+ s = sql.select([tables.c.table_name]).where(
+ sql.and_(
+ tables.c.table_type == "BASE TABLE",
+ tables.c.table_name == tablename,
+ )
+ )
if owner:
- whereclause = sql.and_(
- whereclause, columns.c.table_schema == owner
- )
- s = sql.select([columns], whereclause)
+ s = s.where(tables.c.table_schema == owner)
+
c = connection.execute(s)
+
return c.first() is not None
@reflection.cache
@@ -2569,13 +2573,15 @@ class MSDialect(default.DefaultDialect):
@_db_plus_owner_listing
def get_table_names(self, connection, dbname, owner, schema, **kw):
tables = ischema.tables
- s = sql.select(
- [tables.c.table_name],
- sql.and_(
- tables.c.table_schema == owner,
- tables.c.table_type == "BASE TABLE",
- ),
- order_by=[tables.c.table_name],
+ s = (
+ sql.select([tables.c.table_name])
+ .where(
+ sql.and_(
+ tables.c.table_schema == owner,
+ tables.c.table_type == "BASE TABLE",
+ )
+ )
+ .order_by(tables.c.table_name)
)
table_names = [r[0] for r in connection.execute(s)]
return table_names
diff --git a/lib/sqlalchemy/dialects/mysql/base.py b/lib/sqlalchemy/dialects/mysql/base.py
index c7c3bd433..53c916304 100644
--- a/lib/sqlalchemy/dialects/mysql/base.py
+++ b/lib/sqlalchemy/dialects/mysql/base.py
@@ -2379,25 +2379,25 @@ class MySQLDialect(default.DefaultDialect):
raise
def do_begin_twophase(self, connection, xid):
- connection.execute(sql.text("XA BEGIN :xid"), xid=xid)
+ connection.execute(sql.text("XA BEGIN :xid"), dict(xid=xid))
def do_prepare_twophase(self, connection, xid):
- connection.execute(sql.text("XA END :xid"), xid=xid)
- connection.execute(sql.text("XA PREPARE :xid"), xid=xid)
+ connection.execute(sql.text("XA END :xid"), dict(xid=xid))
+ connection.execute(sql.text("XA PREPARE :xid"), dict(xid=xid))
def do_rollback_twophase(
self, connection, xid, is_prepared=True, recover=False
):
if not is_prepared:
- connection.execute(sql.text("XA END :xid"), xid=xid)
- connection.execute(sql.text("XA ROLLBACK :xid"), xid=xid)
+ connection.execute(sql.text("XA END :xid"), dict(xid=xid))
+ connection.execute(sql.text("XA ROLLBACK :xid"), dict(xid=xid))
def do_commit_twophase(
self, connection, xid, is_prepared=True, recover=False
):
if not is_prepared:
self.do_prepare_twophase(connection, xid)
- connection.execute(sql.text("XA COMMIT :xid"), xid=xid)
+ connection.execute(sql.text("XA COMMIT :xid"), dict(xid=xid))
def do_recover_twophase(self, connection):
resultset = connection.exec_driver_sql("XA RECOVER")
@@ -2501,8 +2501,7 @@ class MySQLDialect(default.DefaultDialect):
"WHERE TABLE_NAME=:name AND "
"TABLE_SCHEMA=:schema_name"
),
- name=sequence_name,
- schema_name=schema,
+ dict(name=sequence_name, schema_name=schema),
)
return cursor.first() is not None
@@ -2750,7 +2749,7 @@ class MySQLDialect(default.DefaultDialect):
:table_data;
"""
).bindparams(sql.bindparam("table_data", expanding=True)),
- table_data=col_tuples,
+ dict(table_data=col_tuples),
)
# in casing=0, table name and schema name come back in their
diff --git a/lib/sqlalchemy/dialects/oracle/base.py b/lib/sqlalchemy/dialects/oracle/base.py
index 50fa71d7e..e0d33cf37 100644
--- a/lib/sqlalchemy/dialects/oracle/base.py
+++ b/lib/sqlalchemy/dialects/oracle/base.py
@@ -1467,8 +1467,10 @@ class OracleDialect(default.DefaultDialect):
"SELECT table_name FROM all_tables "
"WHERE table_name = :name AND owner = :schema_name"
),
- name=self.denormalize_name(table_name),
- schema_name=self.denormalize_name(schema),
+ dict(
+ name=self.denormalize_name(table_name),
+ schema_name=self.denormalize_name(schema),
+ ),
)
return cursor.first() is not None
@@ -1481,8 +1483,10 @@ class OracleDialect(default.DefaultDialect):
"WHERE sequence_name = :name AND "
"sequence_owner = :schema_name"
),
- name=self.denormalize_name(sequence_name),
- schema_name=self.denormalize_name(schema),
+ dict(
+ name=self.denormalize_name(sequence_name),
+ schema_name=self.denormalize_name(schema),
+ ),
)
return cursor.first() is not None
@@ -1525,7 +1529,7 @@ class OracleDialect(default.DefaultDialect):
q += " AND ".join(clauses)
result = connection.execution_options(future_result=True).execute(
- sql.text(q), **params
+ sql.text(q), params
)
if desired_owner:
row = result.mappings().first()
@@ -1621,7 +1625,7 @@ class OracleDialect(default.DefaultDialect):
"OWNER = :owner " "AND IOT_NAME IS NULL " "AND DURATION IS NULL"
)
- cursor = connection.execute(sql.text(sql_str), owner=schema)
+ cursor = connection.execute(sql.text(sql_str), dict(owner=schema))
return [self.normalize_name(row[0]) for row in cursor]
@reflection.cache
@@ -1641,14 +1645,16 @@ class OracleDialect(default.DefaultDialect):
"AND DURATION IS NOT NULL"
)
- cursor = connection.execute(sql.text(sql_str), owner=schema)
+ cursor = connection.execute(sql.text(sql_str), dict(owner=schema))
return [self.normalize_name(row[0]) for row in cursor]
@reflection.cache
def get_view_names(self, connection, schema=None, **kw):
schema = self.denormalize_name(schema or self.default_schema_name)
s = sql.text("SELECT view_name FROM all_views WHERE owner = :owner")
- cursor = connection.execute(s, owner=self.denormalize_name(schema))
+ cursor = connection.execute(
+ s, dict(owner=self.denormalize_name(schema))
+ )
return [self.normalize_name(row[0]) for row in cursor]
@reflection.cache
@@ -1687,7 +1693,7 @@ class OracleDialect(default.DefaultDialect):
text += " AND owner = :owner "
text = text % {"dblink": dblink, "columns": ", ".join(columns)}
- result = connection.execute(sql.text(text), **params)
+ result = connection.execute(sql.text(text), params)
enabled = dict(DISABLED=False, ENABLED=True)
@@ -1752,7 +1758,7 @@ class OracleDialect(default.DefaultDialect):
text += " ORDER BY col.column_id"
text = text % {"dblink": dblink, "char_length_col": char_length_col}
- c = connection.execute(sql.text(text), **params)
+ c = connection.execute(sql.text(text), params)
for row in c:
colname = self.normalize_name(row[0])
@@ -1842,7 +1848,8 @@ class OracleDialect(default.DefaultDialect):
"""
c = connection.execute(
- sql.text(COMMENT_SQL), table_name=table_name, schema_name=schema
+ sql.text(COMMENT_SQL),
+ dict(table_name=table_name, schema_name=schema),
)
return {"text": c.scalar()}
@@ -1890,7 +1897,7 @@ class OracleDialect(default.DefaultDialect):
text = text % {"dblink": dblink}
q = sql.text(text)
- rp = connection.execute(q, **params)
+ rp = connection.execute(q, params)
indexes = []
last_index_name = None
pk_constraint = self.get_pk_constraint(
@@ -1987,7 +1994,7 @@ class OracleDialect(default.DefaultDialect):
)
text = text % {"dblink": dblink}
- rp = connection.execute(sql.text(text), **params)
+ rp = connection.execute(sql.text(text), params)
constraint_data = rp.fetchall()
return constraint_data
@@ -2215,7 +2222,7 @@ class OracleDialect(default.DefaultDialect):
text += " AND owner = :schema"
params["schema"] = schema
- rp = connection.execute(sql.text(text), **params).scalar()
+ rp = connection.execute(sql.text(text), params).scalar()
if rp:
if util.py2k:
rp = rp.decode(self.encoding)
diff --git a/lib/sqlalchemy/dialects/postgresql/base.py b/lib/sqlalchemy/dialects/postgresql/base.py
index f9b3d9b95..20540ac02 100644
--- a/lib/sqlalchemy/dialects/postgresql/base.py
+++ b/lib/sqlalchemy/dialects/postgresql/base.py
@@ -2753,7 +2753,7 @@ class PGDialect(default.DefaultDialect):
s = s.columns(oid=sqltypes.Integer)
if schema:
s = s.bindparams(sql.bindparam("schema", type_=sqltypes.Unicode))
- c = connection.execute(s, table_name=table_name, schema=schema)
+ c = connection.execute(s, dict(table_name=table_name, schema=schema))
table_oid = c.scalar()
if table_oid is None:
raise exc.NoSuchTableError(table_name)
@@ -3519,7 +3519,9 @@ class PGDialect(default.DefaultDialect):
pgd.objoid = :table_oid
"""
- c = connection.execute(sql.text(COMMENT_SQL), table_oid=table_oid)
+ c = connection.execute(
+ sql.text(COMMENT_SQL), dict(table_oid=table_oid)
+ )
return {"text": c.scalar()}
@reflection.cache
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index 8a340d9ce..09e700b5c 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -12,6 +12,7 @@ import sys
from .interfaces import Connectable
from .interfaces import ExceptionContext
from .util import _distill_params
+from .util import _distill_params_20
from .. import exc
from .. import inspection
from .. import log
@@ -52,6 +53,8 @@ class Connection(Connectable):
"""
_schema_translate_map = None
+ _is_future = False
+ _sqla_logger_namespace = "sqlalchemy.engine.Connection"
def __init__(
self,
@@ -85,7 +88,7 @@ class Connection(Connectable):
if connection is not None
else engine.raw_connection()
)
- self.__transaction = None
+ self._transaction = None
self.__savepoint_seq = 0
self.should_close_with_result = close_with_result
@@ -168,13 +171,15 @@ class Connection(Connectable):
else:
return self
- def _clone(self):
- """Create a shallow copy of this Connection.
+ def _generate_for_options(self):
+ """define connection method chaining behavior for execution_options"""
- """
- c = self.__class__.__new__(self.__class__)
- c.__dict__ = self.__dict__.copy()
- return c
+ if self._is_future:
+ return self
+ else:
+ c = self.__class__.__new__(self.__class__)
+ c.__dict__ = self.__dict__.copy()
+ return c
def __enter__(self):
return self
@@ -340,7 +345,7 @@ class Connection(Connectable):
""" # noqa
- c = self._clone()
+ c = self._generate_for_options()
c._execution_options = c._execution_options.union(opt)
if self._has_events or self.engine._has_events:
self.dispatch.set_connection_execution_options(c, opt)
@@ -469,7 +474,7 @@ class Connection(Connectable):
if self.__branch_from:
return self.__branch_from._revalidate_connection()
if self.__can_reconnect and self.__invalid:
- if self.__transaction is not None:
+ if self._transaction is not None:
raise exc.InvalidRequestError(
"Can't reconnect until invalid "
"transaction is rolled back"
@@ -640,14 +645,21 @@ class Connection(Connectable):
:class:`_engine.Engine`
"""
- if self.__branch_from:
+ if self._is_future:
+ assert not self.__branch_from
+ elif self.__branch_from:
return self.__branch_from.begin()
- if self.__transaction is None:
- self.__transaction = RootTransaction(self)
- return self.__transaction
+ if self._transaction is None:
+ self._transaction = RootTransaction(self)
+ return self._transaction
else:
- return Transaction(self, self.__transaction)
+ if self._is_future:
+ raise exc.InvalidRequestError(
+ "a transaction is already begun for this connection"
+ )
+ else:
+ return Transaction(self, self._transaction)
def begin_nested(self):
"""Begin a nested transaction and return a transaction handle.
@@ -667,14 +679,22 @@ class Connection(Connectable):
:meth:`_engine.Connection.begin_twophase`
"""
- if self.__branch_from:
+ if self._is_future:
+ assert not self.__branch_from
+ elif self.__branch_from:
return self.__branch_from.begin_nested()
- if self.__transaction is None:
- self.__transaction = RootTransaction(self)
- else:
- self.__transaction = NestedTransaction(self, self.__transaction)
- return self.__transaction
+ if self._transaction is None:
+ if self._is_future:
+ self._autobegin()
+ else:
+ self._transaction = RootTransaction(self)
+ return self._transaction
+
+ trans = NestedTransaction(self, self._transaction)
+ if not self._is_future:
+ self._transaction = trans
+ return trans
def begin_twophase(self, xid=None):
"""Begin a two-phase or XA transaction and return a transaction
@@ -699,15 +719,15 @@ class Connection(Connectable):
if self.__branch_from:
return self.__branch_from.begin_twophase(xid=xid)
- if self.__transaction is not None:
+ if self._transaction is not None:
raise exc.InvalidRequestError(
"Cannot start a two phase transaction when a transaction "
"is already in progress."
)
if xid is None:
xid = self.engine.dialect.create_xid()
- self.__transaction = TwoPhaseTransaction(self, xid)
- return self.__transaction
+ self._transaction = TwoPhaseTransaction(self, xid)
+ return self._transaction
def recover_twophase(self):
return self.engine.dialect.do_recover_twophase(self)
@@ -721,8 +741,8 @@ class Connection(Connectable):
def in_transaction(self):
"""Return True if a transaction is in progress."""
return (
- self._root.__transaction is not None
- and self._root.__transaction.is_active
+ self._root._transaction is not None
+ and self._root._transaction.is_active
)
def _begin_impl(self, transaction):
@@ -736,7 +756,7 @@ class Connection(Connectable):
try:
self.engine.dialect.do_begin(self.connection)
- if self.connection._reset_agent is None:
+ if not self._is_future and self.connection._reset_agent is None:
self.connection._reset_agent = transaction
except BaseException as e:
self._handle_dbapi_exception(e, None, None, None, None)
@@ -757,7 +777,7 @@ class Connection(Connectable):
finally:
if (
not self.__invalid
- and self.connection._reset_agent is self.__transaction
+ and self.connection._reset_agent is self._transaction
):
self.connection._reset_agent = None
@@ -776,10 +796,10 @@ class Connection(Connectable):
finally:
if (
not self.__invalid
- and self.connection._reset_agent is self.__transaction
+ and self.connection._reset_agent is self._transaction
):
self.connection._reset_agent = None
- self.__transaction = None
+ self._transaction = None
def _savepoint_impl(self, name=None):
assert not self.__branch_from
@@ -795,13 +815,13 @@ class Connection(Connectable):
return name
def _discard_transaction(self, trans):
- if trans is self.__transaction:
+ if trans is self._transaction:
if trans._is_root:
assert trans._parent is trans
- self.__transaction = None
+ self._transaction = None
else:
assert trans._parent is not trans
- self.__transaction = trans._parent
+ self._transaction = trans._parent
def _rollback_to_savepoint_impl(
self, name, context, deactivate_only=False
@@ -822,7 +842,7 @@ class Connection(Connectable):
if self._still_open_and_connection_is_valid:
self.engine.dialect.do_release_savepoint(self, name)
- self.__transaction = context
+ self._transaction = context
def _begin_twophase_impl(self, transaction):
assert not self.__branch_from
@@ -835,7 +855,7 @@ class Connection(Connectable):
if self._still_open_and_connection_is_valid:
self.engine.dialect.do_begin_twophase(self, transaction.xid)
- if self.connection._reset_agent is None:
+ if not self._is_future and self.connection._reset_agent is None:
self.connection._reset_agent = transaction
def _prepare_twophase_impl(self, xid):
@@ -845,7 +865,7 @@ class Connection(Connectable):
self.dispatch.prepare_twophase(self, xid)
if self._still_open_and_connection_is_valid:
- assert isinstance(self.__transaction, TwoPhaseTransaction)
+ assert isinstance(self._transaction, TwoPhaseTransaction)
self.engine.dialect.do_prepare_twophase(self, xid)
def _rollback_twophase_impl(self, xid, is_prepared):
@@ -855,17 +875,17 @@ class Connection(Connectable):
self.dispatch.rollback_twophase(self, xid, is_prepared)
if self._still_open_and_connection_is_valid:
- assert isinstance(self.__transaction, TwoPhaseTransaction)
+ assert isinstance(self._transaction, TwoPhaseTransaction)
try:
self.engine.dialect.do_rollback_twophase(
self, xid, is_prepared
)
finally:
- if self.connection._reset_agent is self.__transaction:
+ if self.connection._reset_agent is self._transaction:
self.connection._reset_agent = None
- self.__transaction = None
+ self._transaction = None
else:
- self.__transaction = None
+ self._transaction = None
def _commit_twophase_impl(self, xid, is_prepared):
assert not self.__branch_from
@@ -874,15 +894,20 @@ class Connection(Connectable):
self.dispatch.commit_twophase(self, xid, is_prepared)
if self._still_open_and_connection_is_valid:
- assert isinstance(self.__transaction, TwoPhaseTransaction)
+ assert isinstance(self._transaction, TwoPhaseTransaction)
try:
self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
finally:
- if self.connection._reset_agent is self.__transaction:
+ if self.connection._reset_agent is self._transaction:
self.connection._reset_agent = None
- self.__transaction = None
+ self._transaction = None
else:
- self.__transaction = None
+ self._transaction = None
+
+ def _autobegin(self):
+ assert self._is_future
+
+ return self.begin()
def _autorollback(self):
if not self._root.in_transaction():
@@ -907,6 +932,8 @@ class Connection(Connectable):
and will allow no further operations.
"""
+ assert not self._is_future
+
if self.__branch_from:
util.warn_deprecated_20(
"The .close() method on a so-called 'branched' connection is "
@@ -929,7 +956,7 @@ class Connection(Connectable):
else:
conn.close()
- if conn._reset_agent is self.__transaction:
+ if conn._reset_agent is self._transaction:
conn._reset_agent = None
# the close() process can end up invalidating us,
@@ -938,7 +965,7 @@ class Connection(Connectable):
if not self.__invalid:
del self.__connection
self.__can_reconnect = False
- self.__transaction = None
+ self._transaction = None
def scalar(self, object_, *multiparams, **params):
"""Executes and returns the first column of the first row.
@@ -1030,8 +1057,11 @@ class Connection(Connectable):
"or the Connection.exec_driver_sql() method to invoke a "
"driver-level SQL string."
)
- distilled_params = _distill_params(multiparams, params)
- return self._exec_driver_sql_distilled(object_, distilled_params)
+ distilled_parameters = _distill_params(multiparams, params)
+
+ return self._exec_driver_sql(
+ object_, multiparams, params, distilled_parameters
+ )
try:
meth = object_._execute_on_connection
except AttributeError as err:
@@ -1039,20 +1069,28 @@ class Connection(Connectable):
exc.ObjectNotExecutableError(object_), replace_context=err
)
else:
- return meth(self, multiparams, params)
+ return meth(self, multiparams, params, util.immutabledict())
- def _execute_function(self, func, multiparams, params):
+ def _execute_function(
+ self, func, multiparams, params, execution_options=util.immutabledict()
+ ):
"""Execute a sql.FunctionElement object."""
return self._execute_clauseelement(func.select(), multiparams, params)
- def _execute_default(self, default, multiparams, params):
+ def _execute_default(
+ self,
+ default,
+ multiparams,
+ params,
+ execution_options=util.immutabledict(),
+ ):
"""Execute a schema.ColumnDefault object."""
if self._has_events or self.engine._has_events:
for fn in self.dispatch.before_execute:
default, multiparams, params = fn(
- self, default, multiparams, params
+ self, default, multiparams, params, execution_options
)
try:
@@ -1066,7 +1104,9 @@ class Connection(Connectable):
conn = self._revalidate_connection()
dialect = self.dialect
- ctx = dialect.execution_ctx_cls._init_default(dialect, self, conn)
+ ctx = dialect.execution_ctx_cls._init_default(
+ dialect, self, conn, execution_options
+ )
except BaseException as e:
self._handle_dbapi_exception(e, None, None, None, None)
@@ -1076,17 +1116,21 @@ class Connection(Connectable):
if self._has_events or self.engine._has_events:
self.dispatch.after_execute(
- self, default, multiparams, params, ret
+ self, default, multiparams, params, execution_options, ret
)
return ret
- def _execute_ddl(self, ddl, multiparams, params):
+ def _execute_ddl(
+ self, ddl, multiparams, params, execution_options=util.immutabledict()
+ ):
"""Execute a schema.DDL object."""
if self._has_events or self.engine._has_events:
for fn in self.dispatch.before_execute:
- ddl, multiparams, params = fn(self, ddl, multiparams, params)
+ ddl, multiparams, params = fn(
+ self, ddl, multiparams, params, execution_options
+ )
dialect = self.dialect
@@ -1098,18 +1142,25 @@ class Connection(Connectable):
dialect.execution_ctx_cls._init_ddl,
compiled,
None,
+ execution_options,
compiled,
)
if self._has_events or self.engine._has_events:
- self.dispatch.after_execute(self, ddl, multiparams, params, ret)
+ self.dispatch.after_execute(
+ self, ddl, multiparams, params, execution_options, ret
+ )
return ret
- def _execute_clauseelement(self, elem, multiparams, params):
+ def _execute_clauseelement(
+ self, elem, multiparams, params, execution_options=util.immutabledict()
+ ):
"""Execute a sql.ClauseElement object."""
if self._has_events or self.engine._has_events:
for fn in self.dispatch.before_execute:
- elem, multiparams, params = fn(self, elem, multiparams, params)
+ elem, multiparams, params = fn(
+ self, elem, multiparams, params, execution_options
+ )
distilled_params = _distill_params(multiparams, params)
if distilled_params:
@@ -1121,22 +1172,31 @@ class Connection(Connectable):
dialect = self.dialect
- if "compiled_cache" in self._execution_options:
- elem_cache_key, extracted_params = elem._generate_cache_key()
+ exec_opts = self._execution_options
+ if execution_options:
+ exec_opts = exec_opts.union(execution_options)
+
+ if "compiled_cache" in exec_opts:
+ elem_cache_key = elem._generate_cache_key()
+ else:
+ elem_cache_key = None
+
+ if elem_cache_key:
+ cache_key, extracted_params = elem_cache_key
key = (
dialect,
- elem_cache_key,
+ cache_key,
tuple(sorted(keys)),
bool(self._schema_translate_map),
len(distilled_params) > 1,
)
- cache = self._execution_options["compiled_cache"]
+ cache = exec_opts["compiled_cache"]
compiled_sql = cache.get(key)
if compiled_sql is None:
compiled_sql = elem.compile(
dialect=dialect,
- cache_key=(elem_cache_key, extracted_params),
+ cache_key=elem_cache_key,
column_keys=keys,
inline=len(distilled_params) > 1,
schema_translate_map=self._schema_translate_map,
@@ -1160,22 +1220,31 @@ class Connection(Connectable):
dialect.execution_ctx_cls._init_compiled,
compiled_sql,
distilled_params,
+ execution_options,
compiled_sql,
distilled_params,
elem,
extracted_params,
)
if self._has_events or self.engine._has_events:
- self.dispatch.after_execute(self, elem, multiparams, params, ret)
+ self.dispatch.after_execute(
+ self, elem, multiparams, params, execution_options, ret
+ )
return ret
- def _execute_compiled(self, compiled, multiparams, params):
+ def _execute_compiled(
+ self,
+ compiled,
+ multiparams,
+ params,
+ execution_options=util.immutabledict(),
+ ):
"""Execute a sql.Compiled object."""
if self._has_events or self.engine._has_events:
for fn in self.dispatch.before_execute:
compiled, multiparams, params = fn(
- self, compiled, multiparams, params
+ self, compiled, multiparams, params, execution_options
)
dialect = self.dialect
@@ -1185,6 +1254,7 @@ class Connection(Connectable):
dialect.execution_ctx_cls._init_compiled,
compiled,
parameters,
+ execution_options,
compiled,
parameters,
None,
@@ -1192,16 +1262,23 @@ class Connection(Connectable):
)
if self._has_events or self.engine._has_events:
self.dispatch.after_execute(
- self, compiled, multiparams, params, ret
+ self, compiled, multiparams, params, execution_options, ret
)
return ret
- def _exec_driver_sql_distilled(self, statement, parameters):
+ def _exec_driver_sql(
+ self,
+ statement,
+ multiparams,
+ params,
+ distilled_parameters,
+ execution_options=util.immutabledict(),
+ ):
if self._has_events or self.engine._has_events:
for fn in self.dispatch.before_execute:
statement, multiparams, params = fn(
- self, statement, parameters, {}
+ self, statement, multiparams, params, execution_options
)
dialect = self.dialect
@@ -1209,15 +1286,38 @@ class Connection(Connectable):
dialect,
dialect.execution_ctx_cls._init_statement,
statement,
- parameters,
+ distilled_parameters,
+ execution_options,
statement,
- parameters,
+ distilled_parameters,
)
if self._has_events or self.engine._has_events:
- self.dispatch.after_execute(self, statement, parameters, {})
+ self.dispatch.after_execute(
+ self, statement, multiparams, params, execution_options, ret
+ )
return ret
- def exec_driver_sql(self, statement, parameters=None):
+ def _execute_20(
+ self,
+ statement,
+ parameters=None,
+ execution_options=util.immutabledict(),
+ ):
+ multiparams, params, distilled_parameters = _distill_params_20(
+ parameters
+ )
+ try:
+ meth = statement._execute_on_connection
+ except AttributeError as err:
+ util.raise_(
+ exc.ObjectNotExecutableError(statement), replace_context=err
+ )
+ else:
+ return meth(self, multiparams, params, execution_options)
+
+ def exec_driver_sql(
+ self, statement, parameters=None, execution_options=None
+ ):
r"""Executes a SQL statement construct and returns a
:class:`_engine.ResultProxy`.
@@ -1258,22 +1358,33 @@ class Connection(Connectable):
"""
- if isinstance(parameters, list) and parameters:
- if not isinstance(parameters[0], (dict, tuple)):
- raise exc.ArgumentError(
- "List argument must consist only of tuples or dictionaries"
- )
- elif isinstance(parameters, (dict, tuple)):
- parameters = [parameters]
+ multiparams, params, distilled_parameters = _distill_params_20(
+ parameters
+ )
- return self._exec_driver_sql_distilled(statement, parameters or ())
+ return self._exec_driver_sql(
+ statement,
+ multiparams,
+ params,
+ distilled_parameters,
+ execution_options,
+ )
def _execute_context(
- self, dialect, constructor, statement, parameters, *args
+ self,
+ dialect,
+ constructor,
+ statement,
+ parameters,
+ execution_options,
+ *args
):
"""Create an :class:`.ExecutionContext` and execute, returning
a :class:`_engine.ResultProxy`."""
+ if execution_options:
+ dialect.set_exec_execution_options(self, execution_options)
+
try:
try:
conn = self.__connection
@@ -1284,23 +1395,29 @@ class Connection(Connectable):
if conn is None:
conn = self._revalidate_connection()
- context = constructor(dialect, self, conn, *args)
+ context = constructor(
+ dialect, self, conn, execution_options, *args
+ )
except BaseException as e:
self._handle_dbapi_exception(
e, util.text_type(statement), parameters, None, None
)
- if self._root.__transaction and not self._root.__transaction.is_active:
+ if self._root._transaction and not self._root._transaction.is_active:
raise exc.InvalidRequestError(
"This connection is on an inactive %stransaction. "
"Please rollback() fully before proceeding."
% (
"savepoint "
- if isinstance(self.__transaction, NestedTransaction)
+ if isinstance(self._transaction, NestedTransaction)
else ""
),
code="8s2a",
)
+
+ if self._is_future and self._root._transaction is None:
+ self._autobegin()
+
if context.compiled:
context.pre_exec()
@@ -1386,12 +1503,17 @@ class Connection(Connectable):
result = context._setup_result_proxy()
- if context.should_autocommit and self._root.__transaction is None:
+ if (
+ not self._is_future
+ and context.should_autocommit
+ and self._root._transaction is None
+ ):
self._root._commit_impl(autocommit=True)
# for "connectionless" execution, we have to close this
# Connection after the statement is complete.
if self.should_close_with_result:
+ assert not self._is_future
assert not context._is_future_result
# ResultProxy already exhausted rows / has no rows.
@@ -1600,6 +1722,7 @@ class Connection(Connectable):
self.engine.pool._invalidate(dbapi_conn_wrapper, e)
self.invalidate(e)
if self.should_close_with_result:
+ assert not self._is_future
self.close()
@classmethod
@@ -1991,6 +2114,8 @@ class Engine(Connectable, log.Identified):
_execution_options = util.immutabledict()
_has_events = False
_connection_cls = Connection
+ _sqla_logger_namespace = "sqlalchemy.engine.Engine"
+ _is_future = False
_schema_translate_map = None
@@ -2114,7 +2239,7 @@ class Engine(Connectable, log.Identified):
"""
- return OptionEngine(self, opt)
+ return self._option_cls(self, opt)
def get_execution_options(self):
""" Get the non-SQL options which will take effect during execution.
@@ -2200,7 +2325,8 @@ class Engine(Connectable, log.Identified):
if type_ is not None:
self.transaction.rollback()
else:
- self.transaction.commit()
+ if self.transaction.is_active:
+ self.transaction.commit()
if not self.close_with_result:
self.conn.close()
@@ -2239,7 +2365,10 @@ class Engine(Connectable, log.Identified):
for a particular :class:`_engine.Connection`.
"""
- conn = self.connect(close_with_result=close_with_result)
+ if self._connection_cls._is_future:
+ conn = self.connect()
+ else:
+ conn = self.connect(close_with_result=close_with_result)
try:
trans = conn.begin()
except:
@@ -2477,7 +2606,7 @@ class Engine(Connectable, log.Identified):
return self._wrap_pool_connect(self.pool.connect, _connection)
-class OptionEngine(Engine):
+class OptionEngineMixin(object):
_sa_propagate_class_events = False
def __init__(self, proxied, execution_options):
@@ -2523,3 +2652,10 @@ class OptionEngine(Engine):
self.__dict__["_has_events"] = value
_has_events = property(_get_has_events, _set_has_events)
+
+
+class OptionEngine(OptionEngineMixin, Engine):
+ pass
+
+
+Engine._option_cls = OptionEngine
diff --git a/lib/sqlalchemy/engine/create.py b/lib/sqlalchemy/engine/create.py
index e5b8a87d3..a53e94f1e 100644
--- a/lib/sqlalchemy/engine/create.py
+++ b/lib/sqlalchemy/engine/create.py
@@ -532,7 +532,8 @@ def create_engine(url, **kwargs):
pool._dialect = dialect
# create engine.
- engineclass = base.Engine
+ engineclass = kwargs.pop("_future_engine_class", base.Engine)
+
engine_args = {}
for k in util.get_cls_kwargs(engineclass):
if k in kwargs:
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index 5ec13d103..865a1160b 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -500,14 +500,35 @@ class DefaultDialect(interfaces.Dialect):
if "schema_translate_map" in opts:
connection._schema_translate_map = opts["schema_translate_map"]
+ def set_exec_execution_options(self, connection, opts):
+ if "isolation_level" in opts:
+ raise exc.InvalidRequestError(
+ "The 'isolation_level' execution "
+ "option is not supported at the per-statement level"
+ )
+ self._set_connection_isolation(connection, opts["isolation_level"])
+
+ if "schema_translate_map" in opts:
+ raise exc.InvalidRequestError(
+ "The 'schema_translate_map' execution "
+ "option is not supported at the per-statement level"
+ )
+
def _set_connection_isolation(self, connection, level):
if connection.in_transaction():
- util.warn(
- "Connection is already established with a Transaction; "
- "setting isolation_level may implicitly rollback or commit "
- "the existing transaction, or have no effect until "
- "next transaction"
- )
+ if connection._is_future:
+ raise exc.InvalidRequestError(
+ "This connection has already begun a transaction; "
+ "isolation level may not be altered until transaction end"
+ )
+ else:
+ util.warn(
+ "Connection is already established with a Transaction; "
+ "setting isolation_level may implicitly rollback or "
+ "commit "
+ "the existing transaction, or have no effect until "
+ "next transaction"
+ )
self.set_isolation_level(connection.connection, level)
connection.connection._connection_record.finalize_callback.append(
self.reset_isolation_level
@@ -688,6 +709,7 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
statement = None
result_column_struct = None
returned_defaults = None
+ execution_options = util.immutabledict()
_is_implicit_returning = False
_is_explicit_returning = False
_is_future_result = False
@@ -701,7 +723,14 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
_expanded_parameters = util.immutabledict()
@classmethod
- def _init_ddl(cls, dialect, connection, dbapi_connection, compiled_ddl):
+ def _init_ddl(
+ cls,
+ dialect,
+ connection,
+ dbapi_connection,
+ execution_options,
+ compiled_ddl,
+ ):
"""Initialize execution context for a DDLElement construct."""
self = cls.__new__(cls)
@@ -714,8 +743,18 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
self.execution_options = compiled.execution_options
if connection._execution_options:
- self.execution_options = dict(self.execution_options)
- self.execution_options.update(connection._execution_options)
+ self.execution_options = self.execution_options.union(
+ connection._execution_options
+ )
+ if execution_options:
+ self.execution_options = self.execution_options.union(
+ execution_options
+ )
+
+ self._is_future_result = (
+ connection._is_future
+ or self.execution_options.get("future_result", False)
+ )
self.unicode_statement = util.text_type(compiled)
if compiled.schema_translate_map:
@@ -745,6 +784,7 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
dialect,
connection,
dbapi_connection,
+ execution_options,
compiled,
parameters,
invoked_statement,
@@ -764,11 +804,19 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
# we get here
assert compiled.can_execute
- self._is_future_result = connection._execution_options.get(
- "future_result", False
- )
- self.execution_options = compiled.execution_options.union(
- connection._execution_options
+ self.execution_options = compiled.execution_options
+ if connection._execution_options:
+ self.execution_options = self.execution_options.union(
+ connection._execution_options
+ )
+ if execution_options:
+ self.execution_options = self.execution_options.union(
+ execution_options
+ )
+
+ self._is_future_result = (
+ connection._is_future
+ or self.execution_options.get("future_result", False)
)
self.result_column_struct = (
@@ -905,7 +953,13 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
@classmethod
def _init_statement(
- cls, dialect, connection, dbapi_connection, statement, parameters
+ cls,
+ dialect,
+ connection,
+ dbapi_connection,
+ execution_options,
+ statement,
+ parameters,
):
"""Initialize execution context for a string SQL statement."""
@@ -915,12 +969,19 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
self.dialect = connection.dialect
self.is_text = True
- self._is_future_result = connection._execution_options.get(
- "future_result", False
- )
+ if connection._execution_options:
+ self.execution_options = self.execution_options.union(
+ connection._execution_options
+ )
+ if execution_options:
+ self.execution_options = self.execution_options.union(
+ execution_options
+ )
- # plain text statement
- self.execution_options = connection._execution_options
+ self._is_future_result = (
+ connection._is_future
+ or self.execution_options.get("future_result", False)
+ )
if not parameters:
if self.dialect.positional:
@@ -956,14 +1017,30 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
return self
@classmethod
- def _init_default(cls, dialect, connection, dbapi_connection):
+ def _init_default(
+ cls, dialect, connection, dbapi_connection, execution_options
+ ):
"""Initialize execution context for a ColumnDefault construct."""
self = cls.__new__(cls)
self.root_connection = connection
self._dbapi_connection = dbapi_connection
self.dialect = connection.dialect
- self.execution_options = connection._execution_options
+
+ if connection._execution_options:
+ self.execution_options = self.execution_options.union(
+ connection._execution_options
+ )
+ if execution_options:
+ self.execution_options = self.execution_options.union(
+ execution_options
+ )
+
+ self._is_future_result = (
+ connection._is_future
+ or self.execution_options.get("future_result", False)
+ )
+
self.cursor = self.create_cursor()
return self
@@ -1043,7 +1120,11 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
@property
def connection(self):
- return self.root_connection._branch()
+ conn = self.root_connection
+ if conn._is_future:
+ return conn
+ else:
+ return conn._branch()
def should_autocommit_text(self, statement):
return AUTOCOMMIT_REGEXP.match(statement)
diff --git a/lib/sqlalchemy/engine/events.py b/lib/sqlalchemy/engine/events.py
index 65b73002c..2ab707b8a 100644
--- a/lib/sqlalchemy/engine/events.py
+++ b/lib/sqlalchemy/engine/events.py
@@ -107,9 +107,15 @@ class ConnectionEvents(event.Events):
orig_fn = fn
def wrap_before_execute(
- conn, clauseelement, multiparams, params
+ conn, clauseelement, multiparams, params, execution_options
):
- orig_fn(conn, clauseelement, multiparams, params)
+ orig_fn(
+ conn,
+ clauseelement,
+ multiparams,
+ params,
+ execution_options,
+ )
return clauseelement, multiparams, params
fn = wrap_before_execute
@@ -143,7 +149,19 @@ class ConnectionEvents(event.Events):
)
event_key.with_wrapper(fn).base_listen()
- def before_execute(self, conn, clauseelement, multiparams, params):
+ @event._legacy_signature(
+ "1.4",
+ ["conn", "clauseelement", "multiparams", "params"],
+ lambda conn, clauseelement, multiparams, params, execution_options: (
+ conn,
+ clauseelement,
+ multiparams,
+ params,
+ ),
+ )
+ def before_execute(
+ self, conn, clauseelement, multiparams, params, execution_options
+ ):
"""Intercept high level execute() events, receiving uncompiled
SQL constructs and other objects prior to rendering into SQL.
@@ -166,6 +184,17 @@ class ConnectionEvents(event.Events):
:meth:`_engine.Connection.execute`.
:param multiparams: Multiple parameter sets, a list of dictionaries.
:param params: Single parameter set, a single dictionary.
+ :param execution_options: dictionary of per-execution execution
+ options passed along with the statement, if any. This only applies to
+ the the SQLAlchemy 2.0 version of :meth:`_engine.Connection.execute`
+ . To
+ view all execution options associated with the connection, access the
+ :meth:`_engine.Connection.get_execution_options`
+ method to view the fixed
+ execution options dictionary, then consider elements within this local
+ dictionary to be unioned into that dictionary.
+
+ .. versionadded: 1.4
.. seealso::
@@ -173,7 +202,26 @@ class ConnectionEvents(event.Events):
"""
- def after_execute(self, conn, clauseelement, multiparams, params, result):
+ @event._legacy_signature(
+ "1.4",
+ ["conn", "clauseelement", "multiparams", "params", "result"],
+ lambda conn, clauseelement, multiparams, params, execution_options, result: ( # noqa
+ conn,
+ clauseelement,
+ multiparams,
+ params,
+ result,
+ ),
+ )
+ def after_execute(
+ self,
+ conn,
+ clauseelement,
+ multiparams,
+ params,
+ execution_options,
+ result,
+ ):
"""Intercept high level execute() events after execute.
@@ -183,6 +231,18 @@ class ConnectionEvents(event.Events):
:meth:`_engine.Connection.execute`.
:param multiparams: Multiple parameter sets, a list of dictionaries.
:param params: Single parameter set, a single dictionary.
+ :param execution_options: dictionary of per-execution execution
+ options passed along with the statement, if any. This only applies to
+ the the SQLAlchemy 2.0 version of :meth:`_engine.Connection.execute`
+ . To
+ view all execution options associated with the connection, access the
+ :meth:`_engine.Connection.get_execution_options`
+ method to view the fixed
+ execution options dictionary, then consider elements within this local
+ dictionary to be unioned into that dictionary.
+
+ .. versionadded: 1.4
+
:param result: :class:`_engine.ResultProxy` generated by the execution
.
diff --git a/lib/sqlalchemy/engine/result.py b/lib/sqlalchemy/engine/result.py
index e1e5e9016..bc3cdbb9a 100644
--- a/lib/sqlalchemy/engine/result.py
+++ b/lib/sqlalchemy/engine/result.py
@@ -78,6 +78,7 @@ class SimpleResultMetaData(ResultMetaData):
for index in range(len_keys)
}
)
+ # TODO: negative indexes? test coverage?
if extra:
for key, ex in zip(keys, extra):
rec = self._keymap[key]
@@ -639,6 +640,9 @@ class CursorResultMetaData(ResultMetaData):
"""
indexes = []
for key in keys:
+ if isinstance(key, int):
+ indexes.append(key)
+ continue
try:
rec = self._keymap[key]
except KeyError as ke:
diff --git a/lib/sqlalchemy/engine/util.py b/lib/sqlalchemy/engine/util.py
index d25927be2..8fb04646f 100644
--- a/lib/sqlalchemy/engine/util.py
+++ b/lib/sqlalchemy/engine/util.py
@@ -5,7 +5,9 @@
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
+from .. import exc
from .. import util
+from ..util import collections_abc
def connection_memoize(key):
@@ -28,6 +30,8 @@ def connection_memoize(key):
def py_fallback():
+ # TODO: pass the Connection in so that there can be a standard
+ # method for warning on parameter format
def _distill_params(multiparams, params): # noqa
r"""Given arguments from the calling form \*multiparams, \**params,
return a list of bind parameter structures, usually a list of
@@ -40,6 +44,7 @@ def py_fallback():
if not multiparams:
if params:
+ # TODO: parameter format deprecation warning
return [params]
else:
return []
@@ -64,6 +69,7 @@ def py_fallback():
# execute(stmt, "value")
return [[zero]]
else:
+ # TODO: parameter format deprecation warning
if hasattr(multiparams[0], "__iter__") and not hasattr(
multiparams[0], "strip"
):
@@ -74,6 +80,32 @@ def py_fallback():
return locals()
+_no_tuple = ()
+_no_kw = util.immutabledict()
+
+
+def _distill_params_20(params):
+ if params is None:
+ return _no_tuple, _no_kw, []
+ elif isinstance(params, collections_abc.MutableSequence): # list
+ if params and not isinstance(
+ params[0], (collections_abc.Mapping, tuple)
+ ):
+ raise exc.ArgumentError(
+ "List argument must consist only of tuples or dictionaries"
+ )
+
+ # the tuple is needed atm by the C version of _distill_params...
+ return tuple(params), _no_kw, params
+ elif isinstance(
+ params,
+ (collections_abc.Sequence, collections_abc.Mapping), # tuple or dict
+ ):
+ return _no_tuple, params, [params]
+ else:
+ raise exc.ArgumentError("mapping or sequence expected for parameters")
+
+
try:
from sqlalchemy.cutils import _distill_params # noqa
except ImportError:
diff --git a/lib/sqlalchemy/event/attr.py b/lib/sqlalchemy/event/attr.py
index cefb640a1..87c6e980f 100644
--- a/lib/sqlalchemy/event/attr.py
+++ b/lib/sqlalchemy/event/attr.py
@@ -71,6 +71,7 @@ class _ClsLevelDispatch(RefCollection):
"""Class-level events on :class:`._Dispatch` classes."""
__slots__ = (
+ "clsname",
"name",
"arg_names",
"has_kw",
@@ -81,6 +82,7 @@ class _ClsLevelDispatch(RefCollection):
def __init__(self, parent_dispatch_cls, fn):
self.name = fn.__name__
+ self.clsname = parent_dispatch_cls.__name__
argspec = util.inspect_getfullargspec(fn)
self.arg_names = argspec.args[1:]
self.has_kw = bool(argspec.varkw)
diff --git a/lib/sqlalchemy/event/legacy.py b/lib/sqlalchemy/event/legacy.py
index 9205cc53a..f63c7d101 100644
--- a/lib/sqlalchemy/event/legacy.py
+++ b/lib/sqlalchemy/event/legacy.py
@@ -35,15 +35,35 @@ def _wrap_fn_for_legacy(dispatch_collection, fn, argspec):
argspec.varkw
):
+ formatted_def = "def %s(%s%s)" % (
+ dispatch_collection.name,
+ ", ".join(dispatch_collection.arg_names),
+ ", **kw" if has_kw else "",
+ )
+ warning_txt = (
+ 'The argument signature for the "%s.%s" event listener '
+ "has changed as of version %s, and conversion for "
+ "the old argument signature will be removed in a "
+ 'future release. The new signature is "%s"'
+ % (
+ dispatch_collection.clsname,
+ dispatch_collection.name,
+ since,
+ formatted_def,
+ )
+ )
+
if conv:
assert not has_kw
def wrap_leg(*args):
+ util.warn_deprecated(warning_txt, version=since)
return fn(*conv(*args))
else:
def wrap_leg(*args, **kw):
+ util.warn_deprecated(warning_txt, version=since)
argdict = dict(zip(dispatch_collection.arg_names, args))
args = [argdict[name] for name in argnames]
if has_kw:
diff --git a/lib/sqlalchemy/future/__init__.py b/lib/sqlalchemy/future/__init__.py
index d38d27d88..02cbd697b 100644
--- a/lib/sqlalchemy/future/__init__.py
+++ b/lib/sqlalchemy/future/__init__.py
@@ -8,7 +8,9 @@
"""Future 2.0 API features.
"""
-
+from .engine import Connection # noqa
+from .engine import create_engine # noqa
+from .engine import Engine # noqa
from .result import Result # noqa
from ..sql.selectable import Select
from ..util.langhelpers import public_factory
diff --git a/lib/sqlalchemy/future/engine.py b/lib/sqlalchemy/future/engine.py
new file mode 100644
index 000000000..286c83cc4
--- /dev/null
+++ b/lib/sqlalchemy/future/engine.py
@@ -0,0 +1,434 @@
+from .. import util
+from ..engine import Connection as _LegacyConnection
+from ..engine import create_engine as _create_engine
+from ..engine import Engine as _LegacyEngine
+from ..engine.base import OptionEngineMixin
+
+NO_OPTIONS = util.immutabledict()
+
+
+def create_engine(*arg, **kw):
+ """Create a new :class:`_future.Engine` instance.
+
+ Arguments passed to :func:`_future.create_engine` are mostly identical
+ to those passed to the 1.x :func:`_sa.create_engine` function.
+ The difference is that the object returned is the :class:`._future.Engine`
+ which has the 2.0 version of the API.
+
+ """
+
+ kw["_future_engine_class"] = Engine
+ return _create_engine(*arg, **kw)
+
+
+class Connection(_LegacyConnection):
+ """Provides high-level functionality for a wrapped DB-API connection.
+
+ **This is the SQLAlchemy 2.0 version** of the :class:`_engine.Connection`
+ class. The API and behavior of this object is largely the same, with the
+ following differences in behavior:
+
+ * The result object returned for results is the :class:`_future.Result`
+ object. This object has a slightly different API and behavior than the
+ prior :class:`_engine.ResultProxy` object.
+
+ * The object has :meth:`_future.Connection.commit` and
+ :meth:`_future.Connection.rollback` methods which commit or roll back
+ the current transaction in progress, if any.
+
+ * The object features "autobegin" behavior, such that any call to
+ :meth:`_future.Connection.execute` will
+ unconditionally start a
+ transaction which can be controlled using the above mentioned
+ :meth:`_future.Connection.commit` and
+ :meth:`_future.Connection.rollback` methods.
+
+ * The object does not have any "autocommit" functionality. Any SQL
+ statement or DDL statement will not be followed by any COMMIT until
+ the transaction is explicitly committed, either via the
+ :meth:`_future.Connection.commit` method, or if the connection is
+ being used in a context manager that commits such as the one
+ returned by :meth:`_future.Engine.begin`.
+
+ * The SAVEPOINT method :meth:`_future.Connection.begin_nested` returns
+ a :class:`_engine.NestedTransaction` as was always the case, and the
+ savepoint can be controlled by invoking
+ :meth:`_engine.NestedTransaction.commit` or
+ :meth:`_engine.NestedTransaction.rollback` as was the case before.
+ However, this savepoint "transaction" is not associated with the
+ transaction that is controlled by the connection itself; the overall
+ transaction can be committed or rolled back directly which will not emit
+ any special instructions for the SAVEPOINT (this will typically have the
+ effect that one desires).
+
+ * There are no "nested" connections or transactions.
+
+
+
+ """
+
+ _is_future = True
+
+ def _branch(self):
+ raise NotImplementedError(
+ "sqlalchemy.future.Connection does not support "
+ "'branching' of new connections."
+ )
+
+ def begin(self):
+ """Begin a transaction prior to autobegin occurring.
+
+ The :meth:`_future.Connection.begin` method in SQLAlchemy 2.0 begins a
+ transaction that normally will be begun in any case when the connection
+ is first used to execute a statement. The reason this method might be
+ used would be to invoke the :meth:`_events.ConnectionEvents.begin`
+ event at a specific time, or to organize code within the scope of a
+ connection checkout in terms of context managed blocks, such as::
+
+ with engine.connect() as conn:
+ with conn.begin():
+ conn.execute(...)
+ conn.execute(...)
+
+ with conn.begin():
+ conn.execute(...)
+ conn.execute(...)
+
+ The above code is not fundamentally any different in its behavior than
+ the following code which does not use
+ :meth:`_future.Connection.begin`::
+
+ with engine.connect() as conn:
+ conn.execute(...)
+ conn.execute(...)
+ conn.commit()
+
+ conn.execute(...)
+ conn.execute(...)
+ conn.commit()
+
+ In both examples, if an exception is raised, the transaction will not
+ be committed. An explicit rollback of the transaction will occur,
+ including that the :meth:`_events.ConnectionEvents.rollback` event will
+ be emitted, as connection's context manager will call
+ :meth:`_future.Connection.close`, which will call
+ :meth:`_future.Connection.rollback` for any transaction in place
+ (excluding that of a SAVEPOINT).
+
+ From a database point of view, the :meth:`_future.Connection.begin`
+ method does not emit any SQL or change the state of the underlying
+ DBAPI connection in any way; the Python DBAPI does not have any
+ concept of explicit transaction begin.
+
+ :return: a :class:`_engine.Transaction` object. This object supports
+ context-manager operation which will commit a transaction or
+ emit a rollback in case of error.
+
+ . If this event is not being used, then there is
+ no real effect from invoking :meth:`_future.Connection.begin` ahead
+ of time as the Python DBAPI does not implement any explicit BEGIN
+
+
+ The returned object is an instance of :class:`_engine.Transaction`.
+ This object represents the "scope" of the transaction,
+ which completes when either the :meth:`_engine.Transaction.rollback`
+ or :meth:`_engine.Transaction.commit` method is called.
+
+ Nested calls to :meth:`_future.Connection.begin` on the same
+ :class:`_future.Connection` will return new
+ :class:`_engine.Transaction` objects that represent an emulated
+ transaction within the scope of the enclosing transaction, that is::
+
+ trans = conn.begin() # outermost transaction
+ trans2 = conn.begin() # "nested"
+ trans2.commit() # does nothing
+ trans.commit() # actually commits
+
+ Calls to :meth:`_engine.Transaction.commit` only have an effect when
+ invoked via the outermost :class:`_engine.Transaction` object, though
+ the :meth:`_engine.Transaction.rollback` method of any of the
+ :class:`_engine.Transaction` objects will roll back the transaction.
+
+ .. seealso::
+
+ :meth:`_future.Connection.begin_nested` - use a SAVEPOINT
+
+ :meth:`_future.Connection.begin_twophase` -
+ use a two phase /XID transaction
+
+ :meth:`_future.Engine.begin` - context manager available from
+ :class:`_future.Engine`
+
+ """
+ return super(Connection, self).begin()
+
+ def begin_nested(self):
+ """Begin a nested transaction and return a transaction handle.
+
+ The returned object is an instance of
+ :class:`_engine.NestedTransaction`.
+
+ Nested transactions require SAVEPOINT support in the
+ underlying database. Any transaction in the hierarchy may
+ ``commit`` and ``rollback``, however the outermost transaction
+ still controls the overall ``commit`` or ``rollback`` of the
+ transaction of a whole.
+
+ In SQLAlchemy 2.0, the :class:`_engine.NestedTransaction` remains
+ independent of the :class:`_future.Connection` object itself. Calling
+ the :meth:`_future.Connection.commit` or
+ :meth:`_future.Connection.rollback` will always affect the actual
+ containing database transaction itself, and not the SAVEPOINT itself.
+ When a database transaction is committed, any SAVEPOINTs that have been
+ established are cleared and the data changes within their scope is also
+ committed.
+
+ .. seealso::
+
+ :meth:`_future.Connection.begin`
+
+
+ """
+ return super(Connection, self).begin_nested()
+
+ def commit(self):
+ """Commit the transaction that is currently in progress.
+
+ This method commits the current transaction if one has been started.
+ If no transaction was started, the method has no effect, assuming
+ the connection is in a non-invalidated state.
+
+ A transaction is begun on a :class:`_future.Connection` automatically
+ whenever a statement is first executed, or when the
+ :meth:`_future.Connection.begin` method is called.
+
+ .. note:: The :meth:`_future.Connection.commit` method only acts upon
+ the primary database transaction that is linked to the
+ :class:`_future.Connection` object. It does not operate upon a
+ SAVEPOINT that would have been invoked from the
+ :meth:`_future.Connection.begin_nested` method; for control of a
+ SAVEPOINT, call :meth:`_engine.NestedTransaction.commit` on the
+ :class:`_engine.NestedTransaction` that is returned by the
+ :meth:`_future.Connection.begin_nested` method itself.
+
+
+ """
+ if self._transaction:
+ self._transaction.commit()
+
+ def rollback(self):
+ """Roll back the transaction that is currently in progress.
+
+ This method rolls back the current transaction if one has been started.
+ If no transaction was started, the method has no effect. If a
+ transaction was started and the connection is in an invalidated state,
+ the transaction is cleared using this method.
+
+ A transaction is begun on a :class:`_future.Connection` automatically
+ whenever a statement is first executed, or when the
+ :meth:`_future.Connection.begin` method is called.
+
+ .. note:: The :meth:`_future.Connection.rollback` method only acts
+ upon the primary database transaction that is linked to the
+ :class:`_future.Connection` object. It does not operate upon a
+ SAVEPOINT that would have been invoked from the
+ :meth:`_future.Connection.begin_nested` method; for control of a
+ SAVEPOINT, call :meth:`_engine.NestedTransaction.rollback` on the
+ :class:`_engine.NestedTransaction` that is returned by the
+ :meth:`_future.Connection.begin_nested` method itself.
+
+
+ """
+ if self._transaction:
+ self._transaction.rollback()
+
+ def close(self):
+ """Close this :class:`_future.Connection`.
+
+ This has the effect of also calling :meth:`_future.Connection.rollback`
+ if any transaction is in place.
+
+ """
+
+ try:
+ conn = self.__connection
+ except AttributeError:
+ pass
+ else:
+ # TODO: can we do away with "_reset_agent" stuff now?
+ if self._transaction:
+ self._transaction.rollback()
+
+ conn.close()
+
+ # the close() process can end up invalidating us,
+ # as the pool will call our transaction as the "reset_agent"
+ # for rollback(), which can then cause an invalidation
+ if not self.__invalid:
+ del self.__connection
+ self.__can_reconnect = False
+ self._transaction = None
+
+ def execute(self, statement, parameters=None, execution_options=None):
+ r"""Executes a SQL statement construct and returns a
+ :class:`_future.Result`.
+
+ :param object: The statement to be executed. This is always
+ an object that is in both the :class:`_expression.ClauseElement` and
+ :class:`_expression.Executable` hierarchies, including:
+
+ * :class:`_expression.Select`
+ * :class:`_expression.Insert`, :class:`_expression.Update`,
+ :class:`_expression.Delete`
+ * :class:`_expression.TextClause` and
+ :class:`_expression.TextualSelect`
+ * :class:`_schema.DDL` and objects which inherit from
+ :class:`_schema.DDLElement`
+
+ :param parameters: parameters which will be bound into the statment.
+ This may be either a dictionary of parameter names to values,
+ or a mutable sequence (e.g. a list) of dictionaries. When a
+ list of dictionaries is passed, the underlying statement execution
+ will make use of the DBAPI ``cursor.executemany()`` method.
+ When a single dictionary is passed, the DBAPI ``cursor.execute()``
+ method will be used.
+
+ :param execution_options: optional dictionary of execution options,
+ which will be associated with the statement execution. This
+ dictionary can provide a subset of the options that are accepted
+ by :meth:`_future.Connection.execution_options`.
+
+ :return: a :class:`_future.Result` object.
+
+ """
+ return self._execute_20(
+ statement, parameters, execution_options or NO_OPTIONS
+ )
+
+ def scalar(self, statement, parameters=None, execution_options=None):
+ r"""Executes a SQL statement construct and returns a scalar object.
+
+ This method is shorthand for invoking the
+ :meth:`_future.Result.scalar` method after invoking the
+ :meth:`_future.Connection.execute` method. Parameters are equivalent.
+
+ :return: a scalar Python value representing the first column of the
+ first row returned.
+
+ """
+ return self.execute(statement, parameters, execution_options).scalar()
+
+
+class Engine(_LegacyEngine):
+ """Connects a :class:`_pool.Pool` and
+ :class:`_engine.Dialect` together to provide a
+ source of database connectivity and behavior.
+
+ **This is the SQLAlchemy 2.0 version** of the :class:`~.engine.Engine`.
+
+ An :class:`.future.Engine` object is instantiated publicly using the
+ :func:`~sqlalchemy.future.create_engine` function.
+
+ .. seealso::
+
+ :doc:`/core/engines`
+
+ :ref:`connections_toplevel`
+
+ """
+
+ _connection_cls = Connection
+ _is_future = True
+
+ def _not_implemented(self, *arg, **kw):
+ raise NotImplementedError(
+ "This method is not implemented for SQLAlchemy 2.0."
+ )
+
+ transaction = (
+ run_callable
+ ) = (
+ execute
+ ) = (
+ scalar
+ ) = (
+ _execute_clauseelement
+ ) = _execute_compiled = table_names = has_table = _not_implemented
+
+ def _run_ddl_visitor(self, visitorcallable, element, **kwargs):
+ # TODO: this is for create_all support etc. not clear if we
+ # want to provide this in 2.0, that is, a way to execute SQL where
+ # they aren't calling "engine.begin()" explicitly, however, DDL
+ # may be a special case for which we want to continue doing it this
+ # way. A big win here is that the full DDL sequence is inside of a
+ # single transaction rather than COMMIT for each statment.
+ with self.begin() as conn:
+ conn._run_ddl_visitor(visitorcallable, element, **kwargs)
+
+ @classmethod
+ def _future_facade(self, legacy_engine):
+ return Engine(
+ legacy_engine.pool,
+ legacy_engine.dialect,
+ legacy_engine.url,
+ logging_name=legacy_engine.logging_name,
+ echo=legacy_engine.echo,
+ hide_parameters=legacy_engine.hide_parameters,
+ execution_options=legacy_engine._execution_options,
+ )
+
+ def begin(self):
+ """Return a :class:`_future.Connection` object with a transaction
+ begun.
+
+ Use of this method is similar to that of
+ :meth:`_future.Engine.connect`, typically as a context manager, which
+ will automatically maintain the state of the transaction when the block
+ ends, either by calling :meth:`_future.Connection.commit` when the
+ block succeeds normally, or :meth:`_future.Connection.rollback` when an
+ exception is raised, before propagating the exception outwards::
+
+ with engine.begin() as connection:
+ connection.execute(text("insert into table values ('foo')"))
+
+
+ .. seealso::
+
+ :meth:`_future.Engine.connect`
+
+ :meth:`_future.Connection.begin`
+
+ """
+ return super(Engine, self).begin()
+
+ def connect(self):
+ """Return a new :class:`_future.Connection` object.
+
+ The :class:`_future.Connection` acts as a Python context manager, so
+ the typical use of this method looks like::
+
+ with engine.connect() as connection:
+ connection.execute(text("insert into table values ('foo')"))
+ connection.commit()
+
+ Where above, after the block is completed, the connection is "closed"
+ and its underlying DBAPI resources are returned to the connection pool.
+ This also has the effect of rolling back any transaction that
+ was explicitly begun or was begun via autobegin, and will
+ emit the :meth:`_events.ConnectionEvents.rollback` event if one was
+ started and is still in progress.
+
+ .. seealso::
+
+ :meth:`_future.Engine.begin`
+
+
+ """
+ return super(Engine, self).connect()
+
+
+class OptionEngine(OptionEngineMixin, Engine):
+ pass
+
+
+Engine._option_cls = OptionEngine
diff --git a/lib/sqlalchemy/future/result.py b/lib/sqlalchemy/future/result.py
index 82d87ddf1..21a42e1f6 100644
--- a/lib/sqlalchemy/future/result.py
+++ b/lib/sqlalchemy/future/result.py
@@ -1,17 +1,16 @@
import operator
from .. import util
-from ..engine.result import _baserow_usecext
from ..engine.result import BaseResult
from ..engine.result import CursorResultMetaData
from ..engine.result import DefaultCursorFetchStrategy
from ..engine.result import Row
from ..sql import util as sql_util
from ..sql.base import _generative
-from ..sql.base import Generative
+from ..sql.base import InPlaceGenerative
-class Result(Generative, BaseResult):
+class Result(InPlaceGenerative, BaseResult):
"""Interim "future" result proxy so that dialects can build on
upcoming 2.0 patterns.
@@ -50,21 +49,76 @@ class Result(Generative, BaseResult):
self._soft_close(hard=True)
def columns(self, *col_expressions):
- indexes = []
- for key in col_expressions:
- try:
- rec = self._keymap[key]
- except KeyError:
- rec = self._key_fallback(key, True)
- if rec is None:
- return None
-
- index, obj = rec[0:2]
-
- if index is None:
- self._metadata._raise_for_ambiguous_column_name(obj)
- indexes.append(index)
- return self._column_slices(indexes)
+ r"""Establish the columns that should be returned in each row.
+
+ This method may be used to limit the columns returned as well
+ as to reorder them. The given list of expressions are normally
+ a series of integers or string key names. They may also be
+ appropriate :class:`.ColumnElement` objects which correspond to
+ a given statement construct.
+
+ E.g.::
+
+ statement = select(table.c.x, table.c.y, table.c.z)
+ result = connection.execute(statement)
+
+ for z, y in result.columns('z', 'y'):
+ # ...
+
+
+ Example of using the column objects from the statement itself::
+
+ for z, y in result.columns(
+ statement.selected_columns.c.z,
+ statement.selected_columns.c.y
+ ):
+ # ...
+
+ :param \*col_expressions: indicates columns to be returned. Elements
+ may be integer row indexes, string column names, or appropriate
+ :class:`.ColumnElement` objects corresponding to a select construct.
+
+ :return: this :class:`_future.Result` object with the modifications
+ given.
+
+ """
+ return self._column_slices(col_expressions)
+
+ def partitions(self, size=100):
+ """Iterate through sub-lists of rows of the size given.
+
+ Each list will be of the size given, excluding the last list to
+ be yielded, which may have a small number of rows. No empty
+ lists will be yielded.
+
+ The result object is automatically closed when the iterator
+ is fully consumed.
+
+ Note that the backend driver will usually buffer the entire result
+ ahead of time unless the
+ :paramref:`.Connection.execution_options.stream_results` execution
+ option is used indicating that the driver should not pre-buffer
+ results, if possible. Not all drivers support this option and
+ the option is silently ignored for those who do. For a positive
+ assertion that the driver supports streaming results that will
+ fail if not supported, use the
+ :paramref:`.Connection.execution_options.stream_per`
+ execution option.
+
+ :param size: indicate the maximum number of rows to be present
+ in each list yielded.
+ :return: iterator of lists
+
+ """
+ getter = self._row_getter()
+ while True:
+ partition = [
+ getter(r) for r in self._safe_fetchmany_impl(size=size)
+ ]
+ if partition:
+ yield partition
+ else:
+ break
def scalars(self):
result = self._column_slices(0)
@@ -73,12 +127,7 @@ class Result(Generative, BaseResult):
@_generative
def _column_slices(self, indexes):
- if _baserow_usecext:
- self._column_slice_filter = self._metadata._tuplegetter(*indexes)
- else:
- self._column_slice_filter = self._metadata._pure_py_tuplegetter(
- *indexes
- )
+ self._column_slice_filter = self._metadata._tuple_getter(indexes)
@_generative
def mappings(self):
@@ -135,7 +184,7 @@ class Result(Generative, BaseResult):
def _safe_fetchmany_impl(self, size=None):
try:
- l = self.process_rows(self.cursor_strategy.fetchmany(size))
+ l = self.cursor_strategy.fetchmany(size)
if len(l) == 0:
self._soft_close()
return l
@@ -156,11 +205,77 @@ class Result(Generative, BaseResult):
else:
return getter(row)
+ @util.deprecated(
+ "2.0",
+ "The :meth:`_future.Result.fetchall` "
+ "method is provided for backwards "
+ "compatibility and will be removed in a future release.",
+ )
+ def fetchall(self):
+ """A synonym for the :meth:`_future.Result.all` method."""
+
+ return self.all()
+
+ @util.deprecated(
+ "2.0",
+ "The :meth:`_future.Result.fetchone` "
+ "method is provided for backwards "
+ "compatibility and will be removed in a future release.",
+ )
+ def fetchone(self):
+ """Fetch one row.
+
+ this method is provided for backwards compatibility with
+ SQLAlchemy 1.x.x.
+
+ To fetch the first row of a result only, use the
+ :meth:`.future.Result.first` method. To iterate through all
+ rows, iterate the :class:`_future.Result` object directly.
+
+ """
+ return self._onerow()
+
+ @util.deprecated(
+ "2.0",
+ "The :meth:`_future.Result.fetchmany` "
+ "method is provided for backwards "
+ "compatibility and will be removed in a future release.",
+ )
+ def fetchmany(self, size=None):
+ """Fetch many rows.
+
+ this method is provided for backwards compatibility with
+ SQLAlchemy 1.x.x.
+
+ To fetch rows in groups, use the :meth:`.future.Result.partitions`
+ method, or the :meth:`.future.Result.chunks` method in combination
+ with the :paramref:`.Connection.execution_options.stream_per`
+ option which sets up the buffer size before fetching the result.
+
+ """
+ getter = self._row_getter()
+ return [getter(r) for r in self._safe_fetchmany_impl(size=size)]
+
def all(self):
+ """Return all rows in a list.
+
+ Closes the result set after invocation.
+
+ :return: a list of :class:`.Row` objects.
+
+ """
getter = self._row_getter()
return [getter(r) for r in self._safe_fetchall_impl()]
def first(self):
+ """Fetch the first row or None if no row is present.
+
+ Closes the result set and discards remaining rows. A warning
+ is emitted if additional rows remain.
+
+ :return: a :class:`.Row` object, or None if no rows remain
+
+ """
getter = self._row_getter()
row = self._safe_fetchone_impl()
if row is None:
@@ -172,3 +287,19 @@ class Result(Generative, BaseResult):
self._soft_close()
util.warn("Additional rows remain")
return row
+
+ def scalar(self):
+ """Fetch the first column of the first row, and close the result set.
+
+ After calling this method, the object is fully closed,
+ e.g. the :meth:`_engine.ResultProxy.close`
+ method will have been called.
+
+ :return: a Python scalar value , or None if no rows remain
+
+ """
+ row = self.first()
+ if row is not None:
+ return row[0]
+ else:
+ return None
diff --git a/lib/sqlalchemy/log.py b/lib/sqlalchemy/log.py
index 42c8a2c29..44f8c4ff8 100644
--- a/lib/sqlalchemy/log.py
+++ b/lib/sqlalchemy/log.py
@@ -41,8 +41,15 @@ def _add_default_handler(logger):
_logged_classes = set()
+def _qual_logger_name_for_cls(cls):
+ return (
+ getattr(cls, "_sqla_logger_namespace", None)
+ or cls.__module__ + "." + cls.__name__
+ )
+
+
def class_logger(cls):
- logger = logging.getLogger(cls.__module__ + "." + cls.__name__)
+ logger = logging.getLogger(_qual_logger_name_for_cls(cls))
cls._should_log_debug = lambda self: logger.isEnabledFor(logging.DEBUG)
cls._should_log_info = lambda self: logger.isEnabledFor(logging.INFO)
cls.logger = logger
@@ -175,16 +182,12 @@ def instance_logger(instance, echoflag=None):
"""create a logger for an instance that implements :class:`.Identified`."""
if instance.logging_name:
- name = "%s.%s.%s" % (
- instance.__class__.__module__,
- instance.__class__.__name__,
+ name = "%s.%s" % (
+ _qual_logger_name_for_cls(instance.__class__),
instance.logging_name,
)
else:
- name = "%s.%s" % (
- instance.__class__.__module__,
- instance.__class__.__name__,
- )
+ name = _qual_logger_name_for_cls(instance.__class__)
instance._echo = echoflag
diff --git a/lib/sqlalchemy/orm/session.py b/lib/sqlalchemy/orm/session.py
index 534d67530..4ca715dd3 100644
--- a/lib/sqlalchemy/orm/session.py
+++ b/lib/sqlalchemy/orm/session.py
@@ -447,7 +447,10 @@ class SessionTransaction(object):
elif self.nested:
transaction = conn.begin_nested()
else:
- transaction = conn.begin()
+ if conn._is_future and conn.in_transaction():
+ transaction = conn._transaction
+ else:
+ transaction = conn.begin()
except:
# connection will not not be associated with this Session;
# close it immediately so that it isn't closed under GC
@@ -455,10 +458,13 @@ class SessionTransaction(object):
conn.close()
raise
else:
+ bind_is_connection = isinstance(bind, engine.Connection)
+
self._connections[conn] = self._connections[conn.engine] = (
conn,
transaction,
- conn is not bind,
+ not bind_is_connection or not conn._is_future,
+ not bind_is_connection,
)
self.session.dispatch.after_begin(self.session, self, conn)
return conn
@@ -509,8 +515,11 @@ class SessionTransaction(object):
self._prepare_impl()
if self._parent is None or self.nested:
- for t in set(self._connections.values()):
- t[1].commit()
+ for conn, trans, should_commit, autoclose in set(
+ self._connections.values()
+ ):
+ if should_commit:
+ trans.commit()
self._state = COMMITTED
self.session.dispatch.after_commit(self.session)
@@ -579,7 +588,7 @@ class SessionTransaction(object):
def close(self, invalidate=False):
self.session._transaction = self._parent
if self._parent is None:
- for connection, transaction, autoclose in set(
+ for connection, transaction, should_commit, autoclose in set(
self._connections.values()
):
if invalidate:
diff --git a/lib/sqlalchemy/sql/base.py b/lib/sqlalchemy/sql/base.py
index d070027c8..2d023c6a6 100644
--- a/lib/sqlalchemy/sql/base.py
+++ b/lib/sqlalchemy/sql/base.py
@@ -461,6 +461,17 @@ class Generative(HasMemoized):
return s
+class InPlaceGenerative(HasMemoized):
+ """Provide a method-chaining pattern in conjunction with the
+ @_generative decorator taht mutates in place."""
+
+ def _generate(self):
+ skip = self._memoized_keys
+ for k in skip:
+ self.__dict__.pop(k, None)
+ return self
+
+
class HasCompileState(Generative):
"""A class that has a :class:`.CompileState` associated with it."""
diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py
index 7ef9f7e6d..bc16b1429 100644
--- a/lib/sqlalchemy/sql/compiler.py
+++ b/lib/sqlalchemy/sql/compiler.py
@@ -434,9 +434,13 @@ class Compiled(object):
self.string, schema_translate_map
)
- def _execute_on_connection(self, connection, multiparams, params):
+ def _execute_on_connection(
+ self, connection, multiparams, params, execution_options
+ ):
if self.can_execute:
- return connection._execute_compiled(self, multiparams, params)
+ return connection._execute_compiled(
+ self, multiparams, params, execution_options
+ )
else:
raise exc.ObjectNotExecutableError(self.statement)
diff --git a/lib/sqlalchemy/sql/ddl.py b/lib/sqlalchemy/sql/ddl.py
index 4c8250e98..51526173f 100644
--- a/lib/sqlalchemy/sql/ddl.py
+++ b/lib/sqlalchemy/sql/ddl.py
@@ -68,8 +68,12 @@ class DDLElement(roles.DDLRole, Executable, _DDLCompiles):
dialect = None
callable_ = None
- def _execute_on_connection(self, connection, multiparams, params):
- return connection._execute_ddl(self, multiparams, params)
+ def _execute_on_connection(
+ self, connection, multiparams, params, execution_options
+ ):
+ return connection._execute_ddl(
+ self, multiparams, params, execution_options
+ )
def execute(self, bind=None, target=None):
"""Execute this DDL immediately.
diff --git a/lib/sqlalchemy/sql/elements.py b/lib/sqlalchemy/sql/elements.py
index 49bb08644..d8b5a1626 100644
--- a/lib/sqlalchemy/sql/elements.py
+++ b/lib/sqlalchemy/sql/elements.py
@@ -282,9 +282,13 @@ class ClauseElement(
d.pop("_generate_cache_key", None)
return d
- def _execute_on_connection(self, connection, multiparams, params):
+ def _execute_on_connection(
+ self, connection, multiparams, params, execution_options
+ ):
if self.supports_execution:
- return connection._execute_clauseelement(self, multiparams, params)
+ return connection._execute_clauseelement(
+ self, multiparams, params, execution_options
+ )
else:
raise exc.ObjectNotExecutableError(self)
diff --git a/lib/sqlalchemy/sql/functions.py b/lib/sqlalchemy/sql/functions.py
index 1b10df954..cedb76f55 100644
--- a/lib/sqlalchemy/sql/functions.py
+++ b/lib/sqlalchemy/sql/functions.py
@@ -115,8 +115,12 @@ class FunctionElement(Executable, ColumnElement, FromClause):
operator=operators.comma_op, group_contents=True, *args
).self_group()
- def _execute_on_connection(self, connection, multiparams, params):
- return connection._execute_function(self, multiparams, params)
+ def _execute_on_connection(
+ self, connection, multiparams, params, execution_options
+ ):
+ return connection._execute_function(
+ self, multiparams, params, execution_options
+ )
@property
def columns(self):
diff --git a/lib/sqlalchemy/sql/schema.py b/lib/sqlalchemy/sql/schema.py
index ec8d5a458..eddd62d65 100644
--- a/lib/sqlalchemy/sql/schema.py
+++ b/lib/sqlalchemy/sql/schema.py
@@ -2186,8 +2186,12 @@ class DefaultGenerator(SchemaItem):
bind = _bind_or_error(self)
return bind.execute(self, **kwargs)
- def _execute_on_connection(self, connection, multiparams, params):
- return connection._execute_default(self, multiparams, params)
+ def _execute_on_connection(
+ self, connection, multiparams, params, execution_options
+ ):
+ return connection._execute_default(
+ self, multiparams, params, execution_options
+ )
@property
def bind(self):
diff --git a/lib/sqlalchemy/testing/assertsql.py b/lib/sqlalchemy/testing/assertsql.py
index 8876c2304..7988b4ec9 100644
--- a/lib/sqlalchemy/testing/assertsql.py
+++ b/lib/sqlalchemy/testing/assertsql.py
@@ -388,7 +388,9 @@ def assert_engine(engine):
orig = []
@event.listens_for(engine, "before_execute")
- def connection_execute(conn, clauseelement, multiparams, params):
+ def connection_execute(
+ conn, clauseelement, multiparams, params, execution_options
+ ):
# grab the original statement + params before any cursor
# execution
orig[:] = clauseelement, multiparams, params
diff --git a/lib/sqlalchemy/testing/config.py b/lib/sqlalchemy/testing/config.py
index 140f5f782..e97821d72 100644
--- a/lib/sqlalchemy/testing/config.py
+++ b/lib/sqlalchemy/testing/config.py
@@ -150,6 +150,14 @@ class Config(object):
cls.set_as_current(config, namespace)
@classmethod
+ def pop(cls, namespace):
+ if cls._stack:
+ # a failed test w/ -x option can call reset() ahead of time
+ _current = cls._stack[-1]
+ del cls._stack[-1]
+ cls.set_as_current(_current, namespace)
+
+ @classmethod
def reset(cls, namespace):
if cls._stack:
cls.set_as_current(cls._stack[0], namespace)
diff --git a/lib/sqlalchemy/testing/engines.py b/lib/sqlalchemy/testing/engines.py
index 910af5876..280e6901e 100644
--- a/lib/sqlalchemy/testing/engines.py
+++ b/lib/sqlalchemy/testing/engines.py
@@ -238,10 +238,13 @@ def reconnecting_engine(url=None, options=None):
return engine
-def testing_engine(url=None, options=None):
+def testing_engine(url=None, options=None, future=False):
"""Produce an engine configured by --options with optional overrides."""
- from sqlalchemy import create_engine
+ if future or config.db and config.db._is_future:
+ from sqlalchemy.future import create_engine
+ else:
+ from sqlalchemy import create_engine
from sqlalchemy.engine.url import make_url
if not options:
diff --git a/lib/sqlalchemy/testing/fixtures.py b/lib/sqlalchemy/testing/fixtures.py
index e5e6c42fc..26ae221b8 100644
--- a/lib/sqlalchemy/testing/fixtures.py
+++ b/lib/sqlalchemy/testing/fixtures.py
@@ -84,6 +84,29 @@ class TestBase(object):
# engines.drop_all_tables(metadata, config.db)
+class FutureEngineMixin(object):
+ @classmethod
+ def setup_class(cls):
+ super_ = super(FutureEngineMixin, cls)
+ if hasattr(super_, "setup_class"):
+ super_.setup_class()
+
+ from ..future.engine import Engine
+ from sqlalchemy import testing
+
+ config._current.push_engine(Engine._future_facade(config.db), testing)
+
+ @classmethod
+ def teardown_class(cls):
+ from sqlalchemy import testing
+
+ config._current.pop(testing)
+
+ super_ = super(FutureEngineMixin, cls)
+ if hasattr(super_, "teardown_class"):
+ super_.teardown_class()
+
+
class TablesTest(TestBase):
# 'once', None
diff --git a/lib/sqlalchemy/testing/suite/test_ddl.py b/lib/sqlalchemy/testing/suite/test_ddl.py
index 1f49106fb..93706338c 100644
--- a/lib/sqlalchemy/testing/suite/test_ddl.py
+++ b/lib/sqlalchemy/testing/suite/test_ddl.py
@@ -90,4 +90,8 @@ class TableDDLTest(fixtures.TestBase):
)
-__all__ = ("TableDDLTest",)
+class FutureTableDDLTest(fixtures.FutureEngineMixin, TableDDLTest):
+ pass
+
+
+__all__ = ("TableDDLTest", "FutureTableDDLTest")
diff --git a/lib/sqlalchemy/testing/warnings.py b/lib/sqlalchemy/testing/warnings.py
index 6b42c98cb..39cffbf15 100644
--- a/lib/sqlalchemy/testing/warnings.py
+++ b/lib/sqlalchemy/testing/warnings.py
@@ -34,6 +34,13 @@ def setup_filters():
# ignore 2.0 warnings unless we are explicitly testing for them
warnings.filterwarnings("ignore", category=sa_exc.RemovedIn20Warning)
+ # ignore things that are deprecated *as of* 2.0 :)
+ warnings.filterwarnings(
+ "ignore",
+ category=sa_exc.SADeprecationWarning,
+ message=r".*\(deprecated since: 2.0\)$",
+ )
+
try:
import pytest
except ImportError:
diff --git a/lib/sqlalchemy/util/deprecations.py b/lib/sqlalchemy/util/deprecations.py
index ad734a1c3..8ea8e8695 100644
--- a/lib/sqlalchemy/util/deprecations.py
+++ b/lib/sqlalchemy/util/deprecations.py
@@ -108,6 +108,8 @@ def deprecated(
if warning is None:
warning = exc.SADeprecationWarning
+ message += " (deprecated since: %s)" % version
+
def decorate(fn):
return _decorate_with_warning(
fn, warning, message % dict(func=fn.__name__), version, header