summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/engine/strategies.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/engine/strategies.py')
-rw-r--r--lib/sqlalchemy/engine/strategies.py305
1 files changed, 4 insertions, 301 deletions
diff --git a/lib/sqlalchemy/engine/strategies.py b/lib/sqlalchemy/engine/strategies.py
index d3a22e5ac..41f582f51 100644
--- a/lib/sqlalchemy/engine/strategies.py
+++ b/lib/sqlalchemy/engine/strategies.py
@@ -5,310 +5,13 @@
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-"""Strategies for creating new instances of Engine types.
+"""Deprecated mock engine strategy used by Alembic.
-These are semi-private implementation classes which provide the
-underlying behavior for the "strategy" keyword argument available on
-:func:`~sqlalchemy.engine.create_engine`. Current available options are
-``plain``, ``threadlocal``, and ``mock``.
-New strategies can be added via new ``EngineStrategy`` classes.
"""
-from operator import attrgetter
+from .mock import MockConnection # noqa
-from . import base
-from . import threadlocal
-from . import url
-from .. import event
-from .. import pool as poollib
-from .. import util
-from ..sql import schema
-
-strategies = {}
-
-
-class EngineStrategy(object):
- """An adaptor that processes input arguments and produces an Engine.
-
- Provides a ``create`` method that receives input arguments and
- produces an instance of base.Engine or a subclass.
-
- """
-
- def __init__(self):
- strategies[self.name] = self
-
- def create(self, *args, **kwargs):
- """Given arguments, returns a new Engine instance."""
-
- raise NotImplementedError()
-
-
-class DefaultEngineStrategy(EngineStrategy):
- """Base class for built-in strategies."""
-
- def create(self, name_or_url, **kwargs):
- # create url.URL object
- u = url.make_url(name_or_url)
-
- plugins = u._instantiate_plugins(kwargs)
-
- u.query.pop("plugin", None)
- kwargs.pop("plugins", None)
-
- entrypoint = u._get_entrypoint()
- dialect_cls = entrypoint.get_dialect_cls(u)
-
- if kwargs.pop("_coerce_config", False):
-
- def pop_kwarg(key, default=None):
- value = kwargs.pop(key, default)
- if key in dialect_cls.engine_config_types:
- value = dialect_cls.engine_config_types[key](value)
- return value
-
- else:
- pop_kwarg = kwargs.pop
-
- dialect_args = {}
- # consume dialect arguments from kwargs
- for k in util.get_cls_kwargs(dialect_cls):
- if k in kwargs:
- dialect_args[k] = pop_kwarg(k)
-
- dbapi = kwargs.pop("module", None)
- if dbapi is None:
- dbapi_args = {}
- for k in util.get_func_kwargs(dialect_cls.dbapi):
- if k in kwargs:
- dbapi_args[k] = pop_kwarg(k)
- dbapi = dialect_cls.dbapi(**dbapi_args)
-
- dialect_args["dbapi"] = dbapi
-
- for plugin in plugins:
- plugin.handle_dialect_kwargs(dialect_cls, dialect_args)
-
- # create dialect
- dialect = dialect_cls(**dialect_args)
-
- # assemble connection arguments
- (cargs, cparams) = dialect.create_connect_args(u)
- cparams.update(pop_kwarg("connect_args", {}))
- cargs = list(cargs) # allow mutability
-
- # look for existing pool or create
- pool = pop_kwarg("pool", None)
- if pool is None:
-
- def connect(connection_record=None):
- if dialect._has_events:
- for fn in dialect.dispatch.do_connect:
- connection = fn(
- dialect, connection_record, cargs, cparams
- )
- if connection is not None:
- return connection
- return dialect.connect(*cargs, **cparams)
-
- creator = pop_kwarg("creator", connect)
-
- poolclass = pop_kwarg("poolclass", None)
- if poolclass is None:
- poolclass = dialect_cls.get_pool_class(u)
- pool_args = {"dialect": dialect}
-
- # consume pool arguments from kwargs, translating a few of
- # the arguments
- translate = {
- "logging_name": "pool_logging_name",
- "echo": "echo_pool",
- "timeout": "pool_timeout",
- "recycle": "pool_recycle",
- "events": "pool_events",
- "use_threadlocal": "pool_threadlocal",
- "reset_on_return": "pool_reset_on_return",
- "pre_ping": "pool_pre_ping",
- "use_lifo": "pool_use_lifo",
- }
- for k in util.get_cls_kwargs(poolclass):
- tk = translate.get(k, k)
- if tk in kwargs:
- pool_args[k] = pop_kwarg(tk)
-
- for plugin in plugins:
- plugin.handle_pool_kwargs(poolclass, pool_args)
-
- pool = poolclass(creator, **pool_args)
- else:
- if isinstance(pool, poollib.dbapi_proxy._DBProxy):
- pool = pool.get_pool(*cargs, **cparams)
- else:
- pool = pool
-
- pool._dialect = dialect
-
- # create engine.
- engineclass = self.engine_cls
- engine_args = {}
- for k in util.get_cls_kwargs(engineclass):
- if k in kwargs:
- engine_args[k] = pop_kwarg(k)
-
- _initialize = kwargs.pop("_initialize", True)
-
- # all kwargs should be consumed
- if kwargs:
- raise TypeError(
- "Invalid argument(s) %s sent to create_engine(), "
- "using configuration %s/%s/%s. Please check that the "
- "keyword arguments are appropriate for this combination "
- "of components."
- % (
- ",".join("'%s'" % k for k in kwargs),
- dialect.__class__.__name__,
- pool.__class__.__name__,
- engineclass.__name__,
- )
- )
-
- engine = engineclass(pool, dialect, u, **engine_args)
-
- if _initialize:
- do_on_connect = dialect.on_connect()
- if do_on_connect:
-
- def on_connect(dbapi_connection, connection_record):
- conn = getattr(
- dbapi_connection, "_sqla_unwrap", dbapi_connection
- )
- if conn is None:
- return
- do_on_connect(conn)
-
- event.listen(pool, "first_connect", on_connect)
- event.listen(pool, "connect", on_connect)
-
- def first_connect(dbapi_connection, connection_record):
- c = base.Connection(
- engine, connection=dbapi_connection, _has_events=False
- )
- c._execution_options = util.immutabledict()
- dialect.initialize(c)
- dialect.do_rollback(c.connection)
-
- event.listen(pool, "first_connect", first_connect, once=True)
-
- dialect_cls.engine_created(engine)
- if entrypoint is not dialect_cls:
- entrypoint.engine_created(engine)
-
- for plugin in plugins:
- plugin.engine_created(engine)
-
- return engine
-
-
-class PlainEngineStrategy(DefaultEngineStrategy):
- """Strategy for configuring a regular Engine."""
-
- name = "plain"
- engine_cls = base.Engine
-
-
-PlainEngineStrategy()
-
-
-class ThreadLocalEngineStrategy(DefaultEngineStrategy):
- """Strategy for configuring an Engine with threadlocal behavior."""
-
- name = "threadlocal"
- engine_cls = threadlocal.TLEngine
-
-
-ThreadLocalEngineStrategy()
-
-
-class MockEngineStrategy(EngineStrategy):
- """Strategy for configuring an Engine-like object with mocked execution.
-
- Produces a single mock Connectable object which dispatches
- statement execution to a passed-in function.
-
- """
-
- name = "mock"
-
- def create(self, name_or_url, executor, **kwargs):
- # create url.URL object
- u = url.make_url(name_or_url)
-
- dialect_cls = u.get_dialect()
-
- dialect_args = {}
- # consume dialect arguments from kwargs
- for k in util.get_cls_kwargs(dialect_cls):
- if k in kwargs:
- dialect_args[k] = kwargs.pop(k)
-
- # create dialect
- dialect = dialect_cls(**dialect_args)
-
- return MockEngineStrategy.MockConnection(dialect, executor)
-
- class MockConnection(base.Connectable):
- def __init__(self, dialect, execute):
- self._dialect = dialect
- self.execute = execute
-
- engine = property(lambda s: s)
- dialect = property(attrgetter("_dialect"))
- name = property(lambda s: s._dialect.name)
-
- schema_for_object = schema._schema_getter(None)
-
- def contextual_connect(self, **kwargs):
- return self
-
- def connect(self, **kwargs):
- return self
-
- def execution_options(self, **kw):
- return self
-
- def compiler(self, statement, parameters, **kwargs):
- return self._dialect.compiler(
- statement, parameters, engine=self, **kwargs
- )
-
- def create(self, entity, **kwargs):
- kwargs["checkfirst"] = False
- from sqlalchemy.engine import ddl
-
- ddl.SchemaGenerator(self.dialect, self, **kwargs).traverse_single(
- entity
- )
-
- def drop(self, entity, **kwargs):
- kwargs["checkfirst"] = False
- from sqlalchemy.engine import ddl
-
- ddl.SchemaDropper(self.dialect, self, **kwargs).traverse_single(
- entity
- )
-
- def _run_visitor(
- self, visitorcallable, element, connection=None, **kwargs
- ):
- kwargs["checkfirst"] = False
- visitorcallable(self.dialect, self, **kwargs).traverse_single(
- element
- )
-
- def execute(self, object_, *multiparams, **params):
- raise NotImplementedError()
-
-
-MockEngineStrategy()
+class MockEngineStrategy(object):
+ MockConnection = MockConnection