diff options
Diffstat (limited to 'git')
| -rw-r--r-- | git/__init__.py | 12 | ||||
| -rw-r--r-- | git/cmd.py | 340 | ||||
| -rw-r--r-- | git/compat.py | 74 | ||||
| -rw-r--r-- | git/config.py | 200 | ||||
| -rw-r--r-- | git/db.py | 30 | ||||
| -rw-r--r-- | git/diff.py | 205 | ||||
| -rw-r--r-- | git/exc.py | 49 | ||||
| m--------- | git/ext/gitdb | 0 | ||||
| -rw-r--r-- | git/index/base.py | 232 | ||||
| -rw-r--r-- | git/index/fun.py | 85 | ||||
| -rw-r--r-- | git/index/typ.py | 55 | ||||
| -rw-r--r-- | git/index/util.py | 27 | ||||
| -rw-r--r-- | git/objects/__init__.py | 4 | ||||
| -rw-r--r-- | git/objects/base.py | 3 | ||||
| -rw-r--r-- | git/refs/log.py | 4 | ||||
| -rw-r--r-- | git/refs/reference.py | 4 | ||||
| -rw-r--r-- | git/refs/symbolic.py | 2 | ||||
| -rw-r--r-- | git/refs/tag.py | 3 | ||||
| -rw-r--r-- | git/remote.py | 198 | ||||
| -rw-r--r-- | git/repo/base.py | 117 | ||||
| -rw-r--r-- | git/repo/fun.py | 4 | ||||
| -rw-r--r-- | git/types.py | 25 | ||||
| -rw-r--r-- | git/util.py | 323 |
23 files changed, 1265 insertions, 731 deletions
diff --git a/git/__init__.py b/git/__init__.py index 53440830..ae9254a2 100644 --- a/git/__init__.py +++ b/git/__init__.py @@ -5,18 +5,20 @@ # the BSD License: http://www.opensource.org/licenses/bsd-license.php # flake8: noqa #@PydevCodeAnalysisIgnore +from git.exc import * # @NoMove @IgnorePep8 import inspect import os import sys - import os.path as osp +from typing import Optional +from git.types import PathLike __version__ = 'git' #{ Initialization -def _init_externals(): +def _init_externals() -> None: """Initialize external projects by putting them into the path""" if __version__ == 'git' and 'PYOXIDIZER' not in os.environ: sys.path.insert(1, osp.join(osp.dirname(__file__), 'ext', 'gitdb')) @@ -29,13 +31,13 @@ def _init_externals(): #} END initialization + ################# _init_externals() ################# #{ Imports -from git.exc import * # @NoMove @IgnorePep8 try: from git.config import GitConfigParser # @NoMove @IgnorePep8 from git.objects import * # @NoMove @IgnorePep8 @@ -65,7 +67,8 @@ __all__ = [name for name, obj in locals().items() #{ Initialize git executable path GIT_OK = None -def refresh(path=None): + +def refresh(path: Optional[PathLike] = None) -> None: """Convenience method for setting the git executable path.""" global GIT_OK GIT_OK = False @@ -78,6 +81,7 @@ def refresh(path=None): GIT_OK = True #} END initialize git executable path + ################# try: refresh() @@ -27,7 +27,7 @@ from git.compat import ( is_win, ) from git.exc import CommandError -from git.util import is_cygwin_git, cygpath, expand_path +from git.util import is_cygwin_git, cygpath, expand_path, remove_password_if_present from .exc import ( GitCommandError, @@ -38,6 +38,20 @@ from .util import ( stream_copy, ) +# typing --------------------------------------------------------------------------- + +from typing import (Any, AnyStr, BinaryIO, Callable, Dict, IO, List, Mapping, + Sequence, TYPE_CHECKING, TextIO, Tuple, Union, cast, overload) + +from git.types import PathLike, Literal, TBD + +if TYPE_CHECKING: + from git.repo.base import Repo + from git.diff import DiffIndex + + +# --------------------------------------------------------------------------------- + execute_kwargs = {'istream', 'with_extended_output', 'with_exceptions', 'as_process', 'stdout_as_string', 'output_stream', 'with_stdout', 'kill_after_timeout', @@ -55,8 +69,17 @@ __all__ = ('Git',) # Documentation ## @{ -def handle_process_output(process, stdout_handler, stderr_handler, - finalizer=None, decode_streams=True): +def handle_process_output(process: subprocess.Popen, + stdout_handler: Union[None, + Callable[[AnyStr], None], + Callable[[List[AnyStr]], None], + Callable[[bytes, 'Repo', 'DiffIndex'], None]], + stderr_handler: Union[None, + Callable[[AnyStr], None], + Callable[[List[AnyStr]], None]], + finalizer: Union[None, + Callable[[subprocess.Popen], None]] = None, + decode_streams: bool = True) -> None: """Registers for notifications to learn that process output is ready to read, and dispatches lines to the respective line handlers. This function returns once the finalizer returns @@ -73,16 +96,20 @@ def handle_process_output(process, stdout_handler, stderr_handler, or if decoding must happen later (i.e. for Diffs). """ # Use 2 "pump" threads and wait for both to finish. - def pump_stream(cmdline, name, stream, is_decode, handler): + def pump_stream(cmdline: str, name: str, stream: Union[BinaryIO, TextIO], is_decode: bool, + handler: Union[None, Callable[[Union[bytes, str]], None]]) -> None: try: for line in stream: if handler: if is_decode: - line = line.decode(defenc) - handler(line) + assert isinstance(line, bytes) + line_str = line.decode(defenc) + handler(line_str) + else: + handler(line) except Exception as ex: - log.error("Pumping %r of cmd(%s) failed due to: %r", name, cmdline, ex) - raise CommandError(['<%s-pump>' % name] + cmdline, ex) from ex + log.error("Pumping %r of cmd(%s) failed due to: %r", name, remove_password_if_present(cmdline), ex) + raise CommandError(['<%s-pump>' % name] + remove_password_if_present(cmdline), ex) from ex finally: stream.close() @@ -101,7 +128,7 @@ def handle_process_output(process, stdout_handler, stderr_handler, for name, stream, handler in pumps: t = threading.Thread(target=pump_stream, args=(cmdline, name, stream, decode_streams, handler)) - t.setDaemon(True) + t.daemon = True t.start() threads.append(t) @@ -112,17 +139,20 @@ def handle_process_output(process, stdout_handler, stderr_handler, if finalizer: return finalizer(process) + else: + return None -def dashify(string): +def dashify(string: str) -> str: return string.replace('_', '-') -def slots_to_dict(self, exclude=()): +def slots_to_dict(self, exclude: Sequence[str] = ()) -> Dict[str, Any]: + # annotate self.__slots__ as Tuple[str, ...] once 3.5 dropped return {s: getattr(self, s) for s in self.__slots__ if s not in exclude} -def dict_to_slots_and__excluded_are_none(self, d, excluded=()): +def dict_to_slots_and__excluded_are_none(self, d: Mapping[str, Any], excluded: Sequence[str] = ()) -> None: for k, v in d.items(): setattr(self, k, v) for k in excluded: @@ -136,7 +166,7 @@ CREATE_NO_WINDOW = 0x08000000 ## CREATE_NEW_PROCESS_GROUP is needed to allow killing it afterwards, # see https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal -PROC_CREATIONFLAGS = (CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP +PROC_CREATIONFLAGS = (CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP # type: ignore[attr-defined] if is_win else 0) @@ -161,10 +191,10 @@ class Git(LazyMixin): _excluded_ = ('cat_file_all', 'cat_file_header', '_version_info') - def __getstate__(self): + def __getstate__(self) -> Dict[str, Any]: return slots_to_dict(self, exclude=self._excluded_) - def __setstate__(self, d): + def __setstate__(self, d) -> None: dict_to_slots_and__excluded_are_none(self, d, excluded=self._excluded_) # CONFIGURATION @@ -188,7 +218,7 @@ class Git(LazyMixin): # the top level __init__ @classmethod - def refresh(cls, path=None): + def refresh(cls, path: Union[None, PathLike] = None) -> bool: """This gets called by the refresh function (see the top level __init__). """ @@ -208,7 +238,7 @@ class Git(LazyMixin): # - a GitCommandNotFound error is spawned by ourselves # - a PermissionError is spawned if the git executable provided # cannot be executed for whatever reason - + has_git = False try: cls().version() @@ -303,11 +333,21 @@ class Git(LazyMixin): return has_git @classmethod - def is_cygwin(cls): + def is_cygwin(cls) -> bool: return is_cygwin_git(cls.GIT_PYTHON_GIT_EXECUTABLE) + @overload + @classmethod + def polish_url(cls, url: str, is_cygwin: Literal[False] = ...) -> str: + ... + + @overload @classmethod - def polish_url(cls, url, is_cygwin=None): + def polish_url(cls, url: PathLike, is_cygwin: Union[None, bool] = None) -> str: + ... + + @classmethod + def polish_url(cls, url: PathLike, is_cygwin: Union[None, bool] = None) -> PathLike: if is_cygwin is None: is_cygwin = cls.is_cygwin() @@ -324,7 +364,6 @@ class Git(LazyMixin): if url.startswith('~'): url = os.path.expanduser(url) url = url.replace("\\\\", "\\").replace("\\", "/") - return url class AutoInterrupt(object): @@ -337,11 +376,11 @@ class Git(LazyMixin): __slots__ = ("proc", "args") - def __init__(self, proc, args): + def __init__(self, proc: Union[None, subprocess.Popen], args: Any) -> None: self.proc = proc self.args = args - def __del__(self): + def __del__(self) -> None: if self.proc is None: return @@ -357,13 +396,13 @@ class Git(LazyMixin): # did the process finish already so we have a return code ? try: if proc.poll() is not None: - return + return None except OSError as ex: log.info("Ignored error after process had died: %r", ex) # can be that nothing really exists anymore ... if os is None or getattr(os, 'kill', None) is None: - return + return None # try to kill it try: @@ -380,10 +419,11 @@ class Git(LazyMixin): call(("TASKKILL /F /T /PID %s 2>nul 1>nul" % str(proc.pid)), shell=True) # END exception handling - def __getattr__(self, attr): + def __getattr__(self, attr: str) -> Any: return getattr(self.proc, attr) - def wait(self, stderr=b''): # TODO: Bad choice to mimic `proc.wait()` but with different args. + # TODO: Bad choice to mimic `proc.wait()` but with different args. + def wait(self, stderr: Union[None, bytes] = b'') -> int: """Wait for the process and return its status code. :param stderr: Previously read value of stderr, in case stderr is already closed. @@ -393,20 +433,22 @@ class Git(LazyMixin): stderr = b'' stderr = force_bytes(data=stderr, encoding='utf-8') - status = self.proc.wait() + if self.proc is not None: + status = self.proc.wait() - def read_all_from_possibly_closed_stream(stream): - try: - return stderr + force_bytes(stream.read()) - except ValueError: - return stderr or b'' - - if status != 0: - errstr = read_all_from_possibly_closed_stream(self.proc.stderr) - log.debug('AutoInterrupt wait stderr: %r' % (errstr,)) - raise GitCommandError(self.args, status, errstr) + def read_all_from_possibly_closed_stream(stream): + try: + return stderr + force_bytes(stream.read()) + except ValueError: + return stderr or b'' + + if status != 0: + errstr = read_all_from_possibly_closed_stream(self.proc.stderr) + log.debug('AutoInterrupt wait stderr: %r' % (errstr,)) + raise GitCommandError(remove_password_if_present(self.args), status, errstr) # END status handling return status + # END auto interrupt class CatFileContentStream(object): @@ -420,7 +462,7 @@ class Git(LazyMixin): __slots__ = ('_stream', '_nbr', '_size') - def __init__(self, size, stream): + def __init__(self, size: int, stream: IO[bytes]) -> None: self._stream = stream self._size = size self._nbr = 0 # num bytes read @@ -431,7 +473,7 @@ class Git(LazyMixin): stream.read(1) # END handle empty streams - def read(self, size=-1): + def read(self, size: int = -1) -> bytes: bytes_left = self._size - self._nbr if bytes_left == 0: return b'' @@ -451,7 +493,7 @@ class Git(LazyMixin): # END finish reading return data - def readline(self, size=-1): + def readline(self, size: int = -1) -> bytes: if self._nbr == self._size: return b'' @@ -473,7 +515,7 @@ class Git(LazyMixin): return data - def readlines(self, size=-1): + def readlines(self, size: int = -1) -> List[bytes]: if self._nbr == self._size: return [] @@ -494,20 +536,20 @@ class Git(LazyMixin): return out # skipcq: PYL-E0301 - def __iter__(self): + def __iter__(self) -> 'Git.CatFileContentStream': return self - - def __next__(self): + + def __next__(self) -> bytes: return self.next() - def next(self): + def next(self) -> bytes: line = self.readline() if not line: raise StopIteration return line - def __del__(self): + def __del__(self) -> None: bytes_left = self._size - self._nbr if bytes_left: # read and discard - seeking is impossible within a stream @@ -515,7 +557,7 @@ class Git(LazyMixin): self._stream.read(bytes_left + 1) # END handle incomplete read - def __init__(self, working_dir=None): + def __init__(self, working_dir: Union[None, PathLike] = None): """Initialize this instance with: :param working_dir: @@ -525,17 +567,17 @@ class Git(LazyMixin): .git directory in case of bare repositories.""" super(Git, self).__init__() self._working_dir = expand_path(working_dir) - self._git_options = () - self._persistent_git_options = [] + self._git_options = () # type: Union[List[str], Tuple[str, ...]] + self._persistent_git_options = [] # type: List[str] # Extra environment variables to pass to git commands - self._environment = {} + self._environment = {} # type: Dict[str, str] # cached command slots self.cat_file_header = None self.cat_file_all = None - def __getattr__(self, name): + def __getattr__(self, name: str) -> Any: """A convenience method as it allows to call the command as if it was an object. :return: Callable object that will execute call _call_process with your arguments.""" @@ -543,7 +585,7 @@ class Git(LazyMixin): return LazyMixin.__getattr__(self, name) return lambda *args, **kwargs: self._call_process(name, *args, **kwargs) - def set_persistent_git_options(self, **kwargs): + def set_persistent_git_options(self, **kwargs: Any) -> None: """Specify command line options to the git executable for subsequent subcommand calls @@ -557,43 +599,94 @@ class Git(LazyMixin): self._persistent_git_options = self.transform_kwargs( split_single_char_options=True, **kwargs) - def _set_cache_(self, attr): + def _set_cache_(self, attr: str) -> None: if attr == '_version_info': # We only use the first 4 numbers, as everything else could be strings in fact (on windows) - version_numbers = self._call_process('version').split(' ')[2] - self._version_info = tuple(int(n) for n in version_numbers.split('.')[:4] if n.isdigit()) + process_version = self._call_process('version') # should be as default *args and **kwargs used + version_numbers = process_version.split(' ')[2] + + self._version_info = tuple( + int(n) for n in version_numbers.split('.')[:4] if n.isdigit() + ) # type: Tuple[int, int, int, int] # type: ignore else: super(Git, self)._set_cache_(attr) # END handle version info @property - def working_dir(self): + def working_dir(self) -> Union[None, PathLike]: """:return: Git directory we are working on""" return self._working_dir @property - def version_info(self): + def version_info(self) -> Tuple[int, int, int, int]: """ :return: tuple(int, int, int, int) tuple with integers representing the major, minor and additional version numbers as parsed from git version. This value is generated on demand and is cached""" return self._version_info - def execute(self, command, - istream=None, - with_extended_output=False, - with_exceptions=True, - as_process=False, - output_stream=None, - stdout_as_string=True, - kill_after_timeout=None, - with_stdout=True, - universal_newlines=False, - shell=None, - env=None, - max_chunk_size=io.DEFAULT_BUFFER_SIZE, - **subprocess_kwargs - ): + @overload + def execute(self, + command: Union[str, Sequence[Any]], + *, + as_process: Literal[True] + ) -> 'AutoInterrupt': + ... + + @overload + def execute(self, + command: Union[str, Sequence[Any]], + *, + as_process: Literal[False] = False, + stdout_as_string: Literal[True] + ) -> Union[str, Tuple[int, str, str]]: + ... + + @overload + def execute(self, + command: Union[str, Sequence[Any]], + *, + as_process: Literal[False] = False, + stdout_as_string: Literal[False] = False + ) -> Union[bytes, Tuple[int, bytes, str]]: + ... + + @overload + def execute(self, + command: Union[str, Sequence[Any]], + *, + with_extended_output: Literal[False], + as_process: Literal[False], + stdout_as_string: Literal[True] + ) -> str: + ... + + @overload + def execute(self, + command: Union[str, Sequence[Any]], + *, + with_extended_output: Literal[False], + as_process: Literal[False], + stdout_as_string: Literal[False] + ) -> bytes: + ... + + def execute(self, + command: Union[str, Sequence[Any]], + istream: Union[None, BinaryIO] = None, + with_extended_output: bool = False, + with_exceptions: bool = True, + as_process: bool = False, + output_stream: Union[None, BinaryIO] = None, + stdout_as_string: bool = True, + kill_after_timeout: Union[None, int] = None, + with_stdout: bool = True, + universal_newlines: bool = False, + shell: Union[None, bool] = None, + env: Union[None, Mapping[str, str]] = None, + max_chunk_size: int = io.DEFAULT_BUFFER_SIZE, + **subprocess_kwargs: Any + ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], AutoInterrupt]: """Handles executing the command on the shell and consumes and returns the returned information (stdout) @@ -637,7 +730,7 @@ class Git(LazyMixin): :param env: A dictionary of environment variables to be passed to `subprocess.Popen`. - + :param max_chunk_size: Maximum number of bytes in one chunk of data passed to the output_stream in one invocation of write() method. If the given number is not positive then @@ -681,8 +774,10 @@ class Git(LazyMixin): :note: If you add additional keyword arguments to the signature of this method, you must update the execute_kwargs tuple housed in this module.""" + # Remove password for the command if present + redacted_command = remove_password_if_present(command) if self.GIT_PYTHON_TRACE and (self.GIT_PYTHON_TRACE != 'full' or as_process): - log.info(' '.join(command)) + log.info(' '.join(redacted_command)) # Allow the user to have the command executed in their working dir. cwd = self._working_dir or os.getcwd() @@ -703,7 +798,7 @@ class Git(LazyMixin): if is_win: cmd_not_found_exception = OSError if kill_after_timeout: - raise GitCommandError(command, '"kill_after_timeout" feature is not supported on Windows.') + raise GitCommandError(redacted_command, '"kill_after_timeout" feature is not supported on Windows.') else: if sys.version_info[0] > 2: cmd_not_found_exception = FileNotFoundError # NOQA # exists, flake8 unknown @UndefinedVariable @@ -718,7 +813,7 @@ class Git(LazyMixin): if istream: istream_ok = "<valid stream>" log.debug("Popen(%s, cwd=%s, universal_newlines=%s, shell=%s, istream=%s)", - command, cwd, universal_newlines, shell, istream_ok) + redacted_command, cwd, universal_newlines, shell, istream_ok) try: proc = Popen(command, env=env, @@ -733,22 +828,28 @@ class Git(LazyMixin): creationflags=PROC_CREATIONFLAGS, **subprocess_kwargs ) + except cmd_not_found_exception as err: - raise GitCommandNotFound(command, err) from err + raise GitCommandNotFound(redacted_command, err) from err + else: + proc = cast(Popen, proc) + proc.stdout = cast(BinaryIO, proc.stdout) + proc.stderr = cast(BinaryIO, proc.stderr) if as_process: return self.AutoInterrupt(proc, command) - def _kill_process(pid): + def _kill_process(pid: int) -> None: """ Callback method to kill a process. """ p = Popen(['ps', '--ppid', str(pid)], stdout=PIPE, creationflags=PROC_CREATIONFLAGS) child_pids = [] - for line in p.stdout: - if len(line.split()) > 0: - local_pid = (line.split())[0] - if local_pid.isdigit(): - child_pids.append(int(local_pid)) + if p.stdout is not None: + for line in p.stdout: + if len(line.split()) > 0: + local_pid = (line.split())[0] + if local_pid.isdigit(): + child_pids.append(int(local_pid)) try: # Windows does not have SIGKILL, so use SIGTERM instead sig = getattr(signal, 'SIGKILL', signal.SIGTERM) @@ -772,8 +873,8 @@ class Git(LazyMixin): # Wait for the process to return status = 0 - stdout_value = b'' - stderr_value = b'' + stdout_value = b'' # type: Union[str, bytes] + stderr_value = b'' # type: Union[str, bytes] newline = "\n" if universal_newlines else b"\n" try: if output_stream is None: @@ -782,16 +883,17 @@ class Git(LazyMixin): stdout_value, stderr_value = proc.communicate() if kill_after_timeout: watchdog.cancel() - if kill_check.isSet(): + if kill_check.is_set(): stderr_value = ('Timeout: the command "%s" did not complete in %d ' - 'secs.' % (" ".join(command), kill_after_timeout)) + 'secs.' % (" ".join(redacted_command), kill_after_timeout)) if not universal_newlines: stderr_value = stderr_value.encode(defenc) # strip trailing "\n" - if stdout_value.endswith(newline): + if stdout_value.endswith(newline): # type: ignore stdout_value = stdout_value[:-1] - if stderr_value.endswith(newline): + if stderr_value.endswith(newline): # type: ignore stderr_value = stderr_value[:-1] + status = proc.returncode else: max_chunk_size = max_chunk_size if max_chunk_size and max_chunk_size > 0 else io.DEFAULT_BUFFER_SIZE @@ -799,7 +901,7 @@ class Git(LazyMixin): stdout_value = proc.stdout.read() stderr_value = proc.stderr.read() # strip trailing "\n" - if stderr_value.endswith(newline): + if stderr_value.endswith(newline): # type: ignore stderr_value = stderr_value[:-1] status = proc.wait() # END stdout handling @@ -808,7 +910,7 @@ class Git(LazyMixin): proc.stderr.close() if self.GIT_PYTHON_TRACE == 'full': - cmdstr = " ".join(command) + cmdstr = " ".join(redacted_command) def as_text(stdout_value): return not output_stream and safe_decode(stdout_value) or '<OUTPUT_STREAM>' @@ -824,7 +926,7 @@ class Git(LazyMixin): # END handle debug printing if with_exceptions and status != 0: - raise GitCommandError(command, status, stderr_value, stdout_value) + raise GitCommandError(redacted_command, status, stderr_value, stdout_value) if isinstance(stdout_value, bytes) and stdout_as_string: # could also be output_stream stdout_value = safe_decode(stdout_value) @@ -883,7 +985,7 @@ class Git(LazyMixin): finally: self.update_environment(**old_env) - def transform_kwarg(self, name, value, split_single_char_options): + def transform_kwarg(self, name: str, value: Any, split_single_char_options: bool) -> List[str]: if len(name) == 1: if value is True: return ["-%s" % name] @@ -899,7 +1001,7 @@ class Git(LazyMixin): return ["--%s=%s" % (dashify(name), value)] return [] - def transform_kwargs(self, split_single_char_options=True, **kwargs): + def transform_kwargs(self, split_single_char_options: bool = True, **kwargs: Any) -> List[str]: """Transforms Python style kwargs into git command line options.""" args = [] for k, v in kwargs.items(): @@ -911,7 +1013,7 @@ class Git(LazyMixin): return args @classmethod - def __unpack_args(cls, arg_list): + def __unpack_args(cls, arg_list: Sequence[str]) -> List[str]: if not isinstance(arg_list, (list, tuple)): return [str(arg_list)] @@ -925,7 +1027,7 @@ class Git(LazyMixin): # END for each arg return outlist - def __call__(self, **kwargs): + def __call__(self, **kwargs: Any) -> 'Git': """Specify command line options to the git executable for a subcommand call @@ -941,7 +1043,18 @@ class Git(LazyMixin): split_single_char_options=True, **kwargs) return self - def _call_process(self, method, *args, **kwargs): + @overload + def _call_process(self, method: str, *args: None, **kwargs: None + ) -> str: + ... # if no args given, execute called with all defaults + + @overload + def _call_process(self, method: str, *args: Any, **kwargs: Any + ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], 'Git.AutoInterrupt']: + ... + + def _call_process(self, method: str, *args: Any, **kwargs: Any + ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], 'Git.AutoInterrupt']: """Run the given git command with the specified arguments and return the result as a String @@ -969,7 +1082,9 @@ class Git(LazyMixin): git rev-list max-count 10 --header master - :return: Same as ``execute``""" + :return: Same as ``execute`` + if no args given used execute default (esp. as_process = False, stdout_as_string = True) + and return str """ # Handle optional arguments prior to calling transform_kwargs # otherwise these'll end up in args, which is bad. exec_kwargs = {k: v for k, v in kwargs.items() if k in execute_kwargs} @@ -978,11 +1093,12 @@ class Git(LazyMixin): insert_after_this_arg = opts_kwargs.pop('insert_kwargs_after', None) # Prepare the argument list + opt_args = self.transform_kwargs(**opts_kwargs) ext_args = self.__unpack_args([a for a in args if a is not None]) if insert_after_this_arg is None: - args = opt_args + ext_args + args_list = opt_args + ext_args else: try: index = ext_args.index(insert_after_this_arg) @@ -990,7 +1106,7 @@ class Git(LazyMixin): raise ValueError("Couldn't find argument '%s' in args %s to insert cmd options after" % (insert_after_this_arg, str(ext_args))) from err # end handle error - args = ext_args[:index + 1] + opt_args + ext_args[index + 1:] + args_list = ext_args[:index + 1] + opt_args + ext_args[index + 1:] # end handle opts_kwargs call = [self.GIT_PYTHON_GIT_EXECUTABLE] @@ -1004,11 +1120,11 @@ class Git(LazyMixin): self._git_options = () call.append(dashify(method)) - call.extend(args) + call.extend(args_list) return self.execute(call, **exec_kwargs) - def _parse_object_header(self, header_line): + def _parse_object_header(self, header_line: str) -> Tuple[str, str, int]: """ :param header_line: <hex_sha> type_string size_as_int @@ -1030,20 +1146,22 @@ class Git(LazyMixin): raise ValueError("Failed to parse header: %r" % header_line) return (tokens[0], tokens[1], int(tokens[2])) - def _prepare_ref(self, ref): + def _prepare_ref(self, ref: AnyStr) -> bytes: # required for command to separate refs on stdin, as bytes - refstr = ref if isinstance(ref, bytes): # Assume 40 bytes hexsha - bin-to-ascii for some reason returns bytes, not text - refstr = ref.decode('ascii') + refstr = ref.decode('ascii') # type: str elif not isinstance(ref, str): refstr = str(ref) # could be ref-object + else: + refstr = ref if not refstr.endswith("\n"): refstr += "\n" return refstr.encode(defenc) - def _get_persistent_cmd(self, attr_name, cmd_name, *args, **kwargs): + def _get_persistent_cmd(self, attr_name: str, cmd_name: str, *args: Any, **kwargs: Any + ) -> Union['Git.AutoInterrupt', TBD]: cur_val = getattr(self, attr_name) if cur_val is not None: return cur_val @@ -1055,12 +1173,12 @@ class Git(LazyMixin): setattr(self, attr_name, cmd) return cmd - def __get_object_header(self, cmd, ref): + def __get_object_header(self, cmd, ref: AnyStr) -> Tuple[str, str, int]: cmd.stdin.write(self._prepare_ref(ref)) cmd.stdin.flush() return self._parse_object_header(cmd.stdout.readline()) - def get_object_header(self, ref): + def get_object_header(self, ref: str) -> Tuple[str, str, int]: """ Use this method to quickly examine the type and size of the object behind the given ref. @@ -1071,7 +1189,7 @@ class Git(LazyMixin): cmd = self._get_persistent_cmd("cat_file_header", "cat_file", batch_check=True) return self.__get_object_header(cmd, ref) - def get_object_data(self, ref): + def get_object_data(self, ref: str) -> Tuple[str, str, int, bytes]: """ As get_object_header, but returns object data as well :return: (hexsha, type_string, size_as_int,data_string) :note: not threadsafe""" @@ -1080,7 +1198,7 @@ class Git(LazyMixin): del(stream) return (hexsha, typename, size, data) - def stream_object_data(self, ref): + def stream_object_data(self, ref: str) -> Tuple[str, str, int, 'Git.CatFileContentStream']: """ As get_object_header, but returns the data as a stream :return: (hexsha, type_string, size_as_int, stream) @@ -1089,7 +1207,7 @@ class Git(LazyMixin): hexsha, typename, size = self.__get_object_header(cmd, ref) return (hexsha, typename, size, self.CatFileContentStream(size, cmd.stdout)) - def clear_cache(self): + def clear_cache(self) -> 'Git': """Clear all kinds of internal caches to release resources. Currently persistent commands will be interrupted. diff --git a/git/compat.py b/git/compat.py index de8a238b..187618a2 100644 --- a/git/compat.py +++ b/git/compat.py @@ -11,40 +11,84 @@ import locale import os import sys - from gitdb.utils.encoding import ( force_bytes, # @UnusedImport force_text # @UnusedImport ) +# typing -------------------------------------------------------------------- + +from typing import ( + Any, + AnyStr, + Dict, + IO, + Optional, + Tuple, + Type, + Union, + overload, +) +from git.types import TBD + +# --------------------------------------------------------------------------- + -is_win = (os.name == 'nt') +is_win = (os.name == 'nt') # type: bool is_posix = (os.name == 'posix') is_darwin = (os.name == 'darwin') defenc = sys.getfilesystemencoding() -def safe_decode(s): +@overload +def safe_decode(s: None) -> None: ... + + +@overload +def safe_decode(s: AnyStr) -> str: ... + + +def safe_decode(s: Union[AnyStr, None]) -> Optional[str]: """Safely decodes a binary string to unicode""" if isinstance(s, str): return s elif isinstance(s, bytes): return s.decode(defenc, 'surrogateescape') - elif s is not None: + elif s is None: + return None + else: raise TypeError('Expected bytes or text, but got %r' % (s,)) -def safe_encode(s): - """Safely decodes a binary string to unicode""" +@overload +def safe_encode(s: None) -> None: ... + + +@overload +def safe_encode(s: AnyStr) -> bytes: ... + + +def safe_encode(s: Optional[AnyStr]) -> Optional[bytes]: + """Safely encodes a binary string to unicode""" if isinstance(s, str): return s.encode(defenc) elif isinstance(s, bytes): return s - elif s is not None: + elif s is None: + return None + else: raise TypeError('Expected bytes or text, but got %r' % (s,)) -def win_encode(s): +@overload +def win_encode(s: None) -> None: ... + + +@overload +def win_encode(s: AnyStr) -> bytes: ... + + +def win_encode(s: Optional[AnyStr]) -> Optional[bytes]: """Encode unicodes for process arguments on Windows.""" if isinstance(s, str): return s.encode(locale.getpreferredencoding(False)) @@ -52,16 +96,20 @@ def win_encode(s): return s elif s is not None: raise TypeError('Expected bytes or text, but got %r' % (s,)) + return None -def with_metaclass(meta, *bases): +# type: ignore ## mypy cannot understand dynamic class creation +def with_metaclass(meta: Type[Any], *bases: Any) -> TBD: """copied from https://github.com/Byron/bcore/blob/master/src/python/butility/future.py#L15""" - class metaclass(meta): + + class metaclass(meta): # type: ignore __call__ = type.__call__ - __init__ = type.__init__ + __init__ = type.__init__ # type: ignore - def __new__(cls, name, nbases, d): + def __new__(cls, name: str, nbases: Optional[Tuple[int, ...]], d: Dict[str, Any]) -> TBD: if nbases is None: return type.__new__(cls, name, (), d) return meta(name, bases, d) - return metaclass(meta.__name__ + 'Helper', None, {}) + + return metaclass(meta.__name__ + 'Helper', None, {}) # type: ignore diff --git a/git/config.py b/git/config.py index 9f09efe2..cc6fcfa4 100644 --- a/git/config.py +++ b/git/config.py @@ -9,7 +9,7 @@ configuration files""" import abc from functools import wraps import inspect -from io import IOBase +from io import BufferedReader, IOBase import logging import os import re @@ -22,12 +22,25 @@ from git.compat import ( with_metaclass, is_win, ) + from git.util import LockFile import os.path as osp import configparser as cp +from pathlib import Path + +# typing------------------------------------------------------- + +from typing import Any, Callable, IO, List, Dict, Sequence, TYPE_CHECKING, Tuple, Union, cast, overload + +from git.types import Literal, Lit_config_levels, PathLike, TBD + +if TYPE_CHECKING: + from git.repo.base import Repo + +# ------------------------------------------------------------- __all__ = ('GitConfigParser', 'SectionConstraint') @@ -37,7 +50,8 @@ log.addHandler(logging.NullHandler()) # invariants # represents the configuration level of a configuration file -CONFIG_LEVELS = ("system", "user", "global", "repository") +CONFIG_LEVELS = ("system", "user", "global", "repository" + ) # type: Tuple[Literal['system'], Literal['user'], Literal['global'], Literal['repository']] # Section pattern to detect conditional includes. # https://git-scm.com/docs/git-config#_conditional_includes @@ -47,7 +61,7 @@ CONDITIONAL_INCLUDE_REGEXP = re.compile(r"(?<=includeIf )\"(gitdir|gitdir/i|onbr class MetaParserBuilder(abc.ABCMeta): """Utlity class wrapping base-class methods into decorators that assure read-only properties""" - def __new__(cls, name, bases, clsdict): + def __new__(cls, name: str, bases: TBD, clsdict: Dict[str, Any]) -> TBD: """ Equip all base-class methods with a needs_values decorator, and all non-const methods with a set_dirty_and_flush_changes decorator in addition to that.""" @@ -73,23 +87,23 @@ class MetaParserBuilder(abc.ABCMeta): return new_type -def needs_values(func): +def needs_values(func: Callable) -> Callable: """Returns method assuring we read values (on demand) before we try to access them""" @wraps(func) - def assure_data_present(self, *args, **kwargs): + def assure_data_present(self, *args: Any, **kwargs: Any) -> Any: self.read() return func(self, *args, **kwargs) # END wrapper method return assure_data_present -def set_dirty_and_flush_changes(non_const_func): +def set_dirty_and_flush_changes(non_const_func: Callable) -> Callable: """Return method that checks whether given non constant function may be called. If so, the instance will be set dirty. Additionally, we flush the changes right to disk""" - def flush_changes(self, *args, **kwargs): + def flush_changes(self, *args: Any, **kwargs: Any) -> Any: rval = non_const_func(self, *args, **kwargs) self._dirty = True self.write() @@ -112,66 +126,65 @@ class SectionConstraint(object): _valid_attrs_ = ("get_value", "set_value", "get", "set", "getint", "getfloat", "getboolean", "has_option", "remove_section", "remove_option", "options") - def __init__(self, config, section): + def __init__(self, config: 'GitConfigParser', section: str) -> None: self._config = config self._section_name = section - def __del__(self): + def __del__(self) -> None: # Yes, for some reason, we have to call it explicitly for it to work in PY3 ! # Apparently __del__ doesn't get call anymore if refcount becomes 0 # Ridiculous ... . self._config.release() - def __getattr__(self, attr): + def __getattr__(self, attr: str) -> Any: if attr in self._valid_attrs_: return lambda *args, **kwargs: self._call_config(attr, *args, **kwargs) return super(SectionConstraint, self).__getattribute__(attr) - def _call_config(self, method, *args, **kwargs): + def _call_config(self, method: str, *args: Any, **kwargs: Any) -> Any: """Call the configuration at the given method which must take a section name as first argument""" return getattr(self._config, method)(self._section_name, *args, **kwargs) @property - def config(self): + def config(self) -> 'GitConfigParser': """return: Configparser instance we constrain""" return self._config - def release(self): + def release(self) -> None: """Equivalent to GitConfigParser.release(), which is called on our underlying parser instance""" return self._config.release() - def __enter__(self): + def __enter__(self) -> 'SectionConstraint': self._config.__enter__() return self - def __exit__(self, exception_type, exception_value, traceback): + def __exit__(self, exception_type: str, exception_value: str, traceback: str) -> None: self._config.__exit__(exception_type, exception_value, traceback) class _OMD(OrderedDict): """Ordered multi-dict.""" - def __setitem__(self, key, value): + def __setitem__(self, key: str, value: Any) -> None: super(_OMD, self).__setitem__(key, [value]) - def add(self, key, value): + def add(self, key: str, value: Any) -> None: if key not in self: super(_OMD, self).__setitem__(key, [value]) - return - + return None super(_OMD, self).__getitem__(key).append(value) - def setall(self, key, values): + def setall(self, key: str, values: Any) -> None: super(_OMD, self).__setitem__(key, values) - def __getitem__(self, key): + def __getitem__(self, key: str) -> Any: return super(_OMD, self).__getitem__(key)[-1] - def getlast(self, key): + def getlast(self, key: str) -> Any: return super(_OMD, self).__getitem__(key)[-1] - def setlast(self, key, value): + def setlast(self, key: str, value: Any) -> None: if key not in self: super(_OMD, self).__setitem__(key, [value]) return @@ -179,22 +192,30 @@ class _OMD(OrderedDict): prior = super(_OMD, self).__getitem__(key) prior[-1] = value - def get(self, key, default=None): + @overload + def get(self, key: str, default: None = ...) -> None: + ... + + @overload + def get(self, key: str, default: Any = ...) -> Any: + ... + + def get(self, key: str, default: Union[Any, None] = None) -> Union[Any, None]: return super(_OMD, self).get(key, [default])[-1] - def getall(self, key): + def getall(self, key: str) -> Any: return super(_OMD, self).__getitem__(key) - def items(self): + def items(self) -> List[Tuple[str, Any]]: # type: ignore ## mypy doesn't like overwriting supertype signitures """List of (key, last value for key).""" return [(k, self[k]) for k in self] - def items_all(self): + def items_all(self) -> List[Tuple[str, List[Any]]]: """List of (key, list of values for key).""" return [(k, self.getall(k)) for k in self] -def get_config_path(config_level): +def get_config_path(config_level: Lit_config_levels) -> str: # we do not support an absolute path of the gitconfig on windows , # use the global config instead @@ -214,7 +235,7 @@ def get_config_path(config_level): raise ValueError("Invalid configuration level: %r" % config_level) -class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, object)): +class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, object)): # type: ignore ## mypy does not understand dynamic class creation # noqa: E501 """Implements specifics required to read git style configuration files. @@ -252,7 +273,10 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje # list of RawConfigParser methods able to change the instance _mutating_methods_ = ("add_section", "remove_section", "remove_option", "set") - def __init__(self, file_or_files=None, read_only=True, merge_includes=True, config_level=None, repo=None): + def __init__(self, file_or_files: Union[None, PathLike, IO, Sequence[Union[PathLike, IO]]] = None, + read_only: bool = True, merge_includes: bool = True, + config_level: Union[Lit_config_levels, None] = None, + repo: Union['Repo', None] = None) -> None: """Initialize a configuration reader to read the given file_or_files and to possibly allow changes to it by setting read_only False @@ -278,11 +302,13 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje self._proxies = self._dict() if file_or_files is not None: - self._file_or_files = file_or_files + self._file_or_files = file_or_files # type: Union[PathLike, IO, Sequence[Union[PathLike, IO]]] else: if config_level is None: if read_only: - self._file_or_files = [get_config_path(f) for f in CONFIG_LEVELS if f != 'repository'] + self._file_or_files = [get_config_path(f) # type: ignore + for f in CONFIG_LEVELS # Can type f properly when 3.5 dropped + if f != 'repository'] else: raise ValueError("No configuration level or configuration files specified") else: @@ -293,10 +319,10 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje self._is_initialized = False self._merge_includes = merge_includes self._repo = repo - self._lock = None + self._lock = None # type: Union['LockFile', None] self._acquire_lock() - def _acquire_lock(self): + def _acquire_lock(self) -> None: if not self._read_only: if not self._lock: if isinstance(self._file_or_files, (tuple, list)): @@ -304,9 +330,11 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje "Write-ConfigParsers can operate on a single file only, multiple files have been passed") # END single file check - file_or_files = self._file_or_files - if not isinstance(self._file_or_files, str): - file_or_files = self._file_or_files.name + if isinstance(self._file_or_files, (str, Path)): # cannot narrow by os._pathlike until 3.5 dropped + file_or_files = self._file_or_files + else: + file_or_files = cast(IO, self._file_or_files).name + # END get filename from handle/stream # initialize lock base - we want to write self._lock = self.t_lock(file_or_files) @@ -315,19 +343,19 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje self._lock._obtain_lock() # END read-only check - def __del__(self): + def __del__(self) -> None: """Write pending changes if required and release locks""" # NOTE: only consistent in PY2 self.release() - def __enter__(self): + def __enter__(self) -> 'GitConfigParser': self._acquire_lock() return self - def __exit__(self, exception_type, exception_value, traceback): + def __exit__(self, exception_type, exception_value, traceback) -> None: self.release() - def release(self): + def release(self) -> None: """Flush changes and release the configuration write lock. This instance must not be used anymore afterwards. In Python 3, it's required to explicitly release locks and flush changes, as __del__ is not called deterministically anymore.""" @@ -347,13 +375,14 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje # Usually when shutting down the interpreter, don'y know how to fix this pass finally: - self._lock._release_lock() + if self._lock is not None: + self._lock._release_lock() - def optionxform(self, optionstr): + def optionxform(self, optionstr: str) -> str: """Do not transform options in any way when writing""" return optionstr - def _read(self, fp, fpname): + def _read(self, fp: Union[BufferedReader, IO[bytes]], fpname: str) -> None: """A direct copy of the py2.4 version of the super class's _read method to assure it uses ordered dicts. Had to change one line to make it work. @@ -369,7 +398,7 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje is_multi_line = False e = None # None, or an exception - def string_decode(v): + def string_decode(v: str) -> str: if v[-1] == '\\': v = v[:-1] # end cut trailing escapes to prevent decode error @@ -451,11 +480,12 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje if e: raise e - def _has_includes(self): + def _has_includes(self) -> Union[bool, int]: return self._merge_includes and len(self._included_paths()) - def _included_paths(self): - """Return all paths that must be included to configuration. + def _included_paths(self) -> List[Tuple[str, str]]: + """Return List all paths that must be included to configuration + as Tuples of (option, value). """ paths = [] @@ -488,9 +518,9 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje ), value ) - - if fnmatch.fnmatchcase(self._repo.git_dir, value): - paths += self.items(section) + if self._repo.git_dir: + if fnmatch.fnmatchcase(str(self._repo.git_dir), value): + paths += self.items(section) elif keyword == "onbranch": try: @@ -504,33 +534,38 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje return paths - def read(self): + def read(self) -> None: """Reads the data stored in the files we have been initialized with. It will ignore files that cannot be read, possibly leaving an empty configuration :return: Nothing :raise IOError: if a file cannot be handled""" if self._is_initialized: - return + return None self._is_initialized = True - if not isinstance(self._file_or_files, (tuple, list)): - files_to_read = [self._file_or_files] + files_to_read = [""] # type: List[Union[PathLike, IO]] ## just for types until 3.5 dropped + if isinstance(self._file_or_files, (str)): # replace with PathLike once 3.5 dropped + files_to_read = [self._file_or_files] # for str, as str is a type of Sequence + elif not isinstance(self._file_or_files, (tuple, list, Sequence)): + files_to_read = [self._file_or_files] # for IO or Path else: - files_to_read = list(self._file_or_files) + files_to_read = list(self._file_or_files) # for lists or tuples # end assure we have a copy of the paths to handle seen = set(files_to_read) num_read_include_files = 0 while files_to_read: file_path = files_to_read.pop(0) - fp = file_path file_ok = False - if hasattr(fp, "seek"): - self._read(fp, fp.name) + if hasattr(file_path, "seek"): + # must be a file objectfile-object + file_path = cast(IO[bytes], file_path) # replace with assert to narrow type, once sure + self._read(file_path, file_path.name) else: # assume a path if it is not a file-object + file_path = cast(PathLike, file_path) try: with open(file_path, 'rb') as fp: file_ok = True @@ -548,6 +583,7 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje if not file_ok: continue # end ignore relative paths if we don't know the configuration file path + file_path = cast(PathLike, file_path) assert osp.isabs(file_path), "Need absolute paths to be sure our cycle checks will work" include_path = osp.join(osp.dirname(file_path), include_path) # end make include path absolute @@ -568,7 +604,7 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje self._merge_includes = False # end - def _write(self, fp): + def _write(self, fp: IO) -> None: """Write an .ini-format representation of the configuration state in git compatible format""" def write_section(name, section_dict): @@ -587,11 +623,11 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje for name, value in self._sections.items(): write_section(name, value) - def items(self, section_name): + def items(self, section_name: str) -> List[Tuple[str, str]]: """:return: list((option, value), ...) pairs of all items in the given section""" return [(k, v) for k, v in super(GitConfigParser, self).items(section_name) if k != '__name__'] - def items_all(self, section_name): + def items_all(self, section_name: str) -> List[Tuple[str, List[str]]]: """:return: list((option, [values...]), ...) pairs of all items in the given section""" rv = _OMD(self._defaults) @@ -608,7 +644,7 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje return rv.items_all() @needs_values - def write(self): + def write(self) -> None: """Write changes to our file, if there are changes at all :raise IOError: if this is a read-only writer instance or if we could not obtain @@ -625,39 +661,44 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje if self._has_includes(): log.debug("Skipping write-back of configuration file as include files were merged in." + "Set merge_includes=False to prevent this.") - return + return None # end fp = self._file_or_files # we have a physical file on disk, so get a lock - is_file_lock = isinstance(fp, (str, IOBase)) - if is_file_lock: + is_file_lock = isinstance(fp, (str, IOBase)) # can't use Pathlike until 3.5 dropped + if is_file_lock and self._lock is not None: # else raise Error? self._lock._obtain_lock() + if not hasattr(fp, "seek"): - with open(self._file_or_files, "wb") as fp: - self._write(fp) + fp = cast(PathLike, fp) + with open(fp, "wb") as fp_open: + self._write(fp_open) else: + fp = cast(IO, fp) fp.seek(0) # make sure we do not overwrite into an existing file if hasattr(fp, 'truncate'): fp.truncate() self._write(fp) - def _assure_writable(self, method_name): + def _assure_writable(self, method_name: str) -> None: if self.read_only: raise IOError("Cannot execute non-constant method %s.%s" % (self, method_name)) - def add_section(self, section): + def add_section(self, section: str) -> None: """Assures added options will stay in order""" return super(GitConfigParser, self).add_section(section) @property - def read_only(self): + def read_only(self) -> bool: """:return: True if this instance may change the configuration file""" return self._read_only - def get_value(self, section, option, default=None): + def get_value(self, section: str, option: str, default: Union[int, float, str, bool, None] = None + ) -> Union[int, float, str, bool]: + # can default or return type include bool? """Get an option's value. If multiple values are specified for this option in the section, the @@ -679,7 +720,8 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje return self._string_to_value(valuestr) - def get_values(self, section, option, default=None): + def get_values(self, section: str, option: str, default: Union[int, float, str, bool, None] = None + ) -> List[Union[int, float, str, bool]]: """Get an option's values. If multiple values are specified for this option in the section, all are @@ -701,16 +743,14 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje return [self._string_to_value(valuestr) for valuestr in lst] - def _string_to_value(self, valuestr): + def _string_to_value(self, valuestr: str) -> Union[int, float, str, bool]: types = (int, float) for numtype in types: try: val = numtype(valuestr) - # truncated value ? if val != float(valuestr): continue - return val except (ValueError, TypeError): continue @@ -730,14 +770,14 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje return valuestr - def _value_to_string(self, value): + def _value_to_string(self, value: Union[str, bytes, int, float, bool]) -> str: if isinstance(value, (int, float, bool)): return str(value) return force_text(value) @needs_values @set_dirty_and_flush_changes - def set_value(self, section, option, value): + def set_value(self, section: str, option: str, value: Union[str, bytes, int, float, bool]) -> 'GitConfigParser': """Sets the given option in section to the given value. It will create the section if required, and will not throw as opposed to the default ConfigParser 'set' method. @@ -755,7 +795,7 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje @needs_values @set_dirty_and_flush_changes - def add_value(self, section, option, value): + def add_value(self, section: str, option: str, value: Union[str, bytes, int, float, bool]) -> 'GitConfigParser': """Adds a value for the given option in section. It will create the section if required, and will not throw as opposed to the default ConfigParser 'set' method. The value becomes the new value of the option as returned @@ -772,7 +812,7 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje self._sections[section].add(option, self._value_to_string(value)) return self - def rename_section(self, section, new_name): + def rename_section(self, section: str, new_name: str) -> 'GitConfigParser': """rename the given section to new_name :raise ValueError: if section doesn't exit :raise ValueError: if a section with new_name does already exist @@ -7,15 +7,21 @@ from gitdb.base import ( from gitdb.db import GitDB # @UnusedImport from gitdb.db import LooseObjectDB -from .exc import ( - GitCommandError, - BadObject -) +from gitdb.exc import BadObject +from git.exc import GitCommandError +# typing------------------------------------------------- -__all__ = ('GitCmdObjectDB', 'GitDB') +from typing import TYPE_CHECKING +from git.types import PathLike + +if TYPE_CHECKING: + from git.cmd import Git + -# class GitCmdObjectDB(CompoundDB, ObjectDBW): +# -------------------------------------------------------- + +__all__ = ('GitCmdObjectDB', 'GitDB') class GitCmdObjectDB(LooseObjectDB): @@ -28,23 +34,23 @@ class GitCmdObjectDB(LooseObjectDB): have packs and the other implementations """ - def __init__(self, root_path, git): + def __init__(self, root_path: PathLike, git: 'Git') -> None: """Initialize this instance with the root and a git command""" super(GitCmdObjectDB, self).__init__(root_path) self._git = git - def info(self, sha): - hexsha, typename, size = self._git.get_object_header(bin_to_hex(sha)) + def info(self, binsha: bytes) -> OInfo: + hexsha, typename, size = self._git.get_object_header(bin_to_hex(binsha)) return OInfo(hex_to_bin(hexsha), typename, size) - def stream(self, sha): + def stream(self, binsha: bytes) -> OStream: """For now, all lookup is done by git itself""" - hexsha, typename, size, stream = self._git.stream_object_data(bin_to_hex(sha)) + hexsha, typename, size, stream = self._git.stream_object_data(bin_to_hex(binsha)) return OStream(hex_to_bin(hexsha), typename, size, stream) # { Interface - def partial_to_complete_sha_hex(self, partial_hexsha): + def partial_to_complete_sha_hex(self, partial_hexsha: str) -> bytes: """:return: Full binary 20 byte sha from the given partial hexsha :raise AmbiguousObjectName: :raise BadObject: diff --git a/git/diff.py b/git/diff.py index a9dc4b57..a40fc244 100644 --- a/git/diff.py +++ b/git/diff.py @@ -3,8 +3,8 @@ # # This module is part of GitPython and is released under # the BSD License: http://www.opensource.org/licenses/bsd-license.php -import re +import re from git.cmd import handle_process_output from git.compat import defenc from git.util import finalize_process, hex_to_bin @@ -13,22 +13,37 @@ from .objects.blob import Blob from .objects.util import mode_str_to_int +# typing ------------------------------------------------------------------ + +from typing import Any, Iterator, List, Match, Optional, Tuple, Type, Union, TYPE_CHECKING +from git.types import PathLike, TBD, Final, Literal + +if TYPE_CHECKING: + from .objects.tree import Tree + from git.repo.base import Repo + + from subprocess import Popen + +Lit_change_type = Literal['A', 'D', 'M', 'R', 'T'] + +# ------------------------------------------------------------------------ + __all__ = ('Diffable', 'DiffIndex', 'Diff', 'NULL_TREE') # Special object to compare against the empty tree in diffs -NULL_TREE = object() +NULL_TREE = object() # type: Final[object] _octal_byte_re = re.compile(b'\\\\([0-9]{3})') -def _octal_repl(matchobj): +def _octal_repl(matchobj: Match) -> bytes: value = matchobj.group(1) value = int(value, 8) value = bytes(bytearray((value,))) return value -def decode_path(path, has_ab_prefix=True): +def decode_path(path: bytes, has_ab_prefix: bool = True) -> Optional[bytes]: if path == b'/dev/null': return None @@ -60,7 +75,7 @@ class Diffable(object): class Index(object): pass - def _process_diff_args(self, args): + def _process_diff_args(self, args: List[Union[str, 'Diffable', object]]) -> List[Union[str, 'Diffable', object]]: """ :return: possibly altered version of the given args list. @@ -68,7 +83,9 @@ class Diffable(object): Subclasses can use it to alter the behaviour of the superclass""" return args - def diff(self, other=Index, paths=None, create_patch=False, **kwargs): + def diff(self, other: Union[Type[Index], Type['Tree'], object, None, str] = Index, + paths: Union[PathLike, List[PathLike], Tuple[PathLike, ...], None] = None, + create_patch: bool = False, **kwargs: Any) -> 'DiffIndex': """Creates diffs between two items being trees, trees and index or an index and the working tree. It will detect renames automatically. @@ -99,7 +116,7 @@ class Diffable(object): :note: On a bare repository, 'other' needs to be provided as Index or as as Tree/Commit, or a git command error will occur""" - args = [] + args = [] # type: List[Union[str, Diffable, object]] args.append("--abbrev=40") # we need full shas args.append("--full-index") # get full index paths, not only filenames @@ -117,6 +134,9 @@ class Diffable(object): if paths is not None and not isinstance(paths, (tuple, list)): paths = [paths] + if hasattr(self, 'repo'): # else raise Error? + self.repo = self.repo # type: 'Repo' + diff_cmd = self.repo.git.diff if other is self.Index: args.insert(0, '--cached') @@ -163,7 +183,7 @@ class DiffIndex(list): # T = Changed in the type change_type = ("A", "C", "D", "R", "M", "T") - def iter_change_type(self, change_type): + def iter_change_type(self, change_type: Lit_change_type) -> Iterator['Diff']: """ :return: iterator yielding Diff instances that match the given change_type @@ -180,7 +200,7 @@ class DiffIndex(list): if change_type not in self.change_type: raise ValueError("Invalid change type: %s" % change_type) - for diff in self: + for diff in self: # type: 'Diff' if diff.change_type == change_type: yield diff elif change_type == "A" and diff.new_file: @@ -255,22 +275,21 @@ class Diff(object): "new_file", "deleted_file", "copied_file", "raw_rename_from", "raw_rename_to", "diff", "change_type", "score") - def __init__(self, repo, a_rawpath, b_rawpath, a_blob_id, b_blob_id, a_mode, - b_mode, new_file, deleted_file, copied_file, raw_rename_from, - raw_rename_to, diff, change_type, score): - - self.a_mode = a_mode - self.b_mode = b_mode + def __init__(self, repo: 'Repo', + a_rawpath: Optional[bytes], b_rawpath: Optional[bytes], + a_blob_id: Union[str, bytes, None], b_blob_id: Union[str, bytes, None], + a_mode: Union[bytes, str, None], b_mode: Union[bytes, str, None], + new_file: bool, deleted_file: bool, copied_file: bool, + raw_rename_from: Optional[bytes], raw_rename_to: Optional[bytes], + diff: Union[str, bytes, None], change_type: Optional[str], score: Optional[int]) -> None: assert a_rawpath is None or isinstance(a_rawpath, bytes) assert b_rawpath is None or isinstance(b_rawpath, bytes) self.a_rawpath = a_rawpath self.b_rawpath = b_rawpath - if self.a_mode: - self.a_mode = mode_str_to_int(self.a_mode) - if self.b_mode: - self.b_mode = mode_str_to_int(self.b_mode) + self.a_mode = mode_str_to_int(a_mode) if a_mode else None + self.b_mode = mode_str_to_int(b_mode) if b_mode else None # Determine whether this diff references a submodule, if it does then # we need to overwrite "repo" to the corresponding submodule's repo instead @@ -305,27 +324,27 @@ class Diff(object): self.change_type = change_type self.score = score - def __eq__(self, other): + def __eq__(self, other: object) -> bool: for name in self.__slots__: if getattr(self, name) != getattr(other, name): return False # END for each name return True - def __ne__(self, other): + def __ne__(self, other: object) -> bool: return not (self == other) - def __hash__(self): + def __hash__(self) -> int: return hash(tuple(getattr(self, n) for n in self.__slots__)) - def __str__(self): - h = "%s" + def __str__(self) -> str: + h = "%s" # type: str if self.a_blob: h %= self.a_blob.path elif self.b_blob: h %= self.b_blob.path - msg = '' + msg = '' # type: str line = None # temp line line_length = 0 # line length for b, n in zip((self.a_blob, self.b_blob), ('lhs', 'rhs')): @@ -354,7 +373,7 @@ class Diff(object): if self.diff: msg += '\n---' try: - msg += self.diff.decode(defenc) + msg += self.diff.decode(defenc) if isinstance(self.diff, bytes) else self.diff except UnicodeDecodeError: msg += 'OMITTED BINARY DATA' # end handle encoding @@ -368,36 +387,36 @@ class Diff(object): return res @property - def a_path(self): + def a_path(self) -> Optional[str]: return self.a_rawpath.decode(defenc, 'replace') if self.a_rawpath else None @property - def b_path(self): + def b_path(self) -> Optional[str]: return self.b_rawpath.decode(defenc, 'replace') if self.b_rawpath else None @property - def rename_from(self): + def rename_from(self) -> Optional[str]: return self.raw_rename_from.decode(defenc, 'replace') if self.raw_rename_from else None @property - def rename_to(self): + def rename_to(self) -> Optional[str]: return self.raw_rename_to.decode(defenc, 'replace') if self.raw_rename_to else None @property - def renamed(self): + def renamed(self) -> bool: """:returns: True if the blob of our diff has been renamed :note: This property is deprecated, please use ``renamed_file`` instead. """ return self.renamed_file @property - def renamed_file(self): + def renamed_file(self) -> bool: """:returns: True if the blob of our diff has been renamed """ return self.rename_from != self.rename_to @classmethod - def _pick_best_path(cls, path_match, rename_match, path_fallback_match): + def _pick_best_path(cls, path_match: bytes, rename_match: bytes, path_fallback_match: bytes) -> Optional[bytes]: if path_match: return decode_path(path_match) @@ -410,21 +429,23 @@ class Diff(object): return None @classmethod - def _index_from_patch_format(cls, repo, proc): + def _index_from_patch_format(cls, repo: 'Repo', proc: TBD) -> DiffIndex: """Create a new DiffIndex from the given text which must be in patch format :param repo: is the repository we are operating on - it is required :param stream: result of 'git diff' as a stream (supporting file protocol) :return: git.DiffIndex """ ## FIXME: Here SLURPING raw, need to re-phrase header-regexes linewise. - text = [] - handle_process_output(proc, text.append, None, finalize_process, decode_streams=False) + text_list = [] # type: List[bytes] + handle_process_output(proc, text_list.append, None, finalize_process, decode_streams=False) # for now, we have to bake the stream - text = b''.join(text) + text = b''.join(text_list) index = DiffIndex() previous_header = None header = None + a_path, b_path = None, None # for mypy + a_mode, b_mode = None, None # for mypy for _header in cls.re_header.finditer(text): a_path_fallback, b_path_fallback, \ old_mode, new_mode, \ @@ -464,71 +485,73 @@ class Diff(object): previous_header = _header header = _header # end for each header we parse - if index: + if index and header: index[-1].diff = text[header.end():] # end assign last diff return index + @staticmethod + def _handle_diff_line(lines_bytes: bytes, repo: 'Repo', index: DiffIndex) -> None: + lines = lines_bytes.decode(defenc) + + for line in lines.split(':')[1:]: + meta, _, path = line.partition('\x00') + path = path.rstrip('\x00') + a_blob_id, b_blob_id = None, None # Type: Optional[str] + old_mode, new_mode, a_blob_id, b_blob_id, _change_type = meta.split(None, 4) + # Change type can be R100 + # R: status letter + # 100: score (in case of copy and rename) + change_type = _change_type[0] + score_str = ''.join(_change_type[1:]) + score = int(score_str) if score_str.isdigit() else None + path = path.strip() + a_path = path.encode(defenc) + b_path = path.encode(defenc) + deleted_file = False + new_file = False + copied_file = False + rename_from = None + rename_to = None + + # NOTE: We cannot conclude from the existence of a blob to change type + # as diffs with the working do not have blobs yet + if change_type == 'D': + b_blob_id = None # Optional[str] + deleted_file = True + elif change_type == 'A': + a_blob_id = None + new_file = True + elif change_type == 'C': + copied_file = True + a_path_str, b_path_str = path.split('\x00', 1) + a_path = a_path_str.encode(defenc) + b_path = b_path_str.encode(defenc) + elif change_type == 'R': + a_path_str, b_path_str = path.split('\x00', 1) + a_path = a_path_str.encode(defenc) + b_path = b_path_str.encode(defenc) + rename_from, rename_to = a_path, b_path + elif change_type == 'T': + # Nothing to do + pass + # END add/remove handling + + diff = Diff(repo, a_path, b_path, a_blob_id, b_blob_id, old_mode, new_mode, + new_file, deleted_file, copied_file, rename_from, rename_to, + '', change_type, score) + index.append(diff) + @classmethod - def _index_from_raw_format(cls, repo, proc): + def _index_from_raw_format(cls, repo: 'Repo', proc: 'Popen') -> 'DiffIndex': """Create a new DiffIndex from the given stream which must be in raw format. :return: git.DiffIndex""" # handles # :100644 100644 687099101... 37c5e30c8... M .gitignore index = DiffIndex() - - def handle_diff_line(lines): - lines = lines.decode(defenc) - - for line in lines.split(':')[1:]: - meta, _, path = line.partition('\x00') - path = path.rstrip('\x00') - old_mode, new_mode, a_blob_id, b_blob_id, _change_type = meta.split(None, 4) - # Change type can be R100 - # R: status letter - # 100: score (in case of copy and rename) - change_type = _change_type[0] - score_str = ''.join(_change_type[1:]) - score = int(score_str) if score_str.isdigit() else None - path = path.strip() - a_path = path.encode(defenc) - b_path = path.encode(defenc) - deleted_file = False - new_file = False - copied_file = False - rename_from = None - rename_to = None - - # NOTE: We cannot conclude from the existence of a blob to change type - # as diffs with the working do not have blobs yet - if change_type == 'D': - b_blob_id = None - deleted_file = True - elif change_type == 'A': - a_blob_id = None - new_file = True - elif change_type == 'C': - copied_file = True - a_path, b_path = path.split('\x00', 1) - a_path = a_path.encode(defenc) - b_path = b_path.encode(defenc) - elif change_type == 'R': - a_path, b_path = path.split('\x00', 1) - a_path = a_path.encode(defenc) - b_path = b_path.encode(defenc) - rename_from, rename_to = a_path, b_path - elif change_type == 'T': - # Nothing to do - pass - # END add/remove handling - - diff = Diff(repo, a_path, b_path, a_blob_id, b_blob_id, old_mode, new_mode, - new_file, deleted_file, copied_file, rename_from, rename_to, - '', change_type, score) - index.append(diff) - - handle_process_output(proc, handle_diff_line, None, finalize_process, decode_streams=False) + handle_process_output(proc, lambda byt: cls._handle_diff_line(byt, repo, index), + None, finalize_process, decode_streams=False) return index @@ -5,9 +5,20 @@ # the BSD License: http://www.opensource.org/licenses/bsd-license.php """ Module containing all exceptions thrown throughout the git package, """ +from gitdb.exc import BadName # NOQA @UnusedWildImport skipcq: PYL-W0401, PYL-W0614 from gitdb.exc import * # NOQA @UnusedWildImport skipcq: PYL-W0401, PYL-W0614 from git.compat import safe_decode +# typing ---------------------------------------------------- + +from typing import List, Sequence, Tuple, Union, TYPE_CHECKING +from git.types import PathLike + +if TYPE_CHECKING: + from git.repo.base import Repo + +# ------------------------------------------------------------------ + class GitError(Exception): """ Base class for all package exceptions """ @@ -37,7 +48,10 @@ class CommandError(GitError): #: "'%s' failed%s" _msg = "Cmd('%s') failed%s" - def __init__(self, command, status=None, stderr=None, stdout=None): + def __init__(self, command: Union[List[str], Tuple[str, ...], str], + status: Union[str, int, None, Exception] = None, + stderr: Union[bytes, str, None] = None, + stdout: Union[bytes, str, None] = None) -> None: if not isinstance(command, (tuple, list)): command = command.split() self.command = command @@ -55,10 +69,12 @@ class CommandError(GitError): self._cmd = safe_decode(command[0]) self._cmdline = ' '.join(safe_decode(i) for i in command) self._cause = status and " due to: %s" % status or "!" - self.stdout = stdout and "\n stdout: '%s'" % safe_decode(stdout) or '' - self.stderr = stderr and "\n stderr: '%s'" % safe_decode(stderr) or '' + stdout_decode = safe_decode(stdout) + stderr_decode = safe_decode(stderr) + self.stdout = stdout_decode and "\n stdout: '%s'" % stdout_decode or '' + self.stderr = stderr_decode and "\n stderr: '%s'" % stderr_decode or '' - def __str__(self): + def __str__(self) -> str: return (self._msg + "\n cmdline: %s%s%s") % ( self._cmd, self._cause, self._cmdline, self.stdout, self.stderr) @@ -66,7 +82,8 @@ class CommandError(GitError): class GitCommandNotFound(CommandError): """Thrown if we cannot find the `git` executable in the PATH or at the path given by the GIT_PYTHON_GIT_EXECUTABLE environment variable""" - def __init__(self, command, cause): + + def __init__(self, command: Union[List[str], Tuple[str], str], cause: Union[str, Exception]) -> None: super(GitCommandNotFound, self).__init__(command, cause) self._msg = "Cmd('%s') not found%s" @@ -74,7 +91,11 @@ class GitCommandNotFound(CommandError): class GitCommandError(CommandError): """ Thrown if execution of the git command fails with non-zero status code. """ - def __init__(self, command, status, stderr=None, stdout=None): + def __init__(self, command: Union[List[str], Tuple[str, ...], str], + status: Union[str, int, None, Exception] = None, + stderr: Union[bytes, str, None] = None, + stdout: Union[bytes, str, None] = None, + ) -> None: super(GitCommandError, self).__init__(command, status, stderr, stdout) @@ -92,13 +113,15 @@ class CheckoutError(GitError): were checked out successfully and hence match the version stored in the index""" - def __init__(self, message, failed_files, valid_files, failed_reasons): + def __init__(self, message: str, failed_files: Sequence[PathLike], valid_files: Sequence[PathLike], + failed_reasons: List[str]) -> None: + Exception.__init__(self, message) self.failed_files = failed_files self.failed_reasons = failed_reasons self.valid_files = valid_files - def __str__(self): + def __str__(self) -> str: return Exception.__str__(self) + ":%s" % self.failed_files @@ -116,7 +139,11 @@ class HookExecutionError(CommandError): """Thrown if a hook exits with a non-zero exit code. It provides access to the exit code and the string returned via standard output""" - def __init__(self, command, status, stderr=None, stdout=None): + def __init__(self, command: Union[List[str], Tuple[str, ...], str], + status: Union[str, int, None, Exception], + stderr: Union[bytes, str, None] = None, + stdout: Union[bytes, str, None] = None) -> None: + super(HookExecutionError, self).__init__(command, status, stderr, stdout) self._msg = "Hook('%s') failed%s" @@ -124,9 +151,9 @@ class HookExecutionError(CommandError): class RepositoryDirtyError(GitError): """Thrown whenever an operation on a repository fails as it has uncommitted changes that would be overwritten""" - def __init__(self, repo, message): + def __init__(self, repo: 'Repo', message: str) -> None: self.repo = repo self.message = message - def __str__(self): + def __str__(self) -> str: return "Operation cannot be performed on %r: %s" % (self.repo, self.message) diff --git a/git/ext/gitdb b/git/ext/gitdb -Subproject e45fd0792ee9a987a4df26e3139f5c3b107f009 +Subproject 03ab3a1d40c04d6a944299c21db61cf9ce30f6b diff --git a/git/index/base.py b/git/index/base.py index 5b3667ac..04424060 100644 --- a/git/index/base.py +++ b/git/index/base.py @@ -3,6 +3,7 @@ # # This module is part of GitPython and is released under # the BSD License: http://www.opensource.org/licenses/bsd-license.php +from git.refs.reference import Reference import glob from io import BytesIO import os @@ -63,6 +64,23 @@ from .util import ( git_working_dir ) +# typing ----------------------------------------------------------------------------- + +from typing import (Any, BinaryIO, Callable, Dict, IO, Iterable, Iterator, List, + Sequence, TYPE_CHECKING, Tuple, Union) + +from git.types import PathLike, TBD + +if TYPE_CHECKING: + from subprocess import Popen + from git.repo import Repo + + +StageType = int +Treeish = Union[Tree, Commit, str, bytes] + +# ------------------------------------------------------------------------------------ + __all__ = ('IndexFile', 'CheckoutError') @@ -93,7 +111,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): _VERSION = 2 # latest version we support S_IFGITLINK = S_IFGITLINK # a submodule - def __init__(self, repo, file_path=None): + def __init__(self, repo: 'Repo', file_path: PathLike = None) -> None: """Initialize this Index instance, optionally from the given ``file_path``. If no file_path is given, we will be created from the current index file. @@ -102,9 +120,9 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): self.repo = repo self.version = self._VERSION self._extension_data = b'' - self._file_path = file_path or self._index_path() + self._file_path = file_path or self._index_path() # type: PathLike - def _set_cache_(self, attr): + def _set_cache_(self, attr: str) -> None: if attr == "entries": # read the current index # try memory map for speed @@ -115,8 +133,8 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): ok = True except OSError: # in new repositories, there may be no index, which means we are empty - self.entries = {} - return + self.entries = {} # type: Dict[Tuple[PathLike, StageType], IndexEntry] + return None finally: if not ok: lfd.rollback() @@ -133,15 +151,18 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): else: super(IndexFile, self)._set_cache_(attr) - def _index_path(self): - return join_path_native(self.repo.git_dir, "index") + def _index_path(self) -> PathLike: + if self.repo.git_dir: + return join_path_native(self.repo.git_dir, "index") + else: + raise GitCommandError("No git directory given to join index path") @property - def path(self): + def path(self) -> PathLike: """ :return: Path to the index file we are representing """ return self._file_path - def _delete_entries_cache(self): + def _delete_entries_cache(self) -> None: """Safely clear the entries cache so it can be recreated""" try: del(self.entries) @@ -152,18 +173,18 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): #{ Serializable Interface - def _deserialize(self, stream): + def _deserialize(self, stream: IO) -> 'IndexFile': """Initialize this instance with index values read from the given stream""" self.version, self.entries, self._extension_data, _conten_sha = read_cache(stream) return self - def _entries_sorted(self): + def _entries_sorted(self) -> List[TBD]: """:return: list of entries, in a sorted fashion, first by path, then by stage""" return sorted(self.entries.values(), key=lambda e: (e.path, e.stage)) - def _serialize(self, stream, ignore_extension_data=False): + def _serialize(self, stream: IO, ignore_extension_data: bool = False) -> 'IndexFile': entries = self._entries_sorted() - extension_data = self._extension_data + extension_data = self._extension_data # type: Union[None, bytes] if ignore_extension_data: extension_data = None write_cache(entries, stream, extension_data) @@ -171,7 +192,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): #} END serializable interface - def write(self, file_path=None, ignore_extension_data=False): + def write(self, file_path: Union[None, PathLike] = None, ignore_extension_data: bool = False) -> None: """Write the current state to our file path or to the given one :param file_path: @@ -191,7 +212,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): Alternatively, use IndexFile.write_tree() to handle this case automatically - :return: self""" + :return: self # does it? or returns None?""" # make sure we have our entries read before getting a write lock # else it would be done when streaming. This can happen # if one doesn't change the index, but writes it right away @@ -215,7 +236,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): @post_clear_cache @default_index - def merge_tree(self, rhs, base=None): + def merge_tree(self, rhs: Treeish, base: Union[None, Treeish] = None) -> 'IndexFile': """Merge the given rhs treeish into the current index, possibly taking a common base treeish into account. @@ -242,7 +263,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): # -i : ignore working tree status # --aggressive : handle more merge cases # -m : do an actual merge - args = ["--aggressive", "-i", "-m"] + args = ["--aggressive", "-i", "-m"] # type: List[Union[Treeish, str]] if base is not None: args.append(base) args.append(rhs) @@ -251,7 +272,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): return self @classmethod - def new(cls, repo, *tree_sha): + def new(cls, repo: 'Repo', *tree_sha: Union[str, Tree]) -> 'IndexFile': """ Merge the given treeish revisions into a new index which is returned. This method behaves like git-read-tree --aggressive when doing the merge. @@ -264,18 +285,20 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): New IndexFile instance. Its path will be undefined. If you intend to write such a merged Index, supply an alternate file_path to its 'write' method.""" - base_entries = aggressive_tree_merge(repo.odb, [to_bin_sha(str(t)) for t in tree_sha]) + tree_sha_bytes = [to_bin_sha(str(t)) for t in tree_sha] # List[bytes] + base_entries = aggressive_tree_merge(repo.odb, tree_sha_bytes) inst = cls(repo) # convert to entries dict - entries = dict(zip(((e.path, e.stage) for e in base_entries), - (IndexEntry.from_base(e) for e in base_entries))) + entries = dict(zip( + ((e.path, e.stage) for e in base_entries), + (IndexEntry.from_base(e) for e in base_entries))) # type: Dict[Tuple[PathLike, int], IndexEntry] inst.entries = entries return inst @classmethod - def from_tree(cls, repo, *treeish, **kwargs): + def from_tree(cls, repo: 'Repo', *treeish: Treeish, **kwargs: Any) -> 'IndexFile': """Merge the given treeish revisions into a new index which is returned. The original index will remain unaltered @@ -312,7 +335,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): if len(treeish) == 0 or len(treeish) > 3: raise ValueError("Please specify between 1 and 3 treeish, got %i" % len(treeish)) - arg_list = [] + arg_list = [] # type: List[Union[Treeish, str]] # ignore that working tree and index possibly are out of date if len(treeish) > 1: # drop unmerged entries when reading our index and merging @@ -331,7 +354,8 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): # as it considers existing entries. moving it essentially clears the index. # Unfortunately there is no 'soft' way to do it. # The TemporaryFileSwap assure the original file get put back - index_handler = TemporaryFileSwap(join_path_native(repo.git_dir, 'index')) + if repo.git_dir: + index_handler = TemporaryFileSwap(join_path_native(repo.git_dir, 'index')) try: repo.git.read_tree(*arg_list, **kwargs) index = cls(repo, tmp_index) @@ -346,7 +370,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): # UTILITIES @unbare_repo - def _iter_expand_paths(self, paths): + def _iter_expand_paths(self, paths: Sequence[PathLike]) -> Iterator[PathLike]: """Expand the directories in list of paths to the corresponding paths accordingly, Note: git will add items multiple times even if a glob overlapped @@ -354,10 +378,10 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): times - we respect that and do not prune""" def raise_exc(e): raise e - r = self.repo.working_tree_dir + r = str(self.repo.working_tree_dir) rs = r + os.sep for path in paths: - abs_path = path + abs_path = str(path) if not osp.isabs(abs_path): abs_path = osp.join(r, path) # END make absolute path @@ -374,7 +398,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): # end check symlink # if the path is not already pointing to an existing file, resolve globs if possible - if not os.path.exists(path) and ('?' in path or '*' in path or '[' in path): + if not os.path.exists(abs_path) and ('?' in abs_path or '*' in abs_path or '[' in abs_path): resolved_paths = glob.glob(abs_path) # not abs_path in resolved_paths: # a glob() resolving to the same path we are feeding it with @@ -396,12 +420,12 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): # END for each subdirectory except OSError: # was a file or something that could not be iterated - yield path.replace(rs, '') + yield abs_path.replace(rs, '') # END path exception handling # END for each path - def _write_path_to_stdin(self, proc, filepath, item, fmakeexc, fprogress, - read_from_stdout=True): + def _write_path_to_stdin(self, proc: 'Popen', filepath: PathLike, item, fmakeexc, fprogress, + read_from_stdout: bool = True) -> Union[None, str]: """Write path to proc.stdin and make sure it processes the item, including progress. :return: stdout string @@ -417,20 +441,24 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): we will close stdin to break the pipe.""" fprogress(filepath, False, item) - rval = None - try: - proc.stdin.write(("%s\n" % filepath).encode(defenc)) - except IOError as e: - # pipe broke, usually because some error happened - raise fmakeexc() from e - # END write exception handling - proc.stdin.flush() - if read_from_stdout: + rval = None # type: Union[None, str] + + if proc.stdin is not None: + try: + proc.stdin.write(("%s\n" % filepath).encode(defenc)) + except IOError as e: + # pipe broke, usually because some error happened + raise fmakeexc() from e + # END write exception handling + proc.stdin.flush() + + if read_from_stdout and proc.stdout is not None: rval = proc.stdout.readline().strip() fprogress(filepath, True, item) return rval - def iter_blobs(self, predicate=lambda t: True): + def iter_blobs(self, predicate: Callable[[Tuple[StageType, Blob]], bool] = lambda t: True + ) -> Iterator[Tuple[StageType, Blob]]: """ :return: Iterator yielding tuples of Blob objects and stages, tuple(stage, Blob) @@ -446,20 +474,21 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): yield output # END for each entry - def unmerged_blobs(self): + def unmerged_blobs(self) -> Dict[PathLike, List[Tuple[StageType, Blob]]]: """ :return: - Iterator yielding dict(path : list( tuple( stage, Blob, ...))), being + Dict(path : list( tuple( stage, Blob, ...))), being a dictionary associating a path in the index with a list containing sorted stage/blob pairs + :note: Blobs that have been removed in one side simply do not exist in the given stage. I.e. a file removed on the 'other' branch whose entries are at stage 3 will not have a stage 3 entry. """ is_unmerged_blob = lambda t: t[0] != 0 - path_map = {} + path_map = {} # type: Dict[PathLike, List[Tuple[TBD, Blob]]] for stage, blob in self.iter_blobs(is_unmerged_blob): path_map.setdefault(blob.path, []).append((stage, blob)) # END for each unmerged blob @@ -468,10 +497,10 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): return path_map @classmethod - def entry_key(cls, *entry): + def entry_key(cls, *entry: Union[BaseIndexEntry, PathLike, StageType]) -> Tuple[PathLike, StageType]: return entry_key(*entry) - def resolve_blobs(self, iter_blobs): + def resolve_blobs(self, iter_blobs: Iterator[Blob]) -> 'IndexFile': """Resolve the blobs given in blob iterator. This will effectively remove the index entries of the respective path at all non-null stages and add the given blob as new stage null blob. @@ -489,7 +518,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): for blob in iter_blobs: stage_null_key = (blob.path, 0) if stage_null_key in self.entries: - raise ValueError("Path %r already exists at stage 0" % blob.path) + raise ValueError("Path %r already exists at stage 0" % str(blob.path)) # END assert blob is not stage 0 already # delete all possible stages @@ -506,7 +535,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): return self - def update(self): + def update(self) -> 'IndexFile': """Reread the contents of our index file, discarding all cached information we might have. @@ -517,7 +546,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): # allows to lazily reread on demand return self - def write_tree(self): + def write_tree(self) -> Tree: """Writes this index to a corresponding Tree object into the repository's object database and return it. @@ -542,7 +571,8 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): root_tree._cache = tree_items return root_tree - def _process_diff_args(self, args): + def _process_diff_args(self, args: List[Union[str, diff.Diffable, object]] + ) -> List[Union[str, diff.Diffable, object]]: try: args.pop(args.index(self)) except IndexError: @@ -550,18 +580,19 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): # END remove self return args - def _to_relative_path(self, path): + def _to_relative_path(self, path: PathLike) -> PathLike: """:return: Version of path relative to our git directory or raise ValueError if it is not within our git direcotory""" if not osp.isabs(path): return path if self.repo.bare: raise InvalidGitRepositoryError("require non-bare repository") - if not path.startswith(self.repo.working_tree_dir): + if not str(path).startswith(str(self.repo.working_tree_dir)): raise ValueError("Absolute path %r is not in git repository at %r" % (path, self.repo.working_tree_dir)) return os.path.relpath(path, self.repo.working_tree_dir) - def _preprocess_add_items(self, items): + def _preprocess_add_items(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, Submodule]] + ) -> Tuple[List[PathLike], List[BaseIndexEntry]]: """ Split the items into two lists of path strings and BaseEntries. """ paths = [] entries = [] @@ -581,13 +612,14 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): # END for each item return paths, entries - def _store_path(self, filepath, fprogress): + def _store_path(self, filepath: PathLike, fprogress: Callable) -> BaseIndexEntry: """Store file at filepath in the database and return the base index entry Needs the git_working_dir decorator active ! This must be assured in the calling code""" st = os.lstat(filepath) # handles non-symlinks as well if S_ISLNK(st.st_mode): # in PY3, readlink is string, but we need bytes. In PY2, it's just OS encoded bytes, we assume UTF-8 - open_stream = lambda: BytesIO(force_bytes(os.readlink(filepath), encoding=defenc)) + open_stream = lambda: BytesIO(force_bytes(os.readlink(filepath), + encoding=defenc)) # type: Callable[[], BinaryIO] else: open_stream = lambda: open(filepath, 'rb') with open_stream() as stream: @@ -599,16 +631,18 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): @unbare_repo @git_working_dir - def _entries_for_paths(self, paths, path_rewriter, fprogress, entries): - entries_added = [] + def _entries_for_paths(self, paths: List[str], path_rewriter: Callable, fprogress: Callable, + entries: List[BaseIndexEntry]) -> List[BaseIndexEntry]: + entries_added = [] # type: List[BaseIndexEntry] if path_rewriter: for path in paths: if osp.isabs(path): abspath = path - gitrelative_path = path[len(self.repo.working_tree_dir) + 1:] + gitrelative_path = path[len(str(self.repo.working_tree_dir)) + 1:] else: gitrelative_path = path - abspath = osp.join(self.repo.working_tree_dir, gitrelative_path) + if self.repo.working_tree_dir: + abspath = osp.join(self.repo.working_tree_dir, gitrelative_path) # end obtain relative and absolute paths blob = Blob(self.repo, Blob.NULL_BIN_SHA, @@ -628,8 +662,9 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): # END path handling return entries_added - def add(self, items, force=True, fprogress=lambda *args: None, path_rewriter=None, - write=True, write_extension_data=False): + def add(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, Submodule]], force: bool = True, + fprogress: Callable = lambda *args: None, path_rewriter: Callable = None, + write: bool = True, write_extension_data: bool = False) -> List[BaseIndexEntry]: """Add files from the working tree, specific blobs or BaseIndexEntries to the index. @@ -816,7 +851,8 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): @post_clear_cache @default_index - def remove(self, items, working_tree=False, **kwargs): + def remove(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, Submodule]], working_tree: bool = False, + **kwargs: Any) -> List[str]: """Remove the given items from the index and optionally from the working tree as well. @@ -867,7 +903,8 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): @post_clear_cache @default_index - def move(self, items, skip_errors=False, **kwargs): + def move(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, Submodule]], skip_errors: bool = False, + **kwargs: Any) -> List[Tuple[str, str]]: """Rename/move the items, whereas the last item is considered the destination of the move operation. If the destination is a file, the first item ( of two ) must be a file as well. If the destination is a directory, it may be preceded @@ -929,9 +966,9 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): return out - def commit(self, message, parent_commits=None, head=True, author=None, - committer=None, author_date=None, commit_date=None, - skip_hooks=False): + def commit(self, message: str, parent_commits=None, head: bool = True, author: str = None, + committer: str = None, author_date: str = None, commit_date: str = None, + skip_hooks: bool = False) -> Commit: """Commit the current default index file, creating a commit object. For more information on the arguments, see tree.commit. @@ -955,33 +992,39 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): run_commit_hook('post-commit', self) return rval - def _write_commit_editmsg(self, message): + def _write_commit_editmsg(self, message: str) -> None: with open(self._commit_editmsg_filepath(), "wb") as commit_editmsg_file: commit_editmsg_file.write(message.encode(defenc)) - def _remove_commit_editmsg(self): + def _remove_commit_editmsg(self) -> None: os.remove(self._commit_editmsg_filepath()) - def _read_commit_editmsg(self): + def _read_commit_editmsg(self) -> str: with open(self._commit_editmsg_filepath(), "rb") as commit_editmsg_file: return commit_editmsg_file.read().decode(defenc) - def _commit_editmsg_filepath(self): + def _commit_editmsg_filepath(self) -> str: return osp.join(self.repo.common_dir, "COMMIT_EDITMSG") - @classmethod - def _flush_stdin_and_wait(cls, proc, ignore_stdout=False): - proc.stdin.flush() - proc.stdin.close() - stdout = '' - if not ignore_stdout: + def _flush_stdin_and_wait(cls, proc: 'Popen[bytes]', ignore_stdout: bool = False) -> bytes: + stdin_IO = proc.stdin + if stdin_IO: + stdin_IO.flush() + stdin_IO.close() + + stdout = b'' + if not ignore_stdout and proc.stdout: stdout = proc.stdout.read() - proc.stdout.close() - proc.wait() + + if proc.stdout: + proc.stdout.close() + proc.wait() return stdout @default_index - def checkout(self, paths=None, force=False, fprogress=lambda *args: None, **kwargs): + def checkout(self, paths: Union[None, Iterable[PathLike]] = None, force: bool = False, + fprogress: Callable = lambda *args: None, **kwargs: Any + ) -> Union[None, Iterator[PathLike], Sequence[PathLike]]: """Checkout the given paths or all files from the version known to the index into the working tree. @@ -1032,12 +1075,15 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): failed_reasons = [] unknown_lines = [] - def handle_stderr(proc, iter_checked_out_files): - stderr = proc.stderr.read() - if not stderr: - return + def handle_stderr(proc: 'Popen[bytes]', iter_checked_out_files: Iterable[PathLike]) -> None: + + stderr_IO = proc.stderr + if not stderr_IO: + return None # return early if stderr empty + else: + stderr_bytes = stderr_IO.read() # line contents: - stderr = stderr.decode(defenc) + stderr = stderr_bytes.decode(defenc) # git-checkout-index: this already exists endings = (' already exists', ' is not in the cache', ' does not exist at stage', ' is unmerged') for line in stderr.splitlines(): @@ -1101,7 +1147,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): proc = self.repo.git.checkout_index(args, **kwargs) # FIXME: Reading from GIL! make_exc = lambda: GitCommandError(("git-checkout-index",) + tuple(args), 128, proc.stderr.read()) - checked_out_files = [] + checked_out_files = [] # type: List[PathLike] for path in paths: co_path = to_native_path_linux(self._to_relative_path(path)) @@ -1111,11 +1157,11 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): try: self.entries[(co_path, 0)] except KeyError: - folder = co_path + folder = str(co_path) if not folder.endswith('/'): folder += '/' for entry in self.entries.values(): - if entry.path.startswith(folder): + if str(entry.path).startswith(folder): p = entry.path self._write_path_to_stdin(proc, p, p, make_exc, fprogress, read_from_stdout=False) @@ -1145,7 +1191,9 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): assert "Should not reach this point" @default_index - def reset(self, commit='HEAD', working_tree=False, paths=None, head=False, **kwargs): + def reset(self, commit: Union[Commit, Reference, str] = 'HEAD', working_tree: bool = False, + paths: Union[None, Iterable[PathLike]] = None, + head: bool = False, **kwargs: Any) -> 'IndexFile': """Reset the index to reflect the tree at the given commit. This will not adjust our HEAD reference as opposed to HEAD.reset by default. @@ -1213,10 +1261,12 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): return self @default_index - def diff(self, other=diff.Diffable.Index, paths=None, create_patch=False, **kwargs): + def diff(self, other: Union[diff.Diffable.Index, 'IndexFile.Index', Treeish, None, object] = diff.Diffable.Index, + paths: Union[str, List[PathLike], Tuple[PathLike, ...]] = None, create_patch: bool = False, **kwargs: Any + ) -> diff.DiffIndex: """Diff this index against the working copy or a Tree or Commit object - For a documentation of the parameters and return values, see + For a documentation of the parameters and return values, see, Diffable.diff :note: @@ -1234,7 +1284,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): other = self.repo.rev_parse(other) # END object conversion - if isinstance(other, Object): + if isinstance(other, Object): # for Tree or Commit # invert the existing R flag cur_val = kwargs.get('R', False) kwargs['R'] = not cur_val diff --git a/git/index/fun.py b/git/index/fun.py index e92e8e38..1012f480 100644 --- a/git/index/fun.py +++ b/git/index/fun.py @@ -1,6 +1,7 @@ # Contains standalone functions to accompany the index implementation and make it # more versatile # NOTE: Autodoc hates it if this is a docstring + from io import BytesIO import os from stat import ( @@ -10,6 +11,7 @@ from stat import ( S_ISDIR, S_IFMT, S_IFREG, + S_IXUSR, ) import subprocess @@ -47,6 +49,17 @@ from .util import ( unpack ) +# typing ----------------------------------------------------------------------------- + +from typing import (Dict, IO, List, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast) + +from git.types import PathLike + +if TYPE_CHECKING: + from .base import IndexFile + +# ------------------------------------------------------------------------------------ + S_IFGITLINK = S_IFLNK | S_IFDIR # a submodule CE_NAMEMASK_INV = ~CE_NAMEMASK @@ -55,12 +68,12 @@ __all__ = ('write_cache', 'read_cache', 'write_tree_from_cache', 'entry_key', 'stat_mode_to_index_mode', 'S_IFGITLINK', 'run_commit_hook', 'hook_path') -def hook_path(name, git_dir): +def hook_path(name: str, git_dir: PathLike) -> str: """:return: path to the given named hook in the given git repository directory""" return osp.join(git_dir, 'hooks', name) -def run_commit_hook(name, index, *args): +def run_commit_hook(name: str, index: 'IndexFile', *args: str) -> None: """Run the commit hook of the given name. Silently ignores hooks that do not exist. :param name: name of hook, like 'pre-commit' :param index: IndexFile instance @@ -68,10 +81,10 @@ def run_commit_hook(name, index, *args): :raises HookExecutionError: """ hp = hook_path(name, index.repo.git_dir) if not os.access(hp, os.X_OK): - return + return None env = os.environ.copy() - env['GIT_INDEX_FILE'] = safe_decode(index.path) + env['GIT_INDEX_FILE'] = safe_decode(str(index.path)) env['GIT_EDITOR'] = ':' try: cmd = subprocess.Popen([hp] + list(args), @@ -84,11 +97,11 @@ def run_commit_hook(name, index, *args): except Exception as ex: raise HookExecutionError(hp, ex) from ex else: - stdout = [] - stderr = [] - handle_process_output(cmd, stdout.append, stderr.append, finalize_process) - stdout = ''.join(stdout) - stderr = ''.join(stderr) + stdout_list = [] # type: List[str] + stderr_list = [] # type: List[str] + handle_process_output(cmd, stdout_list.append, stderr_list.append, finalize_process) + stdout = ''.join(stdout_list) + stderr = ''.join(stderr_list) if cmd.returncode != 0: stdout = force_text(stdout, defenc) stderr = force_text(stderr, defenc) @@ -103,10 +116,12 @@ def stat_mode_to_index_mode(mode): return S_IFLNK if S_ISDIR(mode) or S_IFMT(mode) == S_IFGITLINK: # submodules return S_IFGITLINK - return S_IFREG | 0o644 | (mode & 0o111) # blobs with or without executable bit + return S_IFREG | (mode & S_IXUSR and 0o755 or 0o644) # blobs with or without executable bit -def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1Writer): +def write_cache(entries: Sequence[Union[BaseIndexEntry, 'IndexEntry']], stream: IO[bytes], + extension_data: Union[None, bytes] = None, + ShaStreamCls: Type[IndexFileSHA1Writer] = IndexFileSHA1Writer) -> None: """Write the cache represented by entries to a stream :param entries: **sorted** list of entries @@ -119,10 +134,10 @@ def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1 :param extension_data: any kind of data to write as a trailer, it must begin a 4 byte identifier, followed by its size ( 4 bytes )""" # wrap the stream into a compatible writer - stream = ShaStreamCls(stream) + stream_sha = ShaStreamCls(stream) - tell = stream.tell - write = stream.write + tell = stream_sha.tell + write = stream_sha.write # header version = 2 @@ -134,8 +149,8 @@ def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1 beginoffset = tell() write(entry[4]) # ctime write(entry[5]) # mtime - path = entry[3] - path = force_bytes(path, encoding=defenc) + path_str = entry[3] # type: str + path = force_bytes(path_str, encoding=defenc) plen = len(path) & CE_NAMEMASK # path length assert plen == len(path), "Path %s too long to fit into index" % entry[3] flags = plen | (entry[2] & CE_NAMEMASK_INV) # clear possible previous values @@ -148,34 +163,38 @@ def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1 # write previously cached extensions data if extension_data is not None: - stream.write(extension_data) + stream_sha.write(extension_data) # write the sha over the content - stream.write_sha() + stream_sha.write_sha() -def read_header(stream): +def read_header(stream: IO[bytes]) -> Tuple[int, int]: """Return tuple(version_long, num_entries) from the given stream""" type_id = stream.read(4) if type_id != b"DIRC": raise AssertionError("Invalid index file header: %r" % type_id) - version, num_entries = unpack(">LL", stream.read(4 * 2)) + unpacked = cast(Tuple[int, int], unpack(">LL", stream.read(4 * 2))) + version, num_entries = unpacked # TODO: handle version 3: extended data, see read-cache.c assert version in (1, 2) return version, num_entries -def entry_key(*entry): +def entry_key(*entry: Union[BaseIndexEntry, PathLike, int]) -> Tuple[PathLike, int]: """:return: Key suitable to be used for the index.entries dictionary :param entry: One instance of type BaseIndexEntry or the path and the stage""" if len(entry) == 1: - return (entry[0].path, entry[0].stage) - return tuple(entry) + entry_first = cast(BaseIndexEntry, entry[0]) # type: BaseIndexEntry + return (entry_first.path, entry_first.stage) + else: + entry = cast(Tuple[PathLike, int], tuple(entry)) + return entry # END handle entry -def read_cache(stream): +def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'IndexEntry'], bytes, bytes]: """Read a cache file from the given stream :return: tuple(version, entries_dict, extension_data, content_sha) * version is the integer version number @@ -184,7 +203,7 @@ def read_cache(stream): * content_sha is a 20 byte sha on all cache file contents""" version, num_entries = read_header(stream) count = 0 - entries = {} + entries = {} # type: Dict[Tuple[PathLike, int], 'IndexEntry'] read = stream.read tell = stream.tell @@ -223,7 +242,8 @@ def read_cache(stream): return (version, entries, extension_data, content_sha) -def write_tree_from_cache(entries, odb, sl, si=0): +def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0 + ) -> Tuple[bytes, List[Tuple[str, int, str]]]: """Create a tree from the given sorted list of entries and put the respective trees into the given object database @@ -233,7 +253,7 @@ def write_tree_from_cache(entries, odb, sl, si=0): :param sl: slice indicating the range we should process on the entries list :return: tuple(binsha, list(tree_entry, ...)) a tuple of a sha and a list of tree entries being a tuple of hexsha, mode, name""" - tree_items = [] + tree_items = [] # type: List[Tuple[Union[bytes, str], int, str]] tree_items_append = tree_items.append ci = sl.start end = sl.stop @@ -272,18 +292,19 @@ def write_tree_from_cache(entries, odb, sl, si=0): # finally create the tree sio = BytesIO() - tree_to_stream(tree_items, sio.write) + tree_to_stream(tree_items, sio.write) # converts bytes of each item[0] to str + tree_items_stringified = cast(List[Tuple[str, int, str]], tree_items) # type: List[Tuple[str, int, str]] sio.seek(0) istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio)) - return (istream.binsha, tree_items) + return (istream.binsha, tree_items_stringified) -def _tree_entry_to_baseindexentry(tree_entry, stage): +def _tree_entry_to_baseindexentry(tree_entry: Tuple[str, int, str], stage: int) -> BaseIndexEntry: return BaseIndexEntry((tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2])) -def aggressive_tree_merge(odb, tree_shas): +def aggressive_tree_merge(odb, tree_shas: Sequence[bytes]) -> List[BaseIndexEntry]: """ :return: list of BaseIndexEntries representing the aggressive merge of the given trees. All valid entries are on stage 0, whereas the conflicting ones are left @@ -292,7 +313,7 @@ def aggressive_tree_merge(odb, tree_shas): :param tree_shas: 1, 2 or 3 trees as identified by their binary 20 byte shas If 1 or two, the entries will effectively correspond to the last given tree If 3 are given, a 3 way merge is performed""" - out = [] + out = [] # type: List[BaseIndexEntry] out_append = out.append # one and two way is the same for us, as we don't have to handle an existing diff --git a/git/index/typ.py b/git/index/typ.py index 2a7dd799..bb1a0384 100644 --- a/git/index/typ.py +++ b/git/index/typ.py @@ -9,6 +9,17 @@ from .util import ( from git.objects import Blob +# typing ---------------------------------------------------------------------- + +from typing import (List, Sequence, TYPE_CHECKING, Tuple, cast) + +from git.types import PathLike + +if TYPE_CHECKING: + from git.repo import Repo + +# --------------------------------------------------------------------------------- + __all__ = ('BlobFilter', 'BaseIndexEntry', 'IndexEntry') #{ Invariants @@ -31,7 +42,7 @@ class BlobFilter(object): """ __slots__ = 'paths' - def __init__(self, paths): + def __init__(self, paths: Sequence[PathLike]) -> None: """ :param paths: tuple or list of paths which are either pointing to directories or @@ -39,7 +50,7 @@ class BlobFilter(object): """ self.paths = paths - def __call__(self, stage_blob): + def __call__(self, stage_blob: Blob) -> bool: path = stage_blob[1].path for p in self.paths: if path.startswith(p): @@ -57,29 +68,29 @@ class BaseIndexEntry(tuple): expecting a BaseIndexEntry can also handle full IndexEntries even if they use numeric indices for performance reasons. """ - def __str__(self): + def __str__(self) -> str: return "%o %s %i\t%s" % (self.mode, self.hexsha, self.stage, self.path) - def __repr__(self): + def __repr__(self) -> str: return "(%o, %s, %i, %s)" % (self.mode, self.hexsha, self.stage, self.path) @property - def mode(self): + def mode(self) -> int: """ File Mode, compatible to stat module constants """ return self[0] @property - def binsha(self): + def binsha(self) -> bytes: """binary sha of the blob """ return self[1] @property - def hexsha(self): + def hexsha(self) -> str: """hex version of our sha""" return b2a_hex(self[1]).decode('ascii') @property - def stage(self): + def stage(self) -> int: """Stage of the entry, either: * 0 = default stage @@ -92,21 +103,21 @@ class BaseIndexEntry(tuple): return (self[2] & CE_STAGEMASK) >> CE_STAGESHIFT @property - def path(self): + def path(self) -> str: """:return: our path relative to the repository working tree root""" return self[3] @property - def flags(self): + def flags(self) -> List[str]: """:return: flags stored with this entry""" return self[2] @classmethod - def from_blob(cls, blob, stage=0): + def from_blob(cls, blob: Blob, stage: int = 0) -> 'BaseIndexEntry': """:return: Fully equipped BaseIndexEntry at the given stage""" return cls((blob.mode, blob.binsha, stage << CE_STAGESHIFT, blob.path)) - def to_blob(self, repo): + def to_blob(self, repo: 'Repo') -> Blob: """:return: Blob using the information of this index entry""" return Blob(repo, self.binsha, self.mode, self.path) @@ -120,40 +131,40 @@ class IndexEntry(BaseIndexEntry): See the properties for a mapping between names and tuple indices. """ @property - def ctime(self): + def ctime(self) -> Tuple[int, int]: """ :return: Tuple(int_time_seconds_since_epoch, int_nano_seconds) of the file's creation time""" - return unpack(">LL", self[4]) + return cast(Tuple[int, int], unpack(">LL", self[4])) @property - def mtime(self): + def mtime(self) -> Tuple[int, int]: """See ctime property, but returns modification time """ - return unpack(">LL", self[5]) + return cast(Tuple[int, int], unpack(">LL", self[5])) @property - def dev(self): + def dev(self) -> int: """ Device ID """ return self[6] @property - def inode(self): + def inode(self) -> int: """ Inode ID """ return self[7] @property - def uid(self): + def uid(self) -> int: """ User ID """ return self[8] @property - def gid(self): + def gid(self) -> int: """ Group ID """ return self[9] @property - def size(self): + def size(self) -> int: """:return: Uncompressed size of the blob """ return self[10] @@ -169,7 +180,7 @@ class IndexEntry(BaseIndexEntry): return IndexEntry((base.mode, base.binsha, base.flags, base.path, time, time, 0, 0, 0, 0, 0)) @classmethod - def from_blob(cls, blob, stage=0): + def from_blob(cls, blob: Blob, stage: int = 0) -> 'IndexEntry': """:return: Minimal entry resembling the given blob object""" time = pack(">LL", 0, 0) return IndexEntry((blob.mode, blob.binsha, stage << CE_STAGESHIFT, blob.path, diff --git a/git/index/util.py b/git/index/util.py index 02742a5d..471e9262 100644 --- a/git/index/util.py +++ b/git/index/util.py @@ -9,6 +9,15 @@ from git.compat import is_win import os.path as osp +# typing ---------------------------------------------------------------------- + +from typing import (Any, Callable) + +from git.types import PathLike + +# --------------------------------------------------------------------------------- + + __all__ = ('TemporaryFileSwap', 'post_clear_cache', 'default_index', 'git_working_dir') #{ Aliases @@ -24,16 +33,16 @@ class TemporaryFileSwap(object): and moving it back on to where on object deletion.""" __slots__ = ("file_path", "tmp_file_path") - def __init__(self, file_path): + def __init__(self, file_path: PathLike) -> None: self.file_path = file_path - self.tmp_file_path = self.file_path + tempfile.mktemp('', '', '') + self.tmp_file_path = str(self.file_path) + tempfile.mktemp('', '', '') # it may be that the source does not exist try: os.rename(self.file_path, self.tmp_file_path) except OSError: pass - def __del__(self): + def __del__(self) -> None: if osp.isfile(self.tmp_file_path): if is_win and osp.exists(self.file_path): os.remove(self.file_path) @@ -43,7 +52,7 @@ class TemporaryFileSwap(object): #{ Decorators -def post_clear_cache(func): +def post_clear_cache(func: Callable[..., Any]) -> Callable[..., Any]: """Decorator for functions that alter the index using the git command. This would invalidate our possibly existing entries dictionary which is why it must be deleted to allow it to be lazily reread later. @@ -54,7 +63,7 @@ def post_clear_cache(func): """ @wraps(func) - def post_clear_cache_if_not_raised(self, *args, **kwargs): + def post_clear_cache_if_not_raised(self, *args: Any, **kwargs: Any) -> Any: rval = func(self, *args, **kwargs) self._delete_entries_cache() return rval @@ -63,13 +72,13 @@ def post_clear_cache(func): return post_clear_cache_if_not_raised -def default_index(func): +def default_index(func: Callable[..., Any]) -> Callable[..., Any]: """Decorator assuring the wrapped method may only run if we are the default repository index. This is as we rely on git commands that operate on that index only. """ @wraps(func) - def check_default_index(self, *args, **kwargs): + def check_default_index(self, *args: Any, **kwargs: Any) -> Any: if self._file_path != self._index_path(): raise AssertionError( "Cannot call %r on indices that do not represent the default git index" % func.__name__) @@ -79,12 +88,12 @@ def default_index(func): return check_default_index -def git_working_dir(func): +def git_working_dir(func: Callable[..., Any]) -> Callable[..., None]: """Decorator which changes the current working dir to the one of the git repository in order to assure relative paths are handled correctly""" @wraps(func) - def set_git_working_dir(self, *args, **kwargs): + def set_git_working_dir(self, *args: Any, **kwargs: Any) -> None: cur_wd = os.getcwd() os.chdir(self.repo.working_tree_dir) try: diff --git a/git/objects/__init__.py b/git/objects/__init__.py index 23b2416a..897eb98f 100644 --- a/git/objects/__init__.py +++ b/git/objects/__init__.py @@ -16,8 +16,8 @@ from .tag import * from .tree import * # Fix import dependency - add IndexObject to the util module, so that it can be # imported by the submodule.base -smutil.IndexObject = IndexObject -smutil.Object = Object +smutil.IndexObject = IndexObject # type: ignore[attr-defined] +smutil.Object = Object # type: ignore[attr-defined] del(smutil) # must come after submodule was made available diff --git a/git/objects/base.py b/git/objects/base.py index cccb5ec6..59f0e836 100644 --- a/git/objects/base.py +++ b/git/objects/base.py @@ -7,6 +7,7 @@ from git.util import LazyMixin, join_path_native, stream_copy, bin_to_hex import gitdb.typ as dbtyp import os.path as osp +from typing import Optional # noqa: F401 unused import from .util import get_object_type_by_name @@ -24,7 +25,7 @@ class Object(LazyMixin): TYPES = (dbtyp.str_blob_type, dbtyp.str_tree_type, dbtyp.str_commit_type, dbtyp.str_tag_type) __slots__ = ("repo", "binsha", "size") - type = None # to be set by subclass + type = None # type: Optional[str] # to be set by subclass def __init__(self, repo, binsha): """Initialize an object by identifying it by its binary sha. diff --git a/git/refs/log.py b/git/refs/log.py index fcd2c23c..363c3c5d 100644 --- a/git/refs/log.py +++ b/git/refs/log.py @@ -97,8 +97,8 @@ class RefLogEntry(tuple): " Got %s" % repr(line)) # END handle first split - oldhexsha = info[:40] - newhexsha = info[41:81] + oldhexsha = info[:40] # type: str + newhexsha = info[41:81] # type: str for hexsha in (oldhexsha, newhexsha): if not cls._re_hexsha_only.match(hexsha): raise ValueError("Invalid hexsha: %r" % (hexsha,)) diff --git a/git/refs/reference.py b/git/refs/reference.py index aaa9b63f..9014f555 100644 --- a/git/refs/reference.py +++ b/git/refs/reference.py @@ -103,7 +103,7 @@ class Reference(SymbolicReference, LazyMixin, Iterable): #{ Remote Interface - @property + @property # type: ignore ## mypy cannot deal with properties with an extra decorator (2021-04-21) @require_remote_ref_path def remote_name(self): """ @@ -114,7 +114,7 @@ class Reference(SymbolicReference, LazyMixin, Iterable): # /refs/remotes/<remote name>/<branch_name> return tokens[2] - @property + @property # type: ignore ## mypy cannot deal with properties with an extra decorator (2021-04-21) @require_remote_ref_path def remote_head(self): """:return: Name of the remote head itself, i.e. master. diff --git a/git/refs/symbolic.py b/git/refs/symbolic.py index 22d9c1d5..64a6591a 100644 --- a/git/refs/symbolic.py +++ b/git/refs/symbolic.py @@ -45,7 +45,7 @@ class SymbolicReference(object): _remote_common_path_default = "refs/remotes" _id_attribute_ = "name" - def __init__(self, repo, path): + def __init__(self, repo, path, check_path=None): self.repo = repo self.path = path diff --git a/git/refs/tag.py b/git/refs/tag.py index 8f88c522..4d84239e 100644 --- a/git/refs/tag.py +++ b/git/refs/tag.py @@ -18,7 +18,8 @@ class TagReference(Reference): print(tagref.tag.message)""" __slots__ = () - _common_path_default = "refs/tags" + _common_default = "tags" + _common_path_default = Reference._common_path_default + "/" + _common_default @property def commit(self): diff --git a/git/remote.py b/git/remote.py index 65916614..6ea4b2a1 100644 --- a/git/remote.py +++ b/git/remote.py @@ -9,14 +9,14 @@ import logging import re from git.cmd import handle_process_output, Git -from git.compat import (defenc, force_text, is_win) +from git.compat import (defenc, force_text) from git.exc import GitCommandError from git.util import ( LazyMixin, Iterable, IterableList, RemoteProgress, - CallableRemoteProgress + CallableRemoteProgress, ) from git.util import ( join_path, @@ -34,6 +34,21 @@ from .refs import ( TagReference ) +# typing------------------------------------------------------- + +from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, TYPE_CHECKING, Union, cast, overload + +from git.types import PathLike, Literal, TBD + +if TYPE_CHECKING: + from git.repo.base import Repo + from git.objects.commit import Commit + from git.objects.blob import Blob + from git.objects.tree import Tree + from git.objects.tag import TagObject + +flagKeyLiteral = Literal[' ', '!', '+', '-', '*', '=', 't'] +# ------------------------------------------------------------- log = logging.getLogger('git.remote') log.addHandler(logging.NullHandler()) @@ -44,7 +59,7 @@ __all__ = ('RemoteProgress', 'PushInfo', 'FetchInfo', 'Remote') #{ Utilities -def add_progress(kwargs, git, progress): +def add_progress(kwargs: Any, git: Git, progress: Union[Callable[..., Any], None]) -> Any: """Add the --progress flag to the given kwargs dict if supported by the git command. If the actual progress in the given progress instance is not given, we do not request any progress @@ -60,7 +75,23 @@ def add_progress(kwargs, git, progress): #} END utilities -def to_progress_instance(progress): +@overload +def to_progress_instance(progress: None) -> RemoteProgress: + ... + + +@overload +def to_progress_instance(progress: Callable[..., Any]) -> CallableRemoteProgress: + ... + + +@overload +def to_progress_instance(progress: RemoteProgress) -> RemoteProgress: + ... + + +def to_progress_instance(progress: Union[Callable[..., Any], RemoteProgress, None] + ) -> Union[RemoteProgress, CallableRemoteProgress]: """Given the 'progress' return a suitable object derived from RemoteProgress(). """ @@ -104,9 +135,10 @@ class PushInfo(object): '=': UP_TO_DATE, '!': ERROR} - def __init__(self, flags, local_ref, remote_ref_string, remote, old_commit=None, - summary=''): - """ Initialize a new instance """ + def __init__(self, flags: int, local_ref: Union[SymbolicReference, None], remote_ref_string: str, remote: 'Remote', + old_commit: Optional[str] = None, summary: str = '') -> None: + """ Initialize a new instance + local_ref: HEAD | Head | RemoteReference | TagReference | Reference | SymbolicReference | None """ self.flags = flags self.local_ref = local_ref self.remote_ref_string = remote_ref_string @@ -115,11 +147,11 @@ class PushInfo(object): self.summary = summary @property - def old_commit(self): + def old_commit(self) -> Union[str, SymbolicReference, 'Commit', 'TagObject', 'Blob', 'Tree', None]: return self._old_commit_sha and self._remote.repo.commit(self._old_commit_sha) or None @property - def remote_ref(self): + def remote_ref(self) -> Union[RemoteReference, TagReference]: """ :return: Remote Reference or TagReference in the local repository corresponding @@ -135,7 +167,7 @@ class PushInfo(object): # END @classmethod - def _from_line(cls, remote, line): + def _from_line(cls, remote: 'Remote', line: str) -> 'PushInfo': """Create a new PushInfo instance as parsed from line which is expected to be like refs/heads/master:refs/heads/master 05d2687..1d0568e as bytes""" control_character, from_to, summary = line.split('\t', 3) @@ -151,7 +183,7 @@ class PushInfo(object): # from_to handling from_ref_string, to_ref_string = from_to.split(':') if flags & cls.DELETED: - from_ref = None + from_ref = None # type: Union[SymbolicReference, None] else: if from_ref_string == "(delete)": from_ref = None @@ -159,7 +191,7 @@ class PushInfo(object): from_ref = Reference.from_path(remote.repo, from_ref_string) # commit handling, could be message or commit info - old_commit = None + old_commit = None # type: Optional[str] if summary.startswith('['): if "[rejected]" in summary: flags |= cls.REJECTED @@ -218,10 +250,10 @@ class FetchInfo(object): '=': HEAD_UPTODATE, ' ': FAST_FORWARD, '-': TAG_UPDATE, - } + } # type: Dict[flagKeyLiteral, int] @classmethod - def refresh(cls): + def refresh(cls) -> Literal[True]: """This gets called by the refresh function (see the top level __init__). """ @@ -244,7 +276,9 @@ class FetchInfo(object): return True - def __init__(self, ref, flags, note='', old_commit=None, remote_ref_path=None): + def __init__(self, ref: SymbolicReference, flags: int, note: str = '', + old_commit: Union['Commit', TagReference, 'Tree', 'Blob', None] = None, + remote_ref_path: Optional[PathLike] = None) -> None: """ Initialize a new instance """ @@ -254,21 +288,21 @@ class FetchInfo(object): self.old_commit = old_commit self.remote_ref_path = remote_ref_path - def __str__(self): + def __str__(self) -> str: return self.name @property - def name(self): + def name(self) -> str: """:return: Name of our remote ref""" return self.ref.name @property - def commit(self): + def commit(self) -> 'Commit': """:return: Commit of our remote ref""" return self.ref.commit @classmethod - def _from_line(cls, repo, line, fetch_line): + def _from_line(cls, repo: 'Repo', line: str, fetch_line: str) -> 'FetchInfo': """Parse information from the given line as returned by git-fetch -v and return a new FetchInfo object representing this information. @@ -290,7 +324,9 @@ class FetchInfo(object): raise ValueError("Failed to parse line: %r" % line) # parse lines - control_character, operation, local_remote_ref, remote_local_ref, note = match.groups() + control_character, operation, local_remote_ref, remote_local_ref_str, note = match.groups() + control_character = cast(flagKeyLiteral, control_character) # can do this neater once 3.5 dropped + try: _new_hex_sha, _fetch_operation, fetch_note = fetch_line.split("\t") ref_type_name, fetch_note = fetch_note.split(' ', 1) @@ -306,7 +342,7 @@ class FetchInfo(object): # END control char exception handling # parse operation string for more info - makes no sense for symbolic refs, but we parse it anyway - old_commit = None + old_commit = None # type: Union[Commit, TagReference, Tree, Blob, None] is_tag_operation = False if 'rejected' in operation: flags |= cls.REJECTED @@ -330,7 +366,7 @@ class FetchInfo(object): # the fetch result is stored in FETCH_HEAD which destroys the rule we usually # have. In that case we use a symbolic reference which is detached ref_type = None - if remote_local_ref == "FETCH_HEAD": + if remote_local_ref_str == "FETCH_HEAD": ref_type = SymbolicReference elif ref_type_name == "tag" or is_tag_operation: # the ref_type_name can be branch, whereas we are still seeing a tag operation. It happens during @@ -358,21 +394,21 @@ class FetchInfo(object): # by the 'ref/' prefix. Otherwise even a tag could be in refs/remotes, which is when it will have the # 'tags/' subdirectory in its path. # We don't want to test for actual existence, but try to figure everything out analytically. - ref_path = None - remote_local_ref = remote_local_ref.strip() - if remote_local_ref.startswith(Reference._common_path_default + "/"): + ref_path = None # type: Optional[PathLike] + remote_local_ref_str = remote_local_ref_str.strip() + if remote_local_ref_str.startswith(Reference._common_path_default + "/"): # always use actual type if we get absolute paths # Will always be the case if something is fetched outside of refs/remotes (if its not a tag) - ref_path = remote_local_ref + ref_path = remote_local_ref_str if ref_type is not TagReference and not \ - remote_local_ref.startswith(RemoteReference._common_path_default + "/"): + remote_local_ref_str.startswith(RemoteReference._common_path_default + "/"): ref_type = Reference # END downgrade remote reference - elif ref_type is TagReference and 'tags/' in remote_local_ref: + elif ref_type is TagReference and 'tags/' in remote_local_ref_str: # even though its a tag, it is located in refs/remotes - ref_path = join_path(RemoteReference._common_path_default, remote_local_ref) + ref_path = join_path(RemoteReference._common_path_default, remote_local_ref_str) else: - ref_path = join_path(ref_type._common_path_default, remote_local_ref) + ref_path = join_path(ref_type._common_path_default, remote_local_ref_str) # END obtain refpath # even though the path could be within the git conventions, we make @@ -398,25 +434,15 @@ class Remote(LazyMixin, Iterable): __slots__ = ("repo", "name", "_config_reader") _id_attribute_ = "name" - def __init__(self, repo, name): + def __init__(self, repo: 'Repo', name: str) -> None: """Initialize a remote instance :param repo: The repository we are a remote of :param name: the name of the remote, i.e. 'origin'""" - self.repo = repo + self.repo = repo # type: 'Repo' self.name = name - if is_win: - # some oddity: on windows, python 2.5, it for some reason does not realize - # that it has the config_writer property, but instead calls __getattr__ - # which will not yield the expected results. 'pinging' the members - # with a dir call creates the config_writer property that we require - # ... bugs like these make me wonder whether python really wants to be used - # for production. It doesn't happen on linux though. - dir(self) - # END windows special handling - - def __getattr__(self, attr): + def __getattr__(self, attr: str) -> Any: """Allows to call this instance like remote.special( \\*args, \\*\\*kwargs) to call git-remote special self.name""" if attr == "_config_reader": @@ -430,10 +456,10 @@ class Remote(LazyMixin, Iterable): return super(Remote, self).__getattr__(attr) # END handle exception - def _config_section_name(self): + def _config_section_name(self) -> str: return 'remote "%s"' % self.name - def _set_cache_(self, attr): + def _set_cache_(self, attr: str) -> Any: if attr == "_config_reader": # NOTE: This is cached as __getattr__ is overridden to return remote config values implicitly, such as # in print(r.pushurl) @@ -441,22 +467,22 @@ class Remote(LazyMixin, Iterable): else: super(Remote, self)._set_cache_(attr) - def __str__(self): + def __str__(self) -> str: return self.name - def __repr__(self): + def __repr__(self) -> str: return '<git.%s "%s">' % (self.__class__.__name__, self.name) - def __eq__(self, other): + def __eq__(self, other: object) -> bool: return isinstance(other, type(self)) and self.name == other.name - def __ne__(self, other): + def __ne__(self, other: object) -> bool: return not (self == other) - def __hash__(self): + def __hash__(self) -> int: return hash(self.name) - def exists(self): + def exists(self) -> bool: """ :return: True if this is a valid, existing remote. Valid remotes have an entry in the repository's configuration""" @@ -471,7 +497,7 @@ class Remote(LazyMixin, Iterable): # end @classmethod - def iter_items(cls, repo): + def iter_items(cls, repo: 'Repo', *args: Any, **kwargs: Any) -> Iterator['Remote']: """:return: Iterator yielding Remote objects of the given repository""" for section in repo.config_reader("repository").sections(): if not section.startswith('remote '): @@ -483,7 +509,7 @@ class Remote(LazyMixin, Iterable): yield Remote(repo, section[lbound + 1:rbound]) # END for each configuration section - def set_url(self, new_url, old_url=None, **kwargs): + def set_url(self, new_url: str, old_url: Optional[str] = None, **kwargs: Any) -> 'Remote': """Configure URLs on current remote (cf command git remote set_url) This command manages URLs on the remote. @@ -500,7 +526,7 @@ class Remote(LazyMixin, Iterable): self.repo.git.remote(scmd, self.name, new_url, **kwargs) return self - def add_url(self, url, **kwargs): + def add_url(self, url: str, **kwargs: Any) -> 'Remote': """Adds a new url on current remote (special case of git remote set_url) This command adds new URLs to a given remote, making it possible to have @@ -511,7 +537,7 @@ class Remote(LazyMixin, Iterable): """ return self.set_url(url, add=True) - def delete_url(self, url, **kwargs): + def delete_url(self, url: str, **kwargs: Any) -> 'Remote': """Deletes a new url on current remote (special case of git remote set_url) This command deletes new URLs to a given remote, making it possible to have @@ -523,10 +549,11 @@ class Remote(LazyMixin, Iterable): return self.set_url(url, delete=True) @property - def urls(self): + def urls(self) -> Iterator[str]: """:return: Iterator yielding all configured URL targets on a remote as strings""" try: - remote_details = self.repo.git.remote("get-url", "--all", self.name) + # can replace cast with type assert? + remote_details = cast(str, self.repo.git.remote("get-url", "--all", self.name)) for line in remote_details.split('\n'): yield line except GitCommandError as ex: @@ -537,23 +564,23 @@ class Remote(LazyMixin, Iterable): # if 'Unknown subcommand: get-url' in str(ex): try: - remote_details = self.repo.git.remote("show", self.name) + remote_details = cast(str, self.repo.git.remote("show", self.name)) for line in remote_details.split('\n'): if ' Push URL:' in line: yield line.split(': ')[-1] - except GitCommandError as ex: - if any(msg in str(ex) for msg in ['correct access rights', 'cannot run ssh']): + except GitCommandError as _ex: + if any(msg in str(_ex) for msg in ['correct access rights', 'cannot run ssh']): # If ssh is not setup to access this repository, see issue 694 - remote_details = self.repo.git.config('--get-all', 'remote.%s.url' % self.name) + remote_details = cast(str, self.repo.git.config('--get-all', 'remote.%s.url' % self.name)) for line in remote_details.split('\n'): yield line else: - raise ex + raise _ex else: raise ex @property - def refs(self): + def refs(self) -> IterableList: """ :return: IterableList of RemoteReference objects. It is prefixed, allowing @@ -564,7 +591,7 @@ class Remote(LazyMixin, Iterable): return out_refs @property - def stale_refs(self): + def stale_refs(self) -> IterableList: """ :return: IterableList RemoteReference objects that do not have a corresponding @@ -585,7 +612,7 @@ class Remote(LazyMixin, Iterable): # * [would prune] origin/new_branch token = " * [would prune] " if not line.startswith(token): - raise ValueError("Could not parse git-remote prune result: %r" % line) + continue ref_name = line.replace(token, "") # sometimes, paths start with a full ref name, like refs/tags/foo, see #260 if ref_name.startswith(Reference._common_path_default + '/'): @@ -598,7 +625,7 @@ class Remote(LazyMixin, Iterable): return out_refs @classmethod - def create(cls, repo, name, url, **kwargs): + def create(cls, repo: 'Repo', name: str, url: str, **kwargs: Any) -> 'Remote': """Create a new remote to the given repository :param repo: Repository instance that is to receive the new remote :param name: Desired name of the remote @@ -615,7 +642,7 @@ class Remote(LazyMixin, Iterable): add = create @classmethod - def remove(cls, repo, name): + def remove(cls, repo: 'Repo', name: str) -> str: """Remove the remote with the given name :return: the passed remote name to remove """ @@ -627,7 +654,7 @@ class Remote(LazyMixin, Iterable): # alias rm = remove - def rename(self, new_name): + def rename(self, new_name: str) -> 'Remote': """Rename self to the given new_name :return: self """ if self.name == new_name: @@ -639,7 +666,7 @@ class Remote(LazyMixin, Iterable): return self - def update(self, **kwargs): + def update(self, **kwargs: Any) -> 'Remote': """Fetch all changes for this remote, including new branches which will be forced in ( in case your local remote branch is not part the new remote branches ancestry anymore ). @@ -653,7 +680,8 @@ class Remote(LazyMixin, Iterable): self.repo.git.remote(scmd, self.name, **kwargs) return self - def _get_fetch_info_from_stderr(self, proc, progress): + def _get_fetch_info_from_stderr(self, proc: TBD, + progress: Union[Callable[..., Any], RemoteProgress, None]) -> IterableList: progress = to_progress_instance(progress) # skip first line as it is some remote info we are not interested in @@ -712,7 +740,8 @@ class Remote(LazyMixin, Iterable): log.warning("Git informed while fetching: %s", err_line.strip()) return output - def _get_push_info(self, proc, progress): + def _get_push_info(self, proc: TBD, + progress: Union[Callable[..., Any], RemoteProgress, None]) -> IterableList: progress = to_progress_instance(progress) # read progress information from stderr @@ -720,9 +749,9 @@ class Remote(LazyMixin, Iterable): # read the lines manually as it will use carriage returns between the messages # to override the previous one. This is why we read the bytes manually progress_handler = progress.new_message_handler() - output = [] + output = IterableList('push_infos') - def stdout_handler(line): + def stdout_handler(line: str) -> None: try: output.append(PushInfo._from_line(self, line)) except ValueError: @@ -741,7 +770,7 @@ class Remote(LazyMixin, Iterable): return output - def _assert_refspec(self): + def _assert_refspec(self) -> None: """Turns out we can't deal with remotes if the refspec is missing""" config = self.config_reader unset = 'placeholder' @@ -754,7 +783,9 @@ class Remote(LazyMixin, Iterable): finally: config.release() - def fetch(self, refspec=None, progress=None, verbose=True, **kwargs): + def fetch(self, refspec: Union[str, List[str], None] = None, + progress: Union[Callable[..., Any], None] = None, + verbose: bool = True, **kwargs: Any) -> IterableList: """Fetch the latest changes for this remote :param refspec: @@ -785,9 +816,10 @@ class Remote(LazyMixin, Iterable): if refspec is None: # No argument refspec, then ensure the repo's config has a fetch refspec. self._assert_refspec() + kwargs = add_progress(kwargs, self.repo.git, progress) if isinstance(refspec, list): - args = refspec + args = refspec # type: Sequence[Optional[str]] # should need this - check logic for passing None through else: args = [refspec] @@ -798,7 +830,9 @@ class Remote(LazyMixin, Iterable): self.repo.odb.update_cache() return res - def pull(self, refspec=None, progress=None, **kwargs): + def pull(self, refspec: Union[str, List[str], None] = None, + progress: Union[Callable[..., Any], None] = None, + **kwargs: Any) -> IterableList: """Pull changes from the given branch, being the same as a fetch followed by a merge of branch with your local branch. @@ -817,7 +851,9 @@ class Remote(LazyMixin, Iterable): self.repo.odb.update_cache() return res - def push(self, refspec=None, progress=None, **kwargs): + def push(self, refspec: Union[str, List[str], None] = None, + progress: Union[Callable[..., Any], None] = None, + **kwargs: Any) -> IterableList: """Push changes from source branch in refspec to target branch in refspec. :param refspec: see 'fetch' method @@ -848,14 +884,14 @@ class Remote(LazyMixin, Iterable): return self._get_push_info(proc, progress) @property - def config_reader(self): + def config_reader(self) -> SectionConstraint: """ :return: GitConfigParser compatible object able to read options for only our remote. Hence you may simple type config.get("pushurl") to obtain the information""" return self._config_reader - def _clear_cache(self): + def _clear_cache(self) -> None: try: del(self._config_reader) except AttributeError: @@ -863,7 +899,7 @@ class Remote(LazyMixin, Iterable): # END handle exception @property - def config_writer(self): + def config_writer(self) -> SectionConstraint: """ :return: GitConfigParser compatible object able to write options for this remote. :note: diff --git a/git/repo/base.py b/git/repo/base.py index 99e87643..6cc56031 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -3,12 +3,13 @@ # # This module is part of GitPython and is released under # the BSD License: http://www.opensource.org/licenses/bsd-license.php - import logging import os import re import warnings +from gitdb.exc import BadObject + from git.cmd import ( Git, handle_process_output @@ -25,7 +26,7 @@ from git.index import IndexFile from git.objects import Submodule, RootModule, Commit from git.refs import HEAD, Head, Reference, TagReference from git.remote import Remote, add_progress, to_progress_instance -from git.util import Actor, finalize_process, decygpath, hex_to_bin, expand_path +from git.util import Actor, finalize_process, decygpath, hex_to_bin, expand_path, remove_password_if_present import os.path as osp from .fun import rev_parse, is_git_dir, find_submodule_git_dir, touch, find_worktree_git_dir @@ -34,20 +35,10 @@ import gitdb # typing ------------------------------------------------------ -from git.types import TBD, PathLike -from typing_extensions import Literal -from typing import (Any, - BinaryIO, - Callable, - Dict, - Iterator, - List, - Mapping, - Optional, - TextIO, - Tuple, - Type, - Union, +from git.types import TBD, PathLike, Lit_config_levels +from typing import (Any, BinaryIO, Callable, Dict, + Iterator, List, Mapping, Optional, Sequence, + TextIO, Tuple, Type, Union, NamedTuple, cast, TYPE_CHECKING) if TYPE_CHECKING: # only needed for types @@ -55,7 +46,6 @@ if TYPE_CHECKING: # only needed for types from git.refs.symbolic import SymbolicReference from git.objects import TagObject, Blob, Tree # NOQA: F401 -Lit_config_levels = Literal['system', 'global', 'user', 'repository'] # ----------------------------------------------------------- @@ -91,8 +81,8 @@ class Repo(object): git = cast('Git', None) # Must exist, or __del__ will fail in case we raise on `__init__()` working_dir = None # type: Optional[PathLike] _working_tree_dir = None # type: Optional[PathLike] - git_dir = None # type: Optional[PathLike] - _common_dir = None # type: Optional[PathLike] + git_dir = "" # type: PathLike + _common_dir = "" # type: PathLike # precompiled regex re_whitespace = re.compile(r'\s+') @@ -219,7 +209,7 @@ class Repo(object): common_dir = open(osp.join(self.git_dir, 'commondir'), 'rt').readlines()[0].strip() self._common_dir = osp.join(self.git_dir, common_dir) except OSError: - self._common_dir = None + self._common_dir = "" # adjust the wd in case we are actually bare - we didn't know that # in the first place @@ -231,10 +221,11 @@ class Repo(object): self.git = self.GitCommandWrapperType(self.working_dir) # special handling, in special times - args = [osp.join(self.common_dir, 'objects')] + rootpath = osp.join(self.common_dir, 'objects') if issubclass(odbt, GitCmdObjectDB): - args.append(self.git) - self.odb = odbt(*args) + self.odb = odbt(rootpath, self.git) + else: + self.odb = odbt(rootpath) def __enter__(self) -> 'Repo': return self @@ -276,13 +267,14 @@ class Repo(object): # Description property def _get_description(self) -> str: - filename = osp.join(self.git_dir, 'description') if self.git_dir else "" + if self.git_dir: + filename = osp.join(self.git_dir, 'description') with open(filename, 'rb') as fp: return fp.read().rstrip().decode(defenc) def _set_description(self, descr: str) -> None: - - filename = osp.join(self.git_dir, 'description') if self.git_dir else "" + if self.git_dir: + filename = osp.join(self.git_dir, 'description') with open(filename, 'wb') as fp: fp.write((descr + '\n').encode(defenc)) @@ -411,7 +403,17 @@ class Repo(object): def tag(self, path: PathLike) -> TagReference: """:return: TagReference Object, reference pointing to a Commit or Tag :param path: path to the tag reference, i.e. 0.1.5 or tags/0.1.5 """ - return TagReference(self, path) + full_path = self._to_full_tag_path(path) + return TagReference(self, full_path) + + @staticmethod + def _to_full_tag_path(path): + if path.startswith(TagReference._common_path_default + '/'): + return path + if path.startswith(TagReference._common_default + '/'): + return Reference._common_path_default + '/' + path + else: + return TagReference._common_path_default + '/' + path def create_head(self, path: PathLike, commit: str = 'HEAD', force: bool = False, logmsg: Optional[str] = None @@ -422,7 +424,7 @@ class Repo(object): :return: newly created Head Reference""" return Head.create(self, path, commit, force, logmsg) - def delete_head(self, *heads: HEAD, **kwargs: Any) -> None: + def delete_head(self, *heads: 'SymbolicReference', **kwargs: Any) -> None: """Delete the given heads :param kwargs: Additional keyword arguments to be passed to git-branch""" @@ -441,7 +443,7 @@ class Repo(object): """Delete the given tag references""" return TagReference.delete(self, *tags) - def create_remote(self, name: str, url: PathLike, **kwargs: Any) -> Remote: + def create_remote(self, name: str, url: str, **kwargs: Any) -> Remote: """Create a new remote. For more information, please see the documentation of the Remote.create @@ -450,7 +452,7 @@ class Repo(object): :return: Remote reference""" return Remote.create(self, name, url, **kwargs) - def delete_remote(self, remote: 'Remote') -> Type['Remote']: + def delete_remote(self, remote: 'Remote') -> str: """Delete the given remote.""" return Remote.remove(self, remote) @@ -468,12 +470,11 @@ class Repo(object): elif config_level == "global": return osp.normpath(osp.expanduser("~/.gitconfig")) elif config_level == "repository": - if self._common_dir: - return osp.normpath(osp.join(self._common_dir, "config")) - elif self.git_dir: - return osp.normpath(osp.join(self.git_dir, "config")) - else: + repo_dir = self._common_dir or self.git_dir + if not repo_dir: raise NotADirectoryError + else: + return osp.normpath(osp.join(repo_dir, "config")) raise ValueError("Invalid configuration level: %r" % config_level) @@ -514,7 +515,7 @@ class Repo(object): return GitConfigParser(self._get_config_path(config_level), read_only=False, repo=self) def commit(self, rev: Optional[TBD] = None - ) -> Union['SymbolicReference', Commit, 'TagObject', 'Blob', 'Tree', None]: + ) -> Union['SymbolicReference', Commit, 'TagObject', 'Blob', 'Tree']: """The Commit object for the specified revision :param rev: revision specifier, see git-rev-parse for viable options. @@ -546,7 +547,7 @@ class Repo(object): return self.head.commit.tree return self.rev_parse(str(rev) + "^{tree}") - def iter_commits(self, rev: Optional[TBD] = None, paths: Union[PathLike, List[PathLike]] = '', + def iter_commits(self, rev: Optional[TBD] = None, paths: Union[PathLike, Sequence[PathLike]] = '', **kwargs: Any) -> Iterator[Commit]: """A list of Commit objects representing the history of a given ref/commit @@ -618,12 +619,31 @@ class Repo(object): raise return True + def is_valid_object(self, sha: str, object_type: str = None) -> bool: + try: + complete_sha = self.odb.partial_to_complete_sha_hex(sha) + object_info = self.odb.info(complete_sha) + if object_type: + if object_info.type == object_type.encode(): + return True + else: + log.debug("Commit hash points to an object of type '%s'. Requested were objects of type '%s'", + object_info.type.decode(), object_type) + return False + else: + return True + except BadObject: + log.debug("Commit hash is invalid.") + return False + def _get_daemon_export(self) -> bool: - filename = osp.join(self.git_dir, self.DAEMON_EXPORT_FILE) if self.git_dir else "" + if self.git_dir: + filename = osp.join(self.git_dir, self.DAEMON_EXPORT_FILE) return osp.exists(filename) def _set_daemon_export(self, value: object) -> None: - filename = osp.join(self.git_dir, self.DAEMON_EXPORT_FILE) if self.git_dir else "" + if self.git_dir: + filename = osp.join(self.git_dir, self.DAEMON_EXPORT_FILE) fileexists = osp.exists(filename) if value and not fileexists: touch(filename) @@ -639,7 +659,8 @@ class Repo(object): """The list of alternates for this repo from which objects can be retrieved :return: list of strings being pathnames of alternates""" - alternates_path = osp.join(self.git_dir, 'objects', 'info', 'alternates') if self.git_dir else "" + if self.git_dir: + alternates_path = osp.join(self.git_dir, 'objects', 'info', 'alternates') if osp.exists(alternates_path): with open(alternates_path, 'rb') as f: @@ -993,8 +1014,6 @@ class Repo(object): def _clone(cls, git: 'Git', url: PathLike, path: PathLike, odb_default_type: Type[GitCmdObjectDB], progress: Optional[Callable], multi_options: Optional[List[str]] = None, **kwargs: Any ) -> 'Repo': - progress_checked = to_progress_instance(progress) - odbt = kwargs.pop('odbt', odb_default_type) # when pathlib.Path or other classbased path is passed @@ -1017,13 +1036,16 @@ class Repo(object): if multi_options: multi = ' '.join(multi_options).split(' ') proc = git.clone(multi, Git.polish_url(url), clone_path, with_extended_output=True, as_process=True, - v=True, universal_newlines=True, **add_progress(kwargs, git, progress_checked)) - if progress_checked: - handle_process_output(proc, None, progress_checked.new_message_handler(), + v=True, universal_newlines=True, **add_progress(kwargs, git, progress)) + if progress: + handle_process_output(proc, None, to_progress_instance(progress).new_message_handler(), finalize_process, decode_streams=False) else: (stdout, stderr) = proc.communicate() - log.debug("Cmd(%s)'s unused stdout: %s", getattr(proc, 'args', ''), stdout) + cmdline = getattr(proc, 'args', '') + cmdline = remove_password_if_present(cmdline) + + log.debug("Cmd(%s)'s unused stdout: %s", cmdline, stdout) finalize_process(proc, stderr=stderr) # our git command could have a different working dir than our actual @@ -1142,7 +1164,8 @@ class Repo(object): None if we are not currently rebasing. """ - rebase_head_file = osp.join(self.git_dir, "REBASE_HEAD") if self.git_dir else "" + if self.git_dir: + rebase_head_file = osp.join(self.git_dir, "REBASE_HEAD") if not osp.isfile(rebase_head_file): return None return self.commit(open(rebase_head_file, "rt").readline().strip()) diff --git a/git/repo/fun.py b/git/repo/fun.py index 70394081..e96b62e0 100644 --- a/git/repo/fun.py +++ b/git/repo/fun.py @@ -18,7 +18,7 @@ from git.cmd import Git # Typing ---------------------------------------------------------------------- -from typing import AnyStr, Union, Optional, cast, TYPE_CHECKING +from typing import Union, Optional, cast, TYPE_CHECKING from git.types import PathLike if TYPE_CHECKING: from .base import Repo @@ -103,7 +103,7 @@ def find_submodule_git_dir(d: PathLike) -> Optional[PathLike]: return None -def short_to_long(odb: 'GitCmdObjectDB', hexsha: AnyStr) -> Optional[bytes]: +def short_to_long(odb: 'GitCmdObjectDB', hexsha: str) -> Optional[bytes]: """:return: long hexadecimal sha1 from the given less-than-40 byte hexsha or None if no candidate could be found. :param hexsha: hexsha with less than 40 byte""" diff --git a/git/types.py b/git/types.py index dc44c123..91d35b56 100644 --- a/git/types.py +++ b/git/types.py @@ -1,6 +1,27 @@ -import os # @UnusedImport ## not really unused, is in type string +# -*- coding: utf-8 -*- +# This module is part of GitPython and is released under +# the BSD License: http://www.opensource.org/licenses/bsd-license.php + +import os +import sys from typing import Union, Any +if sys.version_info[:2] >= (3, 8): + from typing import Final, Literal # noqa: F401 +else: + from typing_extensions import Final, Literal # noqa: F401 + + +if sys.version_info[:2] < (3, 6): + # os.PathLike (PEP-519) only got introduced with Python 3.6 + PathLike = str +elif sys.version_info[:2] < (3, 9): + # Python >= 3.6, < 3.9 + PathLike = Union[str, os.PathLike] +elif sys.version_info[:2] >= (3, 9): + # os.PathLike only becomes subscriptable from Python 3.9 onwards + PathLike = Union[str, 'os.PathLike[str]'] # forward ref as pylance complains unless editing with py3.9+ TBD = Any -PathLike = Union[str, 'os.PathLike[str]'] + +Lit_config_levels = Literal['system', 'global', 'user', 'repository'] diff --git a/git/util.py b/git/util.py index 04c96789..edbd5f1e 100644 --- a/git/util.py +++ b/git/util.py @@ -3,6 +3,7 @@ # # This module is part of GitPython and is released under # the BSD License: http://www.opensource.org/licenses/bsd-license.php + import contextlib from functools import wraps import getpass @@ -16,8 +17,24 @@ import stat from sys import maxsize import time from unittest import SkipTest +from urllib.parse import urlsplit, urlunsplit + +# typing --------------------------------------------------------- + +from typing import (Any, AnyStr, BinaryIO, Callable, Dict, Generator, IO, Iterator, List, + Optional, Pattern, Sequence, Tuple, Union, cast, TYPE_CHECKING, overload) + +import pathlib + +if TYPE_CHECKING: + from git.remote import Remote + from git.repo.base import Repo +from .types import PathLike, TBD, Literal + +# --------------------------------------------------------------------- -from gitdb.util import (# NOQA @IgnorePep8 + +from gitdb.util import ( # NOQA @IgnorePep8 make_sha, LockedFD, # @UnusedImport file_contents_ro, # @UnusedImport @@ -29,7 +46,7 @@ from gitdb.util import (# NOQA @IgnorePep8 hex_to_bin, # @UnusedImport ) -from git.compat import is_win +from .compat import is_win import os.path as osp from .exc import InvalidGitRepositoryError @@ -47,6 +64,9 @@ __all__ = ["stream_copy", "join_path", "to_native_path_linux", log = logging.getLogger(__name__) +# types############################################################ + + #: We need an easy way to see if Appveyor TCs start failing, #: so the errors marked with this var are considered "acknowledged" ones, awaiting remedy, #: till then, we wish to hide them. @@ -56,22 +76,23 @@ HIDE_WINDOWS_FREEZE_ERRORS = is_win and os.environ.get('HIDE_WINDOWS_FREEZE_ERRO #{ Utility Methods -def unbare_repo(func): +def unbare_repo(func: Callable) -> Callable: """Methods with this decorator raise InvalidGitRepositoryError if they encounter a bare repository""" @wraps(func) - def wrapper(self, *args, **kwargs): + def wrapper(self: 'Remote', *args: Any, **kwargs: Any) -> TBD: if self.repo.bare: raise InvalidGitRepositoryError("Method '%s' cannot operate on bare repositories" % func.__name__) # END bare method return func(self, *args, **kwargs) # END wrapper + return wrapper @contextlib.contextmanager -def cwd(new_dir): +def cwd(new_dir: PathLike) -> Generator[PathLike, None, None]: old_dir = os.getcwd() os.chdir(new_dir) try: @@ -80,13 +101,13 @@ def cwd(new_dir): os.chdir(old_dir) -def rmtree(path): +def rmtree(path: PathLike) -> None: """Remove the given recursively. :note: we use shutil rmtree but adjust its behaviour to see whether files that couldn't be deleted are read-only. Windows will not remove them in that case""" - def onerror(func, path, exc_info): + def onerror(func: Callable, path: PathLike, exc_info: TBD) -> None: # Is the error an access error ? os.chmod(path, stat.S_IWUSR) @@ -100,7 +121,7 @@ def rmtree(path): return shutil.rmtree(path, False, onerror) -def rmfile(path): +def rmfile(path: PathLike) -> None: """Ensure file deleted also on *Windows* where read-only files need special treatment.""" if osp.isfile(path): if is_win: @@ -108,7 +129,7 @@ def rmfile(path): os.remove(path) -def stream_copy(source, destination, chunk_size=512 * 1024): +def stream_copy(source: BinaryIO, destination: BinaryIO, chunk_size: int = 512 * 1024) -> int: """Copy all data from the source stream into the destination stream in chunks of size chunk_size @@ -124,11 +145,12 @@ def stream_copy(source, destination, chunk_size=512 * 1024): return br -def join_path(a, *p): +def join_path(a: PathLike, *p: PathLike) -> PathLike: """Join path tokens together similar to osp.join, but always use '/' instead of possibly '\' on windows.""" - path = a + path = str(a) for b in p: + b = str(b) if not b: continue if b.startswith('/'): @@ -142,22 +164,24 @@ def join_path(a, *p): if is_win: - def to_native_path_windows(path): + def to_native_path_windows(path: PathLike) -> PathLike: + path = str(path) return path.replace('/', '\\') - def to_native_path_linux(path): + def to_native_path_linux(path: PathLike) -> PathLike: + path = str(path) return path.replace('\\', '/') __all__.append("to_native_path_windows") to_native_path = to_native_path_windows else: # no need for any work on linux - def to_native_path_linux(path): + def to_native_path_linux(path: PathLike) -> PathLike: return path to_native_path = to_native_path_linux -def join_path_native(a, *p): +def join_path_native(a: PathLike, *p: PathLike) -> PathLike: """ As join path, but makes sure an OS native path is returned. This is only needed to play it safe on my dear windows and to assure nice paths that only @@ -165,7 +189,7 @@ def join_path_native(a, *p): return to_native_path(join_path(a, *p)) -def assure_directory_exists(path, is_file=False): +def assure_directory_exists(path: PathLike, is_file: bool = False) -> bool: """Assure that the directory pointed to by path exists. :param is_file: If True, path is assumed to be a file and handled correctly. @@ -180,18 +204,18 @@ def assure_directory_exists(path, is_file=False): return False -def _get_exe_extensions(): +def _get_exe_extensions() -> Sequence[str]: PATHEXT = os.environ.get('PATHEXT', None) - return tuple(p.upper() for p in PATHEXT.split(os.pathsep)) \ - if PATHEXT \ - else (('.BAT', 'COM', '.EXE') if is_win else ()) + return tuple(p.upper() for p in PATHEXT.split(os.pathsep)) if PATHEXT \ + else ('.BAT', 'COM', '.EXE') if is_win \ + else ('') -def py_where(program, path=None): +def py_where(program: str, path: Optional[PathLike] = None) -> List[str]: # From: http://stackoverflow.com/a/377028/548792 winprog_exts = _get_exe_extensions() - def is_exec(fpath): + def is_exec(fpath: str) -> bool: return osp.isfile(fpath) and os.access(fpath, os.X_OK) and ( os.name != 'nt' or not winprog_exts or any(fpath.upper().endswith(ext) for ext in winprog_exts)) @@ -199,7 +223,7 @@ def py_where(program, path=None): progs = [] if not path: path = os.environ["PATH"] - for folder in path.split(os.pathsep): + for folder in str(path).split(os.pathsep): folder = folder.strip('"') if folder: exe_path = osp.join(folder, program) @@ -209,11 +233,11 @@ def py_where(program, path=None): return progs -def _cygexpath(drive, path): +def _cygexpath(drive: Optional[str], path: PathLike) -> str: if osp.isabs(path) and not drive: ## Invoked from `cygpath()` directly with `D:Apps\123`? # It's an error, leave it alone just slashes) - p = path + p = path # convert to str if AnyPath given else: p = path and osp.normpath(osp.expandvars(osp.expanduser(path))) if osp.isabs(p): @@ -224,8 +248,8 @@ def _cygexpath(drive, path): p = cygpath(p) elif drive: p = '/cygdrive/%s/%s' % (drive.lower(), p) - - return p.replace('\\', '/') + p_str = str(p) # ensure it is a str and not AnyPath + return p_str.replace('\\', '/') _cygpath_parsers = ( @@ -237,27 +261,31 @@ _cygpath_parsers = ( ), (re.compile(r"\\\\\?\\(\w):[/\\](.*)"), - _cygexpath, + (_cygexpath), False ), (re.compile(r"(\w):[/\\](.*)"), - _cygexpath, + (_cygexpath), False ), (re.compile(r"file:(.*)", re.I), (lambda rest_path: rest_path), - True), + True + ), (re.compile(r"(\w{2,}:.*)"), # remote URL, do nothing (lambda url: url), - False), -) + False + ), +) # type: Tuple[Tuple[Pattern[str], Callable, bool], ...] -def cygpath(path): +def cygpath(path: PathLike) -> PathLike: """Use :meth:`git.cmd.Git.polish_url()` instead, that works on any environment.""" + path = str(path) # ensure is str and not AnyPath. + #Fix to use Paths when 3.5 dropped. or to be just str if only for urls? if not path.startswith(('/cygdrive', '//')): for regex, parser, recurse in _cygpath_parsers: match = regex.match(path) @@ -275,7 +303,8 @@ def cygpath(path): _decygpath_regex = re.compile(r"/cygdrive/(\w)(/.*)?") -def decygpath(path): +def decygpath(path: PathLike) -> str: + path = str(path) m = _decygpath_regex.match(path) if m: drive, rest_path = m.groups() @@ -286,23 +315,35 @@ def decygpath(path): #: Store boolean flags denoting if a specific Git executable #: is from a Cygwin installation (since `cache_lru()` unsupported on PY2). -_is_cygwin_cache = {} +_is_cygwin_cache = {} # type: Dict[str, Optional[bool]] + +@overload +def is_cygwin_git(git_executable: None) -> Literal[False]: + ... -def is_cygwin_git(git_executable): + +@overload +def is_cygwin_git(git_executable: PathLike) -> bool: + ... + + +def is_cygwin_git(git_executable: Union[None, PathLike]) -> bool: if not is_win: return False - #from subprocess import check_output + if git_executable is None: + return False - is_cygwin = _is_cygwin_cache.get(git_executable) + git_executable = str(git_executable) + is_cygwin = _is_cygwin_cache.get(git_executable) # type: Optional[bool] if is_cygwin is None: is_cygwin = False try: git_dir = osp.dirname(git_executable) if not git_dir: res = py_where(git_executable) - git_dir = osp.dirname(res[0]) if res else None + git_dir = osp.dirname(res[0]) if res else "" ## Just a name given, not a real path. uname_cmd = osp.join(git_dir, 'uname') @@ -318,26 +359,67 @@ def is_cygwin_git(git_executable): return is_cygwin -def get_user_id(): +def get_user_id() -> str: """:return: string identifying the currently active system user as name@node""" return "%s@%s" % (getpass.getuser(), platform.node()) -def finalize_process(proc, **kwargs): +def finalize_process(proc: subprocess.Popen, **kwargs: Any) -> None: """Wait for the process (clone, fetch, pull or push) and handle its errors accordingly""" ## TODO: No close proc-streams?? proc.wait(**kwargs) -def expand_path(p, expand_vars=True): +@overload +def expand_path(p: None, expand_vars: bool = ...) -> None: + ... + + +@overload +def expand_path(p: PathLike, expand_vars: bool = ...) -> str: + # improve these overloads when 3.5 dropped + ... + + +def expand_path(p: Union[None, PathLike], expand_vars: bool = True) -> Optional[PathLike]: + if isinstance(p, pathlib.Path): + return p.resolve() try: - p = osp.expanduser(p) + p = osp.expanduser(p) # type: ignore if expand_vars: - p = osp.expandvars(p) - return osp.normpath(osp.abspath(p)) + p = osp.expandvars(p) # type: ignore + return osp.normpath(osp.abspath(p)) # type: ignore except Exception: return None + +def remove_password_if_present(cmdline): + """ + Parse any command line argument and if on of the element is an URL with a + password, replace it by stars (in-place). + + If nothing found just returns the command line as-is. + + This should be used for every log line that print a command line. + """ + new_cmdline = [] + for index, to_parse in enumerate(cmdline): + new_cmdline.append(to_parse) + try: + url = urlsplit(to_parse) + # Remove password from the URL if present + if url.password is None: + continue + + edited_url = url._replace( + netloc=url.netloc.replace(url.password, "*****")) + new_cmdline[index] = urlunsplit(edited_url) + except ValueError: + # This is not a valid URL + continue + return new_cmdline + + #} END utilities #{ Classes @@ -364,13 +446,13 @@ class RemoteProgress(object): re_op_absolute = re.compile(r"(remote: )?([\w\s]+):\s+()(\d+)()(.*)") re_op_relative = re.compile(r"(remote: )?([\w\s]+):\s+(\d+)% \((\d+)/(\d+)\)(.*)") - def __init__(self): - self._seen_ops = [] - self._cur_line = None - self.error_lines = [] - self.other_lines = [] + def __init__(self) -> None: + self._seen_ops = [] # type: List[TBD] + self._cur_line = None # type: Optional[str] + self.error_lines = [] # type: List[str] + self.other_lines = [] # type: List[str] - def _parse_progress_line(self, line): + def _parse_progress_line(self, line: AnyStr) -> None: """Parse progress information from the given line as retrieved by git-push or git-fetch. @@ -382,33 +464,38 @@ class RemoteProgress(object): # Compressing objects: 50% (1/2) # Compressing objects: 100% (2/2) # Compressing objects: 100% (2/2), done. - self._cur_line = line = line.decode('utf-8') if isinstance(line, bytes) else line - if self.error_lines or self._cur_line.startswith(('error:', 'fatal:')): + if isinstance(line, bytes): # mypy argues about ternary assignment + line_str = line.decode('utf-8') + else: + line_str = line + self._cur_line = line_str + + if self._cur_line.startswith(('error:', 'fatal:')): self.error_lines.append(self._cur_line) return # find escape characters and cut them away - regex will not work with # them as they are non-ascii. As git might expect a tty, it will send them last_valid_index = None - for i, c in enumerate(reversed(line)): + for i, c in enumerate(reversed(line_str)): if ord(c) < 32: # its a slice index last_valid_index = -i - 1 # END character was non-ascii # END for each character in line if last_valid_index is not None: - line = line[:last_valid_index] + line_str = line_str[:last_valid_index] # END cut away invalid part - line = line.rstrip() + line_str = line_str.rstrip() cur_count, max_count = None, None - match = self.re_op_relative.match(line) + match = self.re_op_relative.match(line_str) if match is None: - match = self.re_op_absolute.match(line) + match = self.re_op_absolute.match(line_str) if not match: - self.line_dropped(line) - self.other_lines.append(line) + self.line_dropped(line_str) + self.other_lines.append(line_str) return # END could not get match @@ -437,10 +524,10 @@ class RemoteProgress(object): # This can't really be prevented, so we drop the line verbosely # to make sure we get informed in case the process spits out new # commands at some point. - self.line_dropped(line) + self.line_dropped(line_str) # Note: Don't add this line to the other lines, as we have to silently # drop it - return + return None # END handle op code # figure out stage @@ -465,21 +552,22 @@ class RemoteProgress(object): max_count and float(max_count), message) - def new_message_handler(self): + def new_message_handler(self) -> Callable[[str], None]: """ :return: a progress handler suitable for handle_process_output(), passing lines on to this Progress handler in a suitable format""" - def handler(line): + def handler(line: AnyStr) -> None: return self._parse_progress_line(line.rstrip()) # end return handler - def line_dropped(self, line): + def line_dropped(self, line: str) -> None: """Called whenever a line could not be understood and was therefore dropped.""" pass - def update(self, op_code, cur_count, max_count=None, message=''): + def update(self, op_code: int, cur_count: Union[str, float], max_count: Union[str, float, None] = None, + message: str = '',) -> None: """Called whenever the progress changes :param op_code: @@ -510,11 +598,11 @@ class CallableRemoteProgress(RemoteProgress): """An implementation forwarding updates to any callable""" __slots__ = ('_callable') - def __init__(self, fn): + def __init__(self, fn: Callable) -> None: self._callable = fn super(CallableRemoteProgress, self).__init__() - def update(self, *args, **kwargs): + def update(self, *args: Any, **kwargs: Any) -> None: self._callable(*args, **kwargs) @@ -539,27 +627,27 @@ class Actor(object): __slots__ = ('name', 'email') - def __init__(self, name, email): + def __init__(self, name: Optional[str], email: Optional[str]) -> None: self.name = name self.email = email - def __eq__(self, other): + def __eq__(self, other: Any) -> bool: return self.name == other.name and self.email == other.email - def __ne__(self, other): + def __ne__(self, other: Any) -> bool: return not (self == other) - def __hash__(self): + def __hash__(self) -> int: return hash((self.name, self.email)) - def __str__(self): - return self.name + def __str__(self) -> str: + return self.name if self.name else "" - def __repr__(self): + def __repr__(self) -> str: return '<git.Actor "%s <%s>">' % (self.name, self.email) @classmethod - def _from_string(cls, string): + def _from_string(cls, string: str) -> 'Actor': """Create an Actor from a string. :param string: is the string, which is expected to be in regular git format @@ -580,17 +668,17 @@ class Actor(object): # END handle name/email matching @classmethod - def _main_actor(cls, env_name, env_email, config_reader=None): + def _main_actor(cls, env_name: str, env_email: str, config_reader: Optional[TBD] = None) -> 'Actor': actor = Actor('', '') user_id = None # We use this to avoid multiple calls to getpass.getuser() - def default_email(): + def default_email() -> str: nonlocal user_id if not user_id: user_id = get_user_id() return user_id - def default_name(): + def default_name() -> str: return default_email().split('@')[0] for attr, evar, cvar, default in (('name', env_name, cls.conf_name, default_name), @@ -609,7 +697,7 @@ class Actor(object): return actor @classmethod - def committer(cls, config_reader=None): + def committer(cls, config_reader: Optional[TBD] = None) -> 'Actor': """ :return: Actor instance corresponding to the configured committer. It behaves similar to the git implementation, such that the environment will override @@ -620,7 +708,7 @@ class Actor(object): return cls._main_actor(cls.env_committer_name, cls.env_committer_email, config_reader) @classmethod - def author(cls, config_reader=None): + def author(cls, config_reader: Optional[TBD] = None) -> 'Actor': """Same as committer(), but defines the main author. It may be specified in the environment, but defaults to the committer""" return cls._main_actor(cls.env_author_name, cls.env_author_email, config_reader) @@ -654,16 +742,18 @@ class Stats(object): files = number of changed files as int""" __slots__ = ("total", "files") - def __init__(self, total, files): + def __init__(self, total: Dict[str, Dict[str, int]], files: Dict[str, Dict[str, int]]): self.total = total self.files = files @classmethod - def _list_from_string(cls, repo, text): + def _list_from_string(cls, repo: 'Repo', text: str) -> 'Stats': """Create a Stat object from output retrieved by git-diff. :return: git.Stat""" - hsh = {'total': {'insertions': 0, 'deletions': 0, 'lines': 0, 'files': 0}, 'files': {}} + hsh = {'total': {'insertions': 0, 'deletions': 0, 'lines': 0, 'files': 0}, + 'files': {} + } # type: Dict[str, Dict[str, TBD]] ## need typeddict or refactor for mypy for line in text.splitlines(): (raw_insertions, raw_deletions, filename) = line.split("\t") insertions = raw_insertions != '-' and int(raw_insertions) or 0 @@ -689,25 +779,25 @@ class IndexFileSHA1Writer(object): :note: Based on the dulwich project""" __slots__ = ("f", "sha1") - def __init__(self, f): + def __init__(self, f: IO) -> None: self.f = f self.sha1 = make_sha(b"") - def write(self, data): + def write(self, data: AnyStr) -> int: self.sha1.update(data) return self.f.write(data) - def write_sha(self): + def write_sha(self) -> bytes: sha = self.sha1.digest() self.f.write(sha) return sha - def close(self): + def close(self) -> bytes: sha = self.write_sha() self.f.close() return sha - def tell(self): + def tell(self) -> int: return self.f.tell() @@ -721,23 +811,23 @@ class LockFile(object): Locks will automatically be released on destruction""" __slots__ = ("_file_path", "_owns_lock") - def __init__(self, file_path): + def __init__(self, file_path: PathLike) -> None: self._file_path = file_path self._owns_lock = False - def __del__(self): + def __del__(self) -> None: self._release_lock() - def _lock_file_path(self): + def _lock_file_path(self) -> str: """:return: Path to lockfile""" return "%s.lock" % (self._file_path) - def _has_lock(self): + def _has_lock(self) -> bool: """:return: True if we have a lock and if the lockfile still exists :raise AssertionError: if our lock-file does not exist""" return self._owns_lock - def _obtain_lock_or_raise(self): + def _obtain_lock_or_raise(self) -> None: """Create a lock file as flag for other instances, mark our instance as lock-holder :raise IOError: if a lock was already present or a lock file could not be written""" @@ -759,12 +849,12 @@ class LockFile(object): self._owns_lock = True - def _obtain_lock(self): + def _obtain_lock(self) -> None: """The default implementation will raise if a lock cannot be obtained. Subclasses may override this method to provide a different implementation""" return self._obtain_lock_or_raise() - def _release_lock(self): + def _release_lock(self) -> None: """Release our lock if we have one""" if not self._has_lock(): return @@ -789,7 +879,7 @@ class BlockingLockFile(LockFile): can never be obtained.""" __slots__ = ("_check_interval", "_max_block_time") - def __init__(self, file_path, check_interval_s=0.3, max_block_time_s=maxsize): + def __init__(self, file_path: PathLike, check_interval_s: float = 0.3, max_block_time_s: int = maxsize) -> None: """Configure the instance :param check_interval_s: @@ -801,7 +891,7 @@ class BlockingLockFile(LockFile): self._check_interval = check_interval_s self._max_block_time = max_block_time_s - def _obtain_lock(self): + def _obtain_lock(self) -> None: """This method blocks until it obtained the lock, or raises IOError if it ran out of time or if the parent directory was not available anymore. If this method returns, you are guaranteed to own the lock""" @@ -848,14 +938,14 @@ class IterableList(list): can be left out.""" __slots__ = ('_id_attr', '_prefix') - def __new__(cls, id_attr, prefix=''): + def __new__(cls, id_attr: str, prefix: str = '') -> 'IterableList': return super(IterableList, cls).__new__(cls) - def __init__(self, id_attr, prefix=''): + def __init__(self, id_attr: str, prefix: str = '') -> None: self._id_attr = id_attr self._prefix = prefix - def __contains__(self, attr): + def __contains__(self, attr: object) -> bool: # first try identity match for performance try: rval = list.__contains__(self, attr) @@ -867,13 +957,13 @@ class IterableList(list): # otherwise make a full name search try: - getattr(self, attr) + getattr(self, cast(str, attr)) # use cast to silence mypy return True except (AttributeError, TypeError): return False # END handle membership - def __getattr__(self, attr): + def __getattr__(self, attr: str) -> Any: attr = self._prefix + attr for item in self: if getattr(item, self._id_attr) == attr: @@ -881,20 +971,24 @@ class IterableList(list): # END for each item return list.__getattribute__(self, attr) - def __getitem__(self, index): + def __getitem__(self, index: Union[int, slice, str]) -> Any: if isinstance(index, int): return list.__getitem__(self, index) - - try: - return getattr(self, index) - except AttributeError as e: - raise IndexError("No item found with id %r" % (self._prefix + index)) from e + elif isinstance(index, slice): + raise ValueError("Index should be an int or str") + else: + try: + return getattr(self, index) + except AttributeError as e: + raise IndexError("No item found with id %r" % (self._prefix + index)) from e # END handle getattr - def __delitem__(self, index): - delindex = index + def __delitem__(self, index: Union[int, str, slice]) -> None: + + delindex = cast(int, index) if not isinstance(index, int): delindex = -1 + assert not isinstance(index, slice) name = self._prefix + index for i, item in enumerate(self): if getattr(item, self._id_attr) == name: @@ -917,7 +1011,7 @@ class Iterable(object): _id_attribute_ = "attribute that most suitably identifies your instance" @classmethod - def list_items(cls, repo, *args, **kwargs): + def list_items(cls, repo: 'Repo', *args: Any, **kwargs: Any) -> 'IterableList': """ Find all items of this type - subclasses can specify args and kwargs differently. If no args are given, subclasses are obliged to return all items if no additional @@ -931,7 +1025,8 @@ class Iterable(object): return out_list @classmethod - def iter_items(cls, repo, *args, **kwargs): + def iter_items(cls, repo: 'Repo', *args: Any, **kwargs: Any) -> Iterator[TBD]: + # return typed to be compatible with subtypes e.g. Remote """For more information about the arguments, see list_items :return: iterator yielding Items""" raise NotImplementedError("To be implemented by Subclass") @@ -940,5 +1035,5 @@ class Iterable(object): class NullHandler(logging.Handler): - def emit(self, record): + def emit(self, record: object) -> None: pass |
