summaryrefslogtreecommitdiff
path: root/git/objects
diff options
context:
space:
mode:
Diffstat (limited to 'git/objects')
-rw-r--r--git/objects/base.py9
-rw-r--r--git/objects/commit.py46
-rw-r--r--git/objects/submodule/base.py117
-rw-r--r--git/objects/submodule/util.py10
-rw-r--r--git/objects/tree.py71
-rw-r--r--git/objects/util.py155
6 files changed, 290 insertions, 118 deletions
diff --git a/git/objects/base.py b/git/objects/base.py
index 884f9651..4e2ed493 100644
--- a/git/objects/base.py
+++ b/git/objects/base.py
@@ -17,15 +17,16 @@ from .util import get_object_type_by_name
from typing import Any, TYPE_CHECKING, Optional, Union
-from git.types import PathLike
+from git.types import PathLike, Commit_ish
if TYPE_CHECKING:
from git.repo import Repo
from gitdb.base import OStream
from .tree import Tree
from .blob import Blob
- from .tag import TagObject
- from .commit import Commit
+ from .submodule.base import Submodule
+
+IndexObjUnion = Union['Tree', 'Blob', 'Submodule']
# --------------------------------------------------------------------------
@@ -71,7 +72,7 @@ class Object(LazyMixin):
return repo.rev_parse(str(id))
@classmethod
- def new_from_sha(cls, repo: 'Repo', sha1: bytes) -> Union['Commit', 'TagObject', 'Tree', 'Blob']:
+ def new_from_sha(cls, repo: 'Repo', sha1: bytes) -> Commit_ish:
"""
:return: new object instance of a type appropriate to represent the given
binary sha1
diff --git a/git/objects/commit.py b/git/objects/commit.py
index 0b707450..7d3ea4fa 100644
--- a/git/objects/commit.py
+++ b/git/objects/commit.py
@@ -3,12 +3,10 @@
#
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
-
from gitdb import IStream
from git.util import (
hex_to_bin,
Actor,
- IterableObj,
Stats,
finalize_process
)
@@ -17,8 +15,8 @@ from git.diff import Diffable
from .tree import Tree
from . import base
from .util import (
- Traversable,
Serializable,
+ TraversableIterableObj,
parse_date,
altz_to_utctz_str,
parse_actor_and_date,
@@ -36,18 +34,25 @@ import os
from io import BytesIO
import logging
-from typing import List, Tuple, Union, TYPE_CHECKING
+
+# typing ------------------------------------------------------------------
+
+from typing import Any, Iterator, List, Sequence, Tuple, Union, TYPE_CHECKING
+
+from git.types import PathLike
if TYPE_CHECKING:
from git.repo import Repo
+# ------------------------------------------------------------------------
+
log = logging.getLogger('git.objects.commit')
log.addHandler(logging.NullHandler())
__all__ = ('Commit', )
-class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable):
+class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
"""Wraps a git Commit object.
@@ -73,7 +78,8 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable):
"message", "parents", "encoding", "gpgsig")
_id_attribute_ = "hexsha"
- def __init__(self, repo, binsha, tree=None, author=None, authored_date=None, author_tz_offset=None,
+ def __init__(self, repo, binsha, tree=None, author: Union[Actor, None] = None,
+ authored_date=None, author_tz_offset=None,
committer=None, committed_date=None, committer_tz_offset=None,
message=None, parents: Union[Tuple['Commit', ...], List['Commit'], None] = None,
encoding=None, gpgsig=None):
@@ -139,7 +145,7 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable):
self.gpgsig = gpgsig
@classmethod
- def _get_intermediate_items(cls, commit: 'Commit') -> Tuple['Commit', ...]: # type: ignore ## cos overriding super
+ def _get_intermediate_items(cls, commit: 'Commit') -> Tuple['Commit', ...]:
return tuple(commit.parents)
@classmethod
@@ -225,7 +231,9 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable):
return self.repo.git.name_rev(self)
@classmethod
- def iter_items(cls, repo, rev, paths='', **kwargs):
+ def iter_items(cls, repo: 'Repo', rev, # type: ignore
+ paths: Union[PathLike, Sequence[PathLike]] = '', **kwargs: Any
+ ) -> Iterator['Commit']:
"""Find all commits matching the given criteria.
:param repo: is the Repo
@@ -245,15 +253,23 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable):
# use -- in any case, to prevent possibility of ambiguous arguments
# see https://github.com/gitpython-developers/GitPython/issues/264
- args = ['--']
+
+ args_list: List[Union[PathLike, Sequence[PathLike]]] = ['--']
+
if paths:
- args.extend((paths, ))
+ paths_tup: Tuple[PathLike, ...]
+ if isinstance(paths, (str, os.PathLike)):
+ paths_tup = (paths, )
+ else:
+ paths_tup = tuple(paths)
+
+ args_list.extend(paths_tup)
# END if paths
- proc = repo.git.rev_list(rev, args, as_process=True, **kwargs)
+ proc = repo.git.rev_list(rev, args_list, as_process=True, **kwargs)
return cls._iter_from_process_or_stream(repo, proc)
- def iter_parents(self, paths='', **kwargs):
+ def iter_parents(self, paths: Union[PathLike, Sequence[PathLike]] = '', **kwargs) -> Iterator['Commit']:
"""Iterate _all_ parents of this commit.
:param paths:
@@ -269,7 +285,7 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable):
return self.iter_items(self.repo, self, paths, **kwargs)
- @property
+ @ property
def stats(self):
"""Create a git stat from changes between this commit and its first parent
or from all changes done if this is the very first commit.
@@ -286,7 +302,7 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable):
text = self.repo.git.diff(self.parents[0].hexsha, self.hexsha, '--', numstat=True)
return Stats._list_from_string(self.repo, text)
- @classmethod
+ @ classmethod
def _iter_from_process_or_stream(cls, repo, proc_or_stream):
"""Parse out commit information into a list of Commit objects
We expect one-line per commit, and parse the actual commit information directly
@@ -317,7 +333,7 @@ class Commit(base.Object, IterableObj, Diffable, Traversable, Serializable):
if hasattr(proc_or_stream, 'wait'):
finalize_process(proc_or_stream)
- @classmethod
+ @ classmethod
def create_from_tree(cls, repo, tree, message, parent_commits=None, head=False, author=None, committer=None,
author_date=None, commit_date=None):
"""Commit the given tree, creating a commit object.
diff --git a/git/objects/submodule/base.py b/git/objects/submodule/base.py
index f0b8babc..68e16b5f 100644
--- a/git/objects/submodule/base.py
+++ b/git/objects/submodule/base.py
@@ -3,6 +3,7 @@ from io import BytesIO
import logging
import os
import stat
+
from unittest import SkipTest
import uuid
@@ -24,9 +25,9 @@ from git.exc import (
BadName
)
from git.objects.base import IndexObject, Object
-from git.objects.util import Traversable
+from git.objects.util import TraversableIterableObj
+
from git.util import (
- IterableObj,
join_path_native,
to_native_path_linux,
RemoteProgress,
@@ -48,6 +49,13 @@ from .util import (
# typing ----------------------------------------------------------------------
+from typing import Dict, TYPE_CHECKING
+from typing import Any, Iterator, Union
+
+from git.types import Commit_ish, PathLike
+
+if TYPE_CHECKING:
+ from git.repo import Repo
# -----------------------------------------------------------------------------
@@ -64,7 +72,7 @@ class UpdateProgress(RemoteProgress):
"""Class providing detailed progress information to the caller who should
derive from it and implement the ``update(...)`` message"""
CLONE, FETCH, UPDWKTREE = [1 << x for x in range(RemoteProgress._num_op_codes, RemoteProgress._num_op_codes + 3)]
- _num_op_codes = RemoteProgress._num_op_codes + 3
+ _num_op_codes: int = RemoteProgress._num_op_codes + 3
__slots__ = ()
@@ -79,7 +87,7 @@ UPDWKTREE = UpdateProgress.UPDWKTREE
# IndexObject comes via util module, its a 'hacky' fix thanks to pythons import
# mechanism which cause plenty of trouble of the only reason for packages and
# modules is refactoring - subpackages shouldn't depend on parent packages
-class Submodule(IndexObject, IterableObj, Traversable):
+class Submodule(IndexObject, TraversableIterableObj):
"""Implements access to a git submodule. They are special in that their sha
represents a commit in the submodule's repository which is to be checked out
@@ -101,7 +109,14 @@ class Submodule(IndexObject, IterableObj, Traversable):
__slots__ = ('_parent_commit', '_url', '_branch_path', '_name', '__weakref__')
_cache_attrs = ('path', '_url', '_branch_path')
- def __init__(self, repo, binsha, mode=None, path=None, name=None, parent_commit=None, url=None, branch_path=None):
+ def __init__(self, repo: 'Repo', binsha: bytes,
+ mode: Union[int, None] = None,
+ path: Union[PathLike, None] = None,
+ name: Union[str, None] = None,
+ parent_commit: Union[Commit_ish, None] = None,
+ url: str = None,
+ branch_path: Union[PathLike, None] = None
+ ) -> None:
"""Initialize this instance with its attributes. We only document the ones
that differ from ``IndexObject``
@@ -121,15 +136,16 @@ class Submodule(IndexObject, IterableObj, Traversable):
if name is not None:
self._name = name
- def _set_cache_(self, attr):
+ def _set_cache_(self, attr: str) -> None:
if attr in ('path', '_url', '_branch_path'):
reader = self.config_reader()
# default submodule values
try:
self.path = reader.get('path')
except cp.NoSectionError as e:
- raise ValueError("This submodule instance does not exist anymore in '%s' file"
- % osp.join(self.repo.working_tree_dir, '.gitmodules')) from e
+ if self.repo.working_tree_dir is not None:
+ raise ValueError("This submodule instance does not exist anymore in '%s' file"
+ % osp.join(self.repo.working_tree_dir, '.gitmodules')) from e
# end
self._url = reader.get('url')
# git-python extension values - optional
@@ -150,33 +166,35 @@ class Submodule(IndexObject, IterableObj, Traversable):
# END handle intermediate items
@classmethod
- def _need_gitfile_submodules(cls, git):
+ def _need_gitfile_submodules(cls, git: Git) -> bool:
return git.version_info[:3] >= (1, 7, 5)
- def __eq__(self, other):
+ def __eq__(self, other: Any) -> bool:
"""Compare with another submodule"""
# we may only compare by name as this should be the ID they are hashed with
# Otherwise this type wouldn't be hashable
# return self.path == other.path and self.url == other.url and super(Submodule, self).__eq__(other)
return self._name == other._name
- def __ne__(self, other):
+ def __ne__(self, other: object) -> bool:
"""Compare with another submodule for inequality"""
return not (self == other)
- def __hash__(self):
+ def __hash__(self) -> int:
"""Hash this instance using its logical id, not the sha"""
return hash(self._name)
- def __str__(self):
+ def __str__(self) -> str:
return self._name
- def __repr__(self):
+ def __repr__(self) -> str:
return "git.%s(name=%s, path=%s, url=%s, branch_path=%s)"\
% (type(self).__name__, self._name, self.path, self.url, self.branch_path)
@classmethod
- def _config_parser(cls, repo, parent_commit, read_only):
+ def _config_parser(cls, repo: 'Repo',
+ parent_commit: Union[Commit_ish, None],
+ read_only: bool) -> SubmoduleConfigParser:
""":return: Config Parser constrained to our submodule in read or write mode
:raise IOError: If the .gitmodules file cannot be found, either locally or in the repository
at the given parent commit. Otherwise the exception would be delayed until the first
@@ -189,8 +207,8 @@ class Submodule(IndexObject, IterableObj, Traversable):
# We are most likely in an empty repository, so the HEAD doesn't point to a valid ref
pass
# end handle parent_commit
-
- if not repo.bare and parent_matches_head:
+ fp_module: Union[str, BytesIO]
+ if not repo.bare and parent_matches_head and repo.working_tree_dir:
fp_module = osp.join(repo.working_tree_dir, cls.k_modules_file)
else:
assert parent_commit is not None, "need valid parent_commit in bare repositories"
@@ -219,13 +237,13 @@ class Submodule(IndexObject, IterableObj, Traversable):
# END for each name to delete
@classmethod
- def _sio_modules(cls, parent_commit):
+ def _sio_modules(cls, parent_commit: Commit_ish) -> BytesIO:
""":return: Configuration file as BytesIO - we only access it through the respective blob's data"""
sio = BytesIO(parent_commit.tree[cls.k_modules_file].data_stream.read())
sio.name = cls.k_modules_file
return sio
- def _config_parser_constrained(self, read_only):
+ def _config_parser_constrained(self, read_only: bool) -> SectionConstraint:
""":return: Config Parser constrained to our submodule in read or write mode"""
try:
pc = self.parent_commit
@@ -248,7 +266,7 @@ class Submodule(IndexObject, IterableObj, Traversable):
""":return: Repo instance of newly cloned repository
:param repo: our parent repository
:param url: url to clone from
- :param path: repository-relative path to the submodule checkout location
+ :param path: repository - relative path to the submodule checkout location
:param name: canonical of the submodule
:param kwrags: additinoal arguments given to git.clone"""
module_abspath = cls._module_abspath(repo, path, name)
@@ -269,7 +287,7 @@ class Submodule(IndexObject, IterableObj, Traversable):
@classmethod
def _to_relative_path(cls, parent_repo, path):
- """:return: a path guaranteed to be relative to the given parent-repository
+ """:return: a path guaranteed to be relative to the given parent - repository
:raise ValueError: if path is not contained in the parent repository's working tree"""
path = to_native_path_linux(path)
if path.endswith('/'):
@@ -291,11 +309,11 @@ class Submodule(IndexObject, IterableObj, Traversable):
@classmethod
def _write_git_file_and_module_config(cls, working_tree_dir, module_abspath):
- """Writes a .git file containing a (preferably) relative path to the actual git module repository.
+ """Writes a .git file containing a(preferably) relative path to the actual git module repository.
It is an error if the module_abspath cannot be made into a relative path, relative to the working_tree_dir
:note: will overwrite existing files !
:note: as we rewrite both the git file as well as the module configuration, we might fail on the configuration
- and will not roll back changes done to the git file. This should be a non-issue, but may easily be fixed
+ and will not roll back changes done to the git file. This should be a non - issue, but may easily be fixed
if it becomes one
:param working_tree_dir: directory to write the .git file into
:param module_abspath: absolute path to the bare repository
@@ -316,8 +334,9 @@ class Submodule(IndexObject, IterableObj, Traversable):
#{ Edit Interface
@classmethod
- def add(cls, repo, name, path, url=None, branch=None, no_checkout=False, depth=None, env=None,
- clone_multi_options=None):
+ def add(cls, repo: 'Repo', name: str, path: PathLike, url: Union[str, None] = None,
+ branch=None, no_checkout: bool = False, depth=None, env=None, clone_multi_options=None
+ ) -> 'Submodule':
"""Add a new submodule to the given repository. This will alter the index
as well as the .gitmodules file, but will not create a new commit.
If the submodule already exists, no matter if the configuration differs
@@ -363,8 +382,8 @@ class Submodule(IndexObject, IterableObj, Traversable):
# assure we never put backslashes into the url, as some operating systems
# like it ...
- if url is not None:
- url = to_native_path_linux(url)
+ # if url is not None:
+ # url = to_native_path_linux(url) to_native_path_linux does nothing??
# END assure url correctness
# INSTANTIATE INTERMEDIATE SM
@@ -408,7 +427,7 @@ class Submodule(IndexObject, IterableObj, Traversable):
url = urls[0]
else:
# clone new repo
- kwargs = {'n': no_checkout}
+ kwargs: Dict[str, Union[bool, int]] = {'n': no_checkout}
if not branch_is_default:
kwargs['b'] = br.name
# END setup checkout-branch
@@ -468,7 +487,7 @@ class Submodule(IndexObject, IterableObj, Traversable):
was specified for this submodule and the branch existed remotely
:param progress: UpdateProgress instance or None if no progress should be shown
:param dry_run: if True, the operation will only be simulated, but not performed.
- All performed operations are read-only
+ All performed operations are read - only
:param force:
If True, we may reset heads even if the repository in question is dirty. Additinoally we will be allowed
to set a tracking branch which is ahead of its remote branch back into the past or the location of the
@@ -476,7 +495,7 @@ class Submodule(IndexObject, IterableObj, Traversable):
If False, local tracking branches that are in the future of their respective remote branches will simply
not be moved.
:param keep_going: if True, we will ignore but log all errors, and keep going recursively.
- Unless dry_run is set as well, keep_going could cause subsequent/inherited errors you wouldn't see
+ Unless dry_run is set as well, keep_going could cause subsequent / inherited errors you wouldn't see
otherwise.
In conjunction with dry_run, it can be useful to anticipate all errors when updating submodules
:param env: Optional dictionary containing the desired environment variables.
@@ -685,9 +704,9 @@ class Submodule(IndexObject, IterableObj, Traversable):
adjusting our index entry accordingly.
:param module_path: the path to which to move our module in the parent repostory's working tree,
- given as repository-relative or absolute path. Intermediate directories will be created
+ given as repository - relative or absolute path. Intermediate directories will be created
accordingly. If the path already exists, it must be empty.
- Trailing (back)slashes are removed automatically
+ Trailing(back)slashes are removed automatically
:param configuration: if True, the configuration will be adjusted to let
the submodule point to the given path.
:param module: if True, the repository managed by this submodule
@@ -696,7 +715,7 @@ class Submodule(IndexObject, IterableObj, Traversable):
:return: self
:raise ValueError: if the module path existed and was not empty, or was a file
:note: Currently the method is not atomic, and it could leave the repository
- in an inconsistent state if a sub-step fails for some reason
+ in an inconsistent state if a sub - step fails for some reason
"""
if module + configuration < 1:
raise ValueError("You must specify to move at least the module or the configuration of the submodule")
@@ -790,19 +809,19 @@ class Submodule(IndexObject, IterableObj, Traversable):
@unbare_repo
def remove(self, module=True, force=False, configuration=True, dry_run=False):
"""Remove this submodule from the repository. This will remove our entry
- from the .gitmodules file and the entry in the .git/config file.
+ from the .gitmodules file and the entry in the .git / config file.
:param module: If True, the module checkout we point to will be deleted
as well. If the module is currently on a commit which is not part
of any branch in the remote, if the currently checked out branch
working tree, or untracked files,
- is ahead of its tracking branch, if you have modifications in the
+ is ahead of its tracking branch, if you have modifications in the
In case the removal of the repository fails for these reasons, the
submodule status will not have been altered.
- If this submodule has child-modules on its own, these will be deleted
+ If this submodule has child - modules on its own, these will be deleted
prior to touching the own module.
:param force: Enforces the deletion of the module even though it contains
- modifications. This basically enforces a brute-force file system based
+ modifications. This basically enforces a brute - force file system based
deletion.
:param configuration: if True, the submodule is deleted from the configuration,
otherwise it isn't. Although this should be enabled most of the times,
@@ -942,7 +961,7 @@ class Submodule(IndexObject, IterableObj, Traversable):
return self
- def set_parent_commit(self, commit, check=True):
+ def set_parent_commit(self, commit: Union[Commit_ish, None], check=True):
"""Set this instance to use the given commit whose tree is supposed to
contain the .gitmodules blob.
@@ -1015,9 +1034,9 @@ class Submodule(IndexObject, IterableObj, Traversable):
"""Rename this submodule
:note: This method takes care of renaming the submodule in various places, such as
- * $parent_git_dir/config
- * $working_tree_dir/.gitmodules
- * (git >=v1.8.0: move submodule repository to new name)
+ * $parent_git_dir / config
+ * $working_tree_dir / .gitmodules
+ * (git >= v1.8.0: move submodule repository to new name)
As .gitmodules will be changed, you would need to make a commit afterwards. The changed .gitmodules file
will already be added to the index
@@ -1091,7 +1110,7 @@ class Submodule(IndexObject, IterableObj, Traversable):
def exists(self):
"""
:return: True if the submodule exists, False otherwise. Please note that
- a submodule may exist (in the .gitmodules file) even though its module
+ a submodule may exist ( in the .gitmodules file) even though its module
doesn't exist on disk"""
# keep attributes for later, and restore them if we have no valid data
# this way we do not actually alter the state of the object
@@ -1131,7 +1150,7 @@ class Submodule(IndexObject, IterableObj, Traversable):
@property
def branch_path(self):
"""
- :return: full (relative) path as string to the branch we would checkout
+ :return: full(relative) path as string to the branch we would checkout
from the remote and track"""
return self._branch_path
@@ -1144,7 +1163,7 @@ class Submodule(IndexObject, IterableObj, Traversable):
@property
def url(self):
- """:return: The url to the repository which our module-repository refers to"""
+ """:return: The url to the repository which our module - repository refers to"""
return self._url
@property
@@ -1160,7 +1179,7 @@ class Submodule(IndexObject, IterableObj, Traversable):
""":return: The name of this submodule. It is used to identify it within the
.gitmodules file.
:note: by default, the name is the path at which to find the submodule, but
- in git-python it should be a unique identifier similar to the identifiers
+ in git - python it should be a unique identifier similar to the identifiers
used for remotes, which allows to change the path of the submodule
easily
"""
@@ -1187,17 +1206,16 @@ class Submodule(IndexObject, IterableObj, Traversable):
#{ Iterable Interface
@classmethod
- def iter_items(cls, repo, parent_commit='HEAD'):
+ def iter_items(cls, repo: 'Repo', parent_commit: Union[Commit_ish, str] = 'HEAD', *Args: Any, **kwargs: Any
+ ) -> Iterator['Submodule']:
""":return: iterator yielding Submodule instances available in the given repository"""
try:
pc = repo.commit(parent_commit) # parent commit instance
parser = cls._config_parser(repo, pc, read_only=True)
except (IOError, BadName):
- return
+ return iter([])
# END handle empty iterator
- rt = pc.tree # root tree
-
for sms in parser.sections():
n = sm_name(sms)
p = parser.get(sms, 'path')
@@ -1210,6 +1228,7 @@ class Submodule(IndexObject, IterableObj, Traversable):
# get the binsha
index = repo.index
try:
+ rt = pc.tree # root tree
sm = rt[p]
except KeyError:
# try the index, maybe it was just added
diff --git a/git/objects/submodule/util.py b/git/objects/submodule/util.py
index b4796b30..5290000b 100644
--- a/git/objects/submodule/util.py
+++ b/git/objects/submodule/util.py
@@ -4,10 +4,12 @@ from git.config import GitConfigParser
from io import BytesIO
import weakref
-from typing import TYPE_CHECKING
+
+from typing import Any, TYPE_CHECKING, Union
if TYPE_CHECKING:
from .base import Submodule
+ from weakref import ReferenceType
__all__ = ('sm_section', 'sm_name', 'mkhead', 'find_first_remote_branch',
'SubmoduleConfigParser')
@@ -58,8 +60,8 @@ class SubmoduleConfigParser(GitConfigParser):
Please note that no mutating method will work in bare mode
"""
- def __init__(self, *args, **kwargs):
- self._smref = None
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
+ self._smref: Union['ReferenceType[Submodule]', None] = None
self._index = None
self._auto_write = True
super(SubmoduleConfigParser, self).__init__(*args, **kwargs)
@@ -89,7 +91,7 @@ class SubmoduleConfigParser(GitConfigParser):
#} END interface
#{ Overridden Methods
- def write(self):
+ def write(self) -> None:
rval = super(SubmoduleConfigParser, self).write()
self.flush_to_index()
return rval
diff --git a/git/objects/tree.py b/git/objects/tree.py
index 191fe27c..f61a4a87 100644
--- a/git/objects/tree.py
+++ b/git/objects/tree.py
@@ -3,12 +3,13 @@
#
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
+
from git.util import join_path
import git.diff as diff
from git.util import to_bin_sha
from . import util
-from .base import IndexObject
+from .base import IndexObject, IndexObjUnion
from .blob import Blob
from .submodule.base import Submodule
@@ -20,14 +21,19 @@ from .fun import (
# typing -------------------------------------------------
-from typing import Callable, Dict, Generic, Iterable, Iterator, List, Tuple, Type, TypeVar, Union, cast, TYPE_CHECKING
+from typing import (Callable, Dict, Generic, Iterable, Iterator, List,
+ Tuple, Type, TypeVar, Union, cast, TYPE_CHECKING)
-from git.types import PathLike
+from git.types import PathLike, TypeGuard
if TYPE_CHECKING:
from git.repo import Repo
from io import BytesIO
+T_Tree_cache = TypeVar('T_Tree_cache', bound=Tuple[bytes, int, str])
+TraversedTreeTup = Union[Tuple[Union['Tree', None], IndexObjUnion,
+ Tuple['Submodule', 'Submodule']]]
+
#--------------------------------------------------------
@@ -35,8 +41,6 @@ cmp: Callable[[str, str], int] = lambda a, b: (a > b) - (a < b)
__all__ = ("TreeModifier", "Tree")
-T_Tree_cache = TypeVar('T_Tree_cache', bound=Union[Tuple[bytes, int, str]])
-
def git_cmp(t1: T_Tree_cache, t2: T_Tree_cache) -> int:
a, b = t1[2], t2[2]
@@ -137,8 +141,12 @@ class TreeModifier(Generic[T_Tree_cache], object):
sha = to_bin_sha(sha)
index = self._index_by_name(name)
- assert isinstance(sha, bytes) and isinstance(mode, int) and isinstance(name, str)
- item = cast(T_Tree_cache, (sha, mode, name)) # use Typeguard from typing-extensions 3.10.0
+ def is_tree_cache(inp: Tuple[bytes, int, str]) -> TypeGuard[T_Tree_cache]:
+ return isinstance(inp[0], bytes) and isinstance(inp[1], int) and isinstance([inp], str)
+
+ item = (sha, mode, name)
+ assert is_tree_cache(item)
+
if index == -1:
self._cache.append(item)
else:
@@ -194,7 +202,7 @@ class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable):
symlink_id = 0o12
tree_id = 0o04
- _map_id_to_type: Dict[int, Union[Type[Submodule], Type[Blob], Type['Tree']]] = {
+ _map_id_to_type: Dict[int, Type[IndexObjUnion]] = {
commit_id: Submodule,
blob_id: Blob,
symlink_id: Blob
@@ -205,7 +213,7 @@ class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable):
super(Tree, self).__init__(repo, binsha, mode, path)
@ classmethod
- def _get_intermediate_items(cls, index_object: 'Tree', # type: ignore
+ def _get_intermediate_items(cls, index_object: 'Tree',
) -> Union[Tuple['Tree', ...], Tuple[()]]:
if index_object.type == "tree":
index_object = cast('Tree', index_object)
@@ -222,7 +230,7 @@ class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable):
# END handle attribute
def _iter_convert_to_object(self, iterable: Iterable[Tuple[bytes, int, str]]
- ) -> Iterator[Union[Blob, 'Tree', Submodule]]:
+ ) -> Iterator[IndexObjUnion]:
"""Iterable yields tuples of (binsha, mode, name), which will be converted
to the respective object representation"""
for binsha, mode, name in iterable:
@@ -233,7 +241,7 @@ class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable):
raise TypeError("Unknown mode %o found in tree data for path '%s'" % (mode, path)) from e
# END for each item
- def join(self, file: str) -> Union[Blob, 'Tree', Submodule]:
+ def join(self, file: str) -> IndexObjUnion:
"""Find the named object in this tree's contents
:return: ``git.Blob`` or ``git.Tree`` or ``git.Submodule``
@@ -266,7 +274,7 @@ class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable):
raise KeyError(msg % file)
# END handle long paths
- def __truediv__(self, file: str) -> Union['Tree', Blob, Submodule]:
+ def __truediv__(self, file: str) -> IndexObjUnion:
"""For PY3 only"""
return self.join(file)
@@ -289,24 +297,45 @@ class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable):
See the ``TreeModifier`` for more information on how to alter the cache"""
return TreeModifier(self._cache)
- def traverse(self, predicate=lambda i, d: True,
- prune=lambda i, d: False, depth=-1, branch_first=True,
- visit_once=False, ignore_self=1):
- """For documentation, see util.Traversable.traverse
+ def traverse(self, # type: ignore # overrides super()
+ predicate: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: True,
+ prune: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: False,
+ depth: int = -1,
+ branch_first: bool = True,
+ visit_once: bool = False,
+ ignore_self: int = 1,
+ as_edge: bool = False
+ ) -> Union[Iterator[IndexObjUnion],
+ Iterator[TraversedTreeTup]]:
+ """For documentation, see util.Traversable._traverse()
Trees are set to visit_once = False to gain more performance in the traversal"""
- return super(Tree, self).traverse(predicate, prune, depth, branch_first, visit_once, ignore_self)
+
+ # """
+ # # To typecheck instead of using cast.
+ # import itertools
+ # def is_tree_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Union['Tree', 'Blob', 'Submodule']]]]:
+ # return all(isinstance(x, (Blob, Tree, Submodule)) for x in inp[1])
+
+ # ret = super(Tree, self).traverse(predicate, prune, depth, branch_first, visit_once, ignore_self)
+ # ret_tup = itertools.tee(ret, 2)
+ # assert is_tree_traversed(ret_tup), f"Type is {[type(x) for x in list(ret_tup[0])]}"
+ # return ret_tup[0]"""
+ return cast(Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]],
+ super(Tree, self).traverse(predicate, prune, depth, # type: ignore
+ branch_first, visit_once, ignore_self))
# List protocol
- def __getslice__(self, i: int, j: int) -> List[Union[Blob, 'Tree', Submodule]]:
+
+ def __getslice__(self, i: int, j: int) -> List[IndexObjUnion]:
return list(self._iter_convert_to_object(self._cache[i:j]))
- def __iter__(self) -> Iterator[Union[Blob, 'Tree', Submodule]]:
+ def __iter__(self) -> Iterator[IndexObjUnion]:
return self._iter_convert_to_object(self._cache)
def __len__(self) -> int:
return len(self._cache)
- def __getitem__(self, item: Union[str, int, slice]) -> Union[Blob, 'Tree', Submodule]:
+ def __getitem__(self, item: Union[str, int, slice]) -> IndexObjUnion:
if isinstance(item, int):
info = self._cache[item]
return self._map_id_to_type[info[1] >> 12](self.repo, info[0], info[1], join_path(self.path, info[2]))
@@ -318,7 +347,7 @@ class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable):
raise TypeError("Invalid index type: %r" % item)
- def __contains__(self, item: Union[IndexObject, PathLike]) -> bool:
+ def __contains__(self, item: Union[IndexObjUnion, PathLike]) -> bool:
if isinstance(item, IndexObject):
for info in self._cache:
if item.binsha == info[0]:
diff --git a/git/objects/util.py b/git/objects/util.py
index 8b8148a9..ec81f87e 100644
--- a/git/objects/util.py
+++ b/git/objects/util.py
@@ -7,6 +7,7 @@
from git.util import (
IterableList,
+ IterableObj,
Actor
)
@@ -19,18 +20,24 @@ import calendar
from datetime import datetime, timedelta, tzinfo
# typing ------------------------------------------------------------
-from typing import (Any, Callable, Deque, Iterator, TypeVar, TYPE_CHECKING, Tuple, Type, Union, cast)
+from typing import (Any, Callable, Deque, Iterator, NamedTuple, overload, Sequence,
+ TYPE_CHECKING, Tuple, Type, TypeVar, Union, cast)
+
+from git.types import Literal, TypeGuard
if TYPE_CHECKING:
from io import BytesIO, StringIO
- from .submodule.base import Submodule # noqa: F401
from .commit import Commit
from .blob import Blob
from .tag import TagObject
- from .tree import Tree
+ from .tree import Tree, TraversedTreeTup
from subprocess import Popen
-
-T_Iterableobj = TypeVar('T_Iterableobj')
+
+
+T_TIobj = TypeVar('T_TIobj', bound='TraversableIterableObj') # for TraversableIterableObj.traverse()
+
+TraversedTup = Union[Tuple[Union['Traversable', None], 'Traversable'], # for commit, submodule
+ 'TraversedTreeTup'] # for tree.traverse()
# --------------------------------------------------------------------
@@ -287,7 +294,7 @@ class Traversable(object):
__slots__ = ()
@classmethod
- def _get_intermediate_items(cls, item):
+ def _get_intermediate_items(cls, item) -> Sequence['Traversable']:
"""
Returns:
Tuple of items connected to the given item.
@@ -299,23 +306,30 @@ class Traversable(object):
"""
raise NotImplementedError("To be implemented in subclass")
- def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList:
+ def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList['TraversableIterableObj']:
"""
:return: IterableList with the results of the traversal as produced by
- traverse()"""
- out: IterableList = IterableList(self._id_attribute_) # type: ignore[attr-defined] # defined in sublcasses
+ traverse()
+ List objects must be IterableObj and Traversable e.g. Commit, Submodule"""
+
+ def is_TraversableIterableObj(inp: 'Traversable') -> TypeGuard['TraversableIterableObj']:
+ # return isinstance(self, TraversableIterableObj)
+ # Can it be anythin else?
+ return isinstance(self, Traversable)
+
+ assert is_TraversableIterableObj(self), f"{type(self)}"
+ out: IterableList['TraversableIterableObj'] = IterableList(self._id_attribute_)
out.extend(self.traverse(*args, **kwargs))
return out
def traverse(self,
- predicate: Callable[[object, int], bool] = lambda i, d: True,
- prune: Callable[[object, int], bool] = lambda i, d: False,
- depth: int = -1,
- branch_first: bool = True,
- visit_once: bool = True, ignore_self: int = 1, as_edge: bool = False
- ) -> Union[Iterator['Traversable'], Iterator[Tuple['Traversable', 'Traversable']]]:
+ predicate: Callable[[Union['Traversable', TraversedTup], int], bool] = lambda i, d: True,
+ prune: Callable[[Union['Traversable', TraversedTup], int], bool] = lambda i, d: False,
+ depth: int = -1, branch_first: bool = True, visit_once: bool = True,
+ ignore_self: int = 1, as_edge: bool = False
+ ) -> Union[Iterator[Union['Traversable', 'Blob']],
+ Iterator[TraversedTup]]:
""":return: iterator yielding of items found when traversing self
-
:param predicate: f(i,d) returns False if item i at depth d should not be included in the result
:param prune:
@@ -344,21 +358,37 @@ class Traversable(object):
if True, return a pair of items, first being the source, second the
destination, i.e. tuple(src, dest) with the edge spanning from
source to destination"""
+
+ """
+ Commit -> Iterator[Union[Commit, Tuple[Commit, Commit]]
+ Submodule -> Iterator[Submodule, Tuple[Submodule, Submodule]]
+ Tree -> Iterator[Union[Blob, Tree, Submodule,
+ Tuple[Union[Submodule, Tree], Union[Blob, Tree, Submodule]]]
+
+ ignore_self=True is_edge=True -> Iterator[item]
+ ignore_self=True is_edge=False --> Iterator[item]
+ ignore_self=False is_edge=True -> Iterator[item] | Iterator[Tuple[src, item]]
+ ignore_self=False is_edge=False -> Iterator[Tuple[src, item]]"""
+ class TraverseNT(NamedTuple):
+ depth: int
+ item: 'Traversable'
+ src: Union['Traversable', None]
+
visited = set()
- stack = deque() # type: Deque[Tuple[int, Traversable, Union[Traversable, None]]]
- stack.append((0, self, None)) # self is always depth level 0
+ stack = deque() # type: Deque[TraverseNT]
+ stack.append(TraverseNT(0, self, None)) # self is always depth level 0
- def addToStack(stack: Deque[Tuple[int, 'Traversable', Union['Traversable', None]]],
- item: 'Traversable',
+ def addToStack(stack: Deque[TraverseNT],
+ src_item: 'Traversable',
branch_first: bool,
- depth) -> None:
+ depth: int) -> None:
lst = self._get_intermediate_items(item)
- if not lst:
+ if not lst: # empty list
return None
if branch_first:
- stack.extendleft((depth, i, item) for i in lst)
+ stack.extendleft(TraverseNT(depth, i, src_item) for i in lst)
else:
- reviter = ((depth, lst[i], item) for i in range(len(lst) - 1, -1, -1))
+ reviter = (TraverseNT(depth, lst[i], src_item) for i in range(len(lst) - 1, -1, -1))
stack.extend(reviter)
# END addToStack local method
@@ -371,7 +401,12 @@ class Traversable(object):
if visit_once:
visited.add(item)
- rval = (as_edge and (src, item)) or item
+ rval: Union['Traversable', Tuple[Union[None, 'Traversable'], 'Traversable']]
+ if as_edge: # if as_edge return (src, item) unless rrc is None (e.g. for first item)
+ rval = (src, item)
+ else:
+ rval = item
+
if prune(rval, d):
continue
@@ -405,3 +440,73 @@ class Serializable(object):
:param stream: a file-like object
:return: self"""
raise NotImplementedError("To be implemented in subclass")
+
+
+class TraversableIterableObj(Traversable, IterableObj):
+ __slots__ = ()
+
+ TIobj_tuple = Tuple[Union[T_TIobj, None], T_TIobj]
+
+ @overload # type: ignore
+ def traverse(self: T_TIobj,
+ predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
+ prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
+ depth: int, branch_first: bool, visit_once: bool,
+ ignore_self: Literal[True],
+ as_edge: Literal[False],
+ ) -> Iterator[T_TIobj]:
+ ...
+
+ @overload
+ def traverse(self: T_TIobj,
+ predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
+ prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
+ depth: int, branch_first: bool, visit_once: bool,
+ ignore_self: Literal[False],
+ as_edge: Literal[True],
+ ) -> Iterator[Tuple[Union[T_TIobj, None], T_TIobj]]:
+ ...
+
+ @overload
+ def traverse(self: T_TIobj,
+ predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool],
+ prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool],
+ depth: int, branch_first: bool, visit_once: bool,
+ ignore_self: Literal[True],
+ as_edge: Literal[True],
+ ) -> Iterator[Tuple[T_TIobj, T_TIobj]]:
+ ...
+
+ def traverse(self: T_TIobj,
+ predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int],
+ bool] = lambda i, d: True,
+ prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int],
+ bool] = lambda i, d: False,
+ depth: int = -1, branch_first: bool = True, visit_once: bool = True,
+ ignore_self: int = 1, as_edge: bool = False
+ ) -> Union[Iterator[T_TIobj],
+ Iterator[Tuple[T_TIobj, T_TIobj]],
+ Iterator[Tuple[Union[T_TIobj, None], T_TIobj]]]:
+ """For documentation, see util.Traversable._traverse()"""
+
+ """
+ # To typecheck instead of using cast.
+ import itertools
+ from git.types import TypeGuard
+ def is_commit_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Tuple['Commit', 'Commit']]]]:
+ for x in inp[1]:
+ if not isinstance(x, tuple) and len(x) != 2:
+ if all(isinstance(inner, Commit) for inner in x):
+ continue
+ return True
+
+ ret = super(Commit, self).traverse(predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge)
+ ret_tup = itertools.tee(ret, 2)
+ assert is_commit_traversed(ret_tup), f"{[type(x) for x in list(ret_tup[0])]}"
+ return ret_tup[0]
+ """
+ return cast(Union[Iterator[T_TIobj],
+ Iterator[Tuple[Union[None, T_TIobj], T_TIobj]]],
+ super(TraversableIterableObj, self).traverse(
+ predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge # type: ignore
+ ))