summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/ext
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2017-12-14 10:20:50 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2017-12-22 11:36:53 -0500
commit50d9f1687a6e0c3ce9b62fe98b76b25af7b20c11 (patch)
treefec4931bac89e4706b45aa3ffdc82db30cfc9425 /lib/sqlalchemy/ext
parent04937652f426e518781b2b762b0e0e8090857cbd (diff)
downloadsqlalchemy-50d9f1687a6e0c3ce9b62fe98b76b25af7b20c11.tar.gz
Add an identity_token to the identity key
For the purposes of assisting with sharded setups, add a new member to the identity key that can be customized. this allows sharding across databases where the primary key space is shared. Change-Id: Iae3909f5d4c501b62c10d0371fbceb01abda51db Fixes: #4137
Diffstat (limited to 'lib/sqlalchemy/ext')
-rw-r--r--lib/sqlalchemy/ext/horizontal_shard.py34
1 files changed, 30 insertions, 4 deletions
diff --git a/lib/sqlalchemy/ext/horizontal_shard.py b/lib/sqlalchemy/ext/horizontal_shard.py
index 8902ae606..c5cf98b40 100644
--- a/lib/sqlalchemy/ext/horizontal_shard.py
+++ b/lib/sqlalchemy/ext/horizontal_shard.py
@@ -15,6 +15,7 @@ the source distribution.
"""
+from .. import inspect
from .. import util
from ..orm.session import Session
from ..orm.query import Query
@@ -42,7 +43,7 @@ class ShardedQuery(Query):
def _execute_and_instances(self, context):
def iter_for_shard(shard_id):
- context.attributes['shard_id'] = shard_id
+ context.attributes['shard_id'] = context.identity_token = shard_id
result = self._connection_from_session(
mapper=self._mapper_zero(),
shard_id=shard_id).execute(
@@ -62,6 +63,9 @@ class ShardedQuery(Query):
return iter(partial)
def _get_impl(self, ident, fallback_fn):
+ # TODO: the "ident" here should be getting the identity token
+ # which indicates that this area can likely be simplified, as the
+ # token will fall through into _execute_and_instances
def _fallback(query, ident):
if self._shard_id is not None:
return fallback_fn(self, ident)
@@ -75,7 +79,13 @@ class ShardedQuery(Query):
else:
return None
- return super(ShardedQuery, self)._get_impl(ident, _fallback)
+ if self._shard_id is not None:
+ identity_token = self._shard_id
+ else:
+ identity_token = None
+
+ return super(ShardedQuery, self)._get_impl(
+ ident, _fallback, identity_token=identity_token)
class ShardedSession(Session):
@@ -112,9 +122,24 @@ class ShardedSession(Session):
for k in shards:
self.bind_shard(k, shards[k])
+ def _choose_shard_and_assign(self, mapper, instance, **kw):
+ if instance is not None:
+ state = inspect(instance)
+ if state.key:
+ token = state.key[2]
+ assert token is not None
+ return token
+ elif state.identity_token:
+ return state.identity_token
+
+ shard_id = self.shard_chooser(mapper, instance, **kw)
+ if instance is not None:
+ state.identity_token = shard_id
+ return shard_id
+
def connection(self, mapper=None, instance=None, shard_id=None, **kwargs):
if shard_id is None:
- shard_id = self.shard_chooser(mapper, instance)
+ shard_id = self._choose_shard_and_assign(mapper, instance)
if self.transaction is not None:
return self.transaction.connection(mapper, shard_id=shard_id)
@@ -128,7 +153,8 @@ class ShardedSession(Session):
def get_bind(self, mapper, shard_id=None,
instance=None, clause=None, **kw):
if shard_id is None:
- shard_id = self.shard_chooser(mapper, instance, clause=clause)
+ shard_id = self._choose_shard_and_assign(
+ mapper, instance, clause=clause)
return self.__binds[shard_id]
def bind_shard(self, shard_id, bind):