diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-06-03 17:38:35 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-06-06 13:31:54 -0400 |
| commit | 3ab2364e78641c4f0e4b6456afc2cbed39b0d0e6 (patch) | |
| tree | f3dc26609070c1a357a366592c791a3ec0655483 /lib/sqlalchemy/ext | |
| parent | 14bc09203a8b5b2bc001f764ad7cce6a184975cc (diff) | |
| download | sqlalchemy-3ab2364e78641c4f0e4b6456afc2cbed39b0d0e6.tar.gz | |
Convert bulk update/delete to new execution model
This reorganizes the BulkUD model in sqlalchemy.orm.persistence
to be based on the CompileState concept and to allow plain
update() / delete() to be passed to session.execute() where
the ORM synchronize session logic will take place.
Also gets "synchronize_session='fetch'" working with horizontal
sharding.
Adding a few more result.scalar_one() types of methods
as scalar_one() seems like what is normally desired.
Fixes: #5160
Change-Id: I8001ebdad089da34119eb459709731ba6c0ba975
Diffstat (limited to 'lib/sqlalchemy/ext')
| -rw-r--r-- | lib/sqlalchemy/ext/horizontal_shard.py | 95 | ||||
| -rw-r--r-- | lib/sqlalchemy/ext/hybrid.py | 5 |
2 files changed, 27 insertions, 73 deletions
diff --git a/lib/sqlalchemy/ext/horizontal_shard.py b/lib/sqlalchemy/ext/horizontal_shard.py index c3ac71c10..0983807cb 100644 --- a/lib/sqlalchemy/ext/horizontal_shard.py +++ b/lib/sqlalchemy/ext/horizontal_shard.py @@ -50,58 +50,6 @@ class ShardedQuery(Query): """ return self.execution_options(_sa_shard_id=shard_id) - def _execute_crud(self, stmt, mapper): - def exec_for_shard(shard_id): - conn = self.session.connection( - mapper=mapper, - shard_id=shard_id, - clause=stmt, - close_with_result=True, - ) - result = conn._execute_20( - stmt, self.load_options._params, self._execution_options - ) - return result - - if self._shard_id is not None: - return exec_for_shard(self._shard_id) - else: - rowcount = 0 - results = [] - # TODO: this will have to be the new object - for shard_id in self.execute_chooser(self): - result = exec_for_shard(shard_id) - rowcount += result.rowcount - results.append(result) - - return ShardedResult(results, rowcount) - - -class ShardedResult(object): - """A value object that represents multiple :class:`_engine.CursorResult` - objects. - - This is used by the :meth:`.ShardedQuery._execute_crud` hook to return - an object that takes the place of the single :class:`_engine.CursorResult`. - - Attribute include ``result_proxies``, which is a sequence of the - actual :class:`_engine.CursorResult` objects, - as well as ``aggregate_rowcount`` - or ``rowcount``, which is the sum of all the individual rowcount values. - - .. versionadded:: 1.3 - """ - - __slots__ = ("result_proxies", "aggregate_rowcount") - - def __init__(self, result_proxies, aggregate_rowcount): - self.result_proxies = result_proxies - self.aggregate_rowcount = aggregate_rowcount - - @property - def rowcount(self): - return self.aggregate_rowcount - class ShardedSession(Session): def __init__( @@ -259,37 +207,40 @@ class ShardedSession(Session): def execute_and_instances(orm_context): - if orm_context.bind_arguments.get("_horizontal_shard", False): - return None - params = orm_context.parameters - load_options = orm_context.load_options + if orm_context.is_select: + load_options = active_options = orm_context.load_options + update_options = None + if params is None: + params = active_options._params + + else: + load_options = None + update_options = active_options = orm_context.update_delete_options + session = orm_context.session # orm_query = orm_context.orm_query - if params is None: - params = load_options._params - - def iter_for_shard(shard_id, load_options): + def iter_for_shard(shard_id, load_options, update_options): execution_options = dict(orm_context.local_execution_options) bind_arguments = dict(orm_context.bind_arguments) - bind_arguments["_horizontal_shard"] = True bind_arguments["shard_id"] = shard_id - load_options += {"_refresh_identity_token": shard_id} - execution_options["_sa_orm_load_options"] = load_options + if orm_context.is_select: + load_options += {"_refresh_identity_token": shard_id} + execution_options["_sa_orm_load_options"] = load_options + else: + update_options += {"_refresh_identity_token": shard_id} + execution_options["_sa_orm_update_options"] = update_options - return session.execute( - orm_context.statement, - orm_context.parameters, - execution_options, - bind_arguments, + return orm_context.invoke_statement( + bind_arguments=bind_arguments, execution_options=execution_options ) - if load_options._refresh_identity_token is not None: - shard_id = load_options._refresh_identity_token + if active_options._refresh_identity_token is not None: + shard_id = active_options._refresh_identity_token elif "_sa_shard_id" in orm_context.merged_execution_options: shard_id = orm_context.merged_execution_options["_sa_shard_id"] elif "shard_id" in orm_context.bind_arguments: @@ -298,11 +249,11 @@ def execute_and_instances(orm_context): shard_id = None if shard_id is not None: - return iter_for_shard(shard_id, load_options) + return iter_for_shard(shard_id, load_options, update_options) else: partial = [] for shard_id in session.execute_chooser(orm_context): - result_ = iter_for_shard(shard_id, load_options) + result_ = iter_for_shard(shard_id, load_options, update_options) partial.append(result_) return partial[0].merge(*partial[1:]) diff --git a/lib/sqlalchemy/ext/hybrid.py b/lib/sqlalchemy/ext/hybrid.py index 9f73b5d31..efd8d7d6b 100644 --- a/lib/sqlalchemy/ext/hybrid.py +++ b/lib/sqlalchemy/ext/hybrid.py @@ -777,7 +777,7 @@ things it can be used for. from .. import util from ..orm import attributes from ..orm import interfaces - +from ..sql import elements HYBRID_METHOD = util.symbol("HYBRID_METHOD") """Symbol indicating an :class:`InspectionAttr` that's @@ -1144,6 +1144,9 @@ class ExprComparator(Comparator): return self.hybrid.info def _bulk_update_tuples(self, value): + if isinstance(value, elements.BindParameter): + value = value.value + if isinstance(self.expression, attributes.QueryableAttribute): return self.expression._bulk_update_tuples(value) elif self.hybrid.update_expr is not None: |
