summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/sqlalchemy/orm/dependency.py3
-rw-r--r--lib/sqlalchemy/orm/interfaces.py19
-rw-r--r--lib/sqlalchemy/orm/mapper.py81
-rw-r--r--lib/sqlalchemy/orm/properties.py7
-rw-r--r--lib/sqlalchemy/orm/unitofwork.py675
-rw-r--r--lib/sqlalchemy/orm/uowdumper.py101
-rw-r--r--lib/sqlalchemy/topological.py199
7 files changed, 121 insertions, 964 deletions
diff --git a/lib/sqlalchemy/orm/dependency.py b/lib/sqlalchemy/orm/dependency.py
index cbbfb0883..fa3331804 100644
--- a/lib/sqlalchemy/orm/dependency.py
+++ b/lib/sqlalchemy/orm/dependency.py
@@ -176,7 +176,8 @@ class DependencyProcessor(object):
if state is not None and self.post_update:
for x in related:
if x is not None and not self._check_reverse_action(uowcommit, x, state, "postupdate"):
- uowcommit.register_object(state, postupdate=True, post_update_cols=[r for l, r in self.prop.synchronize_pairs])
+ uowcommit.register_object(state, postupdate=True,
+ post_update_cols=[r for l, r in self.prop.synchronize_pairs])
self._performed_action(uowcommit, x, state, "postupdate")
break
diff --git a/lib/sqlalchemy/orm/interfaces.py b/lib/sqlalchemy/orm/interfaces.py
index 579101f0d..412fabc23 100644
--- a/lib/sqlalchemy/orm/interfaces.py
+++ b/lib/sqlalchemy/orm/interfaces.py
@@ -498,24 +498,7 @@ class MapperProperty(object):
"""
pass
- def register_dependencies(self, *args, **kwargs):
- """Called by the ``Mapper`` in response to the UnitOfWork
- calling the ``Mapper``'s register_dependencies operation.
- Establishes a topological dependency between two mappers
- which will affect the order in which mappers persist data.
-
- """
-
- pass
-
- def register_processors(self, *args, **kwargs):
- """Called by the ``Mapper`` in response to the UnitOfWork
- calling the ``Mapper``'s register_processors operation.
- Establishes a processor object between two mappers which
- will link data and state between parent/child objects.
-
- """
-
+ def get_flush_actions(self, uowtransaction, records, state):
pass
def is_primary(self):
diff --git a/lib/sqlalchemy/orm/mapper.py b/lib/sqlalchemy/orm/mapper.py
index 8f0f2128b..18436e211 100644
--- a/lib/sqlalchemy/orm/mapper.py
+++ b/lib/sqlalchemy/orm/mapper.py
@@ -22,7 +22,7 @@ deque = __import__('collections').deque
from sqlalchemy import sql, util, log, exc as sa_exc
from sqlalchemy.sql import expression, visitors, operators, util as sqlutil
-from sqlalchemy.orm import attributes, sync, exc as orm_exc
+from sqlalchemy.orm import attributes, sync, exc as orm_exc, unitofwork
from sqlalchemy.orm.interfaces import (
MapperProperty, EXT_CONTINUE, PropComparator
)
@@ -1245,6 +1245,45 @@ class Mapper(object):
ret[t] = table_to_mapper[t]
return ret
+ @util.memoized_property
+ def _sorted_table_list(self):
+ l = []
+ for mapper in self.base_mapper.polymorphic_iterator():
+ for t in mapper.tables:
+ l.append(t)
+
+ return sqlutil.sort_tables(l)
+
+
+ def get_flush_actions(self, uowtransaction, state):
+ if isdelete:
+ type_ = Delete
+ tables = reversed(mapper._sorted_table_list)
+ elif not _state_has_identity(state):
+ type_ = Insert
+ tables = mapper._sorted_table_list
+ else:
+ type_ = Update
+ tables = mapper._sorted_table_list
+
+ recs = [
+ type_(state, table)
+ for table in tables
+ ]
+ for i, rec in enumerate(recs):
+ if i > 0:
+ self._dependency(recs[i - 1], recs[i])
+ recs.append(SyncKeys(state, recs[i - 1].table, recs[i].table))
+
+ dep_recs = []
+ for prop in mapper._props.values():
+ dp = prop.get_flush_actions(uowtransaction, recs, state)
+ if dp:
+ dep_recs.extend(dp)
+
+ return recs + dep_recs
+
+
def _save_obj(self, states, uowtransaction, postupdate=False,
post_update_cols=None, single=False):
"""Issue ``INSERT`` and/or ``UPDATE`` statements for a list of objects.
@@ -1595,21 +1634,6 @@ class Mapper(object):
if 'after_delete' in mapper.extension:
mapper.extension.after_delete(mapper, connection, state.obj())
- def _register_dependencies(self, uowcommit):
- """Register ``DependencyProcessor`` instances with a
- ``unitofwork.UOWTransaction``.
-
- This call `register_dependencies` on all attached
- ``MapperProperty`` instances.
-
- """
- for dep in self._props.values() + self._dependency_processors:
- dep.register_dependencies(uowcommit)
-
- def _register_processors(self, uowcommit):
- for dep in self._props.values() + self._dependency_processors:
- dep.register_processors(uowcommit)
-
def _instance_processor(self, context, path, adapter,
polymorphic_from=None, extension=None,
only_load_props=None, refresh_state=None,
@@ -1853,6 +1877,31 @@ class Mapper(object):
log.class_logger(Mapper)
+class Insert(unitofwork.Rec):
+ def __init__(self, mapper, state, table):
+ self.mapper = mapper
+ self.state = state
+ self.table = table
+
+class Update(unitofwork.Rec):
+ def __init__(self, mapper, state, table):
+ self.mapper = mapper
+ self.state = state
+ self.table = table
+
+class Delete(unitofwork.Rec):
+ def __init__(self, mapper, state, table):
+ self.mapper = mapper
+ self.state = state
+ self.table = table
+
+class SyncKeys(unitofwork.Rec):
+ def __init__(self, mapper, state, parent, child):
+ self.mapper = mapper
+ self.state = state
+ self.parent = parent
+ self.child = child
+
def reconstructor(fn):
"""Decorate a method as the 'reconstructor' hook.
diff --git a/lib/sqlalchemy/orm/properties.py b/lib/sqlalchemy/orm/properties.py
index 5c43fd355..7de02d3f0 100644
--- a/lib/sqlalchemy/orm/properties.py
+++ b/lib/sqlalchemy/orm/properties.py
@@ -1191,13 +1191,10 @@ class RelationshipProperty(StrategizedProperty):
source_selectable,
dest_selectable, secondary, target_adapter)
- def register_dependencies(self, uowcommit):
+ def get_flush_actions(self, uowtransaction, records, state):
if not self.viewonly:
- self._dependency_processor.register_dependencies(uowcommit)
+ return self._depency_processor.get_flush_actions(uowtransaction, records, state)
- def register_processors(self, uowcommit):
- if not self.viewonly:
- self._dependency_processor.register_processors(uowcommit)
PropertyLoader = RelationProperty = RelationshipProperty
log.class_logger(RelationshipProperty)
diff --git a/lib/sqlalchemy/orm/unitofwork.py b/lib/sqlalchemy/orm/unitofwork.py
index c0a088b01..dc4adff96 100644
--- a/lib/sqlalchemy/orm/unitofwork.py
+++ b/lib/sqlalchemy/orm/unitofwork.py
@@ -23,7 +23,7 @@ changes at once.
from sqlalchemy import util, log, topological
from sqlalchemy.orm import attributes, interfaces
from sqlalchemy.orm import util as mapperutil
-from sqlalchemy.orm.mapper import _state_mapper
+from sqlalchemy.orm.util import _state_mapper
# Load lazily
object_session = None
@@ -40,7 +40,8 @@ class UOWEventHandler(interfaces.AttributeExtension):
self.key = key
def append(self, state, item, initiator):
- # process "save_update" cascade rules for when an instance is appended to the list of another instance
+ # process "save_update" cascade rules for when
+ # an instance is appended to the list of another instance
sess = _state_session(state)
if sess:
prop = _state_mapper(state).get_property(self.key)
@@ -77,29 +78,23 @@ class UOWTransaction(object):
"""Handles the details of organizing and executing transaction
tasks during a UnitOfWork object's flush() operation.
- The central operation is to form a graph of nodes represented by the
- ``UOWTask`` class, which is then traversed by a ``UOWExecutor`` object
- that issues SQL and instance-synchronizing operations via the related
- packages.
"""
def __init__(self, session):
self.session = session
self.mapper_flush_opts = session._mapper_flush_opts
- # stores tuples of mapper/dependent mapper pairs,
- # representing a partial ordering fed into topological sort
- self.dependencies = set()
-
- # dictionary of mappers to UOWTasks
- self.tasks = {}
-
# dictionary used by external actors to store arbitrary state
# information.
self.attributes = {}
- self.processors = set()
-
+ self.recs = []
+ self.states = set()
+ self.dependencies = []
+
+ def _dependency(self, rec1, rec2):
+ self.dependencies.append((rec1, rec2))
+
def get_attribute_history(self, state, key, passive=True):
hashkey = ("history", state, key)
@@ -123,149 +118,32 @@ class UOWTransaction(object):
return history.as_state()
def register_object(self, state, isdelete=False,
- listonly=False, postupdate=False, post_update_cols=None):
+ listonly=False, postupdate=False,
+ post_update_cols=None):
# if object is not in the overall session, do nothing
if not self.session._contains_state(state):
return
-
- mapper = _state_mapper(state)
-
- task = self.get_task_by_mapper(mapper)
- if postupdate:
- task.append_postupdate(state, post_update_cols)
- else:
- task.append(state, listonly=listonly, isdelete=isdelete)
-
- # ensure the mapper for this object has had its
- # DependencyProcessors added.
- if mapper not in self.processors:
- mapper._register_processors(self)
- self.processors.add(mapper)
-
- if mapper.base_mapper not in self.processors:
- mapper.base_mapper._register_processors(self)
- self.processors.add(mapper.base_mapper)
-
- def set_row_switch(self, state):
- """mark a deleted object as a 'row switch'.
-
- this indicates that an INSERT statement elsewhere corresponds to this DELETE;
- the INSERT is converted to an UPDATE and the DELETE does not occur.
- """
- mapper = _state_mapper(state)
- task = self.get_task_by_mapper(mapper)
- taskelement = task._objects[state]
- taskelement.isdelete = "rowswitch"
-
- def is_deleted(self, state):
- """return true if the given state is marked as deleted within this UOWTransaction."""
-
+ if state in self.states:
+ return
+
mapper = _state_mapper(state)
- task = self.get_task_by_mapper(mapper)
- return task.is_deleted(state)
-
- def get_task_by_mapper(self, mapper, dontcreate=False):
- """return UOWTask element corresponding to the given mapper.
-
- Will create a new UOWTask, including a UOWTask corresponding to the
- "base" inherited mapper, if needed, unless the dontcreate flag is True.
- """
- try:
- return self.tasks[mapper]
- except KeyError:
- if dontcreate:
- return None
-
- base_mapper = mapper.base_mapper
- if base_mapper in self.tasks:
- base_task = self.tasks[base_mapper]
- else:
- self.tasks[base_mapper] = base_task = UOWTask(self, base_mapper)
- base_mapper._register_dependencies(self)
-
- if mapper not in self.tasks:
- self.tasks[mapper] = task = UOWTask(self, mapper, base_task=base_task)
- mapper._register_dependencies(self)
- else:
- task = self.tasks[mapper]
-
- return task
-
- def register_dependency(self, mapper, dependency):
- """register a dependency between two mappers.
-
- Called by ``mapper.PropertyLoader`` to register the objects
- handled by one mapper being dependent on the objects handled
- by another.
-
- """
- # correct for primary mapper
- # also convert to the "base mapper", the parentmost task at the top of an inheritance chain
- # dependency sorting is done via non-inheriting mappers only, dependencies between mappers
- # in the same inheritance chain is done at the per-object level
- mapper = mapper.primary_mapper().base_mapper
- dependency = dependency.primary_mapper().base_mapper
-
- self.dependencies.add((mapper, dependency))
-
- def register_processor(self, mapper, processor, mapperfrom):
- """register a dependency processor, corresponding to
- operations which occur between two mappers.
+ self.states.add(state)
- """
- # correct for primary mapper
- mapper = mapper.primary_mapper()
- mapperfrom = mapperfrom.primary_mapper()
-
- task = self.get_task_by_mapper(mapper)
- targettask = self.get_task_by_mapper(mapperfrom)
- up = UOWDependencyProcessor(processor, targettask)
- task.dependencies.add(up)
-
- def execute(self):
- """Execute this UOWTransaction.
-
- This will organize all collected UOWTasks into a dependency-sorted
- list which is then traversed using the traversal scheme
- encoded in the UOWExecutor class. Operations to mappers and dependency
- processors are fired off in order to issue SQL to the database and
- synchronize instance attributes with database values and related
- foreign key values."""
-
- # pre-execute dependency processors. this process may
- # result in new tasks, objects and/or dependency processors being added,
- # particularly with 'delete-orphan' cascade rules.
- # keep running through the full list of tasks until all
- # objects have been processed.
- while True:
- ret = False
- for task in self.tasks.values():
- for up in list(task.dependencies):
- if up.preexecute(self):
- ret = True
- if not ret:
- break
-
- tasks = self._sort_dependencies()
- if self._should_log_info():
- self.logger.info("Task dump:\n%s", self._dump(tasks))
- UOWExecutor().execute(self, tasks)
- self.logger.info("Execute Complete")
-
- def _dump(self, tasks):
- from uowdumper import UOWDumper
- return UOWDumper.dump(tasks)
-
- @property
- def elements(self):
- """Iterate UOWTaskElements."""
+ self.recs.extend(
+ mapper.get_flush_actions(self, state)
+ )
- for task in self.tasks.itervalues():
- for elem in task.elements:
- yield elem
+
+ def execute(self):
+ # so here, thinking we could figure out a way to get
+ # consecutive, "compatible" records to collapse together,
+ # i.e. a bunch of updates become an executemany(), etc.
+ # even though we usually need individual executes.
+ for rec in topological.sort(self.dependencies, self.recs):
+ rec.execute()
def finalize_flush_changes(self):
"""mark processed objects as clean / deleted after a successful flush().
@@ -280,501 +158,10 @@ class UOWTransaction(object):
elif not elem.listonly:
self.session._register_newly_persistent(elem.state)
- def _sort_dependencies(self):
- nodes = topological.sort_with_cycles(self.dependencies,
- [t.mapper for t in self.tasks.itervalues() if t.base_task is t]
- )
-
- ret = []
- for item, cycles in nodes:
- task = self.get_task_by_mapper(item)
- if cycles:
- for t in task._sort_circular_dependencies(
- self,
- [self.get_task_by_mapper(i) for i in cycles]
- ):
- ret.append(t)
- else:
- ret.append(task)
-
- return ret
-
log.class_logger(UOWTransaction)
-class UOWTask(object):
- """A collection of mapped states corresponding to a particular mapper."""
-
- def __init__(self, uowtransaction, mapper, base_task=None):
- self.uowtransaction = uowtransaction
-
- # base_task is the UOWTask which represents the "base mapper"
- # in our mapper's inheritance chain. if the mapper does not
- # inherit from any other mapper, the base_task is self.
- # the _inheriting_tasks dictionary is a dictionary present only
- # on the "base_task"-holding UOWTask, which maps all mappers within
- # an inheritance hierarchy to their corresponding UOWTask instances.
- if base_task is None:
- self.base_task = self
- self._inheriting_tasks = {mapper:self}
- else:
- self.base_task = base_task
- base_task._inheriting_tasks[mapper] = self
-
- # the Mapper which this UOWTask corresponds to
- self.mapper = mapper
-
- # mapping of InstanceState -> UOWTaskElement
- self._objects = {}
-
- self.dependent_tasks = []
- self.dependencies = set()
- self.cyclical_dependencies = set()
-
- @util.memoized_property
- def inheriting_mappers(self):
- return list(self.mapper.polymorphic_iterator())
-
- @property
- def polymorphic_tasks(self):
- """Return an iterator of UOWTask objects corresponding to the
- inheritance sequence of this UOWTask's mapper.
-
- e.g. if mapper B and mapper C inherit from mapper A, and
- mapper D inherits from B:
-
- mapperA -> mapperB -> mapperD
- -> mapperC
-
- the inheritance sequence starting at mapper A is a depth-first
- traversal:
-
- [mapperA, mapperB, mapperD, mapperC]
-
- this method will therefore return
-
- [UOWTask(mapperA), UOWTask(mapperB), UOWTask(mapperD),
- UOWTask(mapperC)]
-
- The concept of "polymporphic iteration" is adapted into
- several property-based iterators which return object
- instances, UOWTaskElements and UOWDependencyProcessors in an
- order corresponding to this sequence of parent UOWTasks. This
- is used to issue operations related to inheritance-chains of
- mappers in the proper order based on dependencies between
- those mappers.
-
- """
- for mapper in self.inheriting_mappers:
- t = self.base_task._inheriting_tasks.get(mapper, None)
- if t is not None:
- yield t
-
- def is_empty(self):
- """return True if this UOWTask is 'empty', meaning it has no child items.
-
- used only for debugging output.
- """
-
- return not self._objects and not self.dependencies
-
- def append(self, state, listonly=False, isdelete=False):
- if state not in self._objects:
- self._objects[state] = rec = UOWTaskElement(state)
- else:
- rec = self._objects[state]
-
- rec.update(listonly, isdelete)
-
- def append_postupdate(self, state, post_update_cols):
- """issue a 'post update' UPDATE statement via this object's mapper immediately.
-
- this operation is used only with relationships that specify the `post_update=True`
- flag.
- """
-
- # postupdates are UPDATED immeditely (for now)
- # convert post_update_cols list to a Set so that __hash__() is used to compare columns
- # instead of __eq__()
- self.mapper._save_obj([state], self.uowtransaction, postupdate=True, post_update_cols=set(post_update_cols))
-
- def __contains__(self, state):
- """return True if the given object is contained within this UOWTask or inheriting tasks."""
-
- for task in self.polymorphic_tasks:
- if state in task._objects:
- return True
- else:
- return False
-
- def is_deleted(self, state):
- """return True if the given object is marked as to be deleted within this UOWTask."""
-
- try:
- return self._objects[state].isdelete
- except KeyError:
- return False
-
- def _polymorphic_collection(fn):
- """return a property that will adapt the collection returned by the
- given callable into a polymorphic traversal."""
-
- @property
- def collection(self):
- for task in self.polymorphic_tasks:
- for rec in fn(task):
- yield rec
- return collection
-
- def _polymorphic_collection_filtered(fn):
-
- def collection(self, mappers):
- for task in self.polymorphic_tasks:
- if task.mapper in mappers:
- for rec in fn(task):
- yield rec
- return collection
-
- @property
- def elements(self):
- return self._objects.values()
-
- @_polymorphic_collection
- def polymorphic_elements(self):
- return self.elements
-
- @_polymorphic_collection_filtered
- def filter_polymorphic_elements(self):
- return self.elements
-
- @property
- def polymorphic_tosave_elements(self):
- return [rec for rec in self.polymorphic_elements if not rec.isdelete]
-
- @property
- def polymorphic_todelete_elements(self):
- return [rec for rec in self.polymorphic_elements if rec.isdelete]
-
- @property
- def polymorphic_tosave_objects(self):
- return [
- rec.state for rec in self.polymorphic_elements
- if rec.state is not None and not rec.listonly and rec.isdelete is False
- ]
-
- @property
- def polymorphic_todelete_objects(self):
- return [
- rec.state for rec in self.polymorphic_elements
- if rec.state is not None and not rec.listonly and rec.isdelete is True
- ]
-
- @_polymorphic_collection
- def polymorphic_dependencies(self):
- return self.dependencies
-
- @_polymorphic_collection
- def polymorphic_cyclical_dependencies(self):
- return self.cyclical_dependencies
-
- def _sort_circular_dependencies(self, trans, cycles):
- """Topologically sort individual entities with row-level dependencies.
-
- Builds a modified UOWTask structure, and is invoked when the
- per-mapper topological structure is found to have cycles.
-
- """
- dependencies = {}
- def set_processor_for_state(state, depprocessor, target_state, isdelete):
- if state not in dependencies:
- dependencies[state] = {}
- tasks = dependencies[state]
- if depprocessor not in tasks:
- tasks[depprocessor] = UOWDependencyProcessor(
- depprocessor.processor,
- UOWTask(self.uowtransaction, depprocessor.targettask.mapper)
- )
- tasks[depprocessor].targettask.append(target_state, isdelete=isdelete)
-
- cycles = set(cycles)
- def dependency_in_cycles(dep):
- proctask = trans.get_task_by_mapper(dep.processor.mapper.base_mapper, True)
- targettask = trans.get_task_by_mapper(dep.targettask.mapper.base_mapper, True)
- return targettask in cycles and (proctask is not None and proctask in cycles)
-
- deps_by_targettask = {}
- extradeplist = []
- for task in cycles:
- for dep in task.polymorphic_dependencies:
- if not dependency_in_cycles(dep):
- extradeplist.append(dep)
- for t in dep.targettask.polymorphic_tasks:
- l = deps_by_targettask.setdefault(t, [])
- l.append(dep)
-
- object_to_original_task = {}
- tuples = []
-
- for task in cycles:
- for subtask in task.polymorphic_tasks:
- for taskelement in subtask.elements:
- state = taskelement.state
- object_to_original_task[state] = subtask
- if subtask not in deps_by_targettask:
- continue
- for dep in deps_by_targettask[subtask]:
- if not dep.processor.has_dependencies or not dependency_in_cycles(dep):
- continue
- (processor, targettask) = (dep.processor, dep.targettask)
- isdelete = taskelement.isdelete
-
- # list of dependent objects from this object
- (added, unchanged, deleted) = dep.get_object_dependencies(state, trans, passive=True)
- if not added and not unchanged and not deleted:
- continue
-
- # the task corresponding to saving/deleting of those dependent objects
- childtask = trans.get_task_by_mapper(processor.mapper)
-
- childlist = added + unchanged + deleted
-
- for o in childlist:
- if o is None:
- continue
-
- if o not in childtask:
- childtask.append(o, listonly=True)
- object_to_original_task[o] = childtask
-
- whosdep = dep.whose_dependent_on_who(state, o)
- if whosdep is not None:
- tuples.append(whosdep)
-
- if whosdep[0] is state:
- set_processor_for_state(whosdep[0], dep, whosdep[0], isdelete=isdelete)
- else:
- set_processor_for_state(whosdep[0], dep, whosdep[1], isdelete=isdelete)
- else:
- # TODO: no test coverage here
- set_processor_for_state(state, dep, state, isdelete=isdelete)
-
- t = UOWTask(self.uowtransaction, self.mapper)
- t.dependencies.update(extradeplist)
-
- used_tasks = set()
-
- # rationale for "tree" sort as opposed to a straight
- # dependency - keep non-dependent objects
- # grouped together, so that insert ordering as determined
- # by session.add() is maintained.
- # An alternative might be to represent the "insert order"
- # as part of the topological sort itself, which would
- # eliminate the need for this step (but may make the original
- # topological sort more expensive)
- head = topological.sort_as_tree(tuples, object_to_original_task.iterkeys())
- if head is not None:
- original_to_tasks = {}
- stack = [(head, t)]
- while stack:
- ((state, cycles, children), parenttask) = stack.pop()
-
- originating_task = object_to_original_task[state]
- used_tasks.add(originating_task)
-
- if (parenttask, originating_task) not in original_to_tasks:
- task = UOWTask(self.uowtransaction, originating_task.mapper)
- original_to_tasks[(parenttask, originating_task)] = task
- parenttask.dependent_tasks.append(task)
- else:
- task = original_to_tasks[(parenttask, originating_task)]
-
- task.append(state, originating_task._objects[state].listonly, isdelete=originating_task._objects[state].isdelete)
-
- if state in dependencies:
- task.cyclical_dependencies.update(dependencies[state].itervalues())
-
- stack += [(n, task) for n in children]
-
- ret = [t]
-
- # add tasks that were in the cycle, but didnt get assembled
- # into the cyclical tree, to the start of the list
- for t2 in cycles:
- if t2 not in used_tasks and t2 is not self:
- localtask = UOWTask(self.uowtransaction, t2.mapper)
- for state in t2.elements:
- localtask.append(state, t2.listonly, isdelete=t2._objects[state].isdelete)
- for dep in t2.dependencies:
- localtask.dependencies.add(dep)
- ret.insert(0, localtask)
-
- return ret
-
- def __repr__(self):
- return ("UOWTask(%s) Mapper: '%r'" % (hex(id(self)), self.mapper))
-
-class UOWTaskElement(object):
- """Corresponds to a single InstanceState to be saved, deleted,
- or otherwise marked as having dependencies. A collection of
- UOWTaskElements are held by a UOWTask.
-
- """
- def __init__(self, state):
- self.state = state
- self.listonly = True
- self.isdelete = False
- self.preprocessed = set()
-
- def update(self, listonly, isdelete):
- if not listonly and self.listonly:
- self.listonly = False
- self.preprocessed.clear()
- if isdelete and not self.isdelete:
- self.isdelete = True
- self.preprocessed.clear()
-
- def __repr__(self):
- return "UOWTaskElement/%d: %s/%d %s" % (
- id(self),
- self.state.class_.__name__,
- id(self.state.obj()),
- (self.listonly and 'listonly' or (self.isdelete and 'delete' or 'save'))
- )
-
-class UOWDependencyProcessor(object):
- """In between the saving and deleting of objects, process
- dependent data, such as filling in a foreign key on a child item
- from a new primary key, or deleting association rows before a
- delete. This object acts as a proxy to a DependencyProcessor.
-
- """
- def __init__(self, processor, targettask):
- self.processor = processor
- self.targettask = targettask
- prop = processor.prop
-
- # define a set of mappers which
- # will filter the lists of entities
- # this UOWDP processes. this allows
- # MapperProperties to be overridden
- # at least for concrete mappers.
- self._mappers = set([
- m
- for m in self.processor.parent.polymorphic_iterator()
- if m._props[prop.key] is prop
- ]).union(self.processor.mapper.polymorphic_iterator())
-
- def __repr__(self):
- return "UOWDependencyProcessor(%s, %s)" % (str(self.processor), str(self.targettask))
-
- def __eq__(self, other):
- return other.processor is self.processor and other.targettask is self.targettask
-
- def __hash__(self):
- return hash((self.processor, self.targettask))
-
- def preexecute(self, trans):
- """preprocess all objects contained within this ``UOWDependencyProcessor``s target task.
-
- This may locate additional objects which should be part of the
- transaction, such as those affected deletes, orphans to be
- deleted, etc.
-
- Once an object is preprocessed, its ``UOWTaskElement`` is marked as processed. If subsequent
- changes occur to the ``UOWTaskElement``, its processed flag is reset, and will require processing
- again.
-
- Return True if any objects were preprocessed, or False if no
- objects were preprocessed. If True is returned, the parent ``UOWTransaction`` will
- ultimately call ``preexecute()`` again on all processors until no new objects are processed.
- """
-
- def getobj(elem):
- elem.preprocessed.add(self)
- return elem.state
-
- ret = False
- elements = [getobj(elem) for elem in
- self.targettask.filter_polymorphic_elements(self._mappers)
- if self not in elem.preprocessed and not elem.isdelete]
- if elements:
- ret = True
- self.processor.preprocess_dependencies(self.targettask, elements, trans, delete=False)
-
- elements = [getobj(elem) for elem in
- self.targettask.filter_polymorphic_elements(self._mappers)
- if self not in elem.preprocessed and elem.isdelete]
- if elements:
- ret = True
- self.processor.preprocess_dependencies(self.targettask, elements, trans, delete=True)
- return ret
-
- def execute(self, trans, delete):
- """process all objects contained within this ``UOWDependencyProcessor``s target task."""
-
-
- elements = [e for e in
- self.targettask.filter_polymorphic_elements(self._mappers)
- if bool(e.isdelete)==delete]
-
- self.processor.process_dependencies(
- self.targettask,
- [elem.state for elem in elements],
- trans,
- delete=delete)
-
- def get_object_dependencies(self, state, trans, passive):
- return trans.get_attribute_history(state, self.processor.key, passive=passive)
-
- def whose_dependent_on_who(self, state1, state2):
- """establish which object is operationally dependent amongst a parent/child
- using the semantics stated by the dependency processor.
-
- This method is used to establish a partial ordering (set of dependency tuples)
- when toplogically sorting on a per-instance basis.
-
- """
- return self.processor.whose_dependent_on_who(state1, state2)
-
-class UOWExecutor(object):
- """Encapsulates the execution traversal of a UOWTransaction structure."""
-
- def execute(self, trans, tasks, isdelete=None):
- if isdelete is not True:
- for task in tasks:
- self.execute_save_steps(trans, task)
- if isdelete is not False:
- for task in reversed(tasks):
- self.execute_delete_steps(trans, task)
-
- def save_objects(self, trans, task):
- task.mapper._save_obj(task.polymorphic_tosave_objects, trans)
-
- def delete_objects(self, trans, task):
- task.mapper._delete_obj(task.polymorphic_todelete_objects, trans)
-
- def execute_dependency(self, trans, dep, isdelete):
- dep.execute(trans, isdelete)
-
- def execute_save_steps(self, trans, task):
- self.save_objects(trans, task)
- for dep in task.polymorphic_cyclical_dependencies:
- self.execute_dependency(trans, dep, False)
- for dep in task.polymorphic_cyclical_dependencies:
- self.execute_dependency(trans, dep, True)
- self.execute_cyclical_dependencies(trans, task, False)
- self.execute_dependencies(trans, task)
-
- def execute_delete_steps(self, trans, task):
- self.execute_cyclical_dependencies(trans, task, True)
- self.delete_objects(trans, task)
-
- def execute_dependencies(self, trans, task):
- polymorphic_dependencies = list(task.polymorphic_dependencies)
- for dep in polymorphic_dependencies:
- self.execute_dependency(trans, dep, False)
- for dep in reversed(polymorphic_dependencies):
- self.execute_dependency(trans, dep, True)
+# TODO: don't know what these should be.
+# its very hard not to use subclasses to define behavior here.
+class Rec(object):
+ pass
- def execute_cyclical_dependencies(self, trans, task, isdelete):
- for t in task.dependent_tasks:
- self.execute(trans, [t], isdelete)
diff --git a/lib/sqlalchemy/orm/uowdumper.py b/lib/sqlalchemy/orm/uowdumper.py
deleted file mode 100644
index dd96b6b9a..000000000
--- a/lib/sqlalchemy/orm/uowdumper.py
+++ /dev/null
@@ -1,101 +0,0 @@
-# orm/uowdumper.py
-# Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Michael Bayer mike_mp@zzzcomputing.com
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: http://www.opensource.org/licenses/mit-license.php
-
-"""Dumps out a string representation of a UOWTask structure"""
-
-from sqlalchemy.orm import unitofwork
-from sqlalchemy.orm import util as mapperutil
-import StringIO
-
-class UOWDumper(unitofwork.UOWExecutor):
- def __init__(self, tasks, buf):
- self.indent = 0
- self.tasks = tasks
- self.buf = buf
- self.execute(None, tasks)
-
- @classmethod
- def dump(cls, tasks):
- buf = StringIO.StringIO()
- UOWDumper(tasks, buf)
- return buf.getvalue()
-
- def execute(self, trans, tasks, isdelete=None):
- if isdelete is not True:
- for task in tasks:
- self._execute(trans, task, False)
- if isdelete is not False:
- for task in reversed(tasks):
- self._execute(trans, task, True)
-
- def _execute(self, trans, task, isdelete):
- try:
- i = self._indent()
- if i:
- i = i[:-1] + "+-"
- self.buf.write(i + " " + self._repr_task(task))
- self.buf.write(" (" + (isdelete and "delete " or "save/update ") + "phase) \n")
- self.indent += 1
- super(UOWDumper, self).execute(trans, [task], isdelete)
- finally:
- self.indent -= 1
-
-
- def save_objects(self, trans, task):
- for rec in sorted(task.polymorphic_tosave_elements, key=lambda a: a.state.sort_key):
- if rec.listonly:
- continue
- self.buf.write(self._indent()[:-1] + "+-" + self._repr_task_element(rec) + "\n")
-
- def delete_objects(self, trans, task):
- for rec in task.polymorphic_todelete_elements:
- if rec.listonly:
- continue
- self.buf.write(self._indent() + "- " + self._repr_task_element(rec) + "\n")
-
- def execute_dependency(self, transaction, dep, isdelete):
- self._dump_processor(dep, isdelete)
-
- def _dump_processor(self, proc, deletes):
- if deletes:
- val = proc.targettask.polymorphic_todelete_elements
- else:
- val = proc.targettask.polymorphic_tosave_elements
-
- for v in val:
- self.buf.write(self._indent() + " +- " + self._repr_task_element(v, proc.processor.key, process=True) + "\n")
-
- def _repr_task_element(self, te, attribute=None, process=False):
- if getattr(te, 'state', None) is None:
- objid = "(placeholder)"
- else:
- if attribute is not None:
- objid = "%s.%s" % (mapperutil.state_str(te.state), attribute)
- else:
- objid = mapperutil.state_str(te.state)
- if process:
- return "Process %s" % (objid)
- else:
- return "%s %s" % ((te.isdelete and "Delete" or "Save"), objid)
-
- def _repr_task(self, task):
- if task.mapper is not None:
- if task.mapper.__class__.__name__ == 'Mapper':
- name = task.mapper.class_.__name__ + "/" + task.mapper.local_table.description
- else:
- name = repr(task.mapper)
- else:
- name = '(none)'
- return ("UOWTask(%s, %s)" % (hex(id(task)), name))
-
- def _repr_task_class(self, task):
- if task.mapper is not None and task.mapper.__class__.__name__ == 'Mapper':
- return task.mapper.class_.__name__
- else:
- return '(none)'
-
- def _indent(self):
- return " |" * self.indent
diff --git a/lib/sqlalchemy/topological.py b/lib/sqlalchemy/topological.py
index 76c0c717f..3f2ff6399 100644
--- a/lib/sqlalchemy/topological.py
+++ b/lib/sqlalchemy/topological.py
@@ -23,73 +23,21 @@ from sqlalchemy import util
__all__ = ['sort', 'sort_with_cycles', 'sort_as_tree']
-def sort(tuples, allitems):
- """sort the given list of items by dependency.
-
- 'tuples' is a list of tuples representing a partial ordering.
- """
-
- return [n.item for n in _sort(tuples, allitems, allow_cycles=False, ignore_self_cycles=True)]
-
-def sort_with_cycles(tuples, allitems):
- """sort the given list of items by dependency, cutting out cycles.
-
- returns results as an iterable of 2-tuples, containing the item,
- and a list containing items involved in a cycle with this item, if any.
-
- 'tuples' is a list of tuples representing a partial ordering.
- """
-
- return [(n.item, [n.item for n in n.cycles or []]) for n in _sort(tuples, allitems, allow_cycles=True)]
-
-def sort_as_tree(tuples, allitems, with_cycles=False):
- """sort the given list of items by dependency, and return results
- as a hierarchical tree structure.
-
- returns results as an iterable of 3-tuples, containing the item,
- a list containing items involved in a cycle with this item, if any,
- and a list of child tuples.
+# TODO: obviate the need for a _Node class.
+# a straight tuple should be used.
+class _Node(tuple):
+ """Represent each item in the sort."""
- if with_cycles is False, the returned structure is of the same form
- but the second element of each tuple, i.e. the 'cycles', is an empty list.
+ def __new__(cls, item):
+ children = []
+ t = tuple.__new__(cls, [item, children])
+ t.item = item
+ t.children = children
+ return t
- 'tuples' is a list of tuples representing a partial ordering.
- """
-
- return _organize_as_tree(_sort(tuples, allitems, allow_cycles=with_cycles))
-
-
-class _Node(object):
- """Represent each item in the sort."""
-
- def __init__(self, item):
- self.item = item
- self.dependencies = set()
- self.children = []
- self.cycles = None
-
- def __str__(self):
- return self.safestr()
+ def __hash__(self):
+ return id(self)
- def safestr(self, indent=0):
- return (' ' * indent * 2) + \
- str(self.item) + \
- (self.cycles is not None and (" (cycles: " + repr([x for x in self.cycles]) + ")") or "") + \
- "\n" + \
- ''.join(str(n) for n in self.children)
-
- def __repr__(self):
- return str(self.item)
-
- def all_deps(self):
- """Return a set of dependencies for this node and all its cycles."""
-
- deps = set(self.dependencies)
- if self.cycles is not None:
- for c in self.cycles:
- deps.update(c.dependencies)
- return deps
-
class _EdgeCollection(object):
"""A collection of directed edges."""
@@ -103,7 +51,6 @@ class _EdgeCollection(object):
parentnode, childnode = edge
self.parent_to_children[parentnode].add(childnode)
self.child_to_parents[childnode].add(parentnode)
- parentnode.dependencies.add(childnode)
def remove(self, edge):
"""Remove an edge from this collection.
@@ -156,7 +103,11 @@ class _EdgeCollection(object):
def __repr__(self):
return repr(list(self))
-def _sort(tuples, allitems, allow_cycles=False, ignore_self_cycles=False):
+def sort(tuples, allitems):
+ """sort the given list of items by dependency.
+
+ 'tuples' is a list of tuples representing a partial ordering.
+ """
nodes = {}
edges = _EdgeCollection()
@@ -168,11 +119,6 @@ def _sort(tuples, allitems, allow_cycles=False, ignore_self_cycles=False):
for t in tuples:
id0, id1 = id(t[0]), id(t[1])
if t[0] is t[1]:
- if allow_cycles:
- n = nodes[id0]
- n.cycles = set([n])
- elif not ignore_self_cycles:
- raise CircularDependencyError("Self-referential dependency detected " + repr(t))
continue
childnode = nodes[id1]
parentnode = nodes[id0]
@@ -186,117 +132,12 @@ def _sort(tuples, allitems, allow_cycles=False, ignore_self_cycles=False):
output = []
while nodes:
if not queue:
- # edges remain but no edgeless nodes to remove; this indicates
- # a cycle
- if allow_cycles:
- for cycle in _find_cycles(edges):
- lead = cycle[0][0]
- lead.cycles = set()
- for edge in cycle:
- n = edges.remove(edge)
- lead.cycles.add(edge[0])
- lead.cycles.add(edge[1])
- if n is not None:
- queue.append(n)
- for n in lead.cycles:
- if n is not lead:
- n._cyclical = True
- for (n, k) in list(edges.edges_by_parent(n)):
- edges.add((lead, k))
- edges.remove((n, k))
- continue
- else:
- # long cycles not allowed
- raise CircularDependencyError("Circular dependency detected " + repr(edges) + repr(queue))
+ raise CircularDependencyError("Circular dependency detected " +
+ repr(edges) + repr(queue))
node = queue.pop()
- if not hasattr(node, '_cyclical'):
- output.append(node)
+ output.append(node.item)
del nodes[id(node.item)]
for childnode in edges.pop_node(node):
queue.append(childnode)
return output
-def _organize_as_tree(nodes):
- """Given a list of nodes from a topological sort, organize the
- nodes into a tree structure, with as many non-dependent nodes
- set as siblings to each other as possible.
-
- returns nodes as 3-tuples (item, cycles, children).
- """
-
- if not nodes:
- return None
- # a list of all currently independent subtrees as a tuple of
- # (root_node, set_of_all_tree_nodes, set_of_all_cycle_nodes_in_tree)
- # order of the list has no semantics for the algorithmic
- independents = []
- # in reverse topological order
- for node in reversed(nodes):
- # nodes subtree and cycles contain the node itself
- subtree = set([node])
- if node.cycles is not None:
- cycles = set(node.cycles)
- else:
- cycles = set()
- # get a set of dependent nodes of node and its cycles
- nodealldeps = node.all_deps()
- if nodealldeps:
- # iterate over independent node indexes in reverse order so we can efficiently remove them
- for index in xrange(len(independents) - 1, -1, -1):
- child, childsubtree, childcycles = independents[index]
- # if there is a dependency between this node and an independent node
- if (childsubtree.intersection(nodealldeps) or childcycles.intersection(node.dependencies)):
- # prepend child to nodes children
- # (append should be fine, but previous implemetation used prepend)
- node.children[0:0] = [(child.item, [n.item for n in child.cycles or []], child.children)]
- # merge childs subtree and cycles
- subtree.update(childsubtree)
- cycles.update(childcycles)
- # remove the child from list of independent subtrees
- independents[index:index+1] = []
- # add node as a new independent subtree
- independents.append((node, subtree, cycles))
- # choose an arbitrary node from list of all independent subtrees
- head = independents.pop()[0]
- # add all other independent subtrees as a child of the chosen root
- # used prepend [0:0] instead of extend to maintain exact behaviour of previous implementation
- head.children[0:0] = [(i[0].item, [n.item for n in i[0].cycles or []], i[0].children) for i in independents]
- return (head.item, [n.item for n in head.cycles or []], head.children)
-
-def _find_cycles(edges):
- involved_in_cycles = set()
- cycles = {}
- def traverse(node, goal=None, cycle=None):
- if goal is None:
- goal = node
- cycle = []
- elif node is goal:
- return True
-
- for (n, key) in edges.edges_by_parent(node):
- if key in cycle:
- continue
- cycle.append(key)
- if traverse(key, goal, cycle):
- cycset = set(cycle)
- for x in cycle:
- involved_in_cycles.add(x)
- if x in cycles:
- existing_set = cycles[x]
- [existing_set.add(y) for y in cycset]
- for y in existing_set:
- cycles[y] = existing_set
- cycset = existing_set
- else:
- cycles[x] = cycset
- cycle.pop()
-
- for parent in edges.get_parents():
- traverse(parent)
-
- unique_cycles = set(tuple(s) for s in cycles.values())
-
- for cycle in unique_cycles:
- edgecollection = [edge for edge in edges
- if edge[0] in cycle and edge[1] in cycle]
- yield edgecollection