From 025fe17da390c410e5bae4d6db0832afbfa26442 Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 17 May 2021 13:11:57 +0100 Subject: add types to index.fun.py --- git/index/fun.py | 77 +++++++++++++++++++++++++++++++++----------------------- 1 file changed, 46 insertions(+), 31 deletions(-) (limited to 'git/index/fun.py') diff --git a/git/index/fun.py b/git/index/fun.py index cc43f0a4..95dc3d56 100644 --- a/git/index/fun.py +++ b/git/index/fun.py @@ -1,7 +1,7 @@ # Contains standalone functions to accompany the index implementation and make it # more versatile # NOTE: Autodoc hates it if this is a docstring -from git.types import PathLike + from io import BytesIO import os from stat import ( @@ -13,7 +13,6 @@ from stat import ( S_IFREG, ) import subprocess -from typing import List, Tuple, Union, cast from git.cmd import PROC_CREATIONFLAGS, handle_process_output from git.compat import ( @@ -49,6 +48,17 @@ from .util import ( unpack ) +# typing ----------------------------------------------------------------------------- + +from typing import (Dict, IO, List, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast) + +from git.types import PathLike + +if TYPE_CHECKING: + from .base import IndexFile + +# ------------------------------------------------------------------------------------ + S_IFGITLINK = S_IFLNK | S_IFDIR # a submodule CE_NAMEMASK_INV = ~CE_NAMEMASK @@ -57,12 +67,12 @@ __all__ = ('write_cache', 'read_cache', 'write_tree_from_cache', 'entry_key', 'stat_mode_to_index_mode', 'S_IFGITLINK', 'run_commit_hook', 'hook_path') -def hook_path(name, git_dir): +def hook_path(name: str, git_dir: PathLike) -> str: """:return: path to the given named hook in the given git repository directory""" return osp.join(git_dir, 'hooks', name) -def run_commit_hook(name, index, *args): +def run_commit_hook(name: str, index: IndexFile, *args: str) -> None: """Run the commit hook of the given name. Silently ignores hooks that do not exist. :param name: name of hook, like 'pre-commit' :param index: IndexFile instance @@ -70,10 +80,10 @@ def run_commit_hook(name, index, *args): :raises HookExecutionError: """ hp = hook_path(name, index.repo.git_dir) if not os.access(hp, os.X_OK): - return + return None env = os.environ.copy() - env['GIT_INDEX_FILE'] = safe_decode(index.path) + env['GIT_INDEX_FILE'] = safe_decode(str(index.path)) env['GIT_EDITOR'] = ':' try: cmd = subprocess.Popen([hp] + list(args), @@ -86,14 +96,14 @@ def run_commit_hook(name, index, *args): except Exception as ex: raise HookExecutionError(hp, ex) from ex else: - stdout = [] - stderr = [] - handle_process_output(cmd, stdout.append, stderr.append, finalize_process) - stdout = ''.join(stdout) - stderr = ''.join(stderr) + stdout_list = [] # type: List[str] + stderr_list = [] # type: List[str] + handle_process_output(cmd, stdout_list.append, stderr_list.append, finalize_process) + stdout_str = ''.join(stderr_list) + stderr_str = ''.join(stderr_list) if cmd.returncode != 0: - stdout = force_text(stdout, defenc) - stderr = force_text(stderr, defenc) + stdout = force_text(stdout_str, defenc) + stderr = force_text(stderr_str, defenc) raise HookExecutionError(hp, cmd.returncode, stderr, stdout) # end handle return code @@ -108,7 +118,9 @@ def stat_mode_to_index_mode(mode): return S_IFREG | 0o644 | (mode & 0o111) # blobs with or without executable bit -def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1Writer): +def write_cache(entries: Sequence[Union[BaseIndexEntry, 'IndexEntry']], stream: IO[bytes], + extension_data: Union[None, bytes] = None, + ShaStreamCls: Type[IndexFileSHA1Writer] = IndexFileSHA1Writer) -> None: """Write the cache represented by entries to a stream :param entries: **sorted** list of entries @@ -121,10 +133,10 @@ def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1 :param extension_data: any kind of data to write as a trailer, it must begin a 4 byte identifier, followed by its size ( 4 bytes )""" # wrap the stream into a compatible writer - stream = ShaStreamCls(stream) + stream_sha = ShaStreamCls(stream) - tell = stream.tell - write = stream.write + tell = stream_sha.tell + write = stream_sha.write # header version = 2 @@ -136,8 +148,8 @@ def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1 beginoffset = tell() write(entry[4]) # ctime write(entry[5]) # mtime - path = entry[3] - path = force_bytes(path, encoding=defenc) + path_str = entry[3] # type: str + path = force_bytes(path_str, encoding=defenc) plen = len(path) & CE_NAMEMASK # path length assert plen == len(path), "Path %s too long to fit into index" % entry[3] flags = plen | (entry[2] & CE_NAMEMASK_INV) # clear possible previous values @@ -150,18 +162,19 @@ def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1 # write previously cached extensions data if extension_data is not None: - stream.write(extension_data) + stream_sha.write(extension_data) # write the sha over the content - stream.write_sha() + stream_sha.write_sha() -def read_header(stream): +def read_header(stream: IO[bytes]) -> Tuple[int, int]: """Return tuple(version_long, num_entries) from the given stream""" type_id = stream.read(4) if type_id != b"DIRC": raise AssertionError("Invalid index file header: %r" % type_id) - version, num_entries = unpack(">LL", stream.read(4 * 2)) + unpacked = cast(Tuple[int, int], unpack(">LL", stream.read(4 * 2))) + version, num_entries = unpacked # TODO: handle version 3: extended data, see read-cache.c assert version in (1, 2) @@ -180,7 +193,7 @@ def entry_key(*entry: Union[BaseIndexEntry, PathLike, int]) -> Tuple[PathLike, i # END handle entry -def read_cache(stream): +def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'IndexEntry'], bytes, bytes]: """Read a cache file from the given stream :return: tuple(version, entries_dict, extension_data, content_sha) * version is the integer version number @@ -189,7 +202,7 @@ def read_cache(stream): * content_sha is a 20 byte sha on all cache file contents""" version, num_entries = read_header(stream) count = 0 - entries = {} + entries = {} # type: Dict[Tuple[PathLike, int], 'IndexEntry'] read = stream.read tell = stream.tell @@ -228,7 +241,8 @@ def read_cache(stream): return (version, entries, extension_data, content_sha) -def write_tree_from_cache(entries, odb, sl, si=0): +def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0 + ) -> Tuple[bytes, List[Tuple[str, int, str]]]: """Create a tree from the given sorted list of entries and put the respective trees into the given object database @@ -238,7 +252,7 @@ def write_tree_from_cache(entries, odb, sl, si=0): :param sl: slice indicating the range we should process on the entries list :return: tuple(binsha, list(tree_entry, ...)) a tuple of a sha and a list of tree entries being a tuple of hexsha, mode, name""" - tree_items = [] + tree_items = [] # type: List[Tuple[Union[bytes, str], int, str]] tree_items_append = tree_items.append ci = sl.start end = sl.stop @@ -277,18 +291,19 @@ def write_tree_from_cache(entries, odb, sl, si=0): # finally create the tree sio = BytesIO() - tree_to_stream(tree_items, sio.write) + tree_to_stream(tree_items, sio.write) # converts bytes of each item[0] to str + tree_items_stringified = cast(List[Tuple[str, int, str]], tree_items) # type: List[Tuple[str, int, str]] sio.seek(0) istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio)) - return (istream.binsha, tree_items) + return (istream.binsha, tree_items_stringified) -def _tree_entry_to_baseindexentry(tree_entry, stage): +def _tree_entry_to_baseindexentry(tree_entry: Tuple[str, int, str], stage: int) -> BaseIndexEntry: return BaseIndexEntry((tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2])) -def aggressive_tree_merge(odb, tree_shas) -> List[BaseIndexEntry]: +def aggressive_tree_merge(odb, tree_shas: Sequence[bytes]) -> List[BaseIndexEntry]: """ :return: list of BaseIndexEntries representing the aggressive merge of the given trees. All valid entries are on stage 0, whereas the conflicting ones are left -- cgit v1.2.1