diff options
Diffstat (limited to 'git/refs/log.py')
| -rw-r--r-- | git/refs/log.py | 117 | 
1 files changed, 70 insertions, 47 deletions
| diff --git a/git/refs/log.py b/git/refs/log.py index ddd78bc7..908f93d1 100644 --- a/git/refs/log.py +++ b/git/refs/log.py @@ -1,4 +1,3 @@ -  from mmap import mmap  import re  import time as _time @@ -16,7 +15,7 @@ from git.util import (      assure_directory_exists,      to_native_path,      bin_to_hex, -    file_contents_ro_filepath +    file_contents_ro_filepath,  )  import os.path as osp @@ -41,7 +40,8 @@ __all__ = ["RefLog", "RefLogEntry"]  class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]):      """Named tuple allowing easy access to the revlog data fields""" -    _re_hexsha_only = re.compile('^[0-9A-Fa-f]{40}$') + +    _re_hexsha_only = re.compile("^[0-9A-Fa-f]{40}$")      __slots__ = ()      def __repr__(self) -> str: @@ -52,13 +52,15 @@ class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]):          """:return: a string suitable to be placed in a reflog file"""          act = self.actor          time = self.time -        return "{} {} {} <{}> {!s} {}\t{}\n".format(self.oldhexsha, -                                                    self.newhexsha, -                                                    act.name, -                                                    act.email, -                                                    time[0], -                                                    altz_to_utctz_str(time[1]), -                                                    self.message) +        return "{} {} {} <{}> {!s} {}\t{}\n".format( +            self.oldhexsha, +            self.newhexsha, +            act.name, +            act.email, +            time[0], +            altz_to_utctz_str(time[1]), +            self.message, +        )      @property      def oldhexsha(self) -> str: @@ -80,7 +82,7 @@ class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]):          """time as tuple:          * [0] = int(time) -        * [1] = int(timezone_offset) in time.altzone format """ +        * [1] = int(timezone_offset) in time.altzone format"""          return self[3]      @property @@ -89,8 +91,15 @@ class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]):          return self[4]      @classmethod -    def new(cls, oldhexsha: str, newhexsha: str, actor: Actor, time: int, tz_offset: int, message: str -            ) -> 'RefLogEntry':  # skipcq: PYL-W0621 +    def new( +        cls, +        oldhexsha: str, +        newhexsha: str, +        actor: Actor, +        time: int, +        tz_offset: int, +        message: str, +    ) -> "RefLogEntry":  # skipcq: PYL-W0621          """:return: New instance of a RefLogEntry"""          if not isinstance(actor, Actor):              raise ValueError("Need actor instance, got %s" % actor) @@ -98,19 +107,21 @@ class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]):          return RefLogEntry((oldhexsha, newhexsha, actor, (time, tz_offset), message))      @classmethod -    def from_line(cls, line: bytes) -> 'RefLogEntry': +    def from_line(cls, line: bytes) -> "RefLogEntry":          """:return: New RefLogEntry instance from the given revlog line.          :param line: line bytes without trailing newline          :raise ValueError: If line could not be parsed"""          line_str = line.decode(defenc) -        fields = line_str.split('\t', 1) +        fields = line_str.split("\t", 1)          if len(fields) == 1:              info, msg = fields[0], None          elif len(fields) == 2:              info, msg = fields          else: -            raise ValueError("Line must have up to two TAB-separated fields." -                             " Got %s" % repr(line_str)) +            raise ValueError( +                "Line must have up to two TAB-separated fields." +                " Got %s" % repr(line_str) +            )          # END handle first split          oldhexsha = info[:40] @@ -121,14 +132,13 @@ class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]):              # END if hexsha re doesn't match          # END for each hexsha -        email_end = info.find('>', 82) +        email_end = info.find(">", 82)          if email_end == -1:              raise ValueError("Missing token: >")          # END handle missing end brace -        actor = Actor._from_string(info[82:email_end + 1]) -        time, tz_offset = parse_date( -            info[email_end + 2:])   # skipcq: PYL-W0621 +        actor = Actor._from_string(info[82 : email_end + 1]) +        time, tz_offset = parse_date(info[email_end + 2 :])  # skipcq: PYL-W0621          return RefLogEntry((oldhexsha, newhexsha, actor, (time, tz_offset), msg)) @@ -142,9 +152,9 @@ class RefLog(List[RefLogEntry], Serializable):      Reflog entries are ordered, the first added entry is first in the list, the last      entry, i.e. the last change of the head or reference, is last in the list.""" -    __slots__ = ('_path', ) +    __slots__ = ("_path",) -    def __new__(cls, filepath: Union[PathLike, None] = None) -> 'RefLog': +    def __new__(cls, filepath: Union[PathLike, None] = None) -> "RefLog":          inst = super(RefLog, cls).__new__(cls)          return inst @@ -159,8 +169,7 @@ class RefLog(List[RefLogEntry], Serializable):      def _read_from_file(self) -> None:          try: -            fmap = file_contents_ro_filepath( -                self._path, stream=True, allow_mmap=True) +            fmap = file_contents_ro_filepath(self._path, stream=True, allow_mmap=True)          except OSError:              # it is possible and allowed that the file doesn't exist !              return @@ -175,7 +184,7 @@ class RefLog(List[RefLogEntry], Serializable):      # { Interface      @classmethod -    def from_file(cls, filepath: PathLike) -> 'RefLog': +    def from_file(cls, filepath: PathLike) -> "RefLog":          """          :return: a new RefLog instance containing all entries from the reflog              at the given filepath @@ -184,7 +193,7 @@ class RefLog(List[RefLogEntry], Serializable):          return cls(filepath)      @classmethod -    def path(cls, ref: 'SymbolicReference') -> str: +    def path(cls, ref: "SymbolicReference") -> str:          """          :return: string to absolute path at which the reflog of the given ref              instance would be found. The path is not guaranteed to point to a valid @@ -193,7 +202,7 @@ class RefLog(List[RefLogEntry], Serializable):          return osp.join(ref.repo.git_dir, "logs", to_native_path(ref.path))      @classmethod -    def iter_entries(cls, stream: Union[str, 'BytesIO', mmap]) -> Iterator[RefLogEntry]: +    def iter_entries(cls, stream: Union[str, "BytesIO", mmap]) -> Iterator[RefLogEntry]:          """          :return: Iterator yielding RefLogEntry instances, one for each line read              sfrom the given stream. @@ -215,7 +224,7 @@ class RefLog(List[RefLogEntry], Serializable):          # END endless loop      @classmethod -    def entry_at(cls, filepath: PathLike, index: int) -> 'RefLogEntry': +    def entry_at(cls, filepath: PathLike, index: int) -> "RefLogEntry":          """          :return: RefLogEntry at the given index @@ -230,7 +239,7 @@ class RefLog(List[RefLogEntry], Serializable):              all other lines. Nonetheless, the whole file has to be read if              the index is negative          """ -        with open(filepath, 'rb') as fp: +        with open(filepath, "rb") as fp:              if index < 0:                  return RefLogEntry.from_line(fp.readlines()[index].strip())              # read until index is reached @@ -239,7 +248,8 @@ class RefLog(List[RefLogEntry], Serializable):                  line = fp.readline()                  if not line:                      raise IndexError( -                        f"Index file ended at line {i+1}, before given index was reached") +                        f"Index file ended at line {i+1}, before given index was reached" +                    )                  # END abort on eof              # END handle runup @@ -263,9 +273,15 @@ class RefLog(List[RefLogEntry], Serializable):          # END handle change      @classmethod -    def append_entry(cls, config_reader: Union[Actor, 'GitConfigParser', 'SectionConstraint', None], -                     filepath: PathLike, oldbinsha: bytes, newbinsha: bytes, message: str, -                     write: bool = True) -> 'RefLogEntry': +    def append_entry( +        cls, +        config_reader: Union[Actor, "GitConfigParser", "SectionConstraint", None], +        filepath: PathLike, +        oldbinsha: bytes, +        newbinsha: bytes, +        message: str, +        write: bool = True, +    ) -> "RefLogEntry":          """Append a new log entry to the revlog at filepath.          :param config_reader: configuration reader of the repository - used to obtain @@ -286,21 +302,27 @@ class RefLog(List[RefLogEntry], Serializable):              raise ValueError("Shas need to be given in binary format")          # END handle sha type          assure_directory_exists(filepath, is_file=True) -        first_line = message.split('\n')[0] +        first_line = message.split("\n")[0]          if isinstance(config_reader, Actor): -            committer = config_reader  # mypy thinks this is Actor | Gitconfigparser, but why? +            committer = ( +                config_reader  # mypy thinks this is Actor | Gitconfigparser, but why? +            )          else:              committer = Actor.committer(config_reader) -        entry = RefLogEntry(( -            bin_to_hex(oldbinsha).decode('ascii'), -            bin_to_hex(newbinsha).decode('ascii'), -            committer, (int(_time.time()), _time.altzone), first_line -        )) +        entry = RefLogEntry( +            ( +                bin_to_hex(oldbinsha).decode("ascii"), +                bin_to_hex(newbinsha).decode("ascii"), +                committer, +                (int(_time.time()), _time.altzone), +                first_line, +            ) +        )          if write:              lf = LockFile(filepath)              lf._obtain_lock_or_raise() -            fd = open(filepath, 'ab') +            fd = open(filepath, "ab")              try:                  fd.write(entry.format().encode(defenc))              finally: @@ -309,12 +331,13 @@ class RefLog(List[RefLogEntry], Serializable):              # END handle write operation          return entry -    def write(self) -> 'RefLog': +    def write(self) -> "RefLog":          """Write this instance's data to the file we are originating from          :return: self"""          if self._path is None:              raise ValueError( -                "Instance was not initialized with a path, use to_file(...) instead") +                "Instance was not initialized with a path, use to_file(...) instead" +            )          # END assert path          self.to_file(self._path)          return self @@ -322,7 +345,7 @@ class RefLog(List[RefLogEntry], Serializable):      # } END interface      # { Serializable Interface -    def _serialize(self, stream: 'BytesIO') -> 'RefLog': +    def _serialize(self, stream: "BytesIO") -> "RefLog":          write = stream.write          # write all entries @@ -331,7 +354,7 @@ class RefLog(List[RefLogEntry], Serializable):          # END for each entry          return self -    def _deserialize(self, stream: 'BytesIO') -> 'RefLog': +    def _deserialize(self, stream: "BytesIO") -> "RefLog":          self.extend(self.iter_entries(stream)) -    # } END serializable interface +        # } END serializable interface          return self | 
