diff options
| author | Yobmod <yobmod@gmail.com> | 2021-07-08 21:42:30 +0100 |
|---|---|---|
| committer | Yobmod <yobmod@gmail.com> | 2021-07-08 21:42:30 +0100 |
| commit | 5d3818ed3d51d400517a352b5b62e966164af8cf (patch) | |
| tree | f58e6f846ec24da39ac05f3f44a752f1addece80 /git/index | |
| parent | 2e2fe186d09272c3cb6c96467fff362deb90994f (diff) | |
| download | gitpython-5d3818ed3d51d400517a352b5b62e966164af8cf.tar.gz | |
Finish initial typing of index folder
Diffstat (limited to 'git/index')
| -rw-r--r-- | git/index/base.py | 31 | ||||
| -rw-r--r-- | git/index/fun.py | 62 | ||||
| -rw-r--r-- | git/index/util.py | 13 |
3 files changed, 62 insertions, 44 deletions
diff --git a/git/index/base.py b/git/index/base.py index d6670b2a..1812faee 100644 --- a/git/index/base.py +++ b/git/index/base.py @@ -18,6 +18,7 @@ from git.compat import ( from git.exc import ( GitCommandError, CheckoutError, + GitError, InvalidGitRepositoryError ) from git.objects import ( @@ -66,10 +67,10 @@ from .util import ( # typing ----------------------------------------------------------------------------- -from typing import (Any, BinaryIO, Callable, Dict, IO, Iterable, Iterator, List, +from typing import (Any, BinaryIO, Callable, Dict, IO, Iterable, Iterator, List, NoReturn, Sequence, TYPE_CHECKING, Tuple, Union) -from git.types import PathLike, TBD +from git.types import Commit_ish, PathLike, TBD if TYPE_CHECKING: from subprocess import Popen @@ -372,13 +373,13 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): # UTILITIES @unbare_repo - def _iter_expand_paths(self, paths: Sequence[PathLike]) -> Iterator[PathLike]: + def _iter_expand_paths(self: 'IndexFile', paths: Sequence[PathLike]) -> Iterator[PathLike]: """Expand the directories in list of paths to the corresponding paths accordingly, Note: git will add items multiple times even if a glob overlapped with manually specified paths or if paths where specified multiple times - we respect that and do not prune""" - def raise_exc(e): + def raise_exc(e: Exception) -> NoReturn: raise e r = str(self.repo.working_tree_dir) rs = r + os.sep @@ -426,7 +427,8 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): # END path exception handling # END for each path - def _write_path_to_stdin(self, proc: 'Popen', filepath: PathLike, item, fmakeexc, fprogress, + def _write_path_to_stdin(self, proc: 'Popen', filepath: PathLike, item: TBD, fmakeexc: Callable[..., GitError], + fprogress: Callable[[PathLike, bool, TBD], None], read_from_stdout: bool = True) -> Union[None, str]: """Write path to proc.stdin and make sure it processes the item, including progress. @@ -498,7 +500,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): line.sort() return path_map - @classmethod + @ classmethod def entry_key(cls, *entry: Union[BaseIndexEntry, PathLike, StageType]) -> Tuple[PathLike, StageType]: return entry_key(*entry) @@ -631,8 +633,8 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): return BaseIndexEntry((stat_mode_to_index_mode(st.st_mode), istream.binsha, 0, to_native_path_linux(filepath))) - @unbare_repo - @git_working_dir + @ unbare_repo + @ git_working_dir def _entries_for_paths(self, paths: List[str], path_rewriter: Callable, fprogress: Callable, entries: List[BaseIndexEntry]) -> List[BaseIndexEntry]: entries_added: List[BaseIndexEntry] = [] @@ -788,8 +790,8 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): # create objects if required, otherwise go with the existing shas null_entries_indices = [i for i, e in enumerate(entries) if e.binsha == Object.NULL_BIN_SHA] if null_entries_indices: - @git_working_dir - def handle_null_entries(self): + @ git_working_dir + def handle_null_entries(self: 'IndexFile') -> None: for ei in null_entries_indices: null_entry = entries[ei] new_entry = self._store_path(null_entry.path, fprogress) @@ -969,8 +971,13 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): return out - def commit(self, message: str, parent_commits=None, head: bool = True, author: Union[None, 'Actor'] = None, - committer: Union[None, 'Actor'] = None, author_date: Union[str, None] = None, + def commit(self, + message: str, + parent_commits: Union[Commit_ish, None] = None, + head: bool = True, + author: Union[None, 'Actor'] = None, + committer: Union[None, 'Actor'] = None, + author_date: Union[str, None] = None, commit_date: Union[str, None] = None, skip_hooks: bool = False) -> Commit: """Commit the current default index file, creating a commit object. diff --git a/git/index/fun.py b/git/index/fun.py index ffd109b1..74f6efbf 100644 --- a/git/index/fun.py +++ b/git/index/fun.py @@ -57,6 +57,7 @@ from git.types import PathLike, TypeGuard if TYPE_CHECKING: from .base import IndexFile + from git.objects.fun import EntryTup # ------------------------------------------------------------------------------------ @@ -188,7 +189,7 @@ def entry_key(*entry: Union[BaseIndexEntry, PathLike, int]) -> Tuple[PathLike, i def is_entry_tuple(entry: Tuple) -> TypeGuard[Tuple[PathLike, int]]: return isinstance(entry, tuple) and len(entry) == 2 - + if len(entry) == 1: entry_first = entry[0] assert isinstance(entry_first, BaseIndexEntry) @@ -259,8 +260,8 @@ def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0 :param sl: slice indicating the range we should process on the entries list :return: tuple(binsha, list(tree_entry, ...)) a tuple of a sha and a list of tree entries being a tuple of hexsha, mode, name""" - tree_items = [] # type: List[Tuple[Union[bytes, str], int, str]] - tree_items_append = tree_items.append + tree_items: List[Tuple[bytes, int, str]] = [] + ci = sl.start end = sl.stop while ci < end: @@ -272,7 +273,7 @@ def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0 rbound = entry.path.find('/', si) if rbound == -1: # its not a tree - tree_items_append((entry.binsha, entry.mode, entry.path[si:])) + tree_items.append((entry.binsha, entry.mode, entry.path[si:])) else: # find common base range base = entry.path[si:rbound] @@ -289,7 +290,7 @@ def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0 # enter recursion # ci - 1 as we want to count our current item as well sha, _tree_entry_list = write_tree_from_cache(entries, odb, slice(ci - 1, xi), rbound + 1) - tree_items_append((sha, S_IFDIR, base)) + tree_items.append((sha, S_IFDIR, base)) # skip ahead ci = xi @@ -306,7 +307,7 @@ def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0 return (istream.binsha, tree_items_stringified) -def _tree_entry_to_baseindexentry(tree_entry: Tuple[str, int, str], stage: int) -> BaseIndexEntry: +def _tree_entry_to_baseindexentry(tree_entry: Tuple[bytes, int, str], stage: int) -> BaseIndexEntry: return BaseIndexEntry((tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2])) @@ -319,14 +320,13 @@ def aggressive_tree_merge(odb, tree_shas: Sequence[bytes]) -> List[BaseIndexEntr :param tree_shas: 1, 2 or 3 trees as identified by their binary 20 byte shas If 1 or two, the entries will effectively correspond to the last given tree If 3 are given, a 3 way merge is performed""" - out = [] # type: List[BaseIndexEntry] - out_append = out.append + out: List[BaseIndexEntry] = [] # one and two way is the same for us, as we don't have to handle an existing # index, instrea if len(tree_shas) in (1, 2): for entry in traverse_tree_recursive(odb, tree_shas[-1], ''): - out_append(_tree_entry_to_baseindexentry(entry, 0)) + out.append(_tree_entry_to_baseindexentry(entry, 0)) # END for each entry return out # END handle single tree @@ -334,8 +334,16 @@ def aggressive_tree_merge(odb, tree_shas: Sequence[bytes]) -> List[BaseIndexEntr if len(tree_shas) > 3: raise ValueError("Cannot handle %i trees at once" % len(tree_shas)) + EntryTupOrNone = Union[EntryTup, None] + + def is_three_entry_list(inp) -> TypeGuard[List[EntryTupOrNone]]: + return isinstance(inp, list) and len(inp) == 3 + # three trees - for base, ours, theirs in traverse_trees_recursive(odb, tree_shas, ''): + for three_entries in traverse_trees_recursive(odb, tree_shas, ''): + + assert is_three_entry_list(three_entries) + base, ours, theirs = three_entries if base is not None: # base version exists if ours is not None: @@ -347,23 +355,23 @@ def aggressive_tree_merge(odb, tree_shas: Sequence[bytes]) -> List[BaseIndexEntr if(base[0] != ours[0] and base[0] != theirs[0] and ours[0] != theirs[0]) or \ (base[1] != ours[1] and base[1] != theirs[1] and ours[1] != theirs[1]): # changed by both - out_append(_tree_entry_to_baseindexentry(base, 1)) - out_append(_tree_entry_to_baseindexentry(ours, 2)) - out_append(_tree_entry_to_baseindexentry(theirs, 3)) + out.append(_tree_entry_to_baseindexentry(base, 1)) + out.append(_tree_entry_to_baseindexentry(ours, 2)) + out.append(_tree_entry_to_baseindexentry(theirs, 3)) elif base[0] != ours[0] or base[1] != ours[1]: # only we changed it - out_append(_tree_entry_to_baseindexentry(ours, 0)) + out.append(_tree_entry_to_baseindexentry(ours, 0)) else: # either nobody changed it, or they did. In either # case, use theirs - out_append(_tree_entry_to_baseindexentry(theirs, 0)) + out.append(_tree_entry_to_baseindexentry(theirs, 0)) # END handle modification else: if ours[0] != base[0] or ours[1] != base[1]: # they deleted it, we changed it, conflict - out_append(_tree_entry_to_baseindexentry(base, 1)) - out_append(_tree_entry_to_baseindexentry(ours, 2)) + out.append(_tree_entry_to_baseindexentry(base, 1)) + out.append(_tree_entry_to_baseindexentry(ours, 2)) # else: # we didn't change it, ignore # pass @@ -376,8 +384,8 @@ def aggressive_tree_merge(odb, tree_shas: Sequence[bytes]) -> List[BaseIndexEntr else: if theirs[0] != base[0] or theirs[1] != base[1]: # deleted in ours, changed theirs, conflict - out_append(_tree_entry_to_baseindexentry(base, 1)) - out_append(_tree_entry_to_baseindexentry(theirs, 3)) + out.append(_tree_entry_to_baseindexentry(base, 1)) + out.append(_tree_entry_to_baseindexentry(theirs, 3)) # END theirs changed # else: # theirs didn't change @@ -386,20 +394,20 @@ def aggressive_tree_merge(odb, tree_shas: Sequence[bytes]) -> List[BaseIndexEntr # END handle ours else: # all three can't be None - if ours is None: + if ours is None and theirs is not None: # added in their branch - out_append(_tree_entry_to_baseindexentry(theirs, 0)) - elif theirs is None: + out.append(_tree_entry_to_baseindexentry(theirs, 0)) + elif theirs is None and ours is not None: # added in our branch - out_append(_tree_entry_to_baseindexentry(ours, 0)) - else: + out.append(_tree_entry_to_baseindexentry(ours, 0)) + elif ours is not None and theirs is not None: # both have it, except for the base, see whether it changed if ours[0] != theirs[0] or ours[1] != theirs[1]: - out_append(_tree_entry_to_baseindexentry(ours, 2)) - out_append(_tree_entry_to_baseindexentry(theirs, 3)) + out.append(_tree_entry_to_baseindexentry(ours, 2)) + out.append(_tree_entry_to_baseindexentry(theirs, 3)) else: # it was added the same in both - out_append(_tree_entry_to_baseindexentry(ours, 0)) + out.append(_tree_entry_to_baseindexentry(ours, 0)) # END handle two items # END handle heads # END handle base exists diff --git a/git/index/util.py b/git/index/util.py index 3b3d6489..4f8af553 100644 --- a/git/index/util.py +++ b/git/index/util.py @@ -11,10 +11,13 @@ import os.path as osp # typing ---------------------------------------------------------------------- -from typing import (Any, Callable) +from typing import (Any, Callable, TYPE_CHECKING) from git.types import PathLike, _T +if TYPE_CHECKING: + from git.index import IndexFile + # --------------------------------------------------------------------------------- @@ -63,7 +66,7 @@ def post_clear_cache(func: Callable[..., _T]) -> Callable[..., _T]: """ @wraps(func) - def post_clear_cache_if_not_raised(self, *args: Any, **kwargs: Any) -> _T: + def post_clear_cache_if_not_raised(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T: rval = func(self, *args, **kwargs) self._delete_entries_cache() return rval @@ -78,7 +81,7 @@ def default_index(func: Callable[..., _T]) -> Callable[..., _T]: on that index only. """ @wraps(func) - def check_default_index(self, *args: Any, **kwargs: Any) -> _T: + def check_default_index(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T: if self._file_path != self._index_path(): raise AssertionError( "Cannot call %r on indices that do not represent the default git index" % func.__name__) @@ -93,9 +96,9 @@ def git_working_dir(func: Callable[..., _T]) -> Callable[..., _T]: repository in order to assure relative paths are handled correctly""" @wraps(func) - def set_git_working_dir(self, *args: Any, **kwargs: Any) -> _T: + def set_git_working_dir(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T: cur_wd = os.getcwd() - os.chdir(self.repo.working_tree_dir) + os.chdir(str(self.repo.working_tree_dir)) try: return func(self, *args, **kwargs) finally: |
