path: root/git/index/fun.py
author     Sebastian Thiel <sebastian.thiel@icloud.com>   2021-05-18 08:29:52 +0800
committer  GitHub <noreply@github.com>                    2021-05-18 08:29:52 +0800
commit     b11bcfa3df4d0b792823930bffae126fd12673f7 (patch)
tree       85dc837ef6cadec84d232f5e7110d2dfc44c3b1a /git/index/fun.py
parent     33346b25c3a4fb5ea37202d88d6a6c66379099c5 (diff)
parent     c30bf3ba7548a0e996907b9a097ec322760eb43a (diff)
download   gitpython-b11bcfa3df4d0b792823930bffae126fd12673f7.tar.gz
Merge pull request #1244 from Yobmod/main
Added types to Index submodule
Diffstat (limited to 'git/index/fun.py')
-rw-r--r--  git/index/fun.py  82
1 file changed, 51 insertions(+), 31 deletions(-)
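
The annotations introduced by this patch do not change runtime behaviour; they only make the module's existing calling conventions explicit. As a rough, hypothetical sketch (the repository path, hook name, and index path below are illustrative assumptions, not part of the patch), the newly typed helpers can be exercised like this:

    from git import Repo
    from git.index.fun import hook_path, entry_key

    repo = Repo(".")                                 # assumes an existing repository in the current directory
    hp: str = hook_path("pre-commit", repo.git_dir)  # returns a str per the new annotation
    key = entry_key("git/index/fun.py", 0)           # (path, stage) form -> Tuple[PathLike, int]
    entry = repo.index.entries.get(key)              # IndexEntry at stage 0, or None
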
diff --git a/git/index/fun.py b/git/index/fun.py
index e92e8e38..f40928c3 100644
--- a/git/index/fun.py
+++ b/git/index/fun.py
@@ -1,6 +1,7 @@
# Contains standalone functions to accompany the index implementation and make it
# more versatile
# NOTE: Autodoc hates it if this is a docstring
+
from io import BytesIO
import os
from stat import (
@@ -47,6 +48,17 @@ from .util import (
unpack
)
+# typing -----------------------------------------------------------------------------
+
+from typing import (Dict, IO, List, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast)
+
+from git.types import PathLike
+
+if TYPE_CHECKING:
+ from .base import IndexFile
+
+# ------------------------------------------------------------------------------------
+
S_IFGITLINK = S_IFLNK | S_IFDIR # a submodule
CE_NAMEMASK_INV = ~CE_NAMEMASK
@@ -55,12 +67,12 @@ __all__ = ('write_cache', 'read_cache', 'write_tree_from_cache', 'entry_key',
'stat_mode_to_index_mode', 'S_IFGITLINK', 'run_commit_hook', 'hook_path')
-def hook_path(name, git_dir):
+def hook_path(name: str, git_dir: PathLike) -> str:
""":return: path to the given named hook in the given git repository directory"""
return osp.join(git_dir, 'hooks', name)
-def run_commit_hook(name, index, *args):
+def run_commit_hook(name: str, index: 'IndexFile', *args: str) -> None:
"""Run the commit hook of the given name. Silently ignores hooks that do not exist.
:param name: name of hook, like 'pre-commit'
:param index: IndexFile instance
@@ -68,10 +80,10 @@ def run_commit_hook(name, index, *args):
:raises HookExecutionError: """
hp = hook_path(name, index.repo.git_dir)
if not os.access(hp, os.X_OK):
- return
+ return None
env = os.environ.copy()
- env['GIT_INDEX_FILE'] = safe_decode(index.path)
+ env['GIT_INDEX_FILE'] = safe_decode(str(index.path))
env['GIT_EDITOR'] = ':'
try:
cmd = subprocess.Popen([hp] + list(args),
@@ -84,11 +96,11 @@ def run_commit_hook(name, index, *args):
except Exception as ex:
raise HookExecutionError(hp, ex) from ex
else:
- stdout = []
- stderr = []
- handle_process_output(cmd, stdout.append, stderr.append, finalize_process)
- stdout = ''.join(stdout)
- stderr = ''.join(stderr)
+ stdout_list = [] # type: List[str]
+ stderr_list = [] # type: List[str]
+ handle_process_output(cmd, stdout_list.append, stderr_list.append, finalize_process)
+ stdout = ''.join(stdout_list)
+ stderr = ''.join(stderr_list)
if cmd.returncode != 0:
stdout = force_text(stdout, defenc)
stderr = force_text(stderr, defenc)
@@ -106,7 +118,9 @@ def stat_mode_to_index_mode(mode):
return S_IFREG | 0o644 | (mode & 0o111) # blobs with or without executable bit
-def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1Writer):
+def write_cache(entries: Sequence[Union[BaseIndexEntry, 'IndexEntry']], stream: IO[bytes],
+ extension_data: Union[None, bytes] = None,
+ ShaStreamCls: Type[IndexFileSHA1Writer] = IndexFileSHA1Writer) -> None:
"""Write the cache represented by entries to a stream
:param entries: **sorted** list of entries
@@ -119,10 +133,10 @@ def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1
:param extension_data: any kind of data to write as a trailer, it must begin
a 4 byte identifier, followed by its size ( 4 bytes )"""
# wrap the stream into a compatible writer
- stream = ShaStreamCls(stream)
+ stream_sha = ShaStreamCls(stream)
- tell = stream.tell
- write = stream.write
+ tell = stream_sha.tell
+ write = stream_sha.write
# header
version = 2
@@ -134,8 +148,8 @@ def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1
beginoffset = tell()
write(entry[4]) # ctime
write(entry[5]) # mtime
- path = entry[3]
- path = force_bytes(path, encoding=defenc)
+ path_str = entry[3] # type: str
+ path = force_bytes(path_str, encoding=defenc)
plen = len(path) & CE_NAMEMASK # path length
assert plen == len(path), "Path %s too long to fit into index" % entry[3]
flags = plen | (entry[2] & CE_NAMEMASK_INV) # clear possible previous values
@@ -148,34 +162,38 @@ def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1
# write previously cached extensions data
if extension_data is not None:
- stream.write(extension_data)
+ stream_sha.write(extension_data)
# write the sha over the content
- stream.write_sha()
+ stream_sha.write_sha()
-def read_header(stream):
+def read_header(stream: IO[bytes]) -> Tuple[int, int]:
"""Return tuple(version_long, num_entries) from the given stream"""
type_id = stream.read(4)
if type_id != b"DIRC":
raise AssertionError("Invalid index file header: %r" % type_id)
- version, num_entries = unpack(">LL", stream.read(4 * 2))
+ unpacked = cast(Tuple[int, int], unpack(">LL", stream.read(4 * 2)))
+ version, num_entries = unpacked
# TODO: handle version 3: extended data, see read-cache.c
assert version in (1, 2)
return version, num_entries
-def entry_key(*entry):
+def entry_key(*entry: Union[BaseIndexEntry, PathLike, int]) -> Tuple[PathLike, int]:
""":return: Key suitable to be used for the index.entries dictionary
:param entry: One instance of type BaseIndexEntry or the path and the stage"""
if len(entry) == 1:
- return (entry[0].path, entry[0].stage)
- return tuple(entry)
+ entry_first = cast(BaseIndexEntry, entry[0]) # type: BaseIndexEntry
+ return (entry_first.path, entry_first.stage)
+ else:
+ entry = cast(Tuple[PathLike, int], tuple(entry))
+ return entry
# END handle entry
-def read_cache(stream):
+def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'IndexEntry'], bytes, bytes]:
"""Read a cache file from the given stream
:return: tuple(version, entries_dict, extension_data, content_sha)
* version is the integer version number
@@ -184,7 +202,7 @@ def read_cache(stream):
* content_sha is a 20 byte sha on all cache file contents"""
version, num_entries = read_header(stream)
count = 0
- entries = {}
+ entries = {} # type: Dict[Tuple[PathLike, int], 'IndexEntry']
read = stream.read
tell = stream.tell
@@ -223,7 +241,8 @@ def read_cache(stream):
return (version, entries, extension_data, content_sha)
-def write_tree_from_cache(entries, odb, sl, si=0):
+def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0
+ ) -> Tuple[bytes, List[Tuple[str, int, str]]]:
"""Create a tree from the given sorted list of entries and put the respective
trees into the given object database
@@ -233,7 +252,7 @@ def write_tree_from_cache(entries, odb, sl, si=0):
:param sl: slice indicating the range we should process on the entries list
:return: tuple(binsha, list(tree_entry, ...)) a tuple of a sha and a list of
tree entries being a tuple of hexsha, mode, name"""
- tree_items = []
+ tree_items = [] # type: List[Tuple[Union[bytes, str], int, str]]
tree_items_append = tree_items.append
ci = sl.start
end = sl.stop
@@ -272,18 +291,19 @@ def write_tree_from_cache(entries, odb, sl, si=0):
# finally create the tree
sio = BytesIO()
- tree_to_stream(tree_items, sio.write)
+ tree_to_stream(tree_items, sio.write) # converts bytes of each item[0] to str
+ tree_items_stringified = cast(List[Tuple[str, int, str]], tree_items) # type: List[Tuple[str, int, str]]
sio.seek(0)
istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio))
- return (istream.binsha, tree_items)
+ return (istream.binsha, tree_items_stringified)
-def _tree_entry_to_baseindexentry(tree_entry, stage):
+def _tree_entry_to_baseindexentry(tree_entry: Tuple[str, int, str], stage: int) -> BaseIndexEntry:
return BaseIndexEntry((tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2]))
-def aggressive_tree_merge(odb, tree_shas):
+def aggressive_tree_merge(odb, tree_shas: Sequence[bytes]) -> List[BaseIndexEntry]:
"""
:return: list of BaseIndexEntries representing the aggressive merge of the given
trees. All valid entries are on stage 0, whereas the conflicting ones are left
@@ -292,7 +312,7 @@ def aggressive_tree_merge(odb, tree_shas):
:param tree_shas: 1, 2 or 3 trees as identified by their binary 20 byte shas
If 1 or two, the entries will effectively correspond to the last given tree
If 3 are given, a 3 way merge is performed"""
- out = []
+ out = [] # type: List[BaseIndexEntry]
out_append = out.append
# one and two way is the same for us, as we don't have to handle an existing