diff options
Diffstat (limited to 'git/objects')
-rw-r--r-- | git/objects/base.py | 9 | ||||
-rw-r--r-- | git/objects/commit.py | 46 | ||||
-rw-r--r-- | git/objects/submodule/base.py | 117 | ||||
-rw-r--r-- | git/objects/submodule/util.py | 10 | ||||
-rw-r--r-- | git/objects/tree.py | 71 | ||||
-rw-r--r-- | git/objects/util.py | 155 |
6 files changed, 290 insertions, 118 deletions
diff --git a/git/objects/base.py b/git/objects/base.py index 884f9651..4e2ed493 100644 --- a/git/objects/base.py +++ b/git/objects/base.py @@ -17,15 +17,16 @@ from .util import get_object_type_by_name from typing import Any, TYPE_CHECKING, Optional, Union -from git.types import PathLike +from git.types import PathLike, Commit_ish if TYPE_CHECKING: from git.repo import Repo from gitdb.base import OStream from .tree import Tree from .blob import Blob - from .tag import TagObject - from .commit import Commit + from .submodule.base import Submodule + +IndexObjUnion = Union['Tree', 'Blob', 'Submodule'] # -------------------------------------------------------------------------- @@ -71,7 +72,7 @@ class Object(LazyMixin): return repo.rev_parse(str(id)) @classmethod - def new_from_sha(cls, repo: 'Repo', sha1: bytes) -> Union['Commit', 'TagObject', 'Tree', 'Blob']: + def new_from_sha(cls, repo: 'Repo', sha1: bytes) -> Commit_ish: """ :return: new object instance of a type appropriate to represent the given binary sha1 diff --git a/git/objects/commit.py b/git/objects/commit.py index 0b707450..7d3ea4fa 100644 --- a/git/objects/commit.py +++ b/git/objects/commit.py @@ -3,12 +3,10 @@ # # This module is part of GitPython and is released under # the BSD License: http://www.opensource.org/licenses/bsd-license.php - from gitdb import IStream from git.util import ( hex_to_bin, Actor, - IterableObj, Stats, finalize_process ) @@ -17,8 +15,8 @@ from git.diff import Diffable from .tree import Tree from . import base from .util import ( - Traversable, Serializable, + TraversableIterableObj, parse_date, altz_to_utctz_str, parse_actor_and_date, @@ -36,18 +34,25 @@ import os from io import BytesIO import logging -from typing import List, Tuple, Union, TYPE_CHECKING + +# typing ------------------------------------------------------------------ + +from typing import Any, Iterator, List, Sequence, Tuple, Union, TYPE_CHECKING + +from git.types import PathLike if TYPE_CHECKING: from git.repo import Repo +# ------------------------------------------------------------------------ + log = logging.getLogger('git.objects.commit') log.addHandler(logging.NullHandler()) __all__ = ('Commit', ) -class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable): +class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): """Wraps a git Commit object. @@ -73,7 +78,8 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable): "message", "parents", "encoding", "gpgsig") _id_attribute_ = "hexsha" - def __init__(self, repo, binsha, tree=None, author=None, authored_date=None, author_tz_offset=None, + def __init__(self, repo, binsha, tree=None, author: Union[Actor, None] = None, + authored_date=None, author_tz_offset=None, committer=None, committed_date=None, committer_tz_offset=None, message=None, parents: Union[Tuple['Commit', ...], List['Commit'], None] = None, encoding=None, gpgsig=None): @@ -139,7 +145,7 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable): self.gpgsig = gpgsig @classmethod - def _get_intermediate_items(cls, commit: 'Commit') -> Tuple['Commit', ...]: # type: ignore ## cos overriding super + def _get_intermediate_items(cls, commit: 'Commit') -> Tuple['Commit', ...]: return tuple(commit.parents) @classmethod @@ -225,7 +231,9 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable): return self.repo.git.name_rev(self) @classmethod - def iter_items(cls, repo, rev, paths='', **kwargs): + def iter_items(cls, repo: 'Repo', rev, # type: ignore + paths: Union[PathLike, Sequence[PathLike]] = '', **kwargs: Any + ) -> Iterator['Commit']: """Find all commits matching the given criteria. :param repo: is the Repo @@ -245,15 +253,23 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable): # use -- in any case, to prevent possibility of ambiguous arguments # see https://github.com/gitpython-developers/GitPython/issues/264 - args = ['--'] + + args_list: List[Union[PathLike, Sequence[PathLike]]] = ['--'] + if paths: - args.extend((paths, )) + paths_tup: Tuple[PathLike, ...] + if isinstance(paths, (str, os.PathLike)): + paths_tup = (paths, ) + else: + paths_tup = tuple(paths) + + args_list.extend(paths_tup) # END if paths - proc = repo.git.rev_list(rev, args, as_process=True, **kwargs) + proc = repo.git.rev_list(rev, args_list, as_process=True, **kwargs) return cls._iter_from_process_or_stream(repo, proc) - def iter_parents(self, paths='', **kwargs): + def iter_parents(self, paths: Union[PathLike, Sequence[PathLike]] = '', **kwargs) -> Iterator['Commit']: """Iterate _all_ parents of this commit. :param paths: @@ -269,7 +285,7 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable): return self.iter_items(self.repo, self, paths, **kwargs) - @property + @ property def stats(self): """Create a git stat from changes between this commit and its first parent or from all changes done if this is the very first commit. @@ -286,7 +302,7 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable): text = self.repo.git.diff(self.parents[0].hexsha, self.hexsha, '--', numstat=True) return Stats._list_from_string(self.repo, text) - @classmethod + @ classmethod def _iter_from_process_or_stream(cls, repo, proc_or_stream): """Parse out commit information into a list of Commit objects We expect one-line per commit, and parse the actual commit information directly @@ -317,7 +333,7 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable): if hasattr(proc_or_stream, 'wait'): finalize_process(proc_or_stream) - @classmethod + @ classmethod def create_from_tree(cls, repo, tree, message, parent_commits=None, head=False, author=None, committer=None, author_date=None, commit_date=None): """Commit the given tree, creating a commit object. diff --git a/git/objects/submodule/base.py b/git/objects/submodule/base.py index f0b8babc..68e16b5f 100644 --- a/git/objects/submodule/base.py +++ b/git/objects/submodule/base.py @@ -3,6 +3,7 @@ from io import BytesIO import logging import os import stat + from unittest import SkipTest import uuid @@ -24,9 +25,9 @@ from git.exc import ( BadName ) from git.objects.base import IndexObject, Object -from git.objects.util import Traversable +from git.objects.util import TraversableIterableObj + from git.util import ( - IterableObj, join_path_native, to_native_path_linux, RemoteProgress, @@ -48,6 +49,13 @@ from .util import ( # typing ---------------------------------------------------------------------- +from typing import Dict, TYPE_CHECKING +from typing import Any, Iterator, Union + +from git.types import Commit_ish, PathLike + +if TYPE_CHECKING: + from git.repo import Repo # ----------------------------------------------------------------------------- @@ -64,7 +72,7 @@ class UpdateProgress(RemoteProgress): """Class providing detailed progress information to the caller who should derive from it and implement the ``update(...)`` message""" CLONE, FETCH, UPDWKTREE = [1 << x for x in range(RemoteProgress._num_op_codes, RemoteProgress._num_op_codes + 3)] - _num_op_codes = RemoteProgress._num_op_codes + 3 + _num_op_codes: int = RemoteProgress._num_op_codes + 3 __slots__ = () @@ -79,7 +87,7 @@ UPDWKTREE = UpdateProgress.UPDWKTREE # IndexObject comes via util module, its a 'hacky' fix thanks to pythons import # mechanism which cause plenty of trouble of the only reason for packages and # modules is refactoring - subpackages shouldn't depend on parent packages -class Submodule(IndexObject, IterableObj, Traversable): +class Submodule(IndexObject, TraversableIterableObj): """Implements access to a git submodule. They are special in that their sha represents a commit in the submodule's repository which is to be checked out @@ -101,7 +109,14 @@ class Submodule(IndexObject, IterableObj, Traversable): __slots__ = ('_parent_commit', '_url', '_branch_path', '_name', '__weakref__') _cache_attrs = ('path', '_url', '_branch_path') - def __init__(self, repo, binsha, mode=None, path=None, name=None, parent_commit=None, url=None, branch_path=None): + def __init__(self, repo: 'Repo', binsha: bytes, + mode: Union[int, None] = None, + path: Union[PathLike, None] = None, + name: Union[str, None] = None, + parent_commit: Union[Commit_ish, None] = None, + url: str = None, + branch_path: Union[PathLike, None] = None + ) -> None: """Initialize this instance with its attributes. We only document the ones that differ from ``IndexObject`` @@ -121,15 +136,16 @@ class Submodule(IndexObject, IterableObj, Traversable): if name is not None: self._name = name - def _set_cache_(self, attr): + def _set_cache_(self, attr: str) -> None: if attr in ('path', '_url', '_branch_path'): reader = self.config_reader() # default submodule values try: self.path = reader.get('path') except cp.NoSectionError as e: - raise ValueError("This submodule instance does not exist anymore in '%s' file" - % osp.join(self.repo.working_tree_dir, '.gitmodules')) from e + if self.repo.working_tree_dir is not None: + raise ValueError("This submodule instance does not exist anymore in '%s' file" + % osp.join(self.repo.working_tree_dir, '.gitmodules')) from e # end self._url = reader.get('url') # git-python extension values - optional @@ -150,33 +166,35 @@ class Submodule(IndexObject, IterableObj, Traversable): # END handle intermediate items @classmethod - def _need_gitfile_submodules(cls, git): + def _need_gitfile_submodules(cls, git: Git) -> bool: return git.version_info[:3] >= (1, 7, 5) - def __eq__(self, other): + def __eq__(self, other: Any) -> bool: """Compare with another submodule""" # we may only compare by name as this should be the ID they are hashed with # Otherwise this type wouldn't be hashable # return self.path == other.path and self.url == other.url and super(Submodule, self).__eq__(other) return self._name == other._name - def __ne__(self, other): + def __ne__(self, other: object) -> bool: """Compare with another submodule for inequality""" return not (self == other) - def __hash__(self): + def __hash__(self) -> int: """Hash this instance using its logical id, not the sha""" return hash(self._name) - def __str__(self): + def __str__(self) -> str: return self._name - def __repr__(self): + def __repr__(self) -> str: return "git.%s(name=%s, path=%s, url=%s, branch_path=%s)"\ % (type(self).__name__, self._name, self.path, self.url, self.branch_path) @classmethod - def _config_parser(cls, repo, parent_commit, read_only): + def _config_parser(cls, repo: 'Repo', + parent_commit: Union[Commit_ish, None], + read_only: bool) -> SubmoduleConfigParser: """:return: Config Parser constrained to our submodule in read or write mode :raise IOError: If the .gitmodules file cannot be found, either locally or in the repository at the given parent commit. Otherwise the exception would be delayed until the first @@ -189,8 +207,8 @@ class Submodule(IndexObject, IterableObj, Traversable): # We are most likely in an empty repository, so the HEAD doesn't point to a valid ref pass # end handle parent_commit - - if not repo.bare and parent_matches_head: + fp_module: Union[str, BytesIO] + if not repo.bare and parent_matches_head and repo.working_tree_dir: fp_module = osp.join(repo.working_tree_dir, cls.k_modules_file) else: assert parent_commit is not None, "need valid parent_commit in bare repositories" @@ -219,13 +237,13 @@ class Submodule(IndexObject, IterableObj, Traversable): # END for each name to delete @classmethod - def _sio_modules(cls, parent_commit): + def _sio_modules(cls, parent_commit: Commit_ish) -> BytesIO: """:return: Configuration file as BytesIO - we only access it through the respective blob's data""" sio = BytesIO(parent_commit.tree[cls.k_modules_file].data_stream.read()) sio.name = cls.k_modules_file return sio - def _config_parser_constrained(self, read_only): + def _config_parser_constrained(self, read_only: bool) -> SectionConstraint: """:return: Config Parser constrained to our submodule in read or write mode""" try: pc = self.parent_commit @@ -248,7 +266,7 @@ class Submodule(IndexObject, IterableObj, Traversable): """:return: Repo instance of newly cloned repository :param repo: our parent repository :param url: url to clone from - :param path: repository-relative path to the submodule checkout location + :param path: repository - relative path to the submodule checkout location :param name: canonical of the submodule :param kwrags: additinoal arguments given to git.clone""" module_abspath = cls._module_abspath(repo, path, name) @@ -269,7 +287,7 @@ class Submodule(IndexObject, IterableObj, Traversable): @classmethod def _to_relative_path(cls, parent_repo, path): - """:return: a path guaranteed to be relative to the given parent-repository + """:return: a path guaranteed to be relative to the given parent - repository :raise ValueError: if path is not contained in the parent repository's working tree""" path = to_native_path_linux(path) if path.endswith('/'): @@ -291,11 +309,11 @@ class Submodule(IndexObject, IterableObj, Traversable): @classmethod def _write_git_file_and_module_config(cls, working_tree_dir, module_abspath): - """Writes a .git file containing a (preferably) relative path to the actual git module repository. + """Writes a .git file containing a(preferably) relative path to the actual git module repository. It is an error if the module_abspath cannot be made into a relative path, relative to the working_tree_dir :note: will overwrite existing files ! :note: as we rewrite both the git file as well as the module configuration, we might fail on the configuration - and will not roll back changes done to the git file. This should be a non-issue, but may easily be fixed + and will not roll back changes done to the git file. This should be a non - issue, but may easily be fixed if it becomes one :param working_tree_dir: directory to write the .git file into :param module_abspath: absolute path to the bare repository @@ -316,8 +334,9 @@ class Submodule(IndexObject, IterableObj, Traversable): #{ Edit Interface @classmethod - def add(cls, repo, name, path, url=None, branch=None, no_checkout=False, depth=None, env=None, - clone_multi_options=None): + def add(cls, repo: 'Repo', name: str, path: PathLike, url: Union[str, None] = None, + branch=None, no_checkout: bool = False, depth=None, env=None, clone_multi_options=None + ) -> 'Submodule': """Add a new submodule to the given repository. This will alter the index as well as the .gitmodules file, but will not create a new commit. If the submodule already exists, no matter if the configuration differs @@ -363,8 +382,8 @@ class Submodule(IndexObject, IterableObj, Traversable): # assure we never put backslashes into the url, as some operating systems # like it ... - if url is not None: - url = to_native_path_linux(url) + # if url is not None: + # url = to_native_path_linux(url) to_native_path_linux does nothing?? # END assure url correctness # INSTANTIATE INTERMEDIATE SM @@ -408,7 +427,7 @@ class Submodule(IndexObject, IterableObj, Traversable): url = urls[0] else: # clone new repo - kwargs = {'n': no_checkout} + kwargs: Dict[str, Union[bool, int]] = {'n': no_checkout} if not branch_is_default: kwargs['b'] = br.name # END setup checkout-branch @@ -468,7 +487,7 @@ class Submodule(IndexObject, IterableObj, Traversable): was specified for this submodule and the branch existed remotely :param progress: UpdateProgress instance or None if no progress should be shown :param dry_run: if True, the operation will only be simulated, but not performed. - All performed operations are read-only + All performed operations are read - only :param force: If True, we may reset heads even if the repository in question is dirty. Additinoally we will be allowed to set a tracking branch which is ahead of its remote branch back into the past or the location of the @@ -476,7 +495,7 @@ class Submodule(IndexObject, IterableObj, Traversable): If False, local tracking branches that are in the future of their respective remote branches will simply not be moved. :param keep_going: if True, we will ignore but log all errors, and keep going recursively. - Unless dry_run is set as well, keep_going could cause subsequent/inherited errors you wouldn't see + Unless dry_run is set as well, keep_going could cause subsequent / inherited errors you wouldn't see otherwise. In conjunction with dry_run, it can be useful to anticipate all errors when updating submodules :param env: Optional dictionary containing the desired environment variables. @@ -685,9 +704,9 @@ class Submodule(IndexObject, IterableObj, Traversable): adjusting our index entry accordingly. :param module_path: the path to which to move our module in the parent repostory's working tree, - given as repository-relative or absolute path. Intermediate directories will be created + given as repository - relative or absolute path. Intermediate directories will be created accordingly. If the path already exists, it must be empty. - Trailing (back)slashes are removed automatically + Trailing(back)slashes are removed automatically :param configuration: if True, the configuration will be adjusted to let the submodule point to the given path. :param module: if True, the repository managed by this submodule @@ -696,7 +715,7 @@ class Submodule(IndexObject, IterableObj, Traversable): :return: self :raise ValueError: if the module path existed and was not empty, or was a file :note: Currently the method is not atomic, and it could leave the repository - in an inconsistent state if a sub-step fails for some reason + in an inconsistent state if a sub - step fails for some reason """ if module + configuration < 1: raise ValueError("You must specify to move at least the module or the configuration of the submodule") @@ -790,19 +809,19 @@ class Submodule(IndexObject, IterableObj, Traversable): @unbare_repo def remove(self, module=True, force=False, configuration=True, dry_run=False): """Remove this submodule from the repository. This will remove our entry - from the .gitmodules file and the entry in the .git/config file. + from the .gitmodules file and the entry in the .git / config file. :param module: If True, the module checkout we point to will be deleted as well. If the module is currently on a commit which is not part of any branch in the remote, if the currently checked out branch working tree, or untracked files, - is ahead of its tracking branch, if you have modifications in the + is ahead of its tracking branch, if you have modifications in the In case the removal of the repository fails for these reasons, the submodule status will not have been altered. - If this submodule has child-modules on its own, these will be deleted + If this submodule has child - modules on its own, these will be deleted prior to touching the own module. :param force: Enforces the deletion of the module even though it contains - modifications. This basically enforces a brute-force file system based + modifications. This basically enforces a brute - force file system based deletion. :param configuration: if True, the submodule is deleted from the configuration, otherwise it isn't. Although this should be enabled most of the times, @@ -942,7 +961,7 @@ class Submodule(IndexObject, IterableObj, Traversable): return self - def set_parent_commit(self, commit, check=True): + def set_parent_commit(self, commit: Union[Commit_ish, None], check=True): """Set this instance to use the given commit whose tree is supposed to contain the .gitmodules blob. @@ -1015,9 +1034,9 @@ class Submodule(IndexObject, IterableObj, Traversable): """Rename this submodule :note: This method takes care of renaming the submodule in various places, such as - * $parent_git_dir/config - * $working_tree_dir/.gitmodules - * (git >=v1.8.0: move submodule repository to new name) + * $parent_git_dir / config + * $working_tree_dir / .gitmodules + * (git >= v1.8.0: move submodule repository to new name) As .gitmodules will be changed, you would need to make a commit afterwards. The changed .gitmodules file will already be added to the index @@ -1091,7 +1110,7 @@ class Submodule(IndexObject, IterableObj, Traversable): def exists(self): """ :return: True if the submodule exists, False otherwise. Please note that - a submodule may exist (in the .gitmodules file) even though its module + a submodule may exist ( in the .gitmodules file) even though its module doesn't exist on disk""" # keep attributes for later, and restore them if we have no valid data # this way we do not actually alter the state of the object @@ -1131,7 +1150,7 @@ class Submodule(IndexObject, IterableObj, Traversable): @property def branch_path(self): """ - :return: full (relative) path as string to the branch we would checkout + :return: full(relative) path as string to the branch we would checkout from the remote and track""" return self._branch_path @@ -1144,7 +1163,7 @@ class Submodule(IndexObject, IterableObj, Traversable): @property def url(self): - """:return: The url to the repository which our module-repository refers to""" + """:return: The url to the repository which our module - repository refers to""" return self._url @property @@ -1160,7 +1179,7 @@ class Submodule(IndexObject, IterableObj, Traversable): """:return: The name of this submodule. It is used to identify it within the .gitmodules file. :note: by default, the name is the path at which to find the submodule, but - in git-python it should be a unique identifier similar to the identifiers + in git - python it should be a unique identifier similar to the identifiers used for remotes, which allows to change the path of the submodule easily """ @@ -1187,17 +1206,16 @@ class Submodule(IndexObject, IterableObj, Traversable): #{ Iterable Interface @classmethod - def iter_items(cls, repo, parent_commit='HEAD'): + def iter_items(cls, repo: 'Repo', parent_commit: Union[Commit_ish, str] = 'HEAD', *Args: Any, **kwargs: Any + ) -> Iterator['Submodule']: """:return: iterator yielding Submodule instances available in the given repository""" try: pc = repo.commit(parent_commit) # parent commit instance parser = cls._config_parser(repo, pc, read_only=True) except (IOError, BadName): - return + return iter([]) # END handle empty iterator - rt = pc.tree # root tree - for sms in parser.sections(): n = sm_name(sms) p = parser.get(sms, 'path') @@ -1210,6 +1228,7 @@ class Submodule(IndexObject, IterableObj, Traversable): # get the binsha index = repo.index try: + rt = pc.tree # root tree sm = rt[p] except KeyError: # try the index, maybe it was just added diff --git a/git/objects/submodule/util.py b/git/objects/submodule/util.py index b4796b30..5290000b 100644 --- a/git/objects/submodule/util.py +++ b/git/objects/submodule/util.py @@ -4,10 +4,12 @@ from git.config import GitConfigParser from io import BytesIO import weakref -from typing import TYPE_CHECKING + +from typing import Any, TYPE_CHECKING, Union if TYPE_CHECKING: from .base import Submodule + from weakref import ReferenceType __all__ = ('sm_section', 'sm_name', 'mkhead', 'find_first_remote_branch', 'SubmoduleConfigParser') @@ -58,8 +60,8 @@ class SubmoduleConfigParser(GitConfigParser): Please note that no mutating method will work in bare mode """ - def __init__(self, *args, **kwargs): - self._smref = None + def __init__(self, *args: Any, **kwargs: Any) -> None: + self._smref: Union['ReferenceType[Submodule]', None] = None self._index = None self._auto_write = True super(SubmoduleConfigParser, self).__init__(*args, **kwargs) @@ -89,7 +91,7 @@ class SubmoduleConfigParser(GitConfigParser): #} END interface #{ Overridden Methods - def write(self): + def write(self) -> None: rval = super(SubmoduleConfigParser, self).write() self.flush_to_index() return rval diff --git a/git/objects/tree.py b/git/objects/tree.py index 191fe27c..f61a4a87 100644 --- a/git/objects/tree.py +++ b/git/objects/tree.py @@ -3,12 +3,13 @@ # # This module is part of GitPython and is released under # the BSD License: http://www.opensource.org/licenses/bsd-license.php + from git.util import join_path import git.diff as diff from git.util import to_bin_sha from . import util -from .base import IndexObject +from .base import IndexObject, IndexObjUnion from .blob import Blob from .submodule.base import Submodule @@ -20,14 +21,19 @@ from .fun import ( # typing ------------------------------------------------- -from typing import Callable, Dict, Generic, Iterable, Iterator, List, Tuple, Type, TypeVar, Union, cast, TYPE_CHECKING +from typing import (Callable, Dict, Generic, Iterable, Iterator, List, + Tuple, Type, TypeVar, Union, cast, TYPE_CHECKING) -from git.types import PathLike +from git.types import PathLike, TypeGuard if TYPE_CHECKING: from git.repo import Repo from io import BytesIO +T_Tree_cache = TypeVar('T_Tree_cache', bound=Tuple[bytes, int, str]) +TraversedTreeTup = Union[Tuple[Union['Tree', None], IndexObjUnion, + Tuple['Submodule', 'Submodule']]] + #-------------------------------------------------------- @@ -35,8 +41,6 @@ cmp: Callable[[str, str], int] = lambda a, b: (a > b) - (a < b) __all__ = ("TreeModifier", "Tree") -T_Tree_cache = TypeVar('T_Tree_cache', bound=Union[Tuple[bytes, int, str]]) - def git_cmp(t1: T_Tree_cache, t2: T_Tree_cache) -> int: a, b = t1[2], t2[2] @@ -137,8 +141,12 @@ class TreeModifier(Generic[T_Tree_cache], object): sha = to_bin_sha(sha) index = self._index_by_name(name) - assert isinstance(sha, bytes) and isinstance(mode, int) and isinstance(name, str) - item = cast(T_Tree_cache, (sha, mode, name)) # use Typeguard from typing-extensions 3.10.0 + def is_tree_cache(inp: Tuple[bytes, int, str]) -> TypeGuard[T_Tree_cache]: + return isinstance(inp[0], bytes) and isinstance(inp[1], int) and isinstance([inp], str) + + item = (sha, mode, name) + assert is_tree_cache(item) + if index == -1: self._cache.append(item) else: @@ -194,7 +202,7 @@ class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable): symlink_id = 0o12 tree_id = 0o04 - _map_id_to_type: Dict[int, Union[Type[Submodule], Type[Blob], Type['Tree']]] = { + _map_id_to_type: Dict[int, Type[IndexObjUnion]] = { commit_id: Submodule, blob_id: Blob, symlink_id: Blob @@ -205,7 +213,7 @@ class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable): super(Tree, self).__init__(repo, binsha, mode, path) @ classmethod - def _get_intermediate_items(cls, index_object: 'Tree', # type: ignore + def _get_intermediate_items(cls, index_object: 'Tree', ) -> Union[Tuple['Tree', ...], Tuple[()]]: if index_object.type == "tree": index_object = cast('Tree', index_object) @@ -222,7 +230,7 @@ class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable): # END handle attribute def _iter_convert_to_object(self, iterable: Iterable[Tuple[bytes, int, str]] - ) -> Iterator[Union[Blob, 'Tree', Submodule]]: + ) -> Iterator[IndexObjUnion]: """Iterable yields tuples of (binsha, mode, name), which will be converted to the respective object representation""" for binsha, mode, name in iterable: @@ -233,7 +241,7 @@ class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable): raise TypeError("Unknown mode %o found in tree data for path '%s'" % (mode, path)) from e # END for each item - def join(self, file: str) -> Union[Blob, 'Tree', Submodule]: + def join(self, file: str) -> IndexObjUnion: """Find the named object in this tree's contents :return: ``git.Blob`` or ``git.Tree`` or ``git.Submodule`` @@ -266,7 +274,7 @@ class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable): raise KeyError(msg % file) # END handle long paths - def __truediv__(self, file: str) -> Union['Tree', Blob, Submodule]: + def __truediv__(self, file: str) -> IndexObjUnion: """For PY3 only""" return self.join(file) @@ -289,24 +297,45 @@ class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable): See the ``TreeModifier`` for more information on how to alter the cache""" return TreeModifier(self._cache) - def traverse(self, predicate=lambda i, d: True, - prune=lambda i, d: False, depth=-1, branch_first=True, - visit_once=False, ignore_self=1): - """For documentation, see util.Traversable.traverse + def traverse(self, # type: ignore # overrides super() + predicate: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: True, + prune: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: False, + depth: int = -1, + branch_first: bool = True, + visit_once: bool = False, + ignore_self: int = 1, + as_edge: bool = False + ) -> Union[Iterator[IndexObjUnion], + Iterator[TraversedTreeTup]]: + """For documentation, see util.Traversable._traverse() Trees are set to visit_once = False to gain more performance in the traversal""" - return super(Tree, self).traverse(predicate, prune, depth, branch_first, visit_once, ignore_self) + + # """ + # # To typecheck instead of using cast. + # import itertools + # def is_tree_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Union['Tree', 'Blob', 'Submodule']]]]: + # return all(isinstance(x, (Blob, Tree, Submodule)) for x in inp[1]) + + # ret = super(Tree, self).traverse(predicate, prune, depth, branch_first, visit_once, ignore_self) + # ret_tup = itertools.tee(ret, 2) + # assert is_tree_traversed(ret_tup), f"Type is {[type(x) for x in list(ret_tup[0])]}" + # return ret_tup[0]""" + return cast(Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]], + super(Tree, self).traverse(predicate, prune, depth, # type: ignore + branch_first, visit_once, ignore_self)) # List protocol - def __getslice__(self, i: int, j: int) -> List[Union[Blob, 'Tree', Submodule]]: + + def __getslice__(self, i: int, j: int) -> List[IndexObjUnion]: return list(self._iter_convert_to_object(self._cache[i:j])) - def __iter__(self) -> Iterator[Union[Blob, 'Tree', Submodule]]: + def __iter__(self) -> Iterator[IndexObjUnion]: return self._iter_convert_to_object(self._cache) def __len__(self) -> int: return len(self._cache) - def __getitem__(self, item: Union[str, int, slice]) -> Union[Blob, 'Tree', Submodule]: + def __getitem__(self, item: Union[str, int, slice]) -> IndexObjUnion: if isinstance(item, int): info = self._cache[item] return self._map_id_to_type[info[1] >> 12](self.repo, info[0], info[1], join_path(self.path, info[2])) @@ -318,7 +347,7 @@ class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable): raise TypeError("Invalid index type: %r" % item) - def __contains__(self, item: Union[IndexObject, PathLike]) -> bool: + def __contains__(self, item: Union[IndexObjUnion, PathLike]) -> bool: if isinstance(item, IndexObject): for info in self._cache: if item.binsha == info[0]: diff --git a/git/objects/util.py b/git/objects/util.py index 8b8148a9..ec81f87e 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -7,6 +7,7 @@ from git.util import ( IterableList, + IterableObj, Actor ) @@ -19,18 +20,24 @@ import calendar from datetime import datetime, timedelta, tzinfo # typing ------------------------------------------------------------ -from typing import (Any, Callable, Deque, Iterator, TypeVar, TYPE_CHECKING, Tuple, Type, Union, cast) +from typing import (Any, Callable, Deque, Iterator, NamedTuple, overload, Sequence, + TYPE_CHECKING, Tuple, Type, TypeVar, Union, cast) + +from git.types import Literal, TypeGuard if TYPE_CHECKING: from io import BytesIO, StringIO - from .submodule.base import Submodule # noqa: F401 from .commit import Commit from .blob import Blob from .tag import TagObject - from .tree import Tree + from .tree import Tree, TraversedTreeTup from subprocess import Popen - -T_Iterableobj = TypeVar('T_Iterableobj') + + +T_TIobj = TypeVar('T_TIobj', bound='TraversableIterableObj') # for TraversableIterableObj.traverse() + +TraversedTup = Union[Tuple[Union['Traversable', None], 'Traversable'], # for commit, submodule + 'TraversedTreeTup'] # for tree.traverse() # -------------------------------------------------------------------- @@ -287,7 +294,7 @@ class Traversable(object): __slots__ = () @classmethod - def _get_intermediate_items(cls, item): + def _get_intermediate_items(cls, item) -> Sequence['Traversable']: """ Returns: Tuple of items connected to the given item. @@ -299,23 +306,30 @@ class Traversable(object): """ raise NotImplementedError("To be implemented in subclass") - def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList: + def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList['TraversableIterableObj']: """ :return: IterableList with the results of the traversal as produced by - traverse()""" - out: IterableList = IterableList(self._id_attribute_) # type: ignore[attr-defined] # defined in sublcasses + traverse() + List objects must be IterableObj and Traversable e.g. Commit, Submodule""" + + def is_TraversableIterableObj(inp: 'Traversable') -> TypeGuard['TraversableIterableObj']: + # return isinstance(self, TraversableIterableObj) + # Can it be anythin else? + return isinstance(self, Traversable) + + assert is_TraversableIterableObj(self), f"{type(self)}" + out: IterableList['TraversableIterableObj'] = IterableList(self._id_attribute_) out.extend(self.traverse(*args, **kwargs)) return out def traverse(self, - predicate: Callable[[object, int], bool] = lambda i, d: True, - prune: Callable[[object, int], bool] = lambda i, d: False, - depth: int = -1, - branch_first: bool = True, - visit_once: bool = True, ignore_self: int = 1, as_edge: bool = False - ) -> Union[Iterator['Traversable'], Iterator[Tuple['Traversable', 'Traversable']]]: + predicate: Callable[[Union['Traversable', TraversedTup], int], bool] = lambda i, d: True, + prune: Callable[[Union['Traversable', TraversedTup], int], bool] = lambda i, d: False, + depth: int = -1, branch_first: bool = True, visit_once: bool = True, + ignore_self: int = 1, as_edge: bool = False + ) -> Union[Iterator[Union['Traversable', 'Blob']], + Iterator[TraversedTup]]: """:return: iterator yielding of items found when traversing self - :param predicate: f(i,d) returns False if item i at depth d should not be included in the result :param prune: @@ -344,21 +358,37 @@ class Traversable(object): if True, return a pair of items, first being the source, second the destination, i.e. tuple(src, dest) with the edge spanning from source to destination""" + + """ + Commit -> Iterator[Union[Commit, Tuple[Commit, Commit]] + Submodule -> Iterator[Submodule, Tuple[Submodule, Submodule]] + Tree -> Iterator[Union[Blob, Tree, Submodule, + Tuple[Union[Submodule, Tree], Union[Blob, Tree, Submodule]]] + + ignore_self=True is_edge=True -> Iterator[item] + ignore_self=True is_edge=False --> Iterator[item] + ignore_self=False is_edge=True -> Iterator[item] | Iterator[Tuple[src, item]] + ignore_self=False is_edge=False -> Iterator[Tuple[src, item]]""" + class TraverseNT(NamedTuple): + depth: int + item: 'Traversable' + src: Union['Traversable', None] + visited = set() - stack = deque() # type: Deque[Tuple[int, Traversable, Union[Traversable, None]]] - stack.append((0, self, None)) # self is always depth level 0 + stack = deque() # type: Deque[TraverseNT] + stack.append(TraverseNT(0, self, None)) # self is always depth level 0 - def addToStack(stack: Deque[Tuple[int, 'Traversable', Union['Traversable', None]]], - item: 'Traversable', + def addToStack(stack: Deque[TraverseNT], + src_item: 'Traversable', branch_first: bool, - depth) -> None: + depth: int) -> None: lst = self._get_intermediate_items(item) - if not lst: + if not lst: # empty list return None if branch_first: - stack.extendleft((depth, i, item) for i in lst) + stack.extendleft(TraverseNT(depth, i, src_item) for i in lst) else: - reviter = ((depth, lst[i], item) for i in range(len(lst) - 1, -1, -1)) + reviter = (TraverseNT(depth, lst[i], src_item) for i in range(len(lst) - 1, -1, -1)) stack.extend(reviter) # END addToStack local method @@ -371,7 +401,12 @@ class Traversable(object): if visit_once: visited.add(item) - rval = (as_edge and (src, item)) or item + rval: Union['Traversable', Tuple[Union[None, 'Traversable'], 'Traversable']] + if as_edge: # if as_edge return (src, item) unless rrc is None (e.g. for first item) + rval = (src, item) + else: + rval = item + if prune(rval, d): continue @@ -405,3 +440,73 @@ class Serializable(object): :param stream: a file-like object :return: self""" raise NotImplementedError("To be implemented in subclass") + + +class TraversableIterableObj(Traversable, IterableObj): + __slots__ = () + + TIobj_tuple = Tuple[Union[T_TIobj, None], T_TIobj] + + @overload # type: ignore + def traverse(self: T_TIobj, + predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], + prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], + depth: int, branch_first: bool, visit_once: bool, + ignore_self: Literal[True], + as_edge: Literal[False], + ) -> Iterator[T_TIobj]: + ... + + @overload + def traverse(self: T_TIobj, + predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], + prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], + depth: int, branch_first: bool, visit_once: bool, + ignore_self: Literal[False], + as_edge: Literal[True], + ) -> Iterator[Tuple[Union[T_TIobj, None], T_TIobj]]: + ... + + @overload + def traverse(self: T_TIobj, + predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], + prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], + depth: int, branch_first: bool, visit_once: bool, + ignore_self: Literal[True], + as_edge: Literal[True], + ) -> Iterator[Tuple[T_TIobj, T_TIobj]]: + ... + + def traverse(self: T_TIobj, + predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], + bool] = lambda i, d: True, + prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], + bool] = lambda i, d: False, + depth: int = -1, branch_first: bool = True, visit_once: bool = True, + ignore_self: int = 1, as_edge: bool = False + ) -> Union[Iterator[T_TIobj], + Iterator[Tuple[T_TIobj, T_TIobj]], + Iterator[Tuple[Union[T_TIobj, None], T_TIobj]]]: + """For documentation, see util.Traversable._traverse()""" + + """ + # To typecheck instead of using cast. + import itertools + from git.types import TypeGuard + def is_commit_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Tuple['Commit', 'Commit']]]]: + for x in inp[1]: + if not isinstance(x, tuple) and len(x) != 2: + if all(isinstance(inner, Commit) for inner in x): + continue + return True + + ret = super(Commit, self).traverse(predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge) + ret_tup = itertools.tee(ret, 2) + assert is_commit_traversed(ret_tup), f"{[type(x) for x in list(ret_tup[0])]}" + return ret_tup[0] + """ + return cast(Union[Iterator[T_TIobj], + Iterator[Tuple[Union[None, T_TIobj], T_TIobj]]], + super(TraversableIterableObj, self).traverse( + predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge # type: ignore + )) |