summaryrefslogtreecommitdiff
path: root/git
diff options
context:
space:
mode:
Diffstat (limited to 'git')
-rw-r--r--git/cmd.py4
-rw-r--r--git/config.py14
-rw-r--r--git/index/base.py10
-rw-r--r--git/objects/base.py9
-rw-r--r--git/objects/commit.py117
-rw-r--r--git/objects/submodule/base.py115
-rw-r--r--git/objects/submodule/util.py10
-rw-r--r--git/objects/tree.py71
-rw-r--r--git/objects/util.py157
-rw-r--r--git/refs/head.py8
-rw-r--r--git/refs/symbolic.py4
-rw-r--r--git/remote.py18
-rw-r--r--git/repo/base.py18
-rw-r--r--git/types.py15
-rw-r--r--git/util.py52
15 files changed, 421 insertions, 201 deletions
diff --git a/git/cmd.py b/git/cmd.py
index e078e4a1..7df85581 100644
--- a/git/cmd.py
+++ b/git/cmd.py
@@ -342,11 +342,11 @@ class Git(LazyMixin):
@overload
@classmethod
- def polish_url(cls, url: PathLike, is_cygwin: Union[None, bool] = None) -> str:
+ def polish_url(cls, url: str, is_cygwin: Union[None, bool] = None) -> str:
...
@classmethod
- def polish_url(cls, url: PathLike, is_cygwin: Union[None, bool] = None) -> PathLike:
+ def polish_url(cls, url: str, is_cygwin: Union[None, bool] = None) -> PathLike:
if is_cygwin is None:
is_cygwin = cls.is_cygwin()
diff --git a/git/config.py b/git/config.py
index cc6fcfa4..5c5ceea8 100644
--- a/git/config.py
+++ b/git/config.py
@@ -29,8 +29,6 @@ import os.path as osp
import configparser as cp
-from pathlib import Path
-
# typing-------------------------------------------------------
from typing import Any, Callable, IO, List, Dict, Sequence, TYPE_CHECKING, Tuple, Union, cast, overload
@@ -330,7 +328,7 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
"Write-ConfigParsers can operate on a single file only, multiple files have been passed")
# END single file check
- if isinstance(self._file_or_files, (str, Path)): # cannot narrow by os._pathlike until 3.5 dropped
+ if isinstance(self._file_or_files, (str, os.PathLike)):
file_or_files = self._file_or_files
else:
file_or_files = cast(IO, self._file_or_files).name
@@ -696,6 +694,16 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
""":return: True if this instance may change the configuration file"""
return self._read_only
+ @overload
+ def get_value(self, section: str, option: str, default: str
+ ) -> str:
+ ...
+
+ @overload
+ def get_value(self, section: str, option: str, default: float
+ ) -> float:
+ ...
+
def get_value(self, section: str, option: str, default: Union[int, float, str, bool, None] = None
) -> Union[int, float, str, bool]:
# can default or return type include bool?
diff --git a/git/index/base.py b/git/index/base.py
index e2b3f8fa..f4ffba7b 100644
--- a/git/index/base.py
+++ b/git/index/base.py
@@ -3,7 +3,7 @@
#
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
-from git.refs.reference import Reference
+
import glob
from io import BytesIO
import os
@@ -74,6 +74,8 @@ from git.types import PathLike, TBD
if TYPE_CHECKING:
from subprocess import Popen
from git.repo import Repo
+ from git.refs.reference import Reference
+ from git.util import Actor
StageType = int
@@ -966,8 +968,8 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
return out
- def commit(self, message: str, parent_commits=None, head: bool = True, author: str = None,
- committer: str = None, author_date: str = None, commit_date: str = None,
+ def commit(self, message: str, parent_commits=None, head: bool = True, author: Union[None, 'Actor'] = None,
+ committer: Union[None, 'Actor'] = None, author_date: str = None, commit_date: str = None,
skip_hooks: bool = False) -> Commit:
"""Commit the current default index file, creating a commit object.
For more information on the arguments, see tree.commit.
@@ -1191,7 +1193,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
assert "Should not reach this point"
@default_index
- def reset(self, commit: Union[Commit, Reference, str] = 'HEAD', working_tree: bool = False,
+ def reset(self, commit: Union[Commit, 'Reference', str] = 'HEAD', working_tree: bool = False,
paths: Union[None, Iterable[PathLike]] = None,
head: bool = False, **kwargs: Any) -> 'IndexFile':
"""Reset the index to reflect the tree at the given commit. This will not
diff --git a/git/objects/base.py b/git/objects/base.py
index 884f9651..4e2ed493 100644
--- a/git/objects/base.py
+++ b/git/objects/base.py
@@ -17,15 +17,16 @@ from .util import get_object_type_by_name
from typing import Any, TYPE_CHECKING, Optional, Union
-from git.types import PathLike
+from git.types import PathLike, Commit_ish
if TYPE_CHECKING:
from git.repo import Repo
from gitdb.base import OStream
from .tree import Tree
from .blob import Blob
- from .tag import TagObject
- from .commit import Commit
+ from .submodule.base import Submodule
+
+IndexObjUnion = Union['Tree', 'Blob', 'Submodule']
# --------------------------------------------------------------------------
@@ -71,7 +72,7 @@ class Object(LazyMixin):
return repo.rev_parse(str(id))
@classmethod
- def new_from_sha(cls, repo: 'Repo', sha1: bytes) -> Union['Commit', 'TagObject', 'Tree', 'Blob']:
+ def new_from_sha(cls, repo: 'Repo', sha1: bytes) -> Commit_ish:
"""
:return: new object instance of a type appropriate to represent the given
binary sha1
diff --git a/git/objects/commit.py b/git/objects/commit.py
index 0b707450..81978ae8 100644
--- a/git/objects/commit.py
+++ b/git/objects/commit.py
@@ -3,12 +3,12 @@
#
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
-
+import datetime
+from subprocess import Popen
from gitdb import IStream
from git.util import (
hex_to_bin,
Actor,
- IterableObj,
Stats,
finalize_process
)
@@ -17,8 +17,8 @@ from git.diff import Diffable
from .tree import Tree
from . import base
from .util import (
- Traversable,
Serializable,
+ TraversableIterableObj,
parse_date,
altz_to_utctz_str,
parse_actor_and_date,
@@ -36,18 +36,25 @@ import os
from io import BytesIO
import logging
-from typing import List, Tuple, Union, TYPE_CHECKING
+
+# typing ------------------------------------------------------------------
+
+from typing import Any, IO, Iterator, List, Sequence, Tuple, Union, TYPE_CHECKING
+
+from git.types import PathLike, TypeGuard
if TYPE_CHECKING:
from git.repo import Repo
+# ------------------------------------------------------------------------
+
log = logging.getLogger('git.objects.commit')
log.addHandler(logging.NullHandler())
__all__ = ('Commit', )
-class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable):
+class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
"""Wraps a git Commit object.
@@ -73,10 +80,17 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable):
"message", "parents", "encoding", "gpgsig")
_id_attribute_ = "hexsha"
- def __init__(self, repo, binsha, tree=None, author=None, authored_date=None, author_tz_offset=None,
- committer=None, committed_date=None, committer_tz_offset=None,
- message=None, parents: Union[Tuple['Commit', ...], List['Commit'], None] = None,
- encoding=None, gpgsig=None):
+ def __init__(self, repo: 'Repo', binsha: bytes, tree: 'Tree' = None,
+ author: Union[Actor, None] = None,
+ authored_date: Union[int, None] = None,
+ author_tz_offset: Union[None, float] = None,
+ committer: Union[Actor, None] = None,
+ committed_date: Union[int, None] = None,
+ committer_tz_offset: Union[None, float] = None,
+ message: Union[str, bytes, None] = None,
+ parents: Union[Sequence['Commit'], None] = None,
+ encoding: Union[str, None] = None,
+ gpgsig: Union[str, None] = None) -> None:
"""Instantiate a new Commit. All keyword arguments taking None as default will
be implicitly set on first query.
@@ -139,7 +153,7 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable):
self.gpgsig = gpgsig
@classmethod
- def _get_intermediate_items(cls, commit: 'Commit') -> Tuple['Commit', ...]: # type: ignore ## cos overriding super
+ def _get_intermediate_items(cls, commit: 'Commit') -> Tuple['Commit', ...]:
return tuple(commit.parents)
@classmethod
@@ -158,7 +172,7 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable):
istream = repo.odb.store(IStream(cls.type, streamlen, stream))
return istream.binsha
- def replace(self, **kwargs):
+ def replace(self, **kwargs: Any) -> 'Commit':
'''Create new commit object from existing commit object.
Any values provided as keyword arguments will replace the
@@ -177,7 +191,7 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable):
return new_commit
- def _set_cache_(self, attr):
+ def _set_cache_(self, attr: str) -> None:
if attr in Commit.__slots__:
# read the data in a chunk, its faster - then provide a file wrapper
_binsha, _typename, self.size, stream = self.repo.odb.stream(self.binsha)
@@ -187,19 +201,22 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable):
# END handle attrs
@property
- def authored_datetime(self):
+ def authored_datetime(self) -> 'datetime.datetime':
return from_timestamp(self.authored_date, self.author_tz_offset)
@property
- def committed_datetime(self):
+ def committed_datetime(self) -> 'datetime.datetime':
return from_timestamp(self.committed_date, self.committer_tz_offset)
@property
- def summary(self):
+ def summary(self) -> Union[str, bytes]:
""":return: First line of the commit message"""
- return self.message.split('\n', 1)[0]
+ if isinstance(self.message, str):
+ return self.message.split('\n', 1)[0]
+ else:
+ return self.message.split(b'\n', 1)[0]
- def count(self, paths='', **kwargs):
+ def count(self, paths: Union[PathLike, Sequence[PathLike]] = '', **kwargs: Any) -> int:
"""Count the number of commits reachable from this commit
:param paths:
@@ -217,7 +234,7 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable):
return len(self.repo.git.rev_list(self.hexsha, **kwargs).splitlines())
@property
- def name_rev(self):
+ def name_rev(self) -> str:
"""
:return:
String describing the commits hex sha based on the closest Reference.
@@ -225,7 +242,9 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable):
return self.repo.git.name_rev(self)
@classmethod
- def iter_items(cls, repo, rev, paths='', **kwargs):
+ def iter_items(cls, repo: 'Repo', rev: str, # type: ignore
+ paths: Union[PathLike, Sequence[PathLike]] = '', **kwargs: Any
+ ) -> Iterator['Commit']:
"""Find all commits matching the given criteria.
:param repo: is the Repo
@@ -245,15 +264,23 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable):
# use -- in any case, to prevent possibility of ambiguous arguments
# see https://github.com/gitpython-developers/GitPython/issues/264
- args = ['--']
+
+ args_list: List[PathLike] = ['--']
+
if paths:
- args.extend((paths, ))
+ paths_tup: Tuple[PathLike, ...]
+ if isinstance(paths, (str, os.PathLike)):
+ paths_tup = (paths, )
+ else:
+ paths_tup = tuple(paths)
+
+ args_list.extend(paths_tup)
# END if paths
- proc = repo.git.rev_list(rev, args, as_process=True, **kwargs)
+ proc = repo.git.rev_list(rev, args_list, as_process=True, **kwargs)
return cls._iter_from_process_or_stream(repo, proc)
- def iter_parents(self, paths='', **kwargs):
+ def iter_parents(self, paths: Union[PathLike, Sequence[PathLike]] = '', **kwargs) -> Iterator['Commit']:
"""Iterate _all_ parents of this commit.
:param paths:
@@ -269,8 +296,8 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable):
return self.iter_items(self.repo, self, paths, **kwargs)
- @property
- def stats(self):
+ @ property
+ def stats(self) -> Stats:
"""Create a git stat from changes between this commit and its first parent
or from all changes done if this is the very first commit.
@@ -286,17 +313,26 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable):
text = self.repo.git.diff(self.parents[0].hexsha, self.hexsha, '--', numstat=True)
return Stats._list_from_string(self.repo, text)
- @classmethod
- def _iter_from_process_or_stream(cls, repo, proc_or_stream):
+ @ classmethod
+ def _iter_from_process_or_stream(cls, repo: 'Repo', proc_or_stream: Union[Popen, IO]) -> Iterator['Commit']:
"""Parse out commit information into a list of Commit objects
We expect one-line per commit, and parse the actual commit information directly
from our lighting fast object database
:param proc: git-rev-list process instance - one sha per line
:return: iterator returning Commit objects"""
- stream = proc_or_stream
- if not hasattr(stream, 'readline'):
- stream = proc_or_stream.stdout
+
+ def is_proc(inp) -> TypeGuard[Popen]:
+ return hasattr(proc_or_stream, 'wait') and not hasattr(proc_or_stream, 'readline')
+
+ def is_stream(inp) -> TypeGuard[IO]:
+ return hasattr(proc_or_stream, 'readline')
+
+ if is_proc(proc_or_stream):
+ if proc_or_stream.stdout is not None:
+ stream = proc_or_stream.stdout
+ elif is_stream(proc_or_stream):
+ stream = proc_or_stream
readline = stream.readline
while True:
@@ -314,19 +350,21 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable):
# END for each line in stream
# TODO: Review this - it seems process handling got a bit out of control
# due to many developers trying to fix the open file handles issue
- if hasattr(proc_or_stream, 'wait'):
+ if is_proc(proc_or_stream):
finalize_process(proc_or_stream)
- @classmethod
- def create_from_tree(cls, repo, tree, message, parent_commits=None, head=False, author=None, committer=None,
- author_date=None, commit_date=None):
+ @ classmethod
+ def create_from_tree(cls, repo: 'Repo', tree: Union['Tree', str], message: str,
+ parent_commits: Union[None, List['Commit']] = None, head: bool = False,
+ author: Union[None, Actor] = None, committer: Union[None, Actor] = None,
+ author_date: Union[None, str] = None, commit_date: Union[None, str] = None):
"""Commit the given tree, creating a commit object.
:param repo: Repo object the commit should be part of
:param tree: Tree object or hex or bin sha
the tree of the new commit
:param message: Commit message. It may be an empty string if no message is provided.
- It will be converted to a string in any case.
+ It will be converted to a string , in any case.
:param parent_commits:
Optional Commit objects to use as parents for the new commit.
If empty list, the commit will have no parents at all and become
@@ -460,7 +498,7 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable):
write(("encoding %s\n" % self.encoding).encode('ascii'))
try:
- if self.__getattribute__('gpgsig') is not None:
+ if self.__getattribute__('gpgsig'):
write(b"gpgsig")
for sigline in self.gpgsig.rstrip("\n").split("\n"):
write((" " + sigline + "\n").encode('ascii'))
@@ -510,7 +548,7 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable):
# now we can have the encoding line, or an empty line followed by the optional
# message.
self.encoding = self.default_encoding
- self.gpgsig = None
+ self.gpgsig = ""
# read headers
enc = next_line
@@ -539,7 +577,7 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable):
# decode the authors name
try:
- self.author, self.authored_date, self.author_tz_offset = \
+ (self.author, self.authored_date, self.author_tz_offset) = \
parse_actor_and_date(author_line.decode(self.encoding, 'replace'))
except UnicodeDecodeError:
log.error("Failed to decode author line '%s' using encoding %s", author_line, self.encoding,
@@ -559,7 +597,8 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable):
try:
self.message = self.message.decode(self.encoding, 'replace')
except UnicodeDecodeError:
- log.error("Failed to decode message '%s' using encoding %s", self.message, self.encoding, exc_info=True)
+ log.error("Failed to decode message '%s' using encoding %s",
+ self.message, self.encoding, exc_info=True)
# END exception handling
return self
diff --git a/git/objects/submodule/base.py b/git/objects/submodule/base.py
index f0b8babc..c95b66f2 100644
--- a/git/objects/submodule/base.py
+++ b/git/objects/submodule/base.py
@@ -3,6 +3,7 @@ from io import BytesIO
import logging
import os
import stat
+
from unittest import SkipTest
import uuid
@@ -24,9 +25,9 @@ from git.exc import (
BadName
)
from git.objects.base import IndexObject, Object
-from git.objects.util import Traversable
+from git.objects.util import TraversableIterableObj
+
from git.util import (
- IterableObj,
join_path_native,
to_native_path_linux,
RemoteProgress,
@@ -48,6 +49,13 @@ from .util import (
# typing ----------------------------------------------------------------------
+from typing import Dict, TYPE_CHECKING
+from typing import Any, Iterator, Union
+
+from git.types import Commit_ish, PathLike
+
+if TYPE_CHECKING:
+ from git.repo import Repo
# -----------------------------------------------------------------------------
@@ -64,7 +72,7 @@ class UpdateProgress(RemoteProgress):
"""Class providing detailed progress information to the caller who should
derive from it and implement the ``update(...)`` message"""
CLONE, FETCH, UPDWKTREE = [1 << x for x in range(RemoteProgress._num_op_codes, RemoteProgress._num_op_codes + 3)]
- _num_op_codes = RemoteProgress._num_op_codes + 3
+ _num_op_codes: int = RemoteProgress._num_op_codes + 3
__slots__ = ()
@@ -79,7 +87,7 @@ UPDWKTREE = UpdateProgress.UPDWKTREE
# IndexObject comes via util module, its a 'hacky' fix thanks to pythons import
# mechanism which cause plenty of trouble of the only reason for packages and
# modules is refactoring - subpackages shouldn't depend on parent packages
-class Submodule(IndexObject, IterableObj, Traversable):
+class Submodule(IndexObject, TraversableIterableObj):
"""Implements access to a git submodule. They are special in that their sha
represents a commit in the submodule's repository which is to be checked out
@@ -101,7 +109,14 @@ class Submodule(IndexObject, IterableObj, Traversable):
__slots__ = ('_parent_commit', '_url', '_branch_path', '_name', '__weakref__')
_cache_attrs = ('path', '_url', '_branch_path')
- def __init__(self, repo, binsha, mode=None, path=None, name=None, parent_commit=None, url=None, branch_path=None):
+ def __init__(self, repo: 'Repo', binsha: bytes,
+ mode: Union[int, None] = None,
+ path: Union[PathLike, None] = None,
+ name: Union[str, None] = None,
+ parent_commit: Union[Commit_ish, None] = None,
+ url: str = None,
+ branch_path: Union[PathLike, None] = None
+ ) -> None:
"""Initialize this instance with its attributes. We only document the ones
that differ from ``IndexObject``
@@ -121,15 +136,16 @@ class Submodule(IndexObject, IterableObj, Traversable):
if name is not None:
self._name = name
- def _set_cache_(self, attr):
+ def _set_cache_(self, attr: str) -> None:
if attr in ('path', '_url', '_branch_path'):
reader = self.config_reader()
# default submodule values
try:
self.path = reader.get('path')
except cp.NoSectionError as e:
- raise ValueError("This submodule instance does not exist anymore in '%s' file"
- % osp.join(self.repo.working_tree_dir, '.gitmodules')) from e
+ if self.repo.working_tree_dir is not None:
+ raise ValueError("This submodule instance does not exist anymore in '%s' file"
+ % osp.join(self.repo.working_tree_dir, '.gitmodules')) from e
# end
self._url = reader.get('url')
# git-python extension values - optional
@@ -150,33 +166,35 @@ class Submodule(IndexObject, IterableObj, Traversable):
# END handle intermediate items
@classmethod
- def _need_gitfile_submodules(cls, git):
+ def _need_gitfile_submodules(cls, git: Git) -> bool:
return git.version_info[:3] >= (1, 7, 5)
- def __eq__(self, other):
+ def __eq__(self, other: Any) -> bool:
"""Compare with another submodule"""
# we may only compare by name as this should be the ID they are hashed with
# Otherwise this type wouldn't be hashable
# return self.path == other.path and self.url == other.url and super(Submodule, self).__eq__(other)
return self._name == other._name
- def __ne__(self, other):
+ def __ne__(self, other: object) -> bool:
"""Compare with another submodule for inequality"""
return not (self == other)
- def __hash__(self):
+ def __hash__(self) -> int:
"""Hash this instance using its logical id, not the sha"""
return hash(self._name)
- def __str__(self):
+ def __str__(self) -> str:
return self._name
- def __repr__(self):
+ def __repr__(self) -> str:
return "git.%s(name=%s, path=%s, url=%s, branch_path=%s)"\
% (type(self).__name__, self._name, self.path, self.url, self.branch_path)
@classmethod
- def _config_parser(cls, repo, parent_commit, read_only):
+ def _config_parser(cls, repo: 'Repo',
+ parent_commit: Union[Commit_ish, None],
+ read_only: bool) -> SubmoduleConfigParser:
""":return: Config Parser constrained to our submodule in read or write mode
:raise IOError: If the .gitmodules file cannot be found, either locally or in the repository
at the given parent commit. Otherwise the exception would be delayed until the first
@@ -189,8 +207,8 @@ class Submodule(IndexObject, IterableObj, Traversable):
# We are most likely in an empty repository, so the HEAD doesn't point to a valid ref
pass
# end handle parent_commit
-
- if not repo.bare and parent_matches_head:
+ fp_module: Union[str, BytesIO]
+ if not repo.bare and parent_matches_head and repo.working_tree_dir:
fp_module = osp.join(repo.working_tree_dir, cls.k_modules_file)
else:
assert parent_commit is not None, "need valid parent_commit in bare repositories"
@@ -219,13 +237,13 @@ class Submodule(IndexObject, IterableObj, Traversable):
# END for each name to delete
@classmethod
- def _sio_modules(cls, parent_commit):
+ def _sio_modules(cls, parent_commit: Commit_ish) -> BytesIO:
""":return: Configuration file as BytesIO - we only access it through the respective blob's data"""
sio = BytesIO(parent_commit.tree[cls.k_modules_file].data_stream.read())
sio.name = cls.k_modules_file
return sio
- def _config_parser_constrained(self, read_only):
+ def _config_parser_constrained(self, read_only: bool) -> SectionConstraint:
""":return: Config Parser constrained to our submodule in read or write mode"""
try:
pc = self.parent_commit
@@ -248,7 +266,7 @@ class Submodule(IndexObject, IterableObj, Traversable):
""":return: Repo instance of newly cloned repository
:param repo: our parent repository
:param url: url to clone from
- :param path: repository-relative path to the submodule checkout location
+ :param path: repository - relative path to the submodule checkout location
:param name: canonical of the submodule
:param kwrags: additinoal arguments given to git.clone"""
module_abspath = cls._module_abspath(repo, path, name)
@@ -269,7 +287,7 @@ class Submodule(IndexObject, IterableObj, Traversable):
@classmethod
def _to_relative_path(cls, parent_repo, path):
- """:return: a path guaranteed to be relative to the given parent-repository
+ """:return: a path guaranteed to be relative to the given parent - repository
:raise ValueError: if path is not contained in the parent repository's working tree"""
path = to_native_path_linux(path)
if path.endswith('/'):
@@ -291,11 +309,11 @@ class Submodule(IndexObject, IterableObj, Traversable):
@classmethod
def _write_git_file_and_module_config(cls, working_tree_dir, module_abspath):
- """Writes a .git file containing a (preferably) relative path to the actual git module repository.
+ """Writes a .git file containing a(preferably) relative path to the actual git module repository.
It is an error if the module_abspath cannot be made into a relative path, relative to the working_tree_dir
:note: will overwrite existing files !
:note: as we rewrite both the git file as well as the module configuration, we might fail on the configuration
- and will not roll back changes done to the git file. This should be a non-issue, but may easily be fixed
+ and will not roll back changes done to the git file. This should be a non - issue, but may easily be fixed
if it becomes one
:param working_tree_dir: directory to write the .git file into
:param module_abspath: absolute path to the bare repository
@@ -316,8 +334,9 @@ class Submodule(IndexObject, IterableObj, Traversable):
#{ Edit Interface
@classmethod
- def add(cls, repo, name, path, url=None, branch=None, no_checkout=False, depth=None, env=None,
- clone_multi_options=None):
+ def add(cls, repo: 'Repo', name: str, path: PathLike, url: Union[str, None] = None,
+ branch=None, no_checkout: bool = False, depth=None, env=None, clone_multi_options=None
+ ) -> 'Submodule':
"""Add a new submodule to the given repository. This will alter the index
as well as the .gitmodules file, but will not create a new commit.
If the submodule already exists, no matter if the configuration differs
@@ -408,7 +427,7 @@ class Submodule(IndexObject, IterableObj, Traversable):
url = urls[0]
else:
# clone new repo
- kwargs = {'n': no_checkout}
+ kwargs: Dict[str, Union[bool, int]] = {'n': no_checkout}
if not branch_is_default:
kwargs['b'] = br.name
# END setup checkout-branch
@@ -468,7 +487,7 @@ class Submodule(IndexObject, IterableObj, Traversable):
was specified for this submodule and the branch existed remotely
:param progress: UpdateProgress instance or None if no progress should be shown
:param dry_run: if True, the operation will only be simulated, but not performed.
- All performed operations are read-only
+ All performed operations are read - only
:param force:
If True, we may reset heads even if the repository in question is dirty. Additinoally we will be allowed
to set a tracking branch which is ahead of its remote branch back into the past or the location of the
@@ -476,7 +495,7 @@ class Submodule(IndexObject, IterableObj, Traversable):
If False, local tracking branches that are in the future of their respective remote branches will simply
not be moved.
:param keep_going: if True, we will ignore but log all errors, and keep going recursively.
- Unless dry_run is set as well, keep_going could cause subsequent/inherited errors you wouldn't see
+ Unless dry_run is set as well, keep_going could cause subsequent / inherited errors you wouldn't see
otherwise.
In conjunction with dry_run, it can be useful to anticipate all errors when updating submodules
:param env: Optional dictionary containing the desired environment variables.
@@ -685,9 +704,9 @@ class Submodule(IndexObject, IterableObj, Traversable):
adjusting our index entry accordingly.
:param module_path: the path to which to move our module in the parent repostory's working tree,
- given as repository-relative or absolute path. Intermediate directories will be created
+ given as repository - relative or absolute path. Intermediate directories will be created
accordingly. If the path already exists, it must be empty.
- Trailing (back)slashes are removed automatically
+ Trailing(back)slashes are removed automatically
:param configuration: if True, the configuration will be adjusted to let
the submodule point to the given path.
:param module: if True, the repository managed by this submodule
@@ -696,7 +715,7 @@ class Submodule(IndexObject, IterableObj, Traversable):
:return: self
:raise ValueError: if the module path existed and was not empty, or was a file
:note: Currently the method is not atomic, and it could leave the repository
- in an inconsistent state if a sub-step fails for some reason
+ in an inconsistent state if a sub - step fails for some reason
"""
if module + configuration < 1:
raise ValueError("You must specify to move at least the module or the configuration of the submodule")
@@ -790,19 +809,19 @@ class Submodule(IndexObject, IterableObj, Traversable):
@unbare_repo
def remove(self, module=True, force=False, configuration=True, dry_run=False):
"""Remove this submodule from the repository. This will remove our entry
- from the .gitmodules file and the entry in the .git/config file.
+ from the .gitmodules file and the entry in the .git / config file.
:param module: If True, the module checkout we point to will be deleted
as well. If the module is currently on a commit which is not part
of any branch in the remote, if the currently checked out branch
working tree, or untracked files,
- is ahead of its tracking branch, if you have modifications in the
+ is ahead of its tracking branch, if you have modifications in the
In case the removal of the repository fails for these reasons, the
submodule status will not have been altered.
- If this submodule has child-modules on its own, these will be deleted
+ If this submodule has child - modules on its own, these will be deleted
prior to touching the own module.
:param force: Enforces the deletion of the module even though it contains
- modifications. This basically enforces a brute-force file system based
+ modifications. This basically enforces a brute - force file system based
deletion.
:param configuration: if True, the submodule is deleted from the configuration,
otherwise it isn't. Although this should be enabled most of the times,
@@ -942,7 +961,7 @@ class Submodule(IndexObject, IterableObj, Traversable):
return self
- def set_parent_commit(self, commit, check=True):
+ def set_parent_commit(self, commit: Union[Commit_ish, None], check=True):
"""Set this instance to use the given commit whose tree is supposed to
contain the .gitmodules blob.
@@ -981,7 +1000,7 @@ class Submodule(IndexObject, IterableObj, Traversable):
# If check is False, we might see a parent-commit that doesn't even contain the submodule anymore.
# in that case, mark our sha as being NULL
try:
- self.binsha = pctree[self.path].binsha
+ self.binsha = pctree[str(self.path)].binsha
except KeyError:
self.binsha = self.NULL_BIN_SHA
# end
@@ -1015,9 +1034,9 @@ class Submodule(IndexObject, IterableObj, Traversable):
"""Rename this submodule
:note: This method takes care of renaming the submodule in various places, such as
- * $parent_git_dir/config
- * $working_tree_dir/.gitmodules
- * (git >=v1.8.0: move submodule repository to new name)
+ * $parent_git_dir / config
+ * $working_tree_dir / .gitmodules
+ * (git >= v1.8.0: move submodule repository to new name)
As .gitmodules will be changed, you would need to make a commit afterwards. The changed .gitmodules file
will already be added to the index
@@ -1091,7 +1110,7 @@ class Submodule(IndexObject, IterableObj, Traversable):
def exists(self):
"""
:return: True if the submodule exists, False otherwise. Please note that
- a submodule may exist (in the .gitmodules file) even though its module
+ a submodule may exist ( in the .gitmodules file) even though its module
doesn't exist on disk"""
# keep attributes for later, and restore them if we have no valid data
# this way we do not actually alter the state of the object
@@ -1131,7 +1150,7 @@ class Submodule(IndexObject, IterableObj, Traversable):
@property
def branch_path(self):
"""
- :return: full (relative) path as string to the branch we would checkout
+ :return: full(relative) path as string to the branch we would checkout
from the remote and track"""
return self._branch_path
@@ -1144,7 +1163,7 @@ class Submodule(IndexObject, IterableObj, Traversable):
@property
def url(self):
- """:return: The url to the repository which our module-repository refers to"""
+ """:return: The url to the repository which our module - repository refers to"""
return self._url
@property
@@ -1160,7 +1179,7 @@ class Submodule(IndexObject, IterableObj, Traversable):
""":return: The name of this submodule. It is used to identify it within the
.gitmodules file.
:note: by default, the name is the path at which to find the submodule, but
- in git-python it should be a unique identifier similar to the identifiers
+ in git - python it should be a unique identifier similar to the identifiers
used for remotes, which allows to change the path of the submodule
easily
"""
@@ -1187,17 +1206,16 @@ class Submodule(IndexObject, IterableObj, Traversable):
#{ Iterable Interface
@classmethod
- def iter_items(cls, repo, parent_commit='HEAD'):
+ def iter_items(cls, repo: 'Repo', parent_commit: Union[Commit_ish, str] = 'HEAD', *Args: Any, **kwargs: Any
+ ) -> Iterator['Submodule']:
""":return: iterator yielding Submodule instances available in the given repository"""
try:
pc = repo.commit(parent_commit) # parent commit instance
parser = cls._config_parser(repo, pc, read_only=True)
except (IOError, BadName):
- return
+ return iter([])
# END handle empty iterator
- rt = pc.tree # root tree
-
for sms in parser.sections():
n = sm_name(sms)
p = parser.get(sms, 'path')
@@ -1210,6 +1228,7 @@ class Submodule(IndexObject, IterableObj, Traversable):
# get the binsha
index = repo.index
try:
+ rt = pc.tree # root tree
sm = rt[p]
except KeyError:
# try the index, maybe it was just added
diff --git a/git/objects/submodule/util.py b/git/objects/submodule/util.py
index b4796b30..5290000b 100644
--- a/git/objects/submodule/util.py
+++ b/git/objects/submodule/util.py
@@ -4,10 +4,12 @@ from git.config import GitConfigParser
from io import BytesIO
import weakref
-from typing import TYPE_CHECKING
+
+from typing import Any, TYPE_CHECKING, Union
if TYPE_CHECKING:
from .base import Submodule
+ from weakref import ReferenceType
__all__ = ('sm_section', 'sm_name', 'mkhead', 'find_first_remote_branch',
'SubmoduleConfigParser')
@@ -58,8 +60,8 @@ class SubmoduleConfigParser(GitConfigParser):
Please note that no mutating method will work in bare mode
"""
- def __init__(self, *args, **kwargs):
- self._smref = None
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
+ self._smref: Union['ReferenceType[Submodule]', None] = None
self._index = None
self._auto_write = True
super(SubmoduleConfigParser, self).__init__(*args, **kwargs)
@@ -89,7 +91,7 @@ class SubmoduleConfigParser(GitConfigParser):
#} END interface
#{ Overridden Methods
- def write(self):
+ def write(self) -> None:
rval = super(SubmoduleConfigParser, self).write()
self.flush_to_index()
return rval
diff --git a/git/objects/tree.py b/git/objects/tree.py
index 191fe27c..2e8d8a79 100644
--- a/git/objects/tree.py
+++ b/git/objects/tree.py
@@ -3,12 +3,13 @@
#
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
+
from git.util import join_path
import git.diff as diff
from git.util import to_bin_sha
from . import util
-from .base import IndexObject
+from .base import IndexObject, IndexObjUnion
from .blob import Blob
from .submodule.base import Submodule
@@ -20,14 +21,19 @@ from .fun import (
# typing -------------------------------------------------
-from typing import Callable, Dict, Generic, Iterable, Iterator, List, Tuple, Type, TypeVar, Union, cast, TYPE_CHECKING
+from typing import (Callable, Dict, Generic, Iterable, Iterator, List,
+ Tuple, Type, TypeVar, Union, cast, TYPE_CHECKING)
-from git.types import PathLike
+from git.types import PathLike, TypeGuard
if TYPE_CHECKING:
from git.repo import Repo
from io import BytesIO
+T_Tree_cache = TypeVar('T_Tree_cache', bound=Tuple[bytes, int, str])
+TraversedTreeTup = Union[Tuple[Union['Tree', None], IndexObjUnion,
+ Tuple['Submodule', 'Submodule']]]
+
#--------------------------------------------------------
@@ -35,8 +41,6 @@ cmp: Callable[[str, str], int] = lambda a, b: (a > b) - (a < b)
__all__ = ("TreeModifier", "Tree")
-T_Tree_cache = TypeVar('T_Tree_cache', bound=Union[Tuple[bytes, int, str]])
-
def git_cmp(t1: T_Tree_cache, t2: T_Tree_cache) -> int:
a, b = t1[2], t2[2]
@@ -137,8 +141,12 @@ class TreeModifier(Generic[T_Tree_cache], object):
sha = to_bin_sha(sha)
index = self._index_by_name(name)
- assert isinstance(sha, bytes) and isinstance(mode, int) and isinstance(name, str)
- item = cast(T_Tree_cache, (sha, mode, name)) # use Typeguard from typing-extensions 3.10.0
+ def is_tree_cache(inp: Tuple[bytes, int, str]) -> TypeGuard[T_Tree_cache]:
+ return isinstance(inp[0], bytes) and isinstance(inp[1], int) and isinstance([inp], str)
+
+ item = (sha, mode, name)
+ assert is_tree_cache(item)
+
if index == -1:
self._cache.append(item)
else:
@@ -194,7 +202,7 @@ class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable):
symlink_id = 0o12
tree_id = 0o04
- _map_id_to_type: Dict[int, Union[Type[Submodule], Type[Blob], Type['Tree']]] = {
+ _map_id_to_type: Dict[int, Type[IndexObjUnion]] = {
commit_id: Submodule,
blob_id: Blob,
symlink_id: Blob
@@ -205,7 +213,7 @@ class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable):
super(Tree, self).__init__(repo, binsha, mode, path)
@ classmethod
- def _get_intermediate_items(cls, index_object: 'Tree', # type: ignore
+ def _get_intermediate_items(cls, index_object: 'Tree',
) -> Union[Tuple['Tree', ...], Tuple[()]]:
if index_object.type == "tree":
index_object = cast('Tree', index_object)
@@ -222,7 +230,7 @@ class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable):
# END handle attribute
def _iter_convert_to_object(self, iterable: Iterable[Tuple[bytes, int, str]]
- ) -> Iterator[Union[Blob, 'Tree', Submodule]]:
+ ) -> Iterator[IndexObjUnion]:
"""Iterable yields tuples of (binsha, mode, name), which will be converted
to the respective object representation"""
for binsha, mode, name in iterable:
@@ -233,7 +241,7 @@ class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable):
raise TypeError("Unknown mode %o found in tree data for path '%s'" % (mode, path)) from e
# END for each item
- def join(self, file: str) -> Union[Blob, 'Tree', Submodule]:
+ def join(self, file: str) -> IndexObjUnion:
"""Find the named object in this tree's contents
:return: ``git.Blob`` or ``git.Tree`` or ``git.Submodule``
@@ -266,7 +274,7 @@ class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable):
raise KeyError(msg % file)
# END handle long paths
- def __truediv__(self, file: str) -> Union['Tree', Blob, Submodule]:
+ def __truediv__(self, file: str) -> IndexObjUnion:
"""For PY3 only"""
return self.join(file)
@@ -289,24 +297,45 @@ class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable):
See the ``TreeModifier`` for more information on how to alter the cache"""
return TreeModifier(self._cache)
- def traverse(self, predicate=lambda i, d: True,
- prune=lambda i, d: False, depth=-1, branch_first=True,
- visit_once=False, ignore_self=1):
- """For documentation, see util.Traversable.traverse
+ def traverse(self, # type: ignore # overrides super()
+ predicate: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: True,
+ prune: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: False,
+ depth: int = -1,
+ branch_first: bool = True,
+ visit_once: bool = False,
+ ignore_self: int = 1,
+ as_edge: bool = False
+ ) -> Union[Iterator[IndexObjUnion],
+ Iterator[TraversedTreeTup]]:
+ """For documentation, see util.Traversable._traverse()
Trees are set to visit_once = False to gain more performance in the traversal"""
- return super(Tree, self).traverse(predicate, prune, depth, branch_first, visit_once, ignore_self)
+
+ # """
+ # # To typecheck instead of using cast.
+ # import itertools
+ # def is_tree_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Union['Tree', 'Blob', 'Submodule']]]]:
+ # return all(isinstance(x, (Blob, Tree, Submodule)) for x in inp[1])
+
+ # ret = super(Tree, self).traverse(predicate, prune, depth, branch_first, visit_once, ignore_self)
+ # ret_tup = itertools.tee(ret, 2)
+ # assert is_tree_traversed(ret_tup), f"Type is {[type(x) for x in list(ret_tup[0])]}"
+ # return ret_tup[0]"""
+ return cast(Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]],
+ super(Tree, self).traverse(predicate, prune, depth, # type: ignore
+ branch_first, visit_once, ignore_self))
# List protocol
- def __getslice__(self, i: int, j: int) -> List[Union[Blob, 'Tree', Submodule]]:
+
+ def __getslice__(self, i: int, j: int) -> List[IndexObjUnion]:
return list(self._iter_convert_to_object(self._cache[i:j]))
- def __iter__(self) -> Iterator[Union[Blob, 'Tree', Submodule]]:
+ def __iter__(self) -> Iterator[IndexObjUnion]:
return self._iter_convert_to_object(self._cache)
def __len__(self) -> int:
return len(self._cache)
- def __getitem__(self, item: Union[str, int, slice]) -> Union[Blob, 'Tree', Submodule]:
+ def __getitem__(self, item: Union[str, int, slice]) -> IndexObjUnion:
if isinstance(item, int):
info = self._cache[item]
return self._map_id_to_type[info[1] >> 12](self.repo, info[0], info[1], join_path(self.path, info[2]))
@@ -318,7 +347,7 @@ class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable):
raise TypeError("Invalid index type: %r" % item)
- def __contains__(self, item: Union[IndexObject, PathLike]) -> bool:
+ def __contains__(self, item: Union[IndexObjUnion, PathLike]) -> bool:
if isinstance(item, IndexObject):
for info in self._cache:
if item.binsha == info[0]:
diff --git a/git/objects/util.py b/git/objects/util.py
index 8b8148a9..0b449b7b 100644
--- a/git/objects/util.py
+++ b/git/objects/util.py
@@ -7,6 +7,7 @@
from git.util import (
IterableList,
+ IterableObj,
Actor
)
@@ -19,18 +20,24 @@ import calendar
from datetime import datetime, timedelta, tzinfo
# typing ------------------------------------------------------------
-from typing import (Any, Callable, Deque, Iterator, TypeVar, TYPE_CHECKING, Tuple, Type, Union, cast)
+from typing import (Any, Callable, Deque, Iterator, NamedTuple, overload, Sequence,
+ TYPE_CHECKING, Tuple, Type, TypeVar, Union, cast)
+
+from git.types import Literal, TypeGuard
if TYPE_CHECKING:
from io import BytesIO, StringIO
- from .submodule.base import Submodule # noqa: F401
from .commit import Commit
from .blob import Blob
from .tag import TagObject
- from .tree import Tree
+ from .tree import Tree, TraversedTreeTup
from subprocess import Popen
-
-T_Iterableobj = TypeVar('T_Iterableobj')
+
+
+T_TIobj = TypeVar('T_TIobj', bound='TraversableIterableObj') # for TraversableIterableObj.traverse()
+
+TraversedTup = Union[Tuple[Union['Traversable', None], 'Traversable'], # for commit, submodule
+ 'TraversedTreeTup'] # for tree.traverse()
# --------------------------------------------------------------------
@@ -92,7 +99,7 @@ def utctz_to_altz(utctz: str) -> int:
return -1 * int(float(utctz) / 100 * 3600)
-def altz_to_utctz_str(altz: int) -> str:
+def altz_to_utctz_str(altz: float) -> str:
"""As above, but inverses the operation, returning a string that can be used
in commit objects"""
utci = -1 * int((float(altz) / 3600) * 100)
@@ -287,7 +294,7 @@ class Traversable(object):
__slots__ = ()
@classmethod
- def _get_intermediate_items(cls, item):
+ def _get_intermediate_items(cls, item) -> Sequence['Traversable']:
"""
Returns:
Tuple of items connected to the given item.
@@ -299,23 +306,30 @@ class Traversable(object):
"""
raise NotImplementedError("To be implemented in subclass")
- def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList:
+ def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList['TraversableIterableObj']:
"""
:return: IterableList with the results of the traversal as produced by
- traverse()"""
- out: IterableList = IterableList(self._id_attribute_) # type: ignore[attr-defined] # defined in sublcasses
+ traverse()
+ List objects must be IterableObj and Traversable e.g. Commit, Submodule"""
+
+ def is_TraversableIterableObj(inp: 'Traversable') -> TypeGuard['TraversableIterableObj']:
+ # return isinstance(self, TraversableIterableObj)
+ # Can it be anythin else?
+ return isinstance(self, Traversable)
+
+ assert is_TraversableIterableObj(self), f"{type(self)}"
+ out: IterableList['TraversableIterableObj'] = IterableList(self._id_attribute_)
out.extend(self.traverse(*args, **kwargs))
return out
def traverse(self,
- predicate: Callable[[object, int], bool] = lambda i, d: True,
- prune: Callable[[object, int], bool] = lambda i, d: False,
- depth: int = -1,
- branch_first: bool = True,
- visit_once: bool = True, ignore_self: int = 1, as_edge: bool = False
- ) -> Union[Iterator['Traversable'], Iterator[Tuple['Traversable', 'Traversable']]]:
+ predicate: Callable[[Union['Traversable', 'Blob', TraversedTup], int], bool] = lambda i, d: True,
+ prune: Callable[[Union['Traversable', 'Blob', TraversedTup], int], bool] = lambda i, d: False,
+ depth: int = -1, branch_first: bool = True, visit_once: bool = True,
+ ignore_self: int = 1, as_edge: bool = False
+ ) -> Union[Iterator[Union['Traversable', 'Blob']],
+ Iterator[TraversedTup]]:
""":return: iterator yielding of items found when traversing self
-
:param predicate: f(i,d) returns False if item i at depth d should not be included in the result
:param prune:
@@ -344,21 +358,37 @@ class Traversable(object):
if True, return a pair of items, first being the source, second the
destination, i.e. tuple(src, dest) with the edge spanning from
source to destination"""
+
+ """
+ Commit -> Iterator[Union[Commit, Tuple[Commit, Commit]]
+ Submodule -> Iterator[Submodule, Tuple[Submodule, Submodule]]
+ Tree -> Iterator[Union[Blob, Tree, Submodule,
+ Tuple[Union[Submodule, Tree], Union[Blob, Tree, Submodule]]]
+
+ ignore_self=True is_edge=True -> Iterator[item]
+ ignore_self=True is_edge=False --> Iterator[item]
+ ignore_self=False is_edge=True -> Iterator[item] | Iterator[Tuple[src, item]]
+ ignore_self=False is_edge=False -> Iterator[Tuple[src, item]]"""
+ class TraverseNT(NamedTuple):
+ depth: int
+ item: Union['Traversable', 'Blob']
+ src: Union['Traversable', None]
+
visited = set()
- stack = deque() # type: Deque[Tuple[int, Traversable, Union[Traversable, None]]]
- stack.append((0, self, None)) # self is always depth level 0
+ stack = deque() # type: Deque[TraverseNT]
+ stack.append(TraverseNT(0, self, None)) # self is always depth level 0
- def addToStack(stack: Deque[Tuple[int, 'Traversable', Union['Traversable', None]]],
- item: 'Traversable',
+ def addToStack(stack: Deque[TraverseNT],
+ src_item: 'Traversable',
branch_first: bool,
- depth) -> None:
+ depth: int) -> None:
lst = self._get_intermediate_items(item)
- if not lst:
+ if not lst: # empty list
return None
if branch_first:
- stack.extendleft((depth, i, item) for i in lst)
+ stack.extendleft(TraverseNT(depth, i, src_item) for i in lst)
else:
- reviter = ((depth, lst[i], item) for i in range(len(lst) - 1, -1, -1))
+ reviter = (TraverseNT(depth, lst[i], src_item) for i in range(len(lst) - 1, -1, -1))
stack.extend(reviter)
# END addToStack local method
@@ -371,7 +401,12 @@ class Traversable(object):
if visit_once:
visited.add(item)
- rval = (as_edge and (src, item)) or item
+ rval: Union[TraversedTup, 'Traversable', 'Blob']
+ if as_edge: # if as_edge return (src, item) unless rrc is None (e.g. for first item)
+ rval = (src, item)
+ else:
+ rval = item
+
if prune(rval, d):
continue
@@ -405,3 +440,73 @@ class Serializable(object):
:param stream: a file-like object
:return: self"""
raise NotImplementedError("To be implemented in subclass")
+
+
+class TraversableIterableObj(Traversable, IterableObj):
+ __slots__ = ()
+
+ TIobj_tuple = Tuple[Union[T_TIobj, None], T_TIobj]
+
+ @overload # type: ignore
+ def traverse(self: T_TIobj,
+ predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
+ prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
+ depth: int, branch_first: bool, visit_once: bool,
+ ignore_self: Literal[True],
+ as_edge: Literal[False],
+ ) -> Iterator[T_TIobj]:
+ ...
+
+ @overload
+ def traverse(self: T_TIobj,
+ predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
+ prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
+ depth: int, branch_first: bool, visit_once: bool,
+ ignore_self: Literal[False],
+ as_edge: Literal[True],
+ ) -> Iterator[Tuple[Union[T_TIobj, None], T_TIobj]]:
+ ...
+
+ @overload
+ def traverse(self: T_TIobj,
+ predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool],
+ prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool],
+ depth: int, branch_first: bool, visit_once: bool,
+ ignore_self: Literal[True],
+ as_edge: Literal[True],
+ ) -> Iterator[Tuple[T_TIobj, T_TIobj]]:
+ ...
+
+ def traverse(self: T_TIobj,
+ predicate: Callable[[Union[T_TIobj, TIobj_tuple], int],
+ bool] = lambda i, d: True,
+ prune: Callable[[Union[T_TIobj, TIobj_tuple], int],
+ bool] = lambda i, d: False,
+ depth: int = -1, branch_first: bool = True, visit_once: bool = True,
+ ignore_self: int = 1, as_edge: bool = False
+ ) -> Union[Iterator[T_TIobj],
+ Iterator[Tuple[T_TIobj, T_TIobj]],
+ Iterator[TIobj_tuple]]:
+ """For documentation, see util.Traversable._traverse()"""
+
+ """
+ # To typecheck instead of using cast.
+ import itertools
+ from git.types import TypeGuard
+ def is_commit_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Tuple['Commit', 'Commit']]]]:
+ for x in inp[1]:
+ if not isinstance(x, tuple) and len(x) != 2:
+ if all(isinstance(inner, Commit) for inner in x):
+ continue
+ return True
+
+ ret = super(Commit, self).traverse(predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge)
+ ret_tup = itertools.tee(ret, 2)
+ assert is_commit_traversed(ret_tup), f"{[type(x) for x in list(ret_tup[0])]}"
+ return ret_tup[0]
+ """
+ return cast(Union[Iterator[T_TIobj],
+ Iterator[Tuple[Union[None, T_TIobj], T_TIobj]]],
+ super(TraversableIterableObj, self).traverse(
+ predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge # type: ignore
+ ))
diff --git a/git/refs/head.py b/git/refs/head.py
index cc838590..c698004d 100644
--- a/git/refs/head.py
+++ b/git/refs/head.py
@@ -5,6 +5,9 @@ from git.exc import GitCommandError
from .symbolic import SymbolicReference
from .reference import Reference
+from typing import Union
+from git.types import Commit_ish
+
__all__ = ["HEAD", "Head"]
@@ -12,7 +15,7 @@ def strip_quotes(string):
if string.startswith('"') and string.endswith('"'):
return string[1:-1]
return string
-
+
class HEAD(SymbolicReference):
@@ -33,7 +36,7 @@ class HEAD(SymbolicReference):
to contain the previous value of HEAD"""
return SymbolicReference(self.repo, self._ORIG_HEAD_NAME)
- def reset(self, commit='HEAD', index=True, working_tree=False,
+ def reset(self, commit: Union[Commit_ish, SymbolicReference, str] = 'HEAD', index=True, working_tree=False,
paths=None, **kwargs):
"""Reset our HEAD to the given commit optionally synchronizing
the index and working tree. The reference we refer to will be set to
@@ -60,6 +63,7 @@ class HEAD(SymbolicReference):
Additional arguments passed to git-reset.
:return: self"""
+ mode: Union[str, None]
mode = "--soft"
if index:
mode = "--mixed"
diff --git a/git/refs/symbolic.py b/git/refs/symbolic.py
index 64a6591a..ca0691d9 100644
--- a/git/refs/symbolic.py
+++ b/git/refs/symbolic.py
@@ -1,7 +1,8 @@
import os
from git.compat import defenc
-from git.objects import Object, Commit
+from git.objects import Object
+from git.objects.commit import Commit
from git.util import (
join_path,
join_path_native,
@@ -19,7 +20,6 @@ import os.path as osp
from .log import RefLog
-
__all__ = ["SymbolicReference"]
diff --git a/git/remote.py b/git/remote.py
index a6232db3..0ef54ea7 100644
--- a/git/remote.py
+++ b/git/remote.py
@@ -38,14 +38,12 @@ from .refs import (
from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, TYPE_CHECKING, Union, overload
-from git.types import PathLike, Literal, TBD, TypeGuard
+from git.types import PathLike, Literal, TBD, TypeGuard, Commit_ish
if TYPE_CHECKING:
from git.repo.base import Repo
- from git.objects.commit import Commit
- from git.objects.blob import Blob
- from git.objects.tree import Tree
- from git.objects.tag import TagObject
+ # from git.objects.commit import Commit
+ # from git.objects import Blob, Tree, TagObject
flagKeyLiteral = Literal[' ', '!', '+', '-', '*', '=', 't', '?']
@@ -130,6 +128,7 @@ class PushInfo(IterableObj, object):
info.summary # summary line providing human readable english text about the push
"""
__slots__ = ('local_ref', 'remote_ref_string', 'flags', '_old_commit_sha', '_remote', 'summary')
+ _id_attribute_ = 'pushinfo'
NEW_TAG, NEW_HEAD, NO_MATCH, REJECTED, REMOTE_REJECTED, REMOTE_FAILURE, DELETED, \
FORCED_UPDATE, FAST_FORWARD, UP_TO_DATE, ERROR = [1 << x for x in range(11)]
@@ -154,7 +153,7 @@ class PushInfo(IterableObj, object):
self.summary = summary
@property
- def old_commit(self) -> Union[str, SymbolicReference, 'Commit', 'TagObject', 'Blob', 'Tree', None]:
+ def old_commit(self) -> Union[str, SymbolicReference, 'Commit_ish', None]:
return self._old_commit_sha and self._remote.repo.commit(self._old_commit_sha) or None
@property
@@ -244,6 +243,7 @@ class FetchInfo(IterableObj, object):
info.remote_ref_path # The path from which we fetched on the remote. It's the remote's version of our info.ref
"""
__slots__ = ('ref', 'old_commit', 'flags', 'note', 'remote_ref_path')
+ _id_attribute_ = 'fetchinfo'
NEW_TAG, NEW_HEAD, HEAD_UPTODATE, TAG_UPDATE, REJECTED, FORCED_UPDATE, \
FAST_FORWARD, ERROR = [1 << x for x in range(8)]
@@ -284,7 +284,7 @@ class FetchInfo(IterableObj, object):
return True
def __init__(self, ref: SymbolicReference, flags: int, note: str = '',
- old_commit: Union['Commit', TagReference, 'Tree', 'Blob', None] = None,
+ old_commit: Union[Commit_ish, None] = None,
remote_ref_path: Optional[PathLike] = None) -> None:
"""
Initialize a new instance
@@ -304,7 +304,7 @@ class FetchInfo(IterableObj, object):
return self.ref.name
@property
- def commit(self) -> 'Commit':
+ def commit(self) -> Commit_ish:
""":return: Commit of our remote ref"""
return self.ref.commit
@@ -349,7 +349,7 @@ class FetchInfo(IterableObj, object):
# END control char exception handling
# parse operation string for more info - makes no sense for symbolic refs, but we parse it anyway
- old_commit = None # type: Union[Commit, TagReference, Tree, Blob, None]
+ old_commit = None # type: Union[Commit_ish, None]
is_tag_operation = False
if 'rejected' in operation:
flags |= cls.REJECTED
diff --git a/git/repo/base.py b/git/repo/base.py
index 52727504..d77b19c1 100644
--- a/git/repo/base.py
+++ b/git/repo/base.py
@@ -36,7 +36,7 @@ import gitdb
# typing ------------------------------------------------------
-from git.types import TBD, PathLike, Lit_config_levels
+from git.types import TBD, PathLike, Lit_config_levels, Commit_ish, Tree_ish
from typing import (Any, BinaryIO, Callable, Dict,
Iterator, List, Mapping, Optional, Sequence,
TextIO, Tuple, Type, Union,
@@ -45,7 +45,7 @@ from typing import (Any, BinaryIO, Callable, Dict,
if TYPE_CHECKING: # only needed for types
from git.util import IterableList
from git.refs.symbolic import SymbolicReference
- from git.objects import TagObject, Blob, Tree # NOQA: F401
+ from git.objects import Tree
# -----------------------------------------------------------
@@ -515,8 +515,8 @@ class Repo(object):
repository = configuration file for this repository only"""
return GitConfigParser(self._get_config_path(config_level), read_only=False, repo=self)
- def commit(self, rev: Optional[TBD] = None
- ) -> Union['SymbolicReference', Commit, 'TagObject', 'Blob', 'Tree']:
+ def commit(self, rev: Optional[str] = None
+ ) -> Commit:
"""The Commit object for the specified revision
:param rev: revision specifier, see git-rev-parse for viable options.
@@ -531,7 +531,7 @@ class Repo(object):
:note: Takes all arguments known to iter_commits method"""
return (c.tree for c in self.iter_commits(*args, **kwargs))
- def tree(self, rev: Union['Commit', 'Tree', str, None] = None) -> 'Tree':
+ def tree(self, rev: Union[Tree_ish, str, None] = None) -> 'Tree':
"""The Tree object for the given treeish revision
Examples::
@@ -574,7 +574,7 @@ class Repo(object):
return Commit.iter_items(self, rev, paths, **kwargs)
def merge_base(self, *rev: TBD, **kwargs: Any
- ) -> List[Union['SymbolicReference', Commit, 'TagObject', 'Blob', 'Tree', None]]:
+ ) -> List[Union['SymbolicReference', Commit_ish, None]]:
"""Find the closest common ancestor for the given revision (e.g. Commits, Tags, References, etc)
:param rev: At least two revs to find the common ancestor for.
@@ -587,7 +587,7 @@ class Repo(object):
raise ValueError("Please specify at least two revs, got only %i" % len(rev))
# end handle input
- res = [] # type: List[Union['SymbolicReference', Commit, 'TagObject', 'Blob', 'Tree', None]]
+ res = [] # type: List[Union['SymbolicReference', Commit_ish, None]]
try:
lines = self.git.merge_base(*rev, **kwargs).splitlines() # List[str]
except GitCommandError as err:
@@ -1036,7 +1036,7 @@ class Repo(object):
multi = None
if multi_options:
multi = ' '.join(multi_options).split(' ')
- proc = git.clone(multi, Git.polish_url(url), clone_path, with_extended_output=True, as_process=True,
+ proc = git.clone(multi, Git.polish_url(str(url)), clone_path, with_extended_output=True, as_process=True,
v=True, universal_newlines=True, **add_progress(kwargs, git, progress))
if progress:
handle_process_output(proc, None, to_progress_instance(progress).new_message_handler(),
@@ -1159,7 +1159,7 @@ class Repo(object):
clazz = self.__class__
return '<%s.%s %r>' % (clazz.__module__, clazz.__name__, self.git_dir)
- def currently_rebasing_on(self) -> Union['SymbolicReference', Commit, 'TagObject', 'Blob', 'Tree', None]:
+ def currently_rebasing_on(self) -> Union['SymbolicReference', Commit_ish, None]:
"""
:return: The commit which is currently being replayed while rebasing.
diff --git a/git/types.py b/git/types.py
index e3b49170..fb63f46e 100644
--- a/git/types.py
+++ b/git/types.py
@@ -4,12 +4,13 @@
import os
import sys
-from typing import Dict, Union, Any
+from typing import Dict, Union, Any, TYPE_CHECKING
+
if sys.version_info[:2] >= (3, 8):
- from typing import Final, Literal, SupportsIndex, TypedDict # noqa: F401
+ from typing import Final, Literal, SupportsIndex, TypedDict, Protocol # noqa: F401
else:
- from typing_extensions import Final, Literal, SupportsIndex, TypedDict # noqa: F401
+ from typing_extensions import Final, Literal, SupportsIndex, TypedDict, Protocol # noqa: F401
if sys.version_info[:2] >= (3, 10):
from typing import TypeGuard # noqa: F401
@@ -18,14 +19,20 @@ else:
if sys.version_info[:2] < (3, 9):
- # Python >= 3.6, < 3.9
PathLike = Union[str, os.PathLike]
elif sys.version_info[:2] >= (3, 9):
# os.PathLike only becomes subscriptable from Python 3.9 onwards
PathLike = Union[str, 'os.PathLike[str]'] # forward ref as pylance complains unless editing with py3.9+
+if TYPE_CHECKING:
+ from git.objects import Commit, Tree, TagObject, Blob
+ # from git.refs import SymbolicReference
+
TBD = Any
+Tree_ish = Union['Commit', 'Tree']
+Commit_ish = Union['Commit', 'TagObject', 'Blob', 'Tree']
+
Lit_config_levels = Literal['system', 'global', 'user', 'repository']
diff --git a/git/util.py b/git/util.py
index eccaa74e..abc82bd3 100644
--- a/git/util.py
+++ b/git/util.py
@@ -4,6 +4,9 @@
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
+from .exc import InvalidGitRepositoryError
+import os.path as osp
+from .compat import is_win
import contextlib
from functools import wraps
import getpass
@@ -20,6 +23,8 @@ from unittest import SkipTest
from urllib.parse import urlsplit, urlunsplit
import warnings
+# from git.objects.util import Traversable
+
# typing ---------------------------------------------------------
from typing import (Any, AnyStr, BinaryIO, Callable, Dict, Generator, IO, Iterator, List,
@@ -32,7 +37,11 @@ if TYPE_CHECKING:
from git.repo.base import Repo
from git.config import GitConfigParser, SectionConstraint
-from .types import PathLike, Literal, SupportsIndex, HSH_TD, Files_TD
+from .types import (Literal, Protocol, SupportsIndex, # because behind py version guards
+ PathLike, HSH_TD, Total_TD, Files_TD) # aliases
+
+T_IterableObj = TypeVar('T_IterableObj', bound='IterableObj', covariant=True)
+# So IterableList[Head] is subtype of IterableList[IterableObj]
# ---------------------------------------------------------------------
@@ -49,11 +58,6 @@ from gitdb.util import ( # NOQA @IgnorePep8
hex_to_bin, # @UnusedImport
)
-from .compat import is_win
-import os.path as osp
-
-from .exc import InvalidGitRepositoryError
-
# NOTE: Some of the unused imports might be used/imported by others.
# Handle once test-cases are back up and running.
@@ -171,7 +175,7 @@ if is_win:
path = str(path)
return path.replace('/', '\\')
- def to_native_path_linux(path: PathLike) -> PathLike:
+ def to_native_path_linux(path: PathLike) -> str:
path = str(path)
return path.replace('\\', '/')
@@ -179,8 +183,9 @@ if is_win:
to_native_path = to_native_path_windows
else:
# no need for any work on linux
- def to_native_path_linux(path: PathLike) -> PathLike:
- return path
+ def to_native_path_linux(path: PathLike) -> str:
+ return str(path)
+
to_native_path = to_native_path_linux
@@ -236,7 +241,7 @@ def py_where(program: str, path: Optional[PathLike] = None) -> List[str]:
return progs
-def _cygexpath(drive: Optional[str], path: PathLike) -> str:
+def _cygexpath(drive: Optional[str], path: str) -> str:
if osp.isabs(path) and not drive:
## Invoked from `cygpath()` directly with `D:Apps\123`?
# It's an error, leave it alone just slashes)
@@ -285,7 +290,7 @@ _cygpath_parsers = (
) # type: Tuple[Tuple[Pattern[str], Callable, bool], ...]
-def cygpath(path: PathLike) -> PathLike:
+def cygpath(path: str) -> str:
"""Use :meth:`git.cmd.Git.polish_url()` instead, that works on any environment."""
path = str(path) # ensure is str and not AnyPath.
#Fix to use Paths when 3.5 dropped. or to be just str if only for urls?
@@ -433,7 +438,7 @@ class RemoteProgress(object):
Handler providing an interface to parse progress information emitted by git-push
and git-fetch and to dispatch callbacks allowing subclasses to react to the progress.
"""
- _num_op_codes = 9
+ _num_op_codes: int = 9
BEGIN, END, COUNTING, COMPRESSING, WRITING, RECEIVING, RESOLVING, FINDING_SOURCES, CHECKING_OUT = \
[1 << x for x in range(_num_op_codes)]
STAGE_MASK = BEGIN | END
@@ -746,8 +751,6 @@ class Stats(object):
files = number of changed files as int"""
__slots__ = ("total", "files")
- from git.types import Total_TD, Files_TD
-
def __init__(self, total: Total_TD, files: Dict[PathLike, Files_TD]):
self.total = total
self.files = files
@@ -931,10 +934,7 @@ class BlockingLockFile(LockFile):
# END endless loop
-T = TypeVar('T', bound='IterableObj')
-
-
-class IterableList(List[T]):
+class IterableList(List[T_IterableObj]):
"""
List of iterable objects allowing to query an object by id or by named index::
@@ -1046,7 +1046,7 @@ class Iterable(object):
@classmethod
def list_items(cls, repo, *args, **kwargs):
"""
- Deprecaated, use IterableObj instead.
+ Deprecated, use IterableObj instead.
Find all items of this type - subclasses can specify args and kwargs differently.
If no args are given, subclasses are obliged to return all items if no additional
arguments arg given.
@@ -1066,14 +1066,17 @@ class Iterable(object):
raise NotImplementedError("To be implemented by Subclass")
-class IterableObj():
+class IterableObj(Protocol):
"""Defines an interface for iterable items which is to assure a uniform
- way to retrieve and iterate items within the git repository"""
+ way to retrieve and iterate items within the git repository
+
+ Subclasses = [Submodule, Commit, Reference, PushInfo, FetchInfo, Remote]"""
+
__slots__ = ()
- _id_attribute_ = "attribute that most suitably identifies your instance"
+ _id_attribute_: str
@classmethod
- def list_items(cls, repo: 'Repo', *args: Any, **kwargs: Any) -> IterableList[T]:
+ def list_items(cls, repo: 'Repo', *args: Any, **kwargs: Any) -> IterableList[T_IterableObj]:
"""
Find all items of this type - subclasses can specify args and kwargs differently.
If no args are given, subclasses are obliged to return all items if no additional
@@ -1087,7 +1090,8 @@ class IterableObj():
return out_list
@classmethod
- def iter_items(cls, repo: 'Repo', *args: Any, **kwargs: Any) -> Iterator[T]:
+ def iter_items(cls, repo: 'Repo', *args: Any, **kwargs: Any
+ ) -> Iterator[T_IterableObj]:
# return typed to be compatible with subtypes e.g. Remote
"""For more information about the arguments, see list_items
:return: iterator yielding Items"""