diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-04-27 12:58:12 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-05-25 13:56:37 -0400 |
| commit | 6930dfc032c3f9f474e71ab4e021c0ef8384930e (patch) | |
| tree | 34b919a3c34edaffda1750f161a629fc5b9a8020 /lib/sqlalchemy/ext | |
| parent | dce8c7a125cb99fad62c76cd145752d5afefae36 (diff) | |
| download | sqlalchemy-6930dfc032c3f9f474e71ab4e021c0ef8384930e.tar.gz | |
Convert execution to move through Session
This patch replaces the ORM execution flow with a
single pathway through Session.execute() for all queries,
including Core and ORM.
Currently included is full support for ORM Query,
Query.from_statement(), select(), as well as the
baked query and horizontal shard systems. Initial
changes have also been made to the dogpile caching
example, which like baked query makes use of a
new ORM-specific execution hook that replaces the
use of both QueryEvents.before_compile() as well
as Query._execute_and_instances() as the central
ORM interception hooks.
select() and Query() constructs alike can be passed to
Session.execute() where they will return ORM
results in a Results object. This API is currently
used internally by Query. Full support for
Session.execute()->results to behave in a fully
2.0 fashion will be in later changesets.
bulk update/delete with ORM support will also
be delivered via the update() and delete()
constructs, however these have not yet been adapted
to the new system and may follow in a subsequent
update.
Performance is also beginning to lag as of this
commit and some previous ones. It is hoped that
a few central functions such as the coercions
functions can be rewritten in C to re-gain
performance. Additionally, query caching
is now available and some subsequent patches
will attempt to cache more of the per-execution
work from the ORM layer, e.g. column getters
and adapters.
This patch also contains initial "turn on" of the
caching system enginewide via the query_cache_size
parameter to create_engine(). Still defaulting at
zero for "no caching". The caching system still
needs adjustments in order to gain adequate performance.
Change-Id: I047a7ebb26aa85dc01f6789fac2bff561dcd555d
Diffstat (limited to 'lib/sqlalchemy/ext')
| -rw-r--r-- | lib/sqlalchemy/ext/baked.py | 170 | ||||
| -rw-r--r-- | lib/sqlalchemy/ext/horizontal_shard.py | 156 |
2 files changed, 141 insertions, 185 deletions
diff --git a/lib/sqlalchemy/ext/baked.py b/lib/sqlalchemy/ext/baked.py index 24af454b6..112e245f7 100644 --- a/lib/sqlalchemy/ext/baked.py +++ b/lib/sqlalchemy/ext/baked.py @@ -19,7 +19,6 @@ from .. import exc as sa_exc from .. import util from ..orm import exc as orm_exc from ..orm import strategy_options -from ..orm.context import QueryContext from ..orm.query import Query from ..orm.session import Session from ..sql import func @@ -201,11 +200,12 @@ class BakedQuery(object): self.spoil(full=True) else: for opt in options: - cache_key = opt._generate_path_cache_key(cache_path) - if cache_key is False: - self.spoil(full=True) - elif cache_key is not None: - key += cache_key + if opt._is_legacy_option or opt._is_compile_state: + cache_key = opt._generate_path_cache_key(cache_path) + if cache_key is False: + self.spoil(full=True) + elif cache_key is not None: + key += cache_key self.add_criteria( lambda q: q._with_current_path(effective_path).options(*options), @@ -224,41 +224,32 @@ class BakedQuery(object): def _bake(self, session): query = self._as_query(session) + query.session = None - compile_state = query._compile_state() + # in 1.4, this is where before_compile() event is + # invoked + statement = query._statement_20(orm_results=True) - self._bake_subquery_loaders(session, compile_state) - - # TODO: compile_state clearly needs to be simplified here. - # if the session remains, fails memusage test - compile_state.orm_query = ( - query - ) = ( - compile_state.select_statement - ) = compile_state.query = compile_state.orm_query.with_session(None) - query._execution_options = query._execution_options.union( - {"compiled_cache": self._bakery} - ) - - # we'll be holding onto the query for some of its state, - # so delete some compilation-use-only attributes that can take up - # space - for attr in ( - "_correlate", - "_from_obj", - "_mapper_adapter_map", - "_joinpath", - "_joinpoint", - ): - query.__dict__.pop(attr, None) + # the before_compile() event can create a new Query object + # before it makes the statement. + query = statement.compile_options._orm_query # if the query is not safe to cache, we still do everything as though # we did cache it, since the receiver of _bake() assumes subqueryload # context was set up, etc. - if compile_state.compile_options._bake_ok: - self._bakery[self._effective_key(session)] = compile_state + # + # note also we want to cache the statement itself because this + # allows the statement itself to hold onto its cache key that is + # used by the Connection, which in itself is more expensive to + # generate than what BakedQuery was able to provide in 1.3 and prior + + if query.compile_options._bake_ok: + self._bakery[self._effective_key(session)] = ( + query, + statement, + ) - return compile_state + return query, statement def to_query(self, query_or_session): """Return the :class:`_query.Query` object for use as a subquery. @@ -321,50 +312,6 @@ class BakedQuery(object): return query - def _bake_subquery_loaders(self, session, compile_state): - """convert subquery eager loaders in the cache into baked queries. - - For subquery eager loading to work, all we need here is that the - Query point to the correct session when it is run. However, since - we are "baking" anyway, we may as well also turn the query into - a "baked" query so that we save on performance too. - - """ - compile_state.attributes["baked_queries"] = baked_queries = [] - for k, v in list(compile_state.attributes.items()): - if isinstance(v, dict) and "query" in v: - if "subqueryload_data" in k: - query = v["query"] - bk = BakedQuery(self._bakery, lambda *args: query) - bk._cache_key = self._cache_key + k - bk._bake(session) - baked_queries.append((k, bk._cache_key, v)) - del compile_state.attributes[k] - - def _unbake_subquery_loaders( - self, session, compile_state, context, params, post_criteria - ): - """Retrieve subquery eager loaders stored by _bake_subquery_loaders - and turn them back into Result objects that will iterate just - like a Query object. - - """ - if "baked_queries" not in compile_state.attributes: - return - - for k, cache_key, v in compile_state.attributes["baked_queries"]: - query = v["query"] - bk = BakedQuery( - self._bakery, lambda sess, q=query: q.with_session(sess) - ) - bk._cache_key = cache_key - q = bk.for_session(session) - for fn in post_criteria: - q = q.with_post_criteria(fn) - v = dict(v) - v["query"] = q.params(**params) - context.attributes[k] = v - class Result(object): """Invokes a :class:`.BakedQuery` against a :class:`.Session`. @@ -406,17 +353,19 @@ class Result(object): This adds a function that will be run against the :class:`_query.Query` object after it is retrieved from the - cache. Functions here can be used to alter the query in ways - that **do not affect the SQL output**, such as execution options - and shard identifiers (when using a shard-enabled query object) + cache. This currently includes **only** the + :meth:`_query.Query.params` and :meth:`_query.Query.execution_options` + methods. .. warning:: :meth:`_baked.Result.with_post_criteria` functions are applied to the :class:`_query.Query` object **after** the query's SQL statement - object has been retrieved from the cache. Any operations here - which intend to modify the SQL should ensure that - :meth:`.BakedQuery.spoil` was called first. + object has been retrieved from the cache. Only + :meth:`_query.Query.params` and + :meth:`_query.Query.execution_options` + methods should be used. + .. versionadded:: 1.2 @@ -438,40 +387,41 @@ class Result(object): def _iter(self): bq = self.bq + if not self.session.enable_baked_queries or bq._spoiled: return self._as_query()._iter() - baked_compile_state = bq._bakery.get( - bq._effective_key(self.session), None + query, statement = bq._bakery.get( + bq._effective_key(self.session), (None, None) ) - if baked_compile_state is None: - baked_compile_state = bq._bake(self.session) - - context = QueryContext(baked_compile_state, self.session) - context.session = self.session - - bq._unbake_subquery_loaders( - self.session, - baked_compile_state, - context, - self._params, - self._post_criteria, - ) - - # asserts true - # if isinstance(baked_compile_state.statement, expression.Select): - # assert baked_compile_state.statement._label_style == \ - # LABEL_STYLE_TABLENAME_PLUS_COL + if query is None: + query, statement = bq._bake(self.session) - if context.autoflush and not context.populate_existing: - self.session._autoflush() - q = context.orm_query.params(self._params).with_session(self.session) + q = query.params(self._params) for fn in self._post_criteria: q = fn(q) params = q.load_options._params + q.load_options += {"_orm_query": q} + execution_options = dict(q._execution_options) + execution_options.update( + { + "_sa_orm_load_options": q.load_options, + "compiled_cache": bq._bakery, + } + ) + + result = self.session.execute( + statement, params, execution_options=execution_options + ) + + if result._attributes.get("is_single_entity", False): + result = result.scalars() + + if result._attributes.get("filtered", False): + result = result.unique() - return q._execute_and_instances(context, params=params) + return result def count(self): """return the 'count'. @@ -583,10 +533,10 @@ class Result(object): query = self.bq.steps[0](self.session) return query._get_impl(ident, self._load_on_pk_identity) - def _load_on_pk_identity(self, query, primary_key_identity): + def _load_on_pk_identity(self, session, query, primary_key_identity, **kw): """Load the given primary key identity from the database.""" - mapper = query._only_full_mapper_zero("load_on_pk_identity") + mapper = query._raw_columns[0]._annotations["parententity"] _get_clause, _get_params = mapper._get_clause diff --git a/lib/sqlalchemy/ext/horizontal_shard.py b/lib/sqlalchemy/ext/horizontal_shard.py index 919f4409a..1375a24cd 100644 --- a/lib/sqlalchemy/ext/horizontal_shard.py +++ b/lib/sqlalchemy/ext/horizontal_shard.py @@ -15,10 +15,8 @@ the source distribution. """ -import copy - +from sqlalchemy import event from .. import inspect -from .. import util from ..orm.query import Query from ..orm.session import Session @@ -37,54 +35,32 @@ class ShardedQuery(Query): all subsequent operations with the returned query will be against the single shard regardless of other state. - """ - q = self._clone() - q._shard_id = shard_id - return q + The shard_id can be passed for a 2.0 style execution to the + bind_arguments dictionary of :meth:`.Session.execute`:: - def _execute_and_instances(self, context, params=None): - if params is None: - params = self.load_options._params - - def iter_for_shard(shard_id): - # shallow copy, so that each context may be used by - # ORM load events and similar. - copied_context = copy.copy(context) - copied_context.attributes = context.attributes.copy() - - copied_context.attributes[ - "shard_id" - ] = copied_context.identity_token = shard_id - result_ = self._connection_from_session( - mapper=context.compile_state._bind_mapper(), shard_id=shard_id - ).execute( - copied_context.compile_state.statement, - self.load_options._params, + results = session.execute( + stmt, + bind_arguments={"shard_id": "my_shard"} ) - return self.instances(result_, copied_context) - if context.identity_token is not None: - return iter_for_shard(context.identity_token) - elif self._shard_id is not None: - return iter_for_shard(self._shard_id) - else: - partial = [] - for shard_id in self.query_chooser(self): - result_ = iter_for_shard(shard_id) - partial.append(result_) + """ - return partial[0].merge(*partial[1:]) + q = self._clone() + q._shard_id = shard_id + return q def _execute_crud(self, stmt, mapper): def exec_for_shard(shard_id): - conn = self._connection_from_session( + conn = self.session.connection( mapper=mapper, shard_id=shard_id, clause=stmt, close_with_result=True, ) - result = conn.execute(stmt, self.load_options._params) + result = conn._execute_20( + stmt, self.load_options._params, self._execution_options + ) return result if self._shard_id is not None: @@ -99,38 +75,6 @@ class ShardedQuery(Query): return ShardedResult(results, rowcount) - def _get_impl(self, primary_key_identity, db_load_fn, identity_token=None): - """Override the default Query._get_impl() method so that we emit - a query to the DB for each possible identity token, if we don't - have one already. - - """ - - def _db_load_fn(query, primary_key_identity): - # load from the database. The original db_load_fn will - # use the given Query object to load from the DB, so our - # shard_id is what will indicate the DB that we query from. - if self._shard_id is not None: - return db_load_fn(self, primary_key_identity) - else: - ident = util.to_list(primary_key_identity) - # build a ShardedQuery for each shard identifier and - # try to load from the DB - for shard_id in self.id_chooser(self, ident): - q = self.set_shard(shard_id) - o = db_load_fn(q, ident) - if o is not None: - return o - else: - return None - - if identity_token is None and self._shard_id is not None: - identity_token = self._shard_id - - return super(ShardedQuery, self)._get_impl( - primary_key_identity, _db_load_fn, identity_token=identity_token - ) - class ShardedResult(object): """A value object that represents multiple :class:`_engine.CursorResult` @@ -190,11 +134,14 @@ class ShardedSession(Session): """ super(ShardedSession, self).__init__(query_cls=query_cls, **kwargs) + + event.listen( + self, "do_orm_execute", execute_and_instances, retval=True + ) self.shard_chooser = shard_chooser self.id_chooser = id_chooser self.query_chooser = query_chooser self.__binds = {} - self.connection_callable = self.connection if shards is not None: for k in shards: self.bind_shard(k, shards[k]) @@ -207,8 +154,8 @@ class ShardedSession(Session): lazy_loaded_from=None, **kw ): - """override the default :meth:`.Session._identity_lookup` method so that we - search for a given non-token primary key identity across all + """override the default :meth:`.Session._identity_lookup` method so + that we search for a given non-token primary key identity across all possible identity tokens (e.g. shard ids). .. versionchanged:: 1.4 Moved :meth:`.Session._identity_lookup` from @@ -255,7 +202,14 @@ class ShardedSession(Session): state.identity_token = shard_id return shard_id - def connection(self, mapper=None, instance=None, shard_id=None, **kwargs): + def connection_callable( + self, mapper=None, instance=None, shard_id=None, **kwargs + ): + """Provide a :class:`_engine.Connection` to use in the unit of work + flush process. + + """ + if shard_id is None: shard_id = self._choose_shard_and_assign(mapper, instance) @@ -267,7 +221,7 @@ class ShardedSession(Session): ).connect(**kwargs) def get_bind( - self, mapper, shard_id=None, instance=None, clause=None, **kw + self, mapper=None, shard_id=None, instance=None, clause=None, **kw ): if shard_id is None: shard_id = self._choose_shard_and_assign( @@ -277,3 +231,55 @@ class ShardedSession(Session): def bind_shard(self, shard_id, bind): self.__binds[shard_id] = bind + + +def execute_and_instances(orm_context): + if orm_context.bind_arguments.get("_horizontal_shard", False): + return None + + params = orm_context.parameters + + load_options = orm_context.load_options + session = orm_context.session + orm_query = orm_context.orm_query + + if params is None: + params = load_options._params + + def iter_for_shard(shard_id, load_options): + execution_options = dict(orm_context.execution_options) + + bind_arguments = dict(orm_context.bind_arguments) + bind_arguments["_horizontal_shard"] = True + bind_arguments["shard_id"] = shard_id + + load_options += {"_refresh_identity_token": shard_id} + execution_options["_sa_orm_load_options"] = load_options + + return session.execute( + orm_context.statement, + orm_context.parameters, + execution_options, + bind_arguments, + ) + + if load_options._refresh_identity_token is not None: + shard_id = load_options._refresh_identity_token + elif orm_query is not None and orm_query._shard_id is not None: + shard_id = orm_query._shard_id + elif "shard_id" in orm_context.bind_arguments: + shard_id = orm_context.bind_arguments["shard_id"] + else: + shard_id = None + + if shard_id is not None: + return iter_for_shard(shard_id, load_options) + else: + partial = [] + for shard_id in session.query_chooser( + orm_query if orm_query is not None else orm_context.statement + ): + result_ = iter_for_shard(shard_id, load_options) + partial.append(result_) + + return partial[0].merge(*partial[1:]) |
