diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2023-01-24 11:05:12 -0500 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2023-01-25 19:42:14 -0500 |
| commit | 8a32f367175871500723c5ebfc0f1af1564d3478 (patch) | |
| tree | 90ab26593282bf30a6ac0a3b002493ebb8cb8e4f /lib/sqlalchemy/ext | |
| parent | d426d3bbad1d3e4a0b80e83c4423dea055609c15 (diff) | |
| download | sqlalchemy-8a32f367175871500723c5ebfc0f1af1564d3478.tar.gz | |
add set_shard_id() loader option for horizontal shard
Added a new option to the horizontal sharding API,
:class:`_horizontal.set_shard_id`, which sets the effective shard identifier
to query against, both for the primary query and for all secondary
loaders, including relationship eager loaders as well as relationship and
column lazy loaders.
Modernize the sharding examples with new-style mappings, and add a new asyncio example.
Fixes: #7226
Fixes: #7028
Change-Id: Ie69248060c305e8de04f75a529949777944ad511
Diffstat (limited to 'lib/sqlalchemy/ext')
| -rw-r--r-- | lib/sqlalchemy/ext/horizontal_shard.py | 104 |
1 files changed, 81 insertions, 23 deletions
diff --git a/lib/sqlalchemy/ext/horizontal_shard.py b/lib/sqlalchemy/ext/horizontal_shard.py index df9d0d797..0bcc5628f 100644 --- a/lib/sqlalchemy/ext/horizontal_shard.py +++ b/lib/sqlalchemy/ext/horizontal_shard.py @@ -42,6 +42,7 @@ from .. import inspect from .. import util from ..orm import PassiveFlag from ..orm._typing import OrmExecuteOptionsParameter +from ..orm.interfaces import ORMOption from ..orm.mapper import Mapper from ..orm.query import Query from ..orm.session import _BindArguments @@ -73,7 +74,7 @@ _T = TypeVar("_T", bound=Any) SelfShardedQuery = TypeVar("SelfShardedQuery", bound="ShardedQuery[Any]") -_ShardKey = str +ShardIdentifier = str class ShardChooser(Protocol): @@ -105,8 +106,7 @@ class ShardedQuery(Query[_T]): .. legacy:: The :class:`.ShardedQuery` is a subclass of the legacy :class:`.Query` class. The :class:`.ShardedSession` now supports - 2.0 style execution via the :meth:`.ShardedSession.execute` method - as well. + 2.0 style execution via the :meth:`.ShardedSession.execute` method. """ @@ -118,7 +118,9 @@ class ShardedQuery(Query[_T]): self.execute_chooser = self.session.execute_chooser self._shard_id = None - def set_shard(self: SelfShardedQuery, shard_id: str) -> SelfShardedQuery: + def set_shard( + self: SelfShardedQuery, shard_id: ShardIdentifier + ) -> SelfShardedQuery: """Return a new query, limited to a single shard ID. All subsequent operations with the returned query will @@ -166,14 +168,12 @@ class ShardedSession(Session): should set whatever state on the instance to mark it in the future as participating in that shard. - :param id_chooser: A callable, passed a :class:`.ShardedQuery` and a - tuple of identity values, which should return a list of shard ids - where the ID might reside. The databases will be queried in the order - of this listing. + :param identity_chooser: A callable, passed a Mapper and primary key + argument, which should return a list of shard ids where this + primary key might reside. - .. 
legacy:: This parameter still uses the legacy - :class:`.ShardedQuery` class as an argument passed to the - callable. + .. versionchanged:: 2.0 The ``identity_chooser`` parameter + supersedes the ``id_chooser`` parameter. :param execute_chooser: For a given :class:`.ORMExecuteState`, returns the list of shard_ids @@ -250,7 +250,7 @@ class ShardedSession(Session): "execute_chooser or query_chooser is required" ) self.execute_chooser = execute_chooser - self.__shards: Dict[_ShardKey, _SessionBind] = {} + self.__shards: Dict[ShardIdentifier, _SessionBind] = {} if shards is not None: for k in shards: self.bind_shard(k, shards[k]) @@ -329,7 +329,7 @@ class ShardedSession(Session): self, mapper: Optional[Mapper[_T]] = None, instance: Optional[Any] = None, - shard_id: Optional[Any] = None, + shard_id: Optional[ShardIdentifier] = None, **kw: Any, ) -> Connection: """Provide a :class:`_engine.Connection` to use in the unit of work @@ -359,7 +359,7 @@ class ShardedSession(Session): self, mapper: Optional[_EntityBindKey[_O]] = None, *, - shard_id: Optional[_ShardKey] = None, + shard_id: Optional[ShardIdentifier] = None, instance: Optional[Any] = None, clause: Optional[ClauseElement] = None, **kw: Any, @@ -372,11 +372,61 @@ class ShardedSession(Session): return self.__shards[shard_id] def bind_shard( - self, shard_id: _ShardKey, bind: Union[Engine, OptionEngine] + self, shard_id: ShardIdentifier, bind: Union[Engine, OptionEngine] ) -> None: self.__shards[shard_id] = bind +class set_shard_id(ORMOption): + """a loader option for statements to apply a specific shard id to the + primary query as well as for additional relationship and column + loaders. + + The :class:`_horizontal.set_shard_id` option may be applied using + the :meth:`_sql.Executable.options` method of any executable statement:: + + stmt = ( + select(MyObject). + where(MyObject.name == 'some name'). 
+ options(set_shard_id("shard1")) + ) + + Above, the statement when invoked will limit to the "shard1" shard + identifier for the primary query as well as for all relationship and + column loading strategies, including eager loaders such as + :func:`_orm.selectinload`, deferred column loaders like :func:`_orm.defer`, + and the lazy relationship loader :func:`_orm.lazyload`. + + In this way, the :class:`_horizontal.set_shard_id` option has much wider + scope than using the "shard_id" argument within the + :paramref:`_orm.Session.execute.bind_arguments` dictionary. + + + .. versionadded:: 2.0.0 + + """ + + __slots__ = ("shard_id", "propagate_to_loaders") + + def __init__( + self, shard_id: ShardIdentifier, propagate_to_loaders: bool = True + ): + """Construct a :class:`_horizontal.set_shard_id` option. + + :param shard_id: shard identifier + :param propagate_to_loaders: if left at its default of ``True``, the + shard option will take place for lazy loaders such as + :func:`_orm.lazyload` and :func:`_orm.defer`; if False, the option + will not be propagated to loaded objects. Note that :func:`_orm.defer` + always limits to the shard_id of the parent row in any case, so the + parameter only has a net effect on the behavior of the + :func:`_orm.lazyload` strategy. 
+ + """ + self.shard_id = shard_id + self.propagate_to_loaders = propagate_to_loaders + + def execute_and_instances( orm_context: ORMExecuteState, ) -> Union[Result[_T], IteratorResult[_TP]]: @@ -400,7 +450,7 @@ def execute_and_instances( assert isinstance(session, ShardedSession) def iter_for_shard( - shard_id: str, + shard_id: ShardIdentifier, ) -> Union[Result[_T], IteratorResult[_TP]]: bind_arguments = dict(orm_context.bind_arguments) @@ -409,14 +459,22 @@ def execute_and_instances( orm_context.update_execution_options(identity_token=shard_id) return orm_context.invoke_statement(bind_arguments=bind_arguments) - if active_options and active_options._identity_token is not None: - shard_id = active_options._identity_token - elif "_sa_shard_id" in orm_context.execution_options: - shard_id = orm_context.execution_options["_sa_shard_id"] - elif "shard_id" in orm_context.bind_arguments: - shard_id = orm_context.bind_arguments["shard_id"] + for orm_opt in orm_context._non_compile_orm_options: + # TODO: if we had an ORMOption that gets applied at ORM statement + # execution time, that would allow this to be more generalized. + # for now just iterate and look for our options + if isinstance(orm_opt, set_shard_id): + shard_id = orm_opt.shard_id + break else: - shard_id = None + if active_options and active_options._identity_token is not None: + shard_id = active_options._identity_token + elif "_sa_shard_id" in orm_context.execution_options: + shard_id = orm_context.execution_options["_sa_shard_id"] + elif "shard_id" in orm_context.bind_arguments: + shard_id = orm_context.bind_arguments["shard_id"] + else: + shard_id = None if shard_id is not None: return iter_for_shard(shard_id) |
