diff options
Diffstat (limited to 'git/refs/log.py')
-rw-r--r-- | git/refs/log.py | 117 |
1 files changed, 70 insertions, 47 deletions
diff --git a/git/refs/log.py b/git/refs/log.py index ddd78bc7..908f93d1 100644 --- a/git/refs/log.py +++ b/git/refs/log.py @@ -1,4 +1,3 @@ - from mmap import mmap import re import time as _time @@ -16,7 +15,7 @@ from git.util import ( assure_directory_exists, to_native_path, bin_to_hex, - file_contents_ro_filepath + file_contents_ro_filepath, ) import os.path as osp @@ -41,7 +40,8 @@ __all__ = ["RefLog", "RefLogEntry"] class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]): """Named tuple allowing easy access to the revlog data fields""" - _re_hexsha_only = re.compile('^[0-9A-Fa-f]{40}$') + + _re_hexsha_only = re.compile("^[0-9A-Fa-f]{40}$") __slots__ = () def __repr__(self) -> str: @@ -52,13 +52,15 @@ class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]): """:return: a string suitable to be placed in a reflog file""" act = self.actor time = self.time - return "{} {} {} <{}> {!s} {}\t{}\n".format(self.oldhexsha, - self.newhexsha, - act.name, - act.email, - time[0], - altz_to_utctz_str(time[1]), - self.message) + return "{} {} {} <{}> {!s} {}\t{}\n".format( + self.oldhexsha, + self.newhexsha, + act.name, + act.email, + time[0], + altz_to_utctz_str(time[1]), + self.message, + ) @property def oldhexsha(self) -> str: @@ -80,7 +82,7 @@ class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]): """time as tuple: * [0] = int(time) - * [1] = int(timezone_offset) in time.altzone format """ + * [1] = int(timezone_offset) in time.altzone format""" return self[3] @property @@ -89,8 +91,15 @@ class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]): return self[4] @classmethod - def new(cls, oldhexsha: str, newhexsha: str, actor: Actor, time: int, tz_offset: int, message: str - ) -> 'RefLogEntry': # skipcq: PYL-W0621 + def new( + cls, + oldhexsha: str, + newhexsha: str, + actor: Actor, + time: int, + tz_offset: int, + message: str, + ) -> "RefLogEntry": # skipcq: PYL-W0621 """:return: New instance of a RefLogEntry""" if not isinstance(actor, Actor): raise ValueError("Need actor instance, got %s" % actor) @@ -98,19 +107,21 @@ class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]): return RefLogEntry((oldhexsha, newhexsha, actor, (time, tz_offset), message)) @classmethod - def from_line(cls, line: bytes) -> 'RefLogEntry': + def from_line(cls, line: bytes) -> "RefLogEntry": """:return: New RefLogEntry instance from the given revlog line. :param line: line bytes without trailing newline :raise ValueError: If line could not be parsed""" line_str = line.decode(defenc) - fields = line_str.split('\t', 1) + fields = line_str.split("\t", 1) if len(fields) == 1: info, msg = fields[0], None elif len(fields) == 2: info, msg = fields else: - raise ValueError("Line must have up to two TAB-separated fields." - " Got %s" % repr(line_str)) + raise ValueError( + "Line must have up to two TAB-separated fields." + " Got %s" % repr(line_str) + ) # END handle first split oldhexsha = info[:40] @@ -121,14 +132,13 @@ class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]): # END if hexsha re doesn't match # END for each hexsha - email_end = info.find('>', 82) + email_end = info.find(">", 82) if email_end == -1: raise ValueError("Missing token: >") # END handle missing end brace - actor = Actor._from_string(info[82:email_end + 1]) - time, tz_offset = parse_date( - info[email_end + 2:]) # skipcq: PYL-W0621 + actor = Actor._from_string(info[82 : email_end + 1]) + time, tz_offset = parse_date(info[email_end + 2 :]) # skipcq: PYL-W0621 return RefLogEntry((oldhexsha, newhexsha, actor, (time, tz_offset), msg)) @@ -142,9 +152,9 @@ class RefLog(List[RefLogEntry], Serializable): Reflog entries are ordered, the first added entry is first in the list, the last entry, i.e. the last change of the head or reference, is last in the list.""" - __slots__ = ('_path', ) + __slots__ = ("_path",) - def __new__(cls, filepath: Union[PathLike, None] = None) -> 'RefLog': + def __new__(cls, filepath: Union[PathLike, None] = None) -> "RefLog": inst = super(RefLog, cls).__new__(cls) return inst @@ -159,8 +169,7 @@ class RefLog(List[RefLogEntry], Serializable): def _read_from_file(self) -> None: try: - fmap = file_contents_ro_filepath( - self._path, stream=True, allow_mmap=True) + fmap = file_contents_ro_filepath(self._path, stream=True, allow_mmap=True) except OSError: # it is possible and allowed that the file doesn't exist ! return @@ -175,7 +184,7 @@ class RefLog(List[RefLogEntry], Serializable): # { Interface @classmethod - def from_file(cls, filepath: PathLike) -> 'RefLog': + def from_file(cls, filepath: PathLike) -> "RefLog": """ :return: a new RefLog instance containing all entries from the reflog at the given filepath @@ -184,7 +193,7 @@ class RefLog(List[RefLogEntry], Serializable): return cls(filepath) @classmethod - def path(cls, ref: 'SymbolicReference') -> str: + def path(cls, ref: "SymbolicReference") -> str: """ :return: string to absolute path at which the reflog of the given ref instance would be found. The path is not guaranteed to point to a valid @@ -193,7 +202,7 @@ class RefLog(List[RefLogEntry], Serializable): return osp.join(ref.repo.git_dir, "logs", to_native_path(ref.path)) @classmethod - def iter_entries(cls, stream: Union[str, 'BytesIO', mmap]) -> Iterator[RefLogEntry]: + def iter_entries(cls, stream: Union[str, "BytesIO", mmap]) -> Iterator[RefLogEntry]: """ :return: Iterator yielding RefLogEntry instances, one for each line read sfrom the given stream. @@ -215,7 +224,7 @@ class RefLog(List[RefLogEntry], Serializable): # END endless loop @classmethod - def entry_at(cls, filepath: PathLike, index: int) -> 'RefLogEntry': + def entry_at(cls, filepath: PathLike, index: int) -> "RefLogEntry": """ :return: RefLogEntry at the given index @@ -230,7 +239,7 @@ class RefLog(List[RefLogEntry], Serializable): all other lines. Nonetheless, the whole file has to be read if the index is negative """ - with open(filepath, 'rb') as fp: + with open(filepath, "rb") as fp: if index < 0: return RefLogEntry.from_line(fp.readlines()[index].strip()) # read until index is reached @@ -239,7 +248,8 @@ class RefLog(List[RefLogEntry], Serializable): line = fp.readline() if not line: raise IndexError( - f"Index file ended at line {i+1}, before given index was reached") + f"Index file ended at line {i+1}, before given index was reached" + ) # END abort on eof # END handle runup @@ -263,9 +273,15 @@ class RefLog(List[RefLogEntry], Serializable): # END handle change @classmethod - def append_entry(cls, config_reader: Union[Actor, 'GitConfigParser', 'SectionConstraint', None], - filepath: PathLike, oldbinsha: bytes, newbinsha: bytes, message: str, - write: bool = True) -> 'RefLogEntry': + def append_entry( + cls, + config_reader: Union[Actor, "GitConfigParser", "SectionConstraint", None], + filepath: PathLike, + oldbinsha: bytes, + newbinsha: bytes, + message: str, + write: bool = True, + ) -> "RefLogEntry": """Append a new log entry to the revlog at filepath. :param config_reader: configuration reader of the repository - used to obtain @@ -286,21 +302,27 @@ class RefLog(List[RefLogEntry], Serializable): raise ValueError("Shas need to be given in binary format") # END handle sha type assure_directory_exists(filepath, is_file=True) - first_line = message.split('\n')[0] + first_line = message.split("\n")[0] if isinstance(config_reader, Actor): - committer = config_reader # mypy thinks this is Actor | Gitconfigparser, but why? + committer = ( + config_reader # mypy thinks this is Actor | Gitconfigparser, but why? + ) else: committer = Actor.committer(config_reader) - entry = RefLogEntry(( - bin_to_hex(oldbinsha).decode('ascii'), - bin_to_hex(newbinsha).decode('ascii'), - committer, (int(_time.time()), _time.altzone), first_line - )) + entry = RefLogEntry( + ( + bin_to_hex(oldbinsha).decode("ascii"), + bin_to_hex(newbinsha).decode("ascii"), + committer, + (int(_time.time()), _time.altzone), + first_line, + ) + ) if write: lf = LockFile(filepath) lf._obtain_lock_or_raise() - fd = open(filepath, 'ab') + fd = open(filepath, "ab") try: fd.write(entry.format().encode(defenc)) finally: @@ -309,12 +331,13 @@ class RefLog(List[RefLogEntry], Serializable): # END handle write operation return entry - def write(self) -> 'RefLog': + def write(self) -> "RefLog": """Write this instance's data to the file we are originating from :return: self""" if self._path is None: raise ValueError( - "Instance was not initialized with a path, use to_file(...) instead") + "Instance was not initialized with a path, use to_file(...) instead" + ) # END assert path self.to_file(self._path) return self @@ -322,7 +345,7 @@ class RefLog(List[RefLogEntry], Serializable): # } END interface # { Serializable Interface - def _serialize(self, stream: 'BytesIO') -> 'RefLog': + def _serialize(self, stream: "BytesIO") -> "RefLog": write = stream.write # write all entries @@ -331,7 +354,7 @@ class RefLog(List[RefLogEntry], Serializable): # END for each entry return self - def _deserialize(self, stream: 'BytesIO') -> 'RefLog': + def _deserialize(self, stream: "BytesIO") -> "RefLog": self.extend(self.iter_entries(stream)) - # } END serializable interface + # } END serializable interface return self |