summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/ext
diff options
context:
space:
mode:
author: Mike Bayer <mike_mp@zzzcomputing.com> 2023-01-24 11:05:12 -0500
committer: Mike Bayer <mike_mp@zzzcomputing.com> 2023-01-25 19:42:14 -0500
commit: 8a32f367175871500723c5ebfc0f1af1564d3478 (patch)
tree: 90ab26593282bf30a6ac0a3b002493ebb8cb8e4f /lib/sqlalchemy/ext
parent: d426d3bbad1d3e4a0b80e83c4423dea055609c15 (diff)
download: sqlalchemy-8a32f367175871500723c5ebfc0f1af1564d3478.tar.gz
add set_shard_id() loader option for horizontal shard
Added a new option to the horizontal sharding API, :class:`_horizontal.set_shard_id`, which sets the effective shard identifier to query against, both for the primary query and for all secondary loaders, including relationship eager loaders as well as relationship and column lazy loaders. Modernized the sharding examples with new-style mappings and added a new asyncio example.
Fixes: #7226
Fixes: #7028
Change-Id: Ie69248060c305e8de04f75a529949777944ad511
Diffstat (limited to 'lib/sqlalchemy/ext')
-rw-r--r-- lib/sqlalchemy/ext/horizontal_shard.py 104
1 files changed, 81 insertions, 23 deletions
diff --git a/lib/sqlalchemy/ext/horizontal_shard.py b/lib/sqlalchemy/ext/horizontal_shard.py
index df9d0d797..0bcc5628f 100644
--- a/lib/sqlalchemy/ext/horizontal_shard.py
+++ b/lib/sqlalchemy/ext/horizontal_shard.py
@@ -42,6 +42,7 @@ from .. import inspect
from .. import util
from ..orm import PassiveFlag
from ..orm._typing import OrmExecuteOptionsParameter
+from ..orm.interfaces import ORMOption
from ..orm.mapper import Mapper
from ..orm.query import Query
from ..orm.session import _BindArguments
@@ -73,7 +74,7 @@ _T = TypeVar("_T", bound=Any)
SelfShardedQuery = TypeVar("SelfShardedQuery", bound="ShardedQuery[Any]")
-_ShardKey = str
+ShardIdentifier = str
class ShardChooser(Protocol):
@@ -105,8 +106,7 @@ class ShardedQuery(Query[_T]):
.. legacy:: The :class:`.ShardedQuery` is a subclass of the legacy
:class:`.Query` class. The :class:`.ShardedSession` now supports
- 2.0 style execution via the :meth:`.ShardedSession.execute` method
- as well.
+ 2.0 style execution via the :meth:`.ShardedSession.execute` method.
"""
@@ -118,7 +118,9 @@ class ShardedQuery(Query[_T]):
self.execute_chooser = self.session.execute_chooser
self._shard_id = None
- def set_shard(self: SelfShardedQuery, shard_id: str) -> SelfShardedQuery:
+ def set_shard(
+ self: SelfShardedQuery, shard_id: ShardIdentifier
+ ) -> SelfShardedQuery:
"""Return a new query, limited to a single shard ID.
All subsequent operations with the returned query will
@@ -166,14 +168,12 @@ class ShardedSession(Session):
should set whatever state on the instance to mark it in the future as
participating in that shard.
- :param id_chooser: A callable, passed a :class:`.ShardedQuery` and a
- tuple of identity values, which should return a list of shard ids
- where the ID might reside. The databases will be queried in the order
- of this listing.
+ :param identity_chooser: A callable, passed a Mapper and primary key
+ argument, which should return a list of shard ids where this
+ primary key might reside.
- .. legacy:: This parameter still uses the legacy
- :class:`.ShardedQuery` class as an argument passed to the
- callable.
+ .. versionchanged:: 2.0 The ``identity_chooser`` parameter
+ supersedes the ``id_chooser`` parameter.
:param execute_chooser: For a given :class:`.ORMExecuteState`,
returns the list of shard_ids
@@ -250,7 +250,7 @@ class ShardedSession(Session):
"execute_chooser or query_chooser is required"
)
self.execute_chooser = execute_chooser
- self.__shards: Dict[_ShardKey, _SessionBind] = {}
+ self.__shards: Dict[ShardIdentifier, _SessionBind] = {}
if shards is not None:
for k in shards:
self.bind_shard(k, shards[k])
@@ -329,7 +329,7 @@ class ShardedSession(Session):
self,
mapper: Optional[Mapper[_T]] = None,
instance: Optional[Any] = None,
- shard_id: Optional[Any] = None,
+ shard_id: Optional[ShardIdentifier] = None,
**kw: Any,
) -> Connection:
"""Provide a :class:`_engine.Connection` to use in the unit of work
@@ -359,7 +359,7 @@ class ShardedSession(Session):
self,
mapper: Optional[_EntityBindKey[_O]] = None,
*,
- shard_id: Optional[_ShardKey] = None,
+ shard_id: Optional[ShardIdentifier] = None,
instance: Optional[Any] = None,
clause: Optional[ClauseElement] = None,
**kw: Any,
@@ -372,11 +372,61 @@ class ShardedSession(Session):
return self.__shards[shard_id]
def bind_shard(
- self, shard_id: _ShardKey, bind: Union[Engine, OptionEngine]
+ self, shard_id: ShardIdentifier, bind: Union[Engine, OptionEngine]
) -> None:
self.__shards[shard_id] = bind
+class set_shard_id(ORMOption):
+ """A loader option for statements that applies a specific shard id to the
+ primary query as well as to additional relationship and column
+ loaders.
+
+ The :class:`_horizontal.set_shard_id` option may be applied using
+ the :meth:`_sql.Executable.options` method of any executable statement::
+
+ stmt = (
+ select(MyObject).
+ where(MyObject.name == 'some name').
+ options(set_shard_id("shard1"))
+ )
+
+ Above, the statement when invoked will limit to the "shard1" shard
+ identifier for the primary query as well as for all relationship and
+ column loading strategies, including eager loaders such as
+ :func:`_orm.selectinload`, deferred column loaders like :func:`_orm.defer`,
+ and the lazy relationship loader :func:`_orm.lazyload`.
+
+ In this way, the :class:`_horizontal.set_shard_id` option has much wider
+ scope than using the "shard_id" argument within the
+ :paramref:`_orm.Session.execute.bind_arguments` dictionary.
+
+
+ .. versionadded:: 2.0.0
+
+ """
+
+ __slots__ = ("shard_id", "propagate_to_loaders")
+
+ def __init__(
+ self, shard_id: ShardIdentifier, propagate_to_loaders: bool = True
+ ):
+ """Construct a :class:`_horizontal.set_shard_id` option.
+
+ :param shard_id: shard identifier
+ :param propagate_to_loaders: if left at its default of ``True``, the
+ shard option will take effect for lazy loaders such as
+ :func:`_orm.lazyload` and :func:`_orm.defer`; if ``False``, the option
+ will not be propagated to loaded objects. Note that :func:`_orm.defer`
+ always limits to the shard_id of the parent row in any case, so the
+ parameter only has a net effect on the behavior of the
+ :func:`_orm.lazyload` strategy.
+
+ """
+ self.shard_id = shard_id
+ self.propagate_to_loaders = propagate_to_loaders
+
+
def execute_and_instances(
orm_context: ORMExecuteState,
) -> Union[Result[_T], IteratorResult[_TP]]:
@@ -400,7 +450,7 @@ def execute_and_instances(
assert isinstance(session, ShardedSession)
def iter_for_shard(
- shard_id: str,
+ shard_id: ShardIdentifier,
) -> Union[Result[_T], IteratorResult[_TP]]:
bind_arguments = dict(orm_context.bind_arguments)
@@ -409,14 +459,22 @@ def execute_and_instances(
orm_context.update_execution_options(identity_token=shard_id)
return orm_context.invoke_statement(bind_arguments=bind_arguments)
- if active_options and active_options._identity_token is not None:
- shard_id = active_options._identity_token
- elif "_sa_shard_id" in orm_context.execution_options:
- shard_id = orm_context.execution_options["_sa_shard_id"]
- elif "shard_id" in orm_context.bind_arguments:
- shard_id = orm_context.bind_arguments["shard_id"]
+ for orm_opt in orm_context._non_compile_orm_options:
+ # TODO: if we had an ORMOption that gets applied at ORM statement
+ # execution time, that would allow this to be more generalized.
+ # for now just iterate and look for our options
+ if isinstance(orm_opt, set_shard_id):
+ shard_id = orm_opt.shard_id
+ break
else:
- shard_id = None
+ if active_options and active_options._identity_token is not None:
+ shard_id = active_options._identity_token
+ elif "_sa_shard_id" in orm_context.execution_options:
+ shard_id = orm_context.execution_options["_sa_shard_id"]
+ elif "shard_id" in orm_context.bind_arguments:
+ shard_id = orm_context.bind_arguments["shard_id"]
+ else:
+ shard_id = None
if shard_id is not None:
return iter_for_shard(shard_id)