summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnthony Sottile <asottile@umich.edu>2021-03-29 19:48:44 -0700
committerAnthony Sottile <asottile@umich.edu>2021-03-30 17:37:13 -0700
commitcb36e206a5ed5610a2f94f9893bbfd75036dc0ad (patch)
treee1116f51f3597547bfdafe1c254689d9916574ec
parent64a610ed19beb3a9b50c7ae83d1ae861fc6613ef (diff)
downloadflake8-cb36e206a5ed5610a2f94f9893bbfd75036dc0ad.tar.gz
com2ann
-rw-r--r--src/flake8/__init__.py2
-rw-r--r--src/flake8/api/legacy.py2
-rw-r--r--src/flake8/checker.py63
-rw-r--r--src/flake8/exceptions.py19
-rw-r--r--src/flake8/formatting/base.py31
-rw-r--r--src/flake8/formatting/default.py18
-rw-r--r--src/flake8/main/application.py67
-rw-r--r--src/flake8/main/cli.py3
-rw-r--r--src/flake8/main/debug.py2
-rw-r--r--src/flake8/main/options.py5
-rw-r--r--src/flake8/options/aggregator.py8
-rw-r--r--src/flake8/options/config.py24
-rw-r--r--src/flake8/options/manager.py151
-rw-r--r--src/flake8/plugins/manager.py23
-rw-r--r--src/flake8/plugins/pyflakes.py6
-rw-r--r--src/flake8/processor.py101
-rw-r--r--src/flake8/statistics.py26
-rw-r--r--src/flake8/style_guide.py127
-rw-r--r--src/flake8/utils.py72
19 files changed, 361 insertions, 389 deletions
diff --git a/src/flake8/__init__.py b/src/flake8/__init__.py
index a6f2b62..7b39e56 100644
--- a/src/flake8/__init__.py
+++ b/src/flake8/__init__.py
@@ -62,7 +62,7 @@ def configure_logging(verbosity, filename=None, logformat=LOG_FORMAT):
if not filename or filename in ("stderr", "stdout"):
fileobj = getattr(sys, filename or "stderr")
- handler_cls = logging.StreamHandler # type: Type[logging.Handler]
+ handler_cls: Type[logging.Handler] = logging.StreamHandler
else:
fileobj = filename
handler_cls = logging.FileHandler
diff --git a/src/flake8/api/legacy.py b/src/flake8/api/legacy.py
index f046b5c..f80cb3d 100644
--- a/src/flake8/api/legacy.py
+++ b/src/flake8/api/legacy.py
@@ -81,7 +81,7 @@ class StyleGuide:
self._file_checker_manager = application.file_checker_manager
@property
- def options(self): # type: () -> argparse.Namespace
+ def options(self) -> argparse.Namespace:
"""Return application's options.
An instance of :class:`argparse.Namespace` containing parsed options.
diff --git a/src/flake8/checker.py b/src/flake8/checker.py
index 15ca9bc..ceff965 100644
--- a/src/flake8/checker.py
+++ b/src/flake8/checker.py
@@ -85,8 +85,8 @@ class Manager:
self.options = style_guide.options
self.checks = checker_plugins
self.jobs = self._job_count()
- self._all_checkers = [] # type: List[FileChecker]
- self.checkers = [] # type: List[FileChecker]
+ self._all_checkers: List[FileChecker] = []
+ self.checkers: List[FileChecker] = []
self.statistics = {
"files": 0,
"logical lines": 0,
@@ -103,8 +103,7 @@ class Manager:
self.statistics[statistic] += checker.statistics[statistic]
self.statistics["files"] += len(self.checkers)
- def _job_count(self):
- # type: () -> int
+ def _job_count(self) -> int:
# First we walk through all of our error cases:
# - multiprocessing library is not present
# - we're running on windows in which case we know we have significant
@@ -165,8 +164,7 @@ class Manager:
)
return reported_results_count
- def is_path_excluded(self, path):
- # type: (str) -> bool
+ def is_path_excluded(self, path: str) -> bool:
"""Check if a path is excluded.
:param str path:
@@ -189,8 +187,7 @@ class Manager:
logger=LOG,
)
- def make_checkers(self, paths=None):
- # type: (Optional[List[str]]) -> None
+ def make_checkers(self, paths: Optional[List[str]] = None) -> None:
"""Create checkers for each file."""
if paths is None:
paths = self.arguments
@@ -235,8 +232,7 @@ class Manager:
self.checkers = [c for c in self._all_checkers if c.should_process]
LOG.info("Checking %d files", len(self.checkers))
- def report(self):
- # type: () -> Tuple[int, int]
+ def report(self) -> Tuple[int, int]:
"""Report all of the errors found in the managed file checkers.
This iterates over each of the checkers and reports the errors sorted
@@ -258,11 +254,11 @@ class Manager:
results_found += len(results)
return (results_found, results_reported)
- def run_parallel(self): # type: () -> None
+ def run_parallel(self) -> None:
"""Run the checkers in parallel."""
# fmt: off
- final_results = collections.defaultdict(list) # type: Dict[str, List[Tuple[str, int, int, str, Optional[str]]]] # noqa: E501
- final_statistics = collections.defaultdict(dict) # type: Dict[str, Dict[str, int]] # noqa: E501
+ final_results: Dict[str, List[Tuple[str, int, int, str, Optional[str]]]] = collections.defaultdict(list) # noqa: E501
+ final_statistics: Dict[str, Dict[str, int]] = collections.defaultdict(dict) # noqa: E501
# fmt: on
pool = _try_initialize_processpool(self.jobs)
@@ -297,12 +293,12 @@ class Manager:
checker.results = final_results[filename]
checker.statistics = final_statistics[filename]
- def run_serial(self): # type: () -> None
+ def run_serial(self) -> None:
"""Run the checkers in serial."""
for checker in self.checkers:
checker.run_checks()
- def run(self): # type: () -> None
+ def run(self) -> None:
"""Run all the checkers.
This will intelligently decide whether to run the checks in parallel
@@ -356,9 +352,7 @@ class FileChecker:
self.options = options
self.filename = filename
self.checks = checks
- # fmt: off
- self.results = [] # type: List[Tuple[str, int, int, str, Optional[str]]] # noqa: E501
- # fmt: on
+ self.results: List[Tuple[str, int, int, str, Optional[str]]] = []
self.statistics = {
"tokens": 0,
"logical lines": 0,
@@ -372,12 +366,11 @@ class FileChecker:
self.should_process = not self.processor.should_ignore_file()
self.statistics["physical lines"] = len(self.processor.lines)
- def __repr__(self): # type: () -> str
+ def __repr__(self) -> str:
"""Provide helpful debugging representation."""
return f"FileChecker for {self.filename}"
- def _make_processor(self):
- # type: () -> Optional[processor.FileProcessor]
+ def _make_processor(self) -> Optional[processor.FileProcessor]:
try:
return processor.FileProcessor(self.filename, self.options)
except OSError as e:
@@ -391,8 +384,13 @@ class FileChecker:
self.report("E902", 0, 0, message)
return None
- def report(self, error_code, line_number, column, text):
- # type: (Optional[str], int, int, str) -> str
+ def report(
+ self,
+ error_code: Optional[str],
+ line_number: int,
+ column: int,
+ text: str,
+ ) -> str:
"""Report an error by storing it in the results list."""
if error_code is None:
error_code, text = text.split(" ", 1)
@@ -468,7 +466,7 @@ class FileChecker:
column -= column_offset
return row, column
- def run_ast_checks(self): # type: () -> None
+ def run_ast_checks(self) -> None:
"""Run all checks expecting an abstract syntax tree."""
try:
ast = self.processor.build_ast()
@@ -610,8 +608,9 @@ class FileChecker:
else:
self.run_logical_checks()
- def check_physical_eol(self, token, prev_physical):
- # type: (processor._Token, str) -> None
+ def check_physical_eol(
+ self, token: processor._Token, prev_physical: str
+ ) -> None:
"""Run physical checks if and only if it is at the end of the line."""
# a newline token ends a single physical line.
if processor.is_eol_token(token):
@@ -640,13 +639,14 @@ class FileChecker:
self.run_physical_checks(line + "\n")
-def _pool_init(): # type: () -> None
+def _pool_init() -> None:
"""Ensure correct signaling of ^C using multiprocessing.Pool."""
signal.signal(signal.SIGINT, signal.SIG_IGN)
-def _try_initialize_processpool(job_count):
- # type: (int) -> Optional[multiprocessing.pool.Pool]
+def _try_initialize_processpool(
+ job_count: int,
+) -> Optional[multiprocessing.pool.Pool]:
"""Return a new process pool instance if we are able to create one."""
try:
return multiprocessing.Pool(job_count, _pool_init)
@@ -675,8 +675,9 @@ def _run_checks(checker):
return checker.run_checks()
-def find_offset(offset, mapping):
- # type: (int, processor._LogicalMapping) -> Tuple[int, int]
+def find_offset(
+ offset: int, mapping: processor._LogicalMapping
+) -> Tuple[int, int]:
"""Find the offset tuple for a single offset."""
if isinstance(offset, tuple):
return offset
diff --git a/src/flake8/exceptions.py b/src/flake8/exceptions.py
index bcc29de..caa4677 100644
--- a/src/flake8/exceptions.py
+++ b/src/flake8/exceptions.py
@@ -19,14 +19,13 @@ class FailedToLoadPlugin(Flake8Exception):
FORMAT = 'Flake8 failed to load plugin "%(name)s" due to %(exc)s.'
- def __init__(self, plugin_name, exception):
- # type: (str, Exception) -> None
+ def __init__(self, plugin_name: str, exception: Exception) -> None:
"""Initialize our FailedToLoadPlugin exception."""
self.plugin_name = plugin_name
self.original_exception = exception
super().__init__(plugin_name, exception)
- def __str__(self): # type: () -> str
+ def __str__(self) -> str:
"""Format our exception message."""
return self.FORMAT % {
"name": self.plugin_name,
@@ -37,7 +36,7 @@ class FailedToLoadPlugin(Flake8Exception):
class InvalidSyntax(Flake8Exception):
"""Exception raised when tokenizing a file fails."""
- def __init__(self, exception): # type: (Exception) -> None
+ def __init__(self, exception: Exception) -> None:
"""Initialize our InvalidSyntax exception."""
self.original_exception = exception
self.error_message = "{}: {}".format(
@@ -48,7 +47,7 @@ class InvalidSyntax(Flake8Exception):
self.column_number = 0
super().__init__(exception)
- def __str__(self): # type: () -> str
+ def __str__(self) -> str:
"""Format our exception message."""
return self.error_message
@@ -58,14 +57,13 @@ class PluginRequestedUnknownParameters(Flake8Exception):
FORMAT = '"%(name)s" requested unknown parameters causing %(exc)s'
- def __init__(self, plugin, exception):
- # type: (Dict[str, str], Exception) -> None
+ def __init__(self, plugin: Dict[str, str], exception: Exception) -> None:
"""Pop certain keyword arguments for initialization."""
self.plugin = plugin
self.original_exception = exception
super().__init__(plugin, exception)
- def __str__(self): # type: () -> str
+ def __str__(self) -> str:
"""Format our exception message."""
return self.FORMAT % {
"name": self.plugin["plugin_name"],
@@ -78,14 +76,13 @@ class PluginExecutionFailed(Flake8Exception):
FORMAT = '"%(name)s" failed during execution due to "%(exc)s"'
- def __init__(self, plugin, exception):
- # type: (Dict[str, str], Exception) -> None
+ def __init__(self, plugin: Dict[str, str], exception: Exception) -> None:
"""Utilize keyword arguments for message generation."""
self.plugin = plugin
self.original_exception = exception
super().__init__(plugin, exception)
- def __str__(self): # type: () -> str
+ def __str__(self) -> str:
"""Format our exception message."""
return self.FORMAT % {
"name": self.plugin["plugin_name"],
diff --git a/src/flake8/formatting/base.py b/src/flake8/formatting/base.py
index d98b290..81897b2 100644
--- a/src/flake8/formatting/base.py
+++ b/src/flake8/formatting/base.py
@@ -33,8 +33,7 @@ class BaseFormatter:
output filename has been specified.
"""
- def __init__(self, options):
- # type: (argparse.Namespace) -> None
+ def __init__(self, options: argparse.Namespace) -> None:
"""Initialize with the options parsed from config and cli.
This also calls a hook, :meth:`after_init`, so subclasses do not need
@@ -48,14 +47,14 @@ class BaseFormatter:
"""
self.options = options
self.filename = options.output_file
- self.output_fd = None # type: Optional[IO[str]]
+ self.output_fd: Optional[IO[str]] = None
self.newline = "\n"
self.after_init()
- def after_init(self): # type: () -> None
+ def after_init(self) -> None:
"""Initialize the formatter further."""
- def beginning(self, filename): # type: (str) -> None
+ def beginning(self, filename: str) -> None:
"""Notify the formatter that we're starting to process a file.
:param str filename:
@@ -63,7 +62,7 @@ class BaseFormatter:
from.
"""
- def finished(self, filename): # type: (str) -> None
+ def finished(self, filename: str) -> None:
"""Notify the formatter that we've finished processing a file.
:param str filename:
@@ -71,7 +70,7 @@ class BaseFormatter:
from.
"""
- def start(self): # type: () -> None
+ def start(self) -> None:
"""Prepare the formatter to receive input.
This defaults to initializing :attr:`output_fd` if :attr:`filename`
@@ -79,7 +78,7 @@ class BaseFormatter:
if self.filename:
self.output_fd = open(self.filename, "a")
- def handle(self, error): # type: (Violation) -> None
+ def handle(self, error: "Violation") -> None:
"""Handle an error reported by Flake8.
This defaults to calling :meth:`format`, :meth:`show_source`, and
@@ -96,7 +95,7 @@ class BaseFormatter:
source = self.show_source(error)
self.write(line, source)
- def format(self, error): # type: (Violation) -> Optional[str]
+ def format(self, error: "Violation") -> Optional[str]:
"""Format an error reported by Flake8.
This method **must** be implemented by subclasses.
@@ -115,7 +114,7 @@ class BaseFormatter:
"Subclass of BaseFormatter did not implement" " format."
)
- def show_statistics(self, statistics): # type: (Statistics) -> None
+ def show_statistics(self, statistics: "Statistics") -> None:
"""Format and print the statistics."""
for error_code in statistics.error_codes():
stats_for_error_code = statistics.statistics_for(error_code)
@@ -130,8 +129,7 @@ class BaseFormatter:
)
)
- def show_benchmarks(self, benchmarks):
- # type: (List[Tuple[str, float]]) -> None
+ def show_benchmarks(self, benchmarks: List[Tuple[str, float]]) -> None:
"""Format and print the benchmarks."""
# NOTE(sigmavirus24): The format strings are a little confusing, even
# to me, so here's a quick explanation:
@@ -152,7 +150,7 @@ class BaseFormatter:
benchmark = float_format(statistic=statistic, value=value)
self._write(benchmark)
- def show_source(self, error): # type: (Violation) -> Optional[str]
+ def show_source(self, error: "Violation") -> Optional[str]:
"""Show the physical line generating the error.
This also adds an indicator for the particular part of the line that
@@ -183,15 +181,14 @@ class BaseFormatter:
# one
return f"{error.physical_line}{indent}^"
- def _write(self, output): # type: (str) -> None
+ def _write(self, output: str) -> None:
"""Handle logic of whether to use an output file or print()."""
if self.output_fd is not None:
self.output_fd.write(output + self.newline)
if self.output_fd is None or self.options.tee:
print(output, end=self.newline)
- def write(self, line, source):
- # type: (Optional[str], Optional[str]) -> None
+ def write(self, line: Optional[str], source: Optional[str]) -> None:
"""Write the line either to the output file or stdout.
This handles deciding whether to write to a file or print to standard
@@ -209,7 +206,7 @@ class BaseFormatter:
if source:
self._write(source)
- def stop(self): # type: () -> None
+ def stop(self) -> None:
"""Clean up after reporting is finished."""
if self.output_fd is not None:
self.output_fd.close()
diff --git a/src/flake8/formatting/default.py b/src/flake8/formatting/default.py
index dc78254..0a8e09d 100644
--- a/src/flake8/formatting/default.py
+++ b/src/flake8/formatting/default.py
@@ -25,9 +25,9 @@ class SimpleFormatter(base.BaseFormatter):
"""
- error_format = None # type: str
+ error_format: str
- def format(self, error): # type: (Violation) -> Optional[str]
+ def format(self, error: "Violation") -> Optional[str]:
"""Format and write error out.
If an output filename is specified, write formatted errors to that
@@ -51,7 +51,7 @@ class Default(SimpleFormatter):
error_format = "%(path)s:%(row)d:%(col)d: %(code)s %(text)s"
- def after_init(self): # type: () -> None
+ def after_init(self) -> None:
"""Check for a custom format string."""
if self.options.format.lower() != "default":
self.error_format = self.options.format
@@ -68,14 +68,14 @@ class FilenameOnly(SimpleFormatter):
error_format = "%(path)s"
- def after_init(self): # type: () -> None
+ def after_init(self) -> None:
"""Initialize our set of filenames."""
- self.filenames_already_printed = set() # type: Set[str]
+ self.filenames_already_printed: Set[str] = set()
- def show_source(self, error): # type: (Violation) -> Optional[str]
+ def show_source(self, error: "Violation") -> Optional[str]:
"""Do not include the source code."""
- def format(self, error): # type: (Violation) -> Optional[str]
+ def format(self, error: "Violation") -> Optional[str]:
"""Ensure we only print each error once."""
if error.filename not in self.filenames_already_printed:
self.filenames_already_printed.add(error.filename)
@@ -87,8 +87,8 @@ class FilenameOnly(SimpleFormatter):
class Nothing(base.BaseFormatter):
"""Print absolutely nothing."""
- def format(self, error): # type: (Violation) -> Optional[str]
+ def format(self, error: "Violation") -> Optional[str]:
"""Do nothing."""
- def show_source(self, error): # type: (Violation) -> Optional[str]
+ def show_source(self, error: "Violation") -> Optional[str]:
"""Do not print the source."""
diff --git a/src/flake8/main/application.py b/src/flake8/main/application.py
index fb23db5..e3be8c4 100644
--- a/src/flake8/main/application.py
+++ b/src/flake8/main/application.py
@@ -44,7 +44,7 @@ class Application:
#: The timestamp when the Application instance was instantiated.
self.start_time = time.time()
#: The timestamp when the Application finished reported errors.
- self.end_time = None # type: float
+ self.end_time: float = None
#: The name of the program being run
self.program = program
#: The version of the program being run
@@ -63,26 +63,24 @@ class Application:
options.register_default_options(self.option_manager)
#: The instance of :class:`flake8.plugins.manager.Checkers`
- self.check_plugins = None # type: plugin_manager.Checkers
- # fmt: off
+ self.check_plugins: plugin_manager.Checkers = None
#: The instance of :class:`flake8.plugins.manager.ReportFormatters`
- self.formatting_plugins = None # type: plugin_manager.ReportFormatters
- # fmt: on
+ self.formatting_plugins: plugin_manager.ReportFormatters = None
#: The user-selected formatter from :attr:`formatting_plugins`
- self.formatter = None # type: BaseFormatter
+ self.formatter: BaseFormatter = None
#: The :class:`flake8.style_guide.StyleGuideManager` built from the
#: user's options
- self.guide = None # type: style_guide.StyleGuideManager
+ self.guide: style_guide.StyleGuideManager = None
#: The :class:`flake8.checker.Manager` that will handle running all of
#: the checks selected by the user.
- self.file_checker_manager = None # type: checker.Manager
+ self.file_checker_manager: checker.Manager = None
#: The user-supplied options parsed into an instance of
#: :class:`argparse.Namespace`
- self.options = None # type: argparse.Namespace
+ self.options: argparse.Namespace = None
#: The left over arguments that were not parsed by
#: :attr:`option_manager`
- self.args = None # type: List[str]
+ self.args: List[str] = None
#: The number of errors, warnings, and other messages after running
#: flake8 and taking into account ignored errors and lines.
self.result_count = 0
@@ -96,10 +94,11 @@ class Application:
#: Whether the program is processing a diff or not
self.running_against_diff = False
#: The parsed diff information
- self.parsed_diff = {} # type: Dict[str, Set[int]]
+ self.parsed_diff: Dict[str, Set[int]] = {}
- def parse_preliminary_options(self, argv):
- # type: (List[str]) -> Tuple[argparse.Namespace, List[str]]
+ def parse_preliminary_options(
+ self, argv: List[str]
+ ) -> Tuple[argparse.Namespace, List[str]]:
"""Get preliminary options from the CLI, pre-plugin-loading.
We need to know the values of a few standard options so that we can
@@ -123,8 +122,7 @@ class Application:
rest.extend(("--output-file", args.output_file))
return args, rest
- def exit(self):
- # type: () -> None
+ def exit(self) -> None:
"""Handle finalization and exiting the program.
This should be the last thing called on the application instance. It
@@ -140,8 +138,7 @@ class Application:
(self.result_count > 0) or self.catastrophic_failure
)
- def find_plugins(self, config_finder):
- # type: (config.ConfigFileFinder) -> None
+ def find_plugins(self, config_finder: config.ConfigFileFinder) -> None:
"""Find and load the plugins for this application.
Set the :attr:`check_plugins` and :attr:`formatting_plugins` attributes
@@ -163,8 +160,7 @@ class Application:
self.check_plugins.load_plugins()
self.formatting_plugins.load_plugins()
- def register_plugin_options(self):
- # type: () -> None
+ def register_plugin_options(self) -> None:
"""Register options provided by plugins to our option manager."""
self.check_plugins.register_options(self.option_manager)
self.check_plugins.register_plugin_versions(self.option_manager)
@@ -172,10 +168,9 @@ class Application:
def parse_configuration_and_cli(
self,
- config_finder, # type: config.ConfigFileFinder
- argv, # type: List[str]
- ):
- # type: (...) -> None
+ config_finder: config.ConfigFileFinder,
+ argv: List[str],
+ ) -> None:
"""Parse configuration files and the CLI options.
:param config.ConfigFileFinder config_finder:
@@ -215,8 +210,9 @@ class Application:
return formatter_plugin.execute
- def make_formatter(self, formatter_class=None):
- # type: (Optional[Type[BaseFormatter]]) -> None
+ def make_formatter(
+ self, formatter_class: Optional[Type["BaseFormatter"]] = None
+ ) -> None:
"""Initialize a formatter based on the parsed options."""
format_plugin = self.options.format
if 1 <= self.options.quiet < 2:
@@ -229,8 +225,7 @@ class Application:
self.formatter = formatter_class(self.options)
- def make_guide(self):
- # type: () -> None
+ def make_guide(self) -> None:
"""Initialize our StyleGuide."""
self.guide = style_guide.StyleGuideManager(
self.options, self.formatter
@@ -239,8 +234,7 @@ class Application:
if self.running_against_diff:
self.guide.add_diff_ranges(self.parsed_diff)
- def make_file_checker_manager(self):
- # type: () -> None
+ def make_file_checker_manager(self) -> None:
"""Initialize our FileChecker Manager."""
self.file_checker_manager = checker.Manager(
style_guide=self.guide,
@@ -248,8 +242,7 @@ class Application:
checker_plugins=self.check_plugins,
)
- def run_checks(self, files=None):
- # type: (Optional[List[str]]) -> None
+ def run_checks(self, files: Optional[List[str]] = None) -> None:
"""Run the actual checks with the FileChecker Manager.
This method encapsulates the logic to make a
@@ -289,8 +282,7 @@ class Application:
self.formatter.show_benchmarks(statistics)
- def report_errors(self):
- # type: () -> None
+ def report_errors(self) -> None:
"""Report all the errors found by flake8 3.0.
This also updates the :attr:`result_count` attribute with the total
@@ -312,8 +304,7 @@ class Application:
self.formatter.show_statistics(self.guide.stats)
- def initialize(self, argv):
- # type: (List[str]) -> None
+ def initialize(self, argv: List[str]) -> None:
"""Initialize the application to be run.
This finds the plugins, registers their options, and parses the
@@ -347,14 +338,12 @@ class Application:
self.report_benchmarks()
self.formatter.stop()
- def _run(self, argv):
- # type: (List[str]) -> None
+ def _run(self, argv: List[str]) -> None:
self.initialize(argv)
self.run_checks()
self.report()
- def run(self, argv):
- # type: (List[str]) -> None
+ def run(self, argv: List[str]) -> None:
"""Run our application.
This method will also handle KeyboardInterrupt exceptions for the
diff --git a/src/flake8/main/cli.py b/src/flake8/main/cli.py
index 9456ae2..ddbc7c0 100644
--- a/src/flake8/main/cli.py
+++ b/src/flake8/main/cli.py
@@ -6,8 +6,7 @@ from typing import Optional
from flake8.main import application
-def main(argv=None):
- # type: (Optional[List[str]]) -> None
+def main(argv: Optional[List[str]] = None) -> None:
"""Execute the main bit of the application.
This handles the creation of an instance of :class:`Application`, runs it,
diff --git a/src/flake8/main/debug.py b/src/flake8/main/debug.py
index 923c894..9f087c9 100644
--- a/src/flake8/main/debug.py
+++ b/src/flake8/main/debug.py
@@ -59,6 +59,6 @@ def plugins_from(option_manager):
]
-def dependencies(): # type: () -> List[Dict[str, str]]
+def dependencies() -> List[Dict[str, str]]:
"""Generate the list of dependencies we care about."""
return []
diff --git a/src/flake8/main/options.py b/src/flake8/main/options.py
index acadbb1..3dfb482 100644
--- a/src/flake8/main/options.py
+++ b/src/flake8/main/options.py
@@ -6,8 +6,7 @@ from flake8 import defaults
from flake8.main import debug
-def register_preliminary_options(parser):
- # type: (argparse.ArgumentParser) -> None
+def register_preliminary_options(parser: argparse.ArgumentParser) -> None:
"""Register the preliminary options on our OptionManager.
The preliminary options include:
@@ -64,7 +63,7 @@ def register_preliminary_options(parser):
class JobsArgument:
"""Type callback for the --jobs argument."""
- def __init__(self, arg): # type: (str) -> None
+ def __init__(self, arg: str) -> None:
"""Parse and validate the --jobs argument.
:param str arg:
diff --git a/src/flake8/options/aggregator.py b/src/flake8/options/aggregator.py
index 0ab7357..ac9aec5 100644
--- a/src/flake8/options/aggregator.py
+++ b/src/flake8/options/aggregator.py
@@ -15,10 +15,10 @@ LOG = logging.getLogger(__name__)
def aggregate_options(
- manager, # type: OptionManager
- config_finder, # type: config.ConfigFileFinder
- argv, # type: List[str]
-): # type: (...) -> Tuple[argparse.Namespace, List[str]]
+ manager: OptionManager,
+ config_finder: config.ConfigFileFinder,
+ argv: List[str],
+) -> Tuple[argparse.Namespace, List[str]]:
"""Aggregate and merge CLI and config file options.
:param flake8.options.manager.OptionManager manager:
diff --git a/src/flake8/options/config.py b/src/flake8/options/config.py
index dccc622..0a2e039 100644
--- a/src/flake8/options/config.py
+++ b/src/flake8/options/config.py
@@ -19,12 +19,11 @@ class ConfigFileFinder:
def __init__(
self,
- program_name,
- extra_config_files=None,
- config_file=None,
- ignore_config_files=False,
- ):
- # type: (str, Optional[List[str]], Optional[str], bool) -> None
+ program_name: str,
+ extra_config_files: Optional[List[str]] = None,
+ config_file: Optional[str] = None,
+ ignore_config_files: bool = False,
+ ) -> None:
"""Initialize object to find config files.
:param str program_name:
@@ -57,8 +56,7 @@ class ConfigFileFinder:
self.local_directory = os.path.abspath(os.curdir)
@staticmethod
- def _user_config_file(program_name):
- # type: (str) -> str
+ def _user_config_file(program_name: str) -> str:
if utils.is_windows():
home_dir = os.path.expanduser("~")
config_file_basename = "." + program_name
@@ -71,8 +69,9 @@ class ConfigFileFinder:
return os.path.join(home_dir, config_file_basename)
@staticmethod
- def _read_config(*files):
- # type: (*str) -> Tuple[configparser.RawConfigParser, List[str]]
+ def _read_config(
+ *files: str,
+ ) -> Tuple[configparser.RawConfigParser, List[str]]:
config = configparser.RawConfigParser()
found_files = []
@@ -93,8 +92,7 @@ class ConfigFileFinder:
)
return (config, found_files)
- def cli_config(self, files):
- # type: (str) -> configparser.RawConfigParser
+ def cli_config(self, files: str) -> configparser.RawConfigParser:
"""Read and parse the config file specified on the command-line."""
config, found_files = self._read_config(files)
if found_files:
@@ -360,7 +358,7 @@ def get_local_plugins(config_finder):
raw_paths = utils.parse_comma_separated_list(
config.get(section, "paths").strip()
)
- norm_paths = [] # type: List[str]
+ norm_paths: List[str] = []
for base_dir in base_dirs:
norm_paths.extend(
path
diff --git a/src/flake8/options/manager.py b/src/flake8/options/manager.py
index 206094f..5019499 100644
--- a/src/flake8/options/manager.py
+++ b/src/flake8/options/manager.py
@@ -32,7 +32,7 @@ LOG = logging.getLogger(__name__)
_ARG = enum.Enum("_ARG", "NO")
-_optparse_callable_map = {
+_optparse_callable_map: Dict[str, Union[Type[Any], _ARG]] = {
"int": int,
"long": int,
"string": str,
@@ -41,14 +41,13 @@ _optparse_callable_map = {
"choice": _ARG.NO,
# optparse allows this but does not document it
"str": str,
-} # type: Dict[str, Union[Type[Any], _ARG]]
+}
class _CallbackAction(argparse.Action):
"""Shim for optparse-style callback actions."""
- def __init__(self, *args, **kwargs):
- # type: (*Any, **Any) -> None
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
self._callback = kwargs.pop("callback")
self._callback_args = kwargs.pop("callback_args", ())
self._callback_kwargs = kwargs.pop("callback_kwargs", {})
@@ -56,12 +55,11 @@ class _CallbackAction(argparse.Action):
def __call__(
self,
- parser, # type: argparse.ArgumentParser
- namespace, # type: argparse.Namespace
- values, # type: Optional[Union[Sequence[str], str]]
- option_string=None, # type: Optional[str]
- ):
- # type: (...) -> None
+ parser: argparse.ArgumentParser,
+ namespace: argparse.Namespace,
+ values: Optional[Union[Sequence[str], str]],
+ option_string: Optional[str] = None,
+ ) -> None:
if not values:
values = None
elif isinstance(values, list) and len(values) > 1:
@@ -76,14 +74,15 @@ class _CallbackAction(argparse.Action):
)
-def _flake8_normalize(value, *args, **kwargs):
- # type: (str, *str, **bool) -> Union[str, List[str]]
+def _flake8_normalize(
+ value: str, *args: str, **kwargs: bool
+) -> Union[str, List[str]]:
comma_separated_list = kwargs.pop("comma_separated_list", False)
normalize_paths = kwargs.pop("normalize_paths", False)
if kwargs:
raise TypeError(f"Unexpected keyword args: {kwargs}")
- ret = value # type: Union[str, List[str]]
+ ret: Union[str, List[str]] = value
if comma_separated_list and isinstance(ret, str):
ret = utils.parse_comma_separated_list(value)
@@ -101,29 +100,29 @@ class Option:
def __init__(
self,
- short_option_name=_ARG.NO, # type: Union[str, _ARG]
- long_option_name=_ARG.NO, # type: Union[str, _ARG]
+ short_option_name: Union[str, _ARG] = _ARG.NO,
+ long_option_name: Union[str, _ARG] = _ARG.NO,
# Options below here are taken from the optparse.Option class
- action=_ARG.NO, # type: Union[str, Type[argparse.Action], _ARG]
- default=_ARG.NO, # type: Union[Any, _ARG]
- type=_ARG.NO, # type: Union[str, Callable[..., Any], _ARG]
- dest=_ARG.NO, # type: Union[str, _ARG]
- nargs=_ARG.NO, # type: Union[int, str, _ARG]
- const=_ARG.NO, # type: Union[Any, _ARG]
- choices=_ARG.NO, # type: Union[Sequence[Any], _ARG]
- help=_ARG.NO, # type: Union[str, _ARG]
- metavar=_ARG.NO, # type: Union[str, _ARG]
+ action: Union[str, Type[argparse.Action], _ARG] = _ARG.NO,
+ default: Union[Any, _ARG] = _ARG.NO,
+ type: Union[str, Callable[..., Any], _ARG] = _ARG.NO,
+ dest: Union[str, _ARG] = _ARG.NO,
+ nargs: Union[int, str, _ARG] = _ARG.NO,
+ const: Union[Any, _ARG] = _ARG.NO,
+ choices: Union[Sequence[Any], _ARG] = _ARG.NO,
+ help: Union[str, _ARG] = _ARG.NO,
+ metavar: Union[str, _ARG] = _ARG.NO,
# deprecated optparse-only options
- callback=_ARG.NO, # type: Union[Callable[..., Any], _ARG]
- callback_args=_ARG.NO, # type: Union[Sequence[Any], _ARG]
- callback_kwargs=_ARG.NO, # type: Union[Mapping[str, Any], _ARG]
+ callback: Union[Callable[..., Any], _ARG] = _ARG.NO,
+ callback_args: Union[Sequence[Any], _ARG] = _ARG.NO,
+ callback_kwargs: Union[Mapping[str, Any], _ARG] = _ARG.NO,
# Options below are taken from argparse.ArgumentParser.add_argument
- required=_ARG.NO, # type: Union[bool, _ARG]
+ required: Union[bool, _ARG] = _ARG.NO,
# Options below here are specific to Flake8
- parse_from_config=False, # type: bool
- comma_separated_list=False, # type: bool
- normalize_paths=False, # type: bool
- ): # type: (...) -> None
+ parse_from_config: bool = False,
+ comma_separated_list: bool = False,
+ normalize_paths: bool = False,
+ ) -> None:
"""Initialize an Option instance.
The following are all passed directly through to argparse.
@@ -251,7 +250,7 @@ class Option:
self.help = help
self.metavar = metavar
self.required = required
- self.option_kwargs = {
+ self.option_kwargs: Dict[str, Union[Any, _ARG]] = {
"action": self.action,
"default": self.default,
"type": self.type,
@@ -265,14 +264,14 @@ class Option:
"help": self.help,
"metavar": self.metavar,
"required": self.required,
- } # type: Dict[str, Union[Any, _ARG]]
+ }
# Set our custom attributes
self.parse_from_config = parse_from_config
self.comma_separated_list = comma_separated_list
self.normalize_paths = normalize_paths
- self.config_name = None # type: Optional[str]
+ self.config_name: Optional[str] = None
if parse_from_config:
if long_option_name is _ARG.NO:
raise ValueError(
@@ -284,13 +283,13 @@ class Option:
self._opt = None
@property
- def filtered_option_kwargs(self): # type: () -> Dict[str, Any]
+ def filtered_option_kwargs(self) -> Dict[str, Any]:
"""Return any actually-specified arguments."""
return {
k: v for k, v in self.option_kwargs.items() if v is not _ARG.NO
}
- def __repr__(self): # type: () -> str # noqa: D105
+ def __repr__(self) -> str: # noqa: D105
parts = []
for arg in self.option_args:
parts.append(arg)
@@ -298,8 +297,7 @@ class Option:
parts.append(f"{k}={v!r}")
return "Option({})".format(", ".join(parts))
- def normalize(self, value, *normalize_args):
- # type: (Any, *str) -> Any
+ def normalize(self, value: Any, *normalize_args: str) -> Any:
"""Normalize the value based on the option configuration."""
if self.comma_separated_list and isinstance(value, str):
value = utils.parse_comma_separated_list(value)
@@ -312,8 +310,9 @@ class Option:
return value
- def normalize_from_setuptools(self, value):
- # type: (str) -> Union[int, float, complex, bool, str]
+ def normalize_from_setuptools(
+ self, value: str
+ ) -> Union[int, float, complex, bool, str]:
"""Normalize the value received from setuptools."""
value = self.normalize(value)
if self.type is int or self.action == "count":
@@ -330,13 +329,12 @@ class Option:
return False
return value
- def to_argparse(self):
- # type: () -> Tuple[List[str], Dict[str, Any]]
+ def to_argparse(self) -> Tuple[List[str], Dict[str, Any]]:
"""Convert a Flake8 Option to argparse ``add_argument`` arguments."""
return self.option_args, self.filtered_option_kwargs
@property
- def to_optparse(self): # type: () -> NoReturn
+ def to_optparse(self) -> "NoReturn":
"""No longer functional."""
raise AttributeError("to_optparse: flake8 now uses argparse")
@@ -351,11 +349,11 @@ class OptionManager:
def __init__(
self,
- prog,
- version,
- usage="%(prog)s [options] file file ...",
- parents=None,
- ): # type: (str, str, str, Optional[List[argparse.ArgumentParser]]) -> None # noqa: E501
+ prog: str,
+ version: str,
+ usage: str = "%(prog)s [options] file file ...",
+ parents: Optional[List[argparse.ArgumentParser]] = None,
+ ) -> None: # noqa: E501
"""Initialize an instance of an OptionManager.
:param str prog:
@@ -371,10 +369,10 @@ class OptionManager:
if parents is None:
parents = []
- self.parser = argparse.ArgumentParser(
+ self.parser: argparse.ArgumentParser = argparse.ArgumentParser(
prog=prog, usage=usage, parents=parents
- ) # type: argparse.ArgumentParser
- self._current_group = None # type: Optional[argparse._ArgumentGroup]
+ )
+ self._current_group: Optional[argparse._ArgumentGroup] = None
self.version_action = cast(
"argparse._VersionAction",
self.parser.add_argument(
@@ -382,16 +380,16 @@ class OptionManager:
),
)
self.parser.add_argument("filenames", nargs="*", metavar="filename")
- self.config_options_dict = {} # type: Dict[str, Option]
- self.options = [] # type: List[Option]
+ self.config_options_dict: Dict[str, Option] = {}
+ self.options: List[Option] = []
self.program_name = prog
self.version = version
- self.registered_plugins = set() # type: Set[PluginVersion]
- self.extended_default_ignore = set() # type: Set[str]
- self.extended_default_select = set() # type: Set[str]
+ self.registered_plugins: Set[PluginVersion] = set()
+ self.extended_default_ignore: Set[str] = set()
+ self.extended_default_select: Set[str] = set()
@contextlib.contextmanager
- def group(self, name): # type: (str) -> Generator[None, None, None]
+ def group(self, name: str) -> Generator[None, None, None]:
"""Attach options to an argparse group during this context."""
group = self.parser.add_argument_group(name)
self._current_group, orig_group = group, self._current_group
@@ -400,7 +398,7 @@ class OptionManager:
finally:
self._current_group = orig_group
- def add_option(self, *args, **kwargs): # type: (*Any, **Any) -> None
+ def add_option(self, *args: Any, **kwargs: Any) -> None:
"""Create and register a new option.
See parameters for :class:`~flake8.options.manager.Option` for
@@ -425,8 +423,7 @@ class OptionManager:
self.config_options_dict[name.replace("_", "-")] = option
LOG.debug('Registered option "%s".', option)
- def remove_from_default_ignore(self, error_codes):
- # type: (Sequence[str]) -> None
+ def remove_from_default_ignore(self, error_codes: Sequence[str]) -> None:
"""Remove specified error codes from the default ignore list.
:param list error_codes:
@@ -444,8 +441,7 @@ class OptionManager:
error_code,
)
- def extend_default_ignore(self, error_codes):
- # type: (Sequence[str]) -> None
+ def extend_default_ignore(self, error_codes: Sequence[str]) -> None:
"""Extend the default ignore list with the error codes provided.
:param list error_codes:
@@ -455,8 +451,7 @@ class OptionManager:
LOG.debug("Extending default ignore list with %r", error_codes)
self.extended_default_ignore.update(error_codes)
- def extend_default_select(self, error_codes):
- # type: (Sequence[str]) -> None
+ def extend_default_select(self, error_codes: Sequence[str]) -> None:
"""Extend the default select list with the error codes provided.
:param list error_codes:
@@ -467,22 +462,21 @@ class OptionManager:
self.extended_default_select.update(error_codes)
def generate_versions(
- self, format_str="%(name)s: %(version)s", join_on=", "
- ):
- # type: (str, str) -> str
+ self, format_str: str = "%(name)s: %(version)s", join_on: str = ", "
+ ) -> str:
"""Generate a comma-separated list of versions of plugins."""
return join_on.join(
format_str % plugin._asdict()
for plugin in sorted(self.registered_plugins)
)
- def update_version_string(self): # type: () -> None
+ def update_version_string(self) -> None:
"""Update the flake8 version string."""
self.version_action.version = "{} ({}) {}".format(
self.version, self.generate_versions(), utils.get_python_version()
)
- def generate_epilog(self): # type: () -> None
+ def generate_epilog(self) -> None:
"""Create an epilog with the version and name of each of plugin."""
plugin_version_format = "%(name)s: %(version)s"
self.parser.epilog = "Installed plugins: " + self.generate_versions(
@@ -491,10 +485,9 @@ class OptionManager:
def parse_args(
self,
- args=None, # type: Optional[List[str]]
- values=None, # type: Optional[argparse.Namespace]
- ):
- # type: (...) -> Tuple[argparse.Namespace, List[str]]
+ args: Optional[List[str]] = None,
+ values: Optional[argparse.Namespace] = None,
+ ) -> Tuple[argparse.Namespace, List[str]]:
"""Proxy to calling the OptionParser's parse_args method."""
self.generate_epilog()
self.update_version_string()
@@ -504,8 +497,9 @@ class OptionManager:
# TODO: refactor callers to not need this
return parsed_args, parsed_args.filenames
- def parse_known_args(self, args=None):
- # type: (Optional[List[str]]) -> Tuple[argparse.Namespace, List[str]]
+ def parse_known_args(
+ self, args: Optional[List[str]] = None
+ ) -> Tuple[argparse.Namespace, List[str]]:
"""Parse only the known arguments from the argument values.
Replicate a little argparse behaviour while we're still on
@@ -515,8 +509,9 @@ class OptionManager:
self.update_version_string()
return self.parser.parse_known_args(args)
- def register_plugin(self, name, version, local=False):
- # type: (str, str, bool) -> None
+ def register_plugin(
+ self, name: str, version: str, local: bool = False
+ ) -> None:
"""Register a plugin relying on the OptionManager.
:param str name:
diff --git a/src/flake8/plugins/manager.py b/src/flake8/plugins/manager.py
index ab4cbd8..d36f95b 100644
--- a/src/flake8/plugins/manager.py
+++ b/src/flake8/plugins/manager.py
@@ -35,14 +35,14 @@ class Plugin:
self.name = name
self.entry_point = entry_point
self.local = local
- self._plugin = None # type: Any
+ self._plugin: Any = None
self._parameters = None
- self._parameter_names = None # type: Optional[List[str]]
+ self._parameter_names: Optional[List[str]] = None
self._group = None
self._plugin_name = None
self._version = None
- def __repr__(self): # type: () -> str
+ def __repr__(self) -> str:
"""Provide an easy to read description of the current plugin."""
return 'Plugin(name="{}", entry_point="{}")'.format(
self.name, self.entry_point.value
@@ -88,7 +88,7 @@ class Plugin:
return self._parameters
@property
- def parameter_names(self): # type: () -> List[str]
+ def parameter_names(self) -> List[str]:
"""List of argument names that need to be passed to the plugin."""
if self._parameter_names is None:
self._parameter_names = list(self.parameters)
@@ -104,7 +104,7 @@ class Plugin:
return self._plugin
@property
- def version(self): # type: () -> str
+ def version(self) -> str:
"""Return the version of the plugin."""
version = self._version
if version is None:
@@ -226,8 +226,9 @@ class Plugin:
class PluginManager: # pylint: disable=too-few-public-methods
"""Find and manage plugins consistently."""
- def __init__(self, namespace, local_plugins=None):
- # type: (str, Optional[List[str]]) -> None
+ def __init__(
+ self, namespace: str, local_plugins: Optional[List[str]] = None
+ ) -> None:
"""Initialize the manager.
:param str namespace:
@@ -236,8 +237,8 @@ class PluginManager: # pylint: disable=too-few-public-methods
Plugins from config (as "X = path.to:Plugin" strings).
"""
self.namespace = namespace
- self.plugins = {} # type: Dict[str, Plugin]
- self.names = [] # type: List[str]
+ self.plugins: Dict[str, Plugin] = {}
+ self.names: List[str] = []
self._load_local_plugins(local_plugins or [])
self._load_entrypoint_plugins()
@@ -314,7 +315,7 @@ class PluginManager: # pylint: disable=too-few-public-methods
:rtype:
tuple
"""
- plugins_seen = set() # type: Set[str]
+ plugins_seen: Set[str] = set()
for entry_point_name in self.names:
plugin = self.plugins[entry_point_name]
plugin_name = plugin.plugin_name
@@ -349,7 +350,7 @@ def version_for(plugin):
class PluginTypeManager:
"""Parent class for most of the specific plugin types."""
- namespace = None # type: str
+ namespace: str
def __init__(self, local_plugins=None):
"""Initialize the plugin type's manager.
diff --git a/src/flake8/plugins/pyflakes.py b/src/flake8/plugins/pyflakes.py
index 5d51799..9709887 100644
--- a/src/flake8/plugins/pyflakes.py
+++ b/src/flake8/plugins/pyflakes.py
@@ -1,9 +1,7 @@
"""Plugin built-in to Flake8 to treat pyflakes as a plugin."""
-
import os
from typing import List
-import pyflakes
import pyflakes.checker
from flake8 import utils
@@ -66,8 +64,8 @@ class FlakesChecker(pyflakes.checker.Checker):
name = "pyflakes"
version = pyflakes.__version__
with_doctest = False
- include_in_doctest = [] # type: List[str]
- exclude_from_doctest = [] # type: List[str]
+ include_in_doctest: List[str] = []
+ exclude_from_doctest: List[str] = []
def __init__(self, tree, file_tokens, filename):
"""Initialize the PyFlakes plugin with an AST tree and filename."""
diff --git a/src/flake8/processor.py b/src/flake8/processor.py
index 20a99de..fdecfa8 100644
--- a/src/flake8/processor.py
+++ b/src/flake8/processor.py
@@ -60,8 +60,12 @@ class FileProcessor:
#: always ``False``, included for compatibility
noqa = False
- def __init__(self, filename, options, lines=None):
- # type: (str, argparse.Namespace, Optional[List[str]]) -> None
+ def __init__(
+ self,
+ filename: str,
+ options: argparse.Namespace,
+ lines: Optional[List[str]] = None,
+ ) -> None:
"""Initialice our file processor.
:param str filename:
@@ -78,13 +82,13 @@ class FileProcessor:
#: Number of blank lines
self.blank_lines = 0
#: Checker states for each plugin?
- self._checker_states = {} # type: Dict[str, Dict[Any, Any]]
+ self._checker_states: Dict[str, Dict[Any, Any]] = {}
#: Current checker state
- self.checker_state = {} # type: Dict[Any, Any]
+ self.checker_state: Dict[Any, Any] = {}
#: User provided option for hang closing
self.hang_closing = options.hang_closing
#: Character used for indentation
- self.indent_char = None # type: Optional[str]
+ self.indent_char: Optional[str] = None
#: Current level of indentation
self.indent_level = 0
#: Number of spaces used for indentation
@@ -108,19 +112,19 @@ class FileProcessor:
#: Previous unindented (i.e. top-level) logical line
self.previous_unindented_logical_line = ""
#: Current set of tokens
- self.tokens = [] # type: List[_Token]
+ self.tokens: List[_Token] = []
#: Total number of lines in the file
self.total_lines = len(self.lines)
#: Verbosity level of Flake8
self.verbose = options.verbose
#: Statistics dictionary
self.statistics = {"logical lines": 0}
- self._file_tokens = None # type: Optional[List[_Token]]
+ self._file_tokens: Optional[List[_Token]] = None
# map from line number to the line we'll search for `noqa` in
- self._noqa_line_mapping = None # type: Optional[Dict[int, str]]
+ self._noqa_line_mapping: Optional[Dict[int, str]] = None
@property
- def file_tokens(self): # type: () -> List[_Token]
+ def file_tokens(self) -> List[_Token]:
"""Return the complete set of tokens for a file.
Accessing this attribute *may* raise an InvalidSyntax exception.
@@ -139,28 +143,28 @@ class FileProcessor:
return self._file_tokens
@contextlib.contextmanager
- def inside_multiline(self, line_number):
- # type: (int) -> Generator[None, None, None]
+ def inside_multiline(
+ self, line_number: int
+ ) -> Generator[None, None, None]:
"""Context-manager to toggle the multiline attribute."""
self.line_number = line_number
self.multiline = True
yield
self.multiline = False
- def reset_blank_before(self): # type: () -> None
+ def reset_blank_before(self) -> None:
"""Reset the blank_before attribute to zero."""
self.blank_before = 0
- def delete_first_token(self): # type: () -> None
+ def delete_first_token(self) -> None:
"""Delete the first token in the list of tokens."""
del self.tokens[0]
- def visited_new_blank_line(self): # type: () -> None
+ def visited_new_blank_line(self) -> None:
"""Note that we visited a new blank line."""
self.blank_lines += 1
- def update_state(self, mapping):
- # type: (_LogicalMapping) -> None
+ def update_state(self, mapping: _LogicalMapping) -> None:
"""Update the indent level based on the logical line mapping."""
(start_row, start_col) = mapping[0][1]
start_line = self.lines[start_row - 1]
@@ -168,15 +172,14 @@ class FileProcessor:
if self.blank_before < self.blank_lines:
self.blank_before = self.blank_lines
- def update_checker_state_for(self, plugin):
- # type: (Dict[str, Any]) -> None
+ def update_checker_state_for(self, plugin: Dict[str, Any]) -> None:
"""Update the checker_state attribute for the plugin."""
if "checker_state" in plugin["parameters"]:
self.checker_state = self._checker_states.setdefault(
plugin["name"], {}
)
- def next_logical_line(self): # type: () -> None
+ def next_logical_line(self) -> None:
"""Record the previous logical line.
This also resets the tokens list and the blank_lines count.
@@ -189,11 +192,11 @@ class FileProcessor:
self.blank_lines = 0
self.tokens = []
- def build_logical_line_tokens(self): # type: () -> _Logical
+ def build_logical_line_tokens(self) -> _Logical:
"""Build the mapping, comments, and logical line lists."""
logical = []
comments = []
- mapping = [] # type: _LogicalMapping
+ mapping: _LogicalMapping = []
length = 0
previous_row = previous_column = None
for token_type, text, start, end, line in self.tokens:
@@ -224,12 +227,11 @@ class FileProcessor:
(previous_row, previous_column) = end
return comments, logical, mapping
- def build_ast(self): # type: () -> ast.AST
+ def build_ast(self) -> ast.AST:
"""Build an abstract syntax tree from the list of lines."""
return ast.parse("".join(self.lines))
- def build_logical_line(self):
- # type: () -> Tuple[str, str, _LogicalMapping]
+ def build_logical_line(self) -> Tuple[str, str, _LogicalMapping]:
"""Build a logical line from the current tokens list."""
comments, logical, mapping_list = self.build_logical_line_tokens()
joined_comments = "".join(comments)
@@ -237,8 +239,7 @@ class FileProcessor:
self.statistics["logical lines"] += 1
return joined_comments, self.logical_line, mapping_list
- def split_line(self, token):
- # type: (_Token) -> Generator[str, None, None]
+ def split_line(self, token: _Token) -> Generator[str, None, None]:
"""Split a physical line's line based on new-lines.
This also auto-increments the line number for the caller.
@@ -247,8 +248,11 @@ class FileProcessor:
yield line
self.line_number += 1
- def keyword_arguments_for(self, parameters, arguments=None):
- # type: (Dict[str, bool], Optional[Dict[str, Any]]) -> Dict[str, Any]
+ def keyword_arguments_for(
+ self,
+ parameters: Dict[str, bool],
+ arguments: Optional[Dict[str, Any]] = None,
+ ) -> Dict[str, Any]:
"""Generate the keyword arguments for a list of parameters."""
if arguments is None:
arguments = {}
@@ -269,7 +273,7 @@ class FileProcessor:
)
return arguments
- def generate_tokens(self): # type: () -> Generator[_Token, None, None]
+ def generate_tokens(self) -> Generator[_Token, None, None]:
"""Tokenize the file and yield the tokens.
:raises flake8.exceptions.InvalidSyntax:
@@ -285,13 +289,14 @@ class FileProcessor:
except (tokenize.TokenError, SyntaxError) as exc:
raise exceptions.InvalidSyntax(exception=exc)
- def _noqa_line_range(self, min_line, max_line):
- # type: (int, int) -> Dict[int, str]
+ def _noqa_line_range(
+ self, min_line: int, max_line: int
+ ) -> Dict[int, str]:
line_range = range(min_line, max_line + 1)
joined = "".join(self.lines[min_line - 1 : max_line])
return dict.fromkeys(line_range, joined)
- def noqa_line_for(self, line_number): # type: (int) -> Optional[str]
+ def noqa_line_for(self, line_number: int) -> Optional[str]:
"""Retrieve the line which will be used to determine noqa."""
if self._noqa_line_mapping is None:
try:
@@ -331,7 +336,7 @@ class FileProcessor:
# retrieve a physical line (since none exist).
return self._noqa_line_mapping.get(line_number)
- def next_line(self): # type: () -> str
+ def next_line(self) -> str:
"""Get the next line from the list."""
if self.line_number >= self.total_lines:
return ""
@@ -341,8 +346,7 @@ class FileProcessor:
self.indent_char = line[0]
return line
- def read_lines(self):
- # type: () -> List[str]
+ def read_lines(self) -> List[str]:
"""Read the lines for this file checker."""
if self.filename is None or self.filename == "-":
self.filename = self.options.stdin_display_name or "stdin"
@@ -351,8 +355,7 @@ class FileProcessor:
lines = self.read_lines_from_filename()
return lines
- def read_lines_from_filename(self):
- # type: () -> List[str]
+ def read_lines_from_filename(self) -> List[str]:
"""Read the lines for a file."""
try:
with tokenize.open(self.filename) as fd:
@@ -363,13 +366,11 @@ class FileProcessor:
with open(self.filename, encoding="latin-1") as fd:
return fd.readlines()
- def read_lines_from_stdin(self):
- # type: () -> List[str]
+ def read_lines_from_stdin(self) -> List[str]:
"""Read the lines from standard in."""
return utils.stdin_get_lines()
- def should_ignore_file(self):
- # type: () -> bool
+ def should_ignore_file(self) -> bool:
"""Check if ``flake8: noqa`` is in the file to be ignored.
:returns:
@@ -391,8 +392,7 @@ class FileProcessor:
else:
return False
- def strip_utf_bom(self):
- # type: () -> None
+ def strip_utf_bom(self) -> None:
"""Strip the UTF bom from the lines of the file."""
if not self.lines:
# If we have nothing to analyze quit early
@@ -409,23 +409,22 @@ class FileProcessor:
self.lines[0] = self.lines[0][3:]
-def is_eol_token(token): # type: (_Token) -> bool
+def is_eol_token(token: _Token) -> bool:
"""Check if the token is an end-of-line token."""
return token[0] in NEWLINE or token[4][token[3][1] :].lstrip() == "\\\n"
-def is_multiline_string(token): # type: (_Token) -> bool
+def is_multiline_string(token: _Token) -> bool:
"""Check if this is a multiline string."""
return token[0] == tokenize.STRING and "\n" in token[1]
-def token_is_newline(token): # type: (_Token) -> bool
+def token_is_newline(token: _Token) -> bool:
"""Check if the token type is a newline token type."""
return token[0] in NEWLINE
-def count_parentheses(current_parentheses_count, token_text):
- # type: (int, str) -> int
+def count_parentheses(current_parentheses_count: int, token_text: str) -> int:
"""Count the number of parentheses."""
if token_text in "([{": # nosec
return current_parentheses_count + 1
@@ -434,7 +433,7 @@ def count_parentheses(current_parentheses_count, token_text):
return current_parentheses_count
-def log_token(log, token): # type: (logging.Logger, _Token) -> None
+def log_token(log: logging.Logger, token: _Token) -> None:
"""Log a token to a provided logging object."""
if token[2][0] == token[3][0]:
pos = "[{}:{}]".format(token[2][1] or "", token[3][1])
@@ -449,7 +448,7 @@ def log_token(log, token): # type: (logging.Logger, _Token) -> None
# NOTE(sigmavirus24): This was taken wholesale from
# https://github.com/PyCQA/pycodestyle
-def expand_indent(line): # type: (str) -> int
+def expand_indent(line: str) -> int:
r"""Return the amount of indentation.
Tabs are expanded to the next multiple of 8.
@@ -479,7 +478,7 @@ def expand_indent(line): # type: (str) -> int
# NOTE(sigmavirus24): This was taken wholesale from
# https://github.com/PyCQA/pycodestyle. The in-line comments were edited to be
# more descriptive.
-def mutate_string(text): # type: (str) -> str
+def mutate_string(text: str) -> str:
"""Replace contents with 'xxx' to prevent syntax matching.
>>> mutate_string('"abc"')
diff --git a/src/flake8/statistics.py b/src/flake8/statistics.py
index 0ccc746..073bfe4 100644
--- a/src/flake8/statistics.py
+++ b/src/flake8/statistics.py
@@ -13,11 +13,11 @@ if TYPE_CHECKING:
class Statistics:
"""Manager of aggregated statistics for a run of Flake8."""
- def __init__(self): # type: () -> None
+ def __init__(self) -> None:
"""Initialize the underlying dictionary for our statistics."""
- self._store = {} # type: Dict[Key, Statistic]
+ self._store: Dict[Key, "Statistic"] = {}
- def error_codes(self): # type: () -> List[str]
+ def error_codes(self) -> List[str]:
"""Return all unique error codes stored.
:returns:
@@ -27,7 +27,7 @@ class Statistics:
"""
return sorted({key.code for key in self._store})
- def record(self, error): # type: (Violation) -> None
+ def record(self, error: "Violation") -> None:
"""Add the fact that the error was seen in the file.
:param error:
@@ -41,8 +41,9 @@ class Statistics:
self._store[key] = Statistic.create_from(error)
self._store[key].increment()
- def statistics_for(self, prefix, filename=None):
- # type: (str, Optional[str]) -> Generator[Statistic, None, None]
+ def statistics_for(
+ self, prefix: str, filename: Optional[str] = None
+ ) -> Generator["Statistic", None, None]:
"""Generate statistics for the prefix and filename.
If you have a :class:`Statistics` object that has recorded errors,
@@ -83,11 +84,11 @@ class Key(collections.namedtuple("Key", ["filename", "code"])):
__slots__ = ()
@classmethod
- def create_from(cls, error): # type: (Violation) -> Key
+ def create_from(cls, error: "Violation") -> "Key":
"""Create a Key from :class:`flake8.style_guide.Violation`."""
return cls(filename=error.filename, code=error.code)
- def matches(self, prefix, filename): # type: (str, Optional[str]) -> bool
+ def matches(self, prefix: str, filename: Optional[str]) -> bool:
"""Determine if this key matches some constraints.
:param str prefix:
@@ -114,8 +115,9 @@ class Statistic:
convenience methods on it.
"""
- def __init__(self, error_code, filename, message, count):
- # type: (str, str, str, int) -> None
+ def __init__(
+ self, error_code: str, filename: str, message: str, count: int
+ ) -> None:
"""Initialize our Statistic."""
self.error_code = error_code
self.filename = filename
@@ -123,7 +125,7 @@ class Statistic:
self.count = count
@classmethod
- def create_from(cls, error): # type: (Violation) -> Statistic
+ def create_from(cls, error: "Violation") -> "Statistic":
"""Create a Statistic from a :class:`flake8.style_guide.Violation`."""
return cls(
error_code=error.code,
@@ -132,6 +134,6 @@ class Statistic:
count=0,
)
- def increment(self): # type: () -> None
+ def increment(self) -> None:
"""Increment the number of times we've seen this error in this file."""
self.count += 1
diff --git a/src/flake8/style_guide.py b/src/flake8/style_guide.py
index 915021d..c35c739 100644
--- a/src/flake8/style_guide.py
+++ b/src/flake8/style_guide.py
@@ -50,7 +50,7 @@ class Decision(enum.Enum):
@functools.lru_cache(maxsize=512)
-def find_noqa(physical_line): # type: (str) -> Optional[Match[str]]
+def find_noqa(physical_line: str) -> Optional[Match[str]]:
return defaults.NOQA_INLINE_REGEXP.search(physical_line)
@@ -70,8 +70,7 @@ _Violation = collections.namedtuple(
class Violation(_Violation):
"""Class representing a violation reported by Flake8."""
- def is_inline_ignored(self, disable_noqa):
- # type: (bool) -> bool
+ def is_inline_ignored(self, disable_noqa: bool) -> bool:
"""Determine if a comment has been added to ignore this line.
:param bool disable_noqa:
@@ -112,8 +111,7 @@ class Violation(_Violation):
)
return False
- def is_in(self, diff):
- # type: (Dict[str, Set[int]]) -> bool
+ def is_in(self, diff: Dict[str, Set[int]]) -> bool:
"""Determine if the violation is included in a diff's line ranges.
This function relies on the parsed data added via
@@ -156,9 +154,9 @@ class DecisionEngine:
ignored.
"""
- def __init__(self, options): # type: (argparse.Namespace) -> None
+ def __init__(self, options: argparse.Namespace) -> None:
"""Initialize the engine."""
- self.cache = {} # type: Dict[str, Decision]
+ self.cache: Dict[str, Decision] = {}
self.selected = tuple(options.select)
self.extended_selected = tuple(
sorted(options.extended_default_select, reverse=True)
@@ -176,16 +174,15 @@ class DecisionEngine:
self.using_default_ignore = set(self.ignored) == set(defaults.IGNORE)
self.using_default_select = set(self.selected) == set(defaults.SELECT)
- def _in_all_selected(self, code): # type: (str) -> bool
+ def _in_all_selected(self, code: str) -> bool:
return bool(self.all_selected) and code.startswith(self.all_selected)
- def _in_extended_selected(self, code): # type: (str) -> bool
+ def _in_extended_selected(self, code: str) -> bool:
return bool(self.extended_selected) and code.startswith(
self.extended_selected
)
- def was_selected(self, code):
- # type: (str) -> Union[Selected, Ignored]
+ def was_selected(self, code: str) -> Union[Selected, Ignored]:
"""Determine if the code has been selected by the user.
:param str code:
@@ -208,8 +205,7 @@ class DecisionEngine:
return Ignored.Implicitly
- def was_ignored(self, code):
- # type: (str) -> Union[Selected, Ignored]
+ def was_ignored(self, code: str) -> Union[Selected, Ignored]:
"""Determine if the code has been ignored by the user.
:param str code:
@@ -226,8 +222,7 @@ class DecisionEngine:
return Selected.Implicitly
- def more_specific_decision_for(self, code):
- # type: (str) -> Decision
+ def more_specific_decision_for(self, code: str) -> Decision:
select = find_first_match(code, self.all_selected)
extra_select = find_first_match(code, self.extended_selected)
ignore = find_first_match(code, self.ignored)
@@ -275,8 +270,7 @@ class DecisionEngine:
return Decision.Ignored
return Decision.Selected
- def make_decision(self, code):
- # type: (str) -> Decision
+ def make_decision(self, code: str) -> Decision:
"""Decide if code should be ignored or selected."""
LOG.debug('Deciding if "%s" should be reported', code)
selected = self.was_selected(code)
@@ -302,8 +296,7 @@ class DecisionEngine:
decision = Decision.Ignored # pylint: disable=R0204
return decision
- def decision_for(self, code):
- # type: (str) -> Decision
+ def decision_for(self, code: str) -> Decision:
"""Return the decision for a specific code.
This method caches the decisions for codes to avoid retracing the same
@@ -330,10 +323,10 @@ class StyleGuideManager:
def __init__(
self,
- options, # type: argparse.Namespace
- formatter, # type: base_formatter.BaseFormatter
- decider=None, # type: Optional[DecisionEngine]
- ): # type: (...) -> None
+ options: argparse.Namespace,
+ formatter: base_formatter.BaseFormatter,
+ decider: Optional[DecisionEngine] = None,
+ ) -> None:
"""Initialize our StyleGuide.
.. todo:: Add parameter documentation.
@@ -342,7 +335,7 @@ class StyleGuideManager:
self.formatter = formatter
self.stats = statistics.Statistics()
self.decider = decider or DecisionEngine(options)
- self.style_guides = [] # type: List[StyleGuide]
+ self.style_guides: List[StyleGuide] = []
self.default_style_guide = StyleGuide(
options, formatter, self.stats, decider=decider
)
@@ -353,8 +346,9 @@ class StyleGuideManager:
)
)
- def populate_style_guides_with(self, options):
- # type: (argparse.Namespace) -> Generator[StyleGuide, None, None]
+ def populate_style_guides_with(
+ self, options: argparse.Namespace
+ ) -> Generator["StyleGuide", None, None]:
"""Generate style guides from the per-file-ignores option.
:param options:
@@ -375,7 +369,7 @@ class StyleGuideManager:
)
@functools.lru_cache(maxsize=None)
- def style_guide_for(self, filename): # type: (str) -> StyleGuide
+ def style_guide_for(self, filename: str) -> "StyleGuide":
"""Find the StyleGuide for the filename in particular."""
guides = sorted(
(g for g in self.style_guides if g.applies_to(filename)),
@@ -386,8 +380,9 @@ class StyleGuideManager:
return guides[0]
@contextlib.contextmanager
- def processing_file(self, filename):
- # type: (str) -> Generator[StyleGuide, None, None]
+ def processing_file(
+ self, filename: str
+ ) -> Generator["StyleGuide", None, None]:
"""Record the fact that we're processing the file's results."""
guide = self.style_guide_for(filename)
with guide.processing_file(filename):
@@ -395,14 +390,13 @@ class StyleGuideManager:
def handle_error(
self,
- code,
- filename,
- line_number,
- column_number,
- text,
- physical_line=None,
- ):
- # type: (str, str, int, Optional[int], str, Optional[str]) -> int
+ code: str,
+ filename: str,
+ line_number: int,
+ column_number: Optional[int],
+ text: str,
+ physical_line: Optional[str] = None,
+ ) -> int:
"""Handle an error reported by a check.
:param str code:
@@ -430,8 +424,7 @@ class StyleGuideManager:
code, filename, line_number, column_number, text, physical_line
)
- def add_diff_ranges(self, diffinfo):
- # type: (Dict[str, Set[int]]) -> None
+ def add_diff_ranges(self, diffinfo: Dict[str, Set[int]]) -> None:
"""Update the StyleGuides to filter out information not in the diff.
This provides information to the underlying StyleGuides so that only
@@ -449,11 +442,11 @@ class StyleGuide:
def __init__(
self,
- options, # type: argparse.Namespace
- formatter, # type: base_formatter.BaseFormatter
- stats, # type: statistics.Statistics
- filename=None, # type: Optional[str]
- decider=None, # type: Optional[DecisionEngine]
+ options: argparse.Namespace,
+ formatter: base_formatter.BaseFormatter,
+ stats: statistics.Statistics,
+ filename: Optional[str] = None,
+ decider: Optional[DecisionEngine] = None,
):
"""Initialize our StyleGuide.
@@ -466,14 +459,17 @@ class StyleGuide:
self.filename = filename
if self.filename:
self.filename = utils.normalize_path(self.filename)
- self._parsed_diff = {} # type: Dict[str, Set[int]]
+ self._parsed_diff: Dict[str, Set[int]] = {}
- def __repr__(self): # type: () -> str
+ def __repr__(self) -> str:
"""Make it easier to debug which StyleGuide we're using."""
return f"<StyleGuide [{self.filename}]>"
- def copy(self, filename=None, extend_ignore_with=None):
- # type: (Optional[str], Optional[Sequence[str]]) -> StyleGuide
+ def copy(
+ self,
+ filename: Optional[str] = None,
+ extend_ignore_with: Optional[Sequence[str]] = None,
+ ) -> "StyleGuide":
"""Create a copy of this style guide with different values."""
filename = filename or self.filename
options = copy.deepcopy(self.options)
@@ -483,14 +479,15 @@ class StyleGuide:
)
@contextlib.contextmanager
- def processing_file(self, filename):
- # type: (str) -> Generator[StyleGuide, None, None]
+ def processing_file(
+ self, filename: str
+ ) -> Generator["StyleGuide", None, None]:
"""Record the fact that we're processing the file's results."""
self.formatter.beginning(filename)
yield self
self.formatter.finished(filename)
- def applies_to(self, filename): # type: (str) -> bool
+ def applies_to(self, filename: str) -> bool:
"""Check if this StyleGuide applies to the file.
:param str filename:
@@ -510,8 +507,7 @@ class StyleGuide:
logger=LOG,
)
- def should_report_error(self, code):
- # type: (str) -> Decision
+ def should_report_error(self, code: str) -> Decision:
"""Determine if the error code should be reported or ignored.
This method only cares about the select and ignore rules as specified
@@ -527,14 +523,13 @@ class StyleGuide:
def handle_error(
self,
- code,
- filename,
- line_number,
- column_number,
- text,
- physical_line=None,
- ):
- # type: (str, str, int, Optional[int], str, Optional[str]) -> int
+ code: str,
+ filename: str,
+ line_number: int,
+ column_number: Optional[int],
+ text: str,
+ physical_line: Optional[str] = None,
+ ) -> int:
"""Handle an error reported by a check.
:param str code:
@@ -586,8 +581,7 @@ class StyleGuide:
return 1
return 0
- def add_diff_ranges(self, diffinfo):
- # type: (Dict[str, Set[int]]) -> None
+ def add_diff_ranges(self, diffinfo: Dict[str, Set[int]]) -> None:
"""Update the StyleGuide to filter out information not in the diff.
This provides information to the StyleGuide so that only the errors
@@ -599,14 +593,15 @@ class StyleGuide:
self._parsed_diff = diffinfo
-def find_more_specific(selected, ignored): # type: (str, str) -> Decision
+def find_more_specific(selected: str, ignored: str) -> Decision:
if selected.startswith(ignored) and selected != ignored:
return Decision.Selected
return Decision.Ignored
-def find_first_match(error_code, code_list):
- # type: (str, Tuple[str, ...]) -> Optional[str]
+def find_first_match(
+ error_code: str, code_list: Tuple[str, ...]
+) -> Optional[str]:
startswith = error_code.startswith
for code in code_list:
if startswith(code):
diff --git a/src/flake8/utils.py b/src/flake8/utils.py
index aec5a43..530801d 100644
--- a/src/flake8/utils.py
+++ b/src/flake8/utils.py
@@ -32,8 +32,9 @@ COMMA_SEPARATED_LIST_RE = re.compile(r"[,\s]")
LOCAL_PLUGIN_LIST_RE = re.compile(r"[,\t\n\r\f\v]")
-def parse_comma_separated_list(value, regexp=COMMA_SEPARATED_LIST_RE):
- # type: (str, Pattern[str]) -> List[str]
+def parse_comma_separated_list(
+ value: str, regexp: Pattern[str] = COMMA_SEPARATED_LIST_RE
+) -> List[str]:
"""Parse a comma-separated list.
:param value:
@@ -67,8 +68,7 @@ _FILE_LIST_TOKEN_TYPES = [
]
-def _tokenize_files_to_codes_mapping(value):
- # type: (str) -> List[_Token]
+def _tokenize_files_to_codes_mapping(value: str) -> List[_Token]:
tokens = []
i = 0
while i < len(value):
@@ -85,8 +85,9 @@ def _tokenize_files_to_codes_mapping(value):
return tokens
-def parse_files_to_codes_mapping(value_): # noqa: C901
- # type: (Union[Sequence[str], str]) -> List[Tuple[str, List[str]]]
+def parse_files_to_codes_mapping( # noqa: C901
+ value_: Union[Sequence[str], str]
+) -> List[Tuple[str, List[str]]]:
"""Parse a files-to-codes mapping.
A files-to-codes mapping a sequence of values specified as
@@ -101,17 +102,17 @@ def parse_files_to_codes_mapping(value_): # noqa: C901
else:
value = value_
- ret = [] # type: List[Tuple[str, List[str]]]
+ ret: List[Tuple[str, List[str]]] = []
if not value.strip():
return ret
class State:
seen_sep = True
seen_colon = False
- filenames = [] # type: List[str]
- codes = [] # type: List[str]
+ filenames: List[str] = []
+ codes: List[str] = []
- def _reset(): # type: () -> None
+ def _reset() -> None:
if State.codes:
for filename in State.filenames:
ret.append((filename, State.codes))
@@ -120,8 +121,8 @@ def parse_files_to_codes_mapping(value_): # noqa: C901
State.filenames = []
State.codes = []
- def _unexpected_token(): # type: () -> exceptions.ExecutionError
- def _indent(s): # type: (str) -> str
+ def _unexpected_token() -> exceptions.ExecutionError:
+ def _indent(s: str) -> str:
return " " + s.strip().replace("\n", "\n ")
return exceptions.ExecutionError(
@@ -163,8 +164,9 @@ def parse_files_to_codes_mapping(value_): # noqa: C901
return ret
-def normalize_paths(paths, parent=os.curdir):
- # type: (Sequence[str], str) -> List[str]
+def normalize_paths(
+ paths: Sequence[str], parent: str = os.curdir
+) -> List[str]:
"""Normalize a list of paths relative to a parent directory.
:returns:
@@ -176,8 +178,7 @@ def normalize_paths(paths, parent=os.curdir):
return [normalize_path(p, parent) for p in paths]
-def normalize_path(path, parent=os.curdir):
- # type: (str, str) -> str
+def normalize_path(path: str, parent: str = os.curdir) -> str:
"""Normalize a single-path.
:returns:
@@ -199,7 +200,7 @@ def normalize_path(path, parent=os.curdir):
@functools.lru_cache(maxsize=1)
-def stdin_get_value(): # type: () -> str
+def stdin_get_value() -> str:
"""Get and cache it so plugins can use it."""
stdin_value = sys.stdin.buffer.read()
fd = io.BytesIO(stdin_value)
@@ -211,13 +212,12 @@ def stdin_get_value(): # type: () -> str
return stdin_value.decode("utf-8")
-def stdin_get_lines(): # type: () -> List[str]
+def stdin_get_lines() -> List[str]:
"""Return lines of stdin split according to file splitting."""
return list(io.StringIO(stdin_get_value()))
-def parse_unified_diff(diff=None):
- # type: (Optional[str]) -> Dict[str, Set[int]]
+def parse_unified_diff(diff: Optional[str] = None) -> Dict[str, Set[int]]:
"""Parse the unified diff passed on stdin.
:returns:
@@ -231,7 +231,7 @@ def parse_unified_diff(diff=None):
number_of_rows = None
current_path = None
- parsed_paths = collections.defaultdict(set) # type: Dict[str, Set[int]]
+ parsed_paths: Dict[str, Set[int]] = collections.defaultdict(set)
for line in diff.splitlines():
if number_of_rows:
# NOTE(sigmavirus24): Below we use a slice because stdin may be
@@ -289,8 +289,7 @@ def parse_unified_diff(diff=None):
return parsed_paths
-def is_windows():
- # type: () -> bool
+def is_windows() -> bool:
"""Determine if we're running on Windows.
:returns:
@@ -301,8 +300,7 @@ def is_windows():
return os.name == "nt"
-def is_using_stdin(paths):
- # type: (List[str]) -> bool
+def is_using_stdin(paths: List[str]) -> bool:
"""Determine if we're going to read from stdin.
:param list paths:
@@ -315,12 +313,14 @@ def is_using_stdin(paths):
return "-" in paths
-def _default_predicate(*args): # type: (*str) -> bool
+def _default_predicate(*args: str) -> bool:
return False
-def filenames_from(arg, predicate=None):
- # type: (str, Optional[Callable[[str], bool]]) -> Generator[str, None, None] # noqa: E501
+def filenames_from(
+ arg: str, predicate: Optional[Callable[[str], bool]] = None
+) -> Generator[str, None, None]:
+ # noqa: E501
"""Generate filenames from an argument.
:param str arg:
@@ -360,8 +360,7 @@ def filenames_from(arg, predicate=None):
yield arg
-def fnmatch(filename, patterns):
- # type: (str, Sequence[str]) -> bool
+def fnmatch(filename: str, patterns: Sequence[str]) -> bool:
"""Wrap :func:`fnmatch.fnmatch` to add some functionality.
:param str filename:
@@ -379,8 +378,7 @@ def fnmatch(filename, patterns):
return any(_fnmatch.fnmatch(filename, pattern) for pattern in patterns)
-def parameters_for(plugin):
- # type: (Plugin) -> Dict[str, bool]
+def parameters_for(plugin: "Plugin") -> Dict[str, bool]:
"""Return the parameters for the plugin.
This will inspect the plugin and return either the function parameters
@@ -414,8 +412,12 @@ def parameters_for(plugin):
return parameters
-def matches_filename(path, patterns, log_message, logger):
- # type: (str, Sequence[str], str, logging.Logger) -> bool
+def matches_filename(
+ path: str,
+ patterns: Sequence[str],
+ log_message: str,
+ logger: logging.Logger,
+) -> bool:
"""Use fnmatch to discern if a path exists in patterns.
:param str path:
@@ -447,7 +449,7 @@ def matches_filename(path, patterns, log_message, logger):
return match
-def get_python_version(): # type: () -> str
+def get_python_version() -> str:
"""Find and format the python implementation and version.
:returns: