summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Thiel <sebastian.thiel@icloud.com>2021-03-12 10:23:12 +0800
committerGitHub <noreply@github.com>2021-03-12 10:23:12 +0800
commit63c31b60acf2286095109106a4e9b2a4289ec91f (patch)
tree01dc891f912671c3b98690e9e7f91cafc8fd325e
parent20f4a9d49b466a18f1af1fdfb480bc4520a4cdc2 (diff)
parent85ebfb2f0dedb18673a2d756274bbcecd1f034c4 (diff)
downloadgitpython-63c31b60acf2286095109106a4e9b2a4289ec91f.tar.gz
Merge pull request #1192 from Yobmod/main
Add initial types to repo/fun.py
-rw-r--r--git/refs/symbolic.py2
-rw-r--r--git/repo/base.py281
-rw-r--r--git/repo/fun.py38
-rw-r--r--requirements.txt1
-rw-r--r--test-requirements.txt1
-rw-r--r--tox.ini3
6 files changed, 201 insertions, 125 deletions
diff --git a/git/refs/symbolic.py b/git/refs/symbolic.py
index bd41cb60..22d9c1d5 100644
--- a/git/refs/symbolic.py
+++ b/git/refs/symbolic.py
@@ -513,7 +513,7 @@ class SymbolicReference(object):
return ref
@classmethod
- def create(cls, repo, path, reference='HEAD', force=False, logmsg=None):
+ def create(cls, repo, path, reference='HEAD', force=False, logmsg=None, **kwargs):
"""Create a new symbolic reference, hence a reference pointing to another reference.
:param repo:
diff --git a/git/repo/base.py b/git/repo/base.py
index 8f1ef0a6..99e87643 100644
--- a/git/repo/base.py
+++ b/git/repo/base.py
@@ -4,7 +4,6 @@
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
-from collections import namedtuple
import logging
import os
import re
@@ -33,18 +32,44 @@ from .fun import rev_parse, is_git_dir, find_submodule_git_dir, touch, find_work
import gc
import gitdb
-try:
- import pathlib
-except ImportError:
- pathlib = None
-
+# typing ------------------------------------------------------
+
+from git.types import TBD, PathLike
+from typing_extensions import Literal
+from typing import (Any,
+ BinaryIO,
+ Callable,
+ Dict,
+ Iterator,
+ List,
+ Mapping,
+ Optional,
+ TextIO,
+ Tuple,
+ Type,
+ Union,
+ NamedTuple, cast, TYPE_CHECKING)
+
+if TYPE_CHECKING: # only needed for types
+ from git.util import IterableList
+ from git.refs.symbolic import SymbolicReference
+ from git.objects import TagObject, Blob, Tree # NOQA: F401
+
+Lit_config_levels = Literal['system', 'global', 'user', 'repository']
+
+# -----------------------------------------------------------
log = logging.getLogger(__name__)
-BlameEntry = namedtuple('BlameEntry', ['commit', 'linenos', 'orig_path', 'orig_linenos'])
+__all__ = ('Repo',)
-__all__ = ('Repo',)
+BlameEntry = NamedTuple('BlameEntry', [
+ ('commit', Dict[str, TBD]),
+ ('linenos', range),
+ ('orig_path', Optional[str]),
+ ('orig_linenos', range)]
+)
class Repo(object):
@@ -63,11 +88,11 @@ class Repo(object):
'git_dir' is the .git repository directory, which is always set."""
DAEMON_EXPORT_FILE = 'git-daemon-export-ok'
- git = None # Must exist, or __del__ will fail in case we raise on `__init__()`
- working_dir = None
- _working_tree_dir = None
- git_dir = None
- _common_dir = None
+ git = cast('Git', None) # Must exist, or __del__ will fail in case we raise on `__init__()`
+ working_dir = None # type: Optional[PathLike]
+ _working_tree_dir = None # type: Optional[PathLike]
+ git_dir = None # type: Optional[PathLike]
+ _common_dir = None # type: Optional[PathLike]
# precompiled regex
re_whitespace = re.compile(r'\s+')
@@ -79,13 +104,14 @@ class Repo(object):
# invariants
# represents the configuration level of a configuration file
- config_level = ("system", "user", "global", "repository")
+ config_level = ("system", "user", "global", "repository") # type: Tuple[Lit_config_levels, ...]
# Subclass configuration
# Subclasses may easily bring in their own custom types by placing a constructor or type here
GitCommandWrapperType = Git
- def __init__(self, path=None, odbt=GitCmdObjectDB, search_parent_directories=False, expand_vars=True):
+ def __init__(self, path: Optional[PathLike] = None, odbt: Type[GitCmdObjectDB] = GitCmdObjectDB,
+ search_parent_directories: bool = False, expand_vars: bool = True) -> None:
"""Create a new Repo instance
:param path:
@@ -126,8 +152,9 @@ class Repo(object):
warnings.warn("The use of environment variables in paths is deprecated" +
"\nfor security reasons and may be removed in the future!!")
epath = expand_path(epath, expand_vars)
- if not os.path.exists(epath):
- raise NoSuchPathError(epath)
+ if epath is not None:
+ if not os.path.exists(epath):
+ raise NoSuchPathError(epath)
## Walk up the path to find the `.git` dir.
#
@@ -178,6 +205,7 @@ class Repo(object):
# END while curpath
if self.git_dir is None:
+ self.git_dir = cast(PathLike, self.git_dir)
raise InvalidGitRepositoryError(epath)
self._bare = False
@@ -190,7 +218,7 @@ class Repo(object):
try:
common_dir = open(osp.join(self.git_dir, 'commondir'), 'rt').readlines()[0].strip()
self._common_dir = osp.join(self.git_dir, common_dir)
- except (OSError, IOError):
+ except OSError:
self._common_dir = None
# adjust the wd in case we are actually bare - we didn't know that
@@ -199,7 +227,7 @@ class Repo(object):
self._working_tree_dir = None
# END working dir handling
- self.working_dir = self._working_tree_dir or self.common_dir
+ self.working_dir = self._working_tree_dir or self.common_dir # type: Optional[PathLike]
self.git = self.GitCommandWrapperType(self.working_dir)
# special handling, in special times
@@ -208,19 +236,19 @@ class Repo(object):
args.append(self.git)
self.odb = odbt(*args)
- def __enter__(self):
+ def __enter__(self) -> 'Repo':
return self
- def __exit__(self, exc_type, exc_value, traceback):
+ def __exit__(self, exc_type: TBD, exc_value: TBD, traceback: TBD) -> None:
self.close()
- def __del__(self):
+ def __del__(self) -> None:
try:
self.close()
except Exception:
pass
- def close(self):
+ def close(self) -> None:
if self.git:
self.git.clear_cache()
# Tempfiles objects on Windows are holding references to
@@ -235,25 +263,26 @@ class Repo(object):
if is_win:
gc.collect()
- def __eq__(self, rhs):
- if isinstance(rhs, Repo):
+ def __eq__(self, rhs: object) -> bool:
+ if isinstance(rhs, Repo) and self.git_dir:
return self.git_dir == rhs.git_dir
return False
- def __ne__(self, rhs):
+ def __ne__(self, rhs: object) -> bool:
return not self.__eq__(rhs)
- def __hash__(self):
+ def __hash__(self) -> int:
return hash(self.git_dir)
# Description property
- def _get_description(self):
- filename = osp.join(self.git_dir, 'description')
+ def _get_description(self) -> str:
+ filename = osp.join(self.git_dir, 'description') if self.git_dir else ""
with open(filename, 'rb') as fp:
return fp.read().rstrip().decode(defenc)
- def _set_description(self, descr):
- filename = osp.join(self.git_dir, 'description')
+ def _set_description(self, descr: str) -> None:
+
+ filename = osp.join(self.git_dir, 'description') if self.git_dir else ""
with open(filename, 'wb') as fp:
fp.write((descr + '\n').encode(defenc))
@@ -263,25 +292,31 @@ class Repo(object):
del _set_description
@property
- def working_tree_dir(self):
+ def working_tree_dir(self) -> Optional[PathLike]:
""":return: The working tree directory of our git repository. If this is a bare repository, None is returned.
"""
return self._working_tree_dir
@property
- def common_dir(self):
+ def common_dir(self) -> PathLike:
"""
:return: The git dir that holds everything except possibly HEAD,
FETCH_HEAD, ORIG_HEAD, COMMIT_EDITMSG, index, and logs/."""
- return self._common_dir or self.git_dir
+ if self._common_dir:
+ return self._common_dir
+ elif self.git_dir:
+ return self.git_dir
+ else:
+ # or could return ""
+ raise InvalidGitRepositoryError()
@property
- def bare(self):
+ def bare(self) -> bool:
""":return: True if the repository is bare"""
return self._bare
@property
- def heads(self):
+ def heads(self) -> 'IterableList':
"""A list of ``Head`` objects representing the branch heads in
this repo
@@ -289,7 +324,7 @@ class Repo(object):
return Head.list_items(self)
@property
- def references(self):
+ def references(self) -> 'IterableList':
"""A list of Reference objects representing tags, heads and remote references.
:return: IterableList(Reference, ...)"""
@@ -302,24 +337,24 @@ class Repo(object):
branches = heads
@property
- def index(self):
+ def index(self) -> 'IndexFile':
""":return: IndexFile representing this repository's index.
:note: This property can be expensive, as the returned ``IndexFile`` will be
reinitialized. It's recommended to re-use the object."""
return IndexFile(self)
@property
- def head(self):
+ def head(self) -> 'HEAD':
""":return: HEAD Object pointing to the current head reference"""
return HEAD(self, 'HEAD')
@property
- def remotes(self):
+ def remotes(self) -> 'IterableList':
"""A list of Remote objects allowing to access and manipulate remotes
:return: ``git.IterableList(Remote, ...)``"""
return Remote.list_items(self)
- def remote(self, name='origin'):
+ def remote(self, name: str = 'origin') -> 'Remote':
""":return: Remote with the specified name
:raise ValueError: if no remote with such a name exists"""
r = Remote(self, name)
@@ -330,13 +365,13 @@ class Repo(object):
#{ Submodules
@property
- def submodules(self):
+ def submodules(self) -> 'IterableList':
"""
:return: git.IterableList(Submodule, ...) of direct submodules
available from the current head"""
return Submodule.list_items(self)
- def submodule(self, name):
+ def submodule(self, name: str) -> 'IterableList':
""" :return: Submodule with the given name
:raise ValueError: If no such submodule exists"""
try:
@@ -345,7 +380,7 @@ class Repo(object):
raise ValueError("Didn't find submodule named %r" % name) from e
# END exception handling
- def create_submodule(self, *args, **kwargs):
+ def create_submodule(self, *args: Any, **kwargs: Any) -> Submodule:
"""Create a new submodule
:note: See the documentation of Submodule.add for a description of the
@@ -353,13 +388,13 @@ class Repo(object):
:return: created submodules"""
return Submodule.add(self, *args, **kwargs)
- def iter_submodules(self, *args, **kwargs):
+ def iter_submodules(self, *args: Any, **kwargs: Any) -> Iterator:
"""An iterator yielding Submodule instances, see Traversable interface
for a description of args and kwargs
:return: Iterator"""
return RootModule(self).traverse(*args, **kwargs)
- def submodule_update(self, *args, **kwargs):
+ def submodule_update(self, *args: Any, **kwargs: Any) -> Iterator:
"""Update the submodules, keeping the repository consistent as it will
take the previous state into consideration. For more information, please
see the documentation of RootModule.update"""
@@ -368,41 +403,45 @@ class Repo(object):
#}END submodules
@property
- def tags(self):
+ def tags(self) -> 'IterableList':
"""A list of ``Tag`` objects that are available in this repo
:return: ``git.IterableList(TagReference, ...)`` """
return TagReference.list_items(self)
- def tag(self, path):
+ def tag(self, path: PathLike) -> TagReference:
""":return: TagReference Object, reference pointing to a Commit or Tag
:param path: path to the tag reference, i.e. 0.1.5 or tags/0.1.5 """
return TagReference(self, path)
- def create_head(self, path, commit='HEAD', force=False, logmsg=None):
+ def create_head(self, path: PathLike, commit: str = 'HEAD',
+ force: bool = False, logmsg: Optional[str] = None
+ ) -> 'SymbolicReference':
"""Create a new head within the repository.
For more documentation, please see the Head.create method.
:return: newly created Head Reference"""
return Head.create(self, path, commit, force, logmsg)
- def delete_head(self, *heads, **kwargs):
+ def delete_head(self, *heads: HEAD, **kwargs: Any) -> None:
"""Delete the given heads
:param kwargs: Additional keyword arguments to be passed to git-branch"""
return Head.delete(self, *heads, **kwargs)
- def create_tag(self, path, ref='HEAD', message=None, force=False, **kwargs):
+ def create_tag(self, path: PathLike, ref: str = 'HEAD',
+ message: Optional[str] = None, force: bool = False, **kwargs: Any
+ ) -> TagReference:
"""Create a new tag reference.
For more documentation, please see the TagReference.create method.
:return: TagReference object """
return TagReference.create(self, path, ref, message, force, **kwargs)
- def delete_tag(self, *tags):
+ def delete_tag(self, *tags: TBD) -> None:
"""Delete the given tag references"""
return TagReference.delete(self, *tags)
- def create_remote(self, name, url, **kwargs):
+ def create_remote(self, name: str, url: PathLike, **kwargs: Any) -> Remote:
"""Create a new remote.
For more information, please see the documentation of the Remote.create
@@ -411,11 +450,11 @@ class Repo(object):
:return: Remote reference"""
return Remote.create(self, name, url, **kwargs)
- def delete_remote(self, remote):
+ def delete_remote(self, remote: 'Remote') -> Type['Remote']:
"""Delete the given remote."""
return Remote.remove(self, remote)
- def _get_config_path(self, config_level):
+ def _get_config_path(self, config_level: Lit_config_levels) -> str:
# we do not support an absolute path of the gitconfig on windows ,
# use the global config instead
if is_win and config_level == "system":
@@ -429,11 +468,16 @@ class Repo(object):
elif config_level == "global":
return osp.normpath(osp.expanduser("~/.gitconfig"))
elif config_level == "repository":
- return osp.normpath(osp.join(self._common_dir or self.git_dir, "config"))
+ if self._common_dir:
+ return osp.normpath(osp.join(self._common_dir, "config"))
+ elif self.git_dir:
+ return osp.normpath(osp.join(self.git_dir, "config"))
+ else:
+ raise NotADirectoryError
raise ValueError("Invalid configuration level: %r" % config_level)
- def config_reader(self, config_level=None):
+ def config_reader(self, config_level: Optional[Lit_config_levels] = None) -> GitConfigParser:
"""
:return:
GitConfigParser allowing to read the full git configuration, but not to write it
@@ -454,7 +498,7 @@ class Repo(object):
files = [self._get_config_path(config_level)]
return GitConfigParser(files, read_only=True, repo=self)
- def config_writer(self, config_level="repository"):
+ def config_writer(self, config_level: Lit_config_levels = "repository") -> GitConfigParser:
"""
:return:
GitConfigParser allowing to write values of the specified configuration file level.
@@ -469,7 +513,8 @@ class Repo(object):
repository = configuration file for this repository only"""
return GitConfigParser(self._get_config_path(config_level), read_only=False, repo=self)
- def commit(self, rev=None):
+ def commit(self, rev: Optional[TBD] = None
+ ) -> Union['SymbolicReference', Commit, 'TagObject', 'Blob', 'Tree', None]:
"""The Commit object for the specified revision
:param rev: revision specifier, see git-rev-parse for viable options.
@@ -479,12 +524,12 @@ class Repo(object):
return self.head.commit
return self.rev_parse(str(rev) + "^0")
- def iter_trees(self, *args, **kwargs):
+ def iter_trees(self, *args: Any, **kwargs: Any) -> Iterator['Tree']:
""":return: Iterator yielding Tree objects
:note: Takes all arguments known to iter_commits method"""
return (c.tree for c in self.iter_commits(*args, **kwargs))
- def tree(self, rev=None):
+ def tree(self, rev: Union['Commit', 'Tree', None] = None) -> 'Tree':
"""The Tree object for the given treeish revision
Examples::
@@ -501,7 +546,8 @@ class Repo(object):
return self.head.commit.tree
return self.rev_parse(str(rev) + "^{tree}")
- def iter_commits(self, rev=None, paths='', **kwargs):
+ def iter_commits(self, rev: Optional[TBD] = None, paths: Union[PathLike, List[PathLike]] = '',
+ **kwargs: Any) -> Iterator[Commit]:
"""A list of Commit objects representing the history of a given ref/commit
:param rev:
@@ -525,7 +571,8 @@ class Repo(object):
return Commit.iter_items(self, rev, paths, **kwargs)
- def merge_base(self, *rev, **kwargs):
+ def merge_base(self, *rev: TBD, **kwargs: Any
+ ) -> List[Union['SymbolicReference', Commit, 'TagObject', 'Blob', 'Tree', None]]:
"""Find the closest common ancestor for the given revision (e.g. Commits, Tags, References, etc)
:param rev: At least two revs to find the common ancestor for.
@@ -538,9 +585,9 @@ class Repo(object):
raise ValueError("Please specify at least two revs, got only %i" % len(rev))
# end handle input
- res = []
+ res = [] # type: List[Union['SymbolicReference', Commit, 'TagObject', 'Blob', 'Tree', None]]
try:
- lines = self.git.merge_base(*rev, **kwargs).splitlines()
+ lines = self.git.merge_base(*rev, **kwargs).splitlines() # List[str]
except GitCommandError as err:
if err.status == 128:
raise
@@ -556,7 +603,7 @@ class Repo(object):
return res
- def is_ancestor(self, ancestor_rev, rev):
+ def is_ancestor(self, ancestor_rev: 'Commit', rev: 'Commit') -> bool:
"""Check if a commit is an ancestor of another
:param ancestor_rev: Rev which should be an ancestor
@@ -571,12 +618,12 @@ class Repo(object):
raise
return True
- def _get_daemon_export(self):
- filename = osp.join(self.git_dir, self.DAEMON_EXPORT_FILE)
+ def _get_daemon_export(self) -> bool:
+ filename = osp.join(self.git_dir, self.DAEMON_EXPORT_FILE) if self.git_dir else ""
return osp.exists(filename)
- def _set_daemon_export(self, value):
- filename = osp.join(self.git_dir, self.DAEMON_EXPORT_FILE)
+ def _set_daemon_export(self, value: object) -> None:
+ filename = osp.join(self.git_dir, self.DAEMON_EXPORT_FILE) if self.git_dir else ""
fileexists = osp.exists(filename)
if value and not fileexists:
touch(filename)
@@ -588,11 +635,11 @@ class Repo(object):
del _get_daemon_export
del _set_daemon_export
- def _get_alternates(self):
+ def _get_alternates(self) -> List[str]:
"""The list of alternates for this repo from which objects can be retrieved
:return: list of strings being pathnames of alternates"""
- alternates_path = osp.join(self.git_dir, 'objects', 'info', 'alternates')
+ alternates_path = osp.join(self.git_dir, 'objects', 'info', 'alternates') if self.git_dir else ""
if osp.exists(alternates_path):
with open(alternates_path, 'rb') as f:
@@ -600,7 +647,7 @@ class Repo(object):
return alts.strip().splitlines()
return []
- def _set_alternates(self, alts):
+ def _set_alternates(self, alts: List[str]) -> None:
"""Sets the alternates
:param alts:
@@ -622,8 +669,8 @@ class Repo(object):
alternates = property(_get_alternates, _set_alternates,
doc="Retrieve a list of alternates paths or set a list paths to be used as alternates")
- def is_dirty(self, index=True, working_tree=True, untracked_files=False,
- submodules=True, path=None):
+ def is_dirty(self, index: bool = True, working_tree: bool = True, untracked_files: bool = False,
+ submodules: bool = True, path: Optional[PathLike] = None) -> bool:
"""
:return:
``True``, the repository is considered dirty. By default it will react
@@ -639,7 +686,7 @@ class Repo(object):
if not submodules:
default_args.append('--ignore-submodules')
if path:
- default_args.extend(["--", path])
+ default_args.extend(["--", str(path)])
if index:
# diff index against HEAD
if osp.isfile(self.index.path) and \
@@ -658,7 +705,7 @@ class Repo(object):
return False
@property
- def untracked_files(self):
+ def untracked_files(self) -> List[str]:
"""
:return:
list(str,...)
@@ -673,7 +720,7 @@ class Repo(object):
consider caching it yourself."""
return self._get_untracked_files()
- def _get_untracked_files(self, *args, **kwargs):
+ def _get_untracked_files(self, *args: Any, **kwargs: Any) -> List[str]:
# make sure we get all files, not only untracked directories
proc = self.git.status(*args,
porcelain=True,
@@ -697,7 +744,7 @@ class Repo(object):
finalize_process(proc)
return untracked_files
- def ignored(self, *paths):
+ def ignored(self, *paths: PathLike) -> List[PathLike]:
"""Checks if paths are ignored via .gitignore
Doing so using the "git check-ignore" method.
@@ -711,13 +758,13 @@ class Repo(object):
return proc.replace("\\\\", "\\").replace('"', "").split("\n")
@property
- def active_branch(self):
+ def active_branch(self) -> 'SymbolicReference':
"""The name of the currently active branch.
:return: Head to the active branch"""
return self.head.reference
- def blame_incremental(self, rev, file, **kwargs):
+ def blame_incremental(self, rev: TBD, file: TBD, **kwargs: Any) -> Optional[Iterator['BlameEntry']]:
"""Iterator for blame information for the given file at the given revision.
Unlike .blame(), this does not return the actual file's contents, only
@@ -732,7 +779,7 @@ class Repo(object):
should get a continuous range spanning all line numbers in the file.
"""
data = self.git.blame(rev, '--', file, p=True, incremental=True, stdout_as_string=False, **kwargs)
- commits = {}
+ commits = {} # type: Dict[str, TBD]
stream = (line for line in data.split(b'\n') if line)
while True:
@@ -740,10 +787,11 @@ class Repo(object):
line = next(stream) # when exhausted, causes a StopIteration, terminating this function
except StopIteration:
return
- hexsha, orig_lineno, lineno, num_lines = line.split()
- lineno = int(lineno)
- num_lines = int(num_lines)
- orig_lineno = int(orig_lineno)
+ split_line = line.split() # type: Tuple[str, str, str, str]
+ hexsha, orig_lineno_str, lineno_str, num_lines_str = split_line
+ lineno = int(lineno_str)
+ num_lines = int(num_lines_str)
+ orig_lineno = int(orig_lineno_str)
if hexsha not in commits:
# Now read the next few lines and build up a dict of properties
# for this commit
@@ -791,22 +839,24 @@ class Repo(object):
safe_decode(orig_filename),
range(orig_lineno, orig_lineno + num_lines))
- def blame(self, rev, file, incremental=False, **kwargs):
+ def blame(self, rev: TBD, file: TBD, incremental: bool = False, **kwargs: Any
+ ) -> Union[List[List[Union[Optional['Commit'], List[str]]]], Optional[Iterator[BlameEntry]]]:
"""The blame information for the given file at the given revision.
:param rev: revision specifier, see git-rev-parse for viable options.
:return:
list: [git.Commit, list: [<line>]]
- A list of tuples associating a Commit object with a list of lines that
+ A list of lists associating a Commit object with a list of lines that
changed within the given commit. The Commit objects will be given in order
of appearance."""
if incremental:
return self.blame_incremental(rev, file, **kwargs)
data = self.git.blame(rev, '--', file, p=True, stdout_as_string=False, **kwargs)
- commits = {}
- blames = []
- info = None
+ commits = {} # type: Dict[str, Any]
+ blames = [] # type: List[List[Union[Optional['Commit'], List[str]]]]
+
+ info = {} # type: Dict[str, Any] # use Any until TypedDict available
keepends = True
for line in data.splitlines(keepends):
@@ -891,7 +941,8 @@ class Repo(object):
pass
# end handle line contents
blames[-1][0] = c
- blames[-1][1].append(line)
+ if blames[-1][1] is not None:
+ blames[-1][1].append(line)
info = {'id': sha}
# END if we collected commit info
# END distinguish filename,summary,rest
@@ -900,7 +951,8 @@ class Repo(object):
return blames
@classmethod
- def init(cls, path=None, mkdir=True, odbt=GitCmdObjectDB, expand_vars=True, **kwargs):
+ def init(cls, path: PathLike = None, mkdir: bool = True, odbt: Type[GitCmdObjectDB] = GitCmdObjectDB,
+ expand_vars: bool = True, **kwargs: Any) -> 'Repo':
"""Initialize a git repository at the given path if specified
:param path:
@@ -938,9 +990,10 @@ class Repo(object):
return cls(path, odbt=odbt)
@classmethod
- def _clone(cls, git, url, path, odb_default_type, progress, multi_options=None, **kwargs):
- if progress is not None:
- progress = to_progress_instance(progress)
+ def _clone(cls, git: 'Git', url: PathLike, path: PathLike, odb_default_type: Type[GitCmdObjectDB],
+ progress: Optional[Callable], multi_options: Optional[List[str]] = None, **kwargs: Any
+ ) -> 'Repo':
+ progress_checked = to_progress_instance(progress)
odbt = kwargs.pop('odbt', odb_default_type)
@@ -964,9 +1017,10 @@ class Repo(object):
if multi_options:
multi = ' '.join(multi_options).split(' ')
proc = git.clone(multi, Git.polish_url(url), clone_path, with_extended_output=True, as_process=True,
- v=True, universal_newlines=True, **add_progress(kwargs, git, progress))
- if progress:
- handle_process_output(proc, None, progress.new_message_handler(), finalize_process, decode_streams=False)
+ v=True, universal_newlines=True, **add_progress(kwargs, git, progress_checked))
+ if progress_checked:
+ handle_process_output(proc, None, progress_checked.new_message_handler(),
+ finalize_process, decode_streams=False)
else:
(stdout, stderr) = proc.communicate()
log.debug("Cmd(%s)'s unused stdout: %s", getattr(proc, 'args', ''), stdout)
@@ -974,8 +1028,8 @@ class Repo(object):
# our git command could have a different working dir than our actual
# environment, hence we prepend its working dir if required
- if not osp.isabs(path) and git.working_dir:
- path = osp.join(git._working_dir, path)
+ if not osp.isabs(path):
+ path = osp.join(git._working_dir, path) if git._working_dir is not None else path
repo = cls(path, odbt=odbt)
@@ -993,7 +1047,8 @@ class Repo(object):
# END handle remote repo
return repo
- def clone(self, path, progress=None, multi_options=None, **kwargs):
+ def clone(self, path: PathLike, progress: Optional[Callable] = None,
+ multi_options: Optional[List[str]] = None, **kwargs: Any) -> 'Repo':
"""Create a clone from this repository.
:param path: is the full path of the new repo (traditionally ends with ./<name>.git).
@@ -1011,7 +1066,9 @@ class Repo(object):
return self._clone(self.git, self.common_dir, path, type(self.odb), progress, multi_options, **kwargs)
@classmethod
- def clone_from(cls, url, to_path, progress=None, env=None, multi_options=None, **kwargs):
+ def clone_from(cls, url: PathLike, to_path: PathLike, progress: Optional[Callable] = None,
+ env: Optional[Mapping[str, Any]] = None,
+ multi_options: Optional[List[str]] = None, **kwargs: Any) -> 'Repo':
"""Create a clone from the given URL
:param url: valid git url, see http://www.kernel.org/pub/software/scm/git/docs/git-clone.html#URLS
@@ -1031,7 +1088,8 @@ class Repo(object):
git.update_environment(**env)
return cls._clone(git, url, to_path, GitCmdObjectDB, progress, multi_options, **kwargs)
- def archive(self, ostream, treeish=None, prefix=None, **kwargs):
+ def archive(self, ostream: Union[TextIO, BinaryIO], treeish: Optional[str] = None,
+ prefix: Optional[str] = None, **kwargs: Any) -> 'Repo':
"""Archive the tree at the given revision.
:param ostream: file compatible stream object to which the archive will be written as bytes
@@ -1052,14 +1110,14 @@ class Repo(object):
kwargs['prefix'] = prefix
kwargs['output_stream'] = ostream
path = kwargs.pop('path', [])
+ path = cast(Union[PathLike, List[PathLike], Tuple[PathLike, ...]], path)
if not isinstance(path, (tuple, list)):
path = [path]
# end assure paths is list
-
self.git.archive(treeish, *path, **kwargs)
return self
- def has_separate_working_tree(self):
+ def has_separate_working_tree(self) -> bool:
"""
:return: True if our git_dir is not at the root of our working_tree_dir, but a .git file with a
platform agnositic symbolic link. Our git_dir will be wherever the .git file points to
@@ -1067,21 +1125,24 @@ class Repo(object):
"""
if self.bare:
return False
- return osp.isfile(osp.join(self.working_tree_dir, '.git'))
+ if self.working_tree_dir:
+ return osp.isfile(osp.join(self.working_tree_dir, '.git'))
+ else:
+ return False # or raise Error?
rev_parse = rev_parse
- def __repr__(self):
+ def __repr__(self) -> str:
clazz = self.__class__
return '<%s.%s %r>' % (clazz.__module__, clazz.__name__, self.git_dir)
- def currently_rebasing_on(self):
+ def currently_rebasing_on(self) -> Union['SymbolicReference', Commit, 'TagObject', 'Blob', 'Tree', None]:
"""
:return: The commit which is currently being replayed while rebasing.
None if we are not currently rebasing.
"""
- rebase_head_file = osp.join(self.git_dir, "REBASE_HEAD")
+ rebase_head_file = osp.join(self.git_dir, "REBASE_HEAD") if self.git_dir else ""
if not osp.isfile(rebase_head_file):
return None
return self.commit(open(rebase_head_file, "rt").readline().strip())
diff --git a/git/repo/fun.py b/git/repo/fun.py
index 714d4122..70394081 100644
--- a/git/repo/fun.py
+++ b/git/repo/fun.py
@@ -1,4 +1,5 @@
"""Package with general repository related functions"""
+from git.refs.tag import Tag
import os
import stat
from string import digits
@@ -15,18 +16,28 @@ from gitdb.exc import (
import os.path as osp
from git.cmd import Git
+# Typing ----------------------------------------------------------------------
+
+from typing import AnyStr, Union, Optional, cast, TYPE_CHECKING
+from git.types import PathLike
+if TYPE_CHECKING:
+ from .base import Repo
+ from git.db import GitCmdObjectDB
+ from git.objects import Commit, TagObject, Blob, Tree
+
+# ----------------------------------------------------------------------------
__all__ = ('rev_parse', 'is_git_dir', 'touch', 'find_submodule_git_dir', 'name_to_object', 'short_to_long', 'deref_tag',
'to_commit', 'find_worktree_git_dir')
-def touch(filename):
+def touch(filename: str) -> str:
with open(filename, "ab"):
pass
return filename
-def is_git_dir(d):
+def is_git_dir(d: PathLike) -> bool:
""" This is taken from the git setup.c:is_git_directory
function.
@@ -48,7 +59,7 @@ def is_git_dir(d):
return False
-def find_worktree_git_dir(dotgit):
+def find_worktree_git_dir(dotgit: PathLike) -> Optional[str]:
"""Search for a gitdir for this worktree."""
try:
statbuf = os.stat(dotgit)
@@ -67,7 +78,7 @@ def find_worktree_git_dir(dotgit):
return None
-def find_submodule_git_dir(d):
+def find_submodule_git_dir(d: PathLike) -> Optional[PathLike]:
"""Search for a submodule repo."""
if is_git_dir(d):
return d
@@ -75,7 +86,7 @@ def find_submodule_git_dir(d):
try:
with open(d) as fp:
content = fp.read().rstrip()
- except (IOError, OSError):
+ except IOError:
# it's probably not a file
pass
else:
@@ -92,7 +103,7 @@ def find_submodule_git_dir(d):
return None
-def short_to_long(odb, hexsha):
+def short_to_long(odb: 'GitCmdObjectDB', hexsha: AnyStr) -> Optional[bytes]:
""":return: long hexadecimal sha1 from the given less-than-40 byte hexsha
or None if no candidate could be found.
:param hexsha: hexsha with less than 40 byte"""
@@ -103,14 +114,15 @@ def short_to_long(odb, hexsha):
# END exception handling
-def name_to_object(repo, name, return_ref=False):
+def name_to_object(repo: 'Repo', name: str, return_ref: bool = False
+ ) -> Union[SymbolicReference, 'Commit', 'TagObject', 'Blob', 'Tree']:
"""
:return: object specified by the given name, hexshas ( short and long )
as well as references are supported
:param return_ref: if name specifies a reference, we will return the reference
instead of the object. Otherwise it will raise BadObject or BadName
"""
- hexsha = None
+ hexsha = None # type: Union[None, str, bytes]
# is it a hexsha ? Try the most common ones, which is 7 to 40
if repo.re_hexsha_shortened.match(name):
@@ -150,7 +162,7 @@ def name_to_object(repo, name, return_ref=False):
return Object.new_from_sha(repo, hex_to_bin(hexsha))
-def deref_tag(tag):
+def deref_tag(tag: Tag) -> 'TagObject':
"""Recursively dereference a tag and return the resulting object"""
while True:
try:
@@ -161,7 +173,7 @@ def deref_tag(tag):
return tag
-def to_commit(obj):
+def to_commit(obj: Object) -> Union['Commit', 'TagObject']:
"""Convert the given object to a commit if possible and return it"""
if obj.type == 'tag':
obj = deref_tag(obj)
@@ -172,7 +184,7 @@ def to_commit(obj):
return obj
-def rev_parse(repo, rev):
+def rev_parse(repo: 'Repo', rev: str) -> Union['Commit', 'Tag', 'Tree', 'Blob']:
"""
:return: Object at the given revision, either Commit, Tag, Tree or Blob
:param rev: git-rev-parse compatible revision specification as string, please see
@@ -188,7 +200,7 @@ def rev_parse(repo, rev):
raise NotImplementedError("commit by message search ( regex )")
# END handle search
- obj = None
+ obj = cast(Object, None) # not ideal. Should use guards
ref = None
output_type = "commit"
start = 0
@@ -238,7 +250,7 @@ def rev_parse(repo, rev):
pass # error raised later
# END exception handling
elif output_type in ('', 'blob'):
- if obj.type == 'tag':
+ if obj and obj.type == 'tag':
obj = deref_tag(obj)
else:
# cannot do anything for non-tags
diff --git a/requirements.txt b/requirements.txt
index c4e8340d..626a916a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1,2 @@
gitdb>=4.0.1,<5
+typing-extensions>=3.7.4.0
diff --git a/test-requirements.txt b/test-requirements.txt
index abda95cf..55206899 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -4,3 +4,4 @@ flake8
tox
virtualenv
nose
+typing-extensions>=3.7.4.0
diff --git a/tox.ini b/tox.ini
index ad126ed4..d9d1594d 100644
--- a/tox.ini
+++ b/tox.ini
@@ -23,6 +23,7 @@ commands = {posargs}
# E266 = too many leading '#' for block comment
# E731 = do not assign a lambda expression, use a def
# W293 = Blank line contains whitespace
-ignore = E265,W293,E266,E731
+# W504 = Line break after operator
+ignore = E265,W293,E266,E731, W504
max-line-length = 120
exclude = .tox,.venv,build,dist,doc,git/ext/