summaryrefslogtreecommitdiff
path: root/git/refs/log.py
diff options
context:
space:
mode:
Diffstat (limited to 'git/refs/log.py')
-rw-r--r--git/refs/log.py117
1 files changed, 70 insertions, 47 deletions
diff --git a/git/refs/log.py b/git/refs/log.py
index ddd78bc7..908f93d1 100644
--- a/git/refs/log.py
+++ b/git/refs/log.py
@@ -1,4 +1,3 @@
-
from mmap import mmap
import re
import time as _time
@@ -16,7 +15,7 @@ from git.util import (
assure_directory_exists,
to_native_path,
bin_to_hex,
- file_contents_ro_filepath
+ file_contents_ro_filepath,
)
import os.path as osp
@@ -41,7 +40,8 @@ __all__ = ["RefLog", "RefLogEntry"]
class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]):
"""Named tuple allowing easy access to the revlog data fields"""
- _re_hexsha_only = re.compile('^[0-9A-Fa-f]{40}$')
+
+ _re_hexsha_only = re.compile("^[0-9A-Fa-f]{40}$")
__slots__ = ()
def __repr__(self) -> str:
@@ -52,13 +52,15 @@ class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]):
""":return: a string suitable to be placed in a reflog file"""
act = self.actor
time = self.time
- return "{} {} {} <{}> {!s} {}\t{}\n".format(self.oldhexsha,
- self.newhexsha,
- act.name,
- act.email,
- time[0],
- altz_to_utctz_str(time[1]),
- self.message)
+ return "{} {} {} <{}> {!s} {}\t{}\n".format(
+ self.oldhexsha,
+ self.newhexsha,
+ act.name,
+ act.email,
+ time[0],
+ altz_to_utctz_str(time[1]),
+ self.message,
+ )
@property
def oldhexsha(self) -> str:
@@ -80,7 +82,7 @@ class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]):
"""time as tuple:
* [0] = int(time)
- * [1] = int(timezone_offset) in time.altzone format """
+ * [1] = int(timezone_offset) in time.altzone format"""
return self[3]
@property
@@ -89,8 +91,15 @@ class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]):
return self[4]
@classmethod
- def new(cls, oldhexsha: str, newhexsha: str, actor: Actor, time: int, tz_offset: int, message: str
- ) -> 'RefLogEntry': # skipcq: PYL-W0621
+ def new(
+ cls,
+ oldhexsha: str,
+ newhexsha: str,
+ actor: Actor,
+ time: int,
+ tz_offset: int,
+ message: str,
+ ) -> "RefLogEntry": # skipcq: PYL-W0621
""":return: New instance of a RefLogEntry"""
if not isinstance(actor, Actor):
raise ValueError("Need actor instance, got %s" % actor)
@@ -98,19 +107,21 @@ class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]):
return RefLogEntry((oldhexsha, newhexsha, actor, (time, tz_offset), message))
@classmethod
- def from_line(cls, line: bytes) -> 'RefLogEntry':
+ def from_line(cls, line: bytes) -> "RefLogEntry":
""":return: New RefLogEntry instance from the given revlog line.
:param line: line bytes without trailing newline
:raise ValueError: If line could not be parsed"""
line_str = line.decode(defenc)
- fields = line_str.split('\t', 1)
+ fields = line_str.split("\t", 1)
if len(fields) == 1:
info, msg = fields[0], None
elif len(fields) == 2:
info, msg = fields
else:
- raise ValueError("Line must have up to two TAB-separated fields."
- " Got %s" % repr(line_str))
+ raise ValueError(
+ "Line must have up to two TAB-separated fields."
+ " Got %s" % repr(line_str)
+ )
# END handle first split
oldhexsha = info[:40]
@@ -121,14 +132,13 @@ class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]):
# END if hexsha re doesn't match
# END for each hexsha
- email_end = info.find('>', 82)
+ email_end = info.find(">", 82)
if email_end == -1:
raise ValueError("Missing token: >")
# END handle missing end brace
- actor = Actor._from_string(info[82:email_end + 1])
- time, tz_offset = parse_date(
- info[email_end + 2:]) # skipcq: PYL-W0621
+ actor = Actor._from_string(info[82 : email_end + 1])
+ time, tz_offset = parse_date(info[email_end + 2 :]) # skipcq: PYL-W0621
return RefLogEntry((oldhexsha, newhexsha, actor, (time, tz_offset), msg))
@@ -142,9 +152,9 @@ class RefLog(List[RefLogEntry], Serializable):
Reflog entries are ordered, the first added entry is first in the list, the last
entry, i.e. the last change of the head or reference, is last in the list."""
- __slots__ = ('_path', )
+ __slots__ = ("_path",)
- def __new__(cls, filepath: Union[PathLike, None] = None) -> 'RefLog':
+ def __new__(cls, filepath: Union[PathLike, None] = None) -> "RefLog":
inst = super(RefLog, cls).__new__(cls)
return inst
@@ -159,8 +169,7 @@ class RefLog(List[RefLogEntry], Serializable):
def _read_from_file(self) -> None:
try:
- fmap = file_contents_ro_filepath(
- self._path, stream=True, allow_mmap=True)
+ fmap = file_contents_ro_filepath(self._path, stream=True, allow_mmap=True)
except OSError:
# it is possible and allowed that the file doesn't exist !
return
@@ -175,7 +184,7 @@ class RefLog(List[RefLogEntry], Serializable):
# { Interface
@classmethod
- def from_file(cls, filepath: PathLike) -> 'RefLog':
+ def from_file(cls, filepath: PathLike) -> "RefLog":
"""
:return: a new RefLog instance containing all entries from the reflog
at the given filepath
@@ -184,7 +193,7 @@ class RefLog(List[RefLogEntry], Serializable):
return cls(filepath)
@classmethod
- def path(cls, ref: 'SymbolicReference') -> str:
+ def path(cls, ref: "SymbolicReference") -> str:
"""
:return: string to absolute path at which the reflog of the given ref
instance would be found. The path is not guaranteed to point to a valid
@@ -193,7 +202,7 @@ class RefLog(List[RefLogEntry], Serializable):
return osp.join(ref.repo.git_dir, "logs", to_native_path(ref.path))
@classmethod
- def iter_entries(cls, stream: Union[str, 'BytesIO', mmap]) -> Iterator[RefLogEntry]:
+ def iter_entries(cls, stream: Union[str, "BytesIO", mmap]) -> Iterator[RefLogEntry]:
"""
:return: Iterator yielding RefLogEntry instances, one for each line read
sfrom the given stream.
@@ -215,7 +224,7 @@ class RefLog(List[RefLogEntry], Serializable):
# END endless loop
@classmethod
- def entry_at(cls, filepath: PathLike, index: int) -> 'RefLogEntry':
+ def entry_at(cls, filepath: PathLike, index: int) -> "RefLogEntry":
"""
:return: RefLogEntry at the given index
@@ -230,7 +239,7 @@ class RefLog(List[RefLogEntry], Serializable):
all other lines. Nonetheless, the whole file has to be read if
the index is negative
"""
- with open(filepath, 'rb') as fp:
+ with open(filepath, "rb") as fp:
if index < 0:
return RefLogEntry.from_line(fp.readlines()[index].strip())
# read until index is reached
@@ -239,7 +248,8 @@ class RefLog(List[RefLogEntry], Serializable):
line = fp.readline()
if not line:
raise IndexError(
- f"Index file ended at line {i+1}, before given index was reached")
+ f"Index file ended at line {i+1}, before given index was reached"
+ )
# END abort on eof
# END handle runup
@@ -263,9 +273,15 @@ class RefLog(List[RefLogEntry], Serializable):
# END handle change
@classmethod
- def append_entry(cls, config_reader: Union[Actor, 'GitConfigParser', 'SectionConstraint', None],
- filepath: PathLike, oldbinsha: bytes, newbinsha: bytes, message: str,
- write: bool = True) -> 'RefLogEntry':
+ def append_entry(
+ cls,
+ config_reader: Union[Actor, "GitConfigParser", "SectionConstraint", None],
+ filepath: PathLike,
+ oldbinsha: bytes,
+ newbinsha: bytes,
+ message: str,
+ write: bool = True,
+ ) -> "RefLogEntry":
"""Append a new log entry to the revlog at filepath.
:param config_reader: configuration reader of the repository - used to obtain
@@ -286,21 +302,27 @@ class RefLog(List[RefLogEntry], Serializable):
raise ValueError("Shas need to be given in binary format")
# END handle sha type
assure_directory_exists(filepath, is_file=True)
- first_line = message.split('\n')[0]
+ first_line = message.split("\n")[0]
if isinstance(config_reader, Actor):
- committer = config_reader # mypy thinks this is Actor | Gitconfigparser, but why?
+ committer = (
+ config_reader # mypy thinks this is Actor | Gitconfigparser, but why?
+ )
else:
committer = Actor.committer(config_reader)
- entry = RefLogEntry((
- bin_to_hex(oldbinsha).decode('ascii'),
- bin_to_hex(newbinsha).decode('ascii'),
- committer, (int(_time.time()), _time.altzone), first_line
- ))
+ entry = RefLogEntry(
+ (
+ bin_to_hex(oldbinsha).decode("ascii"),
+ bin_to_hex(newbinsha).decode("ascii"),
+ committer,
+ (int(_time.time()), _time.altzone),
+ first_line,
+ )
+ )
if write:
lf = LockFile(filepath)
lf._obtain_lock_or_raise()
- fd = open(filepath, 'ab')
+ fd = open(filepath, "ab")
try:
fd.write(entry.format().encode(defenc))
finally:
@@ -309,12 +331,13 @@ class RefLog(List[RefLogEntry], Serializable):
# END handle write operation
return entry
- def write(self) -> 'RefLog':
+ def write(self) -> "RefLog":
"""Write this instance's data to the file we are originating from
:return: self"""
if self._path is None:
raise ValueError(
- "Instance was not initialized with a path, use to_file(...) instead")
+ "Instance was not initialized with a path, use to_file(...) instead"
+ )
# END assert path
self.to_file(self._path)
return self
@@ -322,7 +345,7 @@ class RefLog(List[RefLogEntry], Serializable):
# } END interface
# { Serializable Interface
- def _serialize(self, stream: 'BytesIO') -> 'RefLog':
+ def _serialize(self, stream: "BytesIO") -> "RefLog":
write = stream.write
# write all entries
@@ -331,7 +354,7 @@ class RefLog(List[RefLogEntry], Serializable):
# END for each entry
return self
- def _deserialize(self, stream: 'BytesIO') -> 'RefLog':
+ def _deserialize(self, stream: "BytesIO") -> "RefLog":
self.extend(self.iter_entries(stream))
- # } END serializable interface
+ # } END serializable interface
return self