summaryrefslogtreecommitdiff
path: root/cmd2
diff options
context:
space:
mode:
authorEric Lin <anselor@gmail.com>2021-04-01 13:27:32 -0400
committeranselor <anselor@gmail.com>2021-04-02 18:00:03 -0400
commit48d26a3d668b80f0b319085b0000adbc07054e84 (patch)
tree0f3d4f77458be628e3f25e248b85c76f8fa48921 /cmd2
parent33951e80a4c0436624ec113988760d4a5b23a9f4 (diff)
downloadcmd2-git-48d26a3d668b80f0b319085b0000adbc07054e84.tar.gz
More mypy validation changes. cmd2.py is nearly fully mypy compliant now.
Diffstat (limited to 'cmd2')
-rw-r--r--cmd2/__init__.py8
-rw-r--r--cmd2/argparse_custom.py7
-rw-r--r--cmd2/cmd2.py106
-rw-r--r--cmd2/decorators.py2
-rw-r--r--cmd2/rl_utils.py6
-rw-r--r--cmd2/transcript.py14
-rw-r--r--cmd2/utils.py17
7 files changed, 91 insertions, 69 deletions
diff --git a/cmd2/__init__.py b/cmd2/__init__.py
index 34b16396..e545f394 100644
--- a/cmd2/__init__.py
+++ b/cmd2/__init__.py
@@ -3,10 +3,12 @@
# flake8: noqa F401
"""This simply imports certain things for backwards compatibility."""
-try:
- # For python 3.8 and later
+import sys
+
+# For python 3.8 and late
+if sys.version_info >= (3, 8):
import importlib.metadata as importlib_metadata
-except ImportError:
+else:
# For everyone else
import importlib_metadata
try:
diff --git a/cmd2/argparse_custom.py b/cmd2/argparse_custom.py
index 9c5f5b8b..9ce601c4 100644
--- a/cmd2/argparse_custom.py
+++ b/cmd2/argparse_custom.py
@@ -215,7 +215,6 @@ from typing import (
Tuple,
Type,
Union,
- cast,
)
from . import (
@@ -279,7 +278,7 @@ class CompletionItem(str):
"""
def __new__(cls, value: object, *args: Any, **kwargs: Any) -> 'CompletionItem':
- return cast(CompletionItem, super(CompletionItem, cls).__new__(cls, value)) # type: ignore [call-arg]
+ return super(CompletionItem, cls).__new__(cls, value)
# noinspection PyUnusedLocal
def __init__(self, value: object, desc: str = '', *args: Any) -> None:
@@ -887,7 +886,7 @@ class Cmd2ArgumentParser(argparse.ArgumentParser):
description=description,
epilog=epilog,
parents=parents if parents else [],
- formatter_class=formatter_class,
+ formatter_class=formatter_class, # type: ignore[arg-type]
prefix_chars=prefix_chars,
fromfile_prefix_chars=fromfile_prefix_chars,
argument_default=argument_default,
@@ -930,7 +929,7 @@ class Cmd2ArgumentParser(argparse.ArgumentParser):
formatter = self._get_formatter()
# usage
- formatter.add_usage(self.usage, self._actions, self._mutually_exclusive_groups)
+ formatter.add_usage(self.usage, self._actions, self._mutually_exclusive_groups) # type: ignore[arg-type]
# description
formatter.add_text(self.description)
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py
index 9a06dff2..4a95402e 100644
--- a/cmd2/cmd2.py
+++ b/cmd2/cmd2.py
@@ -54,7 +54,6 @@ from types import (
ModuleType,
)
from typing import (
- IO,
Any,
Callable,
Dict,
@@ -66,6 +65,7 @@ from typing import (
TextIO,
Tuple,
Type,
+ TypeVar,
Union,
cast,
)
@@ -142,7 +142,7 @@ from .utils import (
if rl_type == RlType.NONE: # pragma: no cover
sys.stderr.write(ansi.style_warning(rl_warning))
else:
- from .rl_utils import (
+ from .rl_utils import ( # type: ignore[attr-defined]
readline,
rl_force_redisplay,
)
@@ -1893,9 +1893,6 @@ class Cmd(cmd.Cmd):
# Determine the completer function to use
if custom_settings is None:
- if command is None:
- return
-
# Check if a macro was entered
if command in self.macros:
completer_func = self.path_complete
@@ -1926,7 +1923,7 @@ class Cmd(cmd.Cmd):
# Not a recognized macro or command
else:
# Check if this command should be run as a shell command
- if self.default_to_shell and command in utils.get_exes_in_path(command):
+ if self.default_to_shell and command in utils.get_exes_in_path(cast(str, command)):
completer_func = self.path_complete
else:
completer_func = self.completedefault # type: ignore[assignment]
@@ -2294,8 +2291,8 @@ class Cmd(cmd.Cmd):
# call the postparsing hooks
postparsing_data = plugin.PostparsingData(False, statement)
- for func in self._postparsing_hooks:
- postparsing_data = func(postparsing_data)
+ for postparsing_func in self._postparsing_hooks:
+ postparsing_data = postparsing_func(postparsing_data)
if postparsing_data.stop:
break
@@ -2321,10 +2318,10 @@ class Cmd(cmd.Cmd):
timestart = datetime.datetime.now()
# precommand hooks
- precmd_dataa = plugin.PrecommandData(statement)
- for func in self._precmd_hooks:
- precmd_dataa = func(precmd_dataa)
- statement = precmd_dataa.statement
+ precmd_data = plugin.PrecommandData(statement)
+ for precmd_func in self._precmd_hooks:
+ precmd_data = precmd_func(precmd_data)
+ statement = precmd_data.statement
# call precmd() for compatibility with cmd.Cmd
statement = self.precmd(statement)
@@ -2334,8 +2331,8 @@ class Cmd(cmd.Cmd):
# postcommand hooks
postcmd_data = plugin.PostcommandData(stop, statement)
- for func in self._postcmd_hooks:
- postcmd_data = func(postcmd_data)
+ for postcmd_func in self._postcmd_hooks:
+ postcmd_data = postcmd_func(postcmd_data)
# retrieve the final value of stop, ignoring any statement modification from the hooks
stop = postcmd_data.stop
@@ -2604,7 +2601,9 @@ class Cmd(cmd.Cmd):
import subprocess
# Initialize the redirection saved state
- redir_saved_state = utils.RedirectionSavedState(self.stdout, sys.stdout, self._cur_pipe_proc_reader, self._redirecting)
+ redir_saved_state = utils.RedirectionSavedState(
+ cast(TextIO, self.stdout), sys.stdout, self._cur_pipe_proc_reader, self._redirecting
+ )
# The ProcReader for this command
cmd_pipe_proc_reader: Optional[utils.ProcReader] = None
@@ -2656,7 +2655,7 @@ class Cmd(cmd.Cmd):
raise RedirectionError('Pipe process exited with code {} before command could run'.format(proc.returncode))
else:
redir_saved_state.redirecting = True
- cmd_pipe_proc_reader = utils.ProcReader(proc, self.stdout, sys.stderr)
+ cmd_pipe_proc_reader = utils.ProcReader(proc, cast(TextIO, self.stdout), sys.stderr)
sys.stdout = self.stdout = new_stdout
elif statement.output:
@@ -2713,7 +2712,7 @@ class Cmd(cmd.Cmd):
pass
# Restore the stdout values
- self.stdout = cast(IO[str], saved_redir_state.saved_self_stdout)
+ self.stdout = cast(TextIO, saved_redir_state.saved_self_stdout)
sys.stdout = cast(TextIO, saved_redir_state.saved_sys_stdout)
# Check if we need to wait for the process being piped to
@@ -4206,7 +4205,7 @@ class Cmd(cmd.Cmd):
)
except ImportError:
self.perror("IPython package is not installed")
- return
+ return None
from .py_bridge import (
PyBridge,
@@ -4214,7 +4213,7 @@ class Cmd(cmd.Cmd):
if self.in_pyscript():
self.perror("Recursively entering interactive Python shells is not allowed")
- return
+ return None
try:
self._in_py = True
@@ -4310,13 +4309,13 @@ class Cmd(cmd.Cmd):
if args.clear or args.edit or args.output_file or args.run or args.transcript or args.expanded or args.script:
self.poutput("-v can not be used with any other options")
self.poutput(self.history_parser.format_usage())
- return
+ return None
# -s and -x can only be used if none of these options are present: [-c -r -e -o -t]
if (args.script or args.expanded) and (args.clear or args.edit or args.output_file or args.run or args.transcript):
self.poutput("-s and -x can not be used with -c, -r, -e, -o, or -t")
self.poutput(self.history_parser.format_usage())
- return
+ return None
if args.clear:
# Clear command and readline history
@@ -4329,11 +4328,11 @@ class Cmd(cmd.Cmd):
pass
except OSError as ex:
self.pexcept("Error removing history file '{}': {}".format(self.persistent_history_file, ex))
- return
+ return None
if rl_type != RlType.NONE:
readline.clear_history()
- return
+ return None
# If an argument was supplied, then retrieve partial contents of the history, otherwise retrieve it all
history = self._get_history(args)
@@ -4379,6 +4378,7 @@ class Cmd(cmd.Cmd):
# Display the history items retrieved
for idx, hi in history.items():
self.poutput(hi.pr(idx, script=args.script, expanded=args.expanded, verbose=args.verbose))
+ return None
def _get_history(self, args: argparse.Namespace) -> 'OrderedDict[int, HistoryItem]':
"""If an argument was supplied, then retrieve partial contents of the history; otherwise retrieve entire history.
@@ -4388,7 +4388,7 @@ class Cmd(cmd.Cmd):
if args.arg:
try:
int_arg = int(args.arg)
- return {int_arg: self.history.get(int_arg)}
+ return OrderedDict({int_arg: self.history.get(int_arg)})
except ValueError:
pass
@@ -4486,7 +4486,7 @@ class Cmd(cmd.Cmd):
atexit.register(self._persist_history)
- def _persist_history(self):
+ def _persist_history(self) -> None:
"""Write history out to the history file"""
if not self.persistent_history_file:
return
@@ -4499,7 +4499,7 @@ class Cmd(cmd.Cmd):
msg = "Can not write persistent history file '{}': {}"
self.pexcept(msg.format(self.persistent_history_file, ex))
- def _generate_transcript(self, history: List[Union[HistoryItem, str]], transcript_file: str) -> None:
+ def _generate_transcript(self, history: Union[List[HistoryItem], List[str]], transcript_file: str) -> None:
"""Generate a transcript file from a given history of commands"""
# Validate the transcript file path to make sure directory exists and write access is available
transcript_path = os.path.abspath(os.path.expanduser(transcript_file))
@@ -4542,7 +4542,8 @@ class Cmd(cmd.Cmd):
transcript += command
# Use a StdSim object to capture output
- self.stdout = utils.StdSim(self.stdout)
+ stdsim = utils.StdSim(cast(TextIO, self.stdout))
+ self.stdout = cast(TextIO, stdsim)
# then run the command and let the output go into our buffer
try:
@@ -4554,7 +4555,7 @@ class Cmd(cmd.Cmd):
commands_run += 1
# add the regex-escaped output to the transcript
- transcript += self.stdout.getvalue().replace('/', r'\/')
+ transcript += stdsim.getvalue().replace('/', r'\/')
# check if we are supposed to stop
if stop:
@@ -4667,12 +4668,12 @@ class Cmd(cmd.Cmd):
try:
# An empty file is not an error, so just return
if os.path.getsize(expanded_path) == 0:
- return
+ return None
# Make sure the file is ASCII or UTF-8 encoded text
if not utils.is_text_file(expanded_path):
self.perror(f"'{expanded_path}' is not an ASCII or UTF-8 encoded text file")
- return
+ return None
# Read all lines of the script
with open(expanded_path, encoding='utf-8') as target:
@@ -4696,6 +4697,7 @@ class Cmd(cmd.Cmd):
# Check if a script dir was added before an exception occurred
if orig_script_dir_count != len(self._script_dir):
self._script_dir.pop()
+ return None
relative_run_script_description = run_script_description
relative_run_script_description += (
@@ -4761,10 +4763,10 @@ class Cmd(cmd.Cmd):
self.poutput('cmd2 app: {}'.format(sys.argv[0]))
self.poutput(ansi.style('collected {} transcript{}'.format(num_transcripts, plural), bold=True))
- self.__class__.testfiles = transcripts_expanded
+ setattr(self.__class__, 'testfiles', transcripts_expanded)
sys.argv = [sys.argv[0]] # the --test argument upsets unittest.main()
testcase = TestMyAppCase()
- stream = utils.StdSim(sys.stderr)
+ stream = cast(TextIO, utils.StdSim(sys.stderr))
# noinspection PyTypeChecker
runner = unittest.TextTestRunner(stream=stream)
start_time = time.time()
@@ -5001,7 +5003,7 @@ class Cmd(cmd.Cmd):
if getattr(func, constants.CMD_ATTR_HELP_CATEGORY, None) == category:
self.disable_command(cmd_name, message_to_print)
- def _report_disabled_command_usage(self, *_args, message_to_print: str, **_kwargs) -> None:
+ def _report_disabled_command_usage(self, *_args: Any, message_to_print: str, **_kwargs: Any) -> None:
"""
Report when a disabled command has been run or had help called on it
@@ -5012,7 +5014,7 @@ class Cmd(cmd.Cmd):
# Set apply_style to False so message_to_print's style is not overridden
self.perror(message_to_print, apply_style=False)
- def cmdloop(self, intro: Optional[str] = None) -> int:
+ def cmdloop(self, intro: Optional[str] = None) -> int: # type: ignore[override]
"""This is an outer wrapper around _cmdloop() which deals with extra features provided by cmd2.
_cmdloop() provides the main loop equivalent to cmd.cmdloop(). This is a wrapper around that which deals with
@@ -5078,15 +5080,15 @@ class Cmd(cmd.Cmd):
###
def _initialize_plugin_system(self) -> None:
"""Initialize the plugin system"""
- self._preloop_hooks = []
- self._postloop_hooks = []
- self._postparsing_hooks = []
- self._precmd_hooks = []
- self._postcmd_hooks = []
- self._cmdfinalization_hooks = []
+ self._preloop_hooks: List[Callable[[], None]] = []
+ self._postloop_hooks: List[Callable[[], None]] = []
+ self._postparsing_hooks: List[Callable[[plugin.PostparsingData], plugin.PostparsingData]] = []
+ self._precmd_hooks: List[Callable[[plugin.PrecommandData], plugin.PrecommandData]] = []
+ self._postcmd_hooks: List[Callable[[plugin.PostcommandData], plugin.PostcommandData]] = []
+ self._cmdfinalization_hooks: List[Callable[[plugin.CommandFinalizationData], plugin.CommandFinalizationData]] = []
@classmethod
- def _validate_callable_param_count(cls, func: Callable, count: int) -> None:
+ def _validate_callable_param_count(cls, func: Callable[..., Any], count: int) -> None:
"""Ensure a function has the given number of parameters."""
signature = inspect.signature(func)
# validate that the callable has the right number of parameters
@@ -5101,7 +5103,7 @@ class Cmd(cmd.Cmd):
)
@classmethod
- def _validate_prepostloop_callable(cls, func: Callable[[None], None]) -> None:
+ def _validate_prepostloop_callable(cls, func: Callable[[], None]) -> None:
"""Check parameter and return types for preloop and postloop hooks."""
cls._validate_callable_param_count(func, 0)
# make sure there is no return notation
@@ -5113,12 +5115,12 @@ class Cmd(cmd.Cmd):
)
)
- def register_preloop_hook(self, func: Callable[[None], None]) -> None:
+ def register_preloop_hook(self, func: Callable[[], None]) -> None:
"""Register a function to be called at the beginning of the command loop."""
self._validate_prepostloop_callable(func)
self._preloop_hooks.append(func)
- def register_postloop_hook(self, func: Callable[[None], None]) -> None:
+ def register_postloop_hook(self, func: Callable[[], None]) -> None:
"""Register a function to be called at the end of the command loop."""
self._validate_prepostloop_callable(func)
self._postloop_hooks.append(func)
@@ -5126,7 +5128,7 @@ class Cmd(cmd.Cmd):
@classmethod
def _validate_postparsing_callable(cls, func: Callable[[plugin.PostparsingData], plugin.PostparsingData]) -> None:
"""Check parameter and return types for postparsing hooks"""
- cls._validate_callable_param_count(func, 1)
+ cls._validate_callable_param_count(cast(Callable[..., Any], func), 1)
signature = inspect.signature(func)
_, param = list(signature.parameters.items())[0]
if param.annotation != plugin.PostparsingData:
@@ -5141,12 +5143,16 @@ class Cmd(cmd.Cmd):
self._validate_postparsing_callable(func)
self._postparsing_hooks.append(func)
+ CommandDataType = TypeVar('CommandDataType')
+
@classmethod
- def _validate_prepostcmd_hook(cls, func: Callable, data_type: Type) -> None:
+ def _validate_prepostcmd_hook(
+ cls, func: Callable[[CommandDataType], CommandDataType], data_type: Type[CommandDataType]
+ ) -> None:
"""Check parameter and return types for pre and post command hooks."""
signature = inspect.signature(func)
# validate that the callable has the right number of parameters
- cls._validate_callable_param_count(func, 1)
+ cls._validate_callable_param_count(cast(Callable[..., Any], func), 1)
# validate the parameter has the right annotation
paramname = list(signature.parameters.keys())[0]
param = signature.parameters[paramname]
@@ -5207,7 +5213,7 @@ class Cmd(cmd.Cmd):
self._validate_cmdfinalization_callable(func)
self._cmdfinalization_hooks.append(func)
- def _resolve_func_self(self, cmd_support_func: Callable, cmd_self: Union[CommandSet, 'Cmd']) -> object:
+ def _resolve_func_self(self, cmd_support_func: Callable[..., Any], cmd_self: Union[CommandSet, 'Cmd']) -> Optional[object]:
"""
Attempt to resolve a candidate instance to pass as 'self' for an unbound class method that was
used when defining command's argparse object. Since we restrict registration to only a single CommandSet
@@ -5218,7 +5224,7 @@ class Cmd(cmd.Cmd):
:return:
"""
# figure out what class the command support function was defined in
- func_class = get_defining_class(cmd_support_func) # type: Optional[Type]
+ func_class: Optional[Type[Any]] = get_defining_class(cmd_support_func)
# Was there a defining class identified? If so, is it a sub-class of CommandSet?
if func_class is not None and issubclass(func_class, CommandSet):
@@ -5229,6 +5235,8 @@ class Cmd(cmd.Cmd):
# 2. Do any of the registered CommandSets in the Cmd2 application exactly match the type?
# 3. Is there a registered CommandSet that is is the only matching subclass?
+ func_self: Optional[Union[CommandSet, 'Cmd']]
+
# check if the command's CommandSet is a sub-class of the support function's defining class
if isinstance(cmd_self, func_class):
# Case 1: Command's CommandSet is a sub-class of the support function's CommandSet
diff --git a/cmd2/decorators.py b/cmd2/decorators.py
index 6d6cdec8..12b3372a 100644
--- a/cmd2/decorators.py
+++ b/cmd2/decorators.py
@@ -108,7 +108,7 @@ def _arg_swap(args: Union[Tuple[Any], List[Any]], search_arg: Any, *replace_arg:
def with_argument_list(
func_arg: Optional[Callable[[List[str]], Optional[bool]]] = None, *, preserve_quotes: bool = False
-) -> Union[Callable[[List[str]], Optional[bool]],]:
+) -> Union[Callable[[List[str]], Optional[bool]]]:
"""
A decorator to alter the arguments passed to a ``do_*`` method. Default
passes a string of whatever the user typed. With this decorator, the
diff --git a/cmd2/rl_utils.py b/cmd2/rl_utils.py
index e56e338c..a8d97fa5 100644
--- a/cmd2/rl_utils.py
+++ b/cmd2/rl_utils.py
@@ -10,13 +10,13 @@ from enum import (
# Prefer statically linked gnureadline if available (for macOS compatibility due to issues with libedit)
try:
# noinspection PyPackageRequirements
- import gnureadline as readline
+ import gnureadline as readline # type: ignore[import]
except ImportError:
# Try to import readline, but allow failure for convenience in Windows unit testing
# Note: If this actually fails, you should install readline on Linux or Mac or pyreadline on Windows
try:
# noinspection PyUnresolvedReferences
- import readline
+ import readline # type: ignore[no-redef]
except ImportError: # pragma: no cover
pass
@@ -182,7 +182,7 @@ def rl_get_point() -> int: # pragma: no cover
return ctypes.c_int.in_dll(readline_lib, "rl_point").value
elif rl_type == RlType.PYREADLINE:
- return readline.rl.mode.l_buffer.point
+ return int(readline.rl.mode.l_buffer.point)
else:
return 0
diff --git a/cmd2/transcript.py b/cmd2/transcript.py
index 5cd59ae8..c336c1a5 100644
--- a/cmd2/transcript.py
+++ b/cmd2/transcript.py
@@ -12,7 +12,11 @@ class is used in cmd2.py::run_transcript_tests()
import re
import unittest
from typing import (
+ TYPE_CHECKING,
+ List,
+ Optional,
Tuple,
+ cast,
)
from . import (
@@ -20,6 +24,11 @@ from . import (
utils,
)
+if TYPE_CHECKING: # pragma: no cover
+ from cmd2 import (
+ Cmd,
+ )
+
class Cmd2TestCase(unittest.TestCase):
"""A unittest class used for transcript testing.
@@ -31,7 +40,7 @@ class Cmd2TestCase(unittest.TestCase):
See example.py
"""
- cmdapp = None
+ cmdapp: Optional['Cmd'] = None
def setUp(self):
if self.cmdapp:
@@ -54,7 +63,8 @@ class Cmd2TestCase(unittest.TestCase):
def _fetchTranscripts(self):
self.transcripts = {}
- for fname in self.cmdapp.testfiles:
+ testfiles = cast(List[str], getattr(self.cmdapp, 'testfiles', []))
+ for fname in testfiles:
tfile = open(fname)
self.transcripts[fname] = iter(tfile.readlines())
tfile.close()
diff --git a/cmd2/utils.py b/cmd2/utils.py
index ec00394e..292f88dc 100644
--- a/cmd2/utils.py
+++ b/cmd2/utils.py
@@ -1,6 +1,5 @@
# coding=utf-8
"""Shared utility functions"""
-
import argparse
import collections
import functools
@@ -17,7 +16,6 @@ from enum import (
Enum,
)
from typing import (
- IO,
TYPE_CHECKING,
Any,
Callable,
@@ -45,6 +43,11 @@ from .argparse_custom import (
if TYPE_CHECKING: # pragma: no cover
import cmd2 # noqa: F401
+ PopenTextIO = subprocess.Popen[bytes]
+
+else:
+ PopenTextIO = subprocess.Popen
+
_T = TypeVar('_T')
@@ -445,7 +448,7 @@ class StdSim:
def __init__(
self,
- inner_stream: Union[TextIO, 'StdSim', IO[str]],
+ inner_stream: Union[TextIO, 'StdSim'],
*,
echo: bool = False,
encoding: str = 'utf-8',
@@ -574,7 +577,7 @@ class ProcReader:
If neither are pipes, then the process will run normally and no output will be captured.
"""
- def __init__(self, proc: subprocess.Popen, stdout: Union[StdSim, IO[str]], stderr: Union[StdSim, IO[str]]) -> None:
+ def __init__(self, proc: PopenTextIO, stdout: Union[StdSim, TextIO], stderr: Union[StdSim, TextIO]) -> None:
"""
ProcReader initializer
:param proc: the Popen process being read from
@@ -650,7 +653,7 @@ class ProcReader:
# Run until process completes
while self._proc.poll() is None:
# noinspection PyUnresolvedReferences
- available = read_stream.peek()
+ available = read_stream.peek() # type: ignore[attr-defined]
if available:
read_stream.read(len(available))
self._write_bytes(write_stream, available)
@@ -699,8 +702,8 @@ class RedirectionSavedState:
def __init__(
self,
- self_stdout: Union[StdSim, IO[str]],
- sys_stdout: Union[StdSim, IO[str]],
+ self_stdout: Union[StdSim, TextIO],
+ sys_stdout: Union[StdSim, TextIO],
pipe_proc_reader: Optional[ProcReader],
saved_redirecting: bool,
) -> None: