diff options
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/sqlalchemy/orm/strategies.py | 479 |
1 files changed, 285 insertions, 194 deletions
diff --git a/lib/sqlalchemy/orm/strategies.py b/lib/sqlalchemy/orm/strategies.py index 53b3f2961..62a94093f 100644 --- a/lib/sqlalchemy/orm/strategies.py +++ b/lib/sqlalchemy/orm/strategies.py @@ -10,12 +10,12 @@ from sqlalchemy import exc as sa_exc from sqlalchemy import sql, util, log, event from sqlalchemy.sql import util as sql_util -from sqlalchemy.sql import visitors, expression, operators -from sqlalchemy.orm import mapper, attributes, interfaces, exc as orm_exc +from sqlalchemy.sql import visitors +from sqlalchemy.orm import attributes, interfaces, exc as orm_exc from sqlalchemy.orm.mapper import _none_set from sqlalchemy.orm.interfaces import ( LoaderStrategy, StrategizedOption, MapperOption, PropertyOption, - serialize_path, deserialize_path, StrategizedProperty + StrategizedProperty ) from sqlalchemy.orm import session as sessionlib, unitofwork from sqlalchemy.orm import util as mapperutil @@ -146,13 +146,13 @@ class ColumnLoader(LoaderStrategy): if adapter: col = adapter.columns[col] if col is not None and col in row: - def new_execute(state, dict_, row): + def fetch_col(state, dict_, row): dict_[key] = row[col] - return new_execute, None, None + return fetch_col, None, None else: - def new_execute(state, dict_, row): + def expire_for_non_present_col(state, dict_, row): state.expire_attribute_pre_commit(dict_, key) - return new_execute, None, None + return expire_for_non_present_col, None, None log.class_logger(ColumnLoader) @@ -171,15 +171,15 @@ class DeferredColumnLoader(LoaderStrategy): context, path, reduced_path, mapper, row, adapter) elif not self.is_class_level: - def new_execute(state, dict_, row): + def set_deferred_for_local_state(state, dict_, row): state.set_callable(dict_, key, LoadDeferredColumns(state, key)) + return set_deferred_for_local_state, None, None else: - def new_execute(state, dict_, row): + def reset_col_for_deferred(state, dict_, row): # reset state on the key so that deferred callables # fire off on next access. state.reset(dict_, key) - - return new_execute, None, None + return reset_col_for_deferred, None, None def init(self): if hasattr(self.parent_property, 'composite_class'): @@ -216,7 +216,6 @@ class DeferredColumnLoader(LoaderStrategy): if passive is attributes.PASSIVE_NO_FETCH: return attributes.PASSIVE_NO_RESULT - prop = self.parent_property localparent = state.manager.mapper if self.group: @@ -311,9 +310,9 @@ class NoLoader(AbstractRelationshipLoader): ) def create_row_processor(self, context, path, reduced_path, mapper, row, adapter): - def new_execute(state, dict_, row): + def invoke_no_load(state, dict_, row): state.initialize(self.key) - return new_execute, None, None + return invoke_no_load, None, None log.class_logger(NoLoader) @@ -461,11 +460,8 @@ class LazyLoader(AbstractRelationshipLoader): (not self.parent_property.load_on_pending or not state.session_id): return attributes.ATTR_EMPTY - instance_mapper = state.manager.mapper - prop = self.parent_property - key = self.key - prop_mapper = self.mapper pending = not state.key + ident_key = None if ( (passive is attributes.PASSIVE_NO_FETCH or \ @@ -482,31 +478,17 @@ class LazyLoader(AbstractRelationshipLoader): raise orm_exc.DetachedInstanceError( "Parent instance %s is not bound to a Session; " "lazy load operation of attribute '%s' cannot proceed" % - (mapperutil.state_str(state), key) + (mapperutil.state_str(state), self.key) ) # if we have a simple primary key load, check the # identity map without generating a Query at all if self.use_get: - if session._flushing: - get_attr = instance_mapper._get_committed_state_attr_by_column - else: - get_attr = instance_mapper._get_state_attr_by_column - - dict_ = state.dict - if passive is attributes.PASSIVE_NO_FETCH_RELATED: - attr_passive = attributes.PASSIVE_OFF - else: - attr_passive = passive - - ident = [ - get_attr( - state, - state.dict, - self._equated_columns[pk], - passive=attr_passive) - for pk in prop_mapper.primary_key - ] + ident = self._get_ident_for_use_get( + session, + state, + passive + ) if attributes.PASSIVE_NO_RESULT in ident: return attributes.PASSIVE_NO_RESULT elif attributes.NEVER_SET in ident: @@ -515,7 +497,7 @@ class LazyLoader(AbstractRelationshipLoader): if _none_set.issuperset(ident): return None - ident_key = prop_mapper.identity_key_from_primary_key(ident) + ident_key = self.mapper.identity_key_from_primary_key(ident) instance = Query._get_from_identity(session, ident_key, passive) if instance is not None: return instance @@ -523,16 +505,47 @@ class LazyLoader(AbstractRelationshipLoader): passive is attributes.PASSIVE_NO_FETCH_RELATED: return attributes.PASSIVE_NO_RESULT - q = session.query(prop_mapper)._adapt_all_clauses() + return self._emit_lazyload(session, state, ident_key) + + def _get_ident_for_use_get(self, session, state, passive): + instance_mapper = state.manager.mapper + + if session._flushing: + get_attr = instance_mapper._get_committed_state_attr_by_column + else: + get_attr = instance_mapper._get_state_attr_by_column + + # create a strong reference + # to state.dict + dict_ = state.dict + + if passive is attributes.PASSIVE_NO_FETCH_RELATED: + attr_passive = attributes.PASSIVE_OFF + else: + attr_passive = passive + + return [ + get_attr( + state, + state.dict, + self._equated_columns[pk], + passive=attr_passive) + for pk in self.mapper.primary_key + ] + + def _emit_lazyload(self, session, state, ident_key): + q = session.query(self.mapper)._adapt_all_clauses() q = q._with_invoke_all_eagers(False) + pending = not state.key + # don't autoflush on pending if pending: q = q.autoflush(False) if state.load_path: - q = q._with_current_path(state.load_path + (key,)) + q = q._with_current_path(state.load_path + (self.key,)) if state.load_options: q = q._conditional_options(*state.load_options) @@ -540,10 +553,10 @@ class LazyLoader(AbstractRelationshipLoader): if self.use_get: return q._load_on_ident(ident_key) - if prop.order_by: - q = q.order_by(*util.to_list(prop.order_by)) + if self.parent_property.order_by: + q = q.order_by(*util.to_list(self.parent_property.order_by)) - for rev in prop._reverse_property: + for rev in self.parent_property._reverse_property: # reverse props that are MANYTOONE are loading *this* # object from get(), so don't need to eager out to those. if rev.direction is interfaces.MANYTOONE and \ @@ -570,17 +583,18 @@ class LazyLoader(AbstractRelationshipLoader): util.warn( "Multiple rows returned with " "uselist=False for lazily-loaded attribute '%s' " - % prop) + % self.parent_property) return result[0] else: return None + def create_row_processor(self, context, path, reduced_path, mapper, row, adapter): key = self.key if not self.is_class_level: - def new_execute(state, dict_, row): + def set_lazy_callable(state, dict_, row): # we are not the primary manager for this attribute # on this class - set up a # per-instance lazyloader, which will override the @@ -590,8 +604,9 @@ class LazyLoader(AbstractRelationshipLoader): # attribute - "eager" attributes always have a # class-level lazyloader installed. state.set_callable(dict_, key, LoadLazyAttribute(state, key)) + return set_lazy_callable, None, None else: - def new_execute(state, dict_, row): + def reset_for_lazy_callable(state, dict_, row): # we are the primary manager for this attribute on # this class - reset its # per-instance attribute state, so that the class-level @@ -602,7 +617,7 @@ class LazyLoader(AbstractRelationshipLoader): # any existing state. state.reset(dict_, key) - return new_execute, None, None + return reset_for_lazy_callable, None, None @classmethod def _create_lazy_clause(cls, prop, reverse_direction=False): @@ -677,11 +692,12 @@ class ImmediateLoader(AbstractRelationshipLoader): parentmapper=None, **kwargs): pass - def create_row_processor(self, context, path, reduced_path, mapper, row, adapter): - def execute(state, dict_, row): + def create_row_processor(self, context, path, reduced_path, + mapper, row, adapter): + def load_immediate(state, dict_, row): state.get_impl(self.key).get(state, dict_) - return None, None, execute + return None, None, load_immediate class SubqueryLoader(AbstractRelationshipLoader): def init(self): @@ -694,7 +710,8 @@ class SubqueryLoader(AbstractRelationshipLoader): init_class_attribute(mapper) def setup_query(self, context, entity, - path, reduced_path, adapter, column_collection=None, + path, reduced_path, adapter, + column_collection=None, parentmapper=None, **kwargs): if not context.query._enable_eagerloads: @@ -715,13 +732,52 @@ class SubqueryLoader(AbstractRelationshipLoader): if len(path) / 2 > self.join_depth: return else: - if self.mapper.base_mapper in interfaces._reduce_path(subq_path): + if self.mapper.base_mapper in \ + interfaces._reduce_path(subq_path): return + subq_mapper, leftmost_mapper, leftmost_prop, \ + leftmost_cols, remote_cols, leftmost_attr = \ + self._get_leftmost(subq_path) + orig_query = context.attributes.get( ("orig_query", SubqueryLoader), context.query) + # generate a new Query from the original, then + # produce a subquery from it. + left_alias = self._generate_from_original_query( + orig_query, leftmost_mapper, + leftmost_attr, subq_path + ) + + # generate another Query that will join the + # left alias to the target relationships. + # basically doing a longhand + # "from_self()". (from_self() itself not quite industrial + # strength enough for all contingencies...but very close) + q = orig_query.session.query(self.mapper) + q._attributes = { + ("orig_query", SubqueryLoader): orig_query, + ('subquery_path', None) : subq_path + } + q = q._enable_single_crit(False) + + to_join, local_attr, parent_alias = \ + self._prep_for_joins(left_alias, subq_path) + q = q.order_by(*local_attr) + q = q.add_columns(*local_attr) + + q = self._apply_joins(q, to_join, left_alias, parent_alias) + + q = self._setup_options(q, subq_path, orig_query) + q = self._setup_outermost_orderby(q) + + # add new query to attributes to be picked up + # by create_row_processor + context.attributes[('subquery', reduced_path)] = q + + def _get_leftmost(self, subq_path): subq_mapper = mapperutil._class_to_mapper(subq_path[0]) # determine attributes of the leftmost mapper @@ -738,7 +794,13 @@ class SubqueryLoader(AbstractRelationshipLoader): leftmost_mapper._columntoproperty[c].class_attribute for c in leftmost_cols ] + return subq_mapper, leftmost_mapper, leftmost_prop, \ + leftmost_cols, remote_cols, leftmost_attr + def _generate_from_original_query(self, + orig_query, leftmost_mapper, + leftmost_attr, subq_path + ): # reformat the original query # to look only for significant columns q = orig_query._clone() @@ -759,18 +821,10 @@ class SubqueryLoader(AbstractRelationshipLoader): # which we'll join onto. embed_q = q.with_labels().subquery() left_alias = mapperutil.AliasedClass(leftmost_mapper, embed_q) + return left_alias - # q becomes a new query. basically doing a longhand - # "from_self()". (from_self() itself not quite industrial - # strength enough for all contingencies...but very close) - - q = q.session.query(self.mapper) - q._attributes = { - ("orig_query", SubqueryLoader): orig_query, - ('subquery_path', None) : subq_path - } - q = q._enable_single_crit(False) + def _prep_for_joins(self, left_alias, subq_path): # figure out what's being joined. a.k.a. the fun part to_join = [ (subq_path[i], subq_path[i+1]) @@ -804,9 +858,9 @@ class SubqueryLoader(AbstractRelationshipLoader): for c in local_cols ] - q = q.order_by(*local_attr) - q = q.add_columns(*local_attr) + return to_join, local_attr, parent_alias + def _apply_joins(self, q, to_join, left_alias, parent_alias): for i, (mapper, key) in enumerate(to_join): # we need to use query.join() as opposed to @@ -829,12 +883,27 @@ class SubqueryLoader(AbstractRelationshipLoader): q = q.join(parent_alias, attr, from_joinpoint=True) else: q = q.join(attr, aliased=middle, from_joinpoint=True) + return q + + def _local_remote_columns(self, prop): + if prop.secondary is None: + return zip(*prop.local_remote_pairs) + else: + return \ + [p[0] for p in prop.synchronize_pairs],\ + [ + p[0] for p in prop. + secondary_synchronize_pairs + ] + def _setup_options(self, q, subq_path, orig_query): # propagate loader options etc. to the new query. # these will fire relative to subq_path. q = q._with_current_path(subq_path) q = q._conditional_options(*orig_query._with_options) + return q + def _setup_outermost_orderby(self, q): if self.parent_property.order_by: # if there's an ORDER BY, alias it the same # way joinedloader does, but we have to pull out @@ -850,21 +919,7 @@ class SubqueryLoader(AbstractRelationshipLoader): ) ) q = q.order_by(*eager_order_by) - - # add new query to attributes to be picked up - # by create_row_processor - context.attributes[('subquery', reduced_path)] = q - - def _local_remote_columns(self, prop): - if prop.secondary is None: - return zip(*prop.local_remote_pairs) - else: - return \ - [p[0] for p in prop.synchronize_pairs],\ - [ - p[0] for p in prop. - secondary_synchronize_pairs - ] + return q def create_row_processor(self, context, path, reduced_path, mapper, row, adapter): @@ -881,10 +936,6 @@ class SubqueryLoader(AbstractRelationshipLoader): local_cols, remote_cols = self._local_remote_columns(self.parent_property) - remote_attr = [ - self.mapper._columntoproperty[c].key - for c in remote_cols] - q = context.attributes[('subquery', reduced_path)] collections = dict( @@ -898,30 +949,38 @@ class SubqueryLoader(AbstractRelationshipLoader): local_cols = [adapter.columns[c] for c in local_cols] if self.uselist: - def execute(state, dict_, row): - collection = collections.get( - tuple([row[col] for col in local_cols]), - () - ) - state.get_impl(self.key).\ - set_committed_value(state, dict_, collection) + return self._create_collection_loader(collections, local_cols) else: - def execute(state, dict_, row): - collection = collections.get( - tuple([row[col] for col in local_cols]), - (None,) - ) - if len(collection) > 1: - util.warn( - "Multiple rows returned with " - "uselist=False for eagerly-loaded attribute '%s' " - % self) + return self._create_scalar_loader(collections, local_cols) + + def _create_collection_loader(self, collections, local_cols): + def load_collection_from_subq(state, dict_, row): + collection = collections.get( + tuple([row[col] for col in local_cols]), + () + ) + state.get_impl(self.key).\ + set_committed_value(state, dict_, collection) - scalar = collection[0] - state.get_impl(self.key).\ - set_committed_value(state, dict_, scalar) + return load_collection_from_subq, None, None - return execute, None, None + def _create_scalar_loader(self, collections, local_cols): + def load_scalar_from_subq(state, dict_, row): + collection = collections.get( + tuple([row[col] for col in local_cols]), + (None,) + ) + if len(collection) > 1: + util.warn( + "Multiple rows returned with " + "uselist=False for eagerly-loaded attribute '%s' " + % self) + + scalar = collection[0] + state.get_impl(self.key).\ + set_committed_value(state, dict_, scalar) + + return load_scalar_from_subq, None, None log.class_logger(SubqueryLoader) @@ -951,25 +1010,12 @@ class JoinedLoader(AbstractRelationshipLoader): path = path + (self.key,) reduced_path = reduced_path + (self.key,) - # check for user-defined eager alias if ("user_defined_eager_row_processor", reduced_path) in\ context.attributes: - clauses = context.attributes[ - ("user_defined_eager_row_processor", - reduced_path)] - - adapter = entity._get_entity_clauses(context.query, context) - if adapter and clauses: - context.attributes[ - ("user_defined_eager_row_processor", - reduced_path)] = clauses = clauses.wrap(adapter) - elif adapter: - context.attributes[ - ("user_defined_eager_row_processor", - reduced_path)] = clauses = adapter - - add_to_collection = context.primary_columns - + clauses, adapter, add_to_collection = \ + self._get_user_defined_adapter( + context, entity, reduced_path, adapter + ) else: # check for join_depth or basic recursion, # if the current path was not explicitly stated as @@ -982,32 +1028,11 @@ class JoinedLoader(AbstractRelationshipLoader): if self.mapper.base_mapper in reduced_path: return - clauses = mapperutil.ORMAdapter( - mapperutil.AliasedClass(self.mapper), - equivalents=self.mapper._equivalent_columns, - adapt_required=True) - - if self.parent_property.direction != interfaces.MANYTOONE: - context.multi_row_eager_loaders = True - - innerjoin = allow_innerjoin and context.attributes.get( - ("eager_join_type", path), - self.parent_property.innerjoin) - if not innerjoin: - # if this is an outer join, all eager joins from - # here must also be outer joins - allow_innerjoin = False - - context.create_eager_joins.append( - (self._create_eager_join, context, - entity, path, adapter, - parentmapper, clauses, innerjoin) - ) - - add_to_collection = context.secondary_columns - context.attributes[ - ("eager_row_processor", reduced_path) - ] = clauses + clauses, adapter, add_to_collection, \ + allow_innerjoin = self._generate_row_adapter( + context, entity, path, reduced_path, adapter, + column_collection, parentmapper, allow_innerjoin + ) path += (self.mapper,) reduced_path += (self.mapper.base_mapper,) @@ -1023,6 +1048,57 @@ class JoinedLoader(AbstractRelationshipLoader): column_collection=add_to_collection, allow_innerjoin=allow_innerjoin) + def _get_user_defined_adapter(self, context, entity, + reduced_path, adapter): + clauses = context.attributes[ + ("user_defined_eager_row_processor", + reduced_path)] + + adapter = entity._get_entity_clauses(context.query, context) + if adapter and clauses: + context.attributes[ + ("user_defined_eager_row_processor", + reduced_path)] = clauses = clauses.wrap(adapter) + elif adapter: + context.attributes[ + ("user_defined_eager_row_processor", + reduced_path)] = clauses = adapter + + add_to_collection = context.primary_columns + return clauses, adapter, add_to_collection + + def _generate_row_adapter(self, + context, entity, path, reduced_path, adapter, + column_collection, parentmapper, allow_innerjoin + ): + clauses = mapperutil.ORMAdapter( + mapperutil.AliasedClass(self.mapper), + equivalents=self.mapper._equivalent_columns, + adapt_required=True) + + if self.parent_property.direction != interfaces.MANYTOONE: + context.multi_row_eager_loaders = True + + innerjoin = allow_innerjoin and context.attributes.get( + ("eager_join_type", path), + self.parent_property.innerjoin) + if not innerjoin: + # if this is an outer join, all eager joins from + # here must also be outer joins + allow_innerjoin = False + + context.create_eager_joins.append( + (self._create_eager_join, context, + entity, path, adapter, + parentmapper, clauses, innerjoin) + ) + + add_to_collection = context.secondary_columns + context.attributes[ + ("eager_row_processor", reduced_path) + ] = clauses + return clauses, adapter, add_to_collection, allow_innerjoin + def _create_eager_join(self, context, entity, path, adapter, parentmapper, clauses, innerjoin): @@ -1138,9 +1214,9 @@ class JoinedLoader(AbstractRelationshipLoader): return False try: - identity_key = self.mapper.identity_key_from_row(row, decorator) + self.mapper.identity_key_from_row(row, decorator) return decorator - except KeyError, k: + except KeyError: # no identity key - dont return a row # processor, will cause a degrade to lazy return False @@ -1169,52 +1245,10 @@ class JoinedLoader(AbstractRelationshipLoader): our_reduced_path + (self.mapper.base_mapper,), eager_adapter) - def eager_exec(state, dict_, row): - _instance(row, None) - if not self.uselist: - def new_execute(state, dict_, row): - # set a scalar object instance directly on the parent - # object, bypassing InstrumentedAttribute event handlers. - dict_[key] = _instance(row, None) - - def existing_execute(state, dict_, row): - # call _instance on the row, even though the object has - # been created, so that we further descend into properties - existing = _instance(row, None) - if existing is not None \ - and key in dict_ \ - and existing is not dict_[key]: - util.warn( - "Multiple rows returned with " - "uselist=False for eagerly-loaded attribute '%s' " - % self) - return new_execute, existing_execute, None, eager_exec + return self._create_scalar_loader(context, key, _instance) else: - def new_execute(state, dict_, row): - collection = attributes.init_state_collection( - state, dict_, key) - result_list = util.UniqueAppender(collection, - 'append_without_event') - context.attributes[(state, key)] = result_list - _instance(row, result_list) - - def existing_execute(state, dict_, row): - if (state, key) in context.attributes: - result_list = context.attributes[(state, key)] - else: - # appender_key can be absent from context.attributes - # with isnew=False when self-referential eager loading - # is used; the same instance may be present in two - # distinct sets of result columns - collection = attributes.init_state_collection(state, - dict_, key) - result_list = util.UniqueAppender( - collection, - 'append_without_event') - context.attributes[(state, key)] = result_list - _instance(row, result_list) - return new_execute, existing_execute, None, eager_exec + return self._create_collection_loader(context, key, _instance) else: return self.parent_property.\ _get_strategy(LazyLoader).\ @@ -1223,6 +1257,63 @@ class JoinedLoader(AbstractRelationshipLoader): reduced_path, mapper, row, adapter) + def _create_collection_loader(self, context, key, _instance): + def load_collection_from_joined_new_row(state, dict_, row): + collection = attributes.init_state_collection( + state, dict_, key) + result_list = util.UniqueAppender(collection, + 'append_without_event') + context.attributes[(state, key)] = result_list + _instance(row, result_list) + + def load_collection_from_joined_existing_row(state, dict_, row): + if (state, key) in context.attributes: + result_list = context.attributes[(state, key)] + else: + # appender_key can be absent from context.attributes + # with isnew=False when self-referential eager loading + # is used; the same instance may be present in two + # distinct sets of result columns + collection = attributes.init_state_collection(state, + dict_, key) + result_list = util.UniqueAppender( + collection, + 'append_without_event') + context.attributes[(state, key)] = result_list + _instance(row, result_list) + + def load_collection_from_joined_exec(state, dict_, row): + _instance(row, None) + + return load_collection_from_joined_new_row, \ + load_collection_from_joined_existing_row, \ + None, load_collection_from_joined_exec + + def _create_scalar_loader(self, context, key, _instance): + def load_scalar_from_joined_new_row(state, dict_, row): + # set a scalar object instance directly on the parent + # object, bypassing InstrumentedAttribute event handlers. + dict_[key] = _instance(row, None) + + def load_scalar_from_joined_existing_row(state, dict_, row): + # call _instance on the row, even though the object has + # been created, so that we further descend into properties + existing = _instance(row, None) + if existing is not None \ + and key in dict_ \ + and existing is not dict_[key]: + util.warn( + "Multiple rows returned with " + "uselist=False for eagerly-loaded attribute '%s' " + % self) + + def load_scalar_from_joined_exec(state, dict_, row): + _instance(row, None) + + return load_scalar_from_joined_new_row, \ + load_scalar_from_joined_existing_row, \ + None, load_scalar_from_joined_exec + EagerLoader = JoinedLoader """Deprecated, use JoinedLoader""" |
