diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2019-04-29 23:26:36 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2019-05-18 17:46:10 -0400 |
| commit | f07e050c9ce4afdeb9c0c136dbcc547f7e5ac7b8 (patch) | |
| tree | 1b3cd7409ae2eddef635960126551d74f469acc1 /lib/sqlalchemy/sql/coercions.py | |
| parent | 614dfb5f5b5a2427d5d6ce0bc5f34bf0581bf698 (diff) | |
| download | sqlalchemy-f07e050c9ce4afdeb9c0c136dbcc547f7e5ac7b8.tar.gz | |
Implement new ClauseElement role and coercion system
A major refactoring of all the functions handle all detection of
Core argument types as well as perform coercions into a new class hierarchy
based on "roles", each of which identify a syntactical location within a
SQL statement. In contrast to the ClauseElement hierarchy that identifies
"what" each object is syntactically, the SQLRole hierarchy identifies
the "where does it go" of each object syntactically. From this we define
a consistent type checking and coercion system that establishes well
defined behviors.
This is a breakout of the patch that is reorganizing select()
constructs to no longer be in the FromClause hierarchy.
Also includes a rename of as_scalar() into scalar_subquery(); deprecates
automatic coercion to scalar_subquery().
Partially-fixes: #4617
Change-Id: I26f1e78898693c6b99ef7ea2f4e7dfd0e8e1a1bd
Diffstat (limited to 'lib/sqlalchemy/sql/coercions.py')
| -rw-r--r-- | lib/sqlalchemy/sql/coercions.py | 580 |
1 files changed, 580 insertions, 0 deletions
diff --git a/lib/sqlalchemy/sql/coercions.py b/lib/sqlalchemy/sql/coercions.py new file mode 100644 index 000000000..7c7222f9f --- /dev/null +++ b/lib/sqlalchemy/sql/coercions.py @@ -0,0 +1,580 @@ +# sql/coercions.py +# Copyright (C) 2005-2019 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: http://www.opensource.org/licenses/mit-license.php + +import numbers +import re + +from . import operators +from . import roles +from . import visitors +from .visitors import Visitable +from .. import exc +from .. import inspection +from .. import util +from ..util import collections_abc + +elements = None # type: types.ModuleType +schema = None # type: types.ModuleType +selectable = None # type: types.ModuleType +sqltypes = None # type: types.ModuleType + + +def _is_literal(element): + """Return whether or not the element is a "literal" in the context + of a SQL expression construct. + + """ + return not isinstance( + element, (Visitable, schema.SchemaEventTarget) + ) and not hasattr(element, "__clause_element__") + + +def _document_text_coercion(paramname, meth_rst, param_rst): + return util.add_parameter_text( + paramname, + ( + ".. warning:: " + "The %s argument to %s can be passed as a Python string argument, " + "which will be treated " + "as **trusted SQL text** and rendered as given. **DO NOT PASS " + "UNTRUSTED INPUT TO THIS PARAMETER**." + ) + % (param_rst, meth_rst), + ) + + +def expect(role, element, **kw): + # major case is that we are given a ClauseElement already, skip more + # elaborate logic up front if possible + impl = _impl_lookup[role] + + if not isinstance(element, (elements.ClauseElement, schema.SchemaItem)): + resolved = impl._resolve_for_clause_element(element, **kw) + else: + resolved = element + + if issubclass(resolved.__class__, impl._role_class): + if impl._post_coercion: + resolved = impl._post_coercion(resolved, **kw) + return resolved + else: + return impl._implicit_coercions(element, resolved, **kw) + + +def expect_as_key(role, element, **kw): + kw["as_key"] = True + return expect(role, element, **kw) + + +def expect_col_expression_collection(role, expressions): + for expr in expressions: + strname = None + column = None + + resolved = expect(role, expr) + if isinstance(resolved, util.string_types): + strname = resolved = expr + else: + cols = [] + visitors.traverse(resolved, {}, {"column": cols.append}) + if cols: + column = cols[0] + add_element = column if column is not None else strname + yield resolved, column, strname, add_element + + +class RoleImpl(object): + __slots__ = ("_role_class", "name", "_use_inspection") + + def _literal_coercion(self, element, **kw): + raise NotImplementedError() + + _post_coercion = None + + def __init__(self, role_class): + self._role_class = role_class + self.name = role_class._role_name + self._use_inspection = issubclass(role_class, roles.UsesInspection) + + def _resolve_for_clause_element(self, element, argname=None, **kw): + literal_coercion = self._literal_coercion + original_element = element + is_clause_element = False + + while hasattr(element, "__clause_element__") and not isinstance( + element, (elements.ClauseElement, schema.SchemaItem) + ): + element = element.__clause_element__() + is_clause_element = True + + if not is_clause_element: + if self._use_inspection: + insp = inspection.inspect(element, raiseerr=False) + if insp is not None: + try: + return insp.__clause_element__() + except AttributeError: + self._raise_for_expected(original_element, argname) + + return self._literal_coercion(element, argname=argname, **kw) + else: + return element + + def _implicit_coercions(self, element, resolved, argname=None, **kw): + self._raise_for_expected(element, argname) + + def _raise_for_expected(self, element, argname=None): + if argname: + raise exc.ArgumentError( + "%s expected for argument %r; got %r." + % (self.name, argname, element) + ) + else: + raise exc.ArgumentError( + "%s expected, got %r." % (self.name, element) + ) + + +class _StringOnly(object): + def _resolve_for_clause_element(self, element, argname=None, **kw): + return self._literal_coercion(element, **kw) + + +class _ReturnsStringKey(object): + def _implicit_coercions( + self, original_element, resolved, argname=None, **kw + ): + if isinstance(original_element, util.string_types): + return original_element + else: + self._raise_for_expected(original_element, argname) + + def _literal_coercion(self, element, **kw): + return element + + +class _ColumnCoercions(object): + def _warn_for_scalar_subquery_coercion(self): + util.warn_deprecated( + "coercing SELECT object to scalar subquery in a " + "column-expression context is deprecated in version 1.4; " + "please use the .scalar_subquery() method to produce a scalar " + "subquery. This automatic coercion will be removed in a " + "future release." + ) + + def _implicit_coercions( + self, original_element, resolved, argname=None, **kw + ): + if resolved._is_select_statement: + self._warn_for_scalar_subquery_coercion() + return resolved.scalar_subquery() + elif ( + resolved._is_from_clause + and isinstance(resolved, selectable.Alias) + and resolved.original._is_select_statement + ): + self._warn_for_scalar_subquery_coercion() + return resolved.original.scalar_subquery() + else: + self._raise_for_expected(original_element, argname) + + +def _no_text_coercion( + element, argname=None, exc_cls=exc.ArgumentError, extra=None +): + raise exc_cls( + "%(extra)sTextual SQL expression %(expr)r %(argname)sshould be " + "explicitly declared as text(%(expr)r)" + % { + "expr": util.ellipses_string(element), + "argname": "for argument %s" % (argname,) if argname else "", + "extra": "%s " % extra if extra else "", + } + ) + + +class _NoTextCoercion(object): + def _literal_coercion(self, element, argname=None): + if isinstance(element, util.string_types) and issubclass( + elements.TextClause, self._role_class + ): + _no_text_coercion(element, argname) + else: + self._raise_for_expected(element, argname) + + +class _CoerceLiterals(object): + _coerce_consts = False + _coerce_star = False + _coerce_numerics = False + + def _text_coercion(self, element, argname=None): + return _no_text_coercion(element, argname) + + def _literal_coercion(self, element, argname=None): + if isinstance(element, util.string_types): + if self._coerce_star and element == "*": + return elements.ColumnClause("*", is_literal=True) + else: + return self._text_coercion(element, argname) + + if self._coerce_consts: + if element is None: + return elements.Null() + elif element is False: + return elements.False_() + elif element is True: + return elements.True_() + + if self._coerce_numerics and isinstance(element, (numbers.Number)): + return elements.ColumnClause(str(element), is_literal=True) + + self._raise_for_expected(element, argname) + + +class ExpressionElementImpl( + _ColumnCoercions, RoleImpl, roles.ExpressionElementRole +): + def _literal_coercion(self, element, name=None, type_=None, argname=None): + if element is None: + return elements.Null() + else: + try: + return elements.BindParameter( + name, element, type_, unique=True + ) + except exc.ArgumentError: + self._raise_for_expected(element) + + +class BinaryElementImpl( + ExpressionElementImpl, RoleImpl, roles.BinaryElementRole +): + def _literal_coercion( + self, element, expr, operator, bindparam_type=None, argname=None + ): + try: + return expr._bind_param(operator, element, type_=bindparam_type) + except exc.ArgumentError: + self._raise_for_expected(element) + + def _post_coercion(self, resolved, expr, **kw): + if ( + isinstance(resolved, elements.BindParameter) + and resolved.type._isnull + ): + resolved = resolved._clone() + resolved.type = expr.type + return resolved + + +class InElementImpl(RoleImpl, roles.InElementRole): + def _implicit_coercions( + self, original_element, resolved, argname=None, **kw + ): + if resolved._is_from_clause: + if ( + isinstance(resolved, selectable.Alias) + and resolved.original._is_select_statement + ): + return resolved.original + else: + return resolved.select() + else: + self._raise_for_expected(original_element, argname) + + def _literal_coercion(self, element, expr, operator, **kw): + if isinstance(element, collections_abc.Iterable) and not isinstance( + element, util.string_types + ): + args = [] + for o in element: + if not _is_literal(o): + if not isinstance(o, operators.ColumnOperators): + self._raise_for_expected(element, **kw) + elif o is None: + o = elements.Null() + else: + o = expr._bind_param(operator, o) + args.append(o) + + return elements.ClauseList(*args) + + else: + self._raise_for_expected(element, **kw) + + def _post_coercion(self, element, expr, operator, **kw): + if element._is_select_statement: + return element.scalar_subquery() + elif isinstance(element, elements.ClauseList): + if len(element.clauses) == 0: + op, negate_op = ( + (operators.empty_in_op, operators.empty_notin_op) + if operator is operators.in_op + else (operators.empty_notin_op, operators.empty_in_op) + ) + return element.self_group(against=op)._annotate( + dict(in_ops=(op, negate_op)) + ) + else: + return element.self_group(against=operator) + + elif isinstance(element, elements.BindParameter) and element.expanding: + + if isinstance(expr, elements.Tuple): + element = element._with_expanding_in_types( + [elem.type for elem in expr] + ) + return element + else: + return element + + +class WhereHavingImpl( + _CoerceLiterals, _ColumnCoercions, RoleImpl, roles.WhereHavingRole +): + + _coerce_consts = True + + def _text_coercion(self, element, argname=None): + return _no_text_coercion(element, argname) + + +class StatementOptionImpl( + _CoerceLiterals, RoleImpl, roles.StatementOptionRole +): + + _coerce_consts = True + + def _text_coercion(self, element, argname=None): + return elements.TextClause(element) + + +class ColumnArgumentImpl(_NoTextCoercion, RoleImpl, roles.ColumnArgumentRole): + pass + + +class ColumnArgumentOrKeyImpl( + _ReturnsStringKey, RoleImpl, roles.ColumnArgumentOrKeyRole +): + pass + + +class ByOfImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl, roles.ByOfRole): + + _coerce_consts = True + + def _text_coercion(self, element, argname=None): + return elements._textual_label_reference(element) + + +class OrderByImpl(ByOfImpl, RoleImpl, roles.OrderByRole): + def _post_coercion(self, resolved): + if ( + isinstance(resolved, self._role_class) + and resolved._order_by_label_element is not None + ): + return elements._label_reference(resolved) + else: + return resolved + + +class DMLColumnImpl(_ReturnsStringKey, RoleImpl, roles.DMLColumnRole): + def _post_coercion(self, element, as_key=False): + if as_key: + return element.key + else: + return element + + +class ConstExprImpl(RoleImpl, roles.ConstExprRole): + def _literal_coercion(self, element, argname=None): + if element is None: + return elements.Null() + elif element is False: + return elements.False_() + elif element is True: + return elements.True_() + else: + self._raise_for_expected(element, argname) + + +class TruncatedLabelImpl(_StringOnly, RoleImpl, roles.TruncatedLabelRole): + def _implicit_coercions( + self, original_element, resolved, argname=None, **kw + ): + if isinstance(original_element, util.string_types): + return resolved + else: + self._raise_for_expected(original_element, argname) + + def _literal_coercion(self, element, argname=None): + """coerce the given value to :class:`._truncated_label`. + + Existing :class:`._truncated_label` and + :class:`._anonymous_label` objects are passed + unchanged. + """ + + if isinstance(element, elements._truncated_label): + return element + else: + return elements._truncated_label(element) + + +class DDLExpressionImpl(_CoerceLiterals, RoleImpl, roles.DDLExpressionRole): + + _coerce_consts = True + + def _text_coercion(self, element, argname=None): + return elements.TextClause(element) + + +class DDLConstraintColumnImpl( + _ReturnsStringKey, RoleImpl, roles.DDLConstraintColumnRole +): + pass + + +class LimitOffsetImpl(RoleImpl, roles.LimitOffsetRole): + def _implicit_coercions(self, element, resolved, argname=None, **kw): + if resolved is None: + return None + else: + self._raise_for_expected(element, argname) + + def _literal_coercion(self, element, name, type_, **kw): + if element is None: + return None + else: + value = util.asint(element) + return selectable._OffsetLimitParam( + name, value, type_=type_, unique=True + ) + + +class LabeledColumnExprImpl( + ExpressionElementImpl, roles.LabeledColumnExprRole +): + def _implicit_coercions( + self, original_element, resolved, argname=None, **kw + ): + if isinstance(resolved, roles.ExpressionElementRole): + return resolved.label(None) + else: + new = super(LabeledColumnExprImpl, self)._implicit_coercions( + original_element, resolved, argname=argname, **kw + ) + if isinstance(new, roles.ExpressionElementRole): + return new.label(None) + else: + self._raise_for_expected(original_element, argname) + + +class ColumnsClauseImpl(_CoerceLiterals, RoleImpl, roles.ColumnsClauseRole): + + _coerce_consts = True + _coerce_numerics = True + _coerce_star = True + + _guess_straight_column = re.compile(r"^\w\S*$", re.I) + + def _text_coercion(self, element, argname=None): + element = str(element) + + guess_is_literal = not self._guess_straight_column.match(element) + raise exc.ArgumentError( + "Textual column expression %(column)r %(argname)sshould be " + "explicitly declared with text(%(column)r), " + "or use %(literal_column)s(%(column)r) " + "for more specificity" + % { + "column": util.ellipses_string(element), + "argname": "for argument %s" % (argname,) if argname else "", + "literal_column": "literal_column" + if guess_is_literal + else "column", + } + ) + + +class ReturnsRowsImpl(RoleImpl, roles.ReturnsRowsRole): + pass + + +class StatementImpl(_NoTextCoercion, RoleImpl, roles.StatementRole): + pass + + +class CoerceTextStatementImpl(_CoerceLiterals, RoleImpl, roles.StatementRole): + def _text_coercion(self, element, argname=None): + return elements.TextClause(element) + + +class SelectStatementImpl( + _NoTextCoercion, RoleImpl, roles.SelectStatementRole +): + def _implicit_coercions( + self, original_element, resolved, argname=None, **kw + ): + if resolved._is_text_clause: + return resolved.columns() + else: + self._raise_for_expected(original_element, argname) + + +class HasCTEImpl(ReturnsRowsImpl, roles.HasCTERole): + pass + + +class FromClauseImpl(_NoTextCoercion, RoleImpl, roles.FromClauseRole): + def _implicit_coercions( + self, original_element, resolved, argname=None, **kw + ): + if resolved._is_text_clause: + return resolved + else: + self._raise_for_expected(original_element, argname) + + +class DMLSelectImpl(_NoTextCoercion, RoleImpl, roles.DMLSelectRole): + def _implicit_coercions( + self, original_element, resolved, argname=None, **kw + ): + if resolved._is_from_clause: + if ( + isinstance(resolved, selectable.Alias) + and resolved.original._is_select_statement + ): + return resolved.original + else: + return resolved.select() + else: + self._raise_for_expected(original_element, argname) + + +class CompoundElementImpl( + _NoTextCoercion, RoleImpl, roles.CompoundElementRole +): + def _implicit_coercions(self, original_element, resolved, argname=None): + if resolved._is_from_clause: + return resolved + else: + self._raise_for_expected(original_element, argname) + + +_impl_lookup = {} + + +for name in dir(roles): + cls = getattr(roles, name) + if name.endswith("Role"): + name = name.replace("Role", "Impl") + if name in globals(): + impl = globals()[name](cls) + _impl_lookup[cls] = impl |
