summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Thiel <sebastian.thiel@icloud.com>2021-03-17 17:49:09 +0800
committerGitHub <noreply@github.com>2021-03-17 17:49:09 +0800
commit6643a9feb39d4d49c894c1d25e3d4d71e180208a (patch)
tree002c4c12c1da90f1889672942cdb500ad0dce47d
parent690722a611a25a1afcdb0163d3cfd0a8c89d1d04 (diff)
parentc93e971f3e0aa4dea12a0cb169539fe85681e381 (diff)
downloadgitpython-6643a9feb39d4d49c894c1d25e3d4d71e180208a.tar.gz
Merge pull request #1202 from Yobmod/main
Add more types
-rw-r--r--git/__init__.py12
-rw-r--r--git/cmd.py6
-rw-r--r--git/compat.py38
-rw-r--r--git/config.py4
-rw-r--r--git/db.py24
-rw-r--r--git/diff.py117
-rw-r--r--git/exc.py44
-rw-r--r--git/remote.py10
-rw-r--r--git/repo/base.py54
-rw-r--r--git/util.py256
-rw-r--r--test-requirements.txt1
11 files changed, 338 insertions, 228 deletions
diff --git a/git/__init__.py b/git/__init__.py
index 53440830..ae9254a2 100644
--- a/git/__init__.py
+++ b/git/__init__.py
@@ -5,18 +5,20 @@
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
# flake8: noqa
#@PydevCodeAnalysisIgnore
+from git.exc import * # @NoMove @IgnorePep8
import inspect
import os
import sys
-
import os.path as osp
+from typing import Optional
+from git.types import PathLike
__version__ = 'git'
#{ Initialization
-def _init_externals():
+def _init_externals() -> None:
"""Initialize external projects by putting them into the path"""
if __version__ == 'git' and 'PYOXIDIZER' not in os.environ:
sys.path.insert(1, osp.join(osp.dirname(__file__), 'ext', 'gitdb'))
@@ -29,13 +31,13 @@ def _init_externals():
#} END initialization
+
#################
_init_externals()
#################
#{ Imports
-from git.exc import * # @NoMove @IgnorePep8
try:
from git.config import GitConfigParser # @NoMove @IgnorePep8
from git.objects import * # @NoMove @IgnorePep8
@@ -65,7 +67,8 @@ __all__ = [name for name, obj in locals().items()
#{ Initialize git executable path
GIT_OK = None
-def refresh(path=None):
+
+def refresh(path: Optional[PathLike] = None) -> None:
"""Convenience method for setting the git executable path."""
global GIT_OK
GIT_OK = False
@@ -78,6 +81,7 @@ def refresh(path=None):
GIT_OK = True
#} END initialize git executable path
+
#################
try:
refresh()
diff --git a/git/cmd.py b/git/cmd.py
index ec630d93..0395a708 100644
--- a/git/cmd.py
+++ b/git/cmd.py
@@ -210,7 +210,7 @@ class Git(LazyMixin):
# - a GitCommandNotFound error is spawned by ourselves
# - a PermissionError is spawned if the git executable provided
# cannot be executed for whatever reason
-
+
has_git = False
try:
cls().version()
@@ -498,7 +498,7 @@ class Git(LazyMixin):
# skipcq: PYL-E0301
def __iter__(self):
return self
-
+
def __next__(self):
return self.next()
@@ -639,7 +639,7 @@ class Git(LazyMixin):
:param env:
A dictionary of environment variables to be passed to `subprocess.Popen`.
-
+
:param max_chunk_size:
Maximum number of bytes in one chunk of data passed to the output_stream in
one invocation of write() method. If the given number is not positive then
diff --git a/git/compat.py b/git/compat.py
index de8a238b..a0aea1ac 100644
--- a/git/compat.py
+++ b/git/compat.py
@@ -11,40 +11,50 @@ import locale
import os
import sys
-
from gitdb.utils.encoding import (
force_bytes, # @UnusedImport
force_text # @UnusedImport
)
+# typing --------------------------------------------------------------------
+
+from typing import Any, AnyStr, Dict, Optional, Type
+from git.types import TBD
+
+# ---------------------------------------------------------------------------
-is_win = (os.name == 'nt')
+
+is_win = (os.name == 'nt') # type: bool
is_posix = (os.name == 'posix')
is_darwin = (os.name == 'darwin')
defenc = sys.getfilesystemencoding()
-def safe_decode(s):
+def safe_decode(s: Optional[AnyStr]) -> Optional[str]:
"""Safely decodes a binary string to unicode"""
if isinstance(s, str):
return s
elif isinstance(s, bytes):
return s.decode(defenc, 'surrogateescape')
- elif s is not None:
+ elif s is None:
+ return None
+ else:
raise TypeError('Expected bytes or text, but got %r' % (s,))
-def safe_encode(s):
- """Safely decodes a binary string to unicode"""
+def safe_encode(s: Optional[AnyStr]) -> Optional[bytes]:
+ """Safely encodes a binary string to unicode"""
if isinstance(s, str):
return s.encode(defenc)
elif isinstance(s, bytes):
return s
- elif s is not None:
+ elif s is None:
+ return None
+ else:
raise TypeError('Expected bytes or text, but got %r' % (s,))
-def win_encode(s):
+def win_encode(s: Optional[AnyStr]) -> Optional[bytes]:
"""Encode unicodes for process arguments on Windows."""
if isinstance(s, str):
return s.encode(locale.getpreferredencoding(False))
@@ -52,16 +62,20 @@ def win_encode(s):
return s
elif s is not None:
raise TypeError('Expected bytes or text, but got %r' % (s,))
+ return None
+
-def with_metaclass(meta, *bases):
+def with_metaclass(meta: Type[Any], *bases: Any) -> 'metaclass': # type: ignore ## mypy cannot understand dynamic class creation
"""copied from https://github.com/Byron/bcore/blob/master/src/python/butility/future.py#L15"""
- class metaclass(meta):
+
+ class metaclass(meta): # type: ignore
__call__ = type.__call__
- __init__ = type.__init__
+ __init__ = type.__init__ # type: ignore
- def __new__(cls, name, nbases, d):
+ def __new__(cls, name: str, nbases: Optional[int], d: Dict[str, Any]) -> TBD:
if nbases is None:
return type.__new__(cls, name, (), d)
return meta(name, bases, d)
+
return metaclass(meta.__name__ + 'Helper', None, {})
diff --git a/git/config.py b/git/config.py
index 9f09efe2..aadb0aac 100644
--- a/git/config.py
+++ b/git/config.py
@@ -16,6 +16,8 @@ import re
import fnmatch
from collections import OrderedDict
+from typing_extensions import Literal
+
from git.compat import (
defenc,
force_text,
@@ -194,7 +196,7 @@ class _OMD(OrderedDict):
return [(k, self.getall(k)) for k in self]
-def get_config_path(config_level):
+def get_config_path(config_level: Literal['system', 'global', 'user', 'repository']) -> str:
# we do not support an absolute path of the gitconfig on windows ,
# use the global config instead
diff --git a/git/db.py b/git/db.py
index de2e9991..73051abf 100644
--- a/git/db.py
+++ b/git/db.py
@@ -7,11 +7,19 @@ from gitdb.base import (
from gitdb.db import GitDB # @UnusedImport
from gitdb.db import LooseObjectDB
-from .exc import (
- GitCommandError,
- BadObject
-)
+from gitdb.exc import BadObject
+from git.exc import GitCommandError
+
+# typing-------------------------------------------------
+
+from typing import TYPE_CHECKING, AnyStr
+from git.types import PathLike
+
+if TYPE_CHECKING:
+ from git.cmd import Git
+
+# --------------------------------------------------------
__all__ = ('GitCmdObjectDB', 'GitDB')
@@ -28,23 +36,23 @@ class GitCmdObjectDB(LooseObjectDB):
have packs and the other implementations
"""
- def __init__(self, root_path, git):
+ def __init__(self, root_path: PathLike, git: 'Git') -> None:
"""Initialize this instance with the root and a git command"""
super(GitCmdObjectDB, self).__init__(root_path)
self._git = git
- def info(self, sha):
+ def info(self, sha: bytes) -> OInfo:
hexsha, typename, size = self._git.get_object_header(bin_to_hex(sha))
return OInfo(hex_to_bin(hexsha), typename, size)
- def stream(self, sha):
+ def stream(self, sha: bytes) -> OStream:
"""For now, all lookup is done by git itself"""
hexsha, typename, size, stream = self._git.stream_object_data(bin_to_hex(sha))
return OStream(hex_to_bin(hexsha), typename, size, stream)
# { Interface
- def partial_to_complete_sha_hex(self, partial_hexsha):
+ def partial_to_complete_sha_hex(self, partial_hexsha: AnyStr) -> bytes:
""":return: Full binary 20 byte sha from the given partial hexsha
:raise AmbiguousObjectName:
:raise BadObject:
diff --git a/git/diff.py b/git/diff.py
index a9dc4b57..129223cb 100644
--- a/git/diff.py
+++ b/git/diff.py
@@ -3,8 +3,8 @@
#
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
-import re
+import re
from git.cmd import handle_process_output
from git.compat import defenc
from git.util import finalize_process, hex_to_bin
@@ -13,22 +13,36 @@ from .objects.blob import Blob
from .objects.util import mode_str_to_int
+# typing ------------------------------------------------------------------
+
+from typing import Any, Iterator, List, Match, Optional, Tuple, Type, Union, TYPE_CHECKING
+from typing_extensions import Final, Literal
+from git.types import TBD
+
+if TYPE_CHECKING:
+ from .objects.tree import Tree
+ from git.repo.base import Repo
+
+Lit_change_type = Literal['A', 'D', 'M', 'R', 'T']
+
+# ------------------------------------------------------------------------
+
__all__ = ('Diffable', 'DiffIndex', 'Diff', 'NULL_TREE')
# Special object to compare against the empty tree in diffs
-NULL_TREE = object()
+NULL_TREE = object() # type: Final[object]
_octal_byte_re = re.compile(b'\\\\([0-9]{3})')
-def _octal_repl(matchobj):
+def _octal_repl(matchobj: Match) -> bytes:
value = matchobj.group(1)
value = int(value, 8)
value = bytes(bytearray((value,)))
return value
-def decode_path(path, has_ab_prefix=True):
+def decode_path(path: bytes, has_ab_prefix: bool = True) -> Optional[bytes]:
if path == b'/dev/null':
return None
@@ -60,7 +74,7 @@ class Diffable(object):
class Index(object):
pass
- def _process_diff_args(self, args):
+ def _process_diff_args(self, args: List[Union[str, 'Diffable', object]]) -> List[Union[str, 'Diffable', object]]:
"""
:return:
possibly altered version of the given args list.
@@ -68,7 +82,9 @@ class Diffable(object):
Subclasses can use it to alter the behaviour of the superclass"""
return args
- def diff(self, other=Index, paths=None, create_patch=False, **kwargs):
+ def diff(self, other: Union[Type[Index], Type['Tree'], object, None, str] = Index,
+ paths: Union[str, List[str], Tuple[str, ...], None] = None,
+ create_patch: bool = False, **kwargs: Any) -> 'DiffIndex':
"""Creates diffs between two items being trees, trees and index or an
index and the working tree. It will detect renames automatically.
@@ -99,7 +115,7 @@ class Diffable(object):
:note:
On a bare repository, 'other' needs to be provided as Index or as
as Tree/Commit, or a git command error will occur"""
- args = []
+ args = [] # type: List[Union[str, Diffable, object]]
args.append("--abbrev=40") # we need full shas
args.append("--full-index") # get full index paths, not only filenames
@@ -117,6 +133,9 @@ class Diffable(object):
if paths is not None and not isinstance(paths, (tuple, list)):
paths = [paths]
+ if hasattr(self, 'repo'): # else raise Error?
+ self.repo = self.repo # type: 'Repo'
+
diff_cmd = self.repo.git.diff
if other is self.Index:
args.insert(0, '--cached')
@@ -163,7 +182,7 @@ class DiffIndex(list):
# T = Changed in the type
change_type = ("A", "C", "D", "R", "M", "T")
- def iter_change_type(self, change_type):
+ def iter_change_type(self, change_type: Lit_change_type) -> Iterator['Diff']:
"""
:return:
iterator yielding Diff instances that match the given change_type
@@ -180,7 +199,7 @@ class DiffIndex(list):
if change_type not in self.change_type:
raise ValueError("Invalid change type: %s" % change_type)
- for diff in self:
+ for diff in self: # type: 'Diff'
if diff.change_type == change_type:
yield diff
elif change_type == "A" and diff.new_file:
@@ -255,22 +274,21 @@ class Diff(object):
"new_file", "deleted_file", "copied_file", "raw_rename_from",
"raw_rename_to", "diff", "change_type", "score")
- def __init__(self, repo, a_rawpath, b_rawpath, a_blob_id, b_blob_id, a_mode,
- b_mode, new_file, deleted_file, copied_file, raw_rename_from,
- raw_rename_to, diff, change_type, score):
-
- self.a_mode = a_mode
- self.b_mode = b_mode
+ def __init__(self, repo: 'Repo',
+ a_rawpath: Optional[bytes], b_rawpath: Optional[bytes],
+ a_blob_id: Union[str, bytes, None], b_blob_id: Union[str, bytes, None],
+ a_mode: Union[bytes, str, None], b_mode: Union[bytes, str, None],
+ new_file: bool, deleted_file: bool, copied_file: bool,
+ raw_rename_from: Optional[bytes], raw_rename_to: Optional[bytes],
+ diff: Union[str, bytes, None], change_type: Optional[str], score: Optional[int]) -> None:
assert a_rawpath is None or isinstance(a_rawpath, bytes)
assert b_rawpath is None or isinstance(b_rawpath, bytes)
self.a_rawpath = a_rawpath
self.b_rawpath = b_rawpath
- if self.a_mode:
- self.a_mode = mode_str_to_int(self.a_mode)
- if self.b_mode:
- self.b_mode = mode_str_to_int(self.b_mode)
+ self.a_mode = mode_str_to_int(a_mode) if a_mode else None
+ self.b_mode = mode_str_to_int(b_mode) if b_mode else None
# Determine whether this diff references a submodule, if it does then
# we need to overwrite "repo" to the corresponding submodule's repo instead
@@ -305,27 +323,27 @@ class Diff(object):
self.change_type = change_type
self.score = score
- def __eq__(self, other):
+ def __eq__(self, other: object) -> bool:
for name in self.__slots__:
if getattr(self, name) != getattr(other, name):
return False
# END for each name
return True
- def __ne__(self, other):
+ def __ne__(self, other: object) -> bool:
return not (self == other)
- def __hash__(self):
+ def __hash__(self) -> int:
return hash(tuple(getattr(self, n) for n in self.__slots__))
- def __str__(self):
- h = "%s"
+ def __str__(self) -> str:
+ h = "%s" # type: str
if self.a_blob:
h %= self.a_blob.path
elif self.b_blob:
h %= self.b_blob.path
- msg = ''
+ msg = '' # type: str
line = None # temp line
line_length = 0 # line length
for b, n in zip((self.a_blob, self.b_blob), ('lhs', 'rhs')):
@@ -354,7 +372,7 @@ class Diff(object):
if self.diff:
msg += '\n---'
try:
- msg += self.diff.decode(defenc)
+ msg += self.diff.decode(defenc) if isinstance(self.diff, bytes) else self.diff
except UnicodeDecodeError:
msg += 'OMITTED BINARY DATA'
# end handle encoding
@@ -368,36 +386,36 @@ class Diff(object):
return res
@property
- def a_path(self):
+ def a_path(self) -> Optional[str]:
return self.a_rawpath.decode(defenc, 'replace') if self.a_rawpath else None
@property
- def b_path(self):
+ def b_path(self) -> Optional[str]:
return self.b_rawpath.decode(defenc, 'replace') if self.b_rawpath else None
@property
- def rename_from(self):
+ def rename_from(self) -> Optional[str]:
return self.raw_rename_from.decode(defenc, 'replace') if self.raw_rename_from else None
@property
- def rename_to(self):
+ def rename_to(self) -> Optional[str]:
return self.raw_rename_to.decode(defenc, 'replace') if self.raw_rename_to else None
@property
- def renamed(self):
+ def renamed(self) -> bool:
""":returns: True if the blob of our diff has been renamed
:note: This property is deprecated, please use ``renamed_file`` instead.
"""
return self.renamed_file
@property
- def renamed_file(self):
+ def renamed_file(self) -> bool:
""":returns: True if the blob of our diff has been renamed
"""
return self.rename_from != self.rename_to
@classmethod
- def _pick_best_path(cls, path_match, rename_match, path_fallback_match):
+ def _pick_best_path(cls, path_match: bytes, rename_match: bytes, path_fallback_match: bytes) -> Optional[bytes]:
if path_match:
return decode_path(path_match)
@@ -410,21 +428,23 @@ class Diff(object):
return None
@classmethod
- def _index_from_patch_format(cls, repo, proc):
+ def _index_from_patch_format(cls, repo: 'Repo', proc: TBD) -> DiffIndex:
"""Create a new DiffIndex from the given text which must be in patch format
:param repo: is the repository we are operating on - it is required
:param stream: result of 'git diff' as a stream (supporting file protocol)
:return: git.DiffIndex """
## FIXME: Here SLURPING raw, need to re-phrase header-regexes linewise.
- text = []
- handle_process_output(proc, text.append, None, finalize_process, decode_streams=False)
+ text_list = [] # type: List[bytes]
+ handle_process_output(proc, text_list.append, None, finalize_process, decode_streams=False)
# for now, we have to bake the stream
- text = b''.join(text)
+ text = b''.join(text_list)
index = DiffIndex()
previous_header = None
header = None
+ a_path, b_path = None, None # for mypy
+ a_mode, b_mode = None, None # for mypy
for _header in cls.re_header.finditer(text):
a_path_fallback, b_path_fallback, \
old_mode, new_mode, \
@@ -464,14 +484,14 @@ class Diff(object):
previous_header = _header
header = _header
# end for each header we parse
- if index:
+ if index and header:
index[-1].diff = text[header.end():]
# end assign last diff
return index
@classmethod
- def _index_from_raw_format(cls, repo, proc):
+ def _index_from_raw_format(cls, repo: 'Repo', proc: TBD) -> DiffIndex:
"""Create a new DiffIndex from the given stream which must be in raw format.
:return: git.DiffIndex"""
# handles
@@ -479,12 +499,13 @@ class Diff(object):
index = DiffIndex()
- def handle_diff_line(lines):
- lines = lines.decode(defenc)
+ def handle_diff_line(lines_bytes: bytes) -> None:
+ lines = lines_bytes.decode(defenc)
for line in lines.split(':')[1:]:
meta, _, path = line.partition('\x00')
path = path.rstrip('\x00')
+ a_blob_id, b_blob_id = None, None # Type: Optional[str]
old_mode, new_mode, a_blob_id, b_blob_id, _change_type = meta.split(None, 4)
# Change type can be R100
# R: status letter
@@ -504,20 +525,20 @@ class Diff(object):
# NOTE: We cannot conclude from the existence of a blob to change type
# as diffs with the working do not have blobs yet
if change_type == 'D':
- b_blob_id = None
+ b_blob_id = None # Optional[str]
deleted_file = True
elif change_type == 'A':
a_blob_id = None
new_file = True
elif change_type == 'C':
copied_file = True
- a_path, b_path = path.split('\x00', 1)
- a_path = a_path.encode(defenc)
- b_path = b_path.encode(defenc)
+ a_path_str, b_path_str = path.split('\x00', 1)
+ a_path = a_path_str.encode(defenc)
+ b_path = b_path_str.encode(defenc)
elif change_type == 'R':
- a_path, b_path = path.split('\x00', 1)
- a_path = a_path.encode(defenc)
- b_path = b_path.encode(defenc)
+ a_path_str, b_path_str = path.split('\x00', 1)
+ a_path = a_path_str.encode(defenc)
+ b_path = b_path_str.encode(defenc)
rename_from, rename_to = a_path, b_path
elif change_type == 'T':
# Nothing to do
diff --git a/git/exc.py b/git/exc.py
index 71a40bdf..c066e5e2 100644
--- a/git/exc.py
+++ b/git/exc.py
@@ -8,6 +8,16 @@
from gitdb.exc import * # NOQA @UnusedWildImport skipcq: PYL-W0401, PYL-W0614
from git.compat import safe_decode
+# typing ----------------------------------------------------
+
+from typing import IO, List, Optional, Tuple, Union, TYPE_CHECKING
+from git.types import PathLike
+
+if TYPE_CHECKING:
+ from git.repo.base import Repo
+
+# ------------------------------------------------------------------
+
class GitError(Exception):
""" Base class for all package exceptions """
@@ -37,7 +47,9 @@ class CommandError(GitError):
#: "'%s' failed%s"
_msg = "Cmd('%s') failed%s"
- def __init__(self, command, status=None, stderr=None, stdout=None):
+ def __init__(self, command: Union[List[str], Tuple[str, ...], str],
+ status: Union[str, None, Exception] = None,
+ stderr: Optional[IO[str]] = None, stdout: Optional[IO[str]] = None) -> None:
if not isinstance(command, (tuple, list)):
command = command.split()
self.command = command
@@ -53,12 +65,12 @@ class CommandError(GitError):
status = "'%s'" % s if isinstance(status, str) else s
self._cmd = safe_decode(command[0])
- self._cmdline = ' '.join(safe_decode(i) for i in command)
+ self._cmdline = ' '.join(str(safe_decode(i)) for i in command)
self._cause = status and " due to: %s" % status or "!"
- self.stdout = stdout and "\n stdout: '%s'" % safe_decode(stdout) or ''
- self.stderr = stderr and "\n stderr: '%s'" % safe_decode(stderr) or ''
+ self.stdout = stdout and "\n stdout: '%s'" % safe_decode(str(stdout)) or ''
+ self.stderr = stderr and "\n stderr: '%s'" % safe_decode(str(stderr)) or ''
- def __str__(self):
+ def __str__(self) -> str:
return (self._msg + "\n cmdline: %s%s%s") % (
self._cmd, self._cause, self._cmdline, self.stdout, self.stderr)
@@ -66,7 +78,8 @@ class CommandError(GitError):
class GitCommandNotFound(CommandError):
"""Thrown if we cannot find the `git` executable in the PATH or at the path given by
the GIT_PYTHON_GIT_EXECUTABLE environment variable"""
- def __init__(self, command, cause):
+
+ def __init__(self, command: Union[List[str], Tuple[str], str], cause: Union[str, Exception]) -> None:
super(GitCommandNotFound, self).__init__(command, cause)
self._msg = "Cmd('%s') not found%s"
@@ -74,7 +87,11 @@ class GitCommandNotFound(CommandError):
class GitCommandError(CommandError):
""" Thrown if execution of the git command fails with non-zero status code. """
- def __init__(self, command, status, stderr=None, stdout=None):
+ def __init__(self, command: Union[List[str], Tuple[str, ...], str],
+ status: Union[str, None, Exception] = None,
+ stderr: Optional[IO[str]] = None,
+ stdout: Optional[IO[str]] = None,
+ ) -> None:
super(GitCommandError, self).__init__(command, status, stderr, stdout)
@@ -92,13 +109,15 @@ class CheckoutError(GitError):
were checked out successfully and hence match the version stored in the
index"""
- def __init__(self, message, failed_files, valid_files, failed_reasons):
+ def __init__(self, message: str, failed_files: List[PathLike], valid_files: List[PathLike],
+ failed_reasons: List[str]) -> None:
+
Exception.__init__(self, message)
self.failed_files = failed_files
self.failed_reasons = failed_reasons
self.valid_files = valid_files
- def __str__(self):
+ def __str__(self) -> str:
return Exception.__str__(self) + ":%s" % self.failed_files
@@ -116,7 +135,8 @@ class HookExecutionError(CommandError):
"""Thrown if a hook exits with a non-zero exit code. It provides access to the exit code and the string returned
via standard output"""
- def __init__(self, command, status, stderr=None, stdout=None):
+ def __init__(self, command: Union[List[str], Tuple[str, ...], str], status: Optional[str],
+ stderr: Optional[IO[str]] = None, stdout: Optional[IO[str]] = None) -> None:
super(HookExecutionError, self).__init__(command, status, stderr, stdout)
self._msg = "Hook('%s') failed%s"
@@ -124,9 +144,9 @@ class HookExecutionError(CommandError):
class RepositoryDirtyError(GitError):
"""Thrown whenever an operation on a repository fails as it has uncommitted changes that would be overwritten"""
- def __init__(self, repo, message):
+ def __init__(self, repo: 'Repo', message: str) -> None:
self.repo = repo
self.message = message
- def __str__(self):
+ def __str__(self) -> str:
return "Operation cannot be performed on %r: %s" % (self.repo, self.message)
diff --git a/git/remote.py b/git/remote.py
index 65916614..4194af1f 100644
--- a/git/remote.py
+++ b/git/remote.py
@@ -34,6 +34,14 @@ from .refs import (
TagReference
)
+# typing-------------------------------------------------------
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from git.repo.base import Repo
+
+# -------------------------------------------------------------
log = logging.getLogger('git.remote')
log.addHandler(logging.NullHandler())
@@ -403,7 +411,7 @@ class Remote(LazyMixin, Iterable):
:param repo: The repository we are a remote of
:param name: the name of the remote, i.e. 'origin'"""
- self.repo = repo
+ self.repo = repo # type: 'Repo'
self.name = name
if is_win:
diff --git a/git/repo/base.py b/git/repo/base.py
index 99e87643..24bc5754 100644
--- a/git/repo/base.py
+++ b/git/repo/base.py
@@ -36,18 +36,9 @@ import gitdb
from git.types import TBD, PathLike
from typing_extensions import Literal
-from typing import (Any,
- BinaryIO,
- Callable,
- Dict,
- Iterator,
- List,
- Mapping,
- Optional,
- TextIO,
- Tuple,
- Type,
- Union,
+from typing import (Any, BinaryIO, Callable, Dict,
+ Iterator, List, Mapping, Optional,
+ TextIO, Tuple, Type, Union,
NamedTuple, cast, TYPE_CHECKING)
if TYPE_CHECKING: # only needed for types
@@ -231,10 +222,11 @@ class Repo(object):
self.git = self.GitCommandWrapperType(self.working_dir)
# special handling, in special times
- args = [osp.join(self.common_dir, 'objects')]
+ rootpath = osp.join(self.common_dir, 'objects')
if issubclass(odbt, GitCmdObjectDB):
- args.append(self.git)
- self.odb = odbt(*args)
+ self.odb = odbt(rootpath, self.git)
+ else:
+ self.odb = odbt(rootpath)
def __enter__(self) -> 'Repo':
return self
@@ -276,13 +268,14 @@ class Repo(object):
# Description property
def _get_description(self) -> str:
- filename = osp.join(self.git_dir, 'description') if self.git_dir else ""
+ if self.git_dir:
+ filename = osp.join(self.git_dir, 'description')
with open(filename, 'rb') as fp:
return fp.read().rstrip().decode(defenc)
def _set_description(self, descr: str) -> None:
-
- filename = osp.join(self.git_dir, 'description') if self.git_dir else ""
+ if self.git_dir:
+ filename = osp.join(self.git_dir, 'description')
with open(filename, 'wb') as fp:
fp.write((descr + '\n').encode(defenc))
@@ -422,7 +415,7 @@ class Repo(object):
:return: newly created Head Reference"""
return Head.create(self, path, commit, force, logmsg)
- def delete_head(self, *heads: HEAD, **kwargs: Any) -> None:
+ def delete_head(self, *heads: 'SymbolicReference', **kwargs: Any) -> None:
"""Delete the given heads
:param kwargs: Additional keyword arguments to be passed to git-branch"""
@@ -468,12 +461,11 @@ class Repo(object):
elif config_level == "global":
return osp.normpath(osp.expanduser("~/.gitconfig"))
elif config_level == "repository":
- if self._common_dir:
- return osp.normpath(osp.join(self._common_dir, "config"))
- elif self.git_dir:
- return osp.normpath(osp.join(self.git_dir, "config"))
- else:
+ repo_dir = self._common_dir or self.git_dir
+ if not repo_dir:
raise NotADirectoryError
+ else:
+ return osp.normpath(osp.join(repo_dir, "config"))
raise ValueError("Invalid configuration level: %r" % config_level)
@@ -514,7 +506,7 @@ class Repo(object):
return GitConfigParser(self._get_config_path(config_level), read_only=False, repo=self)
def commit(self, rev: Optional[TBD] = None
- ) -> Union['SymbolicReference', Commit, 'TagObject', 'Blob', 'Tree', None]:
+ ) -> Union['SymbolicReference', Commit, 'TagObject', 'Blob', 'Tree']:
"""The Commit object for the specified revision
:param rev: revision specifier, see git-rev-parse for viable options.
@@ -619,11 +611,13 @@ class Repo(object):
return True
def _get_daemon_export(self) -> bool:
- filename = osp.join(self.git_dir, self.DAEMON_EXPORT_FILE) if self.git_dir else ""
+ if self.git_dir:
+ filename = osp.join(self.git_dir, self.DAEMON_EXPORT_FILE)
return osp.exists(filename)
def _set_daemon_export(self, value: object) -> None:
- filename = osp.join(self.git_dir, self.DAEMON_EXPORT_FILE) if self.git_dir else ""
+ if self.git_dir:
+ filename = osp.join(self.git_dir, self.DAEMON_EXPORT_FILE)
fileexists = osp.exists(filename)
if value and not fileexists:
touch(filename)
@@ -639,7 +633,8 @@ class Repo(object):
"""The list of alternates for this repo from which objects can be retrieved
:return: list of strings being pathnames of alternates"""
- alternates_path = osp.join(self.git_dir, 'objects', 'info', 'alternates') if self.git_dir else ""
+ if self.git_dir:
+ alternates_path = osp.join(self.git_dir, 'objects', 'info', 'alternates')
if osp.exists(alternates_path):
with open(alternates_path, 'rb') as f:
@@ -1142,7 +1137,8 @@ class Repo(object):
None if we are not currently rebasing.
"""
- rebase_head_file = osp.join(self.git_dir, "REBASE_HEAD") if self.git_dir else ""
+ if self.git_dir:
+ rebase_head_file = osp.join(self.git_dir, "REBASE_HEAD")
if not osp.isfile(rebase_head_file):
return None
return self.commit(open(rebase_head_file, "rt").readline().strip())
diff --git a/git/util.py b/git/util.py
index 04c96789..0f475a46 100644
--- a/git/util.py
+++ b/git/util.py
@@ -3,6 +3,7 @@
#
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
+
import contextlib
from functools import wraps
import getpass
@@ -17,7 +18,19 @@ from sys import maxsize
import time
from unittest import SkipTest
-from gitdb.util import (# NOQA @IgnorePep8
+# typing ---------------------------------------------------------
+
+from typing import (Any, AnyStr, BinaryIO, Callable, Dict, Generator, IO, List,
+ NoReturn, Optional, Pattern, Sequence, Tuple, Union, cast, TYPE_CHECKING)
+if TYPE_CHECKING:
+ from git.remote import Remote
+ from git.repo.base import Repo
+from .types import PathLike, TBD
+
+# ---------------------------------------------------------------------
+
+
+from gitdb.util import ( # NOQA @IgnorePep8
make_sha,
LockedFD, # @UnusedImport
file_contents_ro, # @UnusedImport
@@ -29,7 +42,7 @@ from gitdb.util import (# NOQA @IgnorePep8
hex_to_bin, # @UnusedImport
)
-from git.compat import is_win
+from .compat import is_win
import os.path as osp
from .exc import InvalidGitRepositoryError
@@ -47,6 +60,9 @@ __all__ = ["stream_copy", "join_path", "to_native_path_linux",
log = logging.getLogger(__name__)
+# types############################################################
+
+
#: We need an easy way to see if Appveyor TCs start failing,
#: so the errors marked with this var are considered "acknowledged" ones, awaiting remedy,
#: till then, we wish to hide them.
@@ -56,22 +72,23 @@ HIDE_WINDOWS_FREEZE_ERRORS = is_win and os.environ.get('HIDE_WINDOWS_FREEZE_ERRO
#{ Utility Methods
-def unbare_repo(func):
+def unbare_repo(func: Callable) -> Callable:
"""Methods with this decorator raise InvalidGitRepositoryError if they
encounter a bare repository"""
@wraps(func)
- def wrapper(self, *args, **kwargs):
+ def wrapper(self: 'Remote', *args: Any, **kwargs: Any) -> TBD:
if self.repo.bare:
raise InvalidGitRepositoryError("Method '%s' cannot operate on bare repositories" % func.__name__)
# END bare method
return func(self, *args, **kwargs)
# END wrapper
+
return wrapper
@contextlib.contextmanager
-def cwd(new_dir):
+def cwd(new_dir: PathLike) -> Generator[PathLike, None, None]:
old_dir = os.getcwd()
os.chdir(new_dir)
try:
@@ -80,13 +97,13 @@ def cwd(new_dir):
os.chdir(old_dir)
-def rmtree(path):
+def rmtree(path: PathLike) -> None:
"""Remove the given recursively.
:note: we use shutil rmtree but adjust its behaviour to see whether files that
couldn't be deleted are read-only. Windows will not remove them in that case"""
- def onerror(func, path, exc_info):
+ def onerror(func: Callable, path: PathLike, exc_info: TBD) -> None:
# Is the error an access error ?
os.chmod(path, stat.S_IWUSR)
@@ -100,7 +117,7 @@ def rmtree(path):
return shutil.rmtree(path, False, onerror)
-def rmfile(path):
+def rmfile(path: PathLike) -> None:
"""Ensure file deleted also on *Windows* where read-only files need special treatment."""
if osp.isfile(path):
if is_win:
@@ -108,7 +125,7 @@ def rmfile(path):
os.remove(path)
-def stream_copy(source, destination, chunk_size=512 * 1024):
+def stream_copy(source: BinaryIO, destination: BinaryIO, chunk_size: int = 512 * 1024) -> int:
"""Copy all data from the source stream into the destination stream in chunks
of size chunk_size
@@ -124,11 +141,12 @@ def stream_copy(source, destination, chunk_size=512 * 1024):
return br
-def join_path(a, *p):
+def join_path(a: PathLike, *p: PathLike) -> PathLike:
"""Join path tokens together similar to osp.join, but always use
'/' instead of possibly '\' on windows."""
- path = a
+ path = str(a)
for b in p:
+ b = str(b)
if not b:
continue
if b.startswith('/'):
@@ -142,22 +160,24 @@ def join_path(a, *p):
if is_win:
- def to_native_path_windows(path):
+ def to_native_path_windows(path: PathLike) -> PathLike:
+ path = str(path)
return path.replace('/', '\\')
- def to_native_path_linux(path):
+ def to_native_path_linux(path: PathLike) -> PathLike:
+ path = str(path)
return path.replace('\\', '/')
__all__.append("to_native_path_windows")
to_native_path = to_native_path_windows
else:
# no need for any work on linux
- def to_native_path_linux(path):
+ def to_native_path_linux(path: PathLike) -> PathLike:
return path
to_native_path = to_native_path_linux
-def join_path_native(a, *p):
+def join_path_native(a: PathLike, *p: PathLike) -> PathLike:
"""
As join path, but makes sure an OS native path is returned. This is only
needed to play it safe on my dear windows and to assure nice paths that only
@@ -165,7 +185,7 @@ def join_path_native(a, *p):
return to_native_path(join_path(a, *p))
-def assure_directory_exists(path, is_file=False):
+def assure_directory_exists(path: PathLike, is_file: bool = False) -> bool:
"""Assure that the directory pointed to by path exists.
:param is_file: If True, path is assumed to be a file and handled correctly.
@@ -180,18 +200,18 @@ def assure_directory_exists(path, is_file=False):
return False
-def _get_exe_extensions():
+def _get_exe_extensions() -> Sequence[str]:
PATHEXT = os.environ.get('PATHEXT', None)
- return tuple(p.upper() for p in PATHEXT.split(os.pathsep)) \
- if PATHEXT \
- else (('.BAT', 'COM', '.EXE') if is_win else ())
+ return tuple(p.upper() for p in PATHEXT.split(os.pathsep)) if PATHEXT \
+ else ('.BAT', 'COM', '.EXE') if is_win \
+ else ('')
-def py_where(program, path=None):
+def py_where(program: str, path: Optional[PathLike] = None) -> List[str]:
# From: http://stackoverflow.com/a/377028/548792
winprog_exts = _get_exe_extensions()
- def is_exec(fpath):
+ def is_exec(fpath: str) -> bool:
return osp.isfile(fpath) and os.access(fpath, os.X_OK) and (
os.name != 'nt' or not winprog_exts or any(fpath.upper().endswith(ext)
for ext in winprog_exts))
@@ -199,7 +219,7 @@ def py_where(program, path=None):
progs = []
if not path:
path = os.environ["PATH"]
- for folder in path.split(os.pathsep):
+ for folder in str(path).split(os.pathsep):
folder = folder.strip('"')
if folder:
exe_path = osp.join(folder, program)
@@ -209,11 +229,11 @@ def py_where(program, path=None):
return progs
-def _cygexpath(drive, path):
+def _cygexpath(drive: Optional[str], path: PathLike) -> str:
if osp.isabs(path) and not drive:
## Invoked from `cygpath()` directly with `D:Apps\123`?
# It's an error, leave it alone just slashes)
- p = path
+ p = path # convert to str if AnyPath given
else:
p = path and osp.normpath(osp.expandvars(osp.expanduser(path)))
if osp.isabs(p):
@@ -224,8 +244,8 @@ def _cygexpath(drive, path):
p = cygpath(p)
elif drive:
p = '/cygdrive/%s/%s' % (drive.lower(), p)
-
- return p.replace('\\', '/')
+ p_str = str(p) # ensure it is a str and not AnyPath
+ return p_str.replace('\\', '/')
_cygpath_parsers = (
@@ -237,27 +257,30 @@ _cygpath_parsers = (
),
(re.compile(r"\\\\\?\\(\w):[/\\](.*)"),
- _cygexpath,
+ (_cygexpath),
False
),
(re.compile(r"(\w):[/\\](.*)"),
- _cygexpath,
+ (_cygexpath),
False
),
(re.compile(r"file:(.*)", re.I),
(lambda rest_path: rest_path),
- True),
+ True
+ ),
(re.compile(r"(\w{2,}:.*)"), # remote URL, do nothing
(lambda url: url),
- False),
-)
+ False
+ ),
+) # type: Tuple[Tuple[Pattern[str], Callable, bool], ...]
-def cygpath(path):
+def cygpath(path: PathLike) -> PathLike:
"""Use :meth:`git.cmd.Git.polish_url()` instead, that works on any environment."""
+ path = str(path) # ensure is str and not AnyPath
if not path.startswith(('/cygdrive', '//')):
for regex, parser, recurse in _cygpath_parsers:
match = regex.match(path)
@@ -275,7 +298,8 @@ def cygpath(path):
_decygpath_regex = re.compile(r"/cygdrive/(\w)(/.*)?")
-def decygpath(path):
+def decygpath(path: PathLike) -> str:
+ path = str(path)
m = _decygpath_regex.match(path)
if m:
drive, rest_path = m.groups()
@@ -286,23 +310,23 @@ def decygpath(path):
#: Store boolean flags denoting if a specific Git executable
#: is from a Cygwin installation (since `cache_lru()` unsupported on PY2).
-_is_cygwin_cache = {}
+_is_cygwin_cache = {} # type: Dict[str, Optional[bool]]
-def is_cygwin_git(git_executable):
+def is_cygwin_git(git_executable: PathLike) -> bool:
if not is_win:
return False
#from subprocess import check_output
-
- is_cygwin = _is_cygwin_cache.get(git_executable)
+ git_executable = str(git_executable)
+ is_cygwin = _is_cygwin_cache.get(git_executable) # type: Optional[bool]
if is_cygwin is None:
is_cygwin = False
try:
git_dir = osp.dirname(git_executable)
if not git_dir:
res = py_where(git_executable)
- git_dir = osp.dirname(res[0]) if res else None
+ git_dir = osp.dirname(res[0]) if res else ""
## Just a name given, not a real path.
uname_cmd = osp.join(git_dir, 'uname')
@@ -318,18 +342,18 @@ def is_cygwin_git(git_executable):
return is_cygwin
-def get_user_id():
+def get_user_id() -> str:
""":return: string identifying the currently active system user as name@node"""
return "%s@%s" % (getpass.getuser(), platform.node())
-def finalize_process(proc, **kwargs):
+def finalize_process(proc: TBD, **kwargs: Any) -> None:
"""Wait for the process (clone, fetch, pull or push) and handle its errors accordingly"""
## TODO: No close proc-streams??
proc.wait(**kwargs)
-def expand_path(p, expand_vars=True):
+def expand_path(p: PathLike, expand_vars: bool = True) -> Optional[PathLike]:
try:
p = osp.expanduser(p)
if expand_vars:
@@ -364,13 +388,13 @@ class RemoteProgress(object):
re_op_absolute = re.compile(r"(remote: )?([\w\s]+):\s+()(\d+)()(.*)")
re_op_relative = re.compile(r"(remote: )?([\w\s]+):\s+(\d+)% \((\d+)/(\d+)\)(.*)")
- def __init__(self):
- self._seen_ops = []
- self._cur_line = None
- self.error_lines = []
- self.other_lines = []
+ def __init__(self) -> None:
+ self._seen_ops = [] # type: List[TBD]
+ self._cur_line = None # type: Optional[str]
+ self.error_lines = [] # type: List[str]
+ self.other_lines = [] # type: List[str]
- def _parse_progress_line(self, line):
+ def _parse_progress_line(self, line: AnyStr) -> None:
"""Parse progress information from the given line as retrieved by git-push
or git-fetch.
@@ -382,7 +406,12 @@ class RemoteProgress(object):
# Compressing objects: 50% (1/2)
# Compressing objects: 100% (2/2)
# Compressing objects: 100% (2/2), done.
- self._cur_line = line = line.decode('utf-8') if isinstance(line, bytes) else line
+ if isinstance(line, bytes): # mypy argues about ternary assignment
+ line_str = line.decode('utf-8')
+ else:
+ line_str = line
+ self._cur_line = line_str
+
if self.error_lines or self._cur_line.startswith(('error:', 'fatal:')):
self.error_lines.append(self._cur_line)
return
@@ -390,25 +419,25 @@ class RemoteProgress(object):
# find escape characters and cut them away - regex will not work with
# them as they are non-ascii. As git might expect a tty, it will send them
last_valid_index = None
- for i, c in enumerate(reversed(line)):
+ for i, c in enumerate(reversed(line_str)):
if ord(c) < 32:
# its a slice index
last_valid_index = -i - 1
# END character was non-ascii
# END for each character in line
if last_valid_index is not None:
- line = line[:last_valid_index]
+ line_str = line_str[:last_valid_index]
# END cut away invalid part
- line = line.rstrip()
+ line_str = line_str.rstrip()
cur_count, max_count = None, None
- match = self.re_op_relative.match(line)
+ match = self.re_op_relative.match(line_str)
if match is None:
- match = self.re_op_absolute.match(line)
+ match = self.re_op_absolute.match(line_str)
if not match:
- self.line_dropped(line)
- self.other_lines.append(line)
+ self.line_dropped(line_str)
+ self.other_lines.append(line_str)
return
# END could not get match
@@ -437,10 +466,10 @@ class RemoteProgress(object):
# This can't really be prevented, so we drop the line verbosely
# to make sure we get informed in case the process spits out new
# commands at some point.
- self.line_dropped(line)
+ self.line_dropped(line_str)
# Note: Don't add this line to the other lines, as we have to silently
# drop it
- return
+ return None
# END handle op code
# figure out stage
@@ -465,21 +494,22 @@ class RemoteProgress(object):
max_count and float(max_count),
message)
- def new_message_handler(self):
+ def new_message_handler(self) -> Callable[[str], None]:
"""
:return:
a progress handler suitable for handle_process_output(), passing lines on to this Progress
handler in a suitable format"""
- def handler(line):
+ def handler(line: AnyStr) -> None:
return self._parse_progress_line(line.rstrip())
# end
return handler
- def line_dropped(self, line):
+ def line_dropped(self, line: str) -> None:
"""Called whenever a line could not be understood and was therefore dropped."""
pass
- def update(self, op_code, cur_count, max_count=None, message=''):
+ def update(self, op_code: int, cur_count: Union[str, float], max_count: Union[str, float, None] = None,
+ message: str = '',) -> None:
"""Called whenever the progress changes
:param op_code:
@@ -510,11 +540,11 @@ class CallableRemoteProgress(RemoteProgress):
"""An implementation forwarding updates to any callable"""
__slots__ = ('_callable')
- def __init__(self, fn):
+ def __init__(self, fn: Callable) -> None:
self._callable = fn
super(CallableRemoteProgress, self).__init__()
- def update(self, *args, **kwargs):
+ def update(self, *args: Any, **kwargs: Any) -> None:
self._callable(*args, **kwargs)
@@ -539,27 +569,27 @@ class Actor(object):
__slots__ = ('name', 'email')
- def __init__(self, name, email):
+ def __init__(self, name: Optional[str], email: Optional[str]) -> None:
self.name = name
self.email = email
- def __eq__(self, other):
+ def __eq__(self, other: Any) -> bool:
return self.name == other.name and self.email == other.email
- def __ne__(self, other):
+ def __ne__(self, other: Any) -> bool:
return not (self == other)
- def __hash__(self):
+ def __hash__(self) -> int:
return hash((self.name, self.email))
- def __str__(self):
- return self.name
+ def __str__(self) -> str:
+ return self.name if self.name else ""
- def __repr__(self):
+ def __repr__(self) -> str:
return '<git.Actor "%s <%s>">' % (self.name, self.email)
@classmethod
- def _from_string(cls, string):
+ def _from_string(cls, string: str) -> 'Actor':
"""Create an Actor from a string.
:param string: is the string, which is expected to be in regular git format
@@ -580,17 +610,17 @@ class Actor(object):
# END handle name/email matching
@classmethod
- def _main_actor(cls, env_name, env_email, config_reader=None):
+ def _main_actor(cls, env_name: str, env_email: str, config_reader: Optional[TBD] = None) -> 'Actor':
actor = Actor('', '')
user_id = None # We use this to avoid multiple calls to getpass.getuser()
- def default_email():
+ def default_email() -> str:
nonlocal user_id
if not user_id:
user_id = get_user_id()
return user_id
- def default_name():
+ def default_name() -> str:
return default_email().split('@')[0]
for attr, evar, cvar, default in (('name', env_name, cls.conf_name, default_name),
@@ -609,7 +639,7 @@ class Actor(object):
return actor
@classmethod
- def committer(cls, config_reader=None):
+ def committer(cls, config_reader: Optional[TBD] = None) -> 'Actor':
"""
:return: Actor instance corresponding to the configured committer. It behaves
similar to the git implementation, such that the environment will override
@@ -620,7 +650,7 @@ class Actor(object):
return cls._main_actor(cls.env_committer_name, cls.env_committer_email, config_reader)
@classmethod
- def author(cls, config_reader=None):
+ def author(cls, config_reader: Optional[TBD] = None) -> 'Actor':
"""Same as committer(), but defines the main author. It may be specified in the environment,
but defaults to the committer"""
return cls._main_actor(cls.env_author_name, cls.env_author_email, config_reader)
@@ -654,16 +684,18 @@ class Stats(object):
files = number of changed files as int"""
__slots__ = ("total", "files")
- def __init__(self, total, files):
+ def __init__(self, total: Dict[str, Dict[str, int]], files: Dict[str, Dict[str, int]]):
self.total = total
self.files = files
@classmethod
- def _list_from_string(cls, repo, text):
+ def _list_from_string(cls, repo: 'Repo', text: str) -> 'Stats':
"""Create a Stat object from output retrieved by git-diff.
:return: git.Stat"""
- hsh = {'total': {'insertions': 0, 'deletions': 0, 'lines': 0, 'files': 0}, 'files': {}}
+ hsh = {'total': {'insertions': 0, 'deletions': 0, 'lines': 0, 'files': 0},
+ 'files': {}
+ } # type: Dict[str, Dict[str, TBD]] ## need typeddict or refactor for mypy
for line in text.splitlines():
(raw_insertions, raw_deletions, filename) = line.split("\t")
insertions = raw_insertions != '-' and int(raw_insertions) or 0
@@ -689,25 +721,25 @@ class IndexFileSHA1Writer(object):
:note: Based on the dulwich project"""
__slots__ = ("f", "sha1")
- def __init__(self, f):
+ def __init__(self, f: IO) -> None:
self.f = f
self.sha1 = make_sha(b"")
- def write(self, data):
+ def write(self, data: AnyStr) -> int:
self.sha1.update(data)
return self.f.write(data)
- def write_sha(self):
+ def write_sha(self) -> bytes:
sha = self.sha1.digest()
self.f.write(sha)
return sha
- def close(self):
+ def close(self) -> bytes:
sha = self.write_sha()
self.f.close()
return sha
- def tell(self):
+ def tell(self) -> int:
return self.f.tell()
@@ -721,23 +753,23 @@ class LockFile(object):
Locks will automatically be released on destruction"""
__slots__ = ("_file_path", "_owns_lock")
- def __init__(self, file_path):
+ def __init__(self, file_path: PathLike) -> None:
self._file_path = file_path
self._owns_lock = False
- def __del__(self):
+ def __del__(self) -> None:
self._release_lock()
- def _lock_file_path(self):
+ def _lock_file_path(self) -> str:
""":return: Path to lockfile"""
return "%s.lock" % (self._file_path)
- def _has_lock(self):
+ def _has_lock(self) -> bool:
""":return: True if we have a lock and if the lockfile still exists
:raise AssertionError: if our lock-file does not exist"""
return self._owns_lock
- def _obtain_lock_or_raise(self):
+ def _obtain_lock_or_raise(self) -> None:
"""Create a lock file as flag for other instances, mark our instance as lock-holder
:raise IOError: if a lock was already present or a lock file could not be written"""
@@ -759,12 +791,12 @@ class LockFile(object):
self._owns_lock = True
- def _obtain_lock(self):
+ def _obtain_lock(self) -> None:
"""The default implementation will raise if a lock cannot be obtained.
Subclasses may override this method to provide a different implementation"""
return self._obtain_lock_or_raise()
- def _release_lock(self):
+ def _release_lock(self) -> None:
"""Release our lock if we have one"""
if not self._has_lock():
return
@@ -789,7 +821,7 @@ class BlockingLockFile(LockFile):
can never be obtained."""
__slots__ = ("_check_interval", "_max_block_time")
- def __init__(self, file_path, check_interval_s=0.3, max_block_time_s=maxsize):
+ def __init__(self, file_path: PathLike, check_interval_s: float = 0.3, max_block_time_s: int = maxsize) -> None:
"""Configure the instance
:param check_interval_s:
@@ -801,7 +833,7 @@ class BlockingLockFile(LockFile):
self._check_interval = check_interval_s
self._max_block_time = max_block_time_s
- def _obtain_lock(self):
+ def _obtain_lock(self) -> None:
"""This method blocks until it obtained the lock, or raises IOError if
it ran out of time or if the parent directory was not available anymore.
If this method returns, you are guaranteed to own the lock"""
@@ -848,14 +880,14 @@ class IterableList(list):
can be left out."""
__slots__ = ('_id_attr', '_prefix')
- def __new__(cls, id_attr, prefix=''):
+ def __new__(cls, id_attr: str, prefix: str = '') -> 'IterableList':
return super(IterableList, cls).__new__(cls)
- def __init__(self, id_attr, prefix=''):
+ def __init__(self, id_attr: str, prefix: str = '') -> None:
self._id_attr = id_attr
self._prefix = prefix
- def __contains__(self, attr):
+ def __contains__(self, attr: object) -> bool:
# first try identity match for performance
try:
rval = list.__contains__(self, attr)
@@ -867,13 +899,13 @@ class IterableList(list):
# otherwise make a full name search
try:
- getattr(self, attr)
+ getattr(self, cast(str, attr)) # use cast to silence mypy
return True
except (AttributeError, TypeError):
return False
# END handle membership
- def __getattr__(self, attr):
+ def __getattr__(self, attr: str) -> Any:
attr = self._prefix + attr
for item in self:
if getattr(item, self._id_attr) == attr:
@@ -881,20 +913,24 @@ class IterableList(list):
# END for each item
return list.__getattribute__(self, attr)
- def __getitem__(self, index):
+ def __getitem__(self, index: Union[int, slice, str]) -> Any:
if isinstance(index, int):
return list.__getitem__(self, index)
-
- try:
- return getattr(self, index)
- except AttributeError as e:
- raise IndexError("No item found with id %r" % (self._prefix + index)) from e
+ elif isinstance(index, slice):
+ raise ValueError("Index should be an int or str")
+ else:
+ try:
+ return getattr(self, index)
+ except AttributeError as e:
+ raise IndexError("No item found with id %r" % (self._prefix + index)) from e
# END handle getattr
- def __delitem__(self, index):
- delindex = index
+ def __delitem__(self, index: Union[int, str, slice]) -> None:
+
+ delindex = cast(int, index)
if not isinstance(index, int):
delindex = -1
+ assert not isinstance(index, slice)
name = self._prefix + index
for i, item in enumerate(self):
if getattr(item, self._id_attr) == name:
@@ -917,7 +953,7 @@ class Iterable(object):
_id_attribute_ = "attribute that most suitably identifies your instance"
@classmethod
- def list_items(cls, repo, *args, **kwargs):
+ def list_items(cls, repo: 'Repo', *args: Any, **kwargs: Any) -> 'IterableList':
"""
Find all items of this type - subclasses can specify args and kwargs differently.
If no args are given, subclasses are obliged to return all items if no additional
@@ -931,7 +967,7 @@ class Iterable(object):
return out_list
@classmethod
- def iter_items(cls, repo, *args, **kwargs):
+ def iter_items(cls, repo: 'Repo', *args: Any, **kwargs: Any) -> NoReturn:
"""For more information about the arguments, see list_items
:return: iterator yielding Items"""
raise NotImplementedError("To be implemented by Subclass")
@@ -940,5 +976,5 @@ class Iterable(object):
class NullHandler(logging.Handler):
- def emit(self, record):
+ def emit(self, record: object) -> None:
pass
diff --git a/test-requirements.txt b/test-requirements.txt
index 55206899..0734820f 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -4,4 +4,5 @@ flake8
tox
virtualenv
nose
+gitdb>=4.0.1,<5
typing-extensions>=3.7.4.0