diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-04-24 15:34:19 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-05-02 11:18:13 -0400 |
| commit | bbf644862ab05734d153d74abf59aa3492278563 (patch) | |
| tree | 0a563147603ff2906422ed1b07e9f5f03e7584c8 /lib/sqlalchemy | |
| parent | 7acf9af1ce74a0bda4c4d29af7da543b5c42b3f8 (diff) | |
| download | sqlalchemy-bbf644862ab05734d153d74abf59aa3492278563.tar.gz | |
Integrate new Result into ORM query
The next step in the 2.0 ORM changes is to have the
ORM integrate with the new Result object fully.
This patch uses Result to represent ORM objects rather
than lists. A public API to get at this Result is not
added yet. The dogpile.cache and horizontal sharding
recipe/extensions have small adjustments to accommodate
this change.
Callcounts have fluctuated, some slightly better and
some slightly worse. A few have gone up by a bit;
however, as the codebase is still in flux, it is anticipated
there will be some performance gains later on as
ORM fetching is refined to no longer need to accommodate
extensive aliasing. The addition of caching
will then change the entire story.
References: #5087
References: #4395
Change-Id: If1a23824ffb77d8d58cf2338cf35dd6b5963b17f
Diffstat (limited to 'lib/sqlalchemy')
| -rw-r--r-- | lib/sqlalchemy/ext/baked.py | 7 | ||||
| -rw-r--r-- | lib/sqlalchemy/ext/horizontal_shard.py | 25 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/loading.py | 100 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/query.py | 70 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/strategies.py | 8 |
5 files changed, 117 insertions, 93 deletions
diff --git a/lib/sqlalchemy/ext/baked.py b/lib/sqlalchemy/ext/baked.py index e5e31c1f9..ca07be784 100644 --- a/lib/sqlalchemy/ext/baked.py +++ b/lib/sqlalchemy/ext/baked.py @@ -425,9 +425,12 @@ class Result(object): return str(self._as_query()) def __iter__(self): + return iter(self._iter()) + + def _iter(self): bq = self.bq if not self.session.enable_baked_queries or bq._spoiled: - return iter(self._as_query()) + return self._as_query()._iter() baked_context = bq._bakery.get(bq._effective_key(self.session), None) if baked_context is None: @@ -548,7 +551,7 @@ class Result(object): Equivalent to :meth:`_query.Query.all`. """ - return list(self) + return self._iter().all() def get(self, ident): """Retrieve an object based on identity. diff --git a/lib/sqlalchemy/ext/horizontal_shard.py b/lib/sqlalchemy/ext/horizontal_shard.py index aa2921498..931f45699 100644 --- a/lib/sqlalchemy/ext/horizontal_shard.py +++ b/lib/sqlalchemy/ext/horizontal_shard.py @@ -15,12 +15,13 @@ the source distribution. """ +import copy + from .. import inspect from .. import util from ..orm.query import Query from ..orm.session import Session - __all__ = ["ShardedSession", "ShardedQuery"] @@ -44,11 +45,18 @@ class ShardedQuery(Query): def _execute_and_instances(self, context): def iter_for_shard(shard_id): - context.attributes["shard_id"] = context.identity_token = shard_id - result = self._connection_from_session( + # shallow copy, so that each context may be used by + # ORM load events and similar. 
+ copied_context = copy.copy(context) + copied_context.attributes = context.attributes.copy() + + copied_context.attributes[ + "shard_id" + ] = copied_context.identity_token = shard_id + result_ = self._connection_from_session( mapper=self._bind_mapper(), shard_id=shard_id - ).execute(context.statement, self._params) - return self.instances(result, context) + ).execute(copied_context.statement, self._params) + return self.instances(result_, copied_context) if context.identity_token is not None: return iter_for_shard(context.identity_token) @@ -57,11 +65,10 @@ class ShardedQuery(Query): else: partial = [] for shard_id in self.query_chooser(self): - partial.extend(iter_for_shard(shard_id)) + result_ = iter_for_shard(shard_id) + partial.append(result_) - # if some kind of in memory 'sorting' - # were done, this is where it would happen - return iter(partial) + return partial[0].merge(*partial[1:]) def _execute_crud(self, stmt, mapper): def exec_for_shard(shard_id): diff --git a/lib/sqlalchemy/orm/loading.py b/lib/sqlalchemy/orm/loading.py index d781df980..10d937945 100644 --- a/lib/sqlalchemy/orm/loading.py +++ b/lib/sqlalchemy/orm/loading.py @@ -29,9 +29,11 @@ from .util import state_str from .. import exc as sa_exc from .. 
import util from ..engine import result_tuple +from ..engine.result import ChunkedIteratorResult +from ..engine.result import FrozenResult +from ..engine.result import SimpleResultMetaData from ..sql import util as sql_util - _new_runid = util.counter() @@ -41,20 +43,7 @@ def instances(query, cursor, context): context.runid = _new_runid() context.post_load_paths = {} - filtered = query._has_mapper_entities - - single_entity = query.is_single_entity - - if filtered: - if single_entity: - filter_fn = id - else: - - def filter_fn(row): - return tuple( - id(item) if ent.use_id_for_hash else item - for ent, item in zip(query._entities, row) - ) + single_entity = context.is_single_entity try: (process, labels, extra) = list( @@ -66,42 +55,66 @@ def instances(query, cursor, context): ) ) - if not single_entity: - keyed_tuple = result_tuple(labels, extra) + if query._yield_per and ( + context.loaders_require_buffering + or context.loaders_require_uniquing + ): + raise sa_exc.InvalidRequestError( + "Can't use yield_per with eager loaders that require uniquing " + "or row buffering, e.g. joinedload() against collections " + "or subqueryload(). Consider the selectinload() strategy " + "for better flexibility in loading objects." 
+ ) + + except Exception: + with util.safe_reraise(): + cursor.close() + + row_metadata = SimpleResultMetaData( + labels, + extra, + _unique_filters=[ + id if ent.use_id_for_hash else None for ent in query._entities + ], + ) + def chunks(size): while True: + yield_per = size + context.partials = {} - if query._yield_per: - fetch = cursor.fetchmany(query._yield_per) + if yield_per: + fetch = cursor.fetchmany(yield_per) if not fetch: break else: fetch = cursor.fetchall() - if single_entity: - proc = process[0] - rows = [proc(row) for row in fetch] - else: - rows = [ - keyed_tuple([proc(row) for proc in process]) - for row in fetch - ] + rows = [tuple([proc(row) for proc in process]) for row in fetch] for path, post_load in context.post_load_paths.items(): post_load.invoke(context, path) - if filtered: - rows = util.unique_list(rows, filter_fn) + yield rows - for row in rows: - yield row - - if not query._yield_per: + if not yield_per: break - except Exception: - with util.safe_reraise(): - cursor.close() + + result = ChunkedIteratorResult(row_metadata, chunks) + if query._yield_per: + result.yield_per(query._yield_per) + + if single_entity: + result = result.scalars() + + # filtered = context.loaders_require_uniquing + filtered = query._has_mapper_entities + + if filtered: + result = result.unique() + + return result @util.preload_module("sqlalchemy.orm.query") @@ -114,10 +127,18 @@ def merge_result(query, iterator, load=True): # flush current contents if we expect to load data session._autoflush() + # TODO: need test coverage and documentation for the FrozenResult + # use case. 
+ if isinstance(iterator, FrozenResult): + frozen_result = iterator + iterator = iter(frozen_result.data) + else: + frozen_result = None + autoflush = session.autoflush try: session.autoflush = False - single_entity = len(query._entities) == 1 + single_entity = not frozen_result and len(query._entities) == 1 if single_entity: if isinstance(query._entities[0], querylib._MapperEntity): result = [ @@ -156,7 +177,10 @@ def merge_result(query, iterator, load=True): ) result.append(keyed_tuple(newrow)) - return iter(result) + if frozen_result: + return frozen_result.with_data(result) + else: + return iter(result) finally: session.autoflush = autoflush diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py index 1fc299cec..5588828eb 100644 --- a/lib/sqlalchemy/orm/query.py +++ b/lib/sqlalchemy/orm/query.py @@ -728,15 +728,6 @@ class Query(Generative): """ self._enable_eagerloads = value - def _no_yield_per(self, message): - raise sa_exc.InvalidRequestError( - "The yield_per Query option is currently not " - "compatible with %s eager loading. Please " - "specify lazyload('*') or query.enable_eagerloads(False) in " - "order to " - "proceed with query.yield_per()." % message - ) - @_generative def with_labels(self): """Apply column labels to the return value of Query.statement. @@ -3238,7 +3229,7 @@ class Query(Generative): :ref:`faq_query_deduplicating` """ - return list(self) + return self._iter().all() @_generative @_assertions(_no_clauseelement_condition) @@ -3283,14 +3274,11 @@ class Query(Generative): :meth:`_query.Query.one_or_none` """ + # replicates limit(1) behavior if self._statement is not None: - ret = list(self)[0:1] + return self._iter().first() else: - ret = list(self[0:1]) - if len(ret) > 0: - return ret[0] - else: - return None + return self.limit(1)._iter().first() def one_or_none(self): """Return at most one result or raise an exception. 
@@ -3316,17 +3304,7 @@ class Query(Generative): :meth:`_query.Query.one` """ - ret = list(self) - - l = len(ret) - if l == 1: - return ret[0] - elif l == 0: - return None - else: - raise orm_exc.MultipleResultsFound( - "Multiple rows were found for one_or_none()" - ) + return self._iter().one_or_none() def one(self): """Return exactly one result or raise an exception. @@ -3346,19 +3324,7 @@ class Query(Generative): :meth:`_query.Query.one_or_none` """ - try: - ret = self.one_or_none() - except orm_exc.MultipleResultsFound as err: - util.raise_( - orm_exc.MultipleResultsFound( - "Multiple rows were found for one()" - ), - replace_context=err, - ) - else: - if ret is None: - raise orm_exc.NoResultFound("No row was found for one()") - return ret + return self._iter().one() def scalar(self): """Return the first element of the first result or None @@ -3379,6 +3345,7 @@ class Query(Generative): This results in an execution of the underlying query. """ + # TODO: not sure why we can't use result.scalar() here try: ret = self.one() if not isinstance(ret, collections_abc.Sequence): @@ -3388,6 +3355,24 @@ class Query(Generative): return None def __iter__(self): + return self._iter().__iter__() + + # TODO: having _iter(), _execute_and_instances, _connection_from_session, + # etc., is all too much. + + # new recipes / extensions should be based on an event hook of some kind, + # can allow an execution that would return a Result to take in all the + # information and return a different Result. this has to be at + # the session / connection .execute() level, and can perhaps be + # before_execute() but needs to be focused around rewriting of results. + + # the dialect do_execute() *may* be this but that seems a bit too low + # level. it may need to be ORM session based and be a session event, + # becasue it might not invoke the cursor, might invoke for multiple + # connections, etc. OK really has to be a session level event in this + # case to support horizontal sharding. 
+ + def _iter(self): context = self._compile_context() context.statement.label_style = LABEL_STYLE_TABLENAME_PLUS_COL if self._autoflush: @@ -4795,6 +4780,9 @@ class QueryContext(object): "post_load_paths", "identity_token", "single_inh_entities", + "is_single_entity", + "loaders_require_uniquing", + "loaders_require_buffering", ) def __init__(self, query): @@ -4815,6 +4803,8 @@ class QueryContext(object): self.whereclause = query._criterion self.order_by = query._order_by + self.is_single_entity = query.is_single_entity + self.loaders_require_buffering = self.loaders_require_uniquing = False self.multi_row_eager_loaders = False self.adapter = None self.froms = () diff --git a/lib/sqlalchemy/orm/strategies.py b/lib/sqlalchemy/orm/strategies.py index 2e9b2f316..9009c3425 100644 --- a/lib/sqlalchemy/orm/strategies.py +++ b/lib/sqlalchemy/orm/strategies.py @@ -1070,8 +1070,8 @@ class SubqueryLoader(PostLoader): if not context.query._enable_eagerloads or context.refresh_state: return - elif context.query._yield_per: - context.query._no_yield_per("subquery") + + context.loaders_require_buffering = True path = path[self.parent_property] @@ -1536,8 +1536,8 @@ class JoinedLoader(AbstractRelationshipLoader): if not context.query._enable_eagerloads: return - elif context.query._yield_per and self.uselist: - context.query._no_yield_per("joined collection") + elif self.uselist: + context.loaders_require_uniquing = True path = path[self.parent_property] |
