summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/ext
diff options
context:
space:
mode:
author Mike Bayer <mike_mp@zzzcomputing.com> 2020-04-27 12:58:12 -0400
committer Mike Bayer <mike_mp@zzzcomputing.com> 2020-05-25 13:56:37 -0400
commit 6930dfc032c3f9f474e71ab4e021c0ef8384930e (patch)
tree 34b919a3c34edaffda1750f161a629fc5b9a8020 /lib/sqlalchemy/ext
parent dce8c7a125cb99fad62c76cd145752d5afefae36 (diff)
download sqlalchemy-6930dfc032c3f9f474e71ab4e021c0ef8384930e.tar.gz
Convert execution to move through Session
This patch replaces the ORM execution flow with a single pathway through Session.execute() for all queries, including Core and ORM. Currently included is full support for ORM Query, Query.from_statement(), select(), as well as the baked query and horizontal shard systems. Initial changes have also been made to the dogpile caching example, which, like baked query, makes use of a new ORM-specific execution hook that replaces the use of both QueryEvents.before_compile() and Query._execute_and_instances() as the central ORM interception hooks. select() and Query() constructs alike can be passed to Session.execute(), where they will return ORM results in a Results object. This API is currently used internally by Query. Full support for Session.execute()->results to behave in a fully 2.0 fashion will be in later changesets. Bulk update/delete with ORM support will also be delivered via the update() and delete() constructs; however, these have not yet been adapted to the new system and may follow in a subsequent update. Performance is also beginning to lag as of this commit and some previous ones. It is hoped that a few central functions such as the coercions functions can be rewritten in C to regain performance. Additionally, query caching is now available and some subsequent patches will attempt to cache more of the per-execution work from the ORM layer, e.g. column getters and adapters. This patch also contains an initial "turn on" of the caching system engine-wide via the query_cache_size parameter to create_engine(). It still defaults to zero, meaning "no caching". The caching system still needs adjustments in order to gain adequate performance. Change-Id: I047a7ebb26aa85dc01f6789fac2bff561dcd555d
Diffstat (limited to 'lib/sqlalchemy/ext')
-rw-r--r--lib/sqlalchemy/ext/baked.py170
-rw-r--r--lib/sqlalchemy/ext/horizontal_shard.py156
2 files changed, 141 insertions, 185 deletions
diff --git a/lib/sqlalchemy/ext/baked.py b/lib/sqlalchemy/ext/baked.py
index 24af454b6..112e245f7 100644
--- a/lib/sqlalchemy/ext/baked.py
+++ b/lib/sqlalchemy/ext/baked.py
@@ -19,7 +19,6 @@ from .. import exc as sa_exc
from .. import util
from ..orm import exc as orm_exc
from ..orm import strategy_options
-from ..orm.context import QueryContext
from ..orm.query import Query
from ..orm.session import Session
from ..sql import func
@@ -201,11 +200,12 @@ class BakedQuery(object):
self.spoil(full=True)
else:
for opt in options:
- cache_key = opt._generate_path_cache_key(cache_path)
- if cache_key is False:
- self.spoil(full=True)
- elif cache_key is not None:
- key += cache_key
+ if opt._is_legacy_option or opt._is_compile_state:
+ cache_key = opt._generate_path_cache_key(cache_path)
+ if cache_key is False:
+ self.spoil(full=True)
+ elif cache_key is not None:
+ key += cache_key
self.add_criteria(
lambda q: q._with_current_path(effective_path).options(*options),
@@ -224,41 +224,32 @@ class BakedQuery(object):
def _bake(self, session):
query = self._as_query(session)
+ query.session = None
- compile_state = query._compile_state()
+ # in 1.4, this is where before_compile() event is
+ # invoked
+ statement = query._statement_20(orm_results=True)
- self._bake_subquery_loaders(session, compile_state)
-
- # TODO: compile_state clearly needs to be simplified here.
- # if the session remains, fails memusage test
- compile_state.orm_query = (
- query
- ) = (
- compile_state.select_statement
- ) = compile_state.query = compile_state.orm_query.with_session(None)
- query._execution_options = query._execution_options.union(
- {"compiled_cache": self._bakery}
- )
-
- # we'll be holding onto the query for some of its state,
- # so delete some compilation-use-only attributes that can take up
- # space
- for attr in (
- "_correlate",
- "_from_obj",
- "_mapper_adapter_map",
- "_joinpath",
- "_joinpoint",
- ):
- query.__dict__.pop(attr, None)
+ # the before_compile() event can create a new Query object
+ # before it makes the statement.
+ query = statement.compile_options._orm_query
# if the query is not safe to cache, we still do everything as though
# we did cache it, since the receiver of _bake() assumes subqueryload
# context was set up, etc.
- if compile_state.compile_options._bake_ok:
- self._bakery[self._effective_key(session)] = compile_state
+ #
+ # note also we want to cache the statement itself because this
+ # allows the statement itself to hold onto its cache key that is
+ # used by the Connection, which in itself is more expensive to
+ # generate than what BakedQuery was able to provide in 1.3 and prior
+
+ if query.compile_options._bake_ok:
+ self._bakery[self._effective_key(session)] = (
+ query,
+ statement,
+ )
- return compile_state
+ return query, statement
def to_query(self, query_or_session):
"""Return the :class:`_query.Query` object for use as a subquery.
@@ -321,50 +312,6 @@ class BakedQuery(object):
return query
- def _bake_subquery_loaders(self, session, compile_state):
- """convert subquery eager loaders in the cache into baked queries.
-
- For subquery eager loading to work, all we need here is that the
- Query point to the correct session when it is run. However, since
- we are "baking" anyway, we may as well also turn the query into
- a "baked" query so that we save on performance too.
-
- """
- compile_state.attributes["baked_queries"] = baked_queries = []
- for k, v in list(compile_state.attributes.items()):
- if isinstance(v, dict) and "query" in v:
- if "subqueryload_data" in k:
- query = v["query"]
- bk = BakedQuery(self._bakery, lambda *args: query)
- bk._cache_key = self._cache_key + k
- bk._bake(session)
- baked_queries.append((k, bk._cache_key, v))
- del compile_state.attributes[k]
-
- def _unbake_subquery_loaders(
- self, session, compile_state, context, params, post_criteria
- ):
- """Retrieve subquery eager loaders stored by _bake_subquery_loaders
- and turn them back into Result objects that will iterate just
- like a Query object.
-
- """
- if "baked_queries" not in compile_state.attributes:
- return
-
- for k, cache_key, v in compile_state.attributes["baked_queries"]:
- query = v["query"]
- bk = BakedQuery(
- self._bakery, lambda sess, q=query: q.with_session(sess)
- )
- bk._cache_key = cache_key
- q = bk.for_session(session)
- for fn in post_criteria:
- q = q.with_post_criteria(fn)
- v = dict(v)
- v["query"] = q.params(**params)
- context.attributes[k] = v
-
class Result(object):
"""Invokes a :class:`.BakedQuery` against a :class:`.Session`.
@@ -406,17 +353,19 @@ class Result(object):
This adds a function that will be run against the
:class:`_query.Query` object after it is retrieved from the
- cache. Functions here can be used to alter the query in ways
- that **do not affect the SQL output**, such as execution options
- and shard identifiers (when using a shard-enabled query object)
+ cache. This currently includes **only** the
+ :meth:`_query.Query.params` and :meth:`_query.Query.execution_options`
+ methods.
.. warning:: :meth:`_baked.Result.with_post_criteria`
functions are applied
to the :class:`_query.Query`
object **after** the query's SQL statement
- object has been retrieved from the cache. Any operations here
- which intend to modify the SQL should ensure that
- :meth:`.BakedQuery.spoil` was called first.
+ object has been retrieved from the cache. Only
+ :meth:`_query.Query.params` and
+ :meth:`_query.Query.execution_options`
+ methods should be used.
+
.. versionadded:: 1.2
@@ -438,40 +387,41 @@ class Result(object):
def _iter(self):
bq = self.bq
+
if not self.session.enable_baked_queries or bq._spoiled:
return self._as_query()._iter()
- baked_compile_state = bq._bakery.get(
- bq._effective_key(self.session), None
+ query, statement = bq._bakery.get(
+ bq._effective_key(self.session), (None, None)
)
- if baked_compile_state is None:
- baked_compile_state = bq._bake(self.session)
-
- context = QueryContext(baked_compile_state, self.session)
- context.session = self.session
-
- bq._unbake_subquery_loaders(
- self.session,
- baked_compile_state,
- context,
- self._params,
- self._post_criteria,
- )
-
- # asserts true
- # if isinstance(baked_compile_state.statement, expression.Select):
- # assert baked_compile_state.statement._label_style == \
- # LABEL_STYLE_TABLENAME_PLUS_COL
+ if query is None:
+ query, statement = bq._bake(self.session)
- if context.autoflush and not context.populate_existing:
- self.session._autoflush()
- q = context.orm_query.params(self._params).with_session(self.session)
+ q = query.params(self._params)
for fn in self._post_criteria:
q = fn(q)
params = q.load_options._params
+ q.load_options += {"_orm_query": q}
+ execution_options = dict(q._execution_options)
+ execution_options.update(
+ {
+ "_sa_orm_load_options": q.load_options,
+ "compiled_cache": bq._bakery,
+ }
+ )
+
+ result = self.session.execute(
+ statement, params, execution_options=execution_options
+ )
+
+ if result._attributes.get("is_single_entity", False):
+ result = result.scalars()
+
+ if result._attributes.get("filtered", False):
+ result = result.unique()
- return q._execute_and_instances(context, params=params)
+ return result
def count(self):
"""return the 'count'.
@@ -583,10 +533,10 @@ class Result(object):
query = self.bq.steps[0](self.session)
return query._get_impl(ident, self._load_on_pk_identity)
- def _load_on_pk_identity(self, query, primary_key_identity):
+ def _load_on_pk_identity(self, session, query, primary_key_identity, **kw):
"""Load the given primary key identity from the database."""
- mapper = query._only_full_mapper_zero("load_on_pk_identity")
+ mapper = query._raw_columns[0]._annotations["parententity"]
_get_clause, _get_params = mapper._get_clause
diff --git a/lib/sqlalchemy/ext/horizontal_shard.py b/lib/sqlalchemy/ext/horizontal_shard.py
index 919f4409a..1375a24cd 100644
--- a/lib/sqlalchemy/ext/horizontal_shard.py
+++ b/lib/sqlalchemy/ext/horizontal_shard.py
@@ -15,10 +15,8 @@ the source distribution.
"""
-import copy
-
+from sqlalchemy import event
from .. import inspect
-from .. import util
from ..orm.query import Query
from ..orm.session import Session
@@ -37,54 +35,32 @@ class ShardedQuery(Query):
all subsequent operations with the returned query will
be against the single shard regardless of other state.
- """
- q = self._clone()
- q._shard_id = shard_id
- return q
+ The shard_id can be passed for a 2.0 style execution to the
+ bind_arguments dictionary of :meth:`.Session.execute`::
- def _execute_and_instances(self, context, params=None):
- if params is None:
- params = self.load_options._params
-
- def iter_for_shard(shard_id):
- # shallow copy, so that each context may be used by
- # ORM load events and similar.
- copied_context = copy.copy(context)
- copied_context.attributes = context.attributes.copy()
-
- copied_context.attributes[
- "shard_id"
- ] = copied_context.identity_token = shard_id
- result_ = self._connection_from_session(
- mapper=context.compile_state._bind_mapper(), shard_id=shard_id
- ).execute(
- copied_context.compile_state.statement,
- self.load_options._params,
+ results = session.execute(
+ stmt,
+ bind_arguments={"shard_id": "my_shard"}
)
- return self.instances(result_, copied_context)
- if context.identity_token is not None:
- return iter_for_shard(context.identity_token)
- elif self._shard_id is not None:
- return iter_for_shard(self._shard_id)
- else:
- partial = []
- for shard_id in self.query_chooser(self):
- result_ = iter_for_shard(shard_id)
- partial.append(result_)
+ """
- return partial[0].merge(*partial[1:])
+ q = self._clone()
+ q._shard_id = shard_id
+ return q
def _execute_crud(self, stmt, mapper):
def exec_for_shard(shard_id):
- conn = self._connection_from_session(
+ conn = self.session.connection(
mapper=mapper,
shard_id=shard_id,
clause=stmt,
close_with_result=True,
)
- result = conn.execute(stmt, self.load_options._params)
+ result = conn._execute_20(
+ stmt, self.load_options._params, self._execution_options
+ )
return result
if self._shard_id is not None:
@@ -99,38 +75,6 @@ class ShardedQuery(Query):
return ShardedResult(results, rowcount)
- def _get_impl(self, primary_key_identity, db_load_fn, identity_token=None):
- """Override the default Query._get_impl() method so that we emit
- a query to the DB for each possible identity token, if we don't
- have one already.
-
- """
-
- def _db_load_fn(query, primary_key_identity):
- # load from the database. The original db_load_fn will
- # use the given Query object to load from the DB, so our
- # shard_id is what will indicate the DB that we query from.
- if self._shard_id is not None:
- return db_load_fn(self, primary_key_identity)
- else:
- ident = util.to_list(primary_key_identity)
- # build a ShardedQuery for each shard identifier and
- # try to load from the DB
- for shard_id in self.id_chooser(self, ident):
- q = self.set_shard(shard_id)
- o = db_load_fn(q, ident)
- if o is not None:
- return o
- else:
- return None
-
- if identity_token is None and self._shard_id is not None:
- identity_token = self._shard_id
-
- return super(ShardedQuery, self)._get_impl(
- primary_key_identity, _db_load_fn, identity_token=identity_token
- )
-
class ShardedResult(object):
"""A value object that represents multiple :class:`_engine.CursorResult`
@@ -190,11 +134,14 @@ class ShardedSession(Session):
"""
super(ShardedSession, self).__init__(query_cls=query_cls, **kwargs)
+
+ event.listen(
+ self, "do_orm_execute", execute_and_instances, retval=True
+ )
self.shard_chooser = shard_chooser
self.id_chooser = id_chooser
self.query_chooser = query_chooser
self.__binds = {}
- self.connection_callable = self.connection
if shards is not None:
for k in shards:
self.bind_shard(k, shards[k])
@@ -207,8 +154,8 @@ class ShardedSession(Session):
lazy_loaded_from=None,
**kw
):
- """override the default :meth:`.Session._identity_lookup` method so that we
- search for a given non-token primary key identity across all
+ """override the default :meth:`.Session._identity_lookup` method so
+ that we search for a given non-token primary key identity across all
possible identity tokens (e.g. shard ids).
.. versionchanged:: 1.4 Moved :meth:`.Session._identity_lookup` from
@@ -255,7 +202,14 @@ class ShardedSession(Session):
state.identity_token = shard_id
return shard_id
- def connection(self, mapper=None, instance=None, shard_id=None, **kwargs):
+ def connection_callable(
+ self, mapper=None, instance=None, shard_id=None, **kwargs
+ ):
+ """Provide a :class:`_engine.Connection` to use in the unit of work
+ flush process.
+
+ """
+
if shard_id is None:
shard_id = self._choose_shard_and_assign(mapper, instance)
@@ -267,7 +221,7 @@ class ShardedSession(Session):
).connect(**kwargs)
def get_bind(
- self, mapper, shard_id=None, instance=None, clause=None, **kw
+ self, mapper=None, shard_id=None, instance=None, clause=None, **kw
):
if shard_id is None:
shard_id = self._choose_shard_and_assign(
@@ -277,3 +231,55 @@ class ShardedSession(Session):
def bind_shard(self, shard_id, bind):
self.__binds[shard_id] = bind
+
+
+def execute_and_instances(orm_context):
+ if orm_context.bind_arguments.get("_horizontal_shard", False):
+ return None
+
+ params = orm_context.parameters
+
+ load_options = orm_context.load_options
+ session = orm_context.session
+ orm_query = orm_context.orm_query
+
+ if params is None:
+ params = load_options._params
+
+ def iter_for_shard(shard_id, load_options):
+ execution_options = dict(orm_context.execution_options)
+
+ bind_arguments = dict(orm_context.bind_arguments)
+ bind_arguments["_horizontal_shard"] = True
+ bind_arguments["shard_id"] = shard_id
+
+ load_options += {"_refresh_identity_token": shard_id}
+ execution_options["_sa_orm_load_options"] = load_options
+
+ return session.execute(
+ orm_context.statement,
+ orm_context.parameters,
+ execution_options,
+ bind_arguments,
+ )
+
+ if load_options._refresh_identity_token is not None:
+ shard_id = load_options._refresh_identity_token
+ elif orm_query is not None and orm_query._shard_id is not None:
+ shard_id = orm_query._shard_id
+ elif "shard_id" in orm_context.bind_arguments:
+ shard_id = orm_context.bind_arguments["shard_id"]
+ else:
+ shard_id = None
+
+ if shard_id is not None:
+ return iter_for_shard(shard_id, load_options)
+ else:
+ partial = []
+ for shard_id in session.query_chooser(
+ orm_query if orm_query is not None else orm_context.statement
+ ):
+ result_ = iter_for_shard(shard_id, load_options)
+ partial.append(result_)
+
+ return partial[0].merge(*partial[1:])