diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2010-03-28 13:12:38 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2010-03-28 13:12:38 -0400 |
| commit | 51fd3447373611af1b9f66a7f5f0c7c4abe94bbb (patch) | |
| tree | 2119202ddcc7282a59692a747703a7e234f5aa83 /lib/sqlalchemy | |
| parent | fb766e69bd2fa2811de127a69ad33d507621bff7 (diff) | |
| download | sqlalchemy-51fd3447373611af1b9f66a7f5f0c7c4abe94bbb.tar.gz | |
- The sqlalchemy.orm.shard module now becomes an extension,
sqlalchemy.ext.horizontal_shard. The old import
works with a deprecation warning.
Diffstat (limited to 'lib/sqlalchemy')
| -rw-r--r-- | lib/sqlalchemy/engine/__init__.py | 9 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/base.py | 3 | ||||
| -rw-r--r-- | lib/sqlalchemy/ext/horizontal_shard.py | 125 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/shard.py | 112 |
4 files changed, 138 insertions, 111 deletions
diff --git a/lib/sqlalchemy/engine/__init__.py b/lib/sqlalchemy/engine/__init__.py index 0dbb2404f..9b3dbedd8 100644 --- a/lib/sqlalchemy/engine/__init__.py +++ b/lib/sqlalchemy/engine/__init__.py @@ -107,10 +107,11 @@ def create_engine(*args, **kwargs): arguments sent as options to the dialect and resulting Engine. The URL is a string in the form - ``dialect://user:password@host/dbname[?key=value..]``, where - ``dialect`` is a name such as ``mysql``, ``oracle``, ``postgresql``, - etc. Alternatively, the URL can be an instance of - :class:`~sqlalchemy.engine.url.URL`. + ``dialect+driver://user:password@host/dbname[?key=value..]``, where + ``dialect`` is a database name such as ``mysql``, ``oracle``, + ``postgresql``, etc., and ``driver`` the name of a DBAPI, such as + ``psycopg2``, ``pyodbc``, ``cx_oracle``, etc. Alternatively, + the URL can be an instance of :class:`~sqlalchemy.engine.url.URL`. `**kwargs` takes a wide variety of options which are routed towards their appropriate components. Arguments may be diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index 5490169c6..dc42ed957 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -1420,6 +1420,9 @@ class Engine(Connectable, log.Identified): """ Connects a :class:`~sqlalchemy.pool.Pool` and :class:`~sqlalchemy.engine.base.Dialect` together to provide a source of database connectivity and behavior. + + An :class:`Engine` object is instantiated publically using the :func:`~sqlalchemy.create_engine` + function. """ diff --git a/lib/sqlalchemy/ext/horizontal_shard.py b/lib/sqlalchemy/ext/horizontal_shard.py new file mode 100644 index 000000000..78e3f5953 --- /dev/null +++ b/lib/sqlalchemy/ext/horizontal_shard.py @@ -0,0 +1,125 @@ +# horizontal_shard.py +# Copyright (C) the SQLAlchemy authors and contributors +# +# This module is part of SQLAlchemy and is released under +# the MIT License: http://www.opensource.org/licenses/mit-license.php + +"""Horizontal sharding support. + +Defines a rudimental 'horizontal sharding' system which allows a Session to +distribute queries and persistence operations across multiple databases. + +For a usage example, see the :ref:`examples_sharding` example included in +the source distrbution. + +""" + +import sqlalchemy.exceptions as sa_exc +from sqlalchemy import util +from sqlalchemy.orm.session import Session +from sqlalchemy.orm.query import Query + +__all__ = ['ShardedSession', 'ShardedQuery'] + + +class ShardedSession(Session): + def __init__(self, shard_chooser, id_chooser, query_chooser, shards=None, **kwargs): + """Construct a ShardedSession. + + :param shard_chooser: A callable which, passed a Mapper, a mapped instance, and possibly a + SQL clause, returns a shard ID. This id may be based off of the + attributes present within the object, or on some round-robin + scheme. If the scheme is based on a selection, it should set + whatever state on the instance to mark it in the future as + participating in that shard. + + :param id_chooser: A callable, passed a query and a tuple of identity values, which + should return a list of shard ids where the ID might reside. The + databases will be queried in the order of this listing. + + :param query_chooser: For a given Query, returns the list of shard_ids where the query + should be issued. Results from all shards returned will be combined + together into a single listing. + + :param shards: A dictionary of string shard names to :class:`~sqlalchemy.engine.base.Engine` + objects. + + """ + super(ShardedSession, self).__init__(**kwargs) + self.shard_chooser = shard_chooser + self.id_chooser = id_chooser + self.query_chooser = query_chooser + self.__binds = {} + self._mapper_flush_opts = {'connection_callable':self.connection} + self._query_cls = ShardedQuery + if shards is not None: + for k in shards: + self.bind_shard(k, shards[k]) + + def connection(self, mapper=None, instance=None, shard_id=None, **kwargs): + if shard_id is None: + shard_id = self.shard_chooser(mapper, instance) + + if self.transaction is not None: + return self.transaction.connection(mapper, shard_id=shard_id) + else: + return self.get_bind(mapper, + shard_id=shard_id, + instance=instance).contextual_connect(**kwargs) + + def get_bind(self, mapper, shard_id=None, instance=None, clause=None, **kw): + if shard_id is None: + shard_id = self.shard_chooser(mapper, instance, clause=clause) + return self.__binds[shard_id] + + def bind_shard(self, shard_id, bind): + self.__binds[shard_id] = bind + +class ShardedQuery(Query): + def __init__(self, *args, **kwargs): + super(ShardedQuery, self).__init__(*args, **kwargs) + self.id_chooser = self.session.id_chooser + self.query_chooser = self.session.query_chooser + self._shard_id = None + + def set_shard(self, shard_id): + """return a new query, limited to a single shard ID. + + all subsequent operations with the returned query will + be against the single shard regardless of other state. + """ + + q = self._clone() + q._shard_id = shard_id + return q + + def _execute_and_instances(self, context): + if self._shard_id is not None: + result = self.session.connection( + mapper=self._mapper_zero(), + shard_id=self._shard_id).execute(context.statement, self._params) + return self.instances(result, context) + else: + partial = [] + for shard_id in self.query_chooser(self): + result = self.session.connection( + mapper=self._mapper_zero(), + shard_id=shard_id).execute(context.statement, self._params) + partial = partial + list(self.instances(result, context)) + + # if some kind of in memory 'sorting' + # were done, this is where it would happen + return iter(partial) + + def get(self, ident, **kwargs): + if self._shard_id is not None: + return super(ShardedQuery, self).get(ident) + else: + ident = util.to_list(ident) + for shard_id in self.id_chooser(self, ident): + o = self.set_shard(shard_id).get(ident, **kwargs) + if o is not None: + return o + else: + return None + diff --git a/lib/sqlalchemy/orm/shard.py b/lib/sqlalchemy/orm/shard.py index b6026bbc3..9cb26db79 100644 --- a/lib/sqlalchemy/orm/shard.py +++ b/lib/sqlalchemy/orm/shard.py @@ -4,114 +4,12 @@ # This module is part of SQLAlchemy and is released under # the MIT License: http://www.opensource.org/licenses/mit-license.php -"""Horizontal sharding support. - -Defines a rudimental 'horizontal sharding' system which allows a Session to -distribute queries and persistence operations across multiple databases. - -For a usage example, see the file ``examples/sharding/attribute_shard.py`` -included in the source distrbution. - -""" - -import sqlalchemy.exceptions as sa_exc from sqlalchemy import util -from sqlalchemy.orm.session import Session -from sqlalchemy.orm.query import Query - -__all__ = ['ShardedSession', 'ShardedQuery'] - - -class ShardedSession(Session): - def __init__(self, shard_chooser, id_chooser, query_chooser, shards=None, **kwargs): - """Construct a ShardedSession. - - shard_chooser - A callable which, passed a Mapper, a mapped instance, and possibly a - SQL clause, returns a shard ID. This id may be based off of the - attributes present within the object, or on some round-robin - scheme. If the scheme is based on a selection, it should set - whatever state on the instance to mark it in the future as - participating in that shard. - - id_chooser - A callable, passed a query and a tuple of identity values, which - should return a list of shard ids where the ID might reside. The - databases will be queried in the order of this listing. - - query_chooser - For a given Query, returns the list of shard_ids where the query - should be issued. Results from all shards returned will be combined - together into a single listing. - - """ - super(ShardedSession, self).__init__(**kwargs) - self.shard_chooser = shard_chooser - self.id_chooser = id_chooser - self.query_chooser = query_chooser - self.__binds = {} - self._mapper_flush_opts = {'connection_callable':self.connection} - self._query_cls = ShardedQuery - if shards is not None: - for k in shards: - self.bind_shard(k, shards[k]) - - def connection(self, mapper=None, instance=None, shard_id=None, **kwargs): - if shard_id is None: - shard_id = self.shard_chooser(mapper, instance) - - if self.transaction is not None: - return self.transaction.connection(mapper, shard_id=shard_id) - else: - return self.get_bind(mapper, shard_id=shard_id, instance=instance).contextual_connect(**kwargs) - - def get_bind(self, mapper, shard_id=None, instance=None, clause=None, **kw): - if shard_id is None: - shard_id = self.shard_chooser(mapper, instance, clause=clause) - return self.__binds[shard_id] - def bind_shard(self, shard_id, bind): - self.__binds[shard_id] = bind +util.warn_deprecated( + "Horizontal sharding is now importable via " + "'import sqlalchemy.ext.horizontal_shard" +) -class ShardedQuery(Query): - def __init__(self, *args, **kwargs): - super(ShardedQuery, self).__init__(*args, **kwargs) - self.id_chooser = self.session.id_chooser - self.query_chooser = self.session.query_chooser - self._shard_id = None - - def set_shard(self, shard_id): - """return a new query, limited to a single shard ID. - - all subsequent operations with the returned query will - be against the single shard regardless of other state. - """ - - q = self._clone() - q._shard_id = shard_id - return q - - def _execute_and_instances(self, context): - if self._shard_id is not None: - result = self.session.connection(mapper=self._mapper_zero(), shard_id=self._shard_id).execute(context.statement, self._params) - return self.instances(result, context) - else: - partial = [] - for shard_id in self.query_chooser(self): - result = self.session.connection(mapper=self._mapper_zero(), shard_id=shard_id).execute(context.statement, self._params) - partial = partial + list(self.instances(result, context)) - # if some kind of in memory 'sorting' were done, this is where it would happen - return iter(partial) +from sqlalchemy.ext.horizontal_shard import * - def get(self, ident, **kwargs): - if self._shard_id is not None: - return super(ShardedQuery, self).get(ident) - else: - ident = util.to_list(ident) - for shard_id in self.id_chooser(self, ident): - o = self.set_shard(shard_id).get(ident, **kwargs) - if o is not None: - return o - else: - return None - |
