diff options
Diffstat (limited to 'src/flake8/checker.py')
| -rw-r--r-- | src/flake8/checker.py | 63 |
1 files changed, 32 insertions, 31 deletions
diff --git a/src/flake8/checker.py b/src/flake8/checker.py index 15ca9bc..ceff965 100644 --- a/src/flake8/checker.py +++ b/src/flake8/checker.py @@ -85,8 +85,8 @@ class Manager: self.options = style_guide.options self.checks = checker_plugins self.jobs = self._job_count() - self._all_checkers = [] # type: List[FileChecker] - self.checkers = [] # type: List[FileChecker] + self._all_checkers: List[FileChecker] = [] + self.checkers: List[FileChecker] = [] self.statistics = { "files": 0, "logical lines": 0, @@ -103,8 +103,7 @@ class Manager: self.statistics[statistic] += checker.statistics[statistic] self.statistics["files"] += len(self.checkers) - def _job_count(self): - # type: () -> int + def _job_count(self) -> int: # First we walk through all of our error cases: # - multiprocessing library is not present # - we're running on windows in which case we know we have significant @@ -165,8 +164,7 @@ class Manager: ) return reported_results_count - def is_path_excluded(self, path): - # type: (str) -> bool + def is_path_excluded(self, path: str) -> bool: """Check if a path is excluded. :param str path: @@ -189,8 +187,7 @@ class Manager: logger=LOG, ) - def make_checkers(self, paths=None): - # type: (Optional[List[str]]) -> None + def make_checkers(self, paths: Optional[List[str]] = None) -> None: """Create checkers for each file.""" if paths is None: paths = self.arguments @@ -235,8 +232,7 @@ class Manager: self.checkers = [c for c in self._all_checkers if c.should_process] LOG.info("Checking %d files", len(self.checkers)) - def report(self): - # type: () -> Tuple[int, int] + def report(self) -> Tuple[int, int]: """Report all of the errors found in the managed file checkers. This iterates over each of the checkers and reports the errors sorted @@ -258,11 +254,11 @@ class Manager: results_found += len(results) return (results_found, results_reported) - def run_parallel(self): # type: () -> None + def run_parallel(self) -> None: """Run the checkers in parallel.""" # fmt: off - final_results = collections.defaultdict(list) # type: Dict[str, List[Tuple[str, int, int, str, Optional[str]]]] # noqa: E501 - final_statistics = collections.defaultdict(dict) # type: Dict[str, Dict[str, int]] # noqa: E501 + final_results: Dict[str, List[Tuple[str, int, int, str, Optional[str]]]] = collections.defaultdict(list) # noqa: E501 + final_statistics: Dict[str, Dict[str, int]] = collections.defaultdict(dict) # noqa: E501 # fmt: on pool = _try_initialize_processpool(self.jobs) @@ -297,12 +293,12 @@ class Manager: checker.results = final_results[filename] checker.statistics = final_statistics[filename] - def run_serial(self): # type: () -> None + def run_serial(self) -> None: """Run the checkers in serial.""" for checker in self.checkers: checker.run_checks() - def run(self): # type: () -> None + def run(self) -> None: """Run all the checkers. This will intelligently decide whether to run the checks in parallel @@ -356,9 +352,7 @@ class FileChecker: self.options = options self.filename = filename self.checks = checks - # fmt: off - self.results = [] # type: List[Tuple[str, int, int, str, Optional[str]]] # noqa: E501 - # fmt: on + self.results: List[Tuple[str, int, int, str, Optional[str]]] = [] self.statistics = { "tokens": 0, "logical lines": 0, @@ -372,12 +366,11 @@ class FileChecker: self.should_process = not self.processor.should_ignore_file() self.statistics["physical lines"] = len(self.processor.lines) - def __repr__(self): # type: () -> str + def __repr__(self) -> str: """Provide helpful debugging representation.""" return f"FileChecker for {self.filename}" - def _make_processor(self): - # type: () -> Optional[processor.FileProcessor] + def _make_processor(self) -> Optional[processor.FileProcessor]: try: return processor.FileProcessor(self.filename, self.options) except OSError as e: @@ -391,8 +384,13 @@ class FileChecker: self.report("E902", 0, 0, message) return None - def report(self, error_code, line_number, column, text): - # type: (Optional[str], int, int, str) -> str + def report( + self, + error_code: Optional[str], + line_number: int, + column: int, + text: str, + ) -> str: """Report an error by storing it in the results list.""" if error_code is None: error_code, text = text.split(" ", 1) @@ -468,7 +466,7 @@ class FileChecker: column -= column_offset return row, column - def run_ast_checks(self): # type: () -> None + def run_ast_checks(self) -> None: """Run all checks expecting an abstract syntax tree.""" try: ast = self.processor.build_ast() @@ -610,8 +608,9 @@ class FileChecker: else: self.run_logical_checks() - def check_physical_eol(self, token, prev_physical): - # type: (processor._Token, str) -> None + def check_physical_eol( + self, token: processor._Token, prev_physical: str + ) -> None: """Run physical checks if and only if it is at the end of the line.""" # a newline token ends a single physical line. if processor.is_eol_token(token): @@ -640,13 +639,14 @@ class FileChecker: self.run_physical_checks(line + "\n") -def _pool_init(): # type: () -> None +def _pool_init() -> None: """Ensure correct signaling of ^C using multiprocessing.Pool.""" signal.signal(signal.SIGINT, signal.SIG_IGN) -def _try_initialize_processpool(job_count): - # type: (int) -> Optional[multiprocessing.pool.Pool] +def _try_initialize_processpool( + job_count: int, +) -> Optional[multiprocessing.pool.Pool]: """Return a new process pool instance if we are able to create one.""" try: return multiprocessing.Pool(job_count, _pool_init) @@ -675,8 +675,9 @@ def _run_checks(checker): return checker.run_checks() -def find_offset(offset, mapping): - # type: (int, processor._LogicalMapping) -> Tuple[int, int] +def find_offset( + offset: int, mapping: processor._LogicalMapping +) -> Tuple[int, int]: """Find the offset tuple for a single offset.""" if isinstance(offset, tuple): return offset |
