diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2006-05-25 14:20:23 +0000 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2006-05-25 14:20:23 +0000 |
| commit | bb79e2e871d0a4585164c1a6ed626d96d0231975 (patch) | |
| tree | 6d457ba6c36c408b45db24ec3c29e147fe7504ff /lib/sqlalchemy/engine | |
| parent | 4fc3a0648699c2b441251ba4e1d37a9107bd1986 (diff) | |
| download | sqlalchemy-bb79e2e871d0a4585164c1a6ed626d96d0231975.tar.gz | |
merged 0.2 branch into trunk; 0.1 now in sqlalchemy/branches/rel_0_1
Diffstat (limited to 'lib/sqlalchemy/engine')
| -rw-r--r-- | lib/sqlalchemy/engine/__init__.py | 92 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/base.py | 687 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/default.py | 213 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/strategies.py | 70 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/threadlocal.py | 84 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/url.py | 81 |
6 files changed, 1227 insertions, 0 deletions
diff --git a/lib/sqlalchemy/engine/__init__.py b/lib/sqlalchemy/engine/__init__.py new file mode 100644 index 000000000..2cb94a90d --- /dev/null +++ b/lib/sqlalchemy/engine/__init__.py @@ -0,0 +1,92 @@ +# engine/__init__.py +# Copyright (C) 2005,2006 Michael Bayer mike_mp@zzzcomputing.com +# +# This module is part of SQLAlchemy and is released under +# the MIT License: http://www.opensource.org/licenses/mit-license.php + +import sqlalchemy.databases + +from base import * +import strategies +import re + +def engine_descriptors(): + """provides a listing of all the database implementations supported. this data + is provided as a list of dictionaries, where each dictionary contains the following + key/value pairs: + + name : the name of the engine, suitable for use in the create_engine function + + description: a plain description of the engine. + + arguments : a dictionary describing the name and description of each parameter + used to connect to this engine's underlying DBAPI. + + This function is meant for usage in automated configuration tools that wish to + query the user for database and connection information. + """ + result = [] + #for module in sqlalchemy.databases.__all__: + for module in ['sqlite', 'postgres', 'mysql']: + module = getattr(__import__('sqlalchemy.databases.%s' % module).databases, module) + result.append(module.descriptor()) + return result + +default_strategy = 'plain' +def create_engine(*args, **kwargs): + """creates a new Engine instance. Using the given strategy name, + locates that strategy and invokes its create() method to produce the Engine. + The strategies themselves are instances of EngineStrategy, and the built in + ones are present in the sqlalchemy.engine.strategies module. Current implementations + include "plain" and "threadlocal". The default used by this function is "threadlocal". + + "plain" provides support for a Connection object which can be used to execute SQL queries + with a specific underlying DBAPI connection. + + "threadlocal" is similar to "plain" except that it adds support for a thread-local connection and + transaction context, which allows a group of engine operations to participate using the same + connection and transaction without the need for explicit passing of a Connection object. + + The standard method of specifying the engine is via URL as the first positional + argument, to indicate the appropriate database dialect and connection arguments, with additional + keyword arguments sent as options to the dialect and resulting Engine. + + The URL is in the form <dialect>://opt1=val1&opt2=val2. + Where <dialect> is a name such as "mysql", "oracle", "postgres", and the options indicate + username, password, database, etc. Supported keynames include "username", "user", "password", + "pw", "db", "database", "host", "filename". + + **kwargs represents options to be sent to the Engine itself as well as the components of the Engine, + including the Dialect, the ConnectionProvider, and the Pool. A list of common options is as follows: + + pool=None : an instance of sqlalchemy.pool.DBProxy or sqlalchemy.pool.Pool to be used as the + underlying source for connections (DBProxy/Pool is described in the previous section). If None, + a default DBProxy will be created using the engine's own database module with the given + arguments. + + echo=False : if True, the Engine will log all statements as well as a repr() of their + parameter lists to the engines logger, which defaults to sys.stdout. A Engine instances' + "echo" data member can be modified at any time to turn logging on and off. If set to the string + 'debug', result rows will be printed to the standard output as well. + + logger=None : a file-like object where logging output can be sent, if echo is set to True. + This defaults to sys.stdout. + + encoding='utf-8' : the encoding to be used when encoding/decoding Unicode strings + + convert_unicode=False : True if unicode conversion should be applied to all str types + + module=None : used by Oracle and Postgres, this is a reference to a DBAPI2 module to be used + instead of the engine's default module. For Postgres, the default is psycopg2, or psycopg1 if + 2 cannot be found. For Oracle, its cx_Oracle. For mysql, MySQLdb. + + use_ansi=True : used only by Oracle; when False, the Oracle driver attempts to support a + particular "quirk" of some Oracle databases, that the LEFT OUTER JOIN SQL syntax is not + supported, and the "Oracle join" syntax of using <column1>(+)=<column2> must be used + in order to achieve a LEFT OUTER JOIN. Its advised that the Oracle database be configured to + have full ANSI support instead of using this feature. + + """ + strategy = kwargs.pop('strategy', default_strategy) + strategy = strategies.strategies[strategy] + return strategy.create(*args, **kwargs) diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py new file mode 100644 index 000000000..bf7b1c20d --- /dev/null +++ b/lib/sqlalchemy/engine/base.py @@ -0,0 +1,687 @@ +from sqlalchemy import exceptions, sql, schema, util, types +import StringIO, sys, re + +class ConnectionProvider(object): + """defines an interface that returns raw Connection objects (or compatible).""" + def get_connection(self): + """this method should return a Connection or compatible object from a DBAPI which + also contains a close() method. + It is not defined what context this connection belongs to. It may be newly connected, + returned from a pool, part of some other kind of context such as thread-local, + or can be a fixed member of this object.""" + raise NotImplementedError() + def dispose(self): + """releases all resources corresponding to this ConnectionProvider, such + as any underlying connection pools.""" + raise NotImplementedError() + +class Dialect(sql.AbstractDialect): + """Adds behavior to the execution of queries to provide + support for column defaults, differences between paramstyles, quirks between post-execution behavior, + and a general consistentization of the behavior of various DBAPIs. + + The Dialect should also implement the following two attributes: + + positional - True if the paramstyle for this Dialect is positional + + paramstyle - the paramstyle to be used (some DBAPIs support multiple paramstyles) + + supports_autoclose_results - usually True; if False, indicates that rows returned by fetchone() + might not be just plain tuples, and may be "live" proxy objects which still require the cursor + to be open in order to be read (such as pyPgSQL which has active filehandles for BLOBs). in that + case, an auto-closing ResultProxy cannot automatically close itself after results are consumed. + + convert_unicode - True if unicode conversion should be applied to all str types + + encoding - type of encoding to use for unicode, usually defaults to 'utf-8' + """ + def create_connect_args(self, opts): + """given a dictionary of key-valued connect parameters, returns a tuple + consisting of a *args/**kwargs suitable to send directly to the dbapi's connect function. + The connect args will have any number of the following keynames: host, hostname, database, dbanme, + user,username, password, pw, passwd, filename.""" + raise NotImplementedError() + def convert_compiled_params(self, parameters): + """given a sql.ClauseParameters object, returns an array or dictionary suitable to pass + directly to this Dialect's DBAPI's execute method.""" + def type_descriptor(self, typeobj): + """provides a database-specific TypeEngine object, given the generic object + which comes from the types module. Subclasses will usually use the adapt_type() + method in the types module to make this job easy.""" + raise NotImplementedError() + def oid_column_name(self): + """returns the oid column name for this dialect, or None if the dialect cant/wont support OID/ROWID.""" + raise NotImplementedError() + def supports_sane_rowcount(self): + """Provided to indicate when MySQL is being used, which does not have standard behavior + for the "rowcount" function on a statement handle. """ + raise NotImplementedError() + def schemagenerator(self, engine, proxy, **params): + """returns a schema.SchemaVisitor instance that can generate schemas, when it is + invoked to traverse a set of schema objects. + + schemagenerator is called via the create() method on Table, Index, and others. + """ + raise NotImplementedError() + def schemadropper(self, engine, proxy, **params): + """returns a schema.SchemaVisitor instance that can drop schemas, when it is + invoked to traverse a set of schema objects. + + schemagenerator is called via the drop() method on Table, Index, and others. + """ + raise NotImplementedError() + def defaultrunner(self, engine, proxy, **params): + """returns a schema.SchemaVisitor instances that can execute defaults.""" + raise NotImplementedError() + def compiler(self, statement, parameters): + """returns a sql.ClauseVisitor which will produce a string representation of the given + ClauseElement and parameter dictionary. This object is usually a subclass of + ansisql.ANSICompiler. + + compiler is called within the context of the compile() method.""" + raise NotImplementedError() + def reflecttable(self, connection, table): + """given an Connection and a Table object, reflects its columns and properties from the database.""" + raise NotImplementedError() + def has_table(self, connection, table_name): + raise NotImplementedError() + def dbapi(self): + """subclasses override this method to provide the DBAPI module used to establish + connections.""" + raise NotImplementedError() + def get_default_schema_name(self, connection): + """returns the currently selected schema given an connection""" + raise NotImplementedError() + def execution_context(self): + """returns a new ExecutionContext object.""" + raise NotImplementedError() + def do_begin(self, connection): + """provides an implementation of connection.begin()""" + raise NotImplementedError() + def do_rollback(self, connection): + """provides an implementation of connection.rollback()""" + raise NotImplementedError() + def do_commit(self, connection): + """provides an implementation of connection.commit()""" + raise NotImplementedError() + def do_executemany(self, cursor, statement, parameters): + raise NotImplementedError() + def do_execute(self, cursor, statement, parameters): + raise NotImplementedError() + +class ExecutionContext(object): + """a messenger object for a Dialect that corresponds to a single execution. The Dialect + should provide an ExecutionContext via the create_execution_context() method. + The pre_exec and post_exec methods will be called for compiled statements, afterwhich + it is expected that the various methods last_inserted_ids, last_inserted_params, etc. + will contain appropriate values, if applicable.""" + def pre_exec(self, engine, proxy, compiled, parameters): + """called before an execution of a compiled statement. proxy is a callable that + takes a string statement and a bind parameter list/dictionary.""" + raise NotImplementedError() + def post_exec(self, engine, proxy, compiled, parameters): + """called after the execution of a compiled statement. proxy is a callable that + takes a string statement and a bind parameter list/dictionary.""" + raise NotImplementedError() + def get_rowcount(self, cursor): + """returns the count of rows updated/deleted for an UPDATE/DELETE statement""" + raise NotImplementedError() + def supports_sane_rowcount(self): + """Provided to indicate when MySQL is being used, which does not have standard behavior + for the "rowcount" function on a statement handle. """ + raise NotImplementedError() + def last_inserted_ids(self): + """returns the list of the primary key values for the last insert statement executed. + This does not apply to straight textual clauses; only to sql.Insert objects compiled against + a schema.Table object, which are executed via statement.execute(). The order of items in the + list is the same as that of the Table's 'primary_key' attribute. + + In some cases, this method may invoke a query back to the database to retrieve the data, based on + the "lastrowid" value in the cursor.""" + raise NotImplementedError() + def last_inserted_params(self): + """returns a dictionary of the full parameter dictionary for the last compiled INSERT statement, + including any ColumnDefaults or Sequences that were pre-executed. this value is thread-local.""" + raise NotImplementedError() + def last_updated_params(self): + """returns a dictionary of the full parameter dictionary for the last compiled UPDATE statement, + including any ColumnDefaults that were pre-executed. this value is thread-local.""" + raise NotImplementedError() + def lastrow_has_defaults(self): + """returns True if the last row INSERTED via a compiled insert statement contained PassiveDefaults, + indicating that the database inserted data beyond that which we gave it. this value is thread-local.""" + raise NotImplementedError() + +class Connectable(object): + """interface for an object that can provide an Engine and a Connection object which correponds to that Engine.""" + def contextual_connect(self): + """returns a Connection object which may be part of an ongoing context.""" + raise NotImplementedError() + def create(self, entity, **kwargs): + """creates a table or index given an appropriate schema object.""" + raise NotImplementedError() + def drop(self, entity, **kwargs): + raise NotImplementedError() + def execute(self, object, *multiparams, **params): + raise NotImplementedError() + def _not_impl(self): + raise NotImplementedError() + engine = property(_not_impl, doc="returns the Engine which this Connectable is associated with.") + +class Connection(Connectable): + """represents a single DBAPI connection returned from the underlying connection pool. Provides + execution support for string-based SQL statements as well as ClauseElement, Compiled and DefaultGenerator objects. + provides a begin method to return Transaction objects.""" + def __init__(self, engine, connection=None, close_with_result=False): + self.__engine = engine + self.__connection = connection or engine.raw_connection() + self.__transaction = None + self.__close_with_result = close_with_result + engine = property(lambda s:s.__engine, doc="The Engine with which this Connection is associated (read only)") + connection = property(lambda s:s.__connection, doc="The underlying DBAPI connection managed by this Connection.") + should_close_with_result = property(lambda s:s.__close_with_result, doc="Indicates if this Connection should be closed when a corresponding ResultProxy is closed; this is essentially an auto-release mode.") + def _create_transaction(self, parent): + return Transaction(self, parent) + def connect(self): + """connect() is implemented to return self so that an incoming Engine or Connection object can be treated similarly.""" + return self + def contextual_connect(self, **kwargs): + """contextual_connect() is implemented to return self so that an incoming Engine or Connection object can be treated similarly.""" + return self + def begin(self): + if self.__transaction is None: + self.__transaction = self._create_transaction(None) + return self.__transaction + else: + return self._create_transaction(self.__transaction) + def _begin_impl(self): + if self.__engine.echo: + self.__engine.log("BEGIN") + self.__engine.dialect.do_begin(self.__connection) + def _rollback_impl(self): + if self.__engine.echo: + self.__engine.log("ROLLBACK") + self.__engine.dialect.do_rollback(self.__connection) + def _commit_impl(self): + if self.__engine.echo: + self.__engine.log("COMMIT") + self.__engine.dialect.do_commit(self.__connection) + def _autocommit(self, statement): + """when no Transaction is present, this is called after executions to provide "autocommit" behavior.""" + # TODO: have the dialect determine if autocommit can be set on the connection directly without this + # extra step + if self.__transaction is None and re.match(r'UPDATE|INSERT|CREATE|DELETE|DROP', statement.lstrip().upper()): + self._commit_impl() + def close(self): + if self.__connection is not None: + self.__connection.close() + self.__connection = None + def scalar(self, object, parameters, **kwargs): + row = self.execute(object, parameters, **kwargs).fetchone() + if row is not None: + return row[0] + else: + return None + def execute(self, object, *multiparams, **params): + return Connection.executors[type(object).__mro__[-2]](self, object, *multiparams, **params) + def execute_default(self, default, **kwargs): + return default.accept_schema_visitor(self.__engine.dialect.defaultrunner(self.__engine, self.proxy, **kwargs)) + def execute_text(self, statement, parameters=None): + cursor = self._execute_raw(statement, parameters) + return ResultProxy(self.__engine, self, cursor) + def _params_to_listofdicts(self, *multiparams, **params): + if len(multiparams) == 0: + return [params] + elif len(multiparams) == 1: + if multiparams[0] == None: + return [{}] + elif isinstance (multiparams[0], list) or isinstance (multiparams[0], tuple): + return multiparams[0] + else: + return [multiparams[0]] + else: + return multiparams + def execute_clauseelement(self, elem, *multiparams, **params): + executemany = len(multiparams) > 0 + if executemany: + param = multiparams[0] + else: + param = params + return self.execute_compiled(elem.compile(engine=self.__engine, parameters=param), *multiparams, **params) + def execute_compiled(self, compiled, *multiparams, **params): + """executes a sql.Compiled object.""" + cursor = self.__connection.cursor() + parameters = [compiled.get_params(**m) for m in self._params_to_listofdicts(*multiparams, **params)] + if len(parameters) == 1: + parameters = parameters[0] + def proxy(statement=None, parameters=None): + if statement is None: + return cursor + + parameters = self.__engine.dialect.convert_compiled_params(parameters) + self._execute_raw(statement, parameters, cursor=cursor, context=context) + return cursor + context = self.__engine.dialect.create_execution_context() + context.pre_exec(self.__engine, proxy, compiled, parameters) + proxy(str(compiled), parameters) + context.post_exec(self.__engine, proxy, compiled, parameters) + return ResultProxy(self.__engine, self, cursor, context, typemap=compiled.typemap) + + # poor man's multimethod/generic function thingy + executors = { + sql.ClauseElement : execute_clauseelement, + sql.Compiled : execute_compiled, + schema.SchemaItem:execute_default, + str.__mro__[-2] : execute_text + } + + def create(self, entity, **kwargs): + """creates a table or index given an appropriate schema object.""" + return self.__engine.create(entity, connection=self, **kwargs) + def drop(self, entity, **kwargs): + """drops a table or index given an appropriate schema object.""" + return self.__engine.drop(entity, connection=self, **kwargs) + def reflecttable(self, table, **kwargs): + """reflects the columns in the given table from the database.""" + return self.__engine.reflecttable(table, connection=self, **kwargs) + def default_schema_name(self): + return self.__engine.dialect.get_default_schema_name(self) + def run_callable(self, callable_): + callable_(self) + def _execute_raw(self, statement, parameters=None, cursor=None, echo=None, context=None, **kwargs): + if cursor is None: + cursor = self.__connection.cursor() + try: + if echo is True or self.__engine.echo is not False: + self.__engine.log(statement) + self.__engine.log(repr(parameters)) + if parameters is not None and isinstance(parameters, list) and len(parameters) > 0 and (isinstance(parameters[0], list) or isinstance(parameters[0], dict)): + self._executemany(cursor, statement, parameters, context=context) + else: + self._execute(cursor, statement, parameters, context=context) + self._autocommit(statement) + except: + raise + return cursor + + def _execute(self, c, statement, parameters, context=None): + if parameters is None: + if self.__engine.dialect.positional: + parameters = () + else: + parameters = {} + try: + self.__engine.dialect.do_execute(c, statement, parameters, context=context) + except Exception, e: + self._rollback_impl() + if self.__close_with_result: + self.close() + raise exceptions.SQLError(statement, parameters, e) + def _executemany(self, c, statement, parameters, context=None): + try: + self.__engine.dialect.do_executemany(c, statement, parameters, context=context) + except Exception, e: + self._rollback_impl() + if self.__close_with_result: + self.close() + raise exceptions.SQLError(statement, parameters, e) + def proxy(self, statement=None, parameters=None): + """executes the given statement string and parameter object. + the parameter object is expected to be the result of a call to compiled.get_params(). + This callable is a generic version of a connection/cursor-specific callable that + is produced within the execute_compiled method, and is used for objects that require + this style of proxy when outside of an execute_compiled method, primarily the DefaultRunner.""" + parameters = self.__engine.dialect.convert_compiled_params(parameters) + return self._execute_raw(statement, parameters) + +class Transaction(object): + """represents a Transaction in progress""" + def __init__(self, connection, parent): + self.__connection = connection + self.__parent = parent or self + self.__is_active = True + if self.__parent is self: + self.__connection._begin_impl() + connection = property(lambda s:s.__connection, doc="The Connection object referenced by this Transaction") + def rollback(self): + if not self.__parent.__is_active: + raise exceptions.InvalidRequestError("This transaction is inactive") + if self.__parent is self: + self.__connection._rollback_impl() + self.__is_active = False + else: + self.__parent.rollback() + def commit(self): + if not self.__parent.__is_active: + raise exceptions.InvalidRequestError("This transaction is inactive") + if self.__parent is self: + self.__connection._commit_impl() + self.__is_active = False + +class ComposedSQLEngine(sql.Engine, Connectable): + """ + Connects a ConnectionProvider, a Dialect and a CompilerFactory together to + provide a default implementation of SchemaEngine. + """ + def __init__(self, connection_provider, dialect, echo=False, logger=None, **kwargs): + self.connection_provider = connection_provider + self.dialect=dialect + self.echo = echo + self.logger = logger or util.Logger(origin='engine') + + name = property(lambda s:sys.modules[s.dialect.__module__].descriptor()['name']) + engine = property(lambda s:s) + + def dispose(self): + self.connection_provider.dispose() + def create(self, entity, connection=None, **kwargs): + """creates a table or index within this engine's database connection given a schema.Table object.""" + self._run_visitor(self.dialect.schemagenerator, entity, connection=connection, **kwargs) + def drop(self, entity, connection=None, **kwargs): + """drops a table or index within this engine's database connection given a schema.Table object.""" + self._run_visitor(self.dialect.schemadropper, entity, connection=connection, **kwargs) + def execute_default(self, default, **kwargs): + connection = self.contextual_connect() + try: + return connection.execute_default(default, **kwargs) + finally: + connection.close() + + def _func(self): + return sql.FunctionGenerator(self) + func = property(_func) + def text(self, text, *args, **kwargs): + """returns a sql.text() object for performing literal queries.""" + return sql.text(text, engine=self, *args, **kwargs) + + def _run_visitor(self, visitorcallable, element, connection=None, **kwargs): + if connection is None: + conn = self.contextual_connect() + else: + conn = connection + try: + element.accept_schema_visitor(visitorcallable(self, conn.proxy, **kwargs)) + finally: + if connection is None: + conn.close() + + def transaction(self, callable_, connection=None, *args, **kwargs): + if connection is None: + conn = self.contextual_connect() + else: + conn = connection + try: + trans = conn.begin() + try: + ret = callable_(conn, *args, **kwargs) + trans.commit() + return ret + except: + trans.rollback() + raise + finally: + if connection is None: + conn.close() + + def run_callable(self, callable_, connection=None, *args, **kwargs): + if connection is None: + conn = self.contextual_connect() + else: + conn = connection + try: + return callable_(conn, *args, **kwargs) + finally: + if connection is None: + conn.close() + + def execute(self, statement, *multiparams, **params): + connection = self.contextual_connect(close_with_result=True) + return connection.execute(statement, *multiparams, **params) + + def execute_compiled(self, compiled, *multiparams, **params): + connection = self.contextual_connect(close_with_result=True) + return connection.execute_compiled(compiled, *multiparams, **params) + + def compiler(self, statement, parameters, **kwargs): + return self.dialect.compiler(statement, parameters, engine=self, **kwargs) + + def connect(self, **kwargs): + """returns a newly allocated Connection object.""" + return Connection(self, **kwargs) + + def contextual_connect(self, close_with_result=False, **kwargs): + """returns a Connection object which may be newly allocated, or may be part of some + ongoing context. This Connection is meant to be used by the various "auto-connecting" operations.""" + return Connection(self, close_with_result=close_with_result, **kwargs) + + def reflecttable(self, table, connection=None): + """given a Table object, reflects its columns and properties from the database.""" + if connection is None: + conn = self.contextual_connect() + else: + conn = connection + try: + self.dialect.reflecttable(conn, table) + finally: + if connection is None: + conn.close() + def has_table(self, table_name): + return self.run_callable(lambda c: self.dialect.has_table(c, table_name)) + + def raw_connection(self): + """returns a DBAPI connection.""" + return self.connection_provider.get_connection() + + def log(self, msg): + """logs a message using this SQLEngine's logger stream.""" + self.logger.write(msg) + +class ResultProxy: + """wraps a DBAPI cursor object to provide access to row columns based on integer + position, case-insensitive column name, or by schema.Column object. e.g.: + + row = fetchone() + + col1 = row[0] # access via integer position + + col2 = row['col2'] # access via name + + col3 = row[mytable.c.mycol] # access via Column object. + + ResultProxy also contains a map of TypeEngine objects and will invoke the appropriate + convert_result_value() method before returning columns. + """ + class AmbiguousColumn(object): + def __init__(self, key): + self.key = key + def convert_result_value(self, arg, engine): + raise InvalidRequestError("Ambiguous column name '%s' in result set! try 'use_labels' option on select statement." % (self.key)) + + def __init__(self, engine, connection, cursor, executioncontext=None, typemap=None): + """ResultProxy objects are constructed via the execute() method on SQLEngine.""" + self.connection = connection + self.dialect = engine.dialect + self.cursor = cursor + self.engine = engine + self.closed = False + self.executioncontext = executioncontext + self.echo = engine.echo=="debug" + if executioncontext: + self.rowcount = executioncontext.get_rowcount(cursor) + else: + self.rowcount = cursor.rowcount + metadata = cursor.description + self.props = {} + self.keys = [] + i = 0 + if metadata is not None: + for item in metadata: + # sqlite possibly prepending table name to colnames so strip + colname = item[0].split('.')[-1].lower() + if typemap is not None: + rec = (typemap.get(colname, types.NULLTYPE), i) + else: + rec = (types.NULLTYPE, i) + if rec[0] is None: + raise DBAPIError("None for metadata " + colname) + if self.props.setdefault(colname, rec) is not rec: + self.props[colname] = (ResultProxy.AmbiguousColumn(colname), 0) + self.keys.append(colname) + self.props[i] = rec + i+=1 + def close(self): + if not self.closed: + self.closed = True + if self.connection.should_close_with_result and self.dialect.supports_autoclose_results: + self.connection.close() + def _get_col(self, row, key): + if isinstance(key, sql.ColumnElement): + try: + rec = self.props[key._label.lower()] + except KeyError: + try: + rec = self.props[key.key.lower()] + except KeyError: + rec = self.props[key.name.lower()] + elif isinstance(key, str): + rec = self.props[key.lower()] + else: + rec = self.props[key] + return rec[0].dialect_impl(self.dialect).convert_result_value(row[rec[1]], self.dialect) + + def __iter__(self): + while True: + row = self.fetchone() + if row is None: + raise StopIteration + else: + yield row + + def last_inserted_ids(self): + return self.executioncontext.last_inserted_ids() + def last_updated_params(self): + return self.executioncontext.last_updated_params() + def last_inserted_params(self): + return self.executioncontext.last_inserted_params() + def lastrow_has_defaults(self): + return self.executioncontext.lastrow_has_defaults() + def supports_sane_rowcount(self): + return self.executioncontext.supports_sane_rowcount() + + def fetchall(self): + """fetches all rows, just like DBAPI cursor.fetchall().""" + l = [] + while True: + v = self.fetchone() + if v is None: + return l + l.append(v) + + def fetchone(self): + """fetches one row, just like DBAPI cursor.fetchone().""" + row = self.cursor.fetchone() + if row is not None: + if self.echo: self.engine.log(repr(row)) + return RowProxy(self, row) + else: + # controversy! can we auto-close the cursor after results are consumed ? + # what if the returned rows are still hanging around, and are "live" objects + # and not just plain tuples ? + self.close() + return None + +class RowProxy: + """proxies a single cursor row for a parent ResultProxy.""" + def __init__(self, parent, row): + """RowProxy objects are constructed by ResultProxy objects.""" + self.__parent = parent + self.__row = row + def close(self): + self.__parent.close() + def __iter__(self): + for i in range(0, len(self.__row)): + yield self.__parent._get_col(self.__row, i) + def __eq__(self, other): + return (other is self) or (other == tuple([self.__parent._get_col(self.__row, key) for key in range(0, len(self.__row))])) + def __repr__(self): + return repr(tuple([self.__parent._get_col(self.__row, key) for key in range(0, len(self.__row))])) + def __getitem__(self, key): + return self.__parent._get_col(self.__row, key) + def __getattr__(self, name): + try: + return self.__parent._get_col(self.__row, name) + except KeyError: + raise AttributeError + def items(self): + return [(key, getattr(self, key)) for key in self.keys()] + def keys(self): + return self.__parent.keys + def values(self): + return list(self) + def __len__(self): + return len(self.__row) + +class SchemaIterator(schema.SchemaVisitor): + """a visitor that can gather text into a buffer and execute the contents of the buffer.""" + def __init__(self, engine, proxy, **params): + self.proxy = proxy + self.engine = engine + self.buffer = StringIO.StringIO() + + def append(self, s): + """appends content to the SchemaIterator's query buffer.""" + self.buffer.write(s) + + def execute(self): + """executes the contents of the SchemaIterator's buffer using its sql proxy and + clears out the buffer.""" + try: + return self.proxy(self.buffer.getvalue(), None) + finally: + self.buffer.truncate(0) + +class DefaultRunner(schema.SchemaVisitor): + def __init__(self, engine, proxy): + self.proxy = proxy + self.engine = engine + + def get_column_default(self, column): + if column.default is not None: + return column.default.accept_schema_visitor(self) + else: + return None + + def get_column_onupdate(self, column): + if column.onupdate is not None: + return column.onupdate.accept_schema_visitor(self) + else: + return None + + def visit_passive_default(self, default): + """passive defaults by definition return None on the app side, + and are post-fetched to get the DB-side value""" + return None + + def visit_sequence(self, seq): + """sequences are not supported by default""" + return None + + def exec_default_sql(self, default): + c = sql.select([default.arg], engine=self.engine).compile() + return self.proxy(str(c), c.get_params()).fetchone()[0] + + def visit_column_onupdate(self, onupdate): + if isinstance(onupdate.arg, sql.ClauseElement): + return self.exec_default_sql(onupdate) + elif callable(onupdate.arg): + return onupdate.arg() + else: + return onupdate.arg + + def visit_column_default(self, default): + if isinstance(default.arg, sql.ClauseElement): + return self.exec_default_sql(default) + elif callable(default.arg): + return default.arg() + else: + return default.arg diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py new file mode 100644 index 000000000..40978204a --- /dev/null +++ b/lib/sqlalchemy/engine/default.py @@ -0,0 +1,213 @@ +# engine/default.py +# Copyright (C) 2005,2006 Michael Bayer mike_mp@zzzcomputing.com +# +# This module is part of SQLAlchemy and is released under +# the MIT License: http://www.opensource.org/licenses/mit-license.php + + +from sqlalchemy import schema, exceptions, util, sql, types +import sqlalchemy.pool +import StringIO, sys, re +import base + +"""provides default implementations of the engine interfaces""" + + +class PoolConnectionProvider(base.ConnectionProvider): + def __init__(self, dialect, url, poolclass=None, pool=None, **kwargs): + (cargs, cparams) = dialect.create_connect_args(url) + if pool is None: + kwargs.setdefault('echo', False) + kwargs.setdefault('use_threadlocal',True) + if poolclass is None: + poolclass = sqlalchemy.pool.QueuePool + dbapi = dialect.dbapi() + if dbapi is None: + raise exceptions.InvalidRequestException("Cant get DBAPI module for dialect '%s'" % dialect) + self._pool = poolclass(lambda: dbapi.connect(*cargs, **cparams), **kwargs) + else: + if isinstance(pool, sqlalchemy.pool.DBProxy): + self._pool = pool.get_pool(*cargs, **cparams) + else: + self._pool = pool + def get_connection(self): + return self._pool.connect() + def dispose(self): + self._pool.dispose() + if hasattr(self, '_dbproxy'): + self._dbproxy.dispose() + +class DefaultDialect(base.Dialect): + """default implementation of Dialect""" + def __init__(self, convert_unicode=False, encoding='utf-8', **kwargs): + self.convert_unicode = convert_unicode + self.supports_autoclose_results = True + self.encoding = encoding + self.positional = False + self.paramstyle = 'named' + self._ischema = None + self._figure_paramstyle() + def create_execution_context(self): + return DefaultExecutionContext(self) + def type_descriptor(self, typeobj): + """provides a database-specific TypeEngine object, given the generic object + which comes from the types module. Subclasses will usually use the adapt_type() + method in the types module to make this job easy.""" + if type(typeobj) is type: + typeobj = typeobj() + return typeobj + def oid_column_name(self): + return None + def supports_sane_rowcount(self): + return True + def do_begin(self, connection): + """implementations might want to put logic here for turning autocommit on/off, + etc.""" + pass + def do_rollback(self, connection): + """implementations might want to put logic here for turning autocommit on/off, + etc.""" + #print "ENGINE ROLLBACK ON ", connection.connection + connection.rollback() + def do_commit(self, connection): + """implementations might want to put logic here for turning autocommit on/off, etc.""" + #print "ENGINE COMMIT ON ", connection.connection + connection.commit() + def do_executemany(self, cursor, statement, parameters, **kwargs): + cursor.executemany(statement, parameters) + def do_execute(self, cursor, statement, parameters, **kwargs): + cursor.execute(statement, parameters) + def defaultrunner(self, engine, proxy): + return base.DefaultRunner(engine, proxy) + + def _set_paramstyle(self, style): + self._paramstyle = style + self._figure_paramstyle(style) + paramstyle = property(lambda s:s._paramstyle, _set_paramstyle) + + def convert_compiled_params(self, parameters): + executemany = parameters is not None and isinstance(parameters, list) + # the bind params are a CompiledParams object. but all the DBAPI's hate + # that object (or similar). so convert it to a clean + # dictionary/list/tuple of dictionary/tuple of list + if parameters is not None: + if self.positional: + if executemany: + parameters = [p.values() for p in parameters] + else: + parameters = parameters.values() + else: + if executemany: + parameters = [p.get_raw_dict() for p in parameters] + else: + parameters = parameters.get_raw_dict() + return parameters + + def _figure_paramstyle(self, paramstyle=None): + db = self.dbapi() + if paramstyle is not None: + self._paramstyle = paramstyle + elif db is not None: + self._paramstyle = db.paramstyle + else: + self._paramstyle = 'named' + + if self._paramstyle == 'named': + self.positional=False + elif self._paramstyle == 'pyformat': + self.positional=False + elif self._paramstyle == 'qmark' or self._paramstyle == 'format' or self._paramstyle == 'numeric': + # for positional, use pyformat internally, ANSICompiler will convert + # to appropriate character upon compilation + self.positional = True + else: + raise DBAPIError("Unsupported paramstyle '%s'" % self._paramstyle) + + def _get_ischema(self): + # We use a property for ischema so that the accessor + # creation only happens as needed, since otherwise we + # have a circularity problem with the generic + # ansisql.engine() + if self._ischema is None: + import sqlalchemy.databases.information_schema as ischema + self._ischema = ischema.ISchema(self) + return self._ischema + ischema = property(_get_ischema, doc="""returns an ISchema object for this engine, which allows access to information_schema tables (if supported)""") + +class DefaultExecutionContext(base.ExecutionContext): + def __init__(self, dialect): + self.dialect = dialect + def pre_exec(self, engine, proxy, compiled, parameters): + self._process_defaults(engine, proxy, compiled, parameters) + def post_exec(self, engine, proxy, compiled, parameters): + pass + def get_rowcount(self, cursor): + if hasattr(self, '_rowcount'): + return self._rowcount + else: + return cursor.rowcount + def supports_sane_rowcount(self): + return self.dialect.supports_sane_rowcount() + def last_inserted_ids(self): + return self._last_inserted_ids + def last_inserted_params(self): + return self._last_inserted_params + def last_updated_params(self): + return self._last_updated_params + def lastrow_has_defaults(self): + return self._lastrow_has_defaults + def _process_defaults(self, engine, proxy, compiled, parameters): + """INSERT and UPDATE statements, when compiled, may have additional columns added to their + VALUES and SET lists corresponding to column defaults/onupdates that are present on the + Table object (i.e. ColumnDefault, Sequence, PassiveDefault). This method pre-execs those + DefaultGenerator objects that require pre-execution and sets their values within the + parameter list, and flags the thread-local state about + PassiveDefault objects that may require post-fetching the row after it is inserted/updated. + This method relies upon logic within the ANSISQLCompiler in its visit_insert and + visit_update methods that add the appropriate column clauses to the statement when its + being compiled, so that these parameters can be bound to the statement.""" + if compiled is None: return + if getattr(compiled, "isinsert", False): + if isinstance(parameters, list): + plist = parameters + else: + plist = [parameters] + drunner = self.dialect.defaultrunner(engine, proxy) + self._lastrow_has_defaults = False + for param in plist: + last_inserted_ids = [] + need_lastrowid=False + for c in compiled.statement.table.c: + if not param.has_key(c.name) or param[c.name] is None: + if isinstance(c.default, schema.PassiveDefault): + self._lastrow_has_defaults = True + newid = drunner.get_column_default(c) + if newid is not None: + param[c.name] = newid + if c.primary_key: + last_inserted_ids.append(param[c.name]) + elif c.primary_key: + need_lastrowid = True + elif c.primary_key: + last_inserted_ids.append(param[c.name]) + if need_lastrowid: + self._last_inserted_ids = None + else: + self._last_inserted_ids = last_inserted_ids + self._last_inserted_params = param + elif getattr(compiled, 'isupdate', False): + if isinstance(parameters, list): + plist = parameters + else: + plist = [parameters] + drunner = self.dialect.defaultrunner(engine, proxy) + self._lastrow_has_defaults = False + for param in plist: + for c in compiled.statement.table.c: + if c.onupdate is not None and (not param.has_key(c.name) or param[c.name] is None): + value = drunner.get_column_onupdate(c) + if value is not None: + param[c.name] = value + self._last_updated_params = param + + diff --git a/lib/sqlalchemy/engine/strategies.py b/lib/sqlalchemy/engine/strategies.py new file mode 100644 index 000000000..a4f406502 --- /dev/null +++ b/lib/sqlalchemy/engine/strategies.py @@ -0,0 +1,70 @@ +"""defines different strategies for creating new instances of sql.Engine. +by default there are two, one which is the "thread-local" strategy, one which is the "plain" strategy. +new strategies can be added via constructing a new EngineStrategy object which will add itself to the +list of available strategies here, or replace one of the existing name. +this can be accomplished via a mod; see the sqlalchemy/mods package for details.""" + + +from sqlalchemy.engine import base, default, threadlocal, url + +strategies = {} + +class EngineStrategy(object): + """defines a function that receives input arguments and produces an instance of sql.Engine, typically + an instance sqlalchemy.engine.base.ComposedSQLEngine or a subclass.""" + def __init__(self, name): + """constructs a new EngineStrategy object and sets it in the list of available strategies + under this name.""" + self.name = name + strategies[self.name] = self + def create(self, *args, **kwargs): + """given arguments, returns a new sql.Engine instance.""" + raise NotImplementedError() + + +class PlainEngineStrategy(EngineStrategy): + def __init__(self): + EngineStrategy.__init__(self, 'plain') + def create(self, name_or_url, **kwargs): + u = url.make_url(name_or_url) + module = u.get_module() + + dialect = module.dialect(**kwargs) + + poolargs = {} + for key in (('echo', 'echo_pool'), ('pool_size', 'pool_size'), ('max_overflow', 'max_overflow'), ('poolclass', 'poolclass'), ('pool_timeout','timeout')): + if kwargs.has_key(key[0]): + poolargs[key[1]] = kwargs[key[0]] + poolclass = getattr(module, 'poolclass', None) + if poolclass is not None: + poolargs.setdefault('poolclass', poolclass) + poolargs['use_threadlocal'] = False + provider = default.PoolConnectionProvider(dialect, u, **poolargs) + + return base.ComposedSQLEngine(provider, dialect, **kwargs) +PlainEngineStrategy() + +class ThreadLocalEngineStrategy(EngineStrategy): + def __init__(self): + EngineStrategy.__init__(self, 'threadlocal') + def create(self, name_or_url, **kwargs): + u = url.make_url(name_or_url) + module = u.get_module() + + dialect = module.dialect(**kwargs) + + poolargs = {} + for key in (('echo', 'echo_pool'), ('pool_size', 'pool_size'), ('max_overflow', 'max_overflow'), ('poolclass', 'poolclass'), ('pool_timeout','timeout')): + if kwargs.has_key(key[0]): + poolargs[key[1]] = kwargs[key[0]] + poolclass = getattr(module, 'poolclass', None) + if poolclass is not None: + poolargs.setdefault('poolclass', poolclass) + poolargs['use_threadlocal'] = True + provider = threadlocal.TLocalConnectionProvider(dialect, u, **poolargs) + + return threadlocal.TLEngine(provider, dialect, **kwargs) +ThreadLocalEngineStrategy() + + + diff --git a/lib/sqlalchemy/engine/threadlocal.py b/lib/sqlalchemy/engine/threadlocal.py new file mode 100644 index 000000000..85628c208 --- /dev/null +++ b/lib/sqlalchemy/engine/threadlocal.py @@ -0,0 +1,84 @@ +from sqlalchemy import schema, exceptions, util, sql, types +import StringIO, sys, re +import base, default + +"""provides a thread-local transactional wrapper around the basic ComposedSQLEngine. multiple calls to engine.connect() +will return the same connection for the same thread. also provides begin/commit methods on the engine itself +which correspond to a thread-local transaction.""" + +class TLTransaction(base.Transaction): + def rollback(self): + try: + base.Transaction.rollback(self) + finally: + try: + del self.connection.engine.context.transaction + except AttributeError: + pass + def commit(self): + try: + base.Transaction.commit(self) + stack = self.connection.engine.context.transaction + stack.pop() + if len(stack) == 0: + del self.connection.engine.context.transaction + except: + try: + del self.connection.engine.context.transaction + except AttributeError: + pass + raise + +class TLConnection(base.Connection): + def _create_transaction(self, parent): + return TLTransaction(self, parent) + def begin(self): + t = base.Connection.begin(self) + if not hasattr(self.engine.context, 'transaction'): + self.engine.context.transaction = [] + self.engine.context.transaction.append(t) + return t + +class TLEngine(base.ComposedSQLEngine): + """a ComposedSQLEngine that includes support for thread-local managed transactions. This engine + is better suited to be used with threadlocal Pool object.""" + def __init__(self, *args, **kwargs): + """the TLEngine relies upon the ConnectionProvider having "threadlocal" behavior, + so that once a connection is checked out for the current thread, you get that same connection + repeatedly.""" + base.ComposedSQLEngine.__init__(self, *args, **kwargs) + self.context = util.ThreadLocal() + def raw_connection(self): + """returns a DBAPI connection.""" + return self.connection_provider.get_connection() + def connect(self, **kwargs): + """returns a Connection that is not thread-locally scoped. this is the equilvalent to calling + "connect()" on a ComposedSQLEngine.""" + return base.Connection(self, self.connection_provider.unique_connection()) + def contextual_connect(self, **kwargs): + """returns a TLConnection which is thread-locally scoped.""" + return TLConnection(self, **kwargs) + def begin(self): + return self.connect().begin() + def commit(self): + if hasattr(self.context, 'transaction'): + self.context.transaction[-1].commit() + def rollback(self): + if hasattr(self.context, 'transaction'): + self.context.transaction[-1].rollback() + def transaction(self, func, *args, **kwargs): + """executes the given function within a transaction boundary. this is a shortcut for + explicitly calling begin() and commit() and optionally rollback() when execptions are raised. + The given *args and **kwargs will be passed to the function as well, which could be handy + in constructing decorators.""" + trans = self.begin() + try: + func(*args, **kwargs) + except: + trans.rollback() + raise + trans.commit() + +class TLocalConnectionProvider(default.PoolConnectionProvider): + def unique_connection(self): + return self._pool.unique_connection() diff --git a/lib/sqlalchemy/engine/url.py b/lib/sqlalchemy/engine/url.py new file mode 100644 index 000000000..d79213c68 --- /dev/null +++ b/lib/sqlalchemy/engine/url.py @@ -0,0 +1,81 @@ +import re +import cgi + +class URL(object): + def __init__(self, drivername, username=None, password=None, host=None, port=None, database=None): + self.drivername = drivername + self.username = username + self.password = password + self.host = host + self.port = port + self.database= database + def __str__(self): + s = self.drivername + "://" + if self.username is not None: + s += self.username + if self.password is not None: + s += ':' + self.password + s += "@" + if self.host is not None: + s += self.host + if self.port is not None: + s += ':' + self.port + if self.database is not None: + s += '/' + self.database + return s + def get_module(self): + return getattr(__import__('sqlalchemy.databases.%s' % self.drivername).databases, self.drivername) + def translate_connect_args(self, names): + """translates this URL's attributes into a dictionary of connection arguments used by a specific dbapi. + the names parameter is a list of argument names in the form ('host', 'database', 'user', 'password', 'port') + where the given strings match the corresponding argument names for the dbapi. Will return a dictionary + with the dbapi-specific parameters.""" + a = {} + attribute_names = ['host', 'database', 'username', 'password', 'port'] + for n in names: + sname = attribute_names.pop(0) + if n is None: + continue + if getattr(self, sname, None) is not None: + a[n] = getattr(self, sname) + return a + + +def make_url(name_or_url): + if isinstance(name_or_url, str): + return _parse_rfc1738_args(name_or_url) + else: + return name_or_url + +def _parse_rfc1738_args(name): + pattern = re.compile(r''' + (\w+):// + (?: + ([^:]*) + (?::(.*))? + @)? + (?: + ([^/:]*) + (?::([^/]*))? + )? + (?:/(.*))? + ''' + , re.X) + + m = pattern.match(name) + if m is not None: + (name, username, password, host, port, database) = m.group(1, 2, 3, 4, 5, 6) + opts = {'username':username,'password':password,'host':host,'port':port,'database':database} + return URL(name, **opts) + else: + return None + +def _parse_keyvalue_args(name): + m = re.match( r'(\w+)://(.*)', name) + if m is not None: + (name, args) = m.group(1, 2) + opts = dict( cgi.parse_qsl( args ) ) + return URL(name, *opts) + else: + return None + |
