summaryrefslogtreecommitdiff
path: root/git/objects/commit.py
diff options
context:
space:
mode:
Diffstat (limited to 'git/objects/commit.py')
-rw-r--r--git/objects/commit.py339
1 files changed, 220 insertions, 119 deletions
diff --git a/git/objects/commit.py b/git/objects/commit.py
index 96a2a8e5..137cc620 100644
--- a/git/objects/commit.py
+++ b/git/objects/commit.py
@@ -6,12 +6,7 @@
import datetime
from subprocess import Popen, PIPE
from gitdb import IStream
-from git.util import (
- hex_to_bin,
- Actor,
- Stats,
- finalize_process
-)
+from git.util import hex_to_bin, Actor, Stats, finalize_process
from git.diff import Diffable
from git.cmd import Git
@@ -26,13 +21,7 @@ from .util import (
from_timestamp,
)
-from time import (
- time,
- daylight,
- altzone,
- timezone,
- localtime
-)
+from time import time, daylight, altzone, timezone, localtime
import os
from io import BytesIO
import logging
@@ -40,7 +29,18 @@ import logging
# typing ------------------------------------------------------------------
-from typing import Any, IO, Iterator, List, Sequence, Tuple, Union, TYPE_CHECKING, cast, Dict
+from typing import (
+ Any,
+ IO,
+ Iterator,
+ List,
+ Sequence,
+ Tuple,
+ Union,
+ TYPE_CHECKING,
+ cast,
+ Dict,
+)
from git.types import PathLike, Literal
@@ -50,10 +50,10 @@ if TYPE_CHECKING:
# ------------------------------------------------------------------------
-log = logging.getLogger('git.objects.commit')
+log = logging.getLogger("git.objects.commit")
log.addHandler(logging.NullHandler())
-__all__ = ('Commit', )
+__all__ = ("Commit",)
class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
@@ -69,30 +69,44 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
env_committer_date = "GIT_COMMITTER_DATE"
# CONFIGURATION KEYS
- conf_encoding = 'i18n.commitencoding'
+ conf_encoding = "i18n.commitencoding"
# INVARIANTS
default_encoding = "UTF-8"
# object configuration
- type: Literal['commit'] = "commit"
- __slots__ = ("tree",
- "author", "authored_date", "author_tz_offset",
- "committer", "committed_date", "committer_tz_offset",
- "message", "parents", "encoding", "gpgsig")
+ type: Literal["commit"] = "commit"
+ __slots__ = (
+ "tree",
+ "author",
+ "authored_date",
+ "author_tz_offset",
+ "committer",
+ "committed_date",
+ "committer_tz_offset",
+ "message",
+ "parents",
+ "encoding",
+ "gpgsig",
+ )
_id_attribute_ = "hexsha"
- def __init__(self, repo: 'Repo', binsha: bytes, tree: Union[Tree, None] = None,
- author: Union[Actor, None] = None,
- authored_date: Union[int, None] = None,
- author_tz_offset: Union[None, float] = None,
- committer: Union[Actor, None] = None,
- committed_date: Union[int, None] = None,
- committer_tz_offset: Union[None, float] = None,
- message: Union[str, bytes, None] = None,
- parents: Union[Sequence['Commit'], None] = None,
- encoding: Union[str, None] = None,
- gpgsig: Union[str, None] = None) -> None:
+ def __init__(
+ self,
+ repo: "Repo",
+ binsha: bytes,
+ tree: Union[Tree, None] = None,
+ author: Union[Actor, None] = None,
+ authored_date: Union[int, None] = None,
+ author_tz_offset: Union[None, float] = None,
+ committer: Union[Actor, None] = None,
+ committed_date: Union[int, None] = None,
+ committer_tz_offset: Union[None, float] = None,
+ message: Union[str, bytes, None] = None,
+ parents: Union[Sequence["Commit"], None] = None,
+ encoding: Union[str, None] = None,
+ gpgsig: Union[str, None] = None,
+ ) -> None:
"""Instantiate a new Commit. All keyword arguments taking None as default will
be implicitly set on first query.
@@ -130,7 +144,9 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
super(Commit, self).__init__(repo, binsha)
self.binsha = binsha
if tree is not None:
- assert isinstance(tree, Tree), "Tree needs to be a Tree instance, was %s" % type(tree)
+ assert isinstance(
+ tree, Tree
+ ), "Tree needs to be a Tree instance, was %s" % type(tree)
if tree is not None:
self.tree = tree
if author is not None:
@@ -155,16 +171,16 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
self.gpgsig = gpgsig
@classmethod
- def _get_intermediate_items(cls, commit: 'Commit') -> Tuple['Commit', ...]:
+ def _get_intermediate_items(cls, commit: "Commit") -> Tuple["Commit", ...]:
return tuple(commit.parents)
@classmethod
- def _calculate_sha_(cls, repo: 'Repo', commit: 'Commit') -> bytes:
- '''Calculate the sha of a commit.
+ def _calculate_sha_(cls, repo: "Repo", commit: "Commit") -> bytes:
+ """Calculate the sha of a commit.
:param repo: Repo object the commit should be part of
:param commit: Commit object for which to generate the sha
- '''
+ """
stream = BytesIO()
commit._serialize(stream)
@@ -174,18 +190,18 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
istream = repo.odb.store(IStream(cls.type, streamlen, stream))
return istream.binsha
- def replace(self, **kwargs: Any) -> 'Commit':
- '''Create new commit object from existing commit object.
+ def replace(self, **kwargs: Any) -> "Commit":
+ """Create new commit object from existing commit object.
Any values provided as keyword arguments will replace the
corresponding attribute in the new object.
- '''
+ """
attrs = {k: getattr(self, k) for k in self.__slots__}
for attrname in kwargs:
if attrname not in self.__slots__:
- raise ValueError('invalid attribute name')
+ raise ValueError("invalid attribute name")
attrs.update(kwargs)
new_commit = self.__class__(self.repo, self.NULL_BIN_SHA, **attrs)
@@ -214,11 +230,13 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
def summary(self) -> Union[str, bytes]:
""":return: First line of the commit message"""
if isinstance(self.message, str):
- return self.message.split('\n', 1)[0]
+ return self.message.split("\n", 1)[0]
else:
- return self.message.split(b'\n', 1)[0]
+ return self.message.split(b"\n", 1)[0]
- def count(self, paths: Union[PathLike, Sequence[PathLike]] = '', **kwargs: Any) -> int:
+ def count(
+ self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any
+ ) -> int:
"""Count the number of commits reachable from this commit
:param paths:
@@ -232,7 +250,9 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
# yes, it makes a difference whether empty paths are given or not in our case
# as the empty paths version will ignore merge commits for some reason.
if paths:
- return len(self.repo.git.rev_list(self.hexsha, '--', paths, **kwargs).splitlines())
+ return len(
+ self.repo.git.rev_list(self.hexsha, "--", paths, **kwargs).splitlines()
+ )
return len(self.repo.git.rev_list(self.hexsha, **kwargs).splitlines())
@property
@@ -244,9 +264,13 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
return self.repo.git.name_rev(self)
@classmethod
- def iter_items(cls, repo: 'Repo', rev: Union[str, 'Commit', 'SymbolicReference'], # type: ignore
- paths: Union[PathLike, Sequence[PathLike]] = '', **kwargs: Any
- ) -> Iterator['Commit']:
+ def iter_items(
+ cls,
+ repo: "Repo",
+ rev: Union[str, "Commit", "SymbolicReference"], # type: ignore
+ paths: Union[PathLike, Sequence[PathLike]] = "",
+ **kwargs: Any,
+ ) -> Iterator["Commit"]:
"""Find all commits matching the given criteria.
:param repo: is the Repo
@@ -260,19 +284,21 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
``skip`` is the number of commits to skip
``since`` all commits since i.e. '1970-01-01'
:return: iterator yielding Commit items"""
- if 'pretty' in kwargs:
- raise ValueError("--pretty cannot be used as parsing expects single sha's only")
+ if "pretty" in kwargs:
+ raise ValueError(
+ "--pretty cannot be used as parsing expects single sha's only"
+ )
# END handle pretty
# use -- in any case, to prevent possibility of ambiguous arguments
# see https://github.com/gitpython-developers/GitPython/issues/264
- args_list: List[PathLike] = ['--']
+ args_list: List[PathLike] = ["--"]
if paths:
paths_tup: Tuple[PathLike, ...]
if isinstance(paths, (str, os.PathLike)):
- paths_tup = (paths, )
+ paths_tup = (paths,)
else:
paths_tup = tuple(paths)
@@ -282,37 +308,41 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
proc = repo.git.rev_list(rev, args_list, as_process=True, **kwargs)
return cls._iter_from_process_or_stream(repo, proc)
- def iter_parents(self, paths: Union[PathLike, Sequence[PathLike]] = '', **kwargs: Any) -> Iterator['Commit']:
+ def iter_parents(
+ self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any
+ ) -> Iterator["Commit"]:
"""Iterate _all_ parents of this commit.
:param paths:
Optional path or list of paths limiting the Commits to those that
contain at least one of the paths
:param kwargs: All arguments allowed by git-rev-list
- :return: Iterator yielding Commit objects which are parents of self """
+ :return: Iterator yielding Commit objects which are parents of self"""
# skip ourselves
skip = kwargs.get("skip", 1)
- if skip == 0: # skip ourselves
+ if skip == 0: # skip ourselves
skip = 1
- kwargs['skip'] = skip
+ kwargs["skip"] = skip
return self.iter_items(self.repo, self, paths, **kwargs)
- @ property
+ @property
def stats(self) -> Stats:
"""Create a git stat from changes between this commit and its first parent
or from all changes done if this is the very first commit.
:return: git.Stats"""
if not self.parents:
- text = self.repo.git.diff_tree(self.hexsha, '--', numstat=True, root=True)
+ text = self.repo.git.diff_tree(self.hexsha, "--", numstat=True, root=True)
text2 = ""
for line in text.splitlines()[1:]:
(insertions, deletions, filename) = line.split("\t")
text2 += "%s\t%s\t%s\n" % (insertions, deletions, filename)
text = text2
else:
- text = self.repo.git.diff(self.parents[0].hexsha, self.hexsha, '--', numstat=True)
+ text = self.repo.git.diff(
+ self.parents[0].hexsha, self.hexsha, "--", numstat=True
+ )
return Stats._list_from_string(self.repo, text)
@property
@@ -352,19 +382,21 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
"""
d = {}
- cmd = ['git', 'interpret-trailers', '--parse']
+ cmd = ["git", "interpret-trailers", "--parse"]
proc: Git.AutoInterrupt = self.repo.git.execute(cmd, as_process=True, istream=PIPE) # type: ignore
trailer: str = proc.communicate(str(self.message).encode())[0].decode()
- if trailer.endswith('\n'):
+ if trailer.endswith("\n"):
trailer = trailer[0:-1]
- if trailer != '':
- for line in trailer.split('\n'):
- key, value = line.split(':', 1)
+ if trailer != "":
+ for line in trailer.split("\n"):
+ key, value = line.split(":", 1)
d[key.strip()] = value.strip()
return d
- @ classmethod
- def _iter_from_process_or_stream(cls, repo: 'Repo', proc_or_stream: Union[Popen, IO]) -> Iterator['Commit']:
+ @classmethod
+ def _iter_from_process_or_stream(
+ cls, repo: "Repo", proc_or_stream: Union[Popen, IO]
+ ) -> Iterator["Commit"]:
"""Parse out commit information into a list of Commit objects
We expect one-line per commit, and parse the actual commit information directly
from our lighting fast object database
@@ -378,11 +410,11 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
# def is_stream(inp) -> TypeGuard[IO]:
# return hasattr(proc_or_stream, 'readline')
- if hasattr(proc_or_stream, 'wait'):
+ if hasattr(proc_or_stream, "wait"):
proc_or_stream = cast(Popen, proc_or_stream)
if proc_or_stream.stdout is not None:
stream = proc_or_stream.stdout
- elif hasattr(proc_or_stream, 'readline'):
+ elif hasattr(proc_or_stream, "readline"):
proc_or_stream = cast(IO, proc_or_stream)
stream = proc_or_stream
@@ -402,15 +434,23 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
# END for each line in stream
# TODO: Review this - it seems process handling got a bit out of control
# due to many developers trying to fix the open file handles issue
- if hasattr(proc_or_stream, 'wait'):
+ if hasattr(proc_or_stream, "wait"):
proc_or_stream = cast(Popen, proc_or_stream)
finalize_process(proc_or_stream)
- @ classmethod
- def create_from_tree(cls, repo: 'Repo', tree: Union[Tree, str], message: str,
- parent_commits: Union[None, List['Commit']] = None, head: bool = False,
- author: Union[None, Actor] = None, committer: Union[None, Actor] = None,
- author_date: Union[None, str] = None, commit_date: Union[None, str] = None) -> 'Commit':
+ @classmethod
+ def create_from_tree(
+ cls,
+ repo: "Repo",
+ tree: Union[Tree, str],
+ message: str,
+ parent_commits: Union[None, List["Commit"]] = None,
+ head: bool = False,
+ author: Union[None, Actor] = None,
+ committer: Union[None, Actor] = None,
+ author_date: Union[None, str] = None,
+ commit_date: Union[None, str] = None,
+ ) -> "Commit":
"""Commit the given tree, creating a commit object.
:param repo: Repo object the commit should be part of
@@ -473,7 +513,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
is_dst = daylight and localtime().tm_isdst > 0
offset = altzone if is_dst else timezone
- author_date_str = env.get(cls.env_author_date, '')
+ author_date_str = env.get(cls.env_author_date, "")
if author_date:
author_time, author_offset = parse_date(author_date)
elif author_date_str:
@@ -482,7 +522,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
author_time, author_offset = unix_time, offset
# END set author time
- committer_date_str = env.get(cls.env_committer_date, '')
+ committer_date_str = env.get(cls.env_committer_date, "")
if commit_date:
committer_time, committer_offset = parse_date(commit_date)
elif committer_date_str:
@@ -492,7 +532,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
# END set committer time
# assume utf8 encoding
- enc_section, enc_option = cls.conf_encoding.split('.')
+ enc_section, enc_option = cls.conf_encoding.split(".")
conf_encoding = cr.get_value(enc_section, enc_option, cls.default_encoding)
if not isinstance(conf_encoding, str):
raise TypeError("conf_encoding could not be coerced to str")
@@ -504,10 +544,20 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
# END tree conversion
# CREATE NEW COMMIT
- new_commit = cls(repo, cls.NULL_BIN_SHA, tree,
- author, author_time, author_offset,
- committer, committer_time, committer_offset,
- message, parent_commits, conf_encoding)
+ new_commit = cls(
+ repo,
+ cls.NULL_BIN_SHA,
+ tree,
+ author,
+ author_time,
+ author_offset,
+ committer,
+ committer_time,
+ committer_offset,
+ message,
+ parent_commits,
+ conf_encoding,
+ )
new_commit.binsha = cls._calculate_sha_(repo, new_commit)
@@ -515,48 +565,74 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
# need late import here, importing git at the very beginning throws
# as well ...
import git.refs
+
try:
repo.head.set_commit(new_commit, logmsg=message)
except ValueError:
# head is not yet set to the ref our HEAD points to
# Happens on first commit
- master = git.refs.Head.create(repo, repo.head.ref, new_commit, logmsg="commit (initial): %s" % message)
- repo.head.set_reference(master, logmsg='commit: Switching to %s' % master)
+ master = git.refs.Head.create(
+ repo,
+ repo.head.ref,
+ new_commit,
+ logmsg="commit (initial): %s" % message,
+ )
+ repo.head.set_reference(
+ master, logmsg="commit: Switching to %s" % master
+ )
# END handle empty repositories
# END advance head handling
return new_commit
- #{ Serializable Implementation
+ # { Serializable Implementation
- def _serialize(self, stream: BytesIO) -> 'Commit':
+ def _serialize(self, stream: BytesIO) -> "Commit":
write = stream.write
- write(("tree %s\n" % self.tree).encode('ascii'))
+ write(("tree %s\n" % self.tree).encode("ascii"))
for p in self.parents:
- write(("parent %s\n" % p).encode('ascii'))
+ write(("parent %s\n" % p).encode("ascii"))
a = self.author
aname = a.name
c = self.committer
fmt = "%s %s <%s> %s %s\n"
- write((fmt % ("author", aname, a.email,
- self.authored_date,
- altz_to_utctz_str(self.author_tz_offset))).encode(self.encoding))
+ write(
+ (
+ fmt
+ % (
+ "author",
+ aname,
+ a.email,
+ self.authored_date,
+ altz_to_utctz_str(self.author_tz_offset),
+ )
+ ).encode(self.encoding)
+ )
# encode committer
aname = c.name
- write((fmt % ("committer", aname, c.email,
- self.committed_date,
- altz_to_utctz_str(self.committer_tz_offset))).encode(self.encoding))
+ write(
+ (
+ fmt
+ % (
+ "committer",
+ aname,
+ c.email,
+ self.committed_date,
+ altz_to_utctz_str(self.committer_tz_offset),
+ )
+ ).encode(self.encoding)
+ )
if self.encoding != self.default_encoding:
- write(("encoding %s\n" % self.encoding).encode('ascii'))
+ write(("encoding %s\n" % self.encoding).encode("ascii"))
try:
- if self.__getattribute__('gpgsig'):
+ if self.__getattribute__("gpgsig"):
write(b"gpgsig")
for sigline in self.gpgsig.rstrip("\n").split("\n"):
- write((" " + sigline + "\n").encode('ascii'))
+ write((" " + sigline + "\n").encode("ascii"))
except AttributeError:
pass
@@ -570,23 +646,29 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
# END handle encoding
return self
- def _deserialize(self, stream: BytesIO) -> 'Commit':
+ def _deserialize(self, stream: BytesIO) -> "Commit":
"""
:param from_rev_list: if true, the stream format is coming from the rev-list command
Otherwise it is assumed to be a plain data stream from our object
"""
readline = stream.readline
- self.tree = Tree(self.repo, hex_to_bin(readline().split()[1]), Tree.tree_id << 12, '')
+ self.tree = Tree(
+ self.repo, hex_to_bin(readline().split()[1]), Tree.tree_id << 12, ""
+ )
self.parents = []
next_line = None
while True:
parent_line = readline()
- if not parent_line.startswith(b'parent'):
+ if not parent_line.startswith(b"parent"):
next_line = parent_line
break
# END abort reading parents
- self.parents.append(type(self)(self.repo, hex_to_bin(parent_line.split()[-1].decode('ascii'))))
+ self.parents.append(
+ type(self)(
+ self.repo, hex_to_bin(parent_line.split()[-1].decode("ascii"))
+ )
+ )
# END for each parent line
self.parents = tuple(self.parents)
@@ -596,9 +678,9 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
# we might run into one or more mergetag blocks, skip those for now
next_line = readline()
- while next_line.startswith(b'mergetag '):
+ while next_line.startswith(b"mergetag "):
next_line = readline()
- while next_line.startswith(b' '):
+ while next_line.startswith(b" "):
next_line = readline()
# end skip mergetags
@@ -612,10 +694,11 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
buf = enc.strip()
while buf:
if buf[0:10] == b"encoding ":
- self.encoding = buf[buf.find(b' ') + 1:].decode(
- self.encoding, 'ignore')
+ self.encoding = buf[buf.find(b" ") + 1 :].decode(
+ self.encoding, "ignore"
+ )
elif buf[0:7] == b"gpgsig ":
- sig = buf[buf.find(b' ') + 1:] + b"\n"
+ sig = buf[buf.find(b" ") + 1 :] + b"\n"
is_next_header = False
while True:
sigbuf = readline()
@@ -627,37 +710,55 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
break
sig += sigbuf[1:]
# end read all signature
- self.gpgsig = sig.rstrip(b"\n").decode(self.encoding, 'ignore')
+ self.gpgsig = sig.rstrip(b"\n").decode(self.encoding, "ignore")
if is_next_header:
continue
buf = readline().strip()
# decode the authors name
try:
- (self.author, self.authored_date, self.author_tz_offset) = \
- parse_actor_and_date(author_line.decode(self.encoding, 'replace'))
+ (
+ self.author,
+ self.authored_date,
+ self.author_tz_offset,
+ ) = parse_actor_and_date(author_line.decode(self.encoding, "replace"))
except UnicodeDecodeError:
- log.error("Failed to decode author line '%s' using encoding %s", author_line, self.encoding,
- exc_info=True)
+ log.error(
+ "Failed to decode author line '%s' using encoding %s",
+ author_line,
+ self.encoding,
+ exc_info=True,
+ )
try:
- self.committer, self.committed_date, self.committer_tz_offset = \
- parse_actor_and_date(committer_line.decode(self.encoding, 'replace'))
+ (
+ self.committer,
+ self.committed_date,
+ self.committer_tz_offset,
+ ) = parse_actor_and_date(committer_line.decode(self.encoding, "replace"))
except UnicodeDecodeError:
- log.error("Failed to decode committer line '%s' using encoding %s", committer_line, self.encoding,
- exc_info=True)
+ log.error(
+ "Failed to decode committer line '%s' using encoding %s",
+ committer_line,
+ self.encoding,
+ exc_info=True,
+ )
# END handle author's encoding
# a stream from our data simply gives us the plain message
# The end of our message stream is marked with a newline that we strip
self.message = stream.read()
try:
- self.message = self.message.decode(self.encoding, 'replace')
+ self.message = self.message.decode(self.encoding, "replace")
except UnicodeDecodeError:
- log.error("Failed to decode message '%s' using encoding %s",
- self.message, self.encoding, exc_info=True)
+ log.error(
+ "Failed to decode message '%s' using encoding %s",
+ self.message,
+ self.encoding,
+ exc_info=True,
+ )
# END exception handling
return self
- #} END serializable implementation
+ # } END serializable implementation