diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2017-12-14 10:20:50 -0500 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2017-12-22 11:36:53 -0500 |
| commit | 50d9f1687a6e0c3ce9b62fe98b76b25af7b20c11 (patch) | |
| tree | fec4931bac89e4706b45aa3ffdc82db30cfc9425 /lib/sqlalchemy/ext | |
| parent | 04937652f426e518781b2b762b0e0e8090857cbd (diff) | |
| download | sqlalchemy-50d9f1687a6e0c3ce9b62fe98b76b25af7b20c11.tar.gz | |
Add an identity_token to the identity key
For the purposes of assisting with sharded setups, add a new
member to the identity key that can be customized. this allows
sharding across databases where the primary key space is shared.
Change-Id: Iae3909f5d4c501b62c10d0371fbceb01abda51db
Fixes: #4137
Diffstat (limited to 'lib/sqlalchemy/ext')
| -rw-r--r-- | lib/sqlalchemy/ext/horizontal_shard.py | 34 |
1 files changed, 30 insertions, 4 deletions
diff --git a/lib/sqlalchemy/ext/horizontal_shard.py b/lib/sqlalchemy/ext/horizontal_shard.py index 8902ae606..c5cf98b40 100644 --- a/lib/sqlalchemy/ext/horizontal_shard.py +++ b/lib/sqlalchemy/ext/horizontal_shard.py @@ -15,6 +15,7 @@ the source distribution. """ +from .. import inspect from .. import util from ..orm.session import Session from ..orm.query import Query @@ -42,7 +43,7 @@ class ShardedQuery(Query): def _execute_and_instances(self, context): def iter_for_shard(shard_id): - context.attributes['shard_id'] = shard_id + context.attributes['shard_id'] = context.identity_token = shard_id result = self._connection_from_session( mapper=self._mapper_zero(), shard_id=shard_id).execute( @@ -62,6 +63,9 @@ class ShardedQuery(Query): return iter(partial) def _get_impl(self, ident, fallback_fn): + # TODO: the "ident" here should be getting the identity token + # which indicates that this area can likely be simplified, as the + # token will fall through into _execute_and_instances def _fallback(query, ident): if self._shard_id is not None: return fallback_fn(self, ident) @@ -75,7 +79,13 @@ class ShardedQuery(Query): else: return None - return super(ShardedQuery, self)._get_impl(ident, _fallback) + if self._shard_id is not None: + identity_token = self._shard_id + else: + identity_token = None + + return super(ShardedQuery, self)._get_impl( + ident, _fallback, identity_token=identity_token) class ShardedSession(Session): @@ -112,9 +122,24 @@ class ShardedSession(Session): for k in shards: self.bind_shard(k, shards[k]) + def _choose_shard_and_assign(self, mapper, instance, **kw): + if instance is not None: + state = inspect(instance) + if state.key: + token = state.key[2] + assert token is not None + return token + elif state.identity_token: + return state.identity_token + + shard_id = self.shard_chooser(mapper, instance, **kw) + if instance is not None: + state.identity_token = shard_id + return shard_id + def connection(self, mapper=None, instance=None, shard_id=None, **kwargs): if shard_id is None: - shard_id = self.shard_chooser(mapper, instance) + shard_id = self._choose_shard_and_assign(mapper, instance) if self.transaction is not None: return self.transaction.connection(mapper, shard_id=shard_id) @@ -128,7 +153,8 @@ class ShardedSession(Session): def get_bind(self, mapper, shard_id=None, instance=None, clause=None, **kw): if shard_id is None: - shard_id = self.shard_chooser(mapper, instance, clause=clause) + shard_id = self._choose_shard_and_assign( + mapper, instance, clause=clause) return self.__binds[shard_id] def bind_shard(self, shard_id, bind): |
