diff options
author | Sebastian Thiel <sebastian.thiel@icloud.com> | 2021-07-20 07:04:18 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-07-20 07:04:18 +0800 |
commit | cfd653aeeb3cb807f2cd2ae9fff5568880ac67da (patch) | |
tree | 8514389e82efdcdcdeaab5d3a85553883f28117c /git/refs/log.py | |
parent | acbd6bad9ded9a1d59e80e71d334d64b0244f5cd (diff) | |
parent | 600df043e76924d43a4f9f88f4e3241740f34c77 (diff) | |
download | gitpython-cfd653aeeb3cb807f2cd2ae9fff5568880ac67da.tar.gz |
Merge pull request #1295 from Yobmod/main
Add types to refs
Diffstat (limited to 'git/refs/log.py')
-rw-r--r-- | git/refs/log.py | 140 |
1 files changed, 86 insertions, 54 deletions
diff --git a/git/refs/log.py b/git/refs/log.py index f850ba24..643b4114 100644 --- a/git/refs/log.py +++ b/git/refs/log.py @@ -1,5 +1,7 @@ + +from mmap import mmap import re -import time +import time as _time from git.compat import defenc from git.objects.util import ( @@ -20,20 +22,33 @@ from git.util import ( import os.path as osp +# typing ------------------------------------------------------------------ + +from typing import Iterator, List, Tuple, Union, TYPE_CHECKING + +from git.types import PathLike + +if TYPE_CHECKING: + from git.refs import SymbolicReference + from io import BytesIO + from git.config import GitConfigParser, SectionConstraint # NOQA + +# ------------------------------------------------------------------------------ + __all__ = ["RefLog", "RefLogEntry"] -class RefLogEntry(tuple): +class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]): """Named tuple allowing easy access to the revlog data fields""" _re_hexsha_only = re.compile('^[0-9A-Fa-f]{40}$') __slots__ = () - def __repr__(self): + def __repr__(self) -> str: """Representation of ourselves in git reflog format""" return self.format() - def format(self): + def format(self) -> str: """:return: a string suitable to be placed in a reflog file""" act = self.actor time = self.time @@ -46,22 +61,22 @@ class RefLogEntry(tuple): self.message) @property - def oldhexsha(self): + def oldhexsha(self) -> str: """The hexsha to the commit the ref pointed to before the change""" return self[0] @property - def newhexsha(self): + def newhexsha(self) -> str: """The hexsha to the commit the ref now points to, after the change""" return self[1] @property - def actor(self): + def actor(self) -> Actor: """Actor instance, providing access""" return self[2] @property - def time(self): + def time(self) -> Tuple[int, int]: """time as tuple: * [0] = int(time) @@ -69,12 +84,13 @@ class RefLogEntry(tuple): return self[3] @property - def message(self): + def message(self) -> str: """Message describing the operation that acted on the reference""" return self[4] @classmethod - def new(cls, oldhexsha, newhexsha, actor, time, tz_offset, message): # skipcq: PYL-W0621 + def new(cls, oldhexsha: str, newhexsha: str, actor: Actor, time: int, tz_offset: int, message: str + ) -> 'RefLogEntry': # skipcq: PYL-W0621 """:return: New instance of a RefLogEntry""" if not isinstance(actor, Actor): raise ValueError("Need actor instance, got %s" % actor) @@ -111,14 +127,15 @@ class RefLogEntry(tuple): # END handle missing end brace actor = Actor._from_string(info[82:email_end + 1]) - time, tz_offset = parse_date(info[email_end + 2:]) # skipcq: PYL-W0621 + time, tz_offset = parse_date( + info[email_end + 2:]) # skipcq: PYL-W0621 return RefLogEntry((oldhexsha, newhexsha, actor, (time, tz_offset), msg)) -class RefLog(list, Serializable): +class RefLog(List[RefLogEntry], Serializable): - """A reflog contains reflog entries, each of which defines a certain state + """A reflog contains RefLogEntrys, each of which defines a certain state of the head in question. Custom query methods allow to retrieve log entries by date or by other criteria. @@ -127,11 +144,11 @@ class RefLog(list, Serializable): __slots__ = ('_path', ) - def __new__(cls, filepath=None): + def __new__(cls, filepath: Union[PathLike, None] = None) -> 'RefLog': inst = super(RefLog, cls).__new__(cls) return inst - def __init__(self, filepath=None): + def __init__(self, filepath: Union[PathLike, None] = None): """Initialize this instance with an optional filepath, from which we will initialize our data. The path is also used to write changes back using the write() method""" @@ -142,7 +159,8 @@ class RefLog(list, Serializable): def _read_from_file(self): try: - fmap = file_contents_ro_filepath(self._path, stream=True, allow_mmap=True) + fmap = file_contents_ro_filepath( + self._path, stream=True, allow_mmap=True) except OSError: # it is possible and allowed that the file doesn't exist ! return @@ -154,10 +172,10 @@ class RefLog(list, Serializable): fmap.close() # END handle closing of handle - #{ Interface + # { Interface @classmethod - def from_file(cls, filepath): + def from_file(cls, filepath: PathLike) -> 'RefLog': """ :return: a new RefLog instance containing all entries from the reflog at the given filepath @@ -166,7 +184,7 @@ class RefLog(list, Serializable): return cls(filepath) @classmethod - def path(cls, ref): + def path(cls, ref: 'SymbolicReference') -> str: """ :return: string to absolute path at which the reflog of the given ref instance would be found. The path is not guaranteed to point to a valid @@ -175,28 +193,34 @@ class RefLog(list, Serializable): return osp.join(ref.repo.git_dir, "logs", to_native_path(ref.path)) @classmethod - def iter_entries(cls, stream): + def iter_entries(cls, stream: Union[str, 'BytesIO', mmap]) -> Iterator[RefLogEntry]: """ :return: Iterator yielding RefLogEntry instances, one for each line read sfrom the given stream. :param stream: file-like object containing the revlog in its native format - or basestring instance pointing to a file to read""" + or string instance pointing to a file to read""" new_entry = RefLogEntry.from_line if isinstance(stream, str): - stream = file_contents_ro_filepath(stream) + # default args return mmap on py>3 + _stream = file_contents_ro_filepath(stream) + assert isinstance(_stream, mmap) + else: + _stream = stream # END handle stream type while True: - line = stream.readline() + line = _stream.readline() if not line: return yield new_entry(line.strip()) # END endless loop - stream.close() @classmethod - def entry_at(cls, filepath, index): - """:return: RefLogEntry at the given index + def entry_at(cls, filepath: PathLike, index: int) -> 'RefLogEntry': + """ + :return: RefLogEntry at the given index + :param filepath: full path to the index file from which to read the entry + :param index: python list compatible index, i.e. it may be negative to specify an entry counted from the end of the list @@ -210,21 +234,19 @@ class RefLog(list, Serializable): if index < 0: return RefLogEntry.from_line(fp.readlines()[index].strip()) # read until index is reached + for i in range(index + 1): line = fp.readline() if not line: - break + raise IndexError( + f"Index file ended at line {i+1}, before given index was reached") # END abort on eof # END handle runup - if i != index or not line: # skipcq:PYL-W0631 - raise IndexError - # END handle exception - return RefLogEntry.from_line(line.strip()) # END handle index - def to_file(self, filepath): + def to_file(self, filepath: PathLike) -> None: """Write the contents of the reflog instance to a file at the given filepath. :param filepath: path to file, parent directories are assumed to exist""" lfd = LockedFD(filepath) @@ -241,65 +263,75 @@ class RefLog(list, Serializable): # END handle change @classmethod - def append_entry(cls, config_reader, filepath, oldbinsha, newbinsha, message): + def append_entry(cls, config_reader: Union[Actor, 'GitConfigParser', 'SectionConstraint', None], + filepath: PathLike, oldbinsha: bytes, newbinsha: bytes, message: str, + write: bool = True) -> 'RefLogEntry': """Append a new log entry to the revlog at filepath. :param config_reader: configuration reader of the repository - used to obtain - user information. May also be an Actor instance identifying the committer directly. - May also be None + user information. May also be an Actor instance identifying the committer directly or None. :param filepath: full path to the log file :param oldbinsha: binary sha of the previous commit :param newbinsha: binary sha of the current commit :param message: message describing the change to the reference :param write: If True, the changes will be written right away. Otherwise the change will not be written + :return: RefLogEntry objects which was appended to the log + :note: As we are append-only, concurrent access is not a problem as we do not interfere with readers.""" + if len(oldbinsha) != 20 or len(newbinsha) != 20: raise ValueError("Shas need to be given in binary format") # END handle sha type assure_directory_exists(filepath, is_file=True) first_line = message.split('\n')[0] - committer = isinstance(config_reader, Actor) and config_reader or Actor.committer(config_reader) + if isinstance(config_reader, Actor): + committer = config_reader # mypy thinks this is Actor | Gitconfigparser, but why? + else: + committer = Actor.committer(config_reader) entry = RefLogEntry(( bin_to_hex(oldbinsha).decode('ascii'), bin_to_hex(newbinsha).decode('ascii'), - committer, (int(time.time()), time.altzone), first_line + committer, (int(_time.time()), _time.altzone), first_line )) - lf = LockFile(filepath) - lf._obtain_lock_or_raise() - fd = open(filepath, 'ab') - try: - fd.write(entry.format().encode(defenc)) - finally: - fd.close() - lf._release_lock() - # END handle write operation - + if write: + lf = LockFile(filepath) + lf._obtain_lock_or_raise() + fd = open(filepath, 'ab') + try: + fd.write(entry.format().encode(defenc)) + finally: + fd.close() + lf._release_lock() + # END handle write operation return entry - def write(self): + def write(self) -> 'RefLog': """Write this instance's data to the file we are originating from :return: self""" if self._path is None: - raise ValueError("Instance was not initialized with a path, use to_file(...) instead") + raise ValueError( + "Instance was not initialized with a path, use to_file(...) instead") # END assert path self.to_file(self._path) return self - #} END interface + # } END interface - #{ Serializable Interface - def _serialize(self, stream): + # { Serializable Interface + def _serialize(self, stream: 'BytesIO') -> 'RefLog': write = stream.write # write all entries for e in self: write(e.format().encode(defenc)) # END for each entry + return self - def _deserialize(self, stream): + def _deserialize(self, stream: 'BytesIO') -> 'RefLog': self.extend(self.iter_entries(stream)) - #} END serializable interface + # } END serializable interface + return self |