diff options
Diffstat (limited to 'lib/sqlalchemy')
| -rw-r--r-- | lib/sqlalchemy/ext/mypy/apply.py | 215 | ||||
| -rw-r--r-- | lib/sqlalchemy/ext/mypy/decl_class.py | 771 | ||||
| -rw-r--r-- | lib/sqlalchemy/ext/mypy/infer.py | 398 | ||||
| -rw-r--r-- | lib/sqlalchemy/ext/mypy/names.py | 8 | ||||
| -rw-r--r-- | lib/sqlalchemy/ext/mypy/plugin.py | 31 | ||||
| -rw-r--r-- | lib/sqlalchemy/ext/mypy/util.py | 62 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/__init__.py | 1 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/decl_api.py | 42 |
8 files changed, 874 insertions, 654 deletions
diff --git a/lib/sqlalchemy/ext/mypy/apply.py b/lib/sqlalchemy/ext/mypy/apply.py new file mode 100644 index 000000000..6442cbc22 --- /dev/null +++ b/lib/sqlalchemy/ext/mypy/apply.py @@ -0,0 +1,215 @@ +# ext/mypy/apply.py +# Copyright (C) 2021 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: http://www.opensource.org/licenses/mit-license.php + +from typing import Optional +from typing import Union + +from mypy import nodes +from mypy.nodes import ARG_NAMED_OPT +from mypy.nodes import Argument +from mypy.nodes import AssignmentStmt +from mypy.nodes import CallExpr +from mypy.nodes import ClassDef +from mypy.nodes import MDEF +from mypy.nodes import NameExpr +from mypy.nodes import StrExpr +from mypy.nodes import SymbolTableNode +from mypy.nodes import TempNode +from mypy.nodes import TypeInfo +from mypy.nodes import Var +from mypy.plugin import SemanticAnalyzerPluginInterface +from mypy.plugins.common import add_method_to_class +from mypy.types import AnyType +from mypy.types import Instance +from mypy.types import NoneTyp +from mypy.types import TypeOfAny +from mypy.types import UnionType + +from . import util + + +def _apply_mypy_mapped_attr( + cls: ClassDef, + api: SemanticAnalyzerPluginInterface, + item: Union[NameExpr, StrExpr], + cls_metadata: util.DeclClassApplied, +): + if isinstance(item, NameExpr): + name = item.name + elif isinstance(item, StrExpr): + name = item.value + else: + return + + for stmt in cls.defs.body: + if isinstance(stmt, AssignmentStmt) and stmt.lvalues[0].name == name: + break + else: + util.fail(api, "Can't find mapped attribute {}".format(name), cls) + return + + if stmt.type is None: + util.fail( + api, + "Statement linked from _mypy_mapped_attrs has no " + "typing information", + stmt, + ) + return + + left_hand_explicit_type = stmt.type + + cls_metadata.mapped_attr_names.append((name, left_hand_explicit_type)) + + _apply_type_to_mapped_statement( + api, stmt, stmt.lvalues[0], left_hand_explicit_type, None + ) + + +def _re_apply_declarative_assignments( + cls: ClassDef, + api: SemanticAnalyzerPluginInterface, + cls_metadata: util.DeclClassApplied, +): + """For multiple class passes, re-apply our left-hand side types as mypy + seems to reset them in place. + + """ + mapped_attr_lookup = { + name: typ for name, typ in cls_metadata.mapped_attr_names + } + + descriptor = api.lookup("__sa_Mapped", cls) + for stmt in cls.defs.body: + # for a re-apply, all of our statements are AssignmentStmt; + # @declared_attr calls will have been converted and this + # currently seems to be preserved by mypy (but who knows if this + # will change). + if ( + isinstance(stmt, AssignmentStmt) + and stmt.lvalues[0].name in mapped_attr_lookup + ): + typ = mapped_attr_lookup[stmt.lvalues[0].name] + left_node = stmt.lvalues[0].node + + inst = Instance(descriptor.node, [typ]) + left_node.type = inst + + +def _apply_type_to_mapped_statement( + api: SemanticAnalyzerPluginInterface, + stmt: AssignmentStmt, + lvalue: NameExpr, + left_hand_explicit_type: Optional[Union[Instance, UnionType]], + python_type_for_type: Union[Instance, UnionType], +) -> None: + """Apply the Mapped[<type>] annotation and right hand object to a + declarative assignment statement. + + This converts a Python declarative class statement such as:: + + class User(Base): + # ... + + attrname = Column(Integer) + + To one that describes the final Python behavior to Mypy:: + + class User(Base): + # ... + + attrname : Mapped[Optional[int]] = <meaningless temp node> + + """ + descriptor = api.lookup("__sa_Mapped", stmt) + left_node = lvalue.node + + inst = Instance(descriptor.node, [python_type_for_type]) + + if left_hand_explicit_type is not None: + left_node.type = Instance(descriptor.node, [left_hand_explicit_type]) + else: + lvalue.is_inferred_def = False + left_node.type = inst + + # so to have it skip the right side totally, we can do this: + # stmt.rvalue = TempNode(AnyType(TypeOfAny.special_form)) + + # however, if we instead manufacture a new node that uses the old + # one, then we can still get type checking for the call itself, + # e.g. the Column, relationship() call, etc. + + # rewrite the node as: + # <attr> : Mapped[<typ>] = + # _sa_Mapped._empty_constructor(<original CallExpr from rvalue>) + # the original right-hand side is maintained so it gets type checked + # internally + column_descriptor = nodes.NameExpr("__sa_Mapped") + column_descriptor.fullname = "sqlalchemy.orm.Mapped" + mm = nodes.MemberExpr(column_descriptor, "_empty_constructor") + orig_call_expr = stmt.rvalue + stmt.rvalue = CallExpr(mm, [orig_call_expr], [nodes.ARG_POS], ["arg1"]) + + +def _add_additional_orm_attributes( + cls: ClassDef, + api: SemanticAnalyzerPluginInterface, + cls_metadata: util.DeclClassApplied, +) -> None: + """Apply __init__, __table__ and other attributes to the mapped class.""" + + info = util._info_for_cls(cls, api) + if "__init__" not in info.names and cls_metadata.is_mapped: + mapped_attr_names = {n: t for n, t in cls_metadata.mapped_attr_names} + + for mapped_base in cls_metadata.mapped_mro: + base_cls_metadata = util.DeclClassApplied.deserialize( + mapped_base.type.metadata["_sa_decl_class_applied"], api + ) + for n, t in base_cls_metadata.mapped_attr_names: + mapped_attr_names.setdefault(n, t) + + arguments = [] + for name, typ in mapped_attr_names.items(): + if typ is None: + typ = AnyType(TypeOfAny.special_form) + arguments.append( + Argument( + variable=Var(name, typ), + type_annotation=typ, + initializer=TempNode(typ), + kind=ARG_NAMED_OPT, + ) + ) + add_method_to_class(api, cls, "__init__", arguments, NoneTyp()) + + if "__table__" not in info.names and cls_metadata.has_table: + _apply_placeholder_attr_to_class( + api, cls, "sqlalchemy.sql.schema.Table", "__table__" + ) + if cls_metadata.is_mapped: + _apply_placeholder_attr_to_class( + api, cls, "sqlalchemy.orm.mapper.Mapper", "__mapper__" + ) + + +def _apply_placeholder_attr_to_class( + api: SemanticAnalyzerPluginInterface, + cls: ClassDef, + qualified_name: str, + attrname: str, +): + sym = api.lookup_fully_qualified_or_none(qualified_name) + if sym: + assert isinstance(sym.node, TypeInfo) + type_ = Instance(sym.node, []) + else: + type_ = AnyType(TypeOfAny.special_form) + var = Var(attrname) + var.info = cls.info + var.type = type_ + cls.info.names[attrname] = SymbolTableNode(MDEF, var) diff --git a/lib/sqlalchemy/ext/mypy/decl_class.py b/lib/sqlalchemy/ext/mypy/decl_class.py index 46f3cc30e..a0e272f71 100644 --- a/lib/sqlalchemy/ext/mypy/decl_class.py +++ b/lib/sqlalchemy/ext/mypy/decl_class.py @@ -6,23 +6,14 @@ # the MIT License: http://www.opensource.org/licenses/mit-license.php from typing import Optional -from typing import Sequence -from typing import Tuple from typing import Type -from typing import Union from mypy import nodes -from mypy import types -from mypy.messages import format_type -from mypy.nodes import ARG_NAMED_OPT -from mypy.nodes import Argument from mypy.nodes import AssignmentStmt from mypy.nodes import CallExpr from mypy.nodes import ClassDef from mypy.nodes import Decorator -from mypy.nodes import JsonDict from mypy.nodes import ListExpr -from mypy.nodes import MDEF from mypy.nodes import NameExpr from mypy.nodes import PlaceholderNode from mypy.nodes import RefExpr @@ -32,73 +23,30 @@ from mypy.nodes import TempNode from mypy.nodes import TypeInfo from mypy.nodes import Var from mypy.plugin import SemanticAnalyzerPluginInterface -from mypy.plugins.common import add_method_to_class -from mypy.plugins.common import deserialize_and_fixup_type -from mypy.subtypes import is_subtype from mypy.types import AnyType from mypy.types import Instance -from mypy.types import NoneTyp from mypy.types import NoneType from mypy.types import TypeOfAny from mypy.types import UnboundType from mypy.types import UnionType +from . import apply +from . import infer from . import names from . import util -class DeclClassApplied: - def __init__( - self, - is_mapped: bool, - has_table: bool, - mapped_attr_names: Sequence[Tuple[str, Type]], - mapped_mro: Sequence[Type], - ): - self.is_mapped = is_mapped - self.has_table = has_table - self.mapped_attr_names = mapped_attr_names - self.mapped_mro = mapped_mro - - def serialize(self) -> JsonDict: - return { - "is_mapped": self.is_mapped, - "has_table": self.has_table, - "mapped_attr_names": [ - (name, type_.serialize()) - for name, type_ in self.mapped_attr_names - ], - "mapped_mro": [type_.serialize() for type_ in self.mapped_mro], - } - - @classmethod - def deserialize( - cls, data: JsonDict, api: SemanticAnalyzerPluginInterface - ) -> "DeclClassApplied": - - return DeclClassApplied( - is_mapped=data["is_mapped"], - has_table=data["has_table"], - mapped_attr_names=[ - (name, deserialize_and_fixup_type(type_, api)) - for name, type_ in data["mapped_attr_names"] - ], - mapped_mro=[ - deserialize_and_fixup_type(type_, api) - for type_ in data["mapped_mro"] - ], - ) - - def _scan_declarative_assignments_and_apply_types( cls: ClassDef, api: SemanticAnalyzerPluginInterface, is_mixin_scan=False -) -> Optional[DeclClassApplied]: +) -> Optional[util.DeclClassApplied]: + + info = util._info_for_cls(cls, api) if cls.fullname.startswith("builtins"): return None - elif "_sa_decl_class_applied" in cls.info.metadata: - cls_metadata = DeclClassApplied.deserialize( - cls.info.metadata["_sa_decl_class_applied"], api + elif "_sa_decl_class_applied" in info.metadata: + cls_metadata = util.DeclClassApplied.deserialize( + info.metadata["_sa_decl_class_applied"], api ) # ensure that a class that's mapped is always picked up by @@ -112,30 +60,117 @@ def _scan_declarative_assignments_and_apply_types( # removing our ability to re-scan. but we have the types # here, so lets re-apply them. - _re_apply_declarative_assignments(cls, api, cls_metadata) + apply._re_apply_declarative_assignments(cls, api, cls_metadata) return cls_metadata - cls_metadata = DeclClassApplied(not is_mixin_scan, False, [], []) + cls_metadata = util.DeclClassApplied(not is_mixin_scan, False, [], []) + + if not cls.defs.body: + # when we get a mixin class from another file, the body is + # empty (!) but the names are in the symbol table. so use that. - for stmt in util._flatten_typechecking(cls.defs.body): - if isinstance(stmt, AssignmentStmt): - _scan_declarative_assignment_stmt(cls, api, stmt, cls_metadata) - elif isinstance(stmt, Decorator): - _scan_declarative_decorator_stmt(cls, api, stmt, cls_metadata) + for sym_name, sym in info.names.items(): + _scan_symbol_table_entry(cls, api, sym_name, sym, cls_metadata) + else: + for stmt in util._flatten_typechecking(cls.defs.body): + if isinstance(stmt, AssignmentStmt): + _scan_declarative_assignment_stmt(cls, api, stmt, cls_metadata) + elif isinstance(stmt, Decorator): + _scan_declarative_decorator_stmt(cls, api, stmt, cls_metadata) _scan_for_mapped_bases(cls, api, cls_metadata) - _add_additional_orm_attributes(cls, api, cls_metadata) - cls.info.metadata["_sa_decl_class_applied"] = cls_metadata.serialize() + if not is_mixin_scan: + apply._add_additional_orm_attributes(cls, api, cls_metadata) + + info.metadata["_sa_decl_class_applied"] = cls_metadata.serialize() return cls_metadata +def _scan_symbol_table_entry( + cls: ClassDef, + api: SemanticAnalyzerPluginInterface, + name: str, + value: SymbolTableNode, + cls_metadata: util.DeclClassApplied, +): + """Extract mapping information from a SymbolTableNode that's in the + type.names dictionary. + + """ + if not isinstance(value.type, Instance): + return + + left_hand_explicit_type = None + type_id = names._type_id_for_named_node(value.type.type) + # type_id = names._type_id_for_unbound_type(value.type.type, cls, api) + + err = False + + # TODO: this is nearly the same logic as that of + # _scan_declarative_decorator_stmt, likely can be merged + if type_id in { + names.MAPPED, + names.RELATIONSHIP, + names.COMPOSITE_PROPERTY, + names.MAPPER_PROPERTY, + names.SYNONYM_PROPERTY, + names.COLUMN_PROPERTY, + }: + if value.type.args: + left_hand_explicit_type = value.type.args[0] + else: + err = True + elif type_id is names.COLUMN: + if not value.type.args: + err = True + else: + typeengine_arg = value.type.args[0] + if isinstance(typeengine_arg, Instance): + typeengine_arg = typeengine_arg.type + + if isinstance(typeengine_arg, (UnboundType, TypeInfo)): + sym = api.lookup(typeengine_arg.name, typeengine_arg) + if sym is not None: + if names._mro_has_id(sym.node.mro, names.TYPEENGINE): + + left_hand_explicit_type = UnionType( + [ + infer._extract_python_type_from_typeengine( + api, sym.node, [] + ), + NoneType(), + ] + ) + else: + util.fail( + api, + "Column type should be a TypeEngine " + "subclass not '{}'".format(sym.node.fullname), + value.type, + ) + + if err: + msg = ( + "Can't infer type from attribute {} on class {}. " + "please specify a return type from this function that is " + "one of: Mapped[<python type>], relationship[<target class>], " + "Column[<TypeEngine>], MapperProperty[<python type>]" + ) + util.fail(api, msg.format(name, cls.name)) + + left_hand_explicit_type = AnyType(TypeOfAny.special_form) + + if left_hand_explicit_type is not None: + cls_metadata.mapped_attr_names.append((name, left_hand_explicit_type)) + + def _scan_declarative_decorator_stmt( cls: ClassDef, api: SemanticAnalyzerPluginInterface, stmt: Decorator, - cls_metadata: DeclClassApplied, + cls_metadata: util.DeclClassApplied, ): """Extract mapping information from a @declared_attr in a declarative class. @@ -201,7 +236,7 @@ def _scan_declarative_decorator_stmt( left_hand_explicit_type = UnionType( [ - _extract_python_type_from_typeengine( + infer._extract_python_type_from_typeengine( api, sym.node, [] ), NoneType(), @@ -279,7 +314,7 @@ def _scan_declarative_assignment_stmt( cls: ClassDef, api: SemanticAnalyzerPluginInterface, stmt: AssignmentStmt, - cls_metadata: DeclClassApplied, + cls_metadata: util.DeclClassApplied, ): """Extract mapping information from an assignment statement in a declarative class. @@ -317,7 +352,7 @@ def _scan_declarative_assignment_stmt( else: for item in stmt.rvalue.items: if isinstance(item, (NameExpr, StrExpr)): - _apply_mypy_mapped_attr(cls, api, item, cls_metadata) + apply._apply_mypy_mapped_attr(cls, api, item, cls_metadata) left_hand_mapped_type: Type = None @@ -378,24 +413,26 @@ def _scan_declarative_assignment_stmt( if type_id is None: return elif type_id is names.COLUMN: - python_type_for_type = _infer_type_from_decl_column( + python_type_for_type = infer._infer_type_from_decl_column( api, stmt, node, left_hand_explicit_type, stmt.rvalue ) elif type_id is names.RELATIONSHIP: - python_type_for_type = _infer_type_from_relationship( + python_type_for_type = infer._infer_type_from_relationship( api, stmt, node, left_hand_explicit_type ) elif type_id is names.COLUMN_PROPERTY: - python_type_for_type = _infer_type_from_decl_column_property( + python_type_for_type = infer._infer_type_from_decl_column_property( api, stmt, node, left_hand_explicit_type ) elif type_id is names.SYNONYM_PROPERTY: - python_type_for_type = _infer_type_from_left_hand_type_only( + python_type_for_type = infer._infer_type_from_left_hand_type_only( api, node, left_hand_explicit_type ) elif type_id is names.COMPOSITE_PROPERTY: - python_type_for_type = _infer_type_from_decl_composite_property( - api, stmt, node, left_hand_explicit_type + python_type_for_type = ( + infer._infer_type_from_decl_composite_property( + api, stmt, node, left_hand_explicit_type + ) ) else: return @@ -407,7 +444,7 @@ def _scan_declarative_assignment_stmt( assert python_type_for_type is not None - _apply_type_to_mapped_statement( + apply._apply_type_to_mapped_statement( api, stmt, lvalue, @@ -416,486 +453,10 @@ def _scan_declarative_assignment_stmt( ) -def _apply_mypy_mapped_attr( - cls: ClassDef, - api: SemanticAnalyzerPluginInterface, - item: Union[NameExpr, StrExpr], - cls_metadata: DeclClassApplied, -): - if isinstance(item, NameExpr): - name = item.name - elif isinstance(item, StrExpr): - name = item.value - else: - return - - for stmt in cls.defs.body: - if isinstance(stmt, AssignmentStmt) and stmt.lvalues[0].name == name: - break - else: - util.fail(api, "Can't find mapped attribute {}".format(name), cls) - return - - if stmt.type is None: - util.fail( - api, - "Statement linked from _mypy_mapped_attrs has no " - "typing information", - stmt, - ) - return - - left_hand_explicit_type = stmt.type - - cls_metadata.mapped_attr_names.append((name, left_hand_explicit_type)) - - _apply_type_to_mapped_statement( - api, stmt, stmt.lvalues[0], left_hand_explicit_type, None - ) - - -def _infer_type_from_relationship( - api: SemanticAnalyzerPluginInterface, - stmt: AssignmentStmt, - node: Var, - left_hand_explicit_type: Optional[types.Type], -) -> Union[Instance, UnionType, None]: - """Infer the type of mapping from a relationship. - - E.g.:: - - @reg.mapped - class MyClass: - # ... - - addresses = relationship(Address, uselist=True) - - order: Mapped["Order"] = relationship("Order") - - Will resolve in mypy as:: - - @reg.mapped - class MyClass: - # ... - - addresses: Mapped[List[Address]] - - order: Mapped["Order"] - - """ - - assert isinstance(stmt.rvalue, CallExpr) - target_cls_arg = stmt.rvalue.args[0] - python_type_for_type = None - - if isinstance(target_cls_arg, NameExpr) and isinstance( - target_cls_arg.node, TypeInfo - ): - # type - related_object_type = target_cls_arg.node - python_type_for_type = Instance(related_object_type, []) - - # other cases not covered - an error message directs the user - # to set an explicit type annotation - # - # node.type == str, it's a string - # if isinstance(target_cls_arg, NameExpr) and isinstance( - # target_cls_arg.node, Var - # ) - # points to a type - # isinstance(target_cls_arg, NameExpr) and isinstance( - # target_cls_arg.node, TypeAlias - # ) - # string expression - # isinstance(target_cls_arg, StrExpr) - - uselist_arg = util._get_callexpr_kwarg(stmt.rvalue, "uselist") - collection_cls_arg = util._get_callexpr_kwarg( - stmt.rvalue, "collection_class" - ) - - # this can be used to determine Optional for a many-to-one - # in the same way nullable=False could be used, if we start supporting - # that. - # innerjoin_arg = _get_callexpr_kwarg(stmt.rvalue, "innerjoin") - - if ( - uselist_arg is not None - and uselist_arg.fullname == "builtins.True" - and collection_cls_arg is None - ): - if python_type_for_type is not None: - python_type_for_type = Instance( - api.lookup_fully_qualified("builtins.list").node, - [python_type_for_type], - ) - elif ( - uselist_arg is None or uselist_arg.fullname == "builtins.True" - ) and collection_cls_arg is not None: - if isinstance(collection_cls_arg.node, TypeInfo): - if python_type_for_type is not None: - python_type_for_type = Instance( - collection_cls_arg.node, [python_type_for_type] - ) - else: - util.fail( - api, - "Expected Python collection type for " - "collection_class parameter", - stmt.rvalue, - ) - python_type_for_type = None - elif uselist_arg is not None and uselist_arg.fullname == "builtins.False": - if collection_cls_arg is not None: - util.fail( - api, - "Sending uselist=False and collection_class at the same time " - "does not make sense", - stmt.rvalue, - ) - if python_type_for_type is not None: - python_type_for_type = UnionType( - [python_type_for_type, NoneType()] - ) - - else: - if left_hand_explicit_type is None: - msg = ( - "Can't infer scalar or collection for ORM mapped expression " - "assigned to attribute '{}' if both 'uselist' and " - "'collection_class' arguments are absent from the " - "relationship(); please specify a " - "type annotation on the left hand side." - ) - util.fail(api, msg.format(node.name), node) - - if python_type_for_type is None: - return _infer_type_from_left_hand_type_only( - api, node, left_hand_explicit_type - ) - elif left_hand_explicit_type is not None: - return _infer_type_from_left_and_inferred_right( - api, node, left_hand_explicit_type, python_type_for_type - ) - else: - return python_type_for_type - - -def _infer_type_from_decl_composite_property( - api: SemanticAnalyzerPluginInterface, - stmt: AssignmentStmt, - node: Var, - left_hand_explicit_type: Optional[types.Type], -) -> Union[Instance, UnionType, None]: - """Infer the type of mapping from a CompositeProperty.""" - - assert isinstance(stmt.rvalue, CallExpr) - target_cls_arg = stmt.rvalue.args[0] - python_type_for_type = None - - if isinstance(target_cls_arg, NameExpr) and isinstance( - target_cls_arg.node, TypeInfo - ): - related_object_type = target_cls_arg.node - python_type_for_type = Instance(related_object_type, []) - else: - python_type_for_type = None - - if python_type_for_type is None: - return _infer_type_from_left_hand_type_only( - api, node, left_hand_explicit_type - ) - elif left_hand_explicit_type is not None: - return _infer_type_from_left_and_inferred_right( - api, node, left_hand_explicit_type, python_type_for_type - ) - else: - return python_type_for_type - - -def _infer_type_from_decl_column_property( - api: SemanticAnalyzerPluginInterface, - stmt: AssignmentStmt, - node: Var, - left_hand_explicit_type: Optional[types.Type], -) -> Union[Instance, UnionType, None]: - """Infer the type of mapping from a ColumnProperty. - - This includes mappings against ``column_property()`` as well as the - ``deferred()`` function. - - """ - assert isinstance(stmt.rvalue, CallExpr) - first_prop_arg = stmt.rvalue.args[0] - - if isinstance(first_prop_arg, CallExpr): - type_id = names._type_id_for_callee(first_prop_arg.callee) - else: - type_id = None - - print(stmt.lvalues[0].name) - - # look for column_property() / deferred() etc with Column as first - # argument - if type_id is names.COLUMN: - return _infer_type_from_decl_column( - api, stmt, node, left_hand_explicit_type, first_prop_arg - ) - else: - return _infer_type_from_left_hand_type_only( - api, node, left_hand_explicit_type - ) - - -def _infer_type_from_decl_column( - api: SemanticAnalyzerPluginInterface, - stmt: AssignmentStmt, - node: Var, - left_hand_explicit_type: Optional[types.Type], - right_hand_expression: CallExpr, -) -> Union[Instance, UnionType, None]: - """Infer the type of mapping from a Column. - - E.g.:: - - @reg.mapped - class MyClass: - # ... - - a = Column(Integer) - - b = Column("b", String) - - c: Mapped[int] = Column(Integer) - - d: bool = Column(Boolean) - - Will resolve in MyPy as:: - - @reg.mapped - class MyClass: - # ... - - a : Mapped[int] - - b : Mapped[str] - - c: Mapped[int] - - d: Mapped[bool] - - """ - assert isinstance(node, Var) - - callee = None - - for column_arg in right_hand_expression.args[0:2]: - if isinstance(column_arg, nodes.CallExpr): - # x = Column(String(50)) - callee = column_arg.callee - type_args = column_arg.args - break - elif isinstance(column_arg, (nodes.NameExpr, nodes.MemberExpr)): - if isinstance(column_arg.node, TypeInfo): - # x = Column(String) - callee = column_arg - type_args = () - break - else: - # x = Column(some_name, String), go to next argument - continue - elif isinstance(column_arg, (StrExpr,)): - # x = Column("name", String), go to next argument - continue - else: - assert False - - if callee is None: - return None - - if isinstance(callee.node, TypeInfo) and names._mro_has_id( - callee.node.mro, names.TYPEENGINE - ): - python_type_for_type = _extract_python_type_from_typeengine( - api, callee.node, type_args - ) - - if left_hand_explicit_type is not None: - - return _infer_type_from_left_and_inferred_right( - api, node, left_hand_explicit_type, python_type_for_type - ) - - else: - python_type_for_type = UnionType( - [python_type_for_type, NoneType()] - ) - return python_type_for_type - else: - # it's not TypeEngine, it's typically implicitly typed - # like ForeignKey. we can't infer from the right side. - return _infer_type_from_left_hand_type_only( - api, node, left_hand_explicit_type - ) - - -def _infer_type_from_left_and_inferred_right( - api: SemanticAnalyzerPluginInterface, - node: Var, - left_hand_explicit_type: Optional[types.Type], - python_type_for_type: Union[Instance, UnionType], -) -> Optional[Union[Instance, UnionType]]: - """Validate type when a left hand annotation is present and we also - could infer the right hand side:: - - attrname: SomeType = Column(SomeDBType) - - """ - if not is_subtype(left_hand_explicit_type, python_type_for_type): - descriptor = api.lookup("__sa_Mapped", node) - - effective_type = Instance(descriptor.node, [python_type_for_type]) - - msg = ( - "Left hand assignment '{}: {}' not compatible " - "with ORM mapped expression of type {}" - ) - util.fail( - api, - msg.format( - node.name, - format_type(left_hand_explicit_type), - format_type(effective_type), - ), - node, - ) - - return left_hand_explicit_type - - -def _infer_type_from_left_hand_type_only( - api: SemanticAnalyzerPluginInterface, - node: Var, - left_hand_explicit_type: Optional[types.Type], -) -> Optional[Union[Instance, UnionType]]: - """Determine the type based on explicit annotation only. - - if no annotation were present, note that we need one there to know - the type. - - """ - if left_hand_explicit_type is None: - msg = ( - "Can't infer type from ORM mapped expression " - "assigned to attribute '{}'; please specify a " - "Python type or " - "Mapped[<python type>] on the left hand side." - ) - util.fail(api, msg.format(node.name), node) - - descriptor = api.lookup("__sa_Mapped", node) - return Instance(descriptor.node, [AnyType(TypeOfAny.special_form)]) - - else: - # use type from the left hand side - return left_hand_explicit_type - - -def _re_apply_declarative_assignments( - cls: ClassDef, - api: SemanticAnalyzerPluginInterface, - cls_metadata: DeclClassApplied, -): - """For multiple class passes, re-apply our left-hand side types as mypy - seems to reset them in place. - - """ - mapped_attr_lookup = { - name: typ for name, typ in cls_metadata.mapped_attr_names - } - - descriptor = api.lookup("__sa_Mapped", cls) - for stmt in cls.defs.body: - # for a re-apply, all of our statements are AssignmentStmt; - # @declared_attr calls will have been converted and this - # currently seems to be preserved by mypy (but who knows if this - # will change). - if ( - isinstance(stmt, AssignmentStmt) - and stmt.lvalues[0].name in mapped_attr_lookup - ): - typ = mapped_attr_lookup[stmt.lvalues[0].name] - left_node = stmt.lvalues[0].node - - inst = Instance(descriptor.node, [typ]) - left_node.type = inst - - -def _apply_type_to_mapped_statement( - api: SemanticAnalyzerPluginInterface, - stmt: AssignmentStmt, - lvalue: NameExpr, - left_hand_explicit_type: Optional[Union[Instance, UnionType]], - python_type_for_type: Union[Instance, UnionType], -) -> None: - """Apply the Mapped[<type>] annotation and right hand object to a - declarative assignment statement. - - This converts a Python declarative class statement such as:: - - class User(Base): - # ... - - attrname = Column(Integer) - - To one that describes the final Python behavior to Mypy:: - - class User(Base): - # ... - - attrname : Mapped[Optional[int]] = <meaningless temp node> - - """ - descriptor = api.lookup("__sa_Mapped", stmt) - left_node = lvalue.node - - inst = Instance(descriptor.node, [python_type_for_type]) - - if left_hand_explicit_type is not None: - left_node.type = Instance(descriptor.node, [left_hand_explicit_type]) - else: - lvalue.is_inferred_def = False - left_node.type = inst - - # so to have it skip the right side totally, we can do this: - # stmt.rvalue = TempNode(AnyType(TypeOfAny.special_form)) - - # however, if we instead manufacture a new node that uses the old - # one, then we can still get type checking for the call itself, - # e.g. the Column, relationship() call, etc. - - # rewrite the node as: - # <attr> : Mapped[<typ>] = - # _sa_Mapped._empty_constructor(<original CallExpr from rvalue>) - # the original right-hand side is maintained so it gets type checked - # internally - api.add_symbol_table_node("_sa_Mapped", descriptor) - column_descriptor = nodes.NameExpr("_sa_Mapped") - column_descriptor.fullname = "sqlalchemy.orm.Mapped" - mm = nodes.MemberExpr(column_descriptor, "_empty_constructor") - orig_call_expr = stmt.rvalue - stmt.rvalue = CallExpr( - mm, - [orig_call_expr], - [nodes.ARG_POS], - ["arg1"], - ) - - def _scan_for_mapped_bases( cls: ClassDef, api: SemanticAnalyzerPluginInterface, - cls_metadata: DeclClassApplied, + cls_metadata: util.DeclClassApplied, ) -> None: """Given a class, iterate through its superclass hierarchy to find all other classes that are considered as ORM-significant. @@ -905,99 +466,25 @@ def _scan_for_mapped_bases( """ - baseclasses = list(cls.info.bases) + info = util._info_for_cls(cls, api) + + baseclasses = list(info.bases) + while baseclasses: base: Instance = baseclasses.pop(0) + if base.type.fullname.startswith("builtins"): + continue + # scan each base for mapped attributes. if they are not already - # scanned, that means they are unmapped mixins + # scanned (but have all their type info), that means they are unmapped + # mixins base_decl_class_applied = ( _scan_declarative_assignments_and_apply_types( base.type.defn, api, is_mixin_scan=True ) ) - if base_decl_class_applied is not None: + + if base_decl_class_applied not in (None, False): cls_metadata.mapped_mro.append(base) baseclasses.extend(base.type.bases) - - -def _add_additional_orm_attributes( - cls: ClassDef, - api: SemanticAnalyzerPluginInterface, - cls_metadata: DeclClassApplied, -) -> None: - """Apply __init__, __table__ and other attributes to the mapped class.""" - if "__init__" not in cls.info.names and cls_metadata.is_mapped: - mapped_attr_names = {n: t for n, t in cls_metadata.mapped_attr_names} - - for mapped_base in cls_metadata.mapped_mro: - base_cls_metadata = DeclClassApplied.deserialize( - mapped_base.type.metadata["_sa_decl_class_applied"], api - ) - for n, t in base_cls_metadata.mapped_attr_names: - mapped_attr_names.setdefault(n, t) - - arguments = [] - for name, typ in mapped_attr_names.items(): - if typ is None: - typ = AnyType(TypeOfAny.special_form) - arguments.append( - Argument( - variable=Var(name, typ), - type_annotation=typ, - initializer=TempNode(typ), - kind=ARG_NAMED_OPT, - ) - ) - add_method_to_class(api, cls, "__init__", arguments, NoneTyp()) - - if "__table__" not in cls.info.names and cls_metadata.has_table: - _apply_placeholder_attr_to_class( - api, cls, "sqlalchemy.sql.schema.Table", "__table__" - ) - if cls_metadata.is_mapped: - _apply_placeholder_attr_to_class( - api, cls, "sqlalchemy.orm.mapper.Mapper", "__mapper__" - ) - - -def _apply_placeholder_attr_to_class( - api: SemanticAnalyzerPluginInterface, - cls: ClassDef, - qualified_name: str, - attrname: str, -): - sym = api.lookup_fully_qualified_or_none(qualified_name) - if sym: - assert isinstance(sym.node, TypeInfo) - type_ = Instance(sym.node, []) - else: - type_ = AnyType(TypeOfAny.special_form) - var = Var(attrname) - var.info = cls.info - var.type = type_ - cls.info.names[attrname] = SymbolTableNode(MDEF, var) - - -def _extract_python_type_from_typeengine( - api: SemanticAnalyzerPluginInterface, node: TypeInfo, type_args -) -> Instance: - if node.fullname == "sqlalchemy.sql.sqltypes.Enum" and type_args: - first_arg = type_args[0] - if isinstance(first_arg, NameExpr) and isinstance( - first_arg.node, TypeInfo - ): - for base_ in first_arg.node.mro: - if base_.fullname == "enum.Enum": - return Instance(first_arg.node, []) - # TODO: support other pep-435 types here - else: - n = api.lookup_fully_qualified("builtins.str") - return Instance(n.node, []) - - for mr in node.mro: - if mr.bases: - for base_ in mr.bases: - if base_.type.fullname == "sqlalchemy.sql.type_api.TypeEngine": - return base_.args[-1] - assert False, "could not extract Python type from node: %s" % node diff --git a/lib/sqlalchemy/ext/mypy/infer.py b/lib/sqlalchemy/ext/mypy/infer.py new file mode 100644 index 000000000..1d77e67d2 --- /dev/null +++ b/lib/sqlalchemy/ext/mypy/infer.py @@ -0,0 +1,398 @@ +# ext/mypy/infer.py +# Copyright (C) 2021 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: http://www.opensource.org/licenses/mit-license.php + +from typing import Optional +from typing import Union + +from mypy import nodes +from mypy import types +from mypy.messages import format_type +from mypy.nodes import AssignmentStmt +from mypy.nodes import CallExpr +from mypy.nodes import NameExpr +from mypy.nodes import StrExpr +from mypy.nodes import TypeInfo +from mypy.nodes import Var +from mypy.plugin import SemanticAnalyzerPluginInterface +from mypy.subtypes import is_subtype +from mypy.types import AnyType +from mypy.types import Instance +from mypy.types import NoneType +from mypy.types import TypeOfAny +from mypy.types import UnionType + +from . import names +from . import util + + +def _infer_type_from_relationship( + api: SemanticAnalyzerPluginInterface, + stmt: AssignmentStmt, + node: Var, + left_hand_explicit_type: Optional[types.Type], +) -> Union[Instance, UnionType, None]: + """Infer the type of mapping from a relationship. + + E.g.:: + + @reg.mapped + class MyClass: + # ... + + addresses = relationship(Address, uselist=True) + + order: Mapped["Order"] = relationship("Order") + + Will resolve in mypy as:: + + @reg.mapped + class MyClass: + # ... + + addresses: Mapped[List[Address]] + + order: Mapped["Order"] + + """ + + assert isinstance(stmt.rvalue, CallExpr) + target_cls_arg = stmt.rvalue.args[0] + python_type_for_type = None + + if isinstance(target_cls_arg, NameExpr) and isinstance( + target_cls_arg.node, TypeInfo + ): + # type + related_object_type = target_cls_arg.node + python_type_for_type = Instance(related_object_type, []) + + # other cases not covered - an error message directs the user + # to set an explicit type annotation + # + # node.type == str, it's a string + # if isinstance(target_cls_arg, NameExpr) and isinstance( + # target_cls_arg.node, Var + # ) + # points to a type + # isinstance(target_cls_arg, NameExpr) and isinstance( + # target_cls_arg.node, TypeAlias + # ) + # string expression + # isinstance(target_cls_arg, StrExpr) + + uselist_arg = util._get_callexpr_kwarg(stmt.rvalue, "uselist") + collection_cls_arg = util._get_callexpr_kwarg( + stmt.rvalue, "collection_class" + ) + + # this can be used to determine Optional for a many-to-one + # in the same way nullable=False could be used, if we start supporting + # that. + # innerjoin_arg = _get_callexpr_kwarg(stmt.rvalue, "innerjoin") + + if ( + uselist_arg is not None + and uselist_arg.fullname == "builtins.True" + and collection_cls_arg is None + ): + if python_type_for_type is not None: + python_type_for_type = Instance( + api.lookup_fully_qualified("builtins.list").node, + [python_type_for_type], + ) + elif ( + uselist_arg is None or uselist_arg.fullname == "builtins.True" + ) and collection_cls_arg is not None: + if isinstance(collection_cls_arg.node, TypeInfo): + if python_type_for_type is not None: + python_type_for_type = Instance( + collection_cls_arg.node, [python_type_for_type] + ) + else: + util.fail( + api, + "Expected Python collection type for " + "collection_class parameter", + stmt.rvalue, + ) + python_type_for_type = None + elif uselist_arg is not None and uselist_arg.fullname == "builtins.False": + if collection_cls_arg is not None: + util.fail( + api, + "Sending uselist=False and collection_class at the same time " + "does not make sense", + stmt.rvalue, + ) + if python_type_for_type is not None: + python_type_for_type = UnionType( + [python_type_for_type, NoneType()] + ) + + else: + if left_hand_explicit_type is None: + msg = ( + "Can't infer scalar or collection for ORM mapped expression " + "assigned to attribute '{}' if both 'uselist' and " + "'collection_class' arguments are absent from the " + "relationship(); please specify a " + "type annotation on the left hand side." + ) + util.fail(api, msg.format(node.name), node) + + if python_type_for_type is None: + return _infer_type_from_left_hand_type_only( + api, node, left_hand_explicit_type + ) + elif left_hand_explicit_type is not None: + return _infer_type_from_left_and_inferred_right( + api, node, left_hand_explicit_type, python_type_for_type + ) + else: + return python_type_for_type + + +def _infer_type_from_decl_composite_property( + api: SemanticAnalyzerPluginInterface, + stmt: AssignmentStmt, + node: Var, + left_hand_explicit_type: Optional[types.Type], +) -> Union[Instance, UnionType, None]: + """Infer the type of mapping from a CompositeProperty.""" + + assert isinstance(stmt.rvalue, CallExpr) + target_cls_arg = stmt.rvalue.args[0] + python_type_for_type = None + + if isinstance(target_cls_arg, NameExpr) and isinstance( + target_cls_arg.node, TypeInfo + ): + related_object_type = target_cls_arg.node + python_type_for_type = Instance(related_object_type, []) + else: + python_type_for_type = None + + if python_type_for_type is None: + return _infer_type_from_left_hand_type_only( + api, node, left_hand_explicit_type + ) + elif left_hand_explicit_type is not None: + return _infer_type_from_left_and_inferred_right( + api, node, left_hand_explicit_type, python_type_for_type + ) + else: + return python_type_for_type + + +def _infer_type_from_decl_column_property( + api: SemanticAnalyzerPluginInterface, + stmt: AssignmentStmt, + node: Var, + left_hand_explicit_type: Optional[types.Type], +) -> Union[Instance, UnionType, None]: + """Infer the type of mapping from a ColumnProperty. + + This includes mappings against ``column_property()`` as well as the + ``deferred()`` function. + + """ + assert isinstance(stmt.rvalue, CallExpr) + first_prop_arg = stmt.rvalue.args[0] + + if isinstance(first_prop_arg, CallExpr): + type_id = names._type_id_for_callee(first_prop_arg.callee) + else: + type_id = None + + # look for column_property() / deferred() etc with Column as first + # argument + if type_id is names.COLUMN: + return _infer_type_from_decl_column( + api, stmt, node, left_hand_explicit_type, first_prop_arg + ) + else: + return _infer_type_from_left_hand_type_only( + api, node, left_hand_explicit_type + ) + + +def _infer_type_from_decl_column( + api: SemanticAnalyzerPluginInterface, + stmt: AssignmentStmt, + node: Var, + left_hand_explicit_type: Optional[types.Type], + right_hand_expression: CallExpr, +) -> Union[Instance, UnionType, None]: + """Infer the type of mapping from a Column. + + E.g.:: + + @reg.mapped + class MyClass: + # ... + + a = Column(Integer) + + b = Column("b", String) + + c: Mapped[int] = Column(Integer) + + d: bool = Column(Boolean) + + Will resolve in MyPy as:: + + @reg.mapped + class MyClass: + # ... + + a : Mapped[int] + + b : Mapped[str] + + c: Mapped[int] + + d: Mapped[bool] + + """ + assert isinstance(node, Var) + + callee = None + + for column_arg in right_hand_expression.args[0:2]: + if isinstance(column_arg, nodes.CallExpr): + # x = Column(String(50)) + callee = column_arg.callee + type_args = column_arg.args + break + elif isinstance(column_arg, (nodes.NameExpr, nodes.MemberExpr)): + if isinstance(column_arg.node, TypeInfo): + # x = Column(String) + callee = column_arg + type_args = () + break + else: + # x = Column(some_name, String), go to next argument + continue + elif isinstance(column_arg, (StrExpr,)): + # x = Column("name", String), go to next argument + continue + else: + assert False + + if callee is None: + return None + + if isinstance(callee.node, TypeInfo) and names._mro_has_id( + callee.node.mro, names.TYPEENGINE + ): + python_type_for_type = _extract_python_type_from_typeengine( + api, callee.node, type_args + ) + + if left_hand_explicit_type is not None: + + return _infer_type_from_left_and_inferred_right( + api, node, left_hand_explicit_type, python_type_for_type + ) + + else: + python_type_for_type = UnionType( + [python_type_for_type, NoneType()] + ) + return python_type_for_type + else: + # it's not TypeEngine, it's typically implicitly typed + # like ForeignKey. we can't infer from the right side. + return _infer_type_from_left_hand_type_only( + api, node, left_hand_explicit_type + ) + + +def _infer_type_from_left_and_inferred_right( + api: SemanticAnalyzerPluginInterface, + node: Var, + left_hand_explicit_type: Optional[types.Type], + python_type_for_type: Union[Instance, UnionType], +) -> Optional[Union[Instance, UnionType]]: + """Validate type when a left hand annotation is present and we also + could infer the right hand side:: + + attrname: SomeType = Column(SomeDBType) + + """ + if not is_subtype(left_hand_explicit_type, python_type_for_type): + descriptor = api.lookup("__sa_Mapped", node) + + effective_type = Instance(descriptor.node, [python_type_for_type]) + + msg = ( + "Left hand assignment '{}: {}' not compatible " + "with ORM mapped expression of type {}" + ) + util.fail( + api, + msg.format( + node.name, + format_type(left_hand_explicit_type), + format_type(effective_type), + ), + node, + ) + + return left_hand_explicit_type + + +def _infer_type_from_left_hand_type_only( + api: SemanticAnalyzerPluginInterface, + node: Var, + left_hand_explicit_type: Optional[types.Type], +) -> Optional[Union[Instance, UnionType]]: + """Determine the type based on explicit annotation only. + + if no annotation were present, note that we need one there to know + the type. + + """ + if left_hand_explicit_type is None: + msg = ( + "Can't infer type from ORM mapped expression " + "assigned to attribute '{}'; please specify a " + "Python type or " + "Mapped[<python type>] on the left hand side." + ) + util.fail(api, msg.format(node.name), node) + + descriptor = api.lookup("__sa_Mapped", node) + return Instance(descriptor.node, [AnyType(TypeOfAny.special_form)]) + + else: + # use type from the left hand side + return left_hand_explicit_type + + +def _extract_python_type_from_typeengine( + api: SemanticAnalyzerPluginInterface, node: TypeInfo, type_args +) -> Instance: + if node.fullname == "sqlalchemy.sql.sqltypes.Enum" and type_args: + first_arg = type_args[0] + if isinstance(first_arg, NameExpr) and isinstance( + first_arg.node, TypeInfo + ): + for base_ in first_arg.node.mro: + if base_.fullname == "enum.Enum": + return Instance(first_arg.node, []) + # TODO: support other pep-435 types here + else: + n = api.lookup_fully_qualified("builtins.str") + return Instance(n.node, []) + + for mr in node.mro: + if mr.bases: + for base_ in mr.bases: + if base_.type.fullname == "sqlalchemy.sql.type_api.TypeEngine": + return base_.args[-1] + assert False, "could not extract Python type from node: %s" % node diff --git a/lib/sqlalchemy/ext/mypy/names.py b/lib/sqlalchemy/ext/mypy/names.py index d1fd77415..11208f3c7 100644 --- a/lib/sqlalchemy/ext/mypy/names.py +++ b/lib/sqlalchemy/ext/mypy/names.py @@ -36,6 +36,7 @@ DECLARED_ATTR = util.symbol("DECLARED_ATTR") MAPPER_PROPERTY = util.symbol("MAPPER_PROPERTY") AS_DECLARATIVE = util.symbol("AS_DECLARATIVE") AS_DECLARATIVE_BASE = util.symbol("AS_DECLARATIVE_BASE") +DECLARATIVE_MIXIN = util.symbol("DECLARATIVE_MIXIN") _lookup = { "Column": ( @@ -134,6 +135,13 @@ _lookup = { "sqlalchemy.orm.declared_attr", }, ), + "declarative_mixin": ( + DECLARATIVE_MIXIN, + { + "sqlalchemy.orm.decl_api.declarative_mixin", + "sqlalchemy.orm.declarative_mixin", + }, + ), } diff --git a/lib/sqlalchemy/ext/mypy/plugin.py b/lib/sqlalchemy/ext/mypy/plugin.py index 9ca1cb2da..a0aa5bf04 100644 --- a/lib/sqlalchemy/ext/mypy/plugin.py +++ b/lib/sqlalchemy/ext/mypy/plugin.py @@ -55,6 +55,7 @@ class CustomPlugin(Plugin): # subclasses. but then you can just check it here from the "base" # and get the same effect. sym = self.lookup_fully_qualified(fullname) + if ( sym and isinstance(sym.node, TypeInfo) @@ -70,17 +71,18 @@ class CustomPlugin(Plugin): ) -> Optional[Callable[[ClassDefContext], None]]: sym = self.lookup_fully_qualified(fullname) - if ( - sym is not None - and names._type_id_for_named_node(sym.node) - is names.MAPPED_DECORATOR - ): - return _cls_decorator_hook - elif sym is not None and names._type_id_for_named_node(sym.node) in ( - names.AS_DECLARATIVE, - names.AS_DECLARATIVE_BASE, - ): - return _base_cls_decorator_hook + + if sym is not None: + type_id = names._type_id_for_named_node(sym.node) + if type_id is names.MAPPED_DECORATOR: + return _cls_decorator_hook + elif type_id in ( + names.AS_DECLARATIVE, + names.AS_DECLARATIVE_BASE, + ): + return _base_cls_decorator_hook + elif type_id is names.DECLARATIVE_MIXIN: + return _declarative_mixin_hook return None @@ -192,6 +194,13 @@ def _base_cls_hook(ctx: ClassDefContext) -> None: decl_class._scan_declarative_assignments_and_apply_types(ctx.cls, ctx.api) +def _declarative_mixin_hook(ctx: ClassDefContext) -> None: + _add_globals(ctx) + decl_class._scan_declarative_assignments_and_apply_types( + ctx.cls, ctx.api, is_mixin_scan=True + ) + + def _cls_decorator_hook(ctx: ClassDefContext) -> None: _add_globals(ctx) assert isinstance(ctx.reason, nodes.MemberExpr) diff --git a/lib/sqlalchemy/ext/mypy/util.py b/lib/sqlalchemy/ext/mypy/util.py index 7079f3cd7..becce3ebe 100644 --- a/lib/sqlalchemy/ext/mypy/util.py +++ b/lib/sqlalchemy/ext/mypy/util.py @@ -1,18 +1,67 @@ from typing import Optional +from typing import Sequence +from typing import Tuple +from typing import Type from mypy.nodes import CallExpr +from mypy.nodes import CLASSDEF_NO_INFO from mypy.nodes import Context from mypy.nodes import IfStmt +from mypy.nodes import JsonDict from mypy.nodes import NameExpr from mypy.nodes import SymbolTableNode +from mypy.nodes import TypeInfo from mypy.plugin import SemanticAnalyzerPluginInterface +from mypy.plugins.common import deserialize_and_fixup_type from mypy.types import Instance from mypy.types import NoneType -from mypy.types import Type from mypy.types import UnboundType from mypy.types import UnionType +class DeclClassApplied: + def __init__( + self, + is_mapped: bool, + has_table: bool, + mapped_attr_names: Sequence[Tuple[str, Type]], + mapped_mro: Sequence[Type], + ): + self.is_mapped = is_mapped + self.has_table = has_table + self.mapped_attr_names = mapped_attr_names + self.mapped_mro = mapped_mro + + def serialize(self) -> JsonDict: + return { + "is_mapped": self.is_mapped, + "has_table": self.has_table, + "mapped_attr_names": [ + (name, type_.serialize()) + for name, type_ in self.mapped_attr_names + ], + "mapped_mro": [type_.serialize() for type_ in self.mapped_mro], + } + + @classmethod + def deserialize( + cls, data: JsonDict, api: SemanticAnalyzerPluginInterface + ) -> "DeclClassApplied": + + return DeclClassApplied( + is_mapped=data["is_mapped"], + has_table=data["has_table"], + mapped_attr_names=[ + (name, deserialize_and_fixup_type(type_, api)) + for name, type_ in data["mapped_attr_names"] + ], + mapped_mro=[ + deserialize_and_fixup_type(type_, api) + for type_ in data["mapped_mro"] + ], + ) + + def fail(api: SemanticAnalyzerPluginInterface, msg: str, ctx: Context): msg = "[SQLAlchemy Mypy plugin] %s" % msg return api.fail(msg, ctx) @@ -94,3 +143,14 @@ def _unbound_to_instance( ) else: return typ + + +def _info_for_cls(cls, api): + if cls.info is CLASSDEF_NO_INFO: + sym = api.lookup(cls.name, cls) + if sym.node and isinstance(sym.node, TypeInfo): + info = sym.node + else: + info = cls.info + + return info diff --git a/lib/sqlalchemy/orm/__init__.py b/lib/sqlalchemy/orm/__init__.py index 025d826e3..66c3e7e33 100644 --- a/lib/sqlalchemy/orm/__init__.py +++ b/lib/sqlalchemy/orm/__init__.py @@ -23,6 +23,7 @@ from .attributes import QueryableAttribute from .context import QueryContext from .decl_api import as_declarative from .decl_api import declarative_base +from .decl_api import declarative_mixin from .decl_api import DeclarativeMeta from .decl_api import declared_attr from .decl_api import has_inherited_table diff --git a/lib/sqlalchemy/orm/decl_api.py b/lib/sqlalchemy/orm/decl_api.py index ef53e2d39..d9c464815 100644 --- a/lib/sqlalchemy/orm/decl_api.py +++ b/lib/sqlalchemy/orm/decl_api.py @@ -321,6 +321,48 @@ class _stateful_declared_attr(declared_attr): return declared_attr(fn, **self.kw) +def declarative_mixin(cls): + """Mark a class as providing the feature of "declarative mixin". + + E.g.:: + + from sqlalchemy.orm import declared_attr + from sqlalchemy.orm import declarative_mixin + + @declarative_mixin + class MyMixin: + + @declared_attr + def __tablename__(cls): + return cls.__name__.lower() + + __table_args__ = {'mysql_engine': 'InnoDB'} + __mapper_args__= {'always_refresh': True} + + id = Column(Integer, primary_key=True) + + class MyModel(MyMixin, Base): + name = Column(String(1000)) + + The :func:`_orm.declarative_mixin` decorator currently does not modify + the given class in any way; it's current purpose is strictly to assist + the :ref:`Mypy plugin <mypy_toplevel>` in being able to identify + SQLAlchemy declarative mixin classes when no other context is present. + + .. versionadded:: 1.4.6 + + .. seealso:: + + :ref:`orm_mixins_toplevel` + + :ref:`mypy_declarative_mixins` - in the + :ref:`Mypy plugin documentation <mypy_toplevel>` + + """ # noqa: E501 + + return cls + + def declarative_base( bind=None, metadata=None, |
