diff options
| author | mike bayer <mike_mp@zzzcomputing.com> | 2020-05-28 19:28:35 +0000 |
|---|---|---|
| committer | Gerrit Code Review <gerrit@bbpush.zzzcomputing.com> | 2020-05-28 19:28:35 +0000 |
| commit | 056bad48e2bc948a08621ab841fd882cb6934262 (patch) | |
| tree | 2635059b149309c2ad7a648bfce13fd5844d8dc8 /lib/sqlalchemy/engine | |
| parent | c07979e8d44a30fdf0ea73bc587aa05a52e9955a (diff) | |
| parent | 77f1b7d236dba6b1c859bb428ef32d118ec372e6 (diff) | |
| download | sqlalchemy-056bad48e2bc948a08621ab841fd882cb6934262.tar.gz | |
Merge "callcount reductions and refinement for cached queries"
Diffstat (limited to 'lib/sqlalchemy/engine')
| -rw-r--r-- | lib/sqlalchemy/engine/base.py | 159 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/cursor.py | 345 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/default.py | 166 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/events.py | 28 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/result.py | 128 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/row.py | 10 |
6 files changed, 421 insertions, 415 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index 0193ea47c..a36f4eee2 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -25,6 +25,8 @@ from ..sql import util as sql_util """ +_EMPTY_EXECUTION_OPTS = util.immutabledict() + class Connection(Connectable): """Provides high-level functionality for a wrapped DB-API connection. @@ -1038,7 +1040,11 @@ class Connection(Connectable): distilled_parameters = _distill_params(multiparams, params) return self._exec_driver_sql( - object_, multiparams, params, distilled_parameters + object_, + multiparams, + params, + distilled_parameters, + _EMPTY_EXECUTION_OPTS, ) try: meth = object_._execute_on_connection @@ -1047,24 +1053,29 @@ class Connection(Connectable): exc.ObjectNotExecutableError(object_), replace_context=err ) else: - return meth(self, multiparams, params, util.immutabledict()) + return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS) - def _execute_function( - self, func, multiparams, params, execution_options=util.immutabledict() - ): + def _execute_function(self, func, multiparams, params, execution_options): """Execute a sql.FunctionElement object.""" - return self._execute_clauseelement(func.select(), multiparams, params) + return self._execute_clauseelement( + func.select(), multiparams, params, execution_options + ) def _execute_default( self, default, multiparams, params, - execution_options=util.immutabledict(), + # migrate is calling this directly :( + execution_options=_EMPTY_EXECUTION_OPTS, ): """Execute a schema.ColumnDefault object.""" + execution_options = self._execution_options.merge_with( + execution_options + ) + if self._has_events or self.engine._has_events: for fn in self.dispatch.before_execute: default, multiparams, params = fn( @@ -1096,11 +1107,13 @@ class Connection(Connectable): return ret - def _execute_ddl( - self, ddl, multiparams, params, execution_options=util.immutabledict() - ): + def _execute_ddl(self, ddl, multiparams, params, execution_options): """Execute a schema.DDL object.""" + execution_options = ddl._execution_options.merge_with( + self._execution_options, execution_options + ) + if self._has_events or self.engine._has_events: for fn in self.dispatch.before_execute: ddl, multiparams, params = fn( @@ -1130,11 +1143,16 @@ class Connection(Connectable): return ret def _execute_clauseelement( - self, elem, multiparams, params, execution_options=util.immutabledict() + self, elem, multiparams, params, execution_options ): """Execute a sql.ClauseElement object.""" - if self._has_events or self.engine._has_events: + execution_options = elem._execution_options.merge_with( + self._execution_options, execution_options + ) + + has_events = self._has_events or self.engine._has_events + if has_events: for fn in self.dispatch.before_execute: elem, multiparams, params = fn( self, elem, multiparams, params, execution_options @@ -1144,18 +1162,19 @@ class Connection(Connectable): if distilled_params: # ensure we don't retain a link to the view object for keys() # which links to the values, which we don't want to cache - keys = list(distilled_params[0].keys()) - + keys = sorted(distilled_params[0]) + inline = len(distilled_params) > 1 else: keys = [] + inline = False dialect = self.dialect - exec_opts = self._execution_options.merge_with(execution_options) - - schema_translate_map = exec_opts.get("schema_translate_map", None) + schema_translate_map = execution_options.get( + "schema_translate_map", None + ) - compiled_cache = exec_opts.get( + compiled_cache = execution_options.get( "compiled_cache", self.dialect._compiled_cache ) @@ -1165,13 +1184,13 @@ class Connection(Connectable): elem_cache_key = None if elem_cache_key: - cache_key, extracted_params, _ = elem_cache_key + cache_key, extracted_params = elem_cache_key key = ( dialect, cache_key, - tuple(sorted(keys)), + tuple(keys), bool(schema_translate_map), - len(distilled_params) > 1, + inline, ) compiled_sql = compiled_cache.get(key) @@ -1180,7 +1199,7 @@ class Connection(Connectable): dialect=dialect, cache_key=elem_cache_key, column_keys=keys, - inline=len(distilled_params) > 1, + inline=inline, schema_translate_map=schema_translate_map, linting=self.dialect.compiler_linting | compiler.WARN_LINTING, @@ -1191,7 +1210,7 @@ class Connection(Connectable): compiled_sql = elem.compile( dialect=dialect, column_keys=keys, - inline=len(distilled_params) > 1, + inline=inline, schema_translate_map=schema_translate_map, linting=self.dialect.compiler_linting | compiler.WARN_LINTING, ) @@ -1207,7 +1226,7 @@ class Connection(Connectable): elem, extracted_params, ) - if self._has_events or self.engine._has_events: + if has_events: self.dispatch.after_execute( self, elem, multiparams, params, execution_options, ret ) @@ -1218,9 +1237,17 @@ class Connection(Connectable): compiled, multiparams, params, - execution_options=util.immutabledict(), + execution_options=_EMPTY_EXECUTION_OPTS, ): - """Execute a sql.Compiled object.""" + """Execute a sql.Compiled object. + + TODO: why do we have this? likely deprecate or remove + + """ + + execution_options = compiled.execution_options.merge_with( + self._execution_options, execution_options + ) if self._has_events or self.engine._has_events: for fn in self.dispatch.before_execute: @@ -1253,9 +1280,13 @@ class Connection(Connectable): multiparams, params, distilled_parameters, - execution_options=util.immutabledict(), + execution_options, ): + execution_options = self._execution_options.merge_with( + execution_options + ) + if self._has_events or self.engine._has_events: for fn in self.dispatch.before_execute: statement, multiparams, params = fn( @@ -1282,7 +1313,7 @@ class Connection(Connectable): self, statement, parameters=None, - execution_options=util.immutabledict(), + execution_options=_EMPTY_EXECUTION_OPTS, ): multiparams, params, distilled_parameters = _distill_params_20( parameters @@ -1398,8 +1429,7 @@ class Connection(Connectable): if self._is_future and self._transaction is None: self.begin() - if context.compiled: - context.pre_exec() + context.pre_exec() cursor, statement, parameters = ( context.cursor, @@ -1495,30 +1525,35 @@ class Connection(Connectable): context.executemany, ) - if context.compiled: - context.post_exec() + context.post_exec() result = context._setup_result_proxy() - if ( - not self._is_future - # usually we're in a transaction so avoid relatively - # expensive / legacy should_autocommit call - and self._transaction is None - and context.should_autocommit - ): - self._commit_impl(autocommit=True) + if not self._is_future: + should_close_with_result = branched.should_close_with_result - # for "connectionless" execution, we have to close this - # Connection after the statement is complete. - # legacy stuff. - if branched.should_close_with_result and context._soft_closed: - assert not self._is_future - assert not context._is_future_result + if not result._soft_closed and should_close_with_result: + result._autoclose_connection = True + + if ( + # usually we're in a transaction so avoid relatively + # expensive / legacy should_autocommit call + self._transaction is None + and context.should_autocommit + ): + self._commit_impl(autocommit=True) + + # for "connectionless" execution, we have to close this + # Connection after the statement is complete. + # legacy stuff. + if should_close_with_result and context._soft_closed: + assert not self._is_future + assert not context._is_future_result + + # CursorResult already exhausted rows / has no rows. + # close us now + branched.close() - # CursorResult already exhausted rows / has no rows. - # close us now - branched.close() except BaseException as e: self._handle_dbapi_exception( e, statement, parameters, cursor, context @@ -2319,7 +2354,7 @@ class Engine(Connectable, log.Identified): """ - _execution_options = util.immutabledict() + _execution_options = _EMPTY_EXECUTION_OPTS _has_events = False _connection_cls = Connection _sqla_logger_namespace = "sqlalchemy.engine.Engine" @@ -2709,13 +2744,29 @@ class Engine(Connectable, log.Identified): """ return self.execute(statement, *multiparams, **params).scalar() - def _execute_clauseelement(self, elem, multiparams=None, params=None): + def _execute_clauseelement( + self, + elem, + multiparams=None, + params=None, + execution_options=_EMPTY_EXECUTION_OPTS, + ): connection = self.connect(close_with_result=True) - return connection._execute_clauseelement(elem, multiparams, params) + return connection._execute_clauseelement( + elem, multiparams, params, execution_options + ) - def _execute_compiled(self, compiled, multiparams, params): + def _execute_compiled( + self, + compiled, + multiparams, + params, + execution_options=_EMPTY_EXECUTION_OPTS, + ): connection = self.connect(close_with_result=True) - return connection._execute_compiled(compiled, multiparams, params) + return connection._execute_compiled( + compiled, multiparams, params, execution_options + ) def connect(self, close_with_result=False): """Return a new :class:`_engine.Connection` object. diff --git a/lib/sqlalchemy/engine/cursor.py b/lib/sqlalchemy/engine/cursor.py index fdbf826ed..c32427644 100644 --- a/lib/sqlalchemy/engine/cursor.py +++ b/lib/sqlalchemy/engine/cursor.py @@ -10,12 +10,12 @@ import collections +import functools from .result import Result from .result import ResultMetaData from .result import SimpleResultMetaData from .result import tuplegetter -from .row import _baserow_usecext from .row import LegacyRow from .. import exc from .. import util @@ -89,14 +89,6 @@ class CursorResultMetaData(ResultMetaData): for index, rec in enumerate(self._metadata_for_keys(keys)) ] new_metadata._keymap = {rec[MD_LOOKUP_KEY]: rec for rec in new_recs} - if not _baserow_usecext: - # TODO: can consider assembling ints + negative ints here - new_metadata._keymap.update( - { - index: (index, new_keys[index], ()) - for index in range(len(new_keys)) - } - ) # TODO: need unit test for: # result = connection.execute("raw sql, no columns").scalars() @@ -186,25 +178,6 @@ class CursorResultMetaData(ResultMetaData): ) self._keymap = {} - if not _baserow_usecext: - # keymap indexes by integer index: this is only used - # in the pure Python BaseRow.__getitem__ - # implementation to avoid an expensive - # isinstance(key, util.int_types) in the most common - # case path - - len_raw = len(raw) - - self._keymap.update( - [ - (metadata_entry[MD_INDEX], metadata_entry) - for metadata_entry in raw - ] - + [ - (metadata_entry[MD_INDEX] - len_raw, metadata_entry) - for metadata_entry in raw - ] - ) # processors in key order for certain per-row # views like __iter__ and slices @@ -623,20 +596,23 @@ class CursorResultMetaData(ResultMetaData): return index def _indexes_for_keys(self, keys): - for rec in self._metadata_for_keys(keys): - yield rec[0] + + try: + return [self._keymap[key][0] for key in keys] + except KeyError as ke: + # ensure it raises + CursorResultMetaData._key_fallback(self, ke.args[0], ke) def _metadata_for_keys(self, keys): for key in keys: - # TODO: can consider pre-loading ints and negative ints - # into _keymap - if isinstance(key, int): + if int in key.__class__.__mro__: key = self._keys[key] try: rec = self._keymap[key] except KeyError as ke: - rec = self._key_fallback(key, ke) + # ensure it raises + CursorResultMetaData._key_fallback(self, ke.args[0], ke) index = rec[0] @@ -786,25 +762,27 @@ class ResultFetchStrategy(object): __slots__ = () - def soft_close(self, result): + alternate_cursor_description = None + + def soft_close(self, result, dbapi_cursor): raise NotImplementedError() - def hard_close(self, result): + def hard_close(self, result, dbapi_cursor): raise NotImplementedError() - def yield_per(self, result, num): + def yield_per(self, result, dbapi_cursor, num): return - def fetchone(self, result, hard_close=False): + def fetchone(self, result, dbapi_cursor, hard_close=False): raise NotImplementedError() - def fetchmany(self, result, size=None): + def fetchmany(self, result, dbapi_cursor, size=None): raise NotImplementedError() def fetchall(self, result): raise NotImplementedError() - def handle_exception(self, result, err): + def handle_exception(self, result, dbapi_cursor, err): raise err @@ -819,21 +797,19 @@ class NoCursorFetchStrategy(ResultFetchStrategy): __slots__ = () - cursor_description = None - - def soft_close(self, result): + def soft_close(self, result, dbapi_cursor): pass - def hard_close(self, result): + def hard_close(self, result, dbapi_cursor): pass - def fetchone(self, result, hard_close=False): + def fetchone(self, result, dbapi_cursor, hard_close=False): return self._non_result(result, None) - def fetchmany(self, result, size=None): + def fetchmany(self, result, dbapi_cursor, size=None): return self._non_result(result, []) - def fetchall(self, result): + def fetchall(self, result, dbapi_cursor): return self._non_result(result, []) def _non_result(self, result, default, err=None): @@ -893,71 +869,59 @@ class CursorFetchStrategy(ResultFetchStrategy): """ - __slots__ = ("dbapi_cursor", "cursor_description") - - def __init__(self, dbapi_cursor, cursor_description): - self.dbapi_cursor = dbapi_cursor - self.cursor_description = cursor_description - - @classmethod - def create(cls, result): - dbapi_cursor = result.cursor - description = dbapi_cursor.description - - if description is None: - return _NO_CURSOR_DML - else: - return cls(dbapi_cursor, description) + __slots__ = () - def soft_close(self, result): + def soft_close(self, result, dbapi_cursor): result.cursor_strategy = _NO_CURSOR_DQL - def hard_close(self, result): + def hard_close(self, result, dbapi_cursor): result.cursor_strategy = _NO_CURSOR_DQL - def handle_exception(self, result, err): + def handle_exception(self, result, dbapi_cursor, err): result.connection._handle_dbapi_exception( - err, None, None, self.dbapi_cursor, result.context + err, None, None, dbapi_cursor, result.context ) - def yield_per(self, result, num): + def yield_per(self, result, dbapi_cursor, num): result.cursor_strategy = BufferedRowCursorFetchStrategy( - self.dbapi_cursor, - self.cursor_description, - num, - collections.deque(), + dbapi_cursor, + {"max_row_buffer": num}, + initial_buffer=collections.deque(), growth_factor=0, ) - def fetchone(self, result, hard_close=False): + def fetchone(self, result, dbapi_cursor, hard_close=False): try: - row = self.dbapi_cursor.fetchone() + row = dbapi_cursor.fetchone() if row is None: result._soft_close(hard=hard_close) return row except BaseException as e: - self.handle_exception(result, e) + self.handle_exception(result, dbapi_cursor, e) - def fetchmany(self, result, size=None): + def fetchmany(self, result, dbapi_cursor, size=None): try: if size is None: - l = self.dbapi_cursor.fetchmany() + l = dbapi_cursor.fetchmany() else: - l = self.dbapi_cursor.fetchmany(size) + l = dbapi_cursor.fetchmany(size) if not l: result._soft_close() return l except BaseException as e: - self.handle_exception(result, e) + self.handle_exception(result, dbapi_cursor, e) - def fetchall(self, result): + def fetchall(self, result, dbapi_cursor): try: - rows = self.dbapi_cursor.fetchall() + rows = dbapi_cursor.fetchall() result._soft_close() return rows except BaseException as e: - self.handle_exception(result, e) + self.handle_exception(result, dbapi_cursor, e) + + +_DEFAULT_FETCH = CursorFetchStrategy() class BufferedRowCursorFetchStrategy(CursorFetchStrategy): @@ -993,18 +957,18 @@ class BufferedRowCursorFetchStrategy(CursorFetchStrategy): def __init__( self, dbapi_cursor, - description, - max_row_buffer, - initial_buffer, + execution_options, growth_factor=5, + initial_buffer=None, ): - super(BufferedRowCursorFetchStrategy, self).__init__( - dbapi_cursor, description - ) - self._max_row_buffer = max_row_buffer + self._max_row_buffer = execution_options.get("max_row_buffer", 1000) + + if initial_buffer is not None: + self._rowbuffer = initial_buffer + else: + self._rowbuffer = collections.deque(dbapi_cursor.fetchmany(1)) self._growth_factor = growth_factor - self._rowbuffer = initial_buffer if growth_factor: self._bufsize = min(self._max_row_buffer, self._growth_factor) @@ -1013,39 +977,19 @@ class BufferedRowCursorFetchStrategy(CursorFetchStrategy): @classmethod def create(cls, result): - """Buffered row strategy has to buffer the first rows *before* - cursor.description is fetched so that it works with named cursors - correctly - - """ - - dbapi_cursor = result.cursor - - # TODO: is create() called within a handle_error block externally? - # can this be guaranteed / tested / etc - initial_buffer = collections.deque(dbapi_cursor.fetchmany(1)) - - description = dbapi_cursor.description - - if description is None: - return _NO_CURSOR_DML - else: - max_row_buffer = result.context.execution_options.get( - "max_row_buffer", 1000 - ) - return cls( - dbapi_cursor, description, max_row_buffer, initial_buffer - ) + return BufferedRowCursorFetchStrategy( + result.cursor, result.context.execution_options, + ) - def _buffer_rows(self, result): + def _buffer_rows(self, result, dbapi_cursor): size = self._bufsize try: if size < 1: - new_rows = self.dbapi_cursor.fetchall() + new_rows = dbapi_cursor.fetchall() else: - new_rows = self.dbapi_cursor.fetchmany(size) + new_rows = dbapi_cursor.fetchmany(size) except BaseException as e: - self.handle_exception(result, e) + self.handle_exception(result, dbapi_cursor, e) if not new_rows: return @@ -1055,21 +999,25 @@ class BufferedRowCursorFetchStrategy(CursorFetchStrategy): self._max_row_buffer, size * self._growth_factor ) - def yield_per(self, result, num): + def yield_per(self, result, dbapi_cursor, num): self._growth_factor = 0 self._max_row_buffer = self._bufsize = num - def soft_close(self, result): + def soft_close(self, result, dbapi_cursor): self._rowbuffer.clear() - super(BufferedRowCursorFetchStrategy, self).soft_close(result) + super(BufferedRowCursorFetchStrategy, self).soft_close( + result, dbapi_cursor + ) - def hard_close(self, result): + def hard_close(self, result, dbapi_cursor): self._rowbuffer.clear() - super(BufferedRowCursorFetchStrategy, self).hard_close(result) + super(BufferedRowCursorFetchStrategy, self).hard_close( + result, dbapi_cursor + ) - def fetchone(self, result, hard_close=False): + def fetchone(self, result, dbapi_cursor, hard_close=False): if not self._rowbuffer: - self._buffer_rows(result) + self._buffer_rows(result, dbapi_cursor) if not self._rowbuffer: try: result._soft_close(hard=hard_close) @@ -1078,15 +1026,15 @@ class BufferedRowCursorFetchStrategy(CursorFetchStrategy): return None return self._rowbuffer.popleft() - def fetchmany(self, result, size=None): + def fetchmany(self, result, dbapi_cursor, size=None): if size is None: - return self.fetchall(result) + return self.fetchall(result, dbapi_cursor) buf = list(self._rowbuffer) lb = len(buf) if size > lb: try: - buf.extend(self.dbapi_cursor.fetchmany(size - lb)) + buf.extend(dbapi_cursor.fetchmany(size - lb)) except BaseException as e: self.handle_exception(result, e) @@ -1094,14 +1042,14 @@ class BufferedRowCursorFetchStrategy(CursorFetchStrategy): self._rowbuffer = collections.deque(buf[size:]) return result - def fetchall(self, result): + def fetchall(self, result, dbapi_cursor): try: - ret = list(self._rowbuffer) + list(self.dbapi_cursor.fetchall()) + ret = list(self._rowbuffer) + list(dbapi_cursor.fetchall()) self._rowbuffer.clear() result._soft_close() return ret except BaseException as e: - self.handle_exception(result, e) + self.handle_exception(result, dbapi_cursor, e) class FullyBufferedCursorFetchStrategy(CursorFetchStrategy): @@ -1113,42 +1061,42 @@ class FullyBufferedCursorFetchStrategy(CursorFetchStrategy): """ - __slots__ = ("_rowbuffer",) + __slots__ = ("_rowbuffer", "alternate_cursor_description") - def __init__(self, dbapi_cursor, description, initial_buffer=None): - super(FullyBufferedCursorFetchStrategy, self).__init__( - dbapi_cursor, description - ) + def __init__( + self, dbapi_cursor, alternate_description, initial_buffer=None + ): + self.alternate_cursor_description = alternate_description if initial_buffer is not None: self._rowbuffer = collections.deque(initial_buffer) else: - self._rowbuffer = collections.deque(self.dbapi_cursor.fetchall()) - - @classmethod - def create_from_buffer(cls, dbapi_cursor, description, buffer): - return cls(dbapi_cursor, description, buffer) + self._rowbuffer = collections.deque(dbapi_cursor.fetchall()) - def yield_per(self, result, num): + def yield_per(self, result, dbapi_cursor, num): pass - def soft_close(self, result): + def soft_close(self, result, dbapi_cursor): self._rowbuffer.clear() - super(FullyBufferedCursorFetchStrategy, self).soft_close(result) + super(FullyBufferedCursorFetchStrategy, self).soft_close( + result, dbapi_cursor + ) - def hard_close(self, result): + def hard_close(self, result, dbapi_cursor): self._rowbuffer.clear() - super(FullyBufferedCursorFetchStrategy, self).hard_close(result) + super(FullyBufferedCursorFetchStrategy, self).hard_close( + result, dbapi_cursor + ) - def fetchone(self, result, hard_close=False): + def fetchone(self, result, dbapi_cursor, hard_close=False): if self._rowbuffer: return self._rowbuffer.popleft() else: result._soft_close(hard=hard_close) return None - def fetchmany(self, result, size=None): + def fetchmany(self, result, dbapi_cursor, size=None): if size is None: - return self.fetchall(result) + return self.fetchall(result, dbapi_cursor) buf = list(self._rowbuffer) rows = buf[0:size] @@ -1157,7 +1105,7 @@ class FullyBufferedCursorFetchStrategy(CursorFetchStrategy): result._soft_close() return rows - def fetchall(self, result): + def fetchall(self, result, dbapi_cursor): ret = self._rowbuffer self._rowbuffer = collections.deque() result._soft_close() @@ -1210,40 +1158,53 @@ class BaseCursorResult(object): _soft_closed = False closed = False - @classmethod - def _create_for_context(cls, context): - - if context._is_future_result: - obj = CursorResult(context) - else: - obj = LegacyCursorResult(context) - return obj - - def __init__(self, context): + def __init__(self, context, cursor_strategy, cursor_description): self.context = context self.dialect = context.dialect self.cursor = context.cursor + self.cursor_strategy = cursor_strategy self.connection = context.root_connection self._echo = echo = ( self.connection._echo and context.engine._should_log_debug() ) - if echo: - log = self.context.engine.logger.debug + if cursor_description is not None: + # inline of Result._row_getter(), set up an initial row + # getter assuming no transformations will be called as this + # is the most common case + + if echo: + log = self.context.engine.logger.debug + + def log_row(row): + log("Row %r", sql_util._repr_row(row)) + return row - def log_row(row): - log("Row %r", sql_util._repr_row(row)) - return row + self._row_logging_fn = log_row + else: + log_row = None + + metadata = self._init_metadata(context, cursor_description) + + keymap = metadata._keymap + processors = metadata._processors + process_row = self._process_row + key_style = process_row._default_key_style + _make_row = functools.partial( + process_row, metadata, processors, keymap, key_style + ) + if log_row: - self._row_logging_fn = log_row + def make_row(row): + made_row = _make_row(row) + log_row(made_row) + return made_row - # this is a hook used by dialects to change the strategy, - # so for the moment we have to keep calling this every time - # :( - self.cursor_strategy = strat = context.get_result_cursor_strategy(self) + self._row_getter = make_row + else: + make_row = _make_row + self._set_memoized_attribute("_row_getter", make_row) - if strat.cursor_description is not None: - self._init_metadata(context, strat.cursor_description) else: self._metadata = _NO_RESULT_METADATA @@ -1251,19 +1212,41 @@ class BaseCursorResult(object): if context.compiled: if context.compiled._cached_metadata: cached_md = self.context.compiled._cached_metadata - self._metadata = cached_md self._metadata_from_cache = True + # result rewrite/ adapt step. two translations can occur here. + # one is if we are invoked against a cached statement, we want + # to rewrite the ResultMetaData to reflect the column objects + # that are in our current selectable, not the cached one. the + # other is, the CompileState can return an alternative Result + # object. Finally, CompileState might want to tell us to not + # actually do the ResultMetaData adapt step if it in fact has + # changed the selected columns in any case. + compiled = context.compiled + if ( + compiled + and not compiled._rewrites_selected_columns + and compiled.statement is not context.invoked_statement + ): + cached_md = cached_md._adapt_to_context(context) + + self._metadata = metadata = cached_md + else: self._metadata = ( - context.compiled._cached_metadata - ) = self._cursor_metadata(self, cursor_description) + metadata + ) = context.compiled._cached_metadata = self._cursor_metadata( + self, cursor_description + ) else: - self._metadata = self._cursor_metadata(self, cursor_description) + self._metadata = metadata = self._cursor_metadata( + self, cursor_description + ) if self._echo: context.engine.logger.debug( "Col %r", tuple(x[0] for x in cursor_description) ) + return metadata def _soft_close(self, hard=False): """Soft close this :class:`_engine.CursorResult`. @@ -1294,9 +1277,9 @@ class BaseCursorResult(object): if hard: self.closed = True - self.cursor_strategy.hard_close(self) + self.cursor_strategy.hard_close(self, self.cursor) else: - self.cursor_strategy.soft_close(self) + self.cursor_strategy.soft_close(self, self.cursor) if not self._soft_closed: cursor = self.cursor @@ -1632,19 +1615,19 @@ class CursorResult(BaseCursorResult, Result): fetchone = self.cursor_strategy.fetchone while True: - row = fetchone(self) + row = fetchone(self, self.cursor) if row is None: break yield row def _fetchone_impl(self, hard_close=False): - return self.cursor_strategy.fetchone(self, hard_close) + return self.cursor_strategy.fetchone(self, self.cursor, hard_close) def _fetchall_impl(self): - return self.cursor_strategy.fetchall(self) + return self.cursor_strategy.fetchall(self, self.cursor) def _fetchmany_impl(self, size=None): - return self.cursor_strategy.fetchmany(self, size) + return self.cursor_strategy.fetchmany(self, self.cursor, size) def _raw_row_iterator(self): return self._fetchiter_impl() @@ -1674,7 +1657,7 @@ class CursorResult(BaseCursorResult, Result): @_generative def yield_per(self, num): self._yield_per = num - self.cursor_strategy.yield_per(self, num) + self.cursor_strategy.yield_per(self, self.cursor, num) class LegacyCursorResult(CursorResult): diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index b5cb2a1b2..d0f5cfe96 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -709,6 +709,8 @@ class DefaultExecutionContext(interfaces.ExecutionContext): returned_defaults = None execution_options = util.immutabledict() + cursor_fetch_strategy = _cursor._DEFAULT_FETCH + cache_stats = None invoked_statement = None @@ -745,9 +747,7 @@ class DefaultExecutionContext(interfaces.ExecutionContext): self.compiled = compiled = compiled_ddl self.isddl = True - self.execution_options = compiled.execution_options.merge_with( - connection._execution_options, execution_options - ) + self.execution_options = execution_options self._is_future_result = ( connection._is_future @@ -802,13 +802,7 @@ class DefaultExecutionContext(interfaces.ExecutionContext): self.invoked_statement = invoked_statement self.compiled = compiled - # this should be caught in the engine before - # we get here - assert compiled.can_execute - - self.execution_options = compiled.execution_options.merge_with( - connection._execution_options, execution_options - ) + self.execution_options = execution_options self._is_future_result = ( connection._is_future @@ -829,7 +823,7 @@ class DefaultExecutionContext(interfaces.ExecutionContext): if self.isinsert or self.isupdate or self.isdelete: self.is_crud = True self._is_explicit_returning = bool(compiled.statement._returning) - self._is_implicit_returning = bool( + self._is_implicit_returning = ( compiled.returning and not compiled.statement._returning ) @@ -853,7 +847,10 @@ class DefaultExecutionContext(interfaces.ExecutionContext): # this must occur before create_cursor() since the statement # has to be regexed in some cases for server side cursor - self.unicode_statement = util.text_type(compiled) + if util.py2k: + self.unicode_statement = util.text_type(compiled.string) + else: + self.unicode_statement = compiled.string self.cursor = self.create_cursor() @@ -909,32 +906,38 @@ class DefaultExecutionContext(interfaces.ExecutionContext): # Convert the dictionary of bind parameter values # into a dict or list to be sent to the DBAPI's # execute() or executemany() method. + parameters = [] if compiled.positional: - parameters = [ - dialect.execute_sequence_format( - [ - processors[key](compiled_params[key]) - if key in processors - else compiled_params[key] - for key in positiontup - ] - ) - for compiled_params in self.compiled_parameters - ] + for compiled_params in self.compiled_parameters: + param = [ + processors[key](compiled_params[key]) + if key in processors + else compiled_params[key] + for key in positiontup + ] + parameters.append(dialect.execute_sequence_format(param)) else: encode = not dialect.supports_unicode_statements + if encode: + encoder = dialect._encoder + for compiled_params in self.compiled_parameters: - parameters = [ - { - dialect._encoder(key)[0] - if encode - else key: processors[key](value) - if key in processors - else value - for key, value in compiled_params.items() - } - for compiled_params in self.compiled_parameters - ] + if encode: + param = { + encoder(key)[0]: processors[key](compiled_params[key]) + if key in processors + else compiled_params[key] + for key in compiled_params + } + else: + param = { + key: processors[key](compiled_params[key]) + if key in processors + else compiled_params[key] + for key in compiled_params + } + + parameters.append(param) self.parameters = dialect.execute_sequence_format(parameters) @@ -958,9 +961,7 @@ class DefaultExecutionContext(interfaces.ExecutionContext): self.dialect = connection.dialect self.is_text = True - self.execution_options = self.execution_options.merge_with( - connection._execution_options, execution_options - ) + self.execution_options = execution_options self._is_future_result = ( connection._is_future @@ -1011,9 +1012,7 @@ class DefaultExecutionContext(interfaces.ExecutionContext): self._dbapi_connection = dbapi_connection self.dialect = connection.dialect - self.execution_options = self.execution_options.merge_with( - connection._execution_options, execution_options - ) + self.execution_options = execution_options self._is_future_result = ( connection._is_future @@ -1214,25 +1213,6 @@ class DefaultExecutionContext(interfaces.ExecutionContext): def handle_dbapi_exception(self, e): pass - def get_result_cursor_strategy(self, result): - """Dialect-overriable hook to return the internal strategy that - fetches results. - - - Some dialects will in some cases return special objects here that - have pre-buffered rows from some source or another, such as turning - Oracle OUT parameters into rows to accommodate for "returning", - SQL Server fetching "returning" before it resets "identity insert", - etc. - - """ - if self._is_server_side: - strat_cls = _cursor.BufferedRowCursorFetchStrategy - else: - strat_cls = _cursor.CursorFetchStrategy - - return strat_cls.create(result) - @property def rowcount(self): return self.cursor.rowcount @@ -1245,9 +1225,28 @@ class DefaultExecutionContext(interfaces.ExecutionContext): def _setup_result_proxy(self): if self.is_crud or self.is_text: - result = self._setup_crud_result_proxy() + result = self._setup_dml_or_text_result() else: - result = _cursor.CursorResult._create_for_context(self) + strategy = self.cursor_fetch_strategy + if self._is_server_side and strategy is _cursor._DEFAULT_FETCH: + strategy = _cursor.BufferedRowCursorFetchStrategy( + self.cursor, self.execution_options + ) + cursor_description = ( + strategy.alternate_cursor_description + or self.cursor.description + ) + if cursor_description is None: + strategy = _cursor._NO_CURSOR_DQL + + if self._is_future_result: + result = _cursor.CursorResult( + self, strategy, cursor_description + ) + else: + result = _cursor.LegacyCursorResult( + self, strategy, cursor_description + ) if ( self.compiled @@ -1256,33 +1255,8 @@ class DefaultExecutionContext(interfaces.ExecutionContext): ): self._setup_out_parameters(result) - if not self._is_future_result: - conn = self.root_connection - assert not conn._is_future - - if not result._soft_closed and conn.should_close_with_result: - result._autoclose_connection = True - self._soft_closed = result._soft_closed - # result rewrite/ adapt step. two translations can occur here. - # one is if we are invoked against a cached statement, we want - # to rewrite the ResultMetaData to reflect the column objects - # that are in our current selectable, not the cached one. the - # other is, the CompileState can return an alternative Result - # object. Finally, CompileState might want to tell us to not - # actually do the ResultMetaData adapt step if it in fact has - # changed the selected columns in any case. - compiled = self.compiled - if compiled: - adapt_metadata = ( - result._metadata_from_cache - and not compiled._rewrites_selected_columns - ) - - if adapt_metadata: - result._metadata = result._metadata._adapt_to_context(self) - return result def _setup_out_parameters(self, result): @@ -1313,7 +1287,7 @@ class DefaultExecutionContext(interfaces.ExecutionContext): result.out_parameters = out_parameters - def _setup_crud_result_proxy(self): + def _setup_dml_or_text_result(self): if self.isinsert and not self.executemany: if ( not self._is_implicit_returning @@ -1326,7 +1300,23 @@ class DefaultExecutionContext(interfaces.ExecutionContext): elif not self._is_implicit_returning: self._setup_ins_pk_from_empty() - result = _cursor.CursorResult._create_for_context(self) + strategy = self.cursor_fetch_strategy + if self._is_server_side and strategy is _cursor._DEFAULT_FETCH: + strategy = _cursor.BufferedRowCursorFetchStrategy( + self.cursor, self.execution_options + ) + cursor_description = ( + strategy.alternate_cursor_description or self.cursor.description + ) + if cursor_description is None: + strategy = _cursor._NO_CURSOR_DML + + if self._is_future_result: + result = _cursor.CursorResult(self, strategy, cursor_description) + else: + result = _cursor.LegacyCursorResult( + self, strategy, cursor_description + ) if self.isinsert: if self._is_implicit_returning: diff --git a/lib/sqlalchemy/engine/events.py b/lib/sqlalchemy/engine/events.py index 293c7afdd..ef760bb54 100644 --- a/lib/sqlalchemy/engine/events.py +++ b/lib/sqlalchemy/engine/events.py @@ -184,15 +184,11 @@ class ConnectionEvents(event.Events): :meth:`_engine.Connection.execute`. :param multiparams: Multiple parameter sets, a list of dictionaries. :param params: Single parameter set, a single dictionary. - :param execution_options: dictionary of per-execution execution - options passed along with the statement, if any. This only applies to - the the SQLAlchemy 2.0 version of :meth:`_engine.Connection.execute` - . To - view all execution options associated with the connection, access the - :meth:`_engine.Connection.get_execution_options` - method to view the fixed - execution options dictionary, then consider elements within this local - dictionary to be unioned into that dictionary. + :param execution_options: dictionary of execution + options passed along with the statement, if any. This is a merge + of all options that will be used, including those of the statement, + the connection, and those passed in to the method itself for + the 2.0 style of execution. .. versionadded: 1.4 @@ -231,15 +227,11 @@ class ConnectionEvents(event.Events): :meth:`_engine.Connection.execute`. :param multiparams: Multiple parameter sets, a list of dictionaries. :param params: Single parameter set, a single dictionary. - :param execution_options: dictionary of per-execution execution - options passed along with the statement, if any. This only applies to - the the SQLAlchemy 2.0 version of :meth:`_engine.Connection.execute` - . To - view all execution options associated with the connection, access the - :meth:`_engine.Connection.get_execution_options` - method to view the fixed - execution options dictionary, then consider elements within this local - dictionary to be unioned into that dictionary. + :param execution_options: dictionary of execution + options passed along with the statement, if any. This is a merge + of all options that will be used, including those of the statement, + the connection, and those passed in to the method itself for + the 2.0 style of execution. .. versionadded: 1.4 diff --git a/lib/sqlalchemy/engine/result.py b/lib/sqlalchemy/engine/result.py index 0ee80ede4..600229037 100644 --- a/lib/sqlalchemy/engine/result.py +++ b/lib/sqlalchemy/engine/result.py @@ -36,8 +36,11 @@ else: return lambda row: (it(row),) def _row_as_tuple(*indexes): + # circumvent LegacyRow.__getitem__ pointing to + # _get_by_key_impl_mapping for now. otherwise we could + # use itemgetter getters = [ - operator.methodcaller("_get_by_key_impl_mapping", index) + operator.methodcaller("_get_by_int_impl", index) for index in indexes ] return lambda rec: tuple([getter(rec) for getter in getters]) @@ -64,10 +67,7 @@ class ResultMetaData(object): def _key_fallback(self, key, err, raiseerr=True): assert raiseerr - if isinstance(key, int): - util.raise_(IndexError(key), replace_context=err) - else: - util.raise_(KeyError(key), replace_context=err) + util.raise_(KeyError(key), replace_context=err) def _warn_for_nonint(self, key): raise TypeError( @@ -94,7 +94,7 @@ class ResultMetaData(object): return None def _row_as_tuple_getter(self, keys): - indexes = list(self._indexes_for_keys(keys)) + indexes = self._indexes_for_keys(keys) return _row_as_tuple(*indexes) @@ -154,19 +154,15 @@ class SimpleResultMetaData(ResultMetaData): self._tuplefilter = _tuplefilter self._translated_indexes = _translated_indexes self._unique_filters = _unique_filters - len_keys = len(self._keys) if extra: recs_names = [ - ( - (index, name, index - len_keys) + extras, - (index, name, extras), - ) + ((name,) + extras, (index, name, extras),) for index, (name, extras) in enumerate(zip(self._keys, extra)) ] else: recs_names = [ - ((index, name, index - len_keys), (index, name, ())) + ((name,), (index, name, ())) for index, name in enumerate(self._keys) ] @@ -212,6 +208,8 @@ class SimpleResultMetaData(ResultMetaData): return value in row._data def _index_for_key(self, key, raiseerr=True): + if int in key.__class__.__mro__: + key = self._keys[key] try: rec = self._keymap[key] except KeyError as ke: @@ -220,11 +218,13 @@ class SimpleResultMetaData(ResultMetaData): return rec[0] def _indexes_for_keys(self, keys): - for rec in self._metadata_for_keys(keys): - yield rec[0] + return [self._keymap[key][0] for key in keys] def _metadata_for_keys(self, keys): for key in keys: + if int in key.__class__.__mro__: + key = self._keys[key] + try: rec = self._keymap[key] except KeyError as ke: @@ -234,7 +234,12 @@ class SimpleResultMetaData(ResultMetaData): def _reduce(self, keys): try: - metadata_for_keys = [self._keymap[key] for key in keys] + metadata_for_keys = [ + self._keymap[ + self._keys[key] if int in key.__class__.__mro__ else key + ] + for key in keys + ] except KeyError as ke: self._key_fallback(ke.args[0], ke, True) @@ -508,12 +513,11 @@ class Result(InPlaceGenerative): @_generative def _column_slices(self, indexes): - self._metadata = self._metadata._reduce(indexes) - if self._source_supports_scalars and len(indexes) == 1: self._generate_rows = False else: self._generate_rows = True + self._metadata = self._metadata._reduce(indexes) def _getter(self, key, raiseerr=True): """return a callable that will retrieve the given key from a @@ -551,10 +555,15 @@ class Result(InPlaceGenerative): :return: this :class:`._engine.Result` object with modifications. """ + + if self._source_supports_scalars: + self._metadata = self._metadata._reduce([0]) + self._post_creational_filter = operator.attrgetter("_mapping") self._no_scalar_onerow = False self._generate_rows = True + @HasMemoized.memoized_attribute def _row_getter(self): if self._source_supports_scalars: if not self._generate_rows: @@ -571,6 +580,7 @@ class Result(InPlaceGenerative): else: process_row = self._process_row + key_style = self._process_row._default_key_style metadata = self._metadata @@ -578,7 +588,7 @@ class Result(InPlaceGenerative): processors = metadata._processors tf = metadata._tuplefilter - if tf: + if tf and not self._source_supports_scalars: if processors: processors = tf(processors) @@ -660,7 +670,7 @@ class Result(InPlaceGenerative): @HasMemoized.memoized_attribute def _iterator_getter(self): - make_row = self._row_getter() + make_row = self._row_getter post_creational_filter = self._post_creational_filter @@ -689,60 +699,44 @@ class Result(InPlaceGenerative): return iterrows - @HasMemoized.memoized_attribute - def _allrow_getter(self): + def _raw_all_rows(self): + make_row = self._row_getter + rows = self._fetchall_impl() + return [make_row(row) for row in rows] - make_row = self._row_getter() + def _allrows(self): + + make_row = self._row_getter + + rows = self._fetchall_impl() + if make_row: + made_rows = [make_row(row) for row in rows] + else: + made_rows = rows post_creational_filter = self._post_creational_filter if self._unique_filter_state: uniques, strategy = self._unique_strategy - def allrows(self): - rows = self._fetchall_impl() - if make_row: - made_rows = [make_row(row) for row in rows] - else: - made_rows = rows - rows = [ - made_row - for made_row, sig_row in [ - ( - made_row, - strategy(made_row) if strategy else made_row, - ) - for made_row in made_rows - ] - if sig_row not in uniques and not uniques.add(sig_row) + rows = [ + made_row + for made_row, sig_row in [ + (made_row, strategy(made_row) if strategy else made_row,) + for made_row in made_rows ] - - if post_creational_filter: - rows = [post_creational_filter(row) for row in rows] - return rows - + if sig_row not in uniques and not uniques.add(sig_row) + ] else: + rows = made_rows - def allrows(self): - rows = self._fetchall_impl() - - if post_creational_filter: - if make_row: - rows = [ - post_creational_filter(make_row(row)) - for row in rows - ] - else: - rows = [post_creational_filter(row) for row in rows] - elif make_row: - rows = [make_row(row) for row in rows] - return rows - - return allrows + if post_creational_filter: + rows = [post_creational_filter(row) for row in rows] + return rows @HasMemoized.memoized_attribute def _onerow_getter(self): - make_row = self._row_getter() + make_row = self._row_getter post_creational_filter = self._post_creational_filter @@ -782,7 +776,7 @@ class Result(InPlaceGenerative): @HasMemoized.memoized_attribute def _manyrow_getter(self): - make_row = self._row_getter() + make_row = self._row_getter post_creational_filter = self._post_creational_filter @@ -884,7 +878,7 @@ class Result(InPlaceGenerative): def fetchall(self): """A synonym for the :meth:`_engine.Result.all` method.""" - return self._allrow_getter(self) + return self._allrows() def fetchone(self): """Fetch one row. @@ -955,7 +949,7 @@ class Result(InPlaceGenerative): may be returned. """ - return self._allrow_getter(self) + return self._allrows() def _only_one_row(self, raise_for_second_row, raise_for_none): onerow = self._fetchone_impl @@ -969,7 +963,7 @@ class Result(InPlaceGenerative): else: return None - make_row = self._row_getter() + make_row = self._row_getter row = make_row(row) if make_row else row @@ -1236,13 +1230,11 @@ class ChunkedIteratorResult(IteratorResult): self.raw = raw self.iterator = itertools.chain.from_iterable(self.chunks(None)) - def _column_slices(self, indexes): - result = super(ChunkedIteratorResult, self)._column_slices(indexes) - return result - @_generative def yield_per(self, num): self._yield_per = num + # TODO: this should raise if the iterator has already been started. + # we can't change the yield mid-stream like this self.iterator = itertools.chain.from_iterable(self.chunks(num)) diff --git a/lib/sqlalchemy/engine/row.py b/lib/sqlalchemy/engine/row.py index 70f45c82c..fe6831e30 100644 --- a/lib/sqlalchemy/engine/row.py +++ b/lib/sqlalchemy/engine/row.py @@ -103,8 +103,10 @@ except ImportError: def __getitem__(self, key): return self._data[key] + _get_by_int_impl = __getitem__ + def _get_by_key_impl(self, key): - if self._key_style == KEY_INTEGER_ONLY: + if int in key.__class__.__mro__: return self._data[key] # the following is all LegacyRow support. none of this @@ -125,11 +127,7 @@ except ImportError: if mdindex is None: self._parent._raise_for_ambiguous_column_name(rec) - elif ( - self._key_style == KEY_OBJECTS_BUT_WARN - and mdindex != key - and not isinstance(key, int) - ): + elif self._key_style == KEY_OBJECTS_BUT_WARN and mdindex != key: self._parent._warn_for_nonint(key) return self._data[mdindex] |
