diff options
Diffstat (limited to 'git/objects/commit.py')
-rw-r--r-- | git/objects/commit.py | 339 |
1 files changed, 220 insertions, 119 deletions
diff --git a/git/objects/commit.py b/git/objects/commit.py index 96a2a8e5..137cc620 100644 --- a/git/objects/commit.py +++ b/git/objects/commit.py @@ -6,12 +6,7 @@ import datetime from subprocess import Popen, PIPE from gitdb import IStream -from git.util import ( - hex_to_bin, - Actor, - Stats, - finalize_process -) +from git.util import hex_to_bin, Actor, Stats, finalize_process from git.diff import Diffable from git.cmd import Git @@ -26,13 +21,7 @@ from .util import ( from_timestamp, ) -from time import ( - time, - daylight, - altzone, - timezone, - localtime -) +from time import time, daylight, altzone, timezone, localtime import os from io import BytesIO import logging @@ -40,7 +29,18 @@ import logging # typing ------------------------------------------------------------------ -from typing import Any, IO, Iterator, List, Sequence, Tuple, Union, TYPE_CHECKING, cast, Dict +from typing import ( + Any, + IO, + Iterator, + List, + Sequence, + Tuple, + Union, + TYPE_CHECKING, + cast, + Dict, +) from git.types import PathLike, Literal @@ -50,10 +50,10 @@ if TYPE_CHECKING: # ------------------------------------------------------------------------ -log = logging.getLogger('git.objects.commit') +log = logging.getLogger("git.objects.commit") log.addHandler(logging.NullHandler()) -__all__ = ('Commit', ) +__all__ = ("Commit",) class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): @@ -69,30 +69,44 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): env_committer_date = "GIT_COMMITTER_DATE" # CONFIGURATION KEYS - conf_encoding = 'i18n.commitencoding' + conf_encoding = "i18n.commitencoding" # INVARIANTS default_encoding = "UTF-8" # object configuration - type: Literal['commit'] = "commit" - __slots__ = ("tree", - "author", "authored_date", "author_tz_offset", - "committer", "committed_date", "committer_tz_offset", - "message", "parents", "encoding", "gpgsig") + type: Literal["commit"] = "commit" + __slots__ = ( + "tree", + "author", + "authored_date", + "author_tz_offset", + "committer", + "committed_date", + "committer_tz_offset", + "message", + "parents", + "encoding", + "gpgsig", + ) _id_attribute_ = "hexsha" - def __init__(self, repo: 'Repo', binsha: bytes, tree: Union[Tree, None] = None, - author: Union[Actor, None] = None, - authored_date: Union[int, None] = None, - author_tz_offset: Union[None, float] = None, - committer: Union[Actor, None] = None, - committed_date: Union[int, None] = None, - committer_tz_offset: Union[None, float] = None, - message: Union[str, bytes, None] = None, - parents: Union[Sequence['Commit'], None] = None, - encoding: Union[str, None] = None, - gpgsig: Union[str, None] = None) -> None: + def __init__( + self, + repo: "Repo", + binsha: bytes, + tree: Union[Tree, None] = None, + author: Union[Actor, None] = None, + authored_date: Union[int, None] = None, + author_tz_offset: Union[None, float] = None, + committer: Union[Actor, None] = None, + committed_date: Union[int, None] = None, + committer_tz_offset: Union[None, float] = None, + message: Union[str, bytes, None] = None, + parents: Union[Sequence["Commit"], None] = None, + encoding: Union[str, None] = None, + gpgsig: Union[str, None] = None, + ) -> None: """Instantiate a new Commit. All keyword arguments taking None as default will be implicitly set on first query. @@ -130,7 +144,9 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): super(Commit, self).__init__(repo, binsha) self.binsha = binsha if tree is not None: - assert isinstance(tree, Tree), "Tree needs to be a Tree instance, was %s" % type(tree) + assert isinstance( + tree, Tree + ), "Tree needs to be a Tree instance, was %s" % type(tree) if tree is not None: self.tree = tree if author is not None: @@ -155,16 +171,16 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): self.gpgsig = gpgsig @classmethod - def _get_intermediate_items(cls, commit: 'Commit') -> Tuple['Commit', ...]: + def _get_intermediate_items(cls, commit: "Commit") -> Tuple["Commit", ...]: return tuple(commit.parents) @classmethod - def _calculate_sha_(cls, repo: 'Repo', commit: 'Commit') -> bytes: - '''Calculate the sha of a commit. + def _calculate_sha_(cls, repo: "Repo", commit: "Commit") -> bytes: + """Calculate the sha of a commit. :param repo: Repo object the commit should be part of :param commit: Commit object for which to generate the sha - ''' + """ stream = BytesIO() commit._serialize(stream) @@ -174,18 +190,18 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): istream = repo.odb.store(IStream(cls.type, streamlen, stream)) return istream.binsha - def replace(self, **kwargs: Any) -> 'Commit': - '''Create new commit object from existing commit object. + def replace(self, **kwargs: Any) -> "Commit": + """Create new commit object from existing commit object. Any values provided as keyword arguments will replace the corresponding attribute in the new object. - ''' + """ attrs = {k: getattr(self, k) for k in self.__slots__} for attrname in kwargs: if attrname not in self.__slots__: - raise ValueError('invalid attribute name') + raise ValueError("invalid attribute name") attrs.update(kwargs) new_commit = self.__class__(self.repo, self.NULL_BIN_SHA, **attrs) @@ -214,11 +230,13 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): def summary(self) -> Union[str, bytes]: """:return: First line of the commit message""" if isinstance(self.message, str): - return self.message.split('\n', 1)[0] + return self.message.split("\n", 1)[0] else: - return self.message.split(b'\n', 1)[0] + return self.message.split(b"\n", 1)[0] - def count(self, paths: Union[PathLike, Sequence[PathLike]] = '', **kwargs: Any) -> int: + def count( + self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any + ) -> int: """Count the number of commits reachable from this commit :param paths: @@ -232,7 +250,9 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): # yes, it makes a difference whether empty paths are given or not in our case # as the empty paths version will ignore merge commits for some reason. if paths: - return len(self.repo.git.rev_list(self.hexsha, '--', paths, **kwargs).splitlines()) + return len( + self.repo.git.rev_list(self.hexsha, "--", paths, **kwargs).splitlines() + ) return len(self.repo.git.rev_list(self.hexsha, **kwargs).splitlines()) @property @@ -244,9 +264,13 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): return self.repo.git.name_rev(self) @classmethod - def iter_items(cls, repo: 'Repo', rev: Union[str, 'Commit', 'SymbolicReference'], # type: ignore - paths: Union[PathLike, Sequence[PathLike]] = '', **kwargs: Any - ) -> Iterator['Commit']: + def iter_items( + cls, + repo: "Repo", + rev: Union[str, "Commit", "SymbolicReference"], # type: ignore + paths: Union[PathLike, Sequence[PathLike]] = "", + **kwargs: Any, + ) -> Iterator["Commit"]: """Find all commits matching the given criteria. :param repo: is the Repo @@ -260,19 +284,21 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): ``skip`` is the number of commits to skip ``since`` all commits since i.e. '1970-01-01' :return: iterator yielding Commit items""" - if 'pretty' in kwargs: - raise ValueError("--pretty cannot be used as parsing expects single sha's only") + if "pretty" in kwargs: + raise ValueError( + "--pretty cannot be used as parsing expects single sha's only" + ) # END handle pretty # use -- in any case, to prevent possibility of ambiguous arguments # see https://github.com/gitpython-developers/GitPython/issues/264 - args_list: List[PathLike] = ['--'] + args_list: List[PathLike] = ["--"] if paths: paths_tup: Tuple[PathLike, ...] if isinstance(paths, (str, os.PathLike)): - paths_tup = (paths, ) + paths_tup = (paths,) else: paths_tup = tuple(paths) @@ -282,37 +308,41 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): proc = repo.git.rev_list(rev, args_list, as_process=True, **kwargs) return cls._iter_from_process_or_stream(repo, proc) - def iter_parents(self, paths: Union[PathLike, Sequence[PathLike]] = '', **kwargs: Any) -> Iterator['Commit']: + def iter_parents( + self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any + ) -> Iterator["Commit"]: """Iterate _all_ parents of this commit. :param paths: Optional path or list of paths limiting the Commits to those that contain at least one of the paths :param kwargs: All arguments allowed by git-rev-list - :return: Iterator yielding Commit objects which are parents of self """ + :return: Iterator yielding Commit objects which are parents of self""" # skip ourselves skip = kwargs.get("skip", 1) - if skip == 0: # skip ourselves + if skip == 0: # skip ourselves skip = 1 - kwargs['skip'] = skip + kwargs["skip"] = skip return self.iter_items(self.repo, self, paths, **kwargs) - @ property + @property def stats(self) -> Stats: """Create a git stat from changes between this commit and its first parent or from all changes done if this is the very first commit. :return: git.Stats""" if not self.parents: - text = self.repo.git.diff_tree(self.hexsha, '--', numstat=True, root=True) + text = self.repo.git.diff_tree(self.hexsha, "--", numstat=True, root=True) text2 = "" for line in text.splitlines()[1:]: (insertions, deletions, filename) = line.split("\t") text2 += "%s\t%s\t%s\n" % (insertions, deletions, filename) text = text2 else: - text = self.repo.git.diff(self.parents[0].hexsha, self.hexsha, '--', numstat=True) + text = self.repo.git.diff( + self.parents[0].hexsha, self.hexsha, "--", numstat=True + ) return Stats._list_from_string(self.repo, text) @property @@ -352,19 +382,21 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): """ d = {} - cmd = ['git', 'interpret-trailers', '--parse'] + cmd = ["git", "interpret-trailers", "--parse"] proc: Git.AutoInterrupt = self.repo.git.execute(cmd, as_process=True, istream=PIPE) # type: ignore trailer: str = proc.communicate(str(self.message).encode())[0].decode() - if trailer.endswith('\n'): + if trailer.endswith("\n"): trailer = trailer[0:-1] - if trailer != '': - for line in trailer.split('\n'): - key, value = line.split(':', 1) + if trailer != "": + for line in trailer.split("\n"): + key, value = line.split(":", 1) d[key.strip()] = value.strip() return d - @ classmethod - def _iter_from_process_or_stream(cls, repo: 'Repo', proc_or_stream: Union[Popen, IO]) -> Iterator['Commit']: + @classmethod + def _iter_from_process_or_stream( + cls, repo: "Repo", proc_or_stream: Union[Popen, IO] + ) -> Iterator["Commit"]: """Parse out commit information into a list of Commit objects We expect one-line per commit, and parse the actual commit information directly from our lighting fast object database @@ -378,11 +410,11 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): # def is_stream(inp) -> TypeGuard[IO]: # return hasattr(proc_or_stream, 'readline') - if hasattr(proc_or_stream, 'wait'): + if hasattr(proc_or_stream, "wait"): proc_or_stream = cast(Popen, proc_or_stream) if proc_or_stream.stdout is not None: stream = proc_or_stream.stdout - elif hasattr(proc_or_stream, 'readline'): + elif hasattr(proc_or_stream, "readline"): proc_or_stream = cast(IO, proc_or_stream) stream = proc_or_stream @@ -402,15 +434,23 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): # END for each line in stream # TODO: Review this - it seems process handling got a bit out of control # due to many developers trying to fix the open file handles issue - if hasattr(proc_or_stream, 'wait'): + if hasattr(proc_or_stream, "wait"): proc_or_stream = cast(Popen, proc_or_stream) finalize_process(proc_or_stream) - @ classmethod - def create_from_tree(cls, repo: 'Repo', tree: Union[Tree, str], message: str, - parent_commits: Union[None, List['Commit']] = None, head: bool = False, - author: Union[None, Actor] = None, committer: Union[None, Actor] = None, - author_date: Union[None, str] = None, commit_date: Union[None, str] = None) -> 'Commit': + @classmethod + def create_from_tree( + cls, + repo: "Repo", + tree: Union[Tree, str], + message: str, + parent_commits: Union[None, List["Commit"]] = None, + head: bool = False, + author: Union[None, Actor] = None, + committer: Union[None, Actor] = None, + author_date: Union[None, str] = None, + commit_date: Union[None, str] = None, + ) -> "Commit": """Commit the given tree, creating a commit object. :param repo: Repo object the commit should be part of @@ -473,7 +513,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): is_dst = daylight and localtime().tm_isdst > 0 offset = altzone if is_dst else timezone - author_date_str = env.get(cls.env_author_date, '') + author_date_str = env.get(cls.env_author_date, "") if author_date: author_time, author_offset = parse_date(author_date) elif author_date_str: @@ -482,7 +522,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): author_time, author_offset = unix_time, offset # END set author time - committer_date_str = env.get(cls.env_committer_date, '') + committer_date_str = env.get(cls.env_committer_date, "") if commit_date: committer_time, committer_offset = parse_date(commit_date) elif committer_date_str: @@ -492,7 +532,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): # END set committer time # assume utf8 encoding - enc_section, enc_option = cls.conf_encoding.split('.') + enc_section, enc_option = cls.conf_encoding.split(".") conf_encoding = cr.get_value(enc_section, enc_option, cls.default_encoding) if not isinstance(conf_encoding, str): raise TypeError("conf_encoding could not be coerced to str") @@ -504,10 +544,20 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): # END tree conversion # CREATE NEW COMMIT - new_commit = cls(repo, cls.NULL_BIN_SHA, tree, - author, author_time, author_offset, - committer, committer_time, committer_offset, - message, parent_commits, conf_encoding) + new_commit = cls( + repo, + cls.NULL_BIN_SHA, + tree, + author, + author_time, + author_offset, + committer, + committer_time, + committer_offset, + message, + parent_commits, + conf_encoding, + ) new_commit.binsha = cls._calculate_sha_(repo, new_commit) @@ -515,48 +565,74 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): # need late import here, importing git at the very beginning throws # as well ... import git.refs + try: repo.head.set_commit(new_commit, logmsg=message) except ValueError: # head is not yet set to the ref our HEAD points to # Happens on first commit - master = git.refs.Head.create(repo, repo.head.ref, new_commit, logmsg="commit (initial): %s" % message) - repo.head.set_reference(master, logmsg='commit: Switching to %s' % master) + master = git.refs.Head.create( + repo, + repo.head.ref, + new_commit, + logmsg="commit (initial): %s" % message, + ) + repo.head.set_reference( + master, logmsg="commit: Switching to %s" % master + ) # END handle empty repositories # END advance head handling return new_commit - #{ Serializable Implementation + # { Serializable Implementation - def _serialize(self, stream: BytesIO) -> 'Commit': + def _serialize(self, stream: BytesIO) -> "Commit": write = stream.write - write(("tree %s\n" % self.tree).encode('ascii')) + write(("tree %s\n" % self.tree).encode("ascii")) for p in self.parents: - write(("parent %s\n" % p).encode('ascii')) + write(("parent %s\n" % p).encode("ascii")) a = self.author aname = a.name c = self.committer fmt = "%s %s <%s> %s %s\n" - write((fmt % ("author", aname, a.email, - self.authored_date, - altz_to_utctz_str(self.author_tz_offset))).encode(self.encoding)) + write( + ( + fmt + % ( + "author", + aname, + a.email, + self.authored_date, + altz_to_utctz_str(self.author_tz_offset), + ) + ).encode(self.encoding) + ) # encode committer aname = c.name - write((fmt % ("committer", aname, c.email, - self.committed_date, - altz_to_utctz_str(self.committer_tz_offset))).encode(self.encoding)) + write( + ( + fmt + % ( + "committer", + aname, + c.email, + self.committed_date, + altz_to_utctz_str(self.committer_tz_offset), + ) + ).encode(self.encoding) + ) if self.encoding != self.default_encoding: - write(("encoding %s\n" % self.encoding).encode('ascii')) + write(("encoding %s\n" % self.encoding).encode("ascii")) try: - if self.__getattribute__('gpgsig'): + if self.__getattribute__("gpgsig"): write(b"gpgsig") for sigline in self.gpgsig.rstrip("\n").split("\n"): - write((" " + sigline + "\n").encode('ascii')) + write((" " + sigline + "\n").encode("ascii")) except AttributeError: pass @@ -570,23 +646,29 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): # END handle encoding return self - def _deserialize(self, stream: BytesIO) -> 'Commit': + def _deserialize(self, stream: BytesIO) -> "Commit": """ :param from_rev_list: if true, the stream format is coming from the rev-list command Otherwise it is assumed to be a plain data stream from our object """ readline = stream.readline - self.tree = Tree(self.repo, hex_to_bin(readline().split()[1]), Tree.tree_id << 12, '') + self.tree = Tree( + self.repo, hex_to_bin(readline().split()[1]), Tree.tree_id << 12, "" + ) self.parents = [] next_line = None while True: parent_line = readline() - if not parent_line.startswith(b'parent'): + if not parent_line.startswith(b"parent"): next_line = parent_line break # END abort reading parents - self.parents.append(type(self)(self.repo, hex_to_bin(parent_line.split()[-1].decode('ascii')))) + self.parents.append( + type(self)( + self.repo, hex_to_bin(parent_line.split()[-1].decode("ascii")) + ) + ) # END for each parent line self.parents = tuple(self.parents) @@ -596,9 +678,9 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): # we might run into one or more mergetag blocks, skip those for now next_line = readline() - while next_line.startswith(b'mergetag '): + while next_line.startswith(b"mergetag "): next_line = readline() - while next_line.startswith(b' '): + while next_line.startswith(b" "): next_line = readline() # end skip mergetags @@ -612,10 +694,11 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): buf = enc.strip() while buf: if buf[0:10] == b"encoding ": - self.encoding = buf[buf.find(b' ') + 1:].decode( - self.encoding, 'ignore') + self.encoding = buf[buf.find(b" ") + 1 :].decode( + self.encoding, "ignore" + ) elif buf[0:7] == b"gpgsig ": - sig = buf[buf.find(b' ') + 1:] + b"\n" + sig = buf[buf.find(b" ") + 1 :] + b"\n" is_next_header = False while True: sigbuf = readline() @@ -627,37 +710,55 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): break sig += sigbuf[1:] # end read all signature - self.gpgsig = sig.rstrip(b"\n").decode(self.encoding, 'ignore') + self.gpgsig = sig.rstrip(b"\n").decode(self.encoding, "ignore") if is_next_header: continue buf = readline().strip() # decode the authors name try: - (self.author, self.authored_date, self.author_tz_offset) = \ - parse_actor_and_date(author_line.decode(self.encoding, 'replace')) + ( + self.author, + self.authored_date, + self.author_tz_offset, + ) = parse_actor_and_date(author_line.decode(self.encoding, "replace")) except UnicodeDecodeError: - log.error("Failed to decode author line '%s' using encoding %s", author_line, self.encoding, - exc_info=True) + log.error( + "Failed to decode author line '%s' using encoding %s", + author_line, + self.encoding, + exc_info=True, + ) try: - self.committer, self.committed_date, self.committer_tz_offset = \ - parse_actor_and_date(committer_line.decode(self.encoding, 'replace')) + ( + self.committer, + self.committed_date, + self.committer_tz_offset, + ) = parse_actor_and_date(committer_line.decode(self.encoding, "replace")) except UnicodeDecodeError: - log.error("Failed to decode committer line '%s' using encoding %s", committer_line, self.encoding, - exc_info=True) + log.error( + "Failed to decode committer line '%s' using encoding %s", + committer_line, + self.encoding, + exc_info=True, + ) # END handle author's encoding # a stream from our data simply gives us the plain message # The end of our message stream is marked with a newline that we strip self.message = stream.read() try: - self.message = self.message.decode(self.encoding, 'replace') + self.message = self.message.decode(self.encoding, "replace") except UnicodeDecodeError: - log.error("Failed to decode message '%s' using encoding %s", - self.message, self.encoding, exc_info=True) + log.error( + "Failed to decode message '%s' using encoding %s", + self.message, + self.encoding, + exc_info=True, + ) # END exception handling return self - #} END serializable implementation + # } END serializable implementation |