diff options
| author | mike bayer <mike_mp@zzzcomputing.com> | 2018-10-04 15:53:51 -0400 |
|---|---|---|
| committer | Gerrit Code Review <gerrit@ci.zzzcomputing.com> | 2018-10-04 15:53:51 -0400 |
| commit | bb65193bffeb106e63dab535a024565d7fc2e26d (patch) | |
| tree | 914ccbc85d4db0ebbe514392a4a08d5231d11d24 /lib/sqlalchemy | |
| parent | 96bb76222e1f612af7bd425ef5b4f208e10a255d (diff) | |
| parent | 3081269e6f1fc51d8d5cfc5120dd10ee2872e871 (diff) | |
| download | sqlalchemy-bb65193bffeb106e63dab535a024565d7fc2e26d.tar.gz | |
Merge "Route bulk update/delete exec through new Query._execute_crud method"
Diffstat (limited to 'lib/sqlalchemy')
| -rw-r--r-- | lib/sqlalchemy/ext/horizontal_shard.py | 45 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/persistence.py | 4 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/query.py | 6 |
3 files changed, 52 insertions, 3 deletions
diff --git a/lib/sqlalchemy/ext/horizontal_shard.py b/lib/sqlalchemy/ext/horizontal_shard.py index 6ef4c5612..425d28963 100644 --- a/lib/sqlalchemy/ext/horizontal_shard.py +++ b/lib/sqlalchemy/ext/horizontal_shard.py @@ -64,6 +64,28 @@ class ShardedQuery(Query): # were done, this is where it would happen return iter(partial) + def _execute_crud(self, stmt, mapper): + def exec_for_shard(shard_id): + conn = self._connection_from_session( + mapper=mapper, + shard_id=shard_id, + clause=stmt, + close_with_result=True) + result = conn.execute(stmt, self._params) + return result + + if self._shard_id is not None: + return exec_for_shard(self._shard_id) + else: + rowcount = 0 + results = [] + for shard_id in self.query_chooser(self): + result = exec_for_shard(shard_id) + rowcount += result.rowcount + results.append(result) + + return ShardedResult(results, rowcount) + def _identity_lookup( self, mapper, primary_key_identity, identity_token=None, lazy_loaded_from=None, **kw): @@ -123,6 +145,29 @@ class ShardedQuery(Query): primary_key_identity, _db_load_fn, identity_token=identity_token) +class ShardedResult(object): + """A value object that represents multiple :class:`.ResultProxy` objects. + + This is used by the :meth:`.ShardedQuery._execute_crud` hook to return + an object that takes the place of the single :class:`.ResultProxy`. + + Attribute include ``result_proxies``, which is a sequence of the + actual :class:`.ResultProxy` objects, as well as ``aggregate_rowcount`` + or ``rowcount``, which is the sum of all the individual rowcount values. + + .. versionadded:: 1.3 + """ + + __slots__ = ('result_proxies', 'aggregate_rowcount',) + + def __init__(self, result_proxies, aggregate_rowcount): + self.result_proxies = result_proxies + self.aggregate_rowcount = aggregate_rowcount + + @property + def rowcount(self): + return self.aggregate_rowcount + class ShardedSession(Session): def __init__(self, shard_chooser, id_chooser, query_chooser, shards=None, query_cls=ShardedQuery, **kwargs): diff --git a/lib/sqlalchemy/orm/persistence.py b/lib/sqlalchemy/orm/persistence.py index 95e26d83c..afa3b50b9 100644 --- a/lib/sqlalchemy/orm/persistence.py +++ b/lib/sqlalchemy/orm/persistence.py @@ -1337,9 +1337,7 @@ class BulkUD(object): self._do_post() def _execute_stmt(self, stmt): - self.result = self.query.session.execute( - stmt, params=self.query._params, - mapper=self.mapper) + self.result = self.query._execute_crud(stmt, self.mapper) self.rowcount = self.result.rowcount @util.dependencies("sqlalchemy.orm.query") diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py index 7e7c93527..e96996a39 100644 --- a/lib/sqlalchemy/orm/query.py +++ b/lib/sqlalchemy/orm/query.py @@ -3016,6 +3016,12 @@ class Query(object): result = conn.execute(querycontext.statement, self._params) return loading.instances(querycontext.query, result, querycontext) + def _execute_crud(self, stmt, mapper): + conn = self._connection_from_session( + mapper=mapper, clause=stmt, close_with_result=True) + + return conn.execute(stmt, self._params) + def _get_bind_args(self, querycontext, fn, **kw): return fn( mapper=self._bind_mapper(), |
