diff options
| author | Eric Lin <anselor@gmail.com> | 2021-04-01 13:27:32 -0400 |
|---|---|---|
| committer | anselor <anselor@gmail.com> | 2021-04-02 18:00:03 -0400 |
| commit | 48d26a3d668b80f0b319085b0000adbc07054e84 (patch) | |
| tree | 0f3d4f77458be628e3f25e248b85c76f8fa48921 /cmd2 | |
| parent | 33951e80a4c0436624ec113988760d4a5b23a9f4 (diff) | |
| download | cmd2-git-48d26a3d668b80f0b319085b0000adbc07054e84.tar.gz | |
More mypy validation changes. cmd2.py is nearly fully mypy compliant now.
Diffstat (limited to 'cmd2')
| -rw-r--r-- | cmd2/__init__.py | 8 | ||||
| -rw-r--r-- | cmd2/argparse_custom.py | 7 | ||||
| -rw-r--r-- | cmd2/cmd2.py | 106 | ||||
| -rw-r--r-- | cmd2/decorators.py | 2 | ||||
| -rw-r--r-- | cmd2/rl_utils.py | 6 | ||||
| -rw-r--r-- | cmd2/transcript.py | 14 | ||||
| -rw-r--r-- | cmd2/utils.py | 17 |
7 files changed, 91 insertions, 69 deletions
diff --git a/cmd2/__init__.py b/cmd2/__init__.py index 34b16396..e545f394 100644 --- a/cmd2/__init__.py +++ b/cmd2/__init__.py @@ -3,10 +3,12 @@ # flake8: noqa F401 """This simply imports certain things for backwards compatibility.""" -try: - # For python 3.8 and later +import sys + +# For python 3.8 and late +if sys.version_info >= (3, 8): import importlib.metadata as importlib_metadata -except ImportError: +else: # For everyone else import importlib_metadata try: diff --git a/cmd2/argparse_custom.py b/cmd2/argparse_custom.py index 9c5f5b8b..9ce601c4 100644 --- a/cmd2/argparse_custom.py +++ b/cmd2/argparse_custom.py @@ -215,7 +215,6 @@ from typing import ( Tuple, Type, Union, - cast, ) from . import ( @@ -279,7 +278,7 @@ class CompletionItem(str): """ def __new__(cls, value: object, *args: Any, **kwargs: Any) -> 'CompletionItem': - return cast(CompletionItem, super(CompletionItem, cls).__new__(cls, value)) # type: ignore [call-arg] + return super(CompletionItem, cls).__new__(cls, value) # noinspection PyUnusedLocal def __init__(self, value: object, desc: str = '', *args: Any) -> None: @@ -887,7 +886,7 @@ class Cmd2ArgumentParser(argparse.ArgumentParser): description=description, epilog=epilog, parents=parents if parents else [], - formatter_class=formatter_class, + formatter_class=formatter_class, # type: ignore[arg-type] prefix_chars=prefix_chars, fromfile_prefix_chars=fromfile_prefix_chars, argument_default=argument_default, @@ -930,7 +929,7 @@ class Cmd2ArgumentParser(argparse.ArgumentParser): formatter = self._get_formatter() # usage - formatter.add_usage(self.usage, self._actions, self._mutually_exclusive_groups) + formatter.add_usage(self.usage, self._actions, self._mutually_exclusive_groups) # type: ignore[arg-type] # description formatter.add_text(self.description) diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index 9a06dff2..4a95402e 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -54,7 +54,6 @@ from types import ( ModuleType, ) from typing import ( - IO, Any, Callable, Dict, @@ -66,6 +65,7 @@ from typing import ( TextIO, Tuple, Type, + TypeVar, Union, cast, ) @@ -142,7 +142,7 @@ from .utils import ( if rl_type == RlType.NONE: # pragma: no cover sys.stderr.write(ansi.style_warning(rl_warning)) else: - from .rl_utils import ( + from .rl_utils import ( # type: ignore[attr-defined] readline, rl_force_redisplay, ) @@ -1893,9 +1893,6 @@ class Cmd(cmd.Cmd): # Determine the completer function to use if custom_settings is None: - if command is None: - return - # Check if a macro was entered if command in self.macros: completer_func = self.path_complete @@ -1926,7 +1923,7 @@ class Cmd(cmd.Cmd): # Not a recognized macro or command else: # Check if this command should be run as a shell command - if self.default_to_shell and command in utils.get_exes_in_path(command): + if self.default_to_shell and command in utils.get_exes_in_path(cast(str, command)): completer_func = self.path_complete else: completer_func = self.completedefault # type: ignore[assignment] @@ -2294,8 +2291,8 @@ class Cmd(cmd.Cmd): # call the postparsing hooks postparsing_data = plugin.PostparsingData(False, statement) - for func in self._postparsing_hooks: - postparsing_data = func(postparsing_data) + for postparsing_func in self._postparsing_hooks: + postparsing_data = postparsing_func(postparsing_data) if postparsing_data.stop: break @@ -2321,10 +2318,10 @@ class Cmd(cmd.Cmd): timestart = datetime.datetime.now() # precommand hooks - precmd_dataa = plugin.PrecommandData(statement) - for func in self._precmd_hooks: - precmd_dataa = func(precmd_dataa) - statement = precmd_dataa.statement + precmd_data = plugin.PrecommandData(statement) + for precmd_func in self._precmd_hooks: + precmd_data = precmd_func(precmd_data) + statement = precmd_data.statement # call precmd() for compatibility with cmd.Cmd statement = self.precmd(statement) @@ -2334,8 +2331,8 @@ class Cmd(cmd.Cmd): # postcommand hooks postcmd_data = plugin.PostcommandData(stop, statement) - for func in self._postcmd_hooks: - postcmd_data = func(postcmd_data) + for postcmd_func in self._postcmd_hooks: + postcmd_data = postcmd_func(postcmd_data) # retrieve the final value of stop, ignoring any statement modification from the hooks stop = postcmd_data.stop @@ -2604,7 +2601,9 @@ class Cmd(cmd.Cmd): import subprocess # Initialize the redirection saved state - redir_saved_state = utils.RedirectionSavedState(self.stdout, sys.stdout, self._cur_pipe_proc_reader, self._redirecting) + redir_saved_state = utils.RedirectionSavedState( + cast(TextIO, self.stdout), sys.stdout, self._cur_pipe_proc_reader, self._redirecting + ) # The ProcReader for this command cmd_pipe_proc_reader: Optional[utils.ProcReader] = None @@ -2656,7 +2655,7 @@ class Cmd(cmd.Cmd): raise RedirectionError('Pipe process exited with code {} before command could run'.format(proc.returncode)) else: redir_saved_state.redirecting = True - cmd_pipe_proc_reader = utils.ProcReader(proc, self.stdout, sys.stderr) + cmd_pipe_proc_reader = utils.ProcReader(proc, cast(TextIO, self.stdout), sys.stderr) sys.stdout = self.stdout = new_stdout elif statement.output: @@ -2713,7 +2712,7 @@ class Cmd(cmd.Cmd): pass # Restore the stdout values - self.stdout = cast(IO[str], saved_redir_state.saved_self_stdout) + self.stdout = cast(TextIO, saved_redir_state.saved_self_stdout) sys.stdout = cast(TextIO, saved_redir_state.saved_sys_stdout) # Check if we need to wait for the process being piped to @@ -4206,7 +4205,7 @@ class Cmd(cmd.Cmd): ) except ImportError: self.perror("IPython package is not installed") - return + return None from .py_bridge import ( PyBridge, @@ -4214,7 +4213,7 @@ class Cmd(cmd.Cmd): if self.in_pyscript(): self.perror("Recursively entering interactive Python shells is not allowed") - return + return None try: self._in_py = True @@ -4310,13 +4309,13 @@ class Cmd(cmd.Cmd): if args.clear or args.edit or args.output_file or args.run or args.transcript or args.expanded or args.script: self.poutput("-v can not be used with any other options") self.poutput(self.history_parser.format_usage()) - return + return None # -s and -x can only be used if none of these options are present: [-c -r -e -o -t] if (args.script or args.expanded) and (args.clear or args.edit or args.output_file or args.run or args.transcript): self.poutput("-s and -x can not be used with -c, -r, -e, -o, or -t") self.poutput(self.history_parser.format_usage()) - return + return None if args.clear: # Clear command and readline history @@ -4329,11 +4328,11 @@ class Cmd(cmd.Cmd): pass except OSError as ex: self.pexcept("Error removing history file '{}': {}".format(self.persistent_history_file, ex)) - return + return None if rl_type != RlType.NONE: readline.clear_history() - return + return None # If an argument was supplied, then retrieve partial contents of the history, otherwise retrieve it all history = self._get_history(args) @@ -4379,6 +4378,7 @@ class Cmd(cmd.Cmd): # Display the history items retrieved for idx, hi in history.items(): self.poutput(hi.pr(idx, script=args.script, expanded=args.expanded, verbose=args.verbose)) + return None def _get_history(self, args: argparse.Namespace) -> 'OrderedDict[int, HistoryItem]': """If an argument was supplied, then retrieve partial contents of the history; otherwise retrieve entire history. @@ -4388,7 +4388,7 @@ class Cmd(cmd.Cmd): if args.arg: try: int_arg = int(args.arg) - return {int_arg: self.history.get(int_arg)} + return OrderedDict({int_arg: self.history.get(int_arg)}) except ValueError: pass @@ -4486,7 +4486,7 @@ class Cmd(cmd.Cmd): atexit.register(self._persist_history) - def _persist_history(self): + def _persist_history(self) -> None: """Write history out to the history file""" if not self.persistent_history_file: return @@ -4499,7 +4499,7 @@ class Cmd(cmd.Cmd): msg = "Can not write persistent history file '{}': {}" self.pexcept(msg.format(self.persistent_history_file, ex)) - def _generate_transcript(self, history: List[Union[HistoryItem, str]], transcript_file: str) -> None: + def _generate_transcript(self, history: Union[List[HistoryItem], List[str]], transcript_file: str) -> None: """Generate a transcript file from a given history of commands""" # Validate the transcript file path to make sure directory exists and write access is available transcript_path = os.path.abspath(os.path.expanduser(transcript_file)) @@ -4542,7 +4542,8 @@ class Cmd(cmd.Cmd): transcript += command # Use a StdSim object to capture output - self.stdout = utils.StdSim(self.stdout) + stdsim = utils.StdSim(cast(TextIO, self.stdout)) + self.stdout = cast(TextIO, stdsim) # then run the command and let the output go into our buffer try: @@ -4554,7 +4555,7 @@ class Cmd(cmd.Cmd): commands_run += 1 # add the regex-escaped output to the transcript - transcript += self.stdout.getvalue().replace('/', r'\/') + transcript += stdsim.getvalue().replace('/', r'\/') # check if we are supposed to stop if stop: @@ -4667,12 +4668,12 @@ class Cmd(cmd.Cmd): try: # An empty file is not an error, so just return if os.path.getsize(expanded_path) == 0: - return + return None # Make sure the file is ASCII or UTF-8 encoded text if not utils.is_text_file(expanded_path): self.perror(f"'{expanded_path}' is not an ASCII or UTF-8 encoded text file") - return + return None # Read all lines of the script with open(expanded_path, encoding='utf-8') as target: @@ -4696,6 +4697,7 @@ class Cmd(cmd.Cmd): # Check if a script dir was added before an exception occurred if orig_script_dir_count != len(self._script_dir): self._script_dir.pop() + return None relative_run_script_description = run_script_description relative_run_script_description += ( @@ -4761,10 +4763,10 @@ class Cmd(cmd.Cmd): self.poutput('cmd2 app: {}'.format(sys.argv[0])) self.poutput(ansi.style('collected {} transcript{}'.format(num_transcripts, plural), bold=True)) - self.__class__.testfiles = transcripts_expanded + setattr(self.__class__, 'testfiles', transcripts_expanded) sys.argv = [sys.argv[0]] # the --test argument upsets unittest.main() testcase = TestMyAppCase() - stream = utils.StdSim(sys.stderr) + stream = cast(TextIO, utils.StdSim(sys.stderr)) # noinspection PyTypeChecker runner = unittest.TextTestRunner(stream=stream) start_time = time.time() @@ -5001,7 +5003,7 @@ class Cmd(cmd.Cmd): if getattr(func, constants.CMD_ATTR_HELP_CATEGORY, None) == category: self.disable_command(cmd_name, message_to_print) - def _report_disabled_command_usage(self, *_args, message_to_print: str, **_kwargs) -> None: + def _report_disabled_command_usage(self, *_args: Any, message_to_print: str, **_kwargs: Any) -> None: """ Report when a disabled command has been run or had help called on it @@ -5012,7 +5014,7 @@ class Cmd(cmd.Cmd): # Set apply_style to False so message_to_print's style is not overridden self.perror(message_to_print, apply_style=False) - def cmdloop(self, intro: Optional[str] = None) -> int: + def cmdloop(self, intro: Optional[str] = None) -> int: # type: ignore[override] """This is an outer wrapper around _cmdloop() which deals with extra features provided by cmd2. _cmdloop() provides the main loop equivalent to cmd.cmdloop(). This is a wrapper around that which deals with @@ -5078,15 +5080,15 @@ class Cmd(cmd.Cmd): ### def _initialize_plugin_system(self) -> None: """Initialize the plugin system""" - self._preloop_hooks = [] - self._postloop_hooks = [] - self._postparsing_hooks = [] - self._precmd_hooks = [] - self._postcmd_hooks = [] - self._cmdfinalization_hooks = [] + self._preloop_hooks: List[Callable[[], None]] = [] + self._postloop_hooks: List[Callable[[], None]] = [] + self._postparsing_hooks: List[Callable[[plugin.PostparsingData], plugin.PostparsingData]] = [] + self._precmd_hooks: List[Callable[[plugin.PrecommandData], plugin.PrecommandData]] = [] + self._postcmd_hooks: List[Callable[[plugin.PostcommandData], plugin.PostcommandData]] = [] + self._cmdfinalization_hooks: List[Callable[[plugin.CommandFinalizationData], plugin.CommandFinalizationData]] = [] @classmethod - def _validate_callable_param_count(cls, func: Callable, count: int) -> None: + def _validate_callable_param_count(cls, func: Callable[..., Any], count: int) -> None: """Ensure a function has the given number of parameters.""" signature = inspect.signature(func) # validate that the callable has the right number of parameters @@ -5101,7 +5103,7 @@ class Cmd(cmd.Cmd): ) @classmethod - def _validate_prepostloop_callable(cls, func: Callable[[None], None]) -> None: + def _validate_prepostloop_callable(cls, func: Callable[[], None]) -> None: """Check parameter and return types for preloop and postloop hooks.""" cls._validate_callable_param_count(func, 0) # make sure there is no return notation @@ -5113,12 +5115,12 @@ class Cmd(cmd.Cmd): ) ) - def register_preloop_hook(self, func: Callable[[None], None]) -> None: + def register_preloop_hook(self, func: Callable[[], None]) -> None: """Register a function to be called at the beginning of the command loop.""" self._validate_prepostloop_callable(func) self._preloop_hooks.append(func) - def register_postloop_hook(self, func: Callable[[None], None]) -> None: + def register_postloop_hook(self, func: Callable[[], None]) -> None: """Register a function to be called at the end of the command loop.""" self._validate_prepostloop_callable(func) self._postloop_hooks.append(func) @@ -5126,7 +5128,7 @@ class Cmd(cmd.Cmd): @classmethod def _validate_postparsing_callable(cls, func: Callable[[plugin.PostparsingData], plugin.PostparsingData]) -> None: """Check parameter and return types for postparsing hooks""" - cls._validate_callable_param_count(func, 1) + cls._validate_callable_param_count(cast(Callable[..., Any], func), 1) signature = inspect.signature(func) _, param = list(signature.parameters.items())[0] if param.annotation != plugin.PostparsingData: @@ -5141,12 +5143,16 @@ class Cmd(cmd.Cmd): self._validate_postparsing_callable(func) self._postparsing_hooks.append(func) + CommandDataType = TypeVar('CommandDataType') + @classmethod - def _validate_prepostcmd_hook(cls, func: Callable, data_type: Type) -> None: + def _validate_prepostcmd_hook( + cls, func: Callable[[CommandDataType], CommandDataType], data_type: Type[CommandDataType] + ) -> None: """Check parameter and return types for pre and post command hooks.""" signature = inspect.signature(func) # validate that the callable has the right number of parameters - cls._validate_callable_param_count(func, 1) + cls._validate_callable_param_count(cast(Callable[..., Any], func), 1) # validate the parameter has the right annotation paramname = list(signature.parameters.keys())[0] param = signature.parameters[paramname] @@ -5207,7 +5213,7 @@ class Cmd(cmd.Cmd): self._validate_cmdfinalization_callable(func) self._cmdfinalization_hooks.append(func) - def _resolve_func_self(self, cmd_support_func: Callable, cmd_self: Union[CommandSet, 'Cmd']) -> object: + def _resolve_func_self(self, cmd_support_func: Callable[..., Any], cmd_self: Union[CommandSet, 'Cmd']) -> Optional[object]: """ Attempt to resolve a candidate instance to pass as 'self' for an unbound class method that was used when defining command's argparse object. Since we restrict registration to only a single CommandSet @@ -5218,7 +5224,7 @@ class Cmd(cmd.Cmd): :return: """ # figure out what class the command support function was defined in - func_class = get_defining_class(cmd_support_func) # type: Optional[Type] + func_class: Optional[Type[Any]] = get_defining_class(cmd_support_func) # Was there a defining class identified? If so, is it a sub-class of CommandSet? if func_class is not None and issubclass(func_class, CommandSet): @@ -5229,6 +5235,8 @@ class Cmd(cmd.Cmd): # 2. Do any of the registered CommandSets in the Cmd2 application exactly match the type? # 3. Is there a registered CommandSet that is is the only matching subclass? + func_self: Optional[Union[CommandSet, 'Cmd']] + # check if the command's CommandSet is a sub-class of the support function's defining class if isinstance(cmd_self, func_class): # Case 1: Command's CommandSet is a sub-class of the support function's CommandSet diff --git a/cmd2/decorators.py b/cmd2/decorators.py index 6d6cdec8..12b3372a 100644 --- a/cmd2/decorators.py +++ b/cmd2/decorators.py @@ -108,7 +108,7 @@ def _arg_swap(args: Union[Tuple[Any], List[Any]], search_arg: Any, *replace_arg: def with_argument_list( func_arg: Optional[Callable[[List[str]], Optional[bool]]] = None, *, preserve_quotes: bool = False -) -> Union[Callable[[List[str]], Optional[bool]],]: +) -> Union[Callable[[List[str]], Optional[bool]]]: """ A decorator to alter the arguments passed to a ``do_*`` method. Default passes a string of whatever the user typed. With this decorator, the diff --git a/cmd2/rl_utils.py b/cmd2/rl_utils.py index e56e338c..a8d97fa5 100644 --- a/cmd2/rl_utils.py +++ b/cmd2/rl_utils.py @@ -10,13 +10,13 @@ from enum import ( # Prefer statically linked gnureadline if available (for macOS compatibility due to issues with libedit) try: # noinspection PyPackageRequirements - import gnureadline as readline + import gnureadline as readline # type: ignore[import] except ImportError: # Try to import readline, but allow failure for convenience in Windows unit testing # Note: If this actually fails, you should install readline on Linux or Mac or pyreadline on Windows try: # noinspection PyUnresolvedReferences - import readline + import readline # type: ignore[no-redef] except ImportError: # pragma: no cover pass @@ -182,7 +182,7 @@ def rl_get_point() -> int: # pragma: no cover return ctypes.c_int.in_dll(readline_lib, "rl_point").value elif rl_type == RlType.PYREADLINE: - return readline.rl.mode.l_buffer.point + return int(readline.rl.mode.l_buffer.point) else: return 0 diff --git a/cmd2/transcript.py b/cmd2/transcript.py index 5cd59ae8..c336c1a5 100644 --- a/cmd2/transcript.py +++ b/cmd2/transcript.py @@ -12,7 +12,11 @@ class is used in cmd2.py::run_transcript_tests() import re import unittest from typing import ( + TYPE_CHECKING, + List, + Optional, Tuple, + cast, ) from . import ( @@ -20,6 +24,11 @@ from . import ( utils, ) +if TYPE_CHECKING: # pragma: no cover + from cmd2 import ( + Cmd, + ) + class Cmd2TestCase(unittest.TestCase): """A unittest class used for transcript testing. @@ -31,7 +40,7 @@ class Cmd2TestCase(unittest.TestCase): See example.py """ - cmdapp = None + cmdapp: Optional['Cmd'] = None def setUp(self): if self.cmdapp: @@ -54,7 +63,8 @@ class Cmd2TestCase(unittest.TestCase): def _fetchTranscripts(self): self.transcripts = {} - for fname in self.cmdapp.testfiles: + testfiles = cast(List[str], getattr(self.cmdapp, 'testfiles', [])) + for fname in testfiles: tfile = open(fname) self.transcripts[fname] = iter(tfile.readlines()) tfile.close() diff --git a/cmd2/utils.py b/cmd2/utils.py index ec00394e..292f88dc 100644 --- a/cmd2/utils.py +++ b/cmd2/utils.py @@ -1,6 +1,5 @@ # coding=utf-8 """Shared utility functions""" - import argparse import collections import functools @@ -17,7 +16,6 @@ from enum import ( Enum, ) from typing import ( - IO, TYPE_CHECKING, Any, Callable, @@ -45,6 +43,11 @@ from .argparse_custom import ( if TYPE_CHECKING: # pragma: no cover import cmd2 # noqa: F401 + PopenTextIO = subprocess.Popen[bytes] + +else: + PopenTextIO = subprocess.Popen + _T = TypeVar('_T') @@ -445,7 +448,7 @@ class StdSim: def __init__( self, - inner_stream: Union[TextIO, 'StdSim', IO[str]], + inner_stream: Union[TextIO, 'StdSim'], *, echo: bool = False, encoding: str = 'utf-8', @@ -574,7 +577,7 @@ class ProcReader: If neither are pipes, then the process will run normally and no output will be captured. """ - def __init__(self, proc: subprocess.Popen, stdout: Union[StdSim, IO[str]], stderr: Union[StdSim, IO[str]]) -> None: + def __init__(self, proc: PopenTextIO, stdout: Union[StdSim, TextIO], stderr: Union[StdSim, TextIO]) -> None: """ ProcReader initializer :param proc: the Popen process being read from @@ -650,7 +653,7 @@ class ProcReader: # Run until process completes while self._proc.poll() is None: # noinspection PyUnresolvedReferences - available = read_stream.peek() + available = read_stream.peek() # type: ignore[attr-defined] if available: read_stream.read(len(available)) self._write_bytes(write_stream, available) @@ -699,8 +702,8 @@ class RedirectionSavedState: def __init__( self, - self_stdout: Union[StdSim, IO[str]], - sys_stdout: Union[StdSim, IO[str]], + self_stdout: Union[StdSim, TextIO], + sys_stdout: Union[StdSim, TextIO], pipe_proc_reader: Optional[ProcReader], saved_redirecting: bool, ) -> None: |
