summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/ext
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2020-06-03 17:38:35 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2020-06-06 13:31:54 -0400
commit3ab2364e78641c4f0e4b6456afc2cbed39b0d0e6 (patch)
treef3dc26609070c1a357a366592c791a3ec0655483 /lib/sqlalchemy/ext
parent14bc09203a8b5b2bc001f764ad7cce6a184975cc (diff)
downloadsqlalchemy-3ab2364e78641c4f0e4b6456afc2cbed39b0d0e6.tar.gz
Convert bulk update/delete to new execution model
This reorganizes the BulkUD model in sqlalchemy.orm.persistence to be based on the CompileState concept and to allow plain update() / delete() to be passed to session.execute() where the ORM synchronize session logic will take place. Also gets "synchronize_session='fetch'" working with horizontal sharding. Adding a few more result.scalar_one() types of methods as scalar_one() seems like what is normally desired. Fixes: #5160 Change-Id: I8001ebdad089da34119eb459709731ba6c0ba975
Diffstat (limited to 'lib/sqlalchemy/ext')
-rw-r--r--lib/sqlalchemy/ext/horizontal_shard.py95
-rw-r--r--lib/sqlalchemy/ext/hybrid.py5
2 files changed, 27 insertions, 73 deletions
diff --git a/lib/sqlalchemy/ext/horizontal_shard.py b/lib/sqlalchemy/ext/horizontal_shard.py
index c3ac71c10..0983807cb 100644
--- a/lib/sqlalchemy/ext/horizontal_shard.py
+++ b/lib/sqlalchemy/ext/horizontal_shard.py
@@ -50,58 +50,6 @@ class ShardedQuery(Query):
"""
return self.execution_options(_sa_shard_id=shard_id)
- def _execute_crud(self, stmt, mapper):
- def exec_for_shard(shard_id):
- conn = self.session.connection(
- mapper=mapper,
- shard_id=shard_id,
- clause=stmt,
- close_with_result=True,
- )
- result = conn._execute_20(
- stmt, self.load_options._params, self._execution_options
- )
- return result
-
- if self._shard_id is not None:
- return exec_for_shard(self._shard_id)
- else:
- rowcount = 0
- results = []
- # TODO: this will have to be the new object
- for shard_id in self.execute_chooser(self):
- result = exec_for_shard(shard_id)
- rowcount += result.rowcount
- results.append(result)
-
- return ShardedResult(results, rowcount)
-
-
-class ShardedResult(object):
- """A value object that represents multiple :class:`_engine.CursorResult`
- objects.
-
- This is used by the :meth:`.ShardedQuery._execute_crud` hook to return
- an object that takes the place of the single :class:`_engine.CursorResult`.
-
- Attribute include ``result_proxies``, which is a sequence of the
- actual :class:`_engine.CursorResult` objects,
- as well as ``aggregate_rowcount``
- or ``rowcount``, which is the sum of all the individual rowcount values.
-
- .. versionadded:: 1.3
- """
-
- __slots__ = ("result_proxies", "aggregate_rowcount")
-
- def __init__(self, result_proxies, aggregate_rowcount):
- self.result_proxies = result_proxies
- self.aggregate_rowcount = aggregate_rowcount
-
- @property
- def rowcount(self):
- return self.aggregate_rowcount
-
class ShardedSession(Session):
def __init__(
@@ -259,37 +207,40 @@ class ShardedSession(Session):
def execute_and_instances(orm_context):
- if orm_context.bind_arguments.get("_horizontal_shard", False):
- return None
-
params = orm_context.parameters
- load_options = orm_context.load_options
+ if orm_context.is_select:
+ load_options = active_options = orm_context.load_options
+ update_options = None
+ if params is None:
+ params = active_options._params
+
+ else:
+ load_options = None
+ update_options = active_options = orm_context.update_delete_options
+
session = orm_context.session
# orm_query = orm_context.orm_query
- if params is None:
- params = load_options._params
-
- def iter_for_shard(shard_id, load_options):
+ def iter_for_shard(shard_id, load_options, update_options):
execution_options = dict(orm_context.local_execution_options)
bind_arguments = dict(orm_context.bind_arguments)
- bind_arguments["_horizontal_shard"] = True
bind_arguments["shard_id"] = shard_id
- load_options += {"_refresh_identity_token": shard_id}
- execution_options["_sa_orm_load_options"] = load_options
+ if orm_context.is_select:
+ load_options += {"_refresh_identity_token": shard_id}
+ execution_options["_sa_orm_load_options"] = load_options
+ else:
+ update_options += {"_refresh_identity_token": shard_id}
+ execution_options["_sa_orm_update_options"] = update_options
- return session.execute(
- orm_context.statement,
- orm_context.parameters,
- execution_options,
- bind_arguments,
+ return orm_context.invoke_statement(
+ bind_arguments=bind_arguments, execution_options=execution_options
)
- if load_options._refresh_identity_token is not None:
- shard_id = load_options._refresh_identity_token
+ if active_options._refresh_identity_token is not None:
+ shard_id = active_options._refresh_identity_token
elif "_sa_shard_id" in orm_context.merged_execution_options:
shard_id = orm_context.merged_execution_options["_sa_shard_id"]
elif "shard_id" in orm_context.bind_arguments:
@@ -298,11 +249,11 @@ def execute_and_instances(orm_context):
shard_id = None
if shard_id is not None:
- return iter_for_shard(shard_id, load_options)
+ return iter_for_shard(shard_id, load_options, update_options)
else:
partial = []
for shard_id in session.execute_chooser(orm_context):
- result_ = iter_for_shard(shard_id, load_options)
+ result_ = iter_for_shard(shard_id, load_options, update_options)
partial.append(result_)
return partial[0].merge(*partial[1:])
diff --git a/lib/sqlalchemy/ext/hybrid.py b/lib/sqlalchemy/ext/hybrid.py
index 9f73b5d31..efd8d7d6b 100644
--- a/lib/sqlalchemy/ext/hybrid.py
+++ b/lib/sqlalchemy/ext/hybrid.py
@@ -777,7 +777,7 @@ things it can be used for.
from .. import util
from ..orm import attributes
from ..orm import interfaces
-
+from ..sql import elements
HYBRID_METHOD = util.symbol("HYBRID_METHOD")
"""Symbol indicating an :class:`InspectionAttr` that's
@@ -1144,6 +1144,9 @@ class ExprComparator(Comparator):
return self.hybrid.info
def _bulk_update_tuples(self, value):
+ if isinstance(value, elements.BindParameter):
+ value = value.value
+
if isinstance(self.expression, attributes.QueryableAttribute):
return self.expression._bulk_update_tuples(value)
elif self.hybrid.update_expr is not None: