diff options
-rw-r--r-- | git/index/base.py | 31 | ||||
-rw-r--r-- | git/index/fun.py | 62 | ||||
-rw-r--r-- | git/index/util.py | 13 | ||||
-rw-r--r-- | git/objects/fun.py | 68 | ||||
-rw-r--r-- | git/objects/tree.py | 20 | ||||
-rw-r--r-- | git/types.py | 10 |
6 files changed, 123 insertions, 81 deletions
diff --git a/git/index/base.py b/git/index/base.py index d6670b2a..1812faee 100644 --- a/git/index/base.py +++ b/git/index/base.py @@ -18,6 +18,7 @@ from git.compat import ( from git.exc import ( GitCommandError, CheckoutError, + GitError, InvalidGitRepositoryError ) from git.objects import ( @@ -66,10 +67,10 @@ from .util import ( # typing ----------------------------------------------------------------------------- -from typing import (Any, BinaryIO, Callable, Dict, IO, Iterable, Iterator, List, +from typing import (Any, BinaryIO, Callable, Dict, IO, Iterable, Iterator, List, NoReturn, Sequence, TYPE_CHECKING, Tuple, Union) -from git.types import PathLike, TBD +from git.types import Commit_ish, PathLike, TBD if TYPE_CHECKING: from subprocess import Popen @@ -372,13 +373,13 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): # UTILITIES @unbare_repo - def _iter_expand_paths(self, paths: Sequence[PathLike]) -> Iterator[PathLike]: + def _iter_expand_paths(self: 'IndexFile', paths: Sequence[PathLike]) -> Iterator[PathLike]: """Expand the directories in list of paths to the corresponding paths accordingly, Note: git will add items multiple times even if a glob overlapped with manually specified paths or if paths where specified multiple times - we respect that and do not prune""" - def raise_exc(e): + def raise_exc(e: Exception) -> NoReturn: raise e r = str(self.repo.working_tree_dir) rs = r + os.sep @@ -426,7 +427,8 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): # END path exception handling # END for each path - def _write_path_to_stdin(self, proc: 'Popen', filepath: PathLike, item, fmakeexc, fprogress, + def _write_path_to_stdin(self, proc: 'Popen', filepath: PathLike, item: TBD, fmakeexc: Callable[..., GitError], + fprogress: Callable[[PathLike, bool, TBD], None], read_from_stdout: bool = True) -> Union[None, str]: """Write path to proc.stdin and make sure it processes the item, including progress. @@ -498,7 +500,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): line.sort() return path_map - @classmethod + @ classmethod def entry_key(cls, *entry: Union[BaseIndexEntry, PathLike, StageType]) -> Tuple[PathLike, StageType]: return entry_key(*entry) @@ -631,8 +633,8 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): return BaseIndexEntry((stat_mode_to_index_mode(st.st_mode), istream.binsha, 0, to_native_path_linux(filepath))) - @unbare_repo - @git_working_dir + @ unbare_repo + @ git_working_dir def _entries_for_paths(self, paths: List[str], path_rewriter: Callable, fprogress: Callable, entries: List[BaseIndexEntry]) -> List[BaseIndexEntry]: entries_added: List[BaseIndexEntry] = [] @@ -788,8 +790,8 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): # create objects if required, otherwise go with the existing shas null_entries_indices = [i for i, e in enumerate(entries) if e.binsha == Object.NULL_BIN_SHA] if null_entries_indices: - @git_working_dir - def handle_null_entries(self): + @ git_working_dir + def handle_null_entries(self: 'IndexFile') -> None: for ei in null_entries_indices: null_entry = entries[ei] new_entry = self._store_path(null_entry.path, fprogress) @@ -969,8 +971,13 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): return out - def commit(self, message: str, parent_commits=None, head: bool = True, author: Union[None, 'Actor'] = None, - committer: Union[None, 'Actor'] = None, author_date: Union[str, None] = None, + def commit(self, + message: str, + parent_commits: Union[Commit_ish, None] = None, + head: bool = True, + author: Union[None, 'Actor'] = None, + committer: Union[None, 'Actor'] = None, + author_date: Union[str, None] = None, commit_date: Union[str, None] = None, skip_hooks: bool = False) -> Commit: """Commit the current default index file, creating a commit object. diff --git a/git/index/fun.py b/git/index/fun.py index ffd109b1..74f6efbf 100644 --- a/git/index/fun.py +++ b/git/index/fun.py @@ -57,6 +57,7 @@ from git.types import PathLike, TypeGuard if TYPE_CHECKING: from .base import IndexFile + from git.objects.fun import EntryTup # ------------------------------------------------------------------------------------ @@ -188,7 +189,7 @@ def entry_key(*entry: Union[BaseIndexEntry, PathLike, int]) -> Tuple[PathLike, i def is_entry_tuple(entry: Tuple) -> TypeGuard[Tuple[PathLike, int]]: return isinstance(entry, tuple) and len(entry) == 2 - + if len(entry) == 1: entry_first = entry[0] assert isinstance(entry_first, BaseIndexEntry) @@ -259,8 +260,8 @@ def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0 :param sl: slice indicating the range we should process on the entries list :return: tuple(binsha, list(tree_entry, ...)) a tuple of a sha and a list of tree entries being a tuple of hexsha, mode, name""" - tree_items = [] # type: List[Tuple[Union[bytes, str], int, str]] - tree_items_append = tree_items.append + tree_items: List[Tuple[bytes, int, str]] = [] + ci = sl.start end = sl.stop while ci < end: @@ -272,7 +273,7 @@ def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0 rbound = entry.path.find('/', si) if rbound == -1: # its not a tree - tree_items_append((entry.binsha, entry.mode, entry.path[si:])) + tree_items.append((entry.binsha, entry.mode, entry.path[si:])) else: # find common base range base = entry.path[si:rbound] @@ -289,7 +290,7 @@ def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0 # enter recursion # ci - 1 as we want to count our current item as well sha, _tree_entry_list = write_tree_from_cache(entries, odb, slice(ci - 1, xi), rbound + 1) - tree_items_append((sha, S_IFDIR, base)) + tree_items.append((sha, S_IFDIR, base)) # skip ahead ci = xi @@ -306,7 +307,7 @@ def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0 return (istream.binsha, tree_items_stringified) -def _tree_entry_to_baseindexentry(tree_entry: Tuple[str, int, str], stage: int) -> BaseIndexEntry: +def _tree_entry_to_baseindexentry(tree_entry: Tuple[bytes, int, str], stage: int) -> BaseIndexEntry: return BaseIndexEntry((tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2])) @@ -319,14 +320,13 @@ def aggressive_tree_merge(odb, tree_shas: Sequence[bytes]) -> List[BaseIndexEntr :param tree_shas: 1, 2 or 3 trees as identified by their binary 20 byte shas If 1 or two, the entries will effectively correspond to the last given tree If 3 are given, a 3 way merge is performed""" - out = [] # type: List[BaseIndexEntry] - out_append = out.append + out: List[BaseIndexEntry] = [] # one and two way is the same for us, as we don't have to handle an existing # index, instrea if len(tree_shas) in (1, 2): for entry in traverse_tree_recursive(odb, tree_shas[-1], ''): - out_append(_tree_entry_to_baseindexentry(entry, 0)) + out.append(_tree_entry_to_baseindexentry(entry, 0)) # END for each entry return out # END handle single tree @@ -334,8 +334,16 @@ def aggressive_tree_merge(odb, tree_shas: Sequence[bytes]) -> List[BaseIndexEntr if len(tree_shas) > 3: raise ValueError("Cannot handle %i trees at once" % len(tree_shas)) + EntryTupOrNone = Union[EntryTup, None] + + def is_three_entry_list(inp) -> TypeGuard[List[EntryTupOrNone]]: + return isinstance(inp, list) and len(inp) == 3 + # three trees - for base, ours, theirs in traverse_trees_recursive(odb, tree_shas, ''): + for three_entries in traverse_trees_recursive(odb, tree_shas, ''): + + assert is_three_entry_list(three_entries) + base, ours, theirs = three_entries if base is not None: # base version exists if ours is not None: @@ -347,23 +355,23 @@ def aggressive_tree_merge(odb, tree_shas: Sequence[bytes]) -> List[BaseIndexEntr if(base[0] != ours[0] and base[0] != theirs[0] and ours[0] != theirs[0]) or \ (base[1] != ours[1] and base[1] != theirs[1] and ours[1] != theirs[1]): # changed by both - out_append(_tree_entry_to_baseindexentry(base, 1)) - out_append(_tree_entry_to_baseindexentry(ours, 2)) - out_append(_tree_entry_to_baseindexentry(theirs, 3)) + out.append(_tree_entry_to_baseindexentry(base, 1)) + out.append(_tree_entry_to_baseindexentry(ours, 2)) + out.append(_tree_entry_to_baseindexentry(theirs, 3)) elif base[0] != ours[0] or base[1] != ours[1]: # only we changed it - out_append(_tree_entry_to_baseindexentry(ours, 0)) + out.append(_tree_entry_to_baseindexentry(ours, 0)) else: # either nobody changed it, or they did. In either # case, use theirs - out_append(_tree_entry_to_baseindexentry(theirs, 0)) + out.append(_tree_entry_to_baseindexentry(theirs, 0)) # END handle modification else: if ours[0] != base[0] or ours[1] != base[1]: # they deleted it, we changed it, conflict - out_append(_tree_entry_to_baseindexentry(base, 1)) - out_append(_tree_entry_to_baseindexentry(ours, 2)) + out.append(_tree_entry_to_baseindexentry(base, 1)) + out.append(_tree_entry_to_baseindexentry(ours, 2)) # else: # we didn't change it, ignore # pass @@ -376,8 +384,8 @@ def aggressive_tree_merge(odb, tree_shas: Sequence[bytes]) -> List[BaseIndexEntr else: if theirs[0] != base[0] or theirs[1] != base[1]: # deleted in ours, changed theirs, conflict - out_append(_tree_entry_to_baseindexentry(base, 1)) - out_append(_tree_entry_to_baseindexentry(theirs, 3)) + out.append(_tree_entry_to_baseindexentry(base, 1)) + out.append(_tree_entry_to_baseindexentry(theirs, 3)) # END theirs changed # else: # theirs didn't change @@ -386,20 +394,20 @@ def aggressive_tree_merge(odb, tree_shas: Sequence[bytes]) -> List[BaseIndexEntr # END handle ours else: # all three can't be None - if ours is None: + if ours is None and theirs is not None: # added in their branch - out_append(_tree_entry_to_baseindexentry(theirs, 0)) - elif theirs is None: + out.append(_tree_entry_to_baseindexentry(theirs, 0)) + elif theirs is None and ours is not None: # added in our branch - out_append(_tree_entry_to_baseindexentry(ours, 0)) - else: + out.append(_tree_entry_to_baseindexentry(ours, 0)) + elif ours is not None and theirs is not None: # both have it, except for the base, see whether it changed if ours[0] != theirs[0] or ours[1] != theirs[1]: - out_append(_tree_entry_to_baseindexentry(ours, 2)) - out_append(_tree_entry_to_baseindexentry(theirs, 3)) + out.append(_tree_entry_to_baseindexentry(ours, 2)) + out.append(_tree_entry_to_baseindexentry(theirs, 3)) else: # it was added the same in both - out_append(_tree_entry_to_baseindexentry(ours, 0)) + out.append(_tree_entry_to_baseindexentry(ours, 0)) # END handle two items # END handle heads # END handle base exists diff --git a/git/index/util.py b/git/index/util.py index 3b3d6489..4f8af553 100644 --- a/git/index/util.py +++ b/git/index/util.py @@ -11,10 +11,13 @@ import os.path as osp # typing ---------------------------------------------------------------------- -from typing import (Any, Callable) +from typing import (Any, Callable, TYPE_CHECKING) from git.types import PathLike, _T +if TYPE_CHECKING: + from git.index import IndexFile + # --------------------------------------------------------------------------------- @@ -63,7 +66,7 @@ def post_clear_cache(func: Callable[..., _T]) -> Callable[..., _T]: """ @wraps(func) - def post_clear_cache_if_not_raised(self, *args: Any, **kwargs: Any) -> _T: + def post_clear_cache_if_not_raised(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T: rval = func(self, *args, **kwargs) self._delete_entries_cache() return rval @@ -78,7 +81,7 @@ def default_index(func: Callable[..., _T]) -> Callable[..., _T]: on that index only. """ @wraps(func) - def check_default_index(self, *args: Any, **kwargs: Any) -> _T: + def check_default_index(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T: if self._file_path != self._index_path(): raise AssertionError( "Cannot call %r on indices that do not represent the default git index" % func.__name__) @@ -93,9 +96,9 @@ def git_working_dir(func: Callable[..., _T]) -> Callable[..., _T]: repository in order to assure relative paths are handled correctly""" @wraps(func) - def set_git_working_dir(self, *args: Any, **kwargs: Any) -> _T: + def set_git_working_dir(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T: cur_wd = os.getcwd() - os.chdir(self.repo.working_tree_dir) + os.chdir(str(self.repo.working_tree_dir)) try: return func(self, *args, **kwargs) finally: diff --git a/git/objects/fun.py b/git/objects/fun.py index 339a53b8..89b02ad4 100644 --- a/git/objects/fun.py +++ b/git/objects/fun.py @@ -1,6 +1,8 @@ """Module with functions which are supposed to be as fast as possible""" from stat import S_ISDIR +from git import GitCmdObjectDB + from git.compat import ( safe_decode, defenc @@ -8,7 +10,12 @@ from git.compat import ( # typing ---------------------------------------------- -from typing import List, Tuple +from typing import Callable, List, Sequence, Tuple, TYPE_CHECKING, Union, overload + +if TYPE_CHECKING: + from _typeshed import ReadableBuffer + +EntryTup = Tuple[bytes, int, str] # same as TreeCacheTup in tree.py # --------------------------------------------------- @@ -18,7 +25,7 @@ __all__ = ('tree_to_stream', 'tree_entries_from_data', 'traverse_trees_recursive 'traverse_tree_recursive') -def tree_to_stream(entries, write): +def tree_to_stream(entries: Sequence[EntryTup], write: Callable[['ReadableBuffer'], Union[int, None]]) -> None: """Write the give list of entries into a stream using its write method :param entries: **sorted** list of tuples with (binsha, mode, name) :param write: write method which takes a data string""" @@ -42,12 +49,14 @@ def tree_to_stream(entries, write): # According to my tests, this is exactly what git does, that is it just # takes the input literally, which appears to be utf8 on linux. if isinstance(name, str): - name = name.encode(defenc) - write(b''.join((mode_str, b' ', name, b'\0', binsha))) + name_bytes = name.encode(defenc) + else: + name_bytes = name + write(b''.join((mode_str, b' ', name_bytes, b'\0', binsha))) # END for each item -def tree_entries_from_data(data: bytes) -> List[Tuple[bytes, int, str]]: +def tree_entries_from_data(data: bytes) -> List[EntryTup]: """Reads the binary representation of a tree and returns tuples of Tree items :param data: data block with tree data (as bytes) :return: list(tuple(binsha, mode, tree_relative_path), ...)""" @@ -93,36 +102,49 @@ def tree_entries_from_data(data: bytes) -> List[Tuple[bytes, int, str]]: return out -def _find_by_name(tree_data, name, is_dir, start_at): +def _find_by_name(tree_data: Sequence[Union[EntryTup, None]], name: str, is_dir: bool, start_at: int + ) -> Union[EntryTup, None]: """return data entry matching the given name and tree mode or None. Before the item is returned, the respective data item is set None in the tree_data list to mark it done""" + tree_data_list: List[Union[EntryTup, None]] = list(tree_data) try: - item = tree_data[start_at] + item = tree_data_list[start_at] if item and item[2] == name and S_ISDIR(item[1]) == is_dir: - tree_data[start_at] = None + tree_data_list[start_at] = None return item except IndexError: pass # END exception handling - for index, item in enumerate(tree_data): + for index, item in enumerate(tree_data_list): if item and item[2] == name and S_ISDIR(item[1]) == is_dir: - tree_data[index] = None + tree_data_list[index] = None return item # END if item matches # END for each item return None -def _to_full_path(item, path_prefix): +@ overload +def _to_full_path(item: None, path_prefix: str) -> None: + ... + + +@ overload +def _to_full_path(item: EntryTup, path_prefix: str) -> EntryTup: + ... + + +def _to_full_path(item: Union[EntryTup, None], path_prefix: str) -> Union[EntryTup, None]: """Rebuild entry with given path prefix""" if not item: return item return (item[0], item[1], path_prefix + item[2]) -def traverse_trees_recursive(odb, tree_shas, path_prefix): +def traverse_trees_recursive(odb: GitCmdObjectDB, tree_shas: Sequence[Union[bytes, None]], + path_prefix: str) -> List[Union[EntryTup, None]]: """ :return: list with entries according to the given binary tree-shas. The result is encoded in a list @@ -137,28 +159,29 @@ def traverse_trees_recursive(odb, tree_shas, path_prefix): :param path_prefix: a prefix to be added to the returned paths on this level, set it '' for the first iteration :note: The ordering of the returned items will be partially lost""" - trees_data = [] + trees_data: List[List[Union[EntryTup, None]]] = [] nt = len(tree_shas) for tree_sha in tree_shas: if tree_sha is None: - data = [] + data: List[Union[EntryTup, None]] = [] else: - data = tree_entries_from_data(odb.stream(tree_sha).read()) + data = list(tree_entries_from_data(odb.stream(tree_sha).read())) # make new list for typing as invariant # END handle muted trees trees_data.append(data) # END for each sha to get data for out = [] - out_append = out.append # find all matching entries and recursively process them together if the match # is a tree. If the match is a non-tree item, put it into the result. # Processed items will be set None for ti, tree_data in enumerate(trees_data): + for ii, item in enumerate(tree_data): if not item: continue # END skip already done items + entries: List[Union[EntryTup, None]] entries = [None for _ in range(nt)] entries[ti] = item _sha, mode, name = item @@ -169,17 +192,20 @@ def traverse_trees_recursive(odb, tree_shas, path_prefix): # ti+nt, not ti+1+nt for tio in range(ti + 1, ti + nt): tio = tio % nt - entries[tio] = _find_by_name(trees_data[tio], name, is_dir, ii) - # END for each other item data + td = trees_data[tio] + entries[tio] = _find_by_name(td, name, is_dir, ii) + # END for each other item data +#Revealed type is "builtins.list[Union[Tuple[builtins.bytes, builtins.int, builtins.str], None]]"## # +#Revealed type is "builtins.list[Union[Tuple[builtins.bytes, builtins.int, builtins.str], None]]" # if we are a directory, enter recursion if is_dir: out.extend(traverse_trees_recursive( odb, [((ei and ei[0]) or None) for ei in entries], path_prefix + name + '/')) else: - out_append(tuple(_to_full_path(e, path_prefix) for e in entries)) - # END handle recursion + out.extend([_to_full_path(e, path_prefix) for e in entries]) + # END handle recursion # finally mark it done tree_data[ii] = None # END for each item @@ -190,7 +216,7 @@ def traverse_trees_recursive(odb, tree_shas, path_prefix): return out -def traverse_tree_recursive(odb, tree_sha, path_prefix): +def traverse_tree_recursive(odb: GitCmdObjectDB, tree_sha: bytes, path_prefix: str) -> List[Tuple[bytes, int, str]]: """ :return: list of entries of the tree pointed to by the binary tree_sha. An entry has the following format: diff --git a/git/objects/tree.py b/git/objects/tree.py index 34fb93dc..528cf5ca 100644 --- a/git/objects/tree.py +++ b/git/objects/tree.py @@ -21,8 +21,8 @@ from .fun import ( # typing ------------------------------------------------- -from typing import (Callable, Dict, Generic, Iterable, Iterator, List, - Tuple, Type, TypeVar, Union, cast, TYPE_CHECKING) +from typing import (Callable, Dict, Iterable, Iterator, List, + Tuple, Type, Union, cast, TYPE_CHECKING) from git.types import PathLike, TypeGuard @@ -30,7 +30,7 @@ if TYPE_CHECKING: from git.repo import Repo from io import BytesIO -T_Tree_cache = TypeVar('T_Tree_cache', bound=Tuple[bytes, int, str]) +TreeCacheTup = Tuple[bytes, int, str] TraversedTreeTup = Union[Tuple[Union['Tree', None], IndexObjUnion, Tuple['Submodule', 'Submodule']]] @@ -42,7 +42,7 @@ cmp: Callable[[str, str], int] = lambda a, b: (a > b) - (a < b) __all__ = ("TreeModifier", "Tree") -def git_cmp(t1: T_Tree_cache, t2: T_Tree_cache) -> int: +def git_cmp(t1: TreeCacheTup, t2: TreeCacheTup) -> int: a, b = t1[2], t2[2] assert isinstance(a, str) and isinstance(b, str) # Need as mypy 9.0 cannot unpack TypeVar properly len_a, len_b = len(a), len(b) @@ -55,8 +55,8 @@ def git_cmp(t1: T_Tree_cache, t2: T_Tree_cache) -> int: return len_a - len_b -def merge_sort(a: List[T_Tree_cache], - cmp: Callable[[T_Tree_cache, T_Tree_cache], int]) -> None: +def merge_sort(a: List[TreeCacheTup], + cmp: Callable[[TreeCacheTup, TreeCacheTup], int]) -> None: if len(a) < 2: return None @@ -91,7 +91,7 @@ def merge_sort(a: List[T_Tree_cache], k = k + 1 -class TreeModifier(Generic[T_Tree_cache], object): +class TreeModifier(object): """A utility class providing methods to alter the underlying cache in a list-like fashion. @@ -99,7 +99,7 @@ class TreeModifier(Generic[T_Tree_cache], object): the cache of a tree, will be sorted. Assuring it will be in a serializable state""" __slots__ = '_cache' - def __init__(self, cache: List[T_Tree_cache]) -> None: + def __init__(self, cache: List[TreeCacheTup]) -> None: self._cache = cache def _index_by_name(self, name: str) -> int: @@ -141,7 +141,7 @@ class TreeModifier(Generic[T_Tree_cache], object): sha = to_bin_sha(sha) index = self._index_by_name(name) - def is_tree_cache(inp: Tuple[bytes, int, str]) -> TypeGuard[T_Tree_cache]: + def is_tree_cache(inp: Tuple[bytes, int, str]) -> TypeGuard[TreeCacheTup]: return isinstance(inp[0], bytes) and isinstance(inp[1], int) and isinstance([inp], str) item = (sha, mode, name) @@ -167,7 +167,7 @@ class TreeModifier(Generic[T_Tree_cache], object): For more information on the parameters, see ``add`` :param binsha: 20 byte binary sha""" assert isinstance(binsha, bytes) and isinstance(mode, int) and isinstance(name, str) - tree_cache = cast(T_Tree_cache, (binsha, mode, name)) + tree_cache = (binsha, mode, name) self._cache.append(tree_cache) diff --git a/git/types.py b/git/types.py index f15db3b4..ac1bb2c8 100644 --- a/git/types.py +++ b/git/types.py @@ -7,6 +7,8 @@ import sys from typing import (Callable, Dict, NoReturn, Tuple, Union, Any, Iterator, # noqa: F401 NamedTuple, TYPE_CHECKING, TypeVar) # noqa: F401 +if TYPE_CHECKING: + from git.repo import Repo if sys.version_info[:2] >= (3, 8): from typing import Final, Literal, SupportsIndex, TypedDict, Protocol # noqa: F401 @@ -71,9 +73,5 @@ class HSH_TD(TypedDict): files: Dict[PathLike, Files_TD] -# @runtime_checkable -class RepoLike(Protocol): - """Protocol class to allow structural type-checking of Repo - e.g. when cannot import due to circular imports""" - - def remotes(self): ... # NOQA: E704 +class Has_Repo(Protocol): + repo: 'Repo' |