summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy
diff options
context:
space:
mode:
authormike bayer <mike_mp@zzzcomputing.com>2018-10-04 15:53:51 -0400
committerGerrit Code Review <gerrit@ci.zzzcomputing.com>2018-10-04 15:53:51 -0400
commitbb65193bffeb106e63dab535a024565d7fc2e26d (patch)
tree914ccbc85d4db0ebbe514392a4a08d5231d11d24 /lib/sqlalchemy
parent96bb76222e1f612af7bd425ef5b4f208e10a255d (diff)
parent3081269e6f1fc51d8d5cfc5120dd10ee2872e871 (diff)
downloadsqlalchemy-bb65193bffeb106e63dab535a024565d7fc2e26d.tar.gz
Merge "Route bulk update/delete exec through new Query._execute_crud method"
Diffstat (limited to 'lib/sqlalchemy')
-rw-r--r--lib/sqlalchemy/ext/horizontal_shard.py45
-rw-r--r--lib/sqlalchemy/orm/persistence.py4
-rw-r--r--lib/sqlalchemy/orm/query.py6
3 files changed, 52 insertions, 3 deletions
diff --git a/lib/sqlalchemy/ext/horizontal_shard.py b/lib/sqlalchemy/ext/horizontal_shard.py
index 6ef4c5612..425d28963 100644
--- a/lib/sqlalchemy/ext/horizontal_shard.py
+++ b/lib/sqlalchemy/ext/horizontal_shard.py
@@ -64,6 +64,28 @@ class ShardedQuery(Query):
# were done, this is where it would happen
return iter(partial)
+ def _execute_crud(self, stmt, mapper):
+ def exec_for_shard(shard_id):
+ conn = self._connection_from_session(
+ mapper=mapper,
+ shard_id=shard_id,
+ clause=stmt,
+ close_with_result=True)
+ result = conn.execute(stmt, self._params)
+ return result
+
+ if self._shard_id is not None:
+ return exec_for_shard(self._shard_id)
+ else:
+ rowcount = 0
+ results = []
+ for shard_id in self.query_chooser(self):
+ result = exec_for_shard(shard_id)
+ rowcount += result.rowcount
+ results.append(result)
+
+ return ShardedResult(results, rowcount)
+
def _identity_lookup(
self, mapper, primary_key_identity, identity_token=None,
lazy_loaded_from=None, **kw):
@@ -123,6 +145,29 @@ class ShardedQuery(Query):
primary_key_identity, _db_load_fn, identity_token=identity_token)
+class ShardedResult(object):
+ """A value object that represents multiple :class:`.ResultProxy` objects.
+
+ This is used by the :meth:`.ShardedQuery._execute_crud` hook to return
+ an object that takes the place of the single :class:`.ResultProxy`.
+
+ Attribute include ``result_proxies``, which is a sequence of the
+ actual :class:`.ResultProxy` objects, as well as ``aggregate_rowcount``
+ or ``rowcount``, which is the sum of all the individual rowcount values.
+
+ .. versionadded:: 1.3
+ """
+
+ __slots__ = ('result_proxies', 'aggregate_rowcount',)
+
+ def __init__(self, result_proxies, aggregate_rowcount):
+ self.result_proxies = result_proxies
+ self.aggregate_rowcount = aggregate_rowcount
+
+ @property
+ def rowcount(self):
+ return self.aggregate_rowcount
+
class ShardedSession(Session):
def __init__(self, shard_chooser, id_chooser, query_chooser, shards=None,
query_cls=ShardedQuery, **kwargs):
diff --git a/lib/sqlalchemy/orm/persistence.py b/lib/sqlalchemy/orm/persistence.py
index 95e26d83c..afa3b50b9 100644
--- a/lib/sqlalchemy/orm/persistence.py
+++ b/lib/sqlalchemy/orm/persistence.py
@@ -1337,9 +1337,7 @@ class BulkUD(object):
self._do_post()
def _execute_stmt(self, stmt):
- self.result = self.query.session.execute(
- stmt, params=self.query._params,
- mapper=self.mapper)
+ self.result = self.query._execute_crud(stmt, self.mapper)
self.rowcount = self.result.rowcount
@util.dependencies("sqlalchemy.orm.query")
diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py
index 7e7c93527..e96996a39 100644
--- a/lib/sqlalchemy/orm/query.py
+++ b/lib/sqlalchemy/orm/query.py
@@ -3016,6 +3016,12 @@ class Query(object):
result = conn.execute(querycontext.statement, self._params)
return loading.instances(querycontext.query, result, querycontext)
+ def _execute_crud(self, stmt, mapper):
+ conn = self._connection_from_session(
+ mapper=mapper, clause=stmt, close_with_result=True)
+
+ return conn.execute(stmt, self._params)
+
def _get_bind_args(self, querycontext, fn, **kw):
return fn(
mapper=self._bind_mapper(),