diff options
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/sqlalchemy/orm/dependency.py | 3 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/interfaces.py | 19 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/mapper.py | 81 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/properties.py | 7 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/unitofwork.py | 675 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/uowdumper.py | 101 | ||||
| -rw-r--r-- | lib/sqlalchemy/topological.py | 199 |
7 files changed, 121 insertions, 964 deletions
diff --git a/lib/sqlalchemy/orm/dependency.py b/lib/sqlalchemy/orm/dependency.py index cbbfb0883..fa3331804 100644 --- a/lib/sqlalchemy/orm/dependency.py +++ b/lib/sqlalchemy/orm/dependency.py @@ -176,7 +176,8 @@ class DependencyProcessor(object): if state is not None and self.post_update: for x in related: if x is not None and not self._check_reverse_action(uowcommit, x, state, "postupdate"): - uowcommit.register_object(state, postupdate=True, post_update_cols=[r for l, r in self.prop.synchronize_pairs]) + uowcommit.register_object(state, postupdate=True, + post_update_cols=[r for l, r in self.prop.synchronize_pairs]) self._performed_action(uowcommit, x, state, "postupdate") break diff --git a/lib/sqlalchemy/orm/interfaces.py b/lib/sqlalchemy/orm/interfaces.py index 579101f0d..412fabc23 100644 --- a/lib/sqlalchemy/orm/interfaces.py +++ b/lib/sqlalchemy/orm/interfaces.py @@ -498,24 +498,7 @@ class MapperProperty(object): """ pass - def register_dependencies(self, *args, **kwargs): - """Called by the ``Mapper`` in response to the UnitOfWork - calling the ``Mapper``'s register_dependencies operation. - Establishes a topological dependency between two mappers - which will affect the order in which mappers persist data. - - """ - - pass - - def register_processors(self, *args, **kwargs): - """Called by the ``Mapper`` in response to the UnitOfWork - calling the ``Mapper``'s register_processors operation. - Establishes a processor object between two mappers which - will link data and state between parent/child objects. - - """ - + def get_flush_actions(self, uowtransaction, records, state): pass def is_primary(self): diff --git a/lib/sqlalchemy/orm/mapper.py b/lib/sqlalchemy/orm/mapper.py index 8f0f2128b..18436e211 100644 --- a/lib/sqlalchemy/orm/mapper.py +++ b/lib/sqlalchemy/orm/mapper.py @@ -22,7 +22,7 @@ deque = __import__('collections').deque from sqlalchemy import sql, util, log, exc as sa_exc from sqlalchemy.sql import expression, visitors, operators, util as sqlutil -from sqlalchemy.orm import attributes, sync, exc as orm_exc +from sqlalchemy.orm import attributes, sync, exc as orm_exc, unitofwork from sqlalchemy.orm.interfaces import ( MapperProperty, EXT_CONTINUE, PropComparator ) @@ -1245,6 +1245,45 @@ class Mapper(object): ret[t] = table_to_mapper[t] return ret + @util.memoized_property + def _sorted_table_list(self): + l = [] + for mapper in self.base_mapper.polymorphic_iterator(): + for t in mapper.tables: + l.append(t) + + return sqlutil.sort_tables(l) + + + def get_flush_actions(self, uowtransaction, state): + if isdelete: + type_ = Delete + tables = reversed(mapper._sorted_table_list) + elif not _state_has_identity(state): + type_ = Insert + tables = mapper._sorted_table_list + else: + type_ = Update + tables = mapper._sorted_table_list + + recs = [ + type_(state, table) + for table in tables + ] + for i, rec in enumerate(recs): + if i > 0: + self._dependency(recs[i - 1], recs[i]) + recs.append(SyncKeys(state, recs[i - 1].table, recs[i].table)) + + dep_recs = [] + for prop in mapper._props.values(): + dp = prop.get_flush_actions(uowtransaction, recs, state) + if dp: + dep_recs.extend(dp) + + return recs + dep_recs + + def _save_obj(self, states, uowtransaction, postupdate=False, post_update_cols=None, single=False): """Issue ``INSERT`` and/or ``UPDATE`` statements for a list of objects. @@ -1595,21 +1634,6 @@ class Mapper(object): if 'after_delete' in mapper.extension: mapper.extension.after_delete(mapper, connection, state.obj()) - def _register_dependencies(self, uowcommit): - """Register ``DependencyProcessor`` instances with a - ``unitofwork.UOWTransaction``. - - This call `register_dependencies` on all attached - ``MapperProperty`` instances. - - """ - for dep in self._props.values() + self._dependency_processors: - dep.register_dependencies(uowcommit) - - def _register_processors(self, uowcommit): - for dep in self._props.values() + self._dependency_processors: - dep.register_processors(uowcommit) - def _instance_processor(self, context, path, adapter, polymorphic_from=None, extension=None, only_load_props=None, refresh_state=None, @@ -1853,6 +1877,31 @@ class Mapper(object): log.class_logger(Mapper) +class Insert(unitofwork.Rec): + def __init__(self, mapper, state, table): + self.mapper = mapper + self.state = state + self.table = table + +class Update(unitofwork.Rec): + def __init__(self, mapper, state, table): + self.mapper = mapper + self.state = state + self.table = table + +class Delete(unitofwork.Rec): + def __init__(self, mapper, state, table): + self.mapper = mapper + self.state = state + self.table = table + +class SyncKeys(unitofwork.Rec): + def __init__(self, mapper, state, parent, child): + self.mapper = mapper + self.state = state + self.parent = parent + self.child = child + def reconstructor(fn): """Decorate a method as the 'reconstructor' hook. diff --git a/lib/sqlalchemy/orm/properties.py b/lib/sqlalchemy/orm/properties.py index 5c43fd355..7de02d3f0 100644 --- a/lib/sqlalchemy/orm/properties.py +++ b/lib/sqlalchemy/orm/properties.py @@ -1191,13 +1191,10 @@ class RelationshipProperty(StrategizedProperty): source_selectable, dest_selectable, secondary, target_adapter) - def register_dependencies(self, uowcommit): + def get_flush_actions(self, uowtransaction, records, state): if not self.viewonly: - self._dependency_processor.register_dependencies(uowcommit) + return self._depency_processor.get_flush_actions(uowtransaction, records, state) - def register_processors(self, uowcommit): - if not self.viewonly: - self._dependency_processor.register_processors(uowcommit) PropertyLoader = RelationProperty = RelationshipProperty log.class_logger(RelationshipProperty) diff --git a/lib/sqlalchemy/orm/unitofwork.py b/lib/sqlalchemy/orm/unitofwork.py index c0a088b01..dc4adff96 100644 --- a/lib/sqlalchemy/orm/unitofwork.py +++ b/lib/sqlalchemy/orm/unitofwork.py @@ -23,7 +23,7 @@ changes at once. from sqlalchemy import util, log, topological from sqlalchemy.orm import attributes, interfaces from sqlalchemy.orm import util as mapperutil -from sqlalchemy.orm.mapper import _state_mapper +from sqlalchemy.orm.util import _state_mapper # Load lazily object_session = None @@ -40,7 +40,8 @@ class UOWEventHandler(interfaces.AttributeExtension): self.key = key def append(self, state, item, initiator): - # process "save_update" cascade rules for when an instance is appended to the list of another instance + # process "save_update" cascade rules for when + # an instance is appended to the list of another instance sess = _state_session(state) if sess: prop = _state_mapper(state).get_property(self.key) @@ -77,29 +78,23 @@ class UOWTransaction(object): """Handles the details of organizing and executing transaction tasks during a UnitOfWork object's flush() operation. - The central operation is to form a graph of nodes represented by the - ``UOWTask`` class, which is then traversed by a ``UOWExecutor`` object - that issues SQL and instance-synchronizing operations via the related - packages. """ def __init__(self, session): self.session = session self.mapper_flush_opts = session._mapper_flush_opts - # stores tuples of mapper/dependent mapper pairs, - # representing a partial ordering fed into topological sort - self.dependencies = set() - - # dictionary of mappers to UOWTasks - self.tasks = {} - # dictionary used by external actors to store arbitrary state # information. self.attributes = {} - self.processors = set() - + self.recs = [] + self.states = set() + self.dependencies = [] + + def _dependency(self, rec1, rec2): + self.dependencies.append((rec1, rec2)) + def get_attribute_history(self, state, key, passive=True): hashkey = ("history", state, key) @@ -123,149 +118,32 @@ class UOWTransaction(object): return history.as_state() def register_object(self, state, isdelete=False, - listonly=False, postupdate=False, post_update_cols=None): + listonly=False, postupdate=False, + post_update_cols=None): # if object is not in the overall session, do nothing if not self.session._contains_state(state): return - - mapper = _state_mapper(state) - - task = self.get_task_by_mapper(mapper) - if postupdate: - task.append_postupdate(state, post_update_cols) - else: - task.append(state, listonly=listonly, isdelete=isdelete) - - # ensure the mapper for this object has had its - # DependencyProcessors added. - if mapper not in self.processors: - mapper._register_processors(self) - self.processors.add(mapper) - - if mapper.base_mapper not in self.processors: - mapper.base_mapper._register_processors(self) - self.processors.add(mapper.base_mapper) - - def set_row_switch(self, state): - """mark a deleted object as a 'row switch'. - - this indicates that an INSERT statement elsewhere corresponds to this DELETE; - the INSERT is converted to an UPDATE and the DELETE does not occur. - """ - mapper = _state_mapper(state) - task = self.get_task_by_mapper(mapper) - taskelement = task._objects[state] - taskelement.isdelete = "rowswitch" - - def is_deleted(self, state): - """return true if the given state is marked as deleted within this UOWTransaction.""" - + if state in self.states: + return + mapper = _state_mapper(state) - task = self.get_task_by_mapper(mapper) - return task.is_deleted(state) - - def get_task_by_mapper(self, mapper, dontcreate=False): - """return UOWTask element corresponding to the given mapper. - - Will create a new UOWTask, including a UOWTask corresponding to the - "base" inherited mapper, if needed, unless the dontcreate flag is True. - """ - try: - return self.tasks[mapper] - except KeyError: - if dontcreate: - return None - - base_mapper = mapper.base_mapper - if base_mapper in self.tasks: - base_task = self.tasks[base_mapper] - else: - self.tasks[base_mapper] = base_task = UOWTask(self, base_mapper) - base_mapper._register_dependencies(self) - - if mapper not in self.tasks: - self.tasks[mapper] = task = UOWTask(self, mapper, base_task=base_task) - mapper._register_dependencies(self) - else: - task = self.tasks[mapper] - - return task - - def register_dependency(self, mapper, dependency): - """register a dependency between two mappers. - - Called by ``mapper.PropertyLoader`` to register the objects - handled by one mapper being dependent on the objects handled - by another. - - """ - # correct for primary mapper - # also convert to the "base mapper", the parentmost task at the top of an inheritance chain - # dependency sorting is done via non-inheriting mappers only, dependencies between mappers - # in the same inheritance chain is done at the per-object level - mapper = mapper.primary_mapper().base_mapper - dependency = dependency.primary_mapper().base_mapper - - self.dependencies.add((mapper, dependency)) - - def register_processor(self, mapper, processor, mapperfrom): - """register a dependency processor, corresponding to - operations which occur between two mappers. + self.states.add(state) - """ - # correct for primary mapper - mapper = mapper.primary_mapper() - mapperfrom = mapperfrom.primary_mapper() - - task = self.get_task_by_mapper(mapper) - targettask = self.get_task_by_mapper(mapperfrom) - up = UOWDependencyProcessor(processor, targettask) - task.dependencies.add(up) - - def execute(self): - """Execute this UOWTransaction. - - This will organize all collected UOWTasks into a dependency-sorted - list which is then traversed using the traversal scheme - encoded in the UOWExecutor class. Operations to mappers and dependency - processors are fired off in order to issue SQL to the database and - synchronize instance attributes with database values and related - foreign key values.""" - - # pre-execute dependency processors. this process may - # result in new tasks, objects and/or dependency processors being added, - # particularly with 'delete-orphan' cascade rules. - # keep running through the full list of tasks until all - # objects have been processed. - while True: - ret = False - for task in self.tasks.values(): - for up in list(task.dependencies): - if up.preexecute(self): - ret = True - if not ret: - break - - tasks = self._sort_dependencies() - if self._should_log_info(): - self.logger.info("Task dump:\n%s", self._dump(tasks)) - UOWExecutor().execute(self, tasks) - self.logger.info("Execute Complete") - - def _dump(self, tasks): - from uowdumper import UOWDumper - return UOWDumper.dump(tasks) - - @property - def elements(self): - """Iterate UOWTaskElements.""" + self.recs.extend( + mapper.get_flush_actions(self, state) + ) - for task in self.tasks.itervalues(): - for elem in task.elements: - yield elem + + def execute(self): + # so here, thinking we could figure out a way to get + # consecutive, "compatible" records to collapse together, + # i.e. a bunch of updates become an executemany(), etc. + # even though we usually need individual executes. + for rec in topological.sort(self.dependencies, self.recs): + rec.execute() def finalize_flush_changes(self): """mark processed objects as clean / deleted after a successful flush(). @@ -280,501 +158,10 @@ class UOWTransaction(object): elif not elem.listonly: self.session._register_newly_persistent(elem.state) - def _sort_dependencies(self): - nodes = topological.sort_with_cycles(self.dependencies, - [t.mapper for t in self.tasks.itervalues() if t.base_task is t] - ) - - ret = [] - for item, cycles in nodes: - task = self.get_task_by_mapper(item) - if cycles: - for t in task._sort_circular_dependencies( - self, - [self.get_task_by_mapper(i) for i in cycles] - ): - ret.append(t) - else: - ret.append(task) - - return ret - log.class_logger(UOWTransaction) -class UOWTask(object): - """A collection of mapped states corresponding to a particular mapper.""" - - def __init__(self, uowtransaction, mapper, base_task=None): - self.uowtransaction = uowtransaction - - # base_task is the UOWTask which represents the "base mapper" - # in our mapper's inheritance chain. if the mapper does not - # inherit from any other mapper, the base_task is self. - # the _inheriting_tasks dictionary is a dictionary present only - # on the "base_task"-holding UOWTask, which maps all mappers within - # an inheritance hierarchy to their corresponding UOWTask instances. - if base_task is None: - self.base_task = self - self._inheriting_tasks = {mapper:self} - else: - self.base_task = base_task - base_task._inheriting_tasks[mapper] = self - - # the Mapper which this UOWTask corresponds to - self.mapper = mapper - - # mapping of InstanceState -> UOWTaskElement - self._objects = {} - - self.dependent_tasks = [] - self.dependencies = set() - self.cyclical_dependencies = set() - - @util.memoized_property - def inheriting_mappers(self): - return list(self.mapper.polymorphic_iterator()) - - @property - def polymorphic_tasks(self): - """Return an iterator of UOWTask objects corresponding to the - inheritance sequence of this UOWTask's mapper. - - e.g. if mapper B and mapper C inherit from mapper A, and - mapper D inherits from B: - - mapperA -> mapperB -> mapperD - -> mapperC - - the inheritance sequence starting at mapper A is a depth-first - traversal: - - [mapperA, mapperB, mapperD, mapperC] - - this method will therefore return - - [UOWTask(mapperA), UOWTask(mapperB), UOWTask(mapperD), - UOWTask(mapperC)] - - The concept of "polymporphic iteration" is adapted into - several property-based iterators which return object - instances, UOWTaskElements and UOWDependencyProcessors in an - order corresponding to this sequence of parent UOWTasks. This - is used to issue operations related to inheritance-chains of - mappers in the proper order based on dependencies between - those mappers. - - """ - for mapper in self.inheriting_mappers: - t = self.base_task._inheriting_tasks.get(mapper, None) - if t is not None: - yield t - - def is_empty(self): - """return True if this UOWTask is 'empty', meaning it has no child items. - - used only for debugging output. - """ - - return not self._objects and not self.dependencies - - def append(self, state, listonly=False, isdelete=False): - if state not in self._objects: - self._objects[state] = rec = UOWTaskElement(state) - else: - rec = self._objects[state] - - rec.update(listonly, isdelete) - - def append_postupdate(self, state, post_update_cols): - """issue a 'post update' UPDATE statement via this object's mapper immediately. - - this operation is used only with relationships that specify the `post_update=True` - flag. - """ - - # postupdates are UPDATED immeditely (for now) - # convert post_update_cols list to a Set so that __hash__() is used to compare columns - # instead of __eq__() - self.mapper._save_obj([state], self.uowtransaction, postupdate=True, post_update_cols=set(post_update_cols)) - - def __contains__(self, state): - """return True if the given object is contained within this UOWTask or inheriting tasks.""" - - for task in self.polymorphic_tasks: - if state in task._objects: - return True - else: - return False - - def is_deleted(self, state): - """return True if the given object is marked as to be deleted within this UOWTask.""" - - try: - return self._objects[state].isdelete - except KeyError: - return False - - def _polymorphic_collection(fn): - """return a property that will adapt the collection returned by the - given callable into a polymorphic traversal.""" - - @property - def collection(self): - for task in self.polymorphic_tasks: - for rec in fn(task): - yield rec - return collection - - def _polymorphic_collection_filtered(fn): - - def collection(self, mappers): - for task in self.polymorphic_tasks: - if task.mapper in mappers: - for rec in fn(task): - yield rec - return collection - - @property - def elements(self): - return self._objects.values() - - @_polymorphic_collection - def polymorphic_elements(self): - return self.elements - - @_polymorphic_collection_filtered - def filter_polymorphic_elements(self): - return self.elements - - @property - def polymorphic_tosave_elements(self): - return [rec for rec in self.polymorphic_elements if not rec.isdelete] - - @property - def polymorphic_todelete_elements(self): - return [rec for rec in self.polymorphic_elements if rec.isdelete] - - @property - def polymorphic_tosave_objects(self): - return [ - rec.state for rec in self.polymorphic_elements - if rec.state is not None and not rec.listonly and rec.isdelete is False - ] - - @property - def polymorphic_todelete_objects(self): - return [ - rec.state for rec in self.polymorphic_elements - if rec.state is not None and not rec.listonly and rec.isdelete is True - ] - - @_polymorphic_collection - def polymorphic_dependencies(self): - return self.dependencies - - @_polymorphic_collection - def polymorphic_cyclical_dependencies(self): - return self.cyclical_dependencies - - def _sort_circular_dependencies(self, trans, cycles): - """Topologically sort individual entities with row-level dependencies. - - Builds a modified UOWTask structure, and is invoked when the - per-mapper topological structure is found to have cycles. - - """ - dependencies = {} - def set_processor_for_state(state, depprocessor, target_state, isdelete): - if state not in dependencies: - dependencies[state] = {} - tasks = dependencies[state] - if depprocessor not in tasks: - tasks[depprocessor] = UOWDependencyProcessor( - depprocessor.processor, - UOWTask(self.uowtransaction, depprocessor.targettask.mapper) - ) - tasks[depprocessor].targettask.append(target_state, isdelete=isdelete) - - cycles = set(cycles) - def dependency_in_cycles(dep): - proctask = trans.get_task_by_mapper(dep.processor.mapper.base_mapper, True) - targettask = trans.get_task_by_mapper(dep.targettask.mapper.base_mapper, True) - return targettask in cycles and (proctask is not None and proctask in cycles) - - deps_by_targettask = {} - extradeplist = [] - for task in cycles: - for dep in task.polymorphic_dependencies: - if not dependency_in_cycles(dep): - extradeplist.append(dep) - for t in dep.targettask.polymorphic_tasks: - l = deps_by_targettask.setdefault(t, []) - l.append(dep) - - object_to_original_task = {} - tuples = [] - - for task in cycles: - for subtask in task.polymorphic_tasks: - for taskelement in subtask.elements: - state = taskelement.state - object_to_original_task[state] = subtask - if subtask not in deps_by_targettask: - continue - for dep in deps_by_targettask[subtask]: - if not dep.processor.has_dependencies or not dependency_in_cycles(dep): - continue - (processor, targettask) = (dep.processor, dep.targettask) - isdelete = taskelement.isdelete - - # list of dependent objects from this object - (added, unchanged, deleted) = dep.get_object_dependencies(state, trans, passive=True) - if not added and not unchanged and not deleted: - continue - - # the task corresponding to saving/deleting of those dependent objects - childtask = trans.get_task_by_mapper(processor.mapper) - - childlist = added + unchanged + deleted - - for o in childlist: - if o is None: - continue - - if o not in childtask: - childtask.append(o, listonly=True) - object_to_original_task[o] = childtask - - whosdep = dep.whose_dependent_on_who(state, o) - if whosdep is not None: - tuples.append(whosdep) - - if whosdep[0] is state: - set_processor_for_state(whosdep[0], dep, whosdep[0], isdelete=isdelete) - else: - set_processor_for_state(whosdep[0], dep, whosdep[1], isdelete=isdelete) - else: - # TODO: no test coverage here - set_processor_for_state(state, dep, state, isdelete=isdelete) - - t = UOWTask(self.uowtransaction, self.mapper) - t.dependencies.update(extradeplist) - - used_tasks = set() - - # rationale for "tree" sort as opposed to a straight - # dependency - keep non-dependent objects - # grouped together, so that insert ordering as determined - # by session.add() is maintained. - # An alternative might be to represent the "insert order" - # as part of the topological sort itself, which would - # eliminate the need for this step (but may make the original - # topological sort more expensive) - head = topological.sort_as_tree(tuples, object_to_original_task.iterkeys()) - if head is not None: - original_to_tasks = {} - stack = [(head, t)] - while stack: - ((state, cycles, children), parenttask) = stack.pop() - - originating_task = object_to_original_task[state] - used_tasks.add(originating_task) - - if (parenttask, originating_task) not in original_to_tasks: - task = UOWTask(self.uowtransaction, originating_task.mapper) - original_to_tasks[(parenttask, originating_task)] = task - parenttask.dependent_tasks.append(task) - else: - task = original_to_tasks[(parenttask, originating_task)] - - task.append(state, originating_task._objects[state].listonly, isdelete=originating_task._objects[state].isdelete) - - if state in dependencies: - task.cyclical_dependencies.update(dependencies[state].itervalues()) - - stack += [(n, task) for n in children] - - ret = [t] - - # add tasks that were in the cycle, but didnt get assembled - # into the cyclical tree, to the start of the list - for t2 in cycles: - if t2 not in used_tasks and t2 is not self: - localtask = UOWTask(self.uowtransaction, t2.mapper) - for state in t2.elements: - localtask.append(state, t2.listonly, isdelete=t2._objects[state].isdelete) - for dep in t2.dependencies: - localtask.dependencies.add(dep) - ret.insert(0, localtask) - - return ret - - def __repr__(self): - return ("UOWTask(%s) Mapper: '%r'" % (hex(id(self)), self.mapper)) - -class UOWTaskElement(object): - """Corresponds to a single InstanceState to be saved, deleted, - or otherwise marked as having dependencies. A collection of - UOWTaskElements are held by a UOWTask. - - """ - def __init__(self, state): - self.state = state - self.listonly = True - self.isdelete = False - self.preprocessed = set() - - def update(self, listonly, isdelete): - if not listonly and self.listonly: - self.listonly = False - self.preprocessed.clear() - if isdelete and not self.isdelete: - self.isdelete = True - self.preprocessed.clear() - - def __repr__(self): - return "UOWTaskElement/%d: %s/%d %s" % ( - id(self), - self.state.class_.__name__, - id(self.state.obj()), - (self.listonly and 'listonly' or (self.isdelete and 'delete' or 'save')) - ) - -class UOWDependencyProcessor(object): - """In between the saving and deleting of objects, process - dependent data, such as filling in a foreign key on a child item - from a new primary key, or deleting association rows before a - delete. This object acts as a proxy to a DependencyProcessor. - - """ - def __init__(self, processor, targettask): - self.processor = processor - self.targettask = targettask - prop = processor.prop - - # define a set of mappers which - # will filter the lists of entities - # this UOWDP processes. this allows - # MapperProperties to be overridden - # at least for concrete mappers. - self._mappers = set([ - m - for m in self.processor.parent.polymorphic_iterator() - if m._props[prop.key] is prop - ]).union(self.processor.mapper.polymorphic_iterator()) - - def __repr__(self): - return "UOWDependencyProcessor(%s, %s)" % (str(self.processor), str(self.targettask)) - - def __eq__(self, other): - return other.processor is self.processor and other.targettask is self.targettask - - def __hash__(self): - return hash((self.processor, self.targettask)) - - def preexecute(self, trans): - """preprocess all objects contained within this ``UOWDependencyProcessor``s target task. - - This may locate additional objects which should be part of the - transaction, such as those affected deletes, orphans to be - deleted, etc. - - Once an object is preprocessed, its ``UOWTaskElement`` is marked as processed. If subsequent - changes occur to the ``UOWTaskElement``, its processed flag is reset, and will require processing - again. - - Return True if any objects were preprocessed, or False if no - objects were preprocessed. If True is returned, the parent ``UOWTransaction`` will - ultimately call ``preexecute()`` again on all processors until no new objects are processed. - """ - - def getobj(elem): - elem.preprocessed.add(self) - return elem.state - - ret = False - elements = [getobj(elem) for elem in - self.targettask.filter_polymorphic_elements(self._mappers) - if self not in elem.preprocessed and not elem.isdelete] - if elements: - ret = True - self.processor.preprocess_dependencies(self.targettask, elements, trans, delete=False) - - elements = [getobj(elem) for elem in - self.targettask.filter_polymorphic_elements(self._mappers) - if self not in elem.preprocessed and elem.isdelete] - if elements: - ret = True - self.processor.preprocess_dependencies(self.targettask, elements, trans, delete=True) - return ret - - def execute(self, trans, delete): - """process all objects contained within this ``UOWDependencyProcessor``s target task.""" - - - elements = [e for e in - self.targettask.filter_polymorphic_elements(self._mappers) - if bool(e.isdelete)==delete] - - self.processor.process_dependencies( - self.targettask, - [elem.state for elem in elements], - trans, - delete=delete) - - def get_object_dependencies(self, state, trans, passive): - return trans.get_attribute_history(state, self.processor.key, passive=passive) - - def whose_dependent_on_who(self, state1, state2): - """establish which object is operationally dependent amongst a parent/child - using the semantics stated by the dependency processor. - - This method is used to establish a partial ordering (set of dependency tuples) - when toplogically sorting on a per-instance basis. - - """ - return self.processor.whose_dependent_on_who(state1, state2) - -class UOWExecutor(object): - """Encapsulates the execution traversal of a UOWTransaction structure.""" - - def execute(self, trans, tasks, isdelete=None): - if isdelete is not True: - for task in tasks: - self.execute_save_steps(trans, task) - if isdelete is not False: - for task in reversed(tasks): - self.execute_delete_steps(trans, task) - - def save_objects(self, trans, task): - task.mapper._save_obj(task.polymorphic_tosave_objects, trans) - - def delete_objects(self, trans, task): - task.mapper._delete_obj(task.polymorphic_todelete_objects, trans) - - def execute_dependency(self, trans, dep, isdelete): - dep.execute(trans, isdelete) - - def execute_save_steps(self, trans, task): - self.save_objects(trans, task) - for dep in task.polymorphic_cyclical_dependencies: - self.execute_dependency(trans, dep, False) - for dep in task.polymorphic_cyclical_dependencies: - self.execute_dependency(trans, dep, True) - self.execute_cyclical_dependencies(trans, task, False) - self.execute_dependencies(trans, task) - - def execute_delete_steps(self, trans, task): - self.execute_cyclical_dependencies(trans, task, True) - self.delete_objects(trans, task) - - def execute_dependencies(self, trans, task): - polymorphic_dependencies = list(task.polymorphic_dependencies) - for dep in polymorphic_dependencies: - self.execute_dependency(trans, dep, False) - for dep in reversed(polymorphic_dependencies): - self.execute_dependency(trans, dep, True) +# TODO: don't know what these should be. +# its very hard not to use subclasses to define behavior here. +class Rec(object): + pass - def execute_cyclical_dependencies(self, trans, task, isdelete): - for t in task.dependent_tasks: - self.execute(trans, [t], isdelete) diff --git a/lib/sqlalchemy/orm/uowdumper.py b/lib/sqlalchemy/orm/uowdumper.py deleted file mode 100644 index dd96b6b9a..000000000 --- a/lib/sqlalchemy/orm/uowdumper.py +++ /dev/null @@ -1,101 +0,0 @@ -# orm/uowdumper.py -# Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Michael Bayer mike_mp@zzzcomputing.com -# -# This module is part of SQLAlchemy and is released under -# the MIT License: http://www.opensource.org/licenses/mit-license.php - -"""Dumps out a string representation of a UOWTask structure""" - -from sqlalchemy.orm import unitofwork -from sqlalchemy.orm import util as mapperutil -import StringIO - -class UOWDumper(unitofwork.UOWExecutor): - def __init__(self, tasks, buf): - self.indent = 0 - self.tasks = tasks - self.buf = buf - self.execute(None, tasks) - - @classmethod - def dump(cls, tasks): - buf = StringIO.StringIO() - UOWDumper(tasks, buf) - return buf.getvalue() - - def execute(self, trans, tasks, isdelete=None): - if isdelete is not True: - for task in tasks: - self._execute(trans, task, False) - if isdelete is not False: - for task in reversed(tasks): - self._execute(trans, task, True) - - def _execute(self, trans, task, isdelete): - try: - i = self._indent() - if i: - i = i[:-1] + "+-" - self.buf.write(i + " " + self._repr_task(task)) - self.buf.write(" (" + (isdelete and "delete " or "save/update ") + "phase) \n") - self.indent += 1 - super(UOWDumper, self).execute(trans, [task], isdelete) - finally: - self.indent -= 1 - - - def save_objects(self, trans, task): - for rec in sorted(task.polymorphic_tosave_elements, key=lambda a: a.state.sort_key): - if rec.listonly: - continue - self.buf.write(self._indent()[:-1] + "+-" + self._repr_task_element(rec) + "\n") - - def delete_objects(self, trans, task): - for rec in task.polymorphic_todelete_elements: - if rec.listonly: - continue - self.buf.write(self._indent() + "- " + self._repr_task_element(rec) + "\n") - - def execute_dependency(self, transaction, dep, isdelete): - self._dump_processor(dep, isdelete) - - def _dump_processor(self, proc, deletes): - if deletes: - val = proc.targettask.polymorphic_todelete_elements - else: - val = proc.targettask.polymorphic_tosave_elements - - for v in val: - self.buf.write(self._indent() + " +- " + self._repr_task_element(v, proc.processor.key, process=True) + "\n") - - def _repr_task_element(self, te, attribute=None, process=False): - if getattr(te, 'state', None) is None: - objid = "(placeholder)" - else: - if attribute is not None: - objid = "%s.%s" % (mapperutil.state_str(te.state), attribute) - else: - objid = mapperutil.state_str(te.state) - if process: - return "Process %s" % (objid) - else: - return "%s %s" % ((te.isdelete and "Delete" or "Save"), objid) - - def _repr_task(self, task): - if task.mapper is not None: - if task.mapper.__class__.__name__ == 'Mapper': - name = task.mapper.class_.__name__ + "/" + task.mapper.local_table.description - else: - name = repr(task.mapper) - else: - name = '(none)' - return ("UOWTask(%s, %s)" % (hex(id(task)), name)) - - def _repr_task_class(self, task): - if task.mapper is not None and task.mapper.__class__.__name__ == 'Mapper': - return task.mapper.class_.__name__ - else: - return '(none)' - - def _indent(self): - return " |" * self.indent diff --git a/lib/sqlalchemy/topological.py b/lib/sqlalchemy/topological.py index 76c0c717f..3f2ff6399 100644 --- a/lib/sqlalchemy/topological.py +++ b/lib/sqlalchemy/topological.py @@ -23,73 +23,21 @@ from sqlalchemy import util __all__ = ['sort', 'sort_with_cycles', 'sort_as_tree'] -def sort(tuples, allitems): - """sort the given list of items by dependency. - - 'tuples' is a list of tuples representing a partial ordering. - """ - - return [n.item for n in _sort(tuples, allitems, allow_cycles=False, ignore_self_cycles=True)] - -def sort_with_cycles(tuples, allitems): - """sort the given list of items by dependency, cutting out cycles. - - returns results as an iterable of 2-tuples, containing the item, - and a list containing items involved in a cycle with this item, if any. - - 'tuples' is a list of tuples representing a partial ordering. - """ - - return [(n.item, [n.item for n in n.cycles or []]) for n in _sort(tuples, allitems, allow_cycles=True)] - -def sort_as_tree(tuples, allitems, with_cycles=False): - """sort the given list of items by dependency, and return results - as a hierarchical tree structure. - - returns results as an iterable of 3-tuples, containing the item, - a list containing items involved in a cycle with this item, if any, - and a list of child tuples. +# TODO: obviate the need for a _Node class. +# a straight tuple should be used. +class _Node(tuple): + """Represent each item in the sort.""" - if with_cycles is False, the returned structure is of the same form - but the second element of each tuple, i.e. the 'cycles', is an empty list. + def __new__(cls, item): + children = [] + t = tuple.__new__(cls, [item, children]) + t.item = item + t.children = children + return t - 'tuples' is a list of tuples representing a partial ordering. - """ - - return _organize_as_tree(_sort(tuples, allitems, allow_cycles=with_cycles)) - - -class _Node(object): - """Represent each item in the sort.""" - - def __init__(self, item): - self.item = item - self.dependencies = set() - self.children = [] - self.cycles = None - - def __str__(self): - return self.safestr() + def __hash__(self): + return id(self) - def safestr(self, indent=0): - return (' ' * indent * 2) + \ - str(self.item) + \ - (self.cycles is not None and (" (cycles: " + repr([x for x in self.cycles]) + ")") or "") + \ - "\n" + \ - ''.join(str(n) for n in self.children) - - def __repr__(self): - return str(self.item) - - def all_deps(self): - """Return a set of dependencies for this node and all its cycles.""" - - deps = set(self.dependencies) - if self.cycles is not None: - for c in self.cycles: - deps.update(c.dependencies) - return deps - class _EdgeCollection(object): """A collection of directed edges.""" @@ -103,7 +51,6 @@ class _EdgeCollection(object): parentnode, childnode = edge self.parent_to_children[parentnode].add(childnode) self.child_to_parents[childnode].add(parentnode) - parentnode.dependencies.add(childnode) def remove(self, edge): """Remove an edge from this collection. @@ -156,7 +103,11 @@ class _EdgeCollection(object): def __repr__(self): return repr(list(self)) -def _sort(tuples, allitems, allow_cycles=False, ignore_self_cycles=False): +def sort(tuples, allitems): + """sort the given list of items by dependency. + + 'tuples' is a list of tuples representing a partial ordering. + """ nodes = {} edges = _EdgeCollection() @@ -168,11 +119,6 @@ def _sort(tuples, allitems, allow_cycles=False, ignore_self_cycles=False): for t in tuples: id0, id1 = id(t[0]), id(t[1]) if t[0] is t[1]: - if allow_cycles: - n = nodes[id0] - n.cycles = set([n]) - elif not ignore_self_cycles: - raise CircularDependencyError("Self-referential dependency detected " + repr(t)) continue childnode = nodes[id1] parentnode = nodes[id0] @@ -186,117 +132,12 @@ def _sort(tuples, allitems, allow_cycles=False, ignore_self_cycles=False): output = [] while nodes: if not queue: - # edges remain but no edgeless nodes to remove; this indicates - # a cycle - if allow_cycles: - for cycle in _find_cycles(edges): - lead = cycle[0][0] - lead.cycles = set() - for edge in cycle: - n = edges.remove(edge) - lead.cycles.add(edge[0]) - lead.cycles.add(edge[1]) - if n is not None: - queue.append(n) - for n in lead.cycles: - if n is not lead: - n._cyclical = True - for (n, k) in list(edges.edges_by_parent(n)): - edges.add((lead, k)) - edges.remove((n, k)) - continue - else: - # long cycles not allowed - raise CircularDependencyError("Circular dependency detected " + repr(edges) + repr(queue)) + raise CircularDependencyError("Circular dependency detected " + + repr(edges) + repr(queue)) node = queue.pop() - if not hasattr(node, '_cyclical'): - output.append(node) + output.append(node.item) del nodes[id(node.item)] for childnode in edges.pop_node(node): queue.append(childnode) return output -def _organize_as_tree(nodes): - """Given a list of nodes from a topological sort, organize the - nodes into a tree structure, with as many non-dependent nodes - set as siblings to each other as possible. - - returns nodes as 3-tuples (item, cycles, children). - """ - - if not nodes: - return None - # a list of all currently independent subtrees as a tuple of - # (root_node, set_of_all_tree_nodes, set_of_all_cycle_nodes_in_tree) - # order of the list has no semantics for the algorithmic - independents = [] - # in reverse topological order - for node in reversed(nodes): - # nodes subtree and cycles contain the node itself - subtree = set([node]) - if node.cycles is not None: - cycles = set(node.cycles) - else: - cycles = set() - # get a set of dependent nodes of node and its cycles - nodealldeps = node.all_deps() - if nodealldeps: - # iterate over independent node indexes in reverse order so we can efficiently remove them - for index in xrange(len(independents) - 1, -1, -1): - child, childsubtree, childcycles = independents[index] - # if there is a dependency between this node and an independent node - if (childsubtree.intersection(nodealldeps) or childcycles.intersection(node.dependencies)): - # prepend child to nodes children - # (append should be fine, but previous implemetation used prepend) - node.children[0:0] = [(child.item, [n.item for n in child.cycles or []], child.children)] - # merge childs subtree and cycles - subtree.update(childsubtree) - cycles.update(childcycles) - # remove the child from list of independent subtrees - independents[index:index+1] = [] - # add node as a new independent subtree - independents.append((node, subtree, cycles)) - # choose an arbitrary node from list of all independent subtrees - head = independents.pop()[0] - # add all other independent subtrees as a child of the chosen root - # used prepend [0:0] instead of extend to maintain exact behaviour of previous implementation - head.children[0:0] = [(i[0].item, [n.item for n in i[0].cycles or []], i[0].children) for i in independents] - return (head.item, [n.item for n in head.cycles or []], head.children) - -def _find_cycles(edges): - involved_in_cycles = set() - cycles = {} - def traverse(node, goal=None, cycle=None): - if goal is None: - goal = node - cycle = [] - elif node is goal: - return True - - for (n, key) in edges.edges_by_parent(node): - if key in cycle: - continue - cycle.append(key) - if traverse(key, goal, cycle): - cycset = set(cycle) - for x in cycle: - involved_in_cycles.add(x) - if x in cycles: - existing_set = cycles[x] - [existing_set.add(y) for y in cycset] - for y in existing_set: - cycles[y] = existing_set - cycset = existing_set - else: - cycles[x] = cycset - cycle.pop() - - for parent in edges.get_parents(): - traverse(parent) - - unique_cycles = set(tuple(s) for s in cycles.values()) - - for cycle in unique_cycles: - edgecollection = [edge for edge in edges - if edge[0] in cycle and edge[1] in cycle] - yield edgecollection |
