summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Thiel <sebastian.thiel@icloud.com>2021-05-13 10:25:42 +0800
committerSebastian Thiel <sebastian.thiel@icloud.com>2021-05-13 10:25:42 +0800
commit34356322ca137ae6183dfdd8ea6634b64512591a (patch)
tree0dfd3536a1b207d7518c27c7ccae99fe434ba56f
parent2448ac4ca337665eb22b9dd5ca096ef625a8f52b (diff)
parentfed0cadffd20e48bed8e78fd51a245ad666c54f6 (diff)
downloadgitpython-34356322ca137ae6183dfdd8ea6634b64512591a.tar.gz
Merge branch 'addtypes'
-rw-r--r--git/cmd.py308
-rw-r--r--git/compat.py15
-rw-r--r--git/config.py190
-rw-r--r--git/diff.py10
-rw-r--r--git/exc.py15
-rw-r--r--git/repo/base.py3
-rw-r--r--git/types.py8
-rw-r--r--git/util.py48
8 files changed, 390 insertions, 207 deletions
diff --git a/git/cmd.py b/git/cmd.py
index ac3ca2ec..d46ccef3 100644
--- a/git/cmd.py
+++ b/git/cmd.py
@@ -40,6 +40,20 @@ from .util import (
stream_copy,
)
+# typing ---------------------------------------------------------------------------
+
+from typing import (Any, AnyStr, BinaryIO, Callable, Dict, IO, List, Mapping,
+ Sequence, TYPE_CHECKING, TextIO, Tuple, Union, cast, overload)
+
+from git.types import PathLike, Literal, TBD
+
+if TYPE_CHECKING:
+ from git.repo.base import Repo
+ from git.diff import DiffIndex
+
+
+# ---------------------------------------------------------------------------------
+
execute_kwargs = {'istream', 'with_extended_output',
'with_exceptions', 'as_process', 'stdout_as_string',
'output_stream', 'with_stdout', 'kill_after_timeout',
@@ -57,8 +71,17 @@ __all__ = ('Git',)
# Documentation
## @{
-def handle_process_output(process, stdout_handler, stderr_handler,
- finalizer=None, decode_streams=True):
+def handle_process_output(process: subprocess.Popen,
+ stdout_handler: Union[None,
+ Callable[[AnyStr], None],
+ Callable[[List[AnyStr]], None],
+ Callable[[bytes, 'Repo', 'DiffIndex'], None]],
+ stderr_handler: Union[None,
+ Callable[[AnyStr], None],
+ Callable[[List[AnyStr]], None]],
+ finalizer: Union[None,
+ Callable[[subprocess.Popen], None]] = None,
+ decode_streams: bool = True) -> None:
"""Registers for notifications to learn that process output is ready to read, and dispatches lines to
the respective line handlers.
This function returns once the finalizer returns
@@ -75,13 +98,17 @@ def handle_process_output(process, stdout_handler, stderr_handler,
or if decoding must happen later (i.e. for Diffs).
"""
# Use 2 "pump" threads and wait for both to finish.
- def pump_stream(cmdline, name, stream, is_decode, handler):
+ def pump_stream(cmdline: str, name: str, stream: Union[BinaryIO, TextIO], is_decode: bool,
+ handler: Union[None, Callable[[Union[bytes, str]], None]]) -> None:
try:
for line in stream:
if handler:
if is_decode:
- line = line.decode(defenc)
- handler(line)
+ assert isinstance(line, bytes)
+ line_str = line.decode(defenc)
+ handler(line_str)
+ else:
+ handler(line)
except Exception as ex:
log.error("Pumping %r of cmd(%s) failed due to: %r", name, remove_password_if_present(cmdline), ex)
raise CommandError(['<%s-pump>' % name] + remove_password_if_present(cmdline), ex) from ex
@@ -114,17 +141,20 @@ def handle_process_output(process, stdout_handler, stderr_handler,
if finalizer:
return finalizer(process)
+ else:
+ return None
-def dashify(string):
+def dashify(string: str) -> str:
return string.replace('_', '-')
-def slots_to_dict(self, exclude=()):
+def slots_to_dict(self, exclude: Sequence[str] = ()) -> Dict[str, Any]:
+ # annotate self.__slots__ as Tuple[str, ...] once 3.5 dropped
return {s: getattr(self, s) for s in self.__slots__ if s not in exclude}
-def dict_to_slots_and__excluded_are_none(self, d, excluded=()):
+def dict_to_slots_and__excluded_are_none(self, d: Mapping[str, Any], excluded: Sequence[str] = ()) -> None:
for k, v in d.items():
setattr(self, k, v)
for k in excluded:
@@ -163,10 +193,10 @@ class Git(LazyMixin):
_excluded_ = ('cat_file_all', 'cat_file_header', '_version_info')
- def __getstate__(self):
+ def __getstate__(self) -> Dict[str, Any]:
return slots_to_dict(self, exclude=self._excluded_)
- def __setstate__(self, d):
+ def __setstate__(self, d) -> None:
dict_to_slots_and__excluded_are_none(self, d, excluded=self._excluded_)
# CONFIGURATION
@@ -190,7 +220,7 @@ class Git(LazyMixin):
# the top level __init__
@classmethod
- def refresh(cls, path=None):
+ def refresh(cls, path: Union[None, PathLike] = None) -> bool:
"""This gets called by the refresh function (see the top level
__init__).
"""
@@ -305,11 +335,21 @@ class Git(LazyMixin):
return has_git
@classmethod
- def is_cygwin(cls):
+ def is_cygwin(cls) -> bool:
return is_cygwin_git(cls.GIT_PYTHON_GIT_EXECUTABLE)
+ @overload
@classmethod
- def polish_url(cls, url, is_cygwin=None):
+ def polish_url(cls, url: str, is_cygwin: Literal[False] = ...) -> str:
+ ...
+
+ @overload
+ @classmethod
+ def polish_url(cls, url: PathLike, is_cygwin: Union[None, bool] = None) -> str:
+ ...
+
+ @classmethod
+ def polish_url(cls, url: PathLike, is_cygwin: Union[None, bool] = None) -> PathLike:
if is_cygwin is None:
is_cygwin = cls.is_cygwin()
@@ -326,7 +366,6 @@ class Git(LazyMixin):
if url.startswith('~'):
url = os.path.expanduser(url)
url = url.replace("\\\\", "\\").replace("\\", "/")
-
return url
class AutoInterrupt(object):
@@ -339,11 +378,11 @@ class Git(LazyMixin):
__slots__ = ("proc", "args")
- def __init__(self, proc, args):
+ def __init__(self, proc: Union[None, subprocess.Popen], args: Any) -> None:
self.proc = proc
self.args = args
- def __del__(self):
+ def __del__(self) -> None:
if self.proc is None:
return
@@ -359,13 +398,13 @@ class Git(LazyMixin):
# did the process finish already so we have a return code ?
try:
if proc.poll() is not None:
- return
+ return None
except OSError as ex:
log.info("Ignored error after process had died: %r", ex)
# can be that nothing really exists anymore ...
if os is None or getattr(os, 'kill', None) is None:
- return
+ return None
# try to kill it
try:
@@ -382,10 +421,11 @@ class Git(LazyMixin):
call(("TASKKILL /F /T /PID %s 2>nul 1>nul" % str(proc.pid)), shell=True)
# END exception handling
- def __getattr__(self, attr):
+ def __getattr__(self, attr: str) -> Any:
return getattr(self.proc, attr)
- def wait(self, stderr=b''): # TODO: Bad choice to mimic `proc.wait()` but with different args.
+ # TODO: Bad choice to mimic `proc.wait()` but with different args.
+ def wait(self, stderr: Union[None, bytes] = b'') -> int:
"""Wait for the process and return its status code.
:param stderr: Previously read value of stderr, in case stderr is already closed.
@@ -395,20 +435,22 @@ class Git(LazyMixin):
stderr = b''
stderr = force_bytes(data=stderr, encoding='utf-8')
- status = self.proc.wait()
+ if self.proc is not None:
+ status = self.proc.wait()
- def read_all_from_possibly_closed_stream(stream):
- try:
- return stderr + force_bytes(stream.read())
- except ValueError:
- return stderr or b''
-
- if status != 0:
- errstr = read_all_from_possibly_closed_stream(self.proc.stderr)
- log.debug('AutoInterrupt wait stderr: %r' % (errstr,))
- raise GitCommandError(remove_password_if_present(self.args), status, errstr)
+ def read_all_from_possibly_closed_stream(stream):
+ try:
+ return stderr + force_bytes(stream.read())
+ except ValueError:
+ return stderr or b''
+
+ if status != 0:
+ errstr = read_all_from_possibly_closed_stream(self.proc.stderr)
+ log.debug('AutoInterrupt wait stderr: %r' % (errstr,))
+ raise GitCommandError(remove_password_if_present(self.args), status, errstr)
# END status handling
return status
+
# END auto interrupt
class CatFileContentStream(object):
@@ -422,7 +464,7 @@ class Git(LazyMixin):
__slots__ = ('_stream', '_nbr', '_size')
- def __init__(self, size, stream):
+ def __init__(self, size: int, stream: IO[bytes]) -> None:
self._stream = stream
self._size = size
self._nbr = 0 # num bytes read
@@ -433,7 +475,7 @@ class Git(LazyMixin):
stream.read(1)
# END handle empty streams
- def read(self, size=-1):
+ def read(self, size: int = -1) -> bytes:
bytes_left = self._size - self._nbr
if bytes_left == 0:
return b''
@@ -453,7 +495,7 @@ class Git(LazyMixin):
# END finish reading
return data
- def readline(self, size=-1):
+ def readline(self, size: int = -1) -> bytes:
if self._nbr == self._size:
return b''
@@ -475,7 +517,7 @@ class Git(LazyMixin):
return data
- def readlines(self, size=-1):
+ def readlines(self, size: int = -1) -> List[bytes]:
if self._nbr == self._size:
return []
@@ -496,20 +538,20 @@ class Git(LazyMixin):
return out
# skipcq: PYL-E0301
- def __iter__(self):
+ def __iter__(self) -> 'Git.CatFileContentStream':
return self
- def __next__(self):
+ def __next__(self) -> bytes:
return self.next()
- def next(self):
+ def next(self) -> bytes:
line = self.readline()
if not line:
raise StopIteration
return line
- def __del__(self):
+ def __del__(self) -> None:
bytes_left = self._size - self._nbr
if bytes_left:
# read and discard - seeking is impossible within a stream
@@ -517,7 +559,7 @@ class Git(LazyMixin):
self._stream.read(bytes_left + 1)
# END handle incomplete read
- def __init__(self, working_dir=None):
+ def __init__(self, working_dir: Union[None, PathLike] = None):
"""Initialize this instance with:
:param working_dir:
@@ -527,17 +569,17 @@ class Git(LazyMixin):
.git directory in case of bare repositories."""
super(Git, self).__init__()
self._working_dir = expand_path(working_dir)
- self._git_options = ()
- self._persistent_git_options = []
+ self._git_options = () # type: Union[List[str], Tuple[str, ...]]
+ self._persistent_git_options = [] # type: List[str]
# Extra environment variables to pass to git commands
- self._environment = {}
+ self._environment = {} # type: Dict[str, str]
# cached command slots
self.cat_file_header = None
self.cat_file_all = None
- def __getattr__(self, name):
+ def __getattr__(self, name: str) -> Any:
"""A convenience method as it allows to call the command as if it was
an object.
:return: Callable object that will execute call _call_process with your arguments."""
@@ -545,7 +587,7 @@ class Git(LazyMixin):
return LazyMixin.__getattr__(self, name)
return lambda *args, **kwargs: self._call_process(name, *args, **kwargs)
- def set_persistent_git_options(self, **kwargs):
+ def set_persistent_git_options(self, **kwargs: Any) -> None:
"""Specify command line options to the git executable
for subsequent subcommand calls
@@ -559,43 +601,94 @@ class Git(LazyMixin):
self._persistent_git_options = self.transform_kwargs(
split_single_char_options=True, **kwargs)
- def _set_cache_(self, attr):
+ def _set_cache_(self, attr: str) -> None:
if attr == '_version_info':
# We only use the first 4 numbers, as everything else could be strings in fact (on windows)
- version_numbers = self._call_process('version').split(' ')[2]
- self._version_info = tuple(int(n) for n in version_numbers.split('.')[:4] if n.isdigit())
+ process_version = self._call_process('version') # should be as default *args and **kwargs used
+ version_numbers = process_version.split(' ')[2]
+
+ self._version_info = tuple(
+ int(n) for n in version_numbers.split('.')[:4] if n.isdigit()
+ ) # type: Tuple[int, int, int, int] # type: ignore
else:
super(Git, self)._set_cache_(attr)
# END handle version info
@property
- def working_dir(self):
+ def working_dir(self) -> Union[None, str]:
""":return: Git directory we are working on"""
return self._working_dir
@property
- def version_info(self):
+ def version_info(self) -> Tuple[int, int, int, int]:
"""
:return: tuple(int, int, int, int) tuple with integers representing the major, minor
and additional version numbers as parsed from git version.
This value is generated on demand and is cached"""
return self._version_info
- def execute(self, command,
- istream=None,
- with_extended_output=False,
- with_exceptions=True,
- as_process=False,
- output_stream=None,
- stdout_as_string=True,
- kill_after_timeout=None,
- with_stdout=True,
- universal_newlines=False,
- shell=None,
- env=None,
- max_chunk_size=io.DEFAULT_BUFFER_SIZE,
- **subprocess_kwargs
- ):
+ @overload
+ def execute(self,
+ command: Union[str, Sequence[Any]],
+ *,
+ as_process: Literal[True]
+ ) -> 'AutoInterrupt':
+ ...
+
+ @overload
+ def execute(self,
+ command: Union[str, Sequence[Any]],
+ *,
+ as_process: Literal[False] = False,
+ stdout_as_string: Literal[True]
+ ) -> Union[str, Tuple[int, str, str]]:
+ ...
+
+ @overload
+ def execute(self,
+ command: Union[str, Sequence[Any]],
+ *,
+ as_process: Literal[False] = False,
+ stdout_as_string: Literal[False] = False
+ ) -> Union[bytes, Tuple[int, bytes, str]]:
+ ...
+
+ @overload
+ def execute(self,
+ command: Union[str, Sequence[Any]],
+ *,
+ with_extended_output: Literal[False],
+ as_process: Literal[False],
+ stdout_as_string: Literal[True]
+ ) -> str:
+ ...
+
+ @overload
+ def execute(self,
+ command: Union[str, Sequence[Any]],
+ *,
+ with_extended_output: Literal[False],
+ as_process: Literal[False],
+ stdout_as_string: Literal[False]
+ ) -> bytes:
+ ...
+
+ def execute(self,
+ command: Union[str, Sequence[Any]],
+ istream: Union[None, BinaryIO] = None,
+ with_extended_output: bool = False,
+ with_exceptions: bool = True,
+ as_process: bool = False,
+ output_stream: Union[None, BinaryIO] = None,
+ stdout_as_string: bool = True,
+ kill_after_timeout: Union[None, int] = None,
+ with_stdout: bool = True,
+ universal_newlines: bool = False,
+ shell: Union[None, bool] = None,
+ env: Union[None, Mapping[str, str]] = None,
+ max_chunk_size: int = io.DEFAULT_BUFFER_SIZE,
+ **subprocess_kwargs: Any
+ ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], AutoInterrupt]:
"""Handles executing the command on the shell and consumes and returns
the returned information (stdout)
@@ -737,22 +830,28 @@ class Git(LazyMixin):
creationflags=PROC_CREATIONFLAGS,
**subprocess_kwargs
)
+
except cmd_not_found_exception as err:
raise GitCommandNotFound(redacted_command, err) from err
+ else:
+ proc = cast(Popen, proc)
+ proc.stdout = cast(BinaryIO, proc.stdout)
+ proc.stderr = cast(BinaryIO, proc.stderr)
if as_process:
return self.AutoInterrupt(proc, command)
- def _kill_process(pid):
+ def _kill_process(pid: int) -> None:
""" Callback method to kill a process. """
p = Popen(['ps', '--ppid', str(pid)], stdout=PIPE,
creationflags=PROC_CREATIONFLAGS)
child_pids = []
- for line in p.stdout:
- if len(line.split()) > 0:
- local_pid = (line.split())[0]
- if local_pid.isdigit():
- child_pids.append(int(local_pid))
+ if p.stdout is not None:
+ for line in p.stdout:
+ if len(line.split()) > 0:
+ local_pid = (line.split())[0]
+ if local_pid.isdigit():
+ child_pids.append(int(local_pid))
try:
# Windows does not have SIGKILL, so use SIGTERM instead
sig = getattr(signal, 'SIGKILL', signal.SIGTERM)
@@ -776,8 +875,8 @@ class Git(LazyMixin):
# Wait for the process to return
status = 0
- stdout_value = b''
- stderr_value = b''
+ stdout_value = b'' # type: Union[str, bytes]
+ stderr_value = b'' # type: Union[str, bytes]
newline = "\n" if universal_newlines else b"\n"
try:
if output_stream is None:
@@ -786,16 +885,17 @@ class Git(LazyMixin):
stdout_value, stderr_value = proc.communicate()
if kill_after_timeout:
watchdog.cancel()
- if kill_check.isSet():
+ if kill_check.is_set():
stderr_value = ('Timeout: the command "%s" did not complete in %d '
'secs.' % (" ".join(redacted_command), kill_after_timeout))
if not universal_newlines:
stderr_value = stderr_value.encode(defenc)
# strip trailing "\n"
- if stdout_value.endswith(newline):
+ if stdout_value.endswith(newline): # type: ignore
stdout_value = stdout_value[:-1]
- if stderr_value.endswith(newline):
+ if stderr_value.endswith(newline): # type: ignore
stderr_value = stderr_value[:-1]
+
status = proc.returncode
else:
max_chunk_size = max_chunk_size if max_chunk_size and max_chunk_size > 0 else io.DEFAULT_BUFFER_SIZE
@@ -803,7 +903,7 @@ class Git(LazyMixin):
stdout_value = proc.stdout.read()
stderr_value = proc.stderr.read()
# strip trailing "\n"
- if stderr_value.endswith(newline):
+ if stderr_value.endswith(newline): # type: ignore
stderr_value = stderr_value[:-1]
status = proc.wait()
# END stdout handling
@@ -887,7 +987,7 @@ class Git(LazyMixin):
finally:
self.update_environment(**old_env)
- def transform_kwarg(self, name, value, split_single_char_options):
+ def transform_kwarg(self, name: str, value: Any, split_single_char_options: bool) -> List[str]:
if len(name) == 1:
if value is True:
return ["-%s" % name]
@@ -903,7 +1003,7 @@ class Git(LazyMixin):
return ["--%s=%s" % (dashify(name), value)]
return []
- def transform_kwargs(self, split_single_char_options=True, **kwargs):
+ def transform_kwargs(self, split_single_char_options: bool = True, **kwargs: Any) -> List[str]:
"""Transforms Python style kwargs into git command line options."""
# Python 3.6 preserves the order of kwargs and thus has a stable
# order. For older versions sort the kwargs by the key to get a stable
@@ -922,7 +1022,7 @@ class Git(LazyMixin):
return args
@classmethod
- def __unpack_args(cls, arg_list):
+ def __unpack_args(cls, arg_list: Sequence[str]) -> List[str]:
if not isinstance(arg_list, (list, tuple)):
return [str(arg_list)]
@@ -936,7 +1036,7 @@ class Git(LazyMixin):
# END for each arg
return outlist
- def __call__(self, **kwargs):
+ def __call__(self, **kwargs: Any) -> 'Git':
"""Specify command line options to the git executable
for a subcommand call
@@ -952,7 +1052,18 @@ class Git(LazyMixin):
split_single_char_options=True, **kwargs)
return self
- def _call_process(self, method, *args, **kwargs):
+ @overload
+ def _call_process(self, method: str, *args: None, **kwargs: None
+ ) -> str:
+ ... # if no args given, execute called with all defaults
+
+ @overload
+ def _call_process(self, method: str, *args: Any, **kwargs: Any
+ ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], 'Git.AutoInterrupt']:
+ ...
+
+ def _call_process(self, method: str, *args: Any, **kwargs: Any
+ ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], 'Git.AutoInterrupt']:
"""Run the given git command with the specified arguments and return
the result as a String
@@ -980,7 +1091,9 @@ class Git(LazyMixin):
git rev-list max-count 10 --header master
- :return: Same as ``execute``"""
+ :return: Same as ``execute``
+ if no args given used execute default (esp. as_process = False, stdout_as_string = True)
+ and return str """
# Handle optional arguments prior to calling transform_kwargs
# otherwise these'll end up in args, which is bad.
exec_kwargs = {k: v for k, v in kwargs.items() if k in execute_kwargs}
@@ -989,11 +1102,12 @@ class Git(LazyMixin):
insert_after_this_arg = opts_kwargs.pop('insert_kwargs_after', None)
# Prepare the argument list
+
opt_args = self.transform_kwargs(**opts_kwargs)
ext_args = self.__unpack_args([a for a in args if a is not None])
if insert_after_this_arg is None:
- args = opt_args + ext_args
+ args_list = opt_args + ext_args
else:
try:
index = ext_args.index(insert_after_this_arg)
@@ -1001,7 +1115,7 @@ class Git(LazyMixin):
raise ValueError("Couldn't find argument '%s' in args %s to insert cmd options after"
% (insert_after_this_arg, str(ext_args))) from err
# end handle error
- args = ext_args[:index + 1] + opt_args + ext_args[index + 1:]
+ args_list = ext_args[:index + 1] + opt_args + ext_args[index + 1:]
# end handle opts_kwargs
call = [self.GIT_PYTHON_GIT_EXECUTABLE]
@@ -1015,11 +1129,11 @@ class Git(LazyMixin):
self._git_options = ()
call.append(dashify(method))
- call.extend(args)
+ call.extend(args_list)
return self.execute(call, **exec_kwargs)
- def _parse_object_header(self, header_line):
+ def _parse_object_header(self, header_line: str) -> Tuple[str, str, int]:
"""
:param header_line:
<hex_sha> type_string size_as_int
@@ -1041,20 +1155,22 @@ class Git(LazyMixin):
raise ValueError("Failed to parse header: %r" % header_line)
return (tokens[0], tokens[1], int(tokens[2]))
- def _prepare_ref(self, ref):
+ def _prepare_ref(self, ref: AnyStr) -> bytes:
# required for command to separate refs on stdin, as bytes
- refstr = ref
if isinstance(ref, bytes):
# Assume 40 bytes hexsha - bin-to-ascii for some reason returns bytes, not text
- refstr = ref.decode('ascii')
+ refstr = ref.decode('ascii') # type: str
elif not isinstance(ref, str):
refstr = str(ref) # could be ref-object
+ else:
+ refstr = ref
if not refstr.endswith("\n"):
refstr += "\n"
return refstr.encode(defenc)
- def _get_persistent_cmd(self, attr_name, cmd_name, *args, **kwargs):
+ def _get_persistent_cmd(self, attr_name: str, cmd_name: str, *args: Any, **kwargs: Any
+ ) -> Union['Git.AutoInterrupt', TBD]:
cur_val = getattr(self, attr_name)
if cur_val is not None:
return cur_val
@@ -1066,12 +1182,12 @@ class Git(LazyMixin):
setattr(self, attr_name, cmd)
return cmd
- def __get_object_header(self, cmd, ref):
+ def __get_object_header(self, cmd, ref: AnyStr) -> Tuple[str, str, int]:
cmd.stdin.write(self._prepare_ref(ref))
cmd.stdin.flush()
return self._parse_object_header(cmd.stdout.readline())
- def get_object_header(self, ref):
+ def get_object_header(self, ref: AnyStr) -> Tuple[str, str, int]:
""" Use this method to quickly examine the type and size of the object behind
the given ref.
@@ -1082,7 +1198,7 @@ class Git(LazyMixin):
cmd = self._get_persistent_cmd("cat_file_header", "cat_file", batch_check=True)
return self.__get_object_header(cmd, ref)
- def get_object_data(self, ref):
+ def get_object_data(self, ref: AnyStr) -> Tuple[str, str, int, bytes]:
""" As get_object_header, but returns object data as well
:return: (hexsha, type_string, size_as_int,data_string)
:note: not threadsafe"""
@@ -1091,7 +1207,7 @@ class Git(LazyMixin):
del(stream)
return (hexsha, typename, size, data)
- def stream_object_data(self, ref):
+ def stream_object_data(self, ref: AnyStr) -> Tuple[str, str, int, 'Git.CatFileContentStream']:
""" As get_object_header, but returns the data as a stream
:return: (hexsha, type_string, size_as_int, stream)
@@ -1100,7 +1216,7 @@ class Git(LazyMixin):
hexsha, typename, size = self.__get_object_header(cmd, ref)
return (hexsha, typename, size, self.CatFileContentStream(size, cmd.stdout))
- def clear_cache(self):
+ def clear_cache(self) -> 'Git':
"""Clear all kinds of internal caches to release resources.
Currently persistent commands will be interrupted.
diff --git a/git/compat.py b/git/compat.py
index 4ecd19a9..187618a2 100644
--- a/git/compat.py
+++ b/git/compat.py
@@ -43,10 +43,12 @@ defenc = sys.getfilesystemencoding()
@overload
def safe_decode(s: None) -> None: ...
+
@overload
-def safe_decode(s: Union[IO[str], AnyStr]) -> str: ...
+def safe_decode(s: AnyStr) -> str: ...
+
-def safe_decode(s: Union[IO[str], AnyStr, None]) -> Optional[str]:
+def safe_decode(s: Union[AnyStr, None]) -> Optional[str]:
"""Safely decodes a binary string to unicode"""
if isinstance(s, str):
return s
@@ -61,9 +63,11 @@ def safe_decode(s: Union[IO[str], AnyStr, None]) -> Optional[str]:
@overload
def safe_encode(s: None) -> None: ...
+
@overload
def safe_encode(s: AnyStr) -> bytes: ...
+
def safe_encode(s: Optional[AnyStr]) -> Optional[bytes]:
"""Safely encodes a binary string to unicode"""
if isinstance(s, str):
@@ -79,9 +83,11 @@ def safe_encode(s: Optional[AnyStr]) -> Optional[bytes]:
@overload
def win_encode(s: None) -> None: ...
+
@overload
def win_encode(s: AnyStr) -> bytes: ...
+
def win_encode(s: Optional[AnyStr]) -> Optional[bytes]:
"""Encode unicodes for process arguments on Windows."""
if isinstance(s, str):
@@ -93,7 +99,8 @@ def win_encode(s: Optional[AnyStr]) -> Optional[bytes]:
return None
-def with_metaclass(meta: Type[Any], *bases: Any) -> TBD: # type: ignore ## mypy cannot understand dynamic class creation
+# type: ignore ## mypy cannot understand dynamic class creation
+def with_metaclass(meta: Type[Any], *bases: Any) -> TBD:
"""copied from https://github.com/Byron/bcore/blob/master/src/python/butility/future.py#L15"""
class metaclass(meta): # type: ignore
@@ -105,4 +112,4 @@ def with_metaclass(meta: Type[Any], *bases: Any) -> TBD: # type: ignore ## mypy
return type.__new__(cls, name, (), d)
return meta(name, bases, d)
- return metaclass(meta.__name__ + 'Helper', None, {}) # type: ignore
+ return metaclass(meta.__name__ + 'Helper', None, {}) # type: ignore
diff --git a/git/config.py b/git/config.py
index ea7302f4..cc6fcfa4 100644
--- a/git/config.py
+++ b/git/config.py
@@ -9,7 +9,7 @@ configuration files"""
import abc
from functools import wraps
import inspect
-from io import IOBase
+from io import BufferedReader, IOBase
import logging
import os
import re
@@ -29,14 +29,16 @@ import os.path as osp
import configparser as cp
+from pathlib import Path
+
# typing-------------------------------------------------------
-from typing import TYPE_CHECKING, Tuple
+from typing import Any, Callable, IO, List, Dict, Sequence, TYPE_CHECKING, Tuple, Union, cast, overload
-from git.types import Literal
+from git.types import Literal, Lit_config_levels, PathLike, TBD
if TYPE_CHECKING:
- pass
+ from git.repo.base import Repo
# -------------------------------------------------------------
@@ -59,7 +61,7 @@ CONDITIONAL_INCLUDE_REGEXP = re.compile(r"(?<=includeIf )\"(gitdir|gitdir/i|onbr
class MetaParserBuilder(abc.ABCMeta):
"""Utlity class wrapping base-class methods into decorators that assure read-only properties"""
- def __new__(cls, name, bases, clsdict):
+ def __new__(cls, name: str, bases: TBD, clsdict: Dict[str, Any]) -> TBD:
"""
Equip all base-class methods with a needs_values decorator, and all non-const methods
with a set_dirty_and_flush_changes decorator in addition to that."""
@@ -85,23 +87,23 @@ class MetaParserBuilder(abc.ABCMeta):
return new_type
-def needs_values(func):
+def needs_values(func: Callable) -> Callable:
"""Returns method assuring we read values (on demand) before we try to access them"""
@wraps(func)
- def assure_data_present(self, *args, **kwargs):
+ def assure_data_present(self, *args: Any, **kwargs: Any) -> Any:
self.read()
return func(self, *args, **kwargs)
# END wrapper method
return assure_data_present
-def set_dirty_and_flush_changes(non_const_func):
+def set_dirty_and_flush_changes(non_const_func: Callable) -> Callable:
"""Return method that checks whether given non constant function may be called.
If so, the instance will be set dirty.
Additionally, we flush the changes right to disk"""
- def flush_changes(self, *args, **kwargs):
+ def flush_changes(self, *args: Any, **kwargs: Any) -> Any:
rval = non_const_func(self, *args, **kwargs)
self._dirty = True
self.write()
@@ -124,66 +126,65 @@ class SectionConstraint(object):
_valid_attrs_ = ("get_value", "set_value", "get", "set", "getint", "getfloat", "getboolean", "has_option",
"remove_section", "remove_option", "options")
- def __init__(self, config, section):
+ def __init__(self, config: 'GitConfigParser', section: str) -> None:
self._config = config
self._section_name = section
- def __del__(self):
+ def __del__(self) -> None:
# Yes, for some reason, we have to call it explicitly for it to work in PY3 !
# Apparently __del__ doesn't get call anymore if refcount becomes 0
# Ridiculous ... .
self._config.release()
- def __getattr__(self, attr):
+ def __getattr__(self, attr: str) -> Any:
if attr in self._valid_attrs_:
return lambda *args, **kwargs: self._call_config(attr, *args, **kwargs)
return super(SectionConstraint, self).__getattribute__(attr)
- def _call_config(self, method, *args, **kwargs):
+ def _call_config(self, method: str, *args: Any, **kwargs: Any) -> Any:
"""Call the configuration at the given method which must take a section name
as first argument"""
return getattr(self._config, method)(self._section_name, *args, **kwargs)
@property
- def config(self):
+ def config(self) -> 'GitConfigParser':
"""return: Configparser instance we constrain"""
return self._config
- def release(self):
+ def release(self) -> None:
"""Equivalent to GitConfigParser.release(), which is called on our underlying parser instance"""
return self._config.release()
- def __enter__(self):
+ def __enter__(self) -> 'SectionConstraint':
self._config.__enter__()
return self
- def __exit__(self, exception_type, exception_value, traceback):
+ def __exit__(self, exception_type: str, exception_value: str, traceback: str) -> None:
self._config.__exit__(exception_type, exception_value, traceback)
class _OMD(OrderedDict):
"""Ordered multi-dict."""
- def __setitem__(self, key, value):
+ def __setitem__(self, key: str, value: Any) -> None:
super(_OMD, self).__setitem__(key, [value])
- def add(self, key, value):
+ def add(self, key: str, value: Any) -> None:
if key not in self:
super(_OMD, self).__setitem__(key, [value])
- return
-
+ return None
super(_OMD, self).__getitem__(key).append(value)
- def setall(self, key, values):
+ def setall(self, key: str, values: Any) -> None:
super(_OMD, self).__setitem__(key, values)
- def __getitem__(self, key):
+ def __getitem__(self, key: str) -> Any:
return super(_OMD, self).__getitem__(key)[-1]
- def getlast(self, key):
+ def getlast(self, key: str) -> Any:
return super(_OMD, self).__getitem__(key)[-1]
- def setlast(self, key, value):
+ def setlast(self, key: str, value: Any) -> None:
if key not in self:
super(_OMD, self).__setitem__(key, [value])
return
@@ -191,22 +192,30 @@ class _OMD(OrderedDict):
prior = super(_OMD, self).__getitem__(key)
prior[-1] = value
- def get(self, key, default=None):
+ @overload
+ def get(self, key: str, default: None = ...) -> None:
+ ...
+
+ @overload
+ def get(self, key: str, default: Any = ...) -> Any:
+ ...
+
+ def get(self, key: str, default: Union[Any, None] = None) -> Union[Any, None]:
return super(_OMD, self).get(key, [default])[-1]
- def getall(self, key):
+ def getall(self, key: str) -> Any:
return super(_OMD, self).__getitem__(key)
- def items(self):
+ def items(self) -> List[Tuple[str, Any]]: # type: ignore ## mypy doesn't like overwriting supertype signitures
"""List of (key, last value for key)."""
return [(k, self[k]) for k in self]
- def items_all(self):
+ def items_all(self) -> List[Tuple[str, List[Any]]]:
"""List of (key, list of values for key)."""
return [(k, self.getall(k)) for k in self]
-def get_config_path(config_level: Literal['system', 'global', 'user', 'repository']) -> str:
+def get_config_path(config_level: Lit_config_levels) -> str:
# we do not support an absolute path of the gitconfig on windows ,
# use the global config instead
@@ -264,7 +273,10 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
# list of RawConfigParser methods able to change the instance
_mutating_methods_ = ("add_section", "remove_section", "remove_option", "set")
- def __init__(self, file_or_files=None, read_only=True, merge_includes=True, config_level=None, repo=None):
+ def __init__(self, file_or_files: Union[None, PathLike, IO, Sequence[Union[PathLike, IO]]] = None,
+ read_only: bool = True, merge_includes: bool = True,
+ config_level: Union[Lit_config_levels, None] = None,
+ repo: Union['Repo', None] = None) -> None:
"""Initialize a configuration reader to read the given file_or_files and to
possibly allow changes to it by setting read_only False
@@ -290,11 +302,13 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
self._proxies = self._dict()
if file_or_files is not None:
- self._file_or_files = file_or_files
+ self._file_or_files = file_or_files # type: Union[PathLike, IO, Sequence[Union[PathLike, IO]]]
else:
if config_level is None:
if read_only:
- self._file_or_files = [get_config_path(f) for f in CONFIG_LEVELS if f != 'repository']
+ self._file_or_files = [get_config_path(f) # type: ignore
+ for f in CONFIG_LEVELS # Can type f properly when 3.5 dropped
+ if f != 'repository']
else:
raise ValueError("No configuration level or configuration files specified")
else:
@@ -305,10 +319,10 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
self._is_initialized = False
self._merge_includes = merge_includes
self._repo = repo
- self._lock = None
+ self._lock = None # type: Union['LockFile', None]
self._acquire_lock()
- def _acquire_lock(self):
+ def _acquire_lock(self) -> None:
if not self._read_only:
if not self._lock:
if isinstance(self._file_or_files, (tuple, list)):
@@ -316,9 +330,11 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
"Write-ConfigParsers can operate on a single file only, multiple files have been passed")
# END single file check
- file_or_files = self._file_or_files
- if not isinstance(self._file_or_files, str):
- file_or_files = self._file_or_files.name
+ if isinstance(self._file_or_files, (str, Path)): # cannot narrow by os._pathlike until 3.5 dropped
+ file_or_files = self._file_or_files
+ else:
+ file_or_files = cast(IO, self._file_or_files).name
+
# END get filename from handle/stream
# initialize lock base - we want to write
self._lock = self.t_lock(file_or_files)
@@ -327,19 +343,19 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
self._lock._obtain_lock()
# END read-only check
- def __del__(self):
+ def __del__(self) -> None:
"""Write pending changes if required and release locks"""
# NOTE: only consistent in PY2
self.release()
- def __enter__(self):
+ def __enter__(self) -> 'GitConfigParser':
self._acquire_lock()
return self
- def __exit__(self, exception_type, exception_value, traceback):
+ def __exit__(self, exception_type, exception_value, traceback) -> None:
self.release()
- def release(self):
+ def release(self) -> None:
"""Flush changes and release the configuration write lock. This instance must not be used anymore afterwards.
In Python 3, it's required to explicitly release locks and flush changes, as __del__ is not called
deterministically anymore."""
@@ -359,13 +375,14 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
# Usually when shutting down the interpreter, don'y know how to fix this
pass
finally:
- self._lock._release_lock()
+ if self._lock is not None:
+ self._lock._release_lock()
- def optionxform(self, optionstr):
+ def optionxform(self, optionstr: str) -> str:
"""Do not transform options in any way when writing"""
return optionstr
- def _read(self, fp, fpname):
+ def _read(self, fp: Union[BufferedReader, IO[bytes]], fpname: str) -> None:
"""A direct copy of the py2.4 version of the super class's _read method
to assure it uses ordered dicts. Had to change one line to make it work.
@@ -381,7 +398,7 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
is_multi_line = False
e = None # None, or an exception
- def string_decode(v):
+ def string_decode(v: str) -> str:
if v[-1] == '\\':
v = v[:-1]
# end cut trailing escapes to prevent decode error
@@ -463,11 +480,12 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
if e:
raise e
- def _has_includes(self):
+ def _has_includes(self) -> Union[bool, int]:
return self._merge_includes and len(self._included_paths())
- def _included_paths(self):
- """Return all paths that must be included to configuration.
+ def _included_paths(self) -> List[Tuple[str, str]]:
+ """Return List all paths that must be included to configuration
+ as Tuples of (option, value).
"""
paths = []
@@ -500,9 +518,9 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
),
value
)
-
- if fnmatch.fnmatchcase(self._repo.git_dir, value):
- paths += self.items(section)
+ if self._repo.git_dir:
+ if fnmatch.fnmatchcase(str(self._repo.git_dir), value):
+ paths += self.items(section)
elif keyword == "onbranch":
try:
@@ -516,33 +534,38 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
return paths
- def read(self):
+ def read(self) -> None:
"""Reads the data stored in the files we have been initialized with. It will
ignore files that cannot be read, possibly leaving an empty configuration
:return: Nothing
:raise IOError: if a file cannot be handled"""
if self._is_initialized:
- return
+ return None
self._is_initialized = True
- if not isinstance(self._file_or_files, (tuple, list)):
- files_to_read = [self._file_or_files]
+ files_to_read = [""] # type: List[Union[PathLike, IO]] ## just for types until 3.5 dropped
+ if isinstance(self._file_or_files, (str)): # replace with PathLike once 3.5 dropped
+ files_to_read = [self._file_or_files] # for str, as str is a type of Sequence
+ elif not isinstance(self._file_or_files, (tuple, list, Sequence)):
+ files_to_read = [self._file_or_files] # for IO or Path
else:
- files_to_read = list(self._file_or_files)
+ files_to_read = list(self._file_or_files) # for lists or tuples
# end assure we have a copy of the paths to handle
seen = set(files_to_read)
num_read_include_files = 0
while files_to_read:
file_path = files_to_read.pop(0)
- fp = file_path
file_ok = False
- if hasattr(fp, "seek"):
- self._read(fp, fp.name)
+ if hasattr(file_path, "seek"):
+ # must be a file objectfile-object
+ file_path = cast(IO[bytes], file_path) # replace with assert to narrow type, once sure
+ self._read(file_path, file_path.name)
else:
# assume a path if it is not a file-object
+ file_path = cast(PathLike, file_path)
try:
with open(file_path, 'rb') as fp:
file_ok = True
@@ -560,6 +583,7 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
if not file_ok:
continue
# end ignore relative paths if we don't know the configuration file path
+ file_path = cast(PathLike, file_path)
assert osp.isabs(file_path), "Need absolute paths to be sure our cycle checks will work"
include_path = osp.join(osp.dirname(file_path), include_path)
# end make include path absolute
@@ -580,7 +604,7 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
self._merge_includes = False
# end
- def _write(self, fp):
+ def _write(self, fp: IO) -> None:
"""Write an .ini-format representation of the configuration state in
git compatible format"""
def write_section(name, section_dict):
@@ -599,11 +623,11 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
for name, value in self._sections.items():
write_section(name, value)
- def items(self, section_name):
+ def items(self, section_name: str) -> List[Tuple[str, str]]:
""":return: list((option, value), ...) pairs of all items in the given section"""
return [(k, v) for k, v in super(GitConfigParser, self).items(section_name) if k != '__name__']
- def items_all(self, section_name):
+ def items_all(self, section_name: str) -> List[Tuple[str, List[str]]]:
""":return: list((option, [values...]), ...) pairs of all items in the given section"""
rv = _OMD(self._defaults)
@@ -620,7 +644,7 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
return rv.items_all()
@needs_values
- def write(self):
+ def write(self) -> None:
"""Write changes to our file, if there are changes at all
:raise IOError: if this is a read-only writer instance or if we could not obtain
@@ -637,39 +661,44 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
if self._has_includes():
log.debug("Skipping write-back of configuration file as include files were merged in." +
"Set merge_includes=False to prevent this.")
- return
+ return None
# end
fp = self._file_or_files
# we have a physical file on disk, so get a lock
- is_file_lock = isinstance(fp, (str, IOBase))
- if is_file_lock:
+ is_file_lock = isinstance(fp, (str, IOBase)) # can't use Pathlike until 3.5 dropped
+ if is_file_lock and self._lock is not None: # else raise Error?
self._lock._obtain_lock()
+
if not hasattr(fp, "seek"):
- with open(self._file_or_files, "wb") as fp:
- self._write(fp)
+ fp = cast(PathLike, fp)
+ with open(fp, "wb") as fp_open:
+ self._write(fp_open)
else:
+ fp = cast(IO, fp)
fp.seek(0)
# make sure we do not overwrite into an existing file
if hasattr(fp, 'truncate'):
fp.truncate()
self._write(fp)
- def _assure_writable(self, method_name):
+ def _assure_writable(self, method_name: str) -> None:
if self.read_only:
raise IOError("Cannot execute non-constant method %s.%s" % (self, method_name))
- def add_section(self, section):
+ def add_section(self, section: str) -> None:
"""Assures added options will stay in order"""
return super(GitConfigParser, self).add_section(section)
@property
- def read_only(self):
+ def read_only(self) -> bool:
""":return: True if this instance may change the configuration file"""
return self._read_only
- def get_value(self, section, option, default=None):
+ def get_value(self, section: str, option: str, default: Union[int, float, str, bool, None] = None
+ ) -> Union[int, float, str, bool]:
+ # can default or return type include bool?
"""Get an option's value.
If multiple values are specified for this option in the section, the
@@ -691,7 +720,8 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
return self._string_to_value(valuestr)
- def get_values(self, section, option, default=None):
+ def get_values(self, section: str, option: str, default: Union[int, float, str, bool, None] = None
+ ) -> List[Union[int, float, str, bool]]:
"""Get an option's values.
If multiple values are specified for this option in the section, all are
@@ -713,16 +743,14 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
return [self._string_to_value(valuestr) for valuestr in lst]
- def _string_to_value(self, valuestr):
+ def _string_to_value(self, valuestr: str) -> Union[int, float, str, bool]:
types = (int, float)
for numtype in types:
try:
val = numtype(valuestr)
-
# truncated value ?
if val != float(valuestr):
continue
-
return val
except (ValueError, TypeError):
continue
@@ -742,14 +770,14 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
return valuestr
- def _value_to_string(self, value):
+ def _value_to_string(self, value: Union[str, bytes, int, float, bool]) -> str:
if isinstance(value, (int, float, bool)):
return str(value)
return force_text(value)
@needs_values
@set_dirty_and_flush_changes
- def set_value(self, section, option, value):
+ def set_value(self, section: str, option: str, value: Union[str, bytes, int, float, bool]) -> 'GitConfigParser':
"""Sets the given option in section to the given value.
It will create the section if required, and will not throw as opposed to the default
ConfigParser 'set' method.
@@ -767,7 +795,7 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
@needs_values
@set_dirty_and_flush_changes
- def add_value(self, section, option, value):
+ def add_value(self, section: str, option: str, value: Union[str, bytes, int, float, bool]) -> 'GitConfigParser':
"""Adds a value for the given option in section.
It will create the section if required, and will not throw as opposed to the default
ConfigParser 'set' method. The value becomes the new value of the option as returned
@@ -784,7 +812,7 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
self._sections[section].add(option, self._value_to_string(value))
return self
- def rename_section(self, section, new_name):
+ def rename_section(self, section: str, new_name: str) -> 'GitConfigParser':
"""rename the given section to new_name
:raise ValueError: if section doesn't exit
:raise ValueError: if a section with new_name does already exist
diff --git a/git/diff.py b/git/diff.py
index 5a7b189f..ca673b0c 100644
--- a/git/diff.py
+++ b/git/diff.py
@@ -22,6 +22,8 @@ if TYPE_CHECKING:
from .objects.tree import Tree
from git.repo.base import Repo
+ from subprocess import Popen
+
Lit_change_type = Literal['A', 'D', 'M', 'R', 'T']
# ------------------------------------------------------------------------
@@ -490,7 +492,7 @@ class Diff(object):
return index
@staticmethod
- def _handle_diff_line(lines_bytes: bytes, repo: 'Repo', index: TBD) -> None:
+ def _handle_diff_line(lines_bytes: bytes, repo: 'Repo', index: DiffIndex) -> None:
lines = lines_bytes.decode(defenc)
for line in lines.split(':')[1:]:
@@ -542,14 +544,14 @@ class Diff(object):
index.append(diff)
@classmethod
- def _index_from_raw_format(cls, repo: 'Repo', proc: TBD) -> DiffIndex:
+ def _index_from_raw_format(cls, repo: 'Repo', proc: 'Popen') -> 'DiffIndex':
"""Create a new DiffIndex from the given stream which must be in raw format.
:return: git.DiffIndex"""
# handles
# :100644 100644 687099101... 37c5e30c8... M .gitignore
index = DiffIndex()
- handle_process_output(proc, lambda bytes: cls._handle_diff_line(
- bytes, repo, index), None, finalize_process, decode_streams=False)
+ handle_process_output(proc, lambda byt: cls._handle_diff_line(byt, repo, index),
+ None, finalize_process, decode_streams=False)
return index
diff --git a/git/exc.py b/git/exc.py
index 6e646921..1e0caf4e 100644
--- a/git/exc.py
+++ b/git/exc.py
@@ -11,7 +11,7 @@ from git.compat import safe_decode
# typing ----------------------------------------------------
-from typing import IO, List, Optional, Tuple, Union, TYPE_CHECKING
+from typing import List, Optional, Tuple, Union, TYPE_CHECKING
from git.types import PathLike
if TYPE_CHECKING:
@@ -49,8 +49,9 @@ class CommandError(GitError):
_msg = "Cmd('%s') failed%s"
def __init__(self, command: Union[List[str], Tuple[str, ...], str],
- status: Union[str, None, Exception] = None,
- stderr: Optional[IO[str]] = None, stdout: Optional[IO[str]] = None) -> None:
+ status: Union[str, int, None, Exception] = None,
+ stderr: Union[bytes, str, None] = None,
+ stdout: Union[bytes, str, None] = None) -> None:
if not isinstance(command, (tuple, list)):
command = command.split()
self.command = command
@@ -91,9 +92,9 @@ class GitCommandError(CommandError):
""" Thrown if execution of the git command fails with non-zero status code. """
def __init__(self, command: Union[List[str], Tuple[str, ...], str],
- status: Union[str, None, Exception] = None,
- stderr: Optional[IO[str]] = None,
- stdout: Optional[IO[str]] = None,
+ status: Union[str, int, None, Exception] = None,
+ stderr: Union[bytes, str, None] = None,
+ stdout: Union[bytes, str, None] = None,
) -> None:
super(GitCommandError, self).__init__(command, status, stderr, stdout)
@@ -139,7 +140,7 @@ class HookExecutionError(CommandError):
via standard output"""
def __init__(self, command: Union[List[str], Tuple[str, ...], str], status: Optional[str],
- stderr: Optional[IO[str]] = None, stdout: Optional[IO[str]] = None) -> None:
+ stderr: Optional[str] = None, stdout: Optional[str] = None) -> None:
super(HookExecutionError, self).__init__(command, status, stderr, stdout)
self._msg = "Hook('%s') failed%s"
diff --git a/git/repo/base.py b/git/repo/base.py
index 94c6e30b..ce5f6bd0 100644
--- a/git/repo/base.py
+++ b/git/repo/base.py
@@ -34,7 +34,7 @@ import gitdb
# typing ------------------------------------------------------
-from git.types import TBD, PathLike, Literal
+from git.types import TBD, PathLike, Lit_config_levels
from typing import (Any, BinaryIO, Callable, Dict,
Iterator, List, Mapping, Optional,
TextIO, Tuple, Type, Union,
@@ -45,7 +45,6 @@ if TYPE_CHECKING: # only needed for types
from git.refs.symbolic import SymbolicReference
from git.objects import TagObject, Blob, Tree # NOQA: F401
-Lit_config_levels = Literal['system', 'global', 'user', 'repository']
# -----------------------------------------------------------
diff --git a/git/types.py b/git/types.py
index 40d4f788..91d35b56 100644
--- a/git/types.py
+++ b/git/types.py
@@ -12,8 +12,6 @@ else:
from typing_extensions import Final, Literal # noqa: F401
-TBD = Any
-
if sys.version_info[:2] < (3, 6):
# os.PathLike (PEP-519) only got introduced with Python 3.6
PathLike = str
@@ -22,4 +20,8 @@ elif sys.version_info[:2] < (3, 9):
PathLike = Union[str, os.PathLike]
elif sys.version_info[:2] >= (3, 9):
# os.PathLike only becomes subscriptable from Python 3.9 onwards
- PathLike = Union[str, os.PathLike[str]]
+ PathLike = Union[str, 'os.PathLike[str]'] # forward ref as pylance complains unless editing with py3.9+
+
+TBD = Any
+
+Lit_config_levels = Literal['system', 'global', 'user', 'repository']
diff --git a/git/util.py b/git/util.py
index 558be1e4..403e66a6 100644
--- a/git/util.py
+++ b/git/util.py
@@ -22,11 +22,13 @@ from urllib.parse import urlsplit, urlunsplit
# typing ---------------------------------------------------------
from typing import (Any, AnyStr, BinaryIO, Callable, Dict, Generator, IO, Iterator, List,
- Optional, Pattern, Sequence, Tuple, Union, cast, TYPE_CHECKING)
+ Optional, Pattern, Sequence, Tuple, Union, cast, TYPE_CHECKING, overload)
+
+
if TYPE_CHECKING:
from git.remote import Remote
from git.repo.base import Repo
-from .types import PathLike, TBD
+from .types import PathLike, TBD, Literal
# ---------------------------------------------------------------------
@@ -281,7 +283,8 @@ _cygpath_parsers = (
def cygpath(path: PathLike) -> PathLike:
"""Use :meth:`git.cmd.Git.polish_url()` instead, that works on any environment."""
- path = str(path) # ensure is str and not AnyPath
+ path = str(path) # ensure is str and not AnyPath.
+ #Fix to use Paths when 3.5 dropped. or to be just str if only for urls?
if not path.startswith(('/cygdrive', '//')):
for regex, parser, recurse in _cygpath_parsers:
match = regex.match(path)
@@ -314,11 +317,23 @@ def decygpath(path: PathLike) -> str:
_is_cygwin_cache = {} # type: Dict[str, Optional[bool]]
+@overload
+def is_cygwin_git(git_executable: None) -> Literal[False]:
+ ...
+
+
+@overload
def is_cygwin_git(git_executable: PathLike) -> bool:
+ ...
+
+
+def is_cygwin_git(git_executable: Union[None, PathLike]) -> bool:
if not is_win:
return False
- #from subprocess import check_output
+ if git_executable is None:
+ return False
+
git_executable = str(git_executable)
is_cygwin = _is_cygwin_cache.get(git_executable) # type: Optional[bool]
if is_cygwin is None:
@@ -348,18 +363,31 @@ def get_user_id() -> str:
return "%s@%s" % (getpass.getuser(), platform.node())
-def finalize_process(proc: TBD, **kwargs: Any) -> None:
+def finalize_process(proc: subprocess.Popen, **kwargs: Any) -> None:
"""Wait for the process (clone, fetch, pull or push) and handle its errors accordingly"""
## TODO: No close proc-streams??
proc.wait(**kwargs)
-def expand_path(p: PathLike, expand_vars: bool = True) -> Optional[PathLike]:
+@overload
+def expand_path(p: None, expand_vars: bool = ...) -> None:
+ ...
+
+
+@overload
+def expand_path(p: PathLike, expand_vars: bool = ...) -> str:
+ ...
+
+
+def expand_path(p: Union[None, PathLike], expand_vars: bool = True) -> Optional[str]:
try:
- p = osp.expanduser(p)
- if expand_vars:
- p = osp.expandvars(p)
- return osp.normpath(osp.abspath(p))
+ if p is not None:
+ p_out = osp.expanduser(p)
+ if expand_vars:
+ p_out = osp.expandvars(p_out)
+ return osp.normpath(osp.abspath(p_out))
+ else:
+ return None
except Exception:
return None