summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2021-03-26 19:45:29 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2021-04-05 19:23:52 -0400
commit606096ae01c71298da4d3fda3f62c730d9985105 (patch)
tree8e745f10d60683edbf1da49a557bebc9994fb7ba /lib
parent4e0f90424afe03547c21ec3b6365755bb5288075 (diff)
downloadsqlalchemy-606096ae01c71298da4d3fda3f62c730d9985105.tar.gz
Adjust for mypy incremental behaviors
Applied a series of refactorings and fixes to accommodate for Mypy "incremental" mode across multiple files, which previously was not taken into account. In this mode the Mypy plugin has to accommodate Python datatypes expressed in other files coming in with less information than they have on a direct run. Additionally, a new decorator :func:`_orm.declarative_mixin` is added, which is necessary for the Mypy plugin to be able to definifitely identify a Declarative mixin class that is otherwise not used inside a particular Python file. discussion: With incremental / deserialized mypy runs, it appears that when we look at a base class that comes from another file, cls.info is set to a special undefined node that matches CLASSDEF_NO_INFO, and we otherwise can't touch it without crashing. Additionally, sometimes cls.defs.body is present but empty. However, it appears that both of these cases can be sidestepped, first by doing a lookup() for the type name where we get a SymbolTableNode that then has the TypeInfo we wanted when we tried touching cls.info, and then however we got the TypeInfo, if cls.defs.body is empty we can just look in the names to get at the symbols for that class; we just can't access AssignmentStmts, but that's fine because we just need the information for classes we aren't actually type checking. This work also revealed there's no easy way to detect a mixin class so we just create a new decorator to mark that. will make code look better in any case. Fixes: #6147 Change-Id: Ia8fac8acfeec931d8f280491cffc5c6cb4a1204e
Diffstat (limited to 'lib')
-rw-r--r--lib/sqlalchemy/ext/mypy/apply.py215
-rw-r--r--lib/sqlalchemy/ext/mypy/decl_class.py771
-rw-r--r--lib/sqlalchemy/ext/mypy/infer.py398
-rw-r--r--lib/sqlalchemy/ext/mypy/names.py8
-rw-r--r--lib/sqlalchemy/ext/mypy/plugin.py31
-rw-r--r--lib/sqlalchemy/ext/mypy/util.py62
-rw-r--r--lib/sqlalchemy/orm/__init__.py1
-rw-r--r--lib/sqlalchemy/orm/decl_api.py42
8 files changed, 874 insertions, 654 deletions
diff --git a/lib/sqlalchemy/ext/mypy/apply.py b/lib/sqlalchemy/ext/mypy/apply.py
new file mode 100644
index 000000000..6442cbc22
--- /dev/null
+++ b/lib/sqlalchemy/ext/mypy/apply.py
@@ -0,0 +1,215 @@
+# ext/mypy/apply.py
+# Copyright (C) 2021 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: http://www.opensource.org/licenses/mit-license.php
+
+from typing import Optional
+from typing import Union
+
+from mypy import nodes
+from mypy.nodes import ARG_NAMED_OPT
+from mypy.nodes import Argument
+from mypy.nodes import AssignmentStmt
+from mypy.nodes import CallExpr
+from mypy.nodes import ClassDef
+from mypy.nodes import MDEF
+from mypy.nodes import NameExpr
+from mypy.nodes import StrExpr
+from mypy.nodes import SymbolTableNode
+from mypy.nodes import TempNode
+from mypy.nodes import TypeInfo
+from mypy.nodes import Var
+from mypy.plugin import SemanticAnalyzerPluginInterface
+from mypy.plugins.common import add_method_to_class
+from mypy.types import AnyType
+from mypy.types import Instance
+from mypy.types import NoneTyp
+from mypy.types import TypeOfAny
+from mypy.types import UnionType
+
+from . import util
+
+
+def _apply_mypy_mapped_attr(
+ cls: ClassDef,
+ api: SemanticAnalyzerPluginInterface,
+ item: Union[NameExpr, StrExpr],
+ cls_metadata: util.DeclClassApplied,
+):
+ if isinstance(item, NameExpr):
+ name = item.name
+ elif isinstance(item, StrExpr):
+ name = item.value
+ else:
+ return
+
+ for stmt in cls.defs.body:
+ if isinstance(stmt, AssignmentStmt) and stmt.lvalues[0].name == name:
+ break
+ else:
+ util.fail(api, "Can't find mapped attribute {}".format(name), cls)
+ return
+
+ if stmt.type is None:
+ util.fail(
+ api,
+ "Statement linked from _mypy_mapped_attrs has no "
+ "typing information",
+ stmt,
+ )
+ return
+
+ left_hand_explicit_type = stmt.type
+
+ cls_metadata.mapped_attr_names.append((name, left_hand_explicit_type))
+
+ _apply_type_to_mapped_statement(
+ api, stmt, stmt.lvalues[0], left_hand_explicit_type, None
+ )
+
+
+def _re_apply_declarative_assignments(
+ cls: ClassDef,
+ api: SemanticAnalyzerPluginInterface,
+ cls_metadata: util.DeclClassApplied,
+):
+ """For multiple class passes, re-apply our left-hand side types as mypy
+ seems to reset them in place.
+
+ """
+ mapped_attr_lookup = {
+ name: typ for name, typ in cls_metadata.mapped_attr_names
+ }
+
+ descriptor = api.lookup("__sa_Mapped", cls)
+ for stmt in cls.defs.body:
+ # for a re-apply, all of our statements are AssignmentStmt;
+ # @declared_attr calls will have been converted and this
+ # currently seems to be preserved by mypy (but who knows if this
+ # will change).
+ if (
+ isinstance(stmt, AssignmentStmt)
+ and stmt.lvalues[0].name in mapped_attr_lookup
+ ):
+ typ = mapped_attr_lookup[stmt.lvalues[0].name]
+ left_node = stmt.lvalues[0].node
+
+ inst = Instance(descriptor.node, [typ])
+ left_node.type = inst
+
+
+def _apply_type_to_mapped_statement(
+ api: SemanticAnalyzerPluginInterface,
+ stmt: AssignmentStmt,
+ lvalue: NameExpr,
+ left_hand_explicit_type: Optional[Union[Instance, UnionType]],
+ python_type_for_type: Union[Instance, UnionType],
+) -> None:
+ """Apply the Mapped[<type>] annotation and right hand object to a
+ declarative assignment statement.
+
+ This converts a Python declarative class statement such as::
+
+ class User(Base):
+ # ...
+
+ attrname = Column(Integer)
+
+ To one that describes the final Python behavior to Mypy::
+
+ class User(Base):
+ # ...
+
+ attrname : Mapped[Optional[int]] = <meaningless temp node>
+
+ """
+ descriptor = api.lookup("__sa_Mapped", stmt)
+ left_node = lvalue.node
+
+ inst = Instance(descriptor.node, [python_type_for_type])
+
+ if left_hand_explicit_type is not None:
+ left_node.type = Instance(descriptor.node, [left_hand_explicit_type])
+ else:
+ lvalue.is_inferred_def = False
+ left_node.type = inst
+
+ # so to have it skip the right side totally, we can do this:
+ # stmt.rvalue = TempNode(AnyType(TypeOfAny.special_form))
+
+ # however, if we instead manufacture a new node that uses the old
+ # one, then we can still get type checking for the call itself,
+ # e.g. the Column, relationship() call, etc.
+
+ # rewrite the node as:
+ # <attr> : Mapped[<typ>] =
+ # _sa_Mapped._empty_constructor(<original CallExpr from rvalue>)
+ # the original right-hand side is maintained so it gets type checked
+ # internally
+ column_descriptor = nodes.NameExpr("__sa_Mapped")
+ column_descriptor.fullname = "sqlalchemy.orm.Mapped"
+ mm = nodes.MemberExpr(column_descriptor, "_empty_constructor")
+ orig_call_expr = stmt.rvalue
+ stmt.rvalue = CallExpr(mm, [orig_call_expr], [nodes.ARG_POS], ["arg1"])
+
+
+def _add_additional_orm_attributes(
+ cls: ClassDef,
+ api: SemanticAnalyzerPluginInterface,
+ cls_metadata: util.DeclClassApplied,
+) -> None:
+ """Apply __init__, __table__ and other attributes to the mapped class."""
+
+ info = util._info_for_cls(cls, api)
+ if "__init__" not in info.names and cls_metadata.is_mapped:
+ mapped_attr_names = {n: t for n, t in cls_metadata.mapped_attr_names}
+
+ for mapped_base in cls_metadata.mapped_mro:
+ base_cls_metadata = util.DeclClassApplied.deserialize(
+ mapped_base.type.metadata["_sa_decl_class_applied"], api
+ )
+ for n, t in base_cls_metadata.mapped_attr_names:
+ mapped_attr_names.setdefault(n, t)
+
+ arguments = []
+ for name, typ in mapped_attr_names.items():
+ if typ is None:
+ typ = AnyType(TypeOfAny.special_form)
+ arguments.append(
+ Argument(
+ variable=Var(name, typ),
+ type_annotation=typ,
+ initializer=TempNode(typ),
+ kind=ARG_NAMED_OPT,
+ )
+ )
+ add_method_to_class(api, cls, "__init__", arguments, NoneTyp())
+
+ if "__table__" not in info.names and cls_metadata.has_table:
+ _apply_placeholder_attr_to_class(
+ api, cls, "sqlalchemy.sql.schema.Table", "__table__"
+ )
+ if cls_metadata.is_mapped:
+ _apply_placeholder_attr_to_class(
+ api, cls, "sqlalchemy.orm.mapper.Mapper", "__mapper__"
+ )
+
+
+def _apply_placeholder_attr_to_class(
+ api: SemanticAnalyzerPluginInterface,
+ cls: ClassDef,
+ qualified_name: str,
+ attrname: str,
+):
+ sym = api.lookup_fully_qualified_or_none(qualified_name)
+ if sym:
+ assert isinstance(sym.node, TypeInfo)
+ type_ = Instance(sym.node, [])
+ else:
+ type_ = AnyType(TypeOfAny.special_form)
+ var = Var(attrname)
+ var.info = cls.info
+ var.type = type_
+ cls.info.names[attrname] = SymbolTableNode(MDEF, var)
diff --git a/lib/sqlalchemy/ext/mypy/decl_class.py b/lib/sqlalchemy/ext/mypy/decl_class.py
index 46f3cc30e..a0e272f71 100644
--- a/lib/sqlalchemy/ext/mypy/decl_class.py
+++ b/lib/sqlalchemy/ext/mypy/decl_class.py
@@ -6,23 +6,14 @@
# the MIT License: http://www.opensource.org/licenses/mit-license.php
from typing import Optional
-from typing import Sequence
-from typing import Tuple
from typing import Type
-from typing import Union
from mypy import nodes
-from mypy import types
-from mypy.messages import format_type
-from mypy.nodes import ARG_NAMED_OPT
-from mypy.nodes import Argument
from mypy.nodes import AssignmentStmt
from mypy.nodes import CallExpr
from mypy.nodes import ClassDef
from mypy.nodes import Decorator
-from mypy.nodes import JsonDict
from mypy.nodes import ListExpr
-from mypy.nodes import MDEF
from mypy.nodes import NameExpr
from mypy.nodes import PlaceholderNode
from mypy.nodes import RefExpr
@@ -32,73 +23,30 @@ from mypy.nodes import TempNode
from mypy.nodes import TypeInfo
from mypy.nodes import Var
from mypy.plugin import SemanticAnalyzerPluginInterface
-from mypy.plugins.common import add_method_to_class
-from mypy.plugins.common import deserialize_and_fixup_type
-from mypy.subtypes import is_subtype
from mypy.types import AnyType
from mypy.types import Instance
-from mypy.types import NoneTyp
from mypy.types import NoneType
from mypy.types import TypeOfAny
from mypy.types import UnboundType
from mypy.types import UnionType
+from . import apply
+from . import infer
from . import names
from . import util
-class DeclClassApplied:
- def __init__(
- self,
- is_mapped: bool,
- has_table: bool,
- mapped_attr_names: Sequence[Tuple[str, Type]],
- mapped_mro: Sequence[Type],
- ):
- self.is_mapped = is_mapped
- self.has_table = has_table
- self.mapped_attr_names = mapped_attr_names
- self.mapped_mro = mapped_mro
-
- def serialize(self) -> JsonDict:
- return {
- "is_mapped": self.is_mapped,
- "has_table": self.has_table,
- "mapped_attr_names": [
- (name, type_.serialize())
- for name, type_ in self.mapped_attr_names
- ],
- "mapped_mro": [type_.serialize() for type_ in self.mapped_mro],
- }
-
- @classmethod
- def deserialize(
- cls, data: JsonDict, api: SemanticAnalyzerPluginInterface
- ) -> "DeclClassApplied":
-
- return DeclClassApplied(
- is_mapped=data["is_mapped"],
- has_table=data["has_table"],
- mapped_attr_names=[
- (name, deserialize_and_fixup_type(type_, api))
- for name, type_ in data["mapped_attr_names"]
- ],
- mapped_mro=[
- deserialize_and_fixup_type(type_, api)
- for type_ in data["mapped_mro"]
- ],
- )
-
-
def _scan_declarative_assignments_and_apply_types(
cls: ClassDef, api: SemanticAnalyzerPluginInterface, is_mixin_scan=False
-) -> Optional[DeclClassApplied]:
+) -> Optional[util.DeclClassApplied]:
+
+ info = util._info_for_cls(cls, api)
if cls.fullname.startswith("builtins"):
return None
- elif "_sa_decl_class_applied" in cls.info.metadata:
- cls_metadata = DeclClassApplied.deserialize(
- cls.info.metadata["_sa_decl_class_applied"], api
+ elif "_sa_decl_class_applied" in info.metadata:
+ cls_metadata = util.DeclClassApplied.deserialize(
+ info.metadata["_sa_decl_class_applied"], api
)
# ensure that a class that's mapped is always picked up by
@@ -112,30 +60,117 @@ def _scan_declarative_assignments_and_apply_types(
# removing our ability to re-scan. but we have the types
# here, so lets re-apply them.
- _re_apply_declarative_assignments(cls, api, cls_metadata)
+ apply._re_apply_declarative_assignments(cls, api, cls_metadata)
return cls_metadata
- cls_metadata = DeclClassApplied(not is_mixin_scan, False, [], [])
+ cls_metadata = util.DeclClassApplied(not is_mixin_scan, False, [], [])
+
+ if not cls.defs.body:
+ # when we get a mixin class from another file, the body is
+ # empty (!) but the names are in the symbol table. so use that.
- for stmt in util._flatten_typechecking(cls.defs.body):
- if isinstance(stmt, AssignmentStmt):
- _scan_declarative_assignment_stmt(cls, api, stmt, cls_metadata)
- elif isinstance(stmt, Decorator):
- _scan_declarative_decorator_stmt(cls, api, stmt, cls_metadata)
+ for sym_name, sym in info.names.items():
+ _scan_symbol_table_entry(cls, api, sym_name, sym, cls_metadata)
+ else:
+ for stmt in util._flatten_typechecking(cls.defs.body):
+ if isinstance(stmt, AssignmentStmt):
+ _scan_declarative_assignment_stmt(cls, api, stmt, cls_metadata)
+ elif isinstance(stmt, Decorator):
+ _scan_declarative_decorator_stmt(cls, api, stmt, cls_metadata)
_scan_for_mapped_bases(cls, api, cls_metadata)
- _add_additional_orm_attributes(cls, api, cls_metadata)
- cls.info.metadata["_sa_decl_class_applied"] = cls_metadata.serialize()
+ if not is_mixin_scan:
+ apply._add_additional_orm_attributes(cls, api, cls_metadata)
+
+ info.metadata["_sa_decl_class_applied"] = cls_metadata.serialize()
return cls_metadata
+def _scan_symbol_table_entry(
+ cls: ClassDef,
+ api: SemanticAnalyzerPluginInterface,
+ name: str,
+ value: SymbolTableNode,
+ cls_metadata: util.DeclClassApplied,
+):
+ """Extract mapping information from a SymbolTableNode that's in the
+ type.names dictionary.
+
+ """
+ if not isinstance(value.type, Instance):
+ return
+
+ left_hand_explicit_type = None
+ type_id = names._type_id_for_named_node(value.type.type)
+ # type_id = names._type_id_for_unbound_type(value.type.type, cls, api)
+
+ err = False
+
+ # TODO: this is nearly the same logic as that of
+ # _scan_declarative_decorator_stmt, likely can be merged
+ if type_id in {
+ names.MAPPED,
+ names.RELATIONSHIP,
+ names.COMPOSITE_PROPERTY,
+ names.MAPPER_PROPERTY,
+ names.SYNONYM_PROPERTY,
+ names.COLUMN_PROPERTY,
+ }:
+ if value.type.args:
+ left_hand_explicit_type = value.type.args[0]
+ else:
+ err = True
+ elif type_id is names.COLUMN:
+ if not value.type.args:
+ err = True
+ else:
+ typeengine_arg = value.type.args[0]
+ if isinstance(typeengine_arg, Instance):
+ typeengine_arg = typeengine_arg.type
+
+ if isinstance(typeengine_arg, (UnboundType, TypeInfo)):
+ sym = api.lookup(typeengine_arg.name, typeengine_arg)
+ if sym is not None:
+ if names._mro_has_id(sym.node.mro, names.TYPEENGINE):
+
+ left_hand_explicit_type = UnionType(
+ [
+ infer._extract_python_type_from_typeengine(
+ api, sym.node, []
+ ),
+ NoneType(),
+ ]
+ )
+ else:
+ util.fail(
+ api,
+ "Column type should be a TypeEngine "
+ "subclass not '{}'".format(sym.node.fullname),
+ value.type,
+ )
+
+ if err:
+ msg = (
+ "Can't infer type from attribute {} on class {}. "
+ "please specify a return type from this function that is "
+ "one of: Mapped[<python type>], relationship[<target class>], "
+ "Column[<TypeEngine>], MapperProperty[<python type>]"
+ )
+ util.fail(api, msg.format(name, cls.name))
+
+ left_hand_explicit_type = AnyType(TypeOfAny.special_form)
+
+ if left_hand_explicit_type is not None:
+ cls_metadata.mapped_attr_names.append((name, left_hand_explicit_type))
+
+
def _scan_declarative_decorator_stmt(
cls: ClassDef,
api: SemanticAnalyzerPluginInterface,
stmt: Decorator,
- cls_metadata: DeclClassApplied,
+ cls_metadata: util.DeclClassApplied,
):
"""Extract mapping information from a @declared_attr in a declarative
class.
@@ -201,7 +236,7 @@ def _scan_declarative_decorator_stmt(
left_hand_explicit_type = UnionType(
[
- _extract_python_type_from_typeengine(
+ infer._extract_python_type_from_typeengine(
api, sym.node, []
),
NoneType(),
@@ -279,7 +314,7 @@ def _scan_declarative_assignment_stmt(
cls: ClassDef,
api: SemanticAnalyzerPluginInterface,
stmt: AssignmentStmt,
- cls_metadata: DeclClassApplied,
+ cls_metadata: util.DeclClassApplied,
):
"""Extract mapping information from an assignment statement in a
declarative class.
@@ -317,7 +352,7 @@ def _scan_declarative_assignment_stmt(
else:
for item in stmt.rvalue.items:
if isinstance(item, (NameExpr, StrExpr)):
- _apply_mypy_mapped_attr(cls, api, item, cls_metadata)
+ apply._apply_mypy_mapped_attr(cls, api, item, cls_metadata)
left_hand_mapped_type: Type = None
@@ -378,24 +413,26 @@ def _scan_declarative_assignment_stmt(
if type_id is None:
return
elif type_id is names.COLUMN:
- python_type_for_type = _infer_type_from_decl_column(
+ python_type_for_type = infer._infer_type_from_decl_column(
api, stmt, node, left_hand_explicit_type, stmt.rvalue
)
elif type_id is names.RELATIONSHIP:
- python_type_for_type = _infer_type_from_relationship(
+ python_type_for_type = infer._infer_type_from_relationship(
api, stmt, node, left_hand_explicit_type
)
elif type_id is names.COLUMN_PROPERTY:
- python_type_for_type = _infer_type_from_decl_column_property(
+ python_type_for_type = infer._infer_type_from_decl_column_property(
api, stmt, node, left_hand_explicit_type
)
elif type_id is names.SYNONYM_PROPERTY:
- python_type_for_type = _infer_type_from_left_hand_type_only(
+ python_type_for_type = infer._infer_type_from_left_hand_type_only(
api, node, left_hand_explicit_type
)
elif type_id is names.COMPOSITE_PROPERTY:
- python_type_for_type = _infer_type_from_decl_composite_property(
- api, stmt, node, left_hand_explicit_type
+ python_type_for_type = (
+ infer._infer_type_from_decl_composite_property(
+ api, stmt, node, left_hand_explicit_type
+ )
)
else:
return
@@ -407,7 +444,7 @@ def _scan_declarative_assignment_stmt(
assert python_type_for_type is not None
- _apply_type_to_mapped_statement(
+ apply._apply_type_to_mapped_statement(
api,
stmt,
lvalue,
@@ -416,486 +453,10 @@ def _scan_declarative_assignment_stmt(
)
-def _apply_mypy_mapped_attr(
- cls: ClassDef,
- api: SemanticAnalyzerPluginInterface,
- item: Union[NameExpr, StrExpr],
- cls_metadata: DeclClassApplied,
-):
- if isinstance(item, NameExpr):
- name = item.name
- elif isinstance(item, StrExpr):
- name = item.value
- else:
- return
-
- for stmt in cls.defs.body:
- if isinstance(stmt, AssignmentStmt) and stmt.lvalues[0].name == name:
- break
- else:
- util.fail(api, "Can't find mapped attribute {}".format(name), cls)
- return
-
- if stmt.type is None:
- util.fail(
- api,
- "Statement linked from _mypy_mapped_attrs has no "
- "typing information",
- stmt,
- )
- return
-
- left_hand_explicit_type = stmt.type
-
- cls_metadata.mapped_attr_names.append((name, left_hand_explicit_type))
-
- _apply_type_to_mapped_statement(
- api, stmt, stmt.lvalues[0], left_hand_explicit_type, None
- )
-
-
-def _infer_type_from_relationship(
- api: SemanticAnalyzerPluginInterface,
- stmt: AssignmentStmt,
- node: Var,
- left_hand_explicit_type: Optional[types.Type],
-) -> Union[Instance, UnionType, None]:
- """Infer the type of mapping from a relationship.
-
- E.g.::
-
- @reg.mapped
- class MyClass:
- # ...
-
- addresses = relationship(Address, uselist=True)
-
- order: Mapped["Order"] = relationship("Order")
-
- Will resolve in mypy as::
-
- @reg.mapped
- class MyClass:
- # ...
-
- addresses: Mapped[List[Address]]
-
- order: Mapped["Order"]
-
- """
-
- assert isinstance(stmt.rvalue, CallExpr)
- target_cls_arg = stmt.rvalue.args[0]
- python_type_for_type = None
-
- if isinstance(target_cls_arg, NameExpr) and isinstance(
- target_cls_arg.node, TypeInfo
- ):
- # type
- related_object_type = target_cls_arg.node
- python_type_for_type = Instance(related_object_type, [])
-
- # other cases not covered - an error message directs the user
- # to set an explicit type annotation
- #
- # node.type == str, it's a string
- # if isinstance(target_cls_arg, NameExpr) and isinstance(
- # target_cls_arg.node, Var
- # )
- # points to a type
- # isinstance(target_cls_arg, NameExpr) and isinstance(
- # target_cls_arg.node, TypeAlias
- # )
- # string expression
- # isinstance(target_cls_arg, StrExpr)
-
- uselist_arg = util._get_callexpr_kwarg(stmt.rvalue, "uselist")
- collection_cls_arg = util._get_callexpr_kwarg(
- stmt.rvalue, "collection_class"
- )
-
- # this can be used to determine Optional for a many-to-one
- # in the same way nullable=False could be used, if we start supporting
- # that.
- # innerjoin_arg = _get_callexpr_kwarg(stmt.rvalue, "innerjoin")
-
- if (
- uselist_arg is not None
- and uselist_arg.fullname == "builtins.True"
- and collection_cls_arg is None
- ):
- if python_type_for_type is not None:
- python_type_for_type = Instance(
- api.lookup_fully_qualified("builtins.list").node,
- [python_type_for_type],
- )
- elif (
- uselist_arg is None or uselist_arg.fullname == "builtins.True"
- ) and collection_cls_arg is not None:
- if isinstance(collection_cls_arg.node, TypeInfo):
- if python_type_for_type is not None:
- python_type_for_type = Instance(
- collection_cls_arg.node, [python_type_for_type]
- )
- else:
- util.fail(
- api,
- "Expected Python collection type for "
- "collection_class parameter",
- stmt.rvalue,
- )
- python_type_for_type = None
- elif uselist_arg is not None and uselist_arg.fullname == "builtins.False":
- if collection_cls_arg is not None:
- util.fail(
- api,
- "Sending uselist=False and collection_class at the same time "
- "does not make sense",
- stmt.rvalue,
- )
- if python_type_for_type is not None:
- python_type_for_type = UnionType(
- [python_type_for_type, NoneType()]
- )
-
- else:
- if left_hand_explicit_type is None:
- msg = (
- "Can't infer scalar or collection for ORM mapped expression "
- "assigned to attribute '{}' if both 'uselist' and "
- "'collection_class' arguments are absent from the "
- "relationship(); please specify a "
- "type annotation on the left hand side."
- )
- util.fail(api, msg.format(node.name), node)
-
- if python_type_for_type is None:
- return _infer_type_from_left_hand_type_only(
- api, node, left_hand_explicit_type
- )
- elif left_hand_explicit_type is not None:
- return _infer_type_from_left_and_inferred_right(
- api, node, left_hand_explicit_type, python_type_for_type
- )
- else:
- return python_type_for_type
-
-
-def _infer_type_from_decl_composite_property(
- api: SemanticAnalyzerPluginInterface,
- stmt: AssignmentStmt,
- node: Var,
- left_hand_explicit_type: Optional[types.Type],
-) -> Union[Instance, UnionType, None]:
- """Infer the type of mapping from a CompositeProperty."""
-
- assert isinstance(stmt.rvalue, CallExpr)
- target_cls_arg = stmt.rvalue.args[0]
- python_type_for_type = None
-
- if isinstance(target_cls_arg, NameExpr) and isinstance(
- target_cls_arg.node, TypeInfo
- ):
- related_object_type = target_cls_arg.node
- python_type_for_type = Instance(related_object_type, [])
- else:
- python_type_for_type = None
-
- if python_type_for_type is None:
- return _infer_type_from_left_hand_type_only(
- api, node, left_hand_explicit_type
- )
- elif left_hand_explicit_type is not None:
- return _infer_type_from_left_and_inferred_right(
- api, node, left_hand_explicit_type, python_type_for_type
- )
- else:
- return python_type_for_type
-
-
-def _infer_type_from_decl_column_property(
- api: SemanticAnalyzerPluginInterface,
- stmt: AssignmentStmt,
- node: Var,
- left_hand_explicit_type: Optional[types.Type],
-) -> Union[Instance, UnionType, None]:
- """Infer the type of mapping from a ColumnProperty.
-
- This includes mappings against ``column_property()`` as well as the
- ``deferred()`` function.
-
- """
- assert isinstance(stmt.rvalue, CallExpr)
- first_prop_arg = stmt.rvalue.args[0]
-
- if isinstance(first_prop_arg, CallExpr):
- type_id = names._type_id_for_callee(first_prop_arg.callee)
- else:
- type_id = None
-
- print(stmt.lvalues[0].name)
-
- # look for column_property() / deferred() etc with Column as first
- # argument
- if type_id is names.COLUMN:
- return _infer_type_from_decl_column(
- api, stmt, node, left_hand_explicit_type, first_prop_arg
- )
- else:
- return _infer_type_from_left_hand_type_only(
- api, node, left_hand_explicit_type
- )
-
-
-def _infer_type_from_decl_column(
- api: SemanticAnalyzerPluginInterface,
- stmt: AssignmentStmt,
- node: Var,
- left_hand_explicit_type: Optional[types.Type],
- right_hand_expression: CallExpr,
-) -> Union[Instance, UnionType, None]:
- """Infer the type of mapping from a Column.
-
- E.g.::
-
- @reg.mapped
- class MyClass:
- # ...
-
- a = Column(Integer)
-
- b = Column("b", String)
-
- c: Mapped[int] = Column(Integer)
-
- d: bool = Column(Boolean)
-
- Will resolve in MyPy as::
-
- @reg.mapped
- class MyClass:
- # ...
-
- a : Mapped[int]
-
- b : Mapped[str]
-
- c: Mapped[int]
-
- d: Mapped[bool]
-
- """
- assert isinstance(node, Var)
-
- callee = None
-
- for column_arg in right_hand_expression.args[0:2]:
- if isinstance(column_arg, nodes.CallExpr):
- # x = Column(String(50))
- callee = column_arg.callee
- type_args = column_arg.args
- break
- elif isinstance(column_arg, (nodes.NameExpr, nodes.MemberExpr)):
- if isinstance(column_arg.node, TypeInfo):
- # x = Column(String)
- callee = column_arg
- type_args = ()
- break
- else:
- # x = Column(some_name, String), go to next argument
- continue
- elif isinstance(column_arg, (StrExpr,)):
- # x = Column("name", String), go to next argument
- continue
- else:
- assert False
-
- if callee is None:
- return None
-
- if isinstance(callee.node, TypeInfo) and names._mro_has_id(
- callee.node.mro, names.TYPEENGINE
- ):
- python_type_for_type = _extract_python_type_from_typeengine(
- api, callee.node, type_args
- )
-
- if left_hand_explicit_type is not None:
-
- return _infer_type_from_left_and_inferred_right(
- api, node, left_hand_explicit_type, python_type_for_type
- )
-
- else:
- python_type_for_type = UnionType(
- [python_type_for_type, NoneType()]
- )
- return python_type_for_type
- else:
- # it's not TypeEngine, it's typically implicitly typed
- # like ForeignKey. we can't infer from the right side.
- return _infer_type_from_left_hand_type_only(
- api, node, left_hand_explicit_type
- )
-
-
-def _infer_type_from_left_and_inferred_right(
- api: SemanticAnalyzerPluginInterface,
- node: Var,
- left_hand_explicit_type: Optional[types.Type],
- python_type_for_type: Union[Instance, UnionType],
-) -> Optional[Union[Instance, UnionType]]:
- """Validate type when a left hand annotation is present and we also
- could infer the right hand side::
-
- attrname: SomeType = Column(SomeDBType)
-
- """
- if not is_subtype(left_hand_explicit_type, python_type_for_type):
- descriptor = api.lookup("__sa_Mapped", node)
-
- effective_type = Instance(descriptor.node, [python_type_for_type])
-
- msg = (
- "Left hand assignment '{}: {}' not compatible "
- "with ORM mapped expression of type {}"
- )
- util.fail(
- api,
- msg.format(
- node.name,
- format_type(left_hand_explicit_type),
- format_type(effective_type),
- ),
- node,
- )
-
- return left_hand_explicit_type
-
-
-def _infer_type_from_left_hand_type_only(
- api: SemanticAnalyzerPluginInterface,
- node: Var,
- left_hand_explicit_type: Optional[types.Type],
-) -> Optional[Union[Instance, UnionType]]:
- """Determine the type based on explicit annotation only.
-
- if no annotation were present, note that we need one there to know
- the type.
-
- """
- if left_hand_explicit_type is None:
- msg = (
- "Can't infer type from ORM mapped expression "
- "assigned to attribute '{}'; please specify a "
- "Python type or "
- "Mapped[<python type>] on the left hand side."
- )
- util.fail(api, msg.format(node.name), node)
-
- descriptor = api.lookup("__sa_Mapped", node)
- return Instance(descriptor.node, [AnyType(TypeOfAny.special_form)])
-
- else:
- # use type from the left hand side
- return left_hand_explicit_type
-
-
-def _re_apply_declarative_assignments(
- cls: ClassDef,
- api: SemanticAnalyzerPluginInterface,
- cls_metadata: DeclClassApplied,
-):
- """For multiple class passes, re-apply our left-hand side types as mypy
- seems to reset them in place.
-
- """
- mapped_attr_lookup = {
- name: typ for name, typ in cls_metadata.mapped_attr_names
- }
-
- descriptor = api.lookup("__sa_Mapped", cls)
- for stmt in cls.defs.body:
- # for a re-apply, all of our statements are AssignmentStmt;
- # @declared_attr calls will have been converted and this
- # currently seems to be preserved by mypy (but who knows if this
- # will change).
- if (
- isinstance(stmt, AssignmentStmt)
- and stmt.lvalues[0].name in mapped_attr_lookup
- ):
- typ = mapped_attr_lookup[stmt.lvalues[0].name]
- left_node = stmt.lvalues[0].node
-
- inst = Instance(descriptor.node, [typ])
- left_node.type = inst
-
-
-def _apply_type_to_mapped_statement(
- api: SemanticAnalyzerPluginInterface,
- stmt: AssignmentStmt,
- lvalue: NameExpr,
- left_hand_explicit_type: Optional[Union[Instance, UnionType]],
- python_type_for_type: Union[Instance, UnionType],
-) -> None:
- """Apply the Mapped[<type>] annotation and right hand object to a
- declarative assignment statement.
-
- This converts a Python declarative class statement such as::
-
- class User(Base):
- # ...
-
- attrname = Column(Integer)
-
- To one that describes the final Python behavior to Mypy::
-
- class User(Base):
- # ...
-
- attrname : Mapped[Optional[int]] = <meaningless temp node>
-
- """
- descriptor = api.lookup("__sa_Mapped", stmt)
- left_node = lvalue.node
-
- inst = Instance(descriptor.node, [python_type_for_type])
-
- if left_hand_explicit_type is not None:
- left_node.type = Instance(descriptor.node, [left_hand_explicit_type])
- else:
- lvalue.is_inferred_def = False
- left_node.type = inst
-
- # so to have it skip the right side totally, we can do this:
- # stmt.rvalue = TempNode(AnyType(TypeOfAny.special_form))
-
- # however, if we instead manufacture a new node that uses the old
- # one, then we can still get type checking for the call itself,
- # e.g. the Column, relationship() call, etc.
-
- # rewrite the node as:
- # <attr> : Mapped[<typ>] =
- # _sa_Mapped._empty_constructor(<original CallExpr from rvalue>)
- # the original right-hand side is maintained so it gets type checked
- # internally
- api.add_symbol_table_node("_sa_Mapped", descriptor)
- column_descriptor = nodes.NameExpr("_sa_Mapped")
- column_descriptor.fullname = "sqlalchemy.orm.Mapped"
- mm = nodes.MemberExpr(column_descriptor, "_empty_constructor")
- orig_call_expr = stmt.rvalue
- stmt.rvalue = CallExpr(
- mm,
- [orig_call_expr],
- [nodes.ARG_POS],
- ["arg1"],
- )
-
-
def _scan_for_mapped_bases(
cls: ClassDef,
api: SemanticAnalyzerPluginInterface,
- cls_metadata: DeclClassApplied,
+ cls_metadata: util.DeclClassApplied,
) -> None:
"""Given a class, iterate through its superclass hierarchy to find
all other classes that are considered as ORM-significant.
@@ -905,99 +466,25 @@ def _scan_for_mapped_bases(
"""
- baseclasses = list(cls.info.bases)
+ info = util._info_for_cls(cls, api)
+
+ baseclasses = list(info.bases)
+
while baseclasses:
base: Instance = baseclasses.pop(0)
+ if base.type.fullname.startswith("builtins"):
+ continue
+
# scan each base for mapped attributes. if they are not already
- # scanned, that means they are unmapped mixins
+ # scanned (but have all their type info), that means they are unmapped
+ # mixins
base_decl_class_applied = (
_scan_declarative_assignments_and_apply_types(
base.type.defn, api, is_mixin_scan=True
)
)
- if base_decl_class_applied is not None:
+
+ if base_decl_class_applied not in (None, False):
cls_metadata.mapped_mro.append(base)
baseclasses.extend(base.type.bases)
-
-
-def _add_additional_orm_attributes(
- cls: ClassDef,
- api: SemanticAnalyzerPluginInterface,
- cls_metadata: DeclClassApplied,
-) -> None:
- """Apply __init__, __table__ and other attributes to the mapped class."""
- if "__init__" not in cls.info.names and cls_metadata.is_mapped:
- mapped_attr_names = {n: t for n, t in cls_metadata.mapped_attr_names}
-
- for mapped_base in cls_metadata.mapped_mro:
- base_cls_metadata = DeclClassApplied.deserialize(
- mapped_base.type.metadata["_sa_decl_class_applied"], api
- )
- for n, t in base_cls_metadata.mapped_attr_names:
- mapped_attr_names.setdefault(n, t)
-
- arguments = []
- for name, typ in mapped_attr_names.items():
- if typ is None:
- typ = AnyType(TypeOfAny.special_form)
- arguments.append(
- Argument(
- variable=Var(name, typ),
- type_annotation=typ,
- initializer=TempNode(typ),
- kind=ARG_NAMED_OPT,
- )
- )
- add_method_to_class(api, cls, "__init__", arguments, NoneTyp())
-
- if "__table__" not in cls.info.names and cls_metadata.has_table:
- _apply_placeholder_attr_to_class(
- api, cls, "sqlalchemy.sql.schema.Table", "__table__"
- )
- if cls_metadata.is_mapped:
- _apply_placeholder_attr_to_class(
- api, cls, "sqlalchemy.orm.mapper.Mapper", "__mapper__"
- )
-
-
-def _apply_placeholder_attr_to_class(
- api: SemanticAnalyzerPluginInterface,
- cls: ClassDef,
- qualified_name: str,
- attrname: str,
-):
- sym = api.lookup_fully_qualified_or_none(qualified_name)
- if sym:
- assert isinstance(sym.node, TypeInfo)
- type_ = Instance(sym.node, [])
- else:
- type_ = AnyType(TypeOfAny.special_form)
- var = Var(attrname)
- var.info = cls.info
- var.type = type_
- cls.info.names[attrname] = SymbolTableNode(MDEF, var)
-
-
-def _extract_python_type_from_typeengine(
- api: SemanticAnalyzerPluginInterface, node: TypeInfo, type_args
-) -> Instance:
- if node.fullname == "sqlalchemy.sql.sqltypes.Enum" and type_args:
- first_arg = type_args[0]
- if isinstance(first_arg, NameExpr) and isinstance(
- first_arg.node, TypeInfo
- ):
- for base_ in first_arg.node.mro:
- if base_.fullname == "enum.Enum":
- return Instance(first_arg.node, [])
- # TODO: support other pep-435 types here
- else:
- n = api.lookup_fully_qualified("builtins.str")
- return Instance(n.node, [])
-
- for mr in node.mro:
- if mr.bases:
- for base_ in mr.bases:
- if base_.type.fullname == "sqlalchemy.sql.type_api.TypeEngine":
- return base_.args[-1]
- assert False, "could not extract Python type from node: %s" % node
diff --git a/lib/sqlalchemy/ext/mypy/infer.py b/lib/sqlalchemy/ext/mypy/infer.py
new file mode 100644
index 000000000..1d77e67d2
--- /dev/null
+++ b/lib/sqlalchemy/ext/mypy/infer.py
@@ -0,0 +1,398 @@
+# ext/mypy/infer.py
+# Copyright (C) 2021 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: http://www.opensource.org/licenses/mit-license.php
+
+from typing import Optional
+from typing import Union
+
+from mypy import nodes
+from mypy import types
+from mypy.messages import format_type
+from mypy.nodes import AssignmentStmt
+from mypy.nodes import CallExpr
+from mypy.nodes import NameExpr
+from mypy.nodes import StrExpr
+from mypy.nodes import TypeInfo
+from mypy.nodes import Var
+from mypy.plugin import SemanticAnalyzerPluginInterface
+from mypy.subtypes import is_subtype
+from mypy.types import AnyType
+from mypy.types import Instance
+from mypy.types import NoneType
+from mypy.types import TypeOfAny
+from mypy.types import UnionType
+
+from . import names
+from . import util
+
+
+def _infer_type_from_relationship(
+ api: SemanticAnalyzerPluginInterface,
+ stmt: AssignmentStmt,
+ node: Var,
+ left_hand_explicit_type: Optional[types.Type],
+) -> Union[Instance, UnionType, None]:
+ """Infer the type of mapping from a relationship.
+
+ E.g.::
+
+ @reg.mapped
+ class MyClass:
+ # ...
+
+ addresses = relationship(Address, uselist=True)
+
+ order: Mapped["Order"] = relationship("Order")
+
+ Will resolve in mypy as::
+
+ @reg.mapped
+ class MyClass:
+ # ...
+
+ addresses: Mapped[List[Address]]
+
+ order: Mapped["Order"]
+
+ """
+
+ assert isinstance(stmt.rvalue, CallExpr)
+ target_cls_arg = stmt.rvalue.args[0]
+ python_type_for_type = None
+
+ if isinstance(target_cls_arg, NameExpr) and isinstance(
+ target_cls_arg.node, TypeInfo
+ ):
+ # type
+ related_object_type = target_cls_arg.node
+ python_type_for_type = Instance(related_object_type, [])
+
+ # other cases not covered - an error message directs the user
+ # to set an explicit type annotation
+ #
+ # node.type == str, it's a string
+ # if isinstance(target_cls_arg, NameExpr) and isinstance(
+ # target_cls_arg.node, Var
+ # )
+ # points to a type
+ # isinstance(target_cls_arg, NameExpr) and isinstance(
+ # target_cls_arg.node, TypeAlias
+ # )
+ # string expression
+ # isinstance(target_cls_arg, StrExpr)
+
+ uselist_arg = util._get_callexpr_kwarg(stmt.rvalue, "uselist")
+ collection_cls_arg = util._get_callexpr_kwarg(
+ stmt.rvalue, "collection_class"
+ )
+
+ # this can be used to determine Optional for a many-to-one
+ # in the same way nullable=False could be used, if we start supporting
+ # that.
+ # innerjoin_arg = _get_callexpr_kwarg(stmt.rvalue, "innerjoin")
+
+ if (
+ uselist_arg is not None
+ and uselist_arg.fullname == "builtins.True"
+ and collection_cls_arg is None
+ ):
+ if python_type_for_type is not None:
+ python_type_for_type = Instance(
+ api.lookup_fully_qualified("builtins.list").node,
+ [python_type_for_type],
+ )
+ elif (
+ uselist_arg is None or uselist_arg.fullname == "builtins.True"
+ ) and collection_cls_arg is not None:
+ if isinstance(collection_cls_arg.node, TypeInfo):
+ if python_type_for_type is not None:
+ python_type_for_type = Instance(
+ collection_cls_arg.node, [python_type_for_type]
+ )
+ else:
+ util.fail(
+ api,
+ "Expected Python collection type for "
+ "collection_class parameter",
+ stmt.rvalue,
+ )
+ python_type_for_type = None
+ elif uselist_arg is not None and uselist_arg.fullname == "builtins.False":
+ if collection_cls_arg is not None:
+ util.fail(
+ api,
+ "Sending uselist=False and collection_class at the same time "
+ "does not make sense",
+ stmt.rvalue,
+ )
+ if python_type_for_type is not None:
+ python_type_for_type = UnionType(
+ [python_type_for_type, NoneType()]
+ )
+
+ else:
+ if left_hand_explicit_type is None:
+ msg = (
+ "Can't infer scalar or collection for ORM mapped expression "
+ "assigned to attribute '{}' if both 'uselist' and "
+ "'collection_class' arguments are absent from the "
+ "relationship(); please specify a "
+ "type annotation on the left hand side."
+ )
+ util.fail(api, msg.format(node.name), node)
+
+ if python_type_for_type is None:
+ return _infer_type_from_left_hand_type_only(
+ api, node, left_hand_explicit_type
+ )
+ elif left_hand_explicit_type is not None:
+ return _infer_type_from_left_and_inferred_right(
+ api, node, left_hand_explicit_type, python_type_for_type
+ )
+ else:
+ return python_type_for_type
+
+
+def _infer_type_from_decl_composite_property(
+ api: SemanticAnalyzerPluginInterface,
+ stmt: AssignmentStmt,
+ node: Var,
+ left_hand_explicit_type: Optional[types.Type],
+) -> Union[Instance, UnionType, None]:
+ """Infer the type of mapping from a CompositeProperty."""
+
+ assert isinstance(stmt.rvalue, CallExpr)
+ target_cls_arg = stmt.rvalue.args[0]
+ python_type_for_type = None
+
+ if isinstance(target_cls_arg, NameExpr) and isinstance(
+ target_cls_arg.node, TypeInfo
+ ):
+ related_object_type = target_cls_arg.node
+ python_type_for_type = Instance(related_object_type, [])
+ else:
+ python_type_for_type = None
+
+ if python_type_for_type is None:
+ return _infer_type_from_left_hand_type_only(
+ api, node, left_hand_explicit_type
+ )
+ elif left_hand_explicit_type is not None:
+ return _infer_type_from_left_and_inferred_right(
+ api, node, left_hand_explicit_type, python_type_for_type
+ )
+ else:
+ return python_type_for_type
+
+
+def _infer_type_from_decl_column_property(
+ api: SemanticAnalyzerPluginInterface,
+ stmt: AssignmentStmt,
+ node: Var,
+ left_hand_explicit_type: Optional[types.Type],
+) -> Union[Instance, UnionType, None]:
+ """Infer the type of mapping from a ColumnProperty.
+
+ This includes mappings against ``column_property()`` as well as the
+ ``deferred()`` function.
+
+ """
+ assert isinstance(stmt.rvalue, CallExpr)
+ first_prop_arg = stmt.rvalue.args[0]
+
+ if isinstance(first_prop_arg, CallExpr):
+ type_id = names._type_id_for_callee(first_prop_arg.callee)
+ else:
+ type_id = None
+
+ # look for column_property() / deferred() etc with Column as first
+ # argument
+ if type_id is names.COLUMN:
+ return _infer_type_from_decl_column(
+ api, stmt, node, left_hand_explicit_type, first_prop_arg
+ )
+ else:
+ return _infer_type_from_left_hand_type_only(
+ api, node, left_hand_explicit_type
+ )
+
+
+def _infer_type_from_decl_column(
+ api: SemanticAnalyzerPluginInterface,
+ stmt: AssignmentStmt,
+ node: Var,
+ left_hand_explicit_type: Optional[types.Type],
+ right_hand_expression: CallExpr,
+) -> Union[Instance, UnionType, None]:
+ """Infer the type of mapping from a Column.
+
+ E.g.::
+
+ @reg.mapped
+ class MyClass:
+ # ...
+
+ a = Column(Integer)
+
+ b = Column("b", String)
+
+ c: Mapped[int] = Column(Integer)
+
+ d: bool = Column(Boolean)
+
+ Will resolve in MyPy as::
+
+ @reg.mapped
+ class MyClass:
+ # ...
+
+ a : Mapped[int]
+
+ b : Mapped[str]
+
+ c: Mapped[int]
+
+ d: Mapped[bool]
+
+ """
+ assert isinstance(node, Var)
+
+ callee = None
+
+ for column_arg in right_hand_expression.args[0:2]:
+ if isinstance(column_arg, nodes.CallExpr):
+ # x = Column(String(50))
+ callee = column_arg.callee
+ type_args = column_arg.args
+ break
+ elif isinstance(column_arg, (nodes.NameExpr, nodes.MemberExpr)):
+ if isinstance(column_arg.node, TypeInfo):
+ # x = Column(String)
+ callee = column_arg
+ type_args = ()
+ break
+ else:
+ # x = Column(some_name, String), go to next argument
+ continue
+ elif isinstance(column_arg, (StrExpr,)):
+ # x = Column("name", String), go to next argument
+ continue
+ else:
+ assert False
+
+ if callee is None:
+ return None
+
+ if isinstance(callee.node, TypeInfo) and names._mro_has_id(
+ callee.node.mro, names.TYPEENGINE
+ ):
+ python_type_for_type = _extract_python_type_from_typeengine(
+ api, callee.node, type_args
+ )
+
+ if left_hand_explicit_type is not None:
+
+ return _infer_type_from_left_and_inferred_right(
+ api, node, left_hand_explicit_type, python_type_for_type
+ )
+
+ else:
+ python_type_for_type = UnionType(
+ [python_type_for_type, NoneType()]
+ )
+ return python_type_for_type
+ else:
+ # it's not TypeEngine, it's typically implicitly typed
+ # like ForeignKey. we can't infer from the right side.
+ return _infer_type_from_left_hand_type_only(
+ api, node, left_hand_explicit_type
+ )
+
+
+def _infer_type_from_left_and_inferred_right(
+ api: SemanticAnalyzerPluginInterface,
+ node: Var,
+ left_hand_explicit_type: Optional[types.Type],
+ python_type_for_type: Union[Instance, UnionType],
+) -> Optional[Union[Instance, UnionType]]:
+ """Validate type when a left hand annotation is present and we also
+ could infer the right hand side::
+
+ attrname: SomeType = Column(SomeDBType)
+
+ """
+ if not is_subtype(left_hand_explicit_type, python_type_for_type):
+ descriptor = api.lookup("__sa_Mapped", node)
+
+ effective_type = Instance(descriptor.node, [python_type_for_type])
+
+ msg = (
+ "Left hand assignment '{}: {}' not compatible "
+ "with ORM mapped expression of type {}"
+ )
+ util.fail(
+ api,
+ msg.format(
+ node.name,
+ format_type(left_hand_explicit_type),
+ format_type(effective_type),
+ ),
+ node,
+ )
+
+ return left_hand_explicit_type
+
+
+def _infer_type_from_left_hand_type_only(
+ api: SemanticAnalyzerPluginInterface,
+ node: Var,
+ left_hand_explicit_type: Optional[types.Type],
+) -> Optional[Union[Instance, UnionType]]:
+ """Determine the type based on explicit annotation only.
+
+    If no annotation is present, note that we need one there to know
+ the type.
+
+ """
+ if left_hand_explicit_type is None:
+ msg = (
+ "Can't infer type from ORM mapped expression "
+ "assigned to attribute '{}'; please specify a "
+ "Python type or "
+ "Mapped[<python type>] on the left hand side."
+ )
+ util.fail(api, msg.format(node.name), node)
+
+ descriptor = api.lookup("__sa_Mapped", node)
+ return Instance(descriptor.node, [AnyType(TypeOfAny.special_form)])
+
+ else:
+ # use type from the left hand side
+ return left_hand_explicit_type
+
+
+def _extract_python_type_from_typeengine(
+ api: SemanticAnalyzerPluginInterface, node: TypeInfo, type_args
+) -> Instance:
+ if node.fullname == "sqlalchemy.sql.sqltypes.Enum" and type_args:
+ first_arg = type_args[0]
+ if isinstance(first_arg, NameExpr) and isinstance(
+ first_arg.node, TypeInfo
+ ):
+ for base_ in first_arg.node.mro:
+ if base_.fullname == "enum.Enum":
+ return Instance(first_arg.node, [])
+ # TODO: support other pep-435 types here
+ else:
+ n = api.lookup_fully_qualified("builtins.str")
+ return Instance(n.node, [])
+
+ for mr in node.mro:
+ if mr.bases:
+ for base_ in mr.bases:
+ if base_.type.fullname == "sqlalchemy.sql.type_api.TypeEngine":
+ return base_.args[-1]
+ assert False, "could not extract Python type from node: %s" % node
diff --git a/lib/sqlalchemy/ext/mypy/names.py b/lib/sqlalchemy/ext/mypy/names.py
index d1fd77415..11208f3c7 100644
--- a/lib/sqlalchemy/ext/mypy/names.py
+++ b/lib/sqlalchemy/ext/mypy/names.py
@@ -36,6 +36,7 @@ DECLARED_ATTR = util.symbol("DECLARED_ATTR")
MAPPER_PROPERTY = util.symbol("MAPPER_PROPERTY")
AS_DECLARATIVE = util.symbol("AS_DECLARATIVE")
AS_DECLARATIVE_BASE = util.symbol("AS_DECLARATIVE_BASE")
+DECLARATIVE_MIXIN = util.symbol("DECLARATIVE_MIXIN")
_lookup = {
"Column": (
@@ -134,6 +135,13 @@ _lookup = {
"sqlalchemy.orm.declared_attr",
},
),
+ "declarative_mixin": (
+ DECLARATIVE_MIXIN,
+ {
+ "sqlalchemy.orm.decl_api.declarative_mixin",
+ "sqlalchemy.orm.declarative_mixin",
+ },
+ ),
}
diff --git a/lib/sqlalchemy/ext/mypy/plugin.py b/lib/sqlalchemy/ext/mypy/plugin.py
index 9ca1cb2da..a0aa5bf04 100644
--- a/lib/sqlalchemy/ext/mypy/plugin.py
+++ b/lib/sqlalchemy/ext/mypy/plugin.py
@@ -55,6 +55,7 @@ class CustomPlugin(Plugin):
# subclasses. but then you can just check it here from the "base"
# and get the same effect.
sym = self.lookup_fully_qualified(fullname)
+
if (
sym
and isinstance(sym.node, TypeInfo)
@@ -70,17 +71,18 @@ class CustomPlugin(Plugin):
) -> Optional[Callable[[ClassDefContext], None]]:
sym = self.lookup_fully_qualified(fullname)
- if (
- sym is not None
- and names._type_id_for_named_node(sym.node)
- is names.MAPPED_DECORATOR
- ):
- return _cls_decorator_hook
- elif sym is not None and names._type_id_for_named_node(sym.node) in (
- names.AS_DECLARATIVE,
- names.AS_DECLARATIVE_BASE,
- ):
- return _base_cls_decorator_hook
+
+ if sym is not None:
+ type_id = names._type_id_for_named_node(sym.node)
+ if type_id is names.MAPPED_DECORATOR:
+ return _cls_decorator_hook
+ elif type_id in (
+ names.AS_DECLARATIVE,
+ names.AS_DECLARATIVE_BASE,
+ ):
+ return _base_cls_decorator_hook
+ elif type_id is names.DECLARATIVE_MIXIN:
+ return _declarative_mixin_hook
return None
@@ -192,6 +194,13 @@ def _base_cls_hook(ctx: ClassDefContext) -> None:
decl_class._scan_declarative_assignments_and_apply_types(ctx.cls, ctx.api)
+def _declarative_mixin_hook(ctx: ClassDefContext) -> None:
+ _add_globals(ctx)
+ decl_class._scan_declarative_assignments_and_apply_types(
+ ctx.cls, ctx.api, is_mixin_scan=True
+ )
+
+
def _cls_decorator_hook(ctx: ClassDefContext) -> None:
_add_globals(ctx)
assert isinstance(ctx.reason, nodes.MemberExpr)
diff --git a/lib/sqlalchemy/ext/mypy/util.py b/lib/sqlalchemy/ext/mypy/util.py
index 7079f3cd7..becce3ebe 100644
--- a/lib/sqlalchemy/ext/mypy/util.py
+++ b/lib/sqlalchemy/ext/mypy/util.py
@@ -1,18 +1,67 @@
from typing import Optional
+from typing import Sequence
+from typing import Tuple
+from typing import Type
from mypy.nodes import CallExpr
+from mypy.nodes import CLASSDEF_NO_INFO
from mypy.nodes import Context
from mypy.nodes import IfStmt
+from mypy.nodes import JsonDict
from mypy.nodes import NameExpr
from mypy.nodes import SymbolTableNode
+from mypy.nodes import TypeInfo
from mypy.plugin import SemanticAnalyzerPluginInterface
+from mypy.plugins.common import deserialize_and_fixup_type
from mypy.types import Instance
from mypy.types import NoneType
-from mypy.types import Type
from mypy.types import UnboundType
from mypy.types import UnionType
+class DeclClassApplied:
+ def __init__(
+ self,
+ is_mapped: bool,
+ has_table: bool,
+ mapped_attr_names: Sequence[Tuple[str, Type]],
+ mapped_mro: Sequence[Type],
+ ):
+ self.is_mapped = is_mapped
+ self.has_table = has_table
+ self.mapped_attr_names = mapped_attr_names
+ self.mapped_mro = mapped_mro
+
+ def serialize(self) -> JsonDict:
+ return {
+ "is_mapped": self.is_mapped,
+ "has_table": self.has_table,
+ "mapped_attr_names": [
+ (name, type_.serialize())
+ for name, type_ in self.mapped_attr_names
+ ],
+ "mapped_mro": [type_.serialize() for type_ in self.mapped_mro],
+ }
+
+ @classmethod
+ def deserialize(
+ cls, data: JsonDict, api: SemanticAnalyzerPluginInterface
+ ) -> "DeclClassApplied":
+
+ return DeclClassApplied(
+ is_mapped=data["is_mapped"],
+ has_table=data["has_table"],
+ mapped_attr_names=[
+ (name, deserialize_and_fixup_type(type_, api))
+ for name, type_ in data["mapped_attr_names"]
+ ],
+ mapped_mro=[
+ deserialize_and_fixup_type(type_, api)
+ for type_ in data["mapped_mro"]
+ ],
+ )
+
+
def fail(api: SemanticAnalyzerPluginInterface, msg: str, ctx: Context):
msg = "[SQLAlchemy Mypy plugin] %s" % msg
return api.fail(msg, ctx)
@@ -94,3 +143,14 @@ def _unbound_to_instance(
)
else:
return typ
+
+
+def _info_for_cls(cls, api):
+ if cls.info is CLASSDEF_NO_INFO:
+ sym = api.lookup(cls.name, cls)
+ if sym.node and isinstance(sym.node, TypeInfo):
+ info = sym.node
+ else:
+ info = cls.info
+
+ return info
diff --git a/lib/sqlalchemy/orm/__init__.py b/lib/sqlalchemy/orm/__init__.py
index 025d826e3..66c3e7e33 100644
--- a/lib/sqlalchemy/orm/__init__.py
+++ b/lib/sqlalchemy/orm/__init__.py
@@ -23,6 +23,7 @@ from .attributes import QueryableAttribute
from .context import QueryContext
from .decl_api import as_declarative
from .decl_api import declarative_base
+from .decl_api import declarative_mixin
from .decl_api import DeclarativeMeta
from .decl_api import declared_attr
from .decl_api import has_inherited_table
diff --git a/lib/sqlalchemy/orm/decl_api.py b/lib/sqlalchemy/orm/decl_api.py
index ef53e2d39..d9c464815 100644
--- a/lib/sqlalchemy/orm/decl_api.py
+++ b/lib/sqlalchemy/orm/decl_api.py
@@ -321,6 +321,48 @@ class _stateful_declared_attr(declared_attr):
return declared_attr(fn, **self.kw)
+def declarative_mixin(cls):
+ """Mark a class as providing the feature of "declarative mixin".
+
+ E.g.::
+
+ from sqlalchemy.orm import declared_attr
+ from sqlalchemy.orm import declarative_mixin
+
+ @declarative_mixin
+ class MyMixin:
+
+ @declared_attr
+ def __tablename__(cls):
+ return cls.__name__.lower()
+
+ __table_args__ = {'mysql_engine': 'InnoDB'}
+        __mapper_args__ = {'always_refresh': True}
+
+ id = Column(Integer, primary_key=True)
+
+ class MyModel(MyMixin, Base):
+ name = Column(String(1000))
+
+ The :func:`_orm.declarative_mixin` decorator currently does not modify
+    the given class in any way; its current purpose is strictly to assist
+ the :ref:`Mypy plugin <mypy_toplevel>` in being able to identify
+ SQLAlchemy declarative mixin classes when no other context is present.
+
+ .. versionadded:: 1.4.6
+
+ .. seealso::
+
+ :ref:`orm_mixins_toplevel`
+
+ :ref:`mypy_declarative_mixins` - in the
+ :ref:`Mypy plugin documentation <mypy_toplevel>`
+
+ """ # noqa: E501
+
+ return cls
+
+
def declarative_base(
bind=None,
metadata=None,