summary refs log tree commit diff
path: root/src/flake8/checker.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/flake8/checker.py')
-rw-r--r--  src/flake8/checker.py  63
1 file changed, 32 insertions(+), 31 deletions(-)
diff --git a/src/flake8/checker.py b/src/flake8/checker.py
index 15ca9bc..ceff965 100644
--- a/src/flake8/checker.py
+++ b/src/flake8/checker.py
@@ -85,8 +85,8 @@ class Manager:
self.options = style_guide.options
self.checks = checker_plugins
self.jobs = self._job_count()
- self._all_checkers = [] # type: List[FileChecker]
- self.checkers = [] # type: List[FileChecker]
+ self._all_checkers: List[FileChecker] = []
+ self.checkers: List[FileChecker] = []
self.statistics = {
"files": 0,
"logical lines": 0,
@@ -103,8 +103,7 @@ class Manager:
self.statistics[statistic] += checker.statistics[statistic]
self.statistics["files"] += len(self.checkers)
- def _job_count(self):
- # type: () -> int
+ def _job_count(self) -> int:
# First we walk through all of our error cases:
# - multiprocessing library is not present
# - we're running on windows in which case we know we have significant
@@ -165,8 +164,7 @@ class Manager:
)
return reported_results_count
- def is_path_excluded(self, path):
- # type: (str) -> bool
+ def is_path_excluded(self, path: str) -> bool:
"""Check if a path is excluded.
:param str path:
@@ -189,8 +187,7 @@ class Manager:
logger=LOG,
)
- def make_checkers(self, paths=None):
- # type: (Optional[List[str]]) -> None
+ def make_checkers(self, paths: Optional[List[str]] = None) -> None:
"""Create checkers for each file."""
if paths is None:
paths = self.arguments
@@ -235,8 +232,7 @@ class Manager:
self.checkers = [c for c in self._all_checkers if c.should_process]
LOG.info("Checking %d files", len(self.checkers))
- def report(self):
- # type: () -> Tuple[int, int]
+ def report(self) -> Tuple[int, int]:
"""Report all of the errors found in the managed file checkers.
This iterates over each of the checkers and reports the errors sorted
@@ -258,11 +254,11 @@ class Manager:
results_found += len(results)
return (results_found, results_reported)
- def run_parallel(self): # type: () -> None
+ def run_parallel(self) -> None:
"""Run the checkers in parallel."""
# fmt: off
- final_results = collections.defaultdict(list) # type: Dict[str, List[Tuple[str, int, int, str, Optional[str]]]] # noqa: E501
- final_statistics = collections.defaultdict(dict) # type: Dict[str, Dict[str, int]] # noqa: E501
+ final_results: Dict[str, List[Tuple[str, int, int, str, Optional[str]]]] = collections.defaultdict(list) # noqa: E501
+ final_statistics: Dict[str, Dict[str, int]] = collections.defaultdict(dict) # noqa: E501
# fmt: on
pool = _try_initialize_processpool(self.jobs)
@@ -297,12 +293,12 @@ class Manager:
checker.results = final_results[filename]
checker.statistics = final_statistics[filename]
- def run_serial(self): # type: () -> None
+ def run_serial(self) -> None:
"""Run the checkers in serial."""
for checker in self.checkers:
checker.run_checks()
- def run(self): # type: () -> None
+ def run(self) -> None:
"""Run all the checkers.
This will intelligently decide whether to run the checks in parallel
@@ -356,9 +352,7 @@ class FileChecker:
self.options = options
self.filename = filename
self.checks = checks
- # fmt: off
- self.results = [] # type: List[Tuple[str, int, int, str, Optional[str]]] # noqa: E501
- # fmt: on
+ self.results: List[Tuple[str, int, int, str, Optional[str]]] = []
self.statistics = {
"tokens": 0,
"logical lines": 0,
@@ -372,12 +366,11 @@ class FileChecker:
self.should_process = not self.processor.should_ignore_file()
self.statistics["physical lines"] = len(self.processor.lines)
- def __repr__(self): # type: () -> str
+ def __repr__(self) -> str:
"""Provide helpful debugging representation."""
return f"FileChecker for {self.filename}"
- def _make_processor(self):
- # type: () -> Optional[processor.FileProcessor]
+ def _make_processor(self) -> Optional[processor.FileProcessor]:
try:
return processor.FileProcessor(self.filename, self.options)
except OSError as e:
@@ -391,8 +384,13 @@ class FileChecker:
self.report("E902", 0, 0, message)
return None
- def report(self, error_code, line_number, column, text):
- # type: (Optional[str], int, int, str) -> str
+ def report(
+ self,
+ error_code: Optional[str],
+ line_number: int,
+ column: int,
+ text: str,
+ ) -> str:
"""Report an error by storing it in the results list."""
if error_code is None:
error_code, text = text.split(" ", 1)
@@ -468,7 +466,7 @@ class FileChecker:
column -= column_offset
return row, column
- def run_ast_checks(self): # type: () -> None
+ def run_ast_checks(self) -> None:
"""Run all checks expecting an abstract syntax tree."""
try:
ast = self.processor.build_ast()
@@ -610,8 +608,9 @@ class FileChecker:
else:
self.run_logical_checks()
- def check_physical_eol(self, token, prev_physical):
- # type: (processor._Token, str) -> None
+ def check_physical_eol(
+ self, token: processor._Token, prev_physical: str
+ ) -> None:
"""Run physical checks if and only if it is at the end of the line."""
# a newline token ends a single physical line.
if processor.is_eol_token(token):
@@ -640,13 +639,14 @@ class FileChecker:
self.run_physical_checks(line + "\n")
-def _pool_init(): # type: () -> None
+def _pool_init() -> None:
"""Ensure correct signaling of ^C using multiprocessing.Pool."""
signal.signal(signal.SIGINT, signal.SIG_IGN)
-def _try_initialize_processpool(job_count):
- # type: (int) -> Optional[multiprocessing.pool.Pool]
+def _try_initialize_processpool(
+ job_count: int,
+) -> Optional[multiprocessing.pool.Pool]:
"""Return a new process pool instance if we are able to create one."""
try:
return multiprocessing.Pool(job_count, _pool_init)
@@ -675,8 +675,9 @@ def _run_checks(checker):
return checker.run_checks()
-def find_offset(offset, mapping):
- # type: (int, processor._LogicalMapping) -> Tuple[int, int]
+def find_offset(
+ offset: int, mapping: processor._LogicalMapping
+) -> Tuple[int, int]:
"""Find the offset tuple for a single offset."""
if isinstance(offset, tuple):
return offset