summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Thiel <sebastian.thiel@icloud.com>2021-07-11 12:30:59 +0800
committerGitHub <noreply@github.com>2021-07-11 12:30:59 +0800
commit9523033fd1ff499ae3d151b23b851b7e2b64bf75 (patch)
treec2f7b467186269926c48c4b2cf02d8364dc39751
parent0a6d9d669979cc5a89ca73f8f4843cba47b4486a (diff)
parent94c66525a6e7d5c74a9aee65d14630bb674439f7 (diff)
downloadgitpython-9523033fd1ff499ae3d151b23b851b7e2b64bf75.tar.gz
Merge pull request #1285 from Yobmod/main
Finish initial typing of Index and Submodule
-rw-r--r--git/cmd.py2
-rw-r--r--git/config.py43
-rw-r--r--git/diff.py101
-rw-r--r--git/index/base.py97
-rw-r--r--git/index/fun.py62
-rw-r--r--git/index/util.py21
-rw-r--r--git/objects/commit.py2
-rw-r--r--git/objects/fun.py61
-rw-r--r--git/objects/submodule/base.py89
-rw-r--r--git/objects/submodule/root.py34
-rw-r--r--git/objects/submodule/util.py23
-rw-r--r--git/objects/tree.py49
-rw-r--r--git/objects/util.py57
-rw-r--r--git/refs/head.py11
-rw-r--r--git/refs/symbolic.py3
-rw-r--r--git/remote.py26
-rw-r--r--git/repo/base.py36
-rw-r--r--git/types.py50
-rw-r--r--git/util.py43
-rw-r--r--mypy.ini5
20 files changed, 504 insertions, 311 deletions
diff --git a/git/cmd.py b/git/cmd.py
index 7df85581..dd887a18 100644
--- a/git/cmd.py
+++ b/git/cmd.py
@@ -831,7 +831,7 @@ class Git(LazyMixin):
except cmd_not_found_exception as err:
raise GitCommandNotFound(redacted_command, err) from err
else:
- proc = cast(Popen, proc)
+ # replace with a typeguard for Popen[bytes]?
proc.stdout = cast(BinaryIO, proc.stdout)
proc.stderr = cast(BinaryIO, proc.stderr)
diff --git a/git/config.py b/git/config.py
index 5c5ceea8..2c863f93 100644
--- a/git/config.py
+++ b/git/config.py
@@ -31,12 +31,14 @@ import configparser as cp
# typing-------------------------------------------------------
-from typing import Any, Callable, IO, List, Dict, Sequence, TYPE_CHECKING, Tuple, Union, cast, overload
+from typing import (Any, Callable, IO, List, Dict, Sequence,
+ TYPE_CHECKING, Tuple, Union, cast, overload)
-from git.types import Literal, Lit_config_levels, PathLike, TBD
+from git.types import Lit_config_levels, ConfigLevels_Tup, PathLike, TBD, assert_never, is_config_level
if TYPE_CHECKING:
from git.repo.base import Repo
+ from io import BytesIO
# -------------------------------------------------------------
@@ -48,8 +50,10 @@ log.addHandler(logging.NullHandler())
# invariants
# represents the configuration level of a configuration file
-CONFIG_LEVELS = ("system", "user", "global", "repository"
- ) # type: Tuple[Literal['system'], Literal['user'], Literal['global'], Literal['repository']]
+
+
+CONFIG_LEVELS: ConfigLevels_Tup = ("system", "user", "global", "repository")
+
# Section pattern to detect conditional includes.
# https://git-scm.com/docs/git-config#_conditional_includes
@@ -229,8 +233,9 @@ def get_config_path(config_level: Lit_config_levels) -> str:
return osp.normpath(osp.expanduser("~/.gitconfig"))
elif config_level == "repository":
raise ValueError("No repo to get repository configuration from. Use Repo._get_config_path")
-
- raise ValueError("Invalid configuration level: %r" % config_level)
+ else:
+ # Should not reach here. Will raise ValueError if does. Static typing will warn missing elifs
+ assert_never(config_level, ValueError(f"Invalid configuration level: {config_level!r}"))
class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, object)): # type: ignore ## mypy does not understand dynamic class creation # noqa: E501
@@ -271,7 +276,7 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
# list of RawConfigParser methods able to change the instance
_mutating_methods_ = ("add_section", "remove_section", "remove_option", "set")
- def __init__(self, file_or_files: Union[None, PathLike, IO, Sequence[Union[PathLike, IO]]] = None,
+ def __init__(self, file_or_files: Union[None, PathLike, 'BytesIO', Sequence[Union[PathLike, 'BytesIO']]] = None,
read_only: bool = True, merge_includes: bool = True,
config_level: Union[Lit_config_levels, None] = None,
repo: Union['Repo', None] = None) -> None:
@@ -300,13 +305,13 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
self._proxies = self._dict()
if file_or_files is not None:
- self._file_or_files = file_or_files # type: Union[PathLike, IO, Sequence[Union[PathLike, IO]]]
+ self._file_or_files: Union[PathLike, 'BytesIO', Sequence[Union[PathLike, 'BytesIO']]] = file_or_files
else:
if config_level is None:
if read_only:
- self._file_or_files = [get_config_path(f) # type: ignore
- for f in CONFIG_LEVELS # Can type f properly when 3.5 dropped
- if f != 'repository']
+ self._file_or_files = [get_config_path(f)
+ for f in CONFIG_LEVELS
+ if is_config_level(f) and f != 'repository']
else:
raise ValueError("No configuration level or configuration files specified")
else:
@@ -323,15 +328,13 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
def _acquire_lock(self) -> None:
if not self._read_only:
if not self._lock:
- if isinstance(self._file_or_files, (tuple, list)):
- raise ValueError(
- "Write-ConfigParsers can operate on a single file only, multiple files have been passed")
- # END single file check
-
if isinstance(self._file_or_files, (str, os.PathLike)):
file_or_files = self._file_or_files
+ elif isinstance(self._file_or_files, (tuple, list, Sequence)):
+ raise ValueError(
+ "Write-ConfigParsers can operate on a single file only, multiple files have been passed")
else:
- file_or_files = cast(IO, self._file_or_files).name
+ file_or_files = self._file_or_files.name
# END get filename from handle/stream
# initialize lock base - we want to write
@@ -649,7 +652,7 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
a file lock"""
self._assure_writable("write")
if not self._dirty:
- return
+ return None
if isinstance(self._file_or_files, (list, tuple)):
raise AssertionError("Cannot write back if there is not exactly a single file to write to, have %i files"
@@ -665,7 +668,7 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
fp = self._file_or_files
# we have a physical file on disk, so get a lock
- is_file_lock = isinstance(fp, (str, IOBase)) # can't use Pathlike until 3.5 dropped
+ is_file_lock = isinstance(fp, (str, os.PathLike, IOBase)) # can't use Pathlike until 3.5 dropped
if is_file_lock and self._lock is not None: # else raise Error?
self._lock._obtain_lock()
@@ -674,7 +677,7 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje
with open(fp, "wb") as fp_open:
self._write(fp_open)
else:
- fp = cast(IO, fp)
+ fp = cast('BytesIO', fp)
fp.seek(0)
# make sure we do not overwrite into an existing file
if hasattr(fp, 'truncate'):
diff --git a/git/diff.py b/git/diff.py
index 346a2ca7..51dac390 100644
--- a/git/diff.py
+++ b/git/diff.py
@@ -15,19 +15,26 @@ from .objects.util import mode_str_to_int
# typing ------------------------------------------------------------------
-from typing import Any, Iterator, List, Match, Optional, Tuple, Type, Union, TYPE_CHECKING
-from git.types import PathLike, TBD, Literal
+from typing import Any, Iterator, List, Match, Optional, Tuple, Type, TypeVar, Union, TYPE_CHECKING
+from git.types import PathLike, TBD, Literal, TypeGuard
if TYPE_CHECKING:
from .objects.tree import Tree
+ from .objects import Commit
from git.repo.base import Repo
-
+ from git.objects.base import IndexObject
from subprocess import Popen
-Lit_change_type = Literal['A', 'D', 'M', 'R', 'T']
+Lit_change_type = Literal['A', 'D', 'C', 'M', 'R', 'T', 'U']
+
+
+def is_change_type(inp: str) -> TypeGuard[Lit_change_type]:
+ # return True
+ return inp in ['A', 'D', 'C', 'M', 'R', 'T', 'U']
# ------------------------------------------------------------------------
+
__all__ = ('Diffable', 'DiffIndex', 'Diff', 'NULL_TREE')
# Special object to compare against the empty tree in diffs
@@ -75,7 +82,8 @@ class Diffable(object):
class Index(object):
pass
- def _process_diff_args(self, args: List[Union[str, 'Diffable', object]]) -> List[Union[str, 'Diffable', object]]:
+ def _process_diff_args(self, args: List[Union[str, 'Diffable', Type['Diffable.Index'], object]]
+ ) -> List[Union[str, 'Diffable', Type['Diffable.Index'], object]]:
"""
:return:
possibly altered version of the given args list.
@@ -83,7 +91,7 @@ class Diffable(object):
Subclasses can use it to alter the behaviour of the superclass"""
return args
- def diff(self, other: Union[Type[Index], Type['Tree'], object, None, str] = Index,
+ def diff(self, other: Union[Type['Index'], 'Tree', 'Commit', None, str, object] = Index,
paths: Union[PathLike, List[PathLike], Tuple[PathLike, ...], None] = None,
create_patch: bool = False, **kwargs: Any) -> 'DiffIndex':
"""Creates diffs between two items being trees, trees and index or an
@@ -116,7 +124,7 @@ class Diffable(object):
:note:
On a bare repository, 'other' needs to be provided as Index or as
as Tree/Commit, or a git command error will occur"""
- args = [] # type: List[Union[str, Diffable, object]]
+ args: List[Union[PathLike, Diffable, Type['Diffable.Index'], object]] = []
args.append("--abbrev=40") # we need full shas
args.append("--full-index") # get full index paths, not only filenames
@@ -134,8 +142,8 @@ class Diffable(object):
if paths is not None and not isinstance(paths, (tuple, list)):
paths = [paths]
- if hasattr(self, 'repo'): # else raise Error?
- self.repo = self.repo # type: 'Repo'
+ if hasattr(self, 'Has_Repo'):
+ self.repo: Repo = self.repo
diff_cmd = self.repo.git.diff
if other is self.Index:
@@ -169,7 +177,10 @@ class Diffable(object):
return index
-class DiffIndex(list):
+T_Diff = TypeVar('T_Diff', bound='Diff')
+
+
+class DiffIndex(List[T_Diff]):
"""Implements an Index for diffs, allowing a list of Diffs to be queried by
the diff properties.
@@ -183,7 +194,7 @@ class DiffIndex(list):
# T = Changed in the type
change_type = ("A", "C", "D", "R", "M", "T")
- def iter_change_type(self, change_type: Lit_change_type) -> Iterator['Diff']:
+ def iter_change_type(self, change_type: Lit_change_type) -> Iterator[T_Diff]:
"""
:return:
iterator yielding Diff instances that match the given change_type
@@ -200,19 +211,19 @@ class DiffIndex(list):
if change_type not in self.change_type:
raise ValueError("Invalid change type: %s" % change_type)
- for diff in self: # type: 'Diff'
- if diff.change_type == change_type:
- yield diff
- elif change_type == "A" and diff.new_file:
- yield diff
- elif change_type == "D" and diff.deleted_file:
- yield diff
- elif change_type == "C" and diff.copied_file:
- yield diff
- elif change_type == "R" and diff.renamed:
- yield diff
- elif change_type == "M" and diff.a_blob and diff.b_blob and diff.a_blob != diff.b_blob:
- yield diff
+ for diffidx in self:
+ if diffidx.change_type == change_type:
+ yield diffidx
+ elif change_type == "A" and diffidx.new_file:
+ yield diffidx
+ elif change_type == "D" and diffidx.deleted_file:
+ yield diffidx
+ elif change_type == "C" and diffidx.copied_file:
+ yield diffidx
+ elif change_type == "R" and diffidx.renamed:
+ yield diffidx
+ elif change_type == "M" and diffidx.a_blob and diffidx.b_blob and diffidx.a_blob != diffidx.b_blob:
+ yield diffidx
# END for each diff
@@ -281,7 +292,7 @@ class Diff(object):
a_mode: Union[bytes, str, None], b_mode: Union[bytes, str, None],
new_file: bool, deleted_file: bool, copied_file: bool,
raw_rename_from: Optional[bytes], raw_rename_to: Optional[bytes],
- diff: Union[str, bytes, None], change_type: Optional[str], score: Optional[int]) -> None:
+ diff: Union[str, bytes, None], change_type: Optional[Lit_change_type], score: Optional[int]) -> None:
assert a_rawpath is None or isinstance(a_rawpath, bytes)
assert b_rawpath is None or isinstance(b_rawpath, bytes)
@@ -300,19 +311,21 @@ class Diff(object):
repo = submodule.module()
break
+ self.a_blob: Union['IndexObject', None]
if a_blob_id is None or a_blob_id == self.NULL_HEX_SHA:
self.a_blob = None
else:
self.a_blob = Blob(repo, hex_to_bin(a_blob_id), mode=self.a_mode, path=self.a_path)
+ self.b_blob: Union['IndexObject', None]
if b_blob_id is None or b_blob_id == self.NULL_HEX_SHA:
self.b_blob = None
else:
self.b_blob = Blob(repo, hex_to_bin(b_blob_id), mode=self.b_mode, path=self.b_path)
- self.new_file = new_file
- self.deleted_file = deleted_file
- self.copied_file = copied_file
+ self.new_file: bool = new_file
+ self.deleted_file: bool = deleted_file
+ self.copied_file: bool = copied_file
# be clear and use None instead of empty strings
assert raw_rename_from is None or isinstance(raw_rename_from, bytes)
@@ -321,7 +334,7 @@ class Diff(object):
self.raw_rename_to = raw_rename_to or None
self.diff = diff
- self.change_type = change_type
+ self.change_type: Union[Lit_change_type, None] = change_type
self.score = score
def __eq__(self, other: object) -> bool:
@@ -386,36 +399,36 @@ class Diff(object):
# end
return res
- @property
+ @ property
def a_path(self) -> Optional[str]:
return self.a_rawpath.decode(defenc, 'replace') if self.a_rawpath else None
- @property
+ @ property
def b_path(self) -> Optional[str]:
return self.b_rawpath.decode(defenc, 'replace') if self.b_rawpath else None
- @property
+ @ property
def rename_from(self) -> Optional[str]:
return self.raw_rename_from.decode(defenc, 'replace') if self.raw_rename_from else None
- @property
+ @ property
def rename_to(self) -> Optional[str]:
return self.raw_rename_to.decode(defenc, 'replace') if self.raw_rename_to else None
- @property
+ @ property
def renamed(self) -> bool:
""":returns: True if the blob of our diff has been renamed
:note: This property is deprecated, please use ``renamed_file`` instead.
"""
return self.renamed_file
- @property
+ @ property
def renamed_file(self) -> bool:
""":returns: True if the blob of our diff has been renamed
"""
return self.rename_from != self.rename_to
- @classmethod
+ @ classmethod
def _pick_best_path(cls, path_match: bytes, rename_match: bytes, path_fallback_match: bytes) -> Optional[bytes]:
if path_match:
return decode_path(path_match)
@@ -428,7 +441,7 @@ class Diff(object):
return None
- @classmethod
+ @ classmethod
def _index_from_patch_format(cls, repo: 'Repo', proc: TBD) -> DiffIndex:
"""Create a new DiffIndex from the given text which must be in patch format
:param repo: is the repository we are operating on - it is required
@@ -441,7 +454,7 @@ class Diff(object):
# for now, we have to bake the stream
text = b''.join(text_list)
- index = DiffIndex()
+ index: 'DiffIndex' = DiffIndex()
previous_header = None
header = None
a_path, b_path = None, None # for mypy
@@ -491,19 +504,21 @@ class Diff(object):
return index
- @staticmethod
+ @ staticmethod
def _handle_diff_line(lines_bytes: bytes, repo: 'Repo', index: DiffIndex) -> None:
lines = lines_bytes.decode(defenc)
for line in lines.split(':')[1:]:
meta, _, path = line.partition('\x00')
path = path.rstrip('\x00')
- a_blob_id, b_blob_id = None, None # Type: Optional[str]
+ a_blob_id: Optional[str]
+ b_blob_id: Optional[str]
old_mode, new_mode, a_blob_id, b_blob_id, _change_type = meta.split(None, 4)
# Change type can be R100
# R: status letter
# 100: score (in case of copy and rename)
- change_type = _change_type[0]
+ assert is_change_type(_change_type[0]), f"Unexpected value for change_type received: {_change_type[0]}"
+ change_type: Lit_change_type = _change_type[0]
score_str = ''.join(_change_type[1:])
score = int(score_str) if score_str.isdigit() else None
path = path.strip()
@@ -543,14 +558,14 @@ class Diff(object):
'', change_type, score)
index.append(diff)
- @classmethod
+ @ classmethod
def _index_from_raw_format(cls, repo: 'Repo', proc: 'Popen') -> 'DiffIndex':
"""Create a new DiffIndex from the given stream which must be in raw format.
:return: git.DiffIndex"""
# handles
# :100644 100644 687099101... 37c5e30c8... M .gitignore
- index = DiffIndex()
+ index: 'DiffIndex' = DiffIndex()
handle_process_output(proc, lambda byt: cls._handle_diff_line(byt, repo, index),
None, finalize_process, decode_streams=False)
diff --git a/git/index/base.py b/git/index/base.py
index f4ffba7b..3aa06e38 100644
--- a/git/index/base.py
+++ b/git/index/base.py
@@ -18,6 +18,7 @@ from git.compat import (
from git.exc import (
GitCommandError,
CheckoutError,
+ GitError,
InvalidGitRepositoryError
)
from git.objects import (
@@ -40,7 +41,7 @@ from git.util import (
from gitdb.base import IStream
from gitdb.db import MemoryDB
-import git.diff as diff
+import git.diff as git_diff
import os.path as osp
from .fun import (
@@ -66,10 +67,10 @@ from .util import (
# typing -----------------------------------------------------------------------------
-from typing import (Any, BinaryIO, Callable, Dict, IO, Iterable, Iterator, List,
- Sequence, TYPE_CHECKING, Tuple, Union)
+from typing import (Any, BinaryIO, Callable, Dict, IO, Iterable, Iterator, List, NoReturn,
+ Sequence, TYPE_CHECKING, Tuple, Type, Union)
-from git.types import PathLike, TBD
+from git.types import Commit_ish, PathLike, TBD
if TYPE_CHECKING:
from subprocess import Popen
@@ -87,7 +88,7 @@ Treeish = Union[Tree, Commit, str, bytes]
__all__ = ('IndexFile', 'CheckoutError')
-class IndexFile(LazyMixin, diff.Diffable, Serializable):
+class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
"""
Implements an Index that can be manipulated using a native implementation in
@@ -113,7 +114,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
_VERSION = 2 # latest version we support
S_IFGITLINK = S_IFGITLINK # a submodule
- def __init__(self, repo: 'Repo', file_path: PathLike = None) -> None:
+ def __init__(self, repo: 'Repo', file_path: Union[PathLike, None] = None) -> None:
"""Initialize this Index instance, optionally from the given ``file_path``.
If no file_path is given, we will be created from the current index file.
@@ -372,13 +373,13 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
# UTILITIES
@unbare_repo
- def _iter_expand_paths(self, paths: Sequence[PathLike]) -> Iterator[PathLike]:
+ def _iter_expand_paths(self: 'IndexFile', paths: Sequence[PathLike]) -> Iterator[PathLike]:
"""Expand the directories in list of paths to the corresponding paths accordingly,
Note: git will add items multiple times even if a glob overlapped
with manually specified paths or if paths where specified multiple
times - we respect that and do not prune"""
- def raise_exc(e):
+ def raise_exc(e: Exception) -> NoReturn:
raise e
r = str(self.repo.working_tree_dir)
rs = r + os.sep
@@ -410,7 +411,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
# whose name contains wildcard characters.
if abs_path not in resolved_paths:
for f in self._iter_expand_paths(glob.glob(abs_path)):
- yield f.replace(rs, '')
+ yield str(f).replace(rs, '')
continue
# END glob handling
try:
@@ -426,7 +427,8 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
# END path exception handling
# END for each path
- def _write_path_to_stdin(self, proc: 'Popen', filepath: PathLike, item, fmakeexc, fprogress,
+ def _write_path_to_stdin(self, proc: 'Popen', filepath: PathLike, item: TBD, fmakeexc: Callable[..., GitError],
+ fprogress: Callable[[PathLike, bool, TBD], None],
read_from_stdout: bool = True) -> Union[None, str]:
"""Write path to proc.stdin and make sure it processes the item, including progress.
@@ -498,7 +500,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
line.sort()
return path_map
- @classmethod
+ @ classmethod
def entry_key(cls, *entry: Union[BaseIndexEntry, PathLike, StageType]) -> Tuple[PathLike, StageType]:
return entry_key(*entry)
@@ -570,11 +572,12 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
# note: additional deserialization could be saved if write_tree_from_cache
# would return sorted tree entries
root_tree = Tree(self.repo, binsha, path='')
- root_tree._cache = tree_items # type: ignore
+ root_tree._cache = tree_items # type: ignore # should this be encoded to [bytes, int, str]?
return root_tree
- def _process_diff_args(self, args: List[Union[str, diff.Diffable, object]]
- ) -> List[Union[str, diff.Diffable, object]]:
+ def _process_diff_args(self, # type: ignore[override]
+ args: List[Union[str, 'git_diff.Diffable', Type['git_diff.Diffable.Index']]]
+ ) -> List[Union[str, 'git_diff.Diffable', Type['git_diff.Diffable.Index']]]:
try:
args.pop(args.index(self))
except IndexError:
@@ -593,7 +596,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
raise ValueError("Absolute path %r is not in git repository at %r" % (path, self.repo.working_tree_dir))
return os.path.relpath(path, self.repo.working_tree_dir)
- def _preprocess_add_items(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, Submodule]]
+ def _preprocess_add_items(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, 'Submodule']]
) -> Tuple[List[PathLike], List[BaseIndexEntry]]:
""" Split the items into two lists of path strings and BaseEntries. """
paths = []
@@ -631,11 +634,11 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
return BaseIndexEntry((stat_mode_to_index_mode(st.st_mode),
istream.binsha, 0, to_native_path_linux(filepath)))
- @unbare_repo
- @git_working_dir
+ @ unbare_repo
+ @ git_working_dir
def _entries_for_paths(self, paths: List[str], path_rewriter: Callable, fprogress: Callable,
entries: List[BaseIndexEntry]) -> List[BaseIndexEntry]:
- entries_added = [] # type: List[BaseIndexEntry]
+ entries_added: List[BaseIndexEntry] = []
if path_rewriter:
for path in paths:
if osp.isabs(path):
@@ -664,8 +667,8 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
# END path handling
return entries_added
- def add(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, Submodule]], force: bool = True,
- fprogress: Callable = lambda *args: None, path_rewriter: Callable = None,
+ def add(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, 'Submodule']], force: bool = True,
+ fprogress: Callable = lambda *args: None, path_rewriter: Union[Callable[..., PathLike], None] = None,
write: bool = True, write_extension_data: bool = False) -> List[BaseIndexEntry]:
"""Add files from the working tree, specific blobs or BaseIndexEntries
to the index.
@@ -769,7 +772,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
# automatically
# paths can be git-added, for everything else we use git-update-index
paths, entries = self._preprocess_add_items(items)
- entries_added = []
+ entries_added: List[BaseIndexEntry] = []
# This code needs a working tree, therefore we try not to run it unless required.
# That way, we are OK on a bare repository as well.
# If there are no paths, the rewriter has nothing to do either
@@ -788,8 +791,8 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
# create objects if required, otherwise go with the existing shas
null_entries_indices = [i for i, e in enumerate(entries) if e.binsha == Object.NULL_BIN_SHA]
if null_entries_indices:
- @git_working_dir
- def handle_null_entries(self):
+ @ git_working_dir
+ def handle_null_entries(self: 'IndexFile') -> None:
for ei in null_entries_indices:
null_entry = entries[ei]
new_entry = self._store_path(null_entry.path, fprogress)
@@ -833,12 +836,13 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
return entries_added
- def _items_to_rela_paths(self, items):
+ def _items_to_rela_paths(self, items: Union[PathLike, Sequence[Union[PathLike, BaseIndexEntry, Blob, Submodule]]]
+ ) -> List[PathLike]:
"""Returns a list of repo-relative paths from the given items which
may be absolute or relative paths, entries or blobs"""
paths = []
# if string put in list
- if isinstance(items, str):
+ if isinstance(items, (str, os.PathLike)):
items = [items]
for item in items:
@@ -851,9 +855,9 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
# END for each item
return paths
- @post_clear_cache
- @default_index
- def remove(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, Submodule]], working_tree: bool = False,
+ @ post_clear_cache
+ @ default_index
+ def remove(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, 'Submodule']], working_tree: bool = False,
**kwargs: Any) -> List[str]:
"""Remove the given items from the index and optionally from
the working tree as well.
@@ -903,9 +907,9 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
# rm 'path'
return [p[4:-1] for p in removed_paths]
- @post_clear_cache
- @default_index
- def move(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, Submodule]], skip_errors: bool = False,
+ @ post_clear_cache
+ @ default_index
+ def move(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, 'Submodule']], skip_errors: bool = False,
**kwargs: Any) -> List[Tuple[str, str]]:
"""Rename/move the items, whereas the last item is considered the destination of
the move operation. If the destination is a file, the first item ( of two )
@@ -968,8 +972,14 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
return out
- def commit(self, message: str, parent_commits=None, head: bool = True, author: Union[None, 'Actor'] = None,
- committer: Union[None, 'Actor'] = None, author_date: str = None, commit_date: str = None,
+ def commit(self,
+ message: str,
+ parent_commits: Union[Commit_ish, None] = None,
+ head: bool = True,
+ author: Union[None, 'Actor'] = None,
+ committer: Union[None, 'Actor'] = None,
+ author_date: Union[str, None] = None,
+ commit_date: Union[str, None] = None,
skip_hooks: bool = False) -> Commit:
"""Commit the current default index file, creating a commit object.
For more information on the arguments, see tree.commit.
@@ -1023,7 +1033,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
proc.wait()
return stdout
- @default_index
+ @ default_index
def checkout(self, paths: Union[None, Iterable[PathLike]] = None, force: bool = False,
fprogress: Callable = lambda *args: None, **kwargs: Any
) -> Union[None, Iterator[PathLike], Sequence[PathLike]]:
@@ -1192,7 +1202,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
# END paths handling
assert "Should not reach this point"
- @default_index
+ @ default_index
def reset(self, commit: Union[Commit, 'Reference', str] = 'HEAD', working_tree: bool = False,
paths: Union[None, Iterable[PathLike]] = None,
head: bool = False, **kwargs: Any) -> 'IndexFile':
@@ -1262,10 +1272,12 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
return self
- @default_index
- def diff(self, other: Union[diff.Diffable.Index, 'IndexFile.Index', Treeish, None, object] = diff.Diffable.Index,
- paths: Union[str, List[PathLike], Tuple[PathLike, ...]] = None, create_patch: bool = False, **kwargs: Any
- ) -> diff.DiffIndex:
+ # @ default_index, breaks typing for some reason, copied into function
+ def diff(self, # type: ignore[override]
+ other: Union[Type['git_diff.Diffable.Index'], 'Tree', 'Commit', str, None] = git_diff.Diffable.Index,
+ paths: Union[PathLike, List[PathLike], Tuple[PathLike, ...], None] = None,
+ create_patch: bool = False, **kwargs: Any
+ ) -> git_diff.DiffIndex:
"""Diff this index against the working copy or a Tree or Commit object
For a documentation of the parameters and return values, see,
@@ -1275,9 +1287,14 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
Will only work with indices that represent the default git index as
they have not been initialized with a stream.
"""
+
+ # only run if we are the default repository index
+ if self._file_path != self._index_path():
+ raise AssertionError(
+ "Cannot call %r on indices that do not represent the default git index" % self.diff())
# index against index is always empty
if other is self.Index:
- return diff.DiffIndex()
+ return git_diff.DiffIndex()
# index against anything but None is a reverse diff with the respective
# item. Handle existing -R flags properly. Transform strings to the object
diff --git a/git/index/fun.py b/git/index/fun.py
index ffd109b1..e5e566a0 100644
--- a/git/index/fun.py
+++ b/git/index/fun.py
@@ -57,6 +57,8 @@ from git.types import PathLike, TypeGuard
if TYPE_CHECKING:
from .base import IndexFile
+ from git.objects.tree import TreeCacheTup
+ # from git.objects.fun import EntryTupOrNone
# ------------------------------------------------------------------------------------
@@ -186,16 +188,15 @@ def entry_key(*entry: Union[BaseIndexEntry, PathLike, int]) -> Tuple[PathLike, i
""":return: Key suitable to be used for the index.entries dictionary
:param entry: One instance of type BaseIndexEntry or the path and the stage"""
- def is_entry_tuple(entry: Tuple) -> TypeGuard[Tuple[PathLike, int]]:
- return isinstance(entry, tuple) and len(entry) == 2
-
+ def is_entry_key_tup(entry_key: Tuple) -> TypeGuard[Tuple[PathLike, int]]:
+ return isinstance(entry_key, tuple) and len(entry_key) == 2
+
if len(entry) == 1:
entry_first = entry[0]
assert isinstance(entry_first, BaseIndexEntry)
return (entry_first.path, entry_first.stage)
else:
- # entry = tuple(entry)
- assert is_entry_tuple(entry)
+ assert is_entry_key_tup(entry)
return entry
# END handle entry
@@ -249,7 +250,7 @@ def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'Inde
def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0
- ) -> Tuple[bytes, List[Tuple[str, int, str]]]:
+ ) -> Tuple[bytes, List['TreeCacheTup']]:
"""Create a tree from the given sorted list of entries and put the respective
trees into the given object database
@@ -259,8 +260,8 @@ def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0
:param sl: slice indicating the range we should process on the entries list
:return: tuple(binsha, list(tree_entry, ...)) a tuple of a sha and a list of
tree entries being a tuple of hexsha, mode, name"""
- tree_items = [] # type: List[Tuple[Union[bytes, str], int, str]]
- tree_items_append = tree_items.append
+ tree_items: List['TreeCacheTup'] = []
+
ci = sl.start
end = sl.stop
while ci < end:
@@ -272,7 +273,7 @@ def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0
rbound = entry.path.find('/', si)
if rbound == -1:
# its not a tree
- tree_items_append((entry.binsha, entry.mode, entry.path[si:]))
+ tree_items.append((entry.binsha, entry.mode, entry.path[si:]))
else:
# find common base range
base = entry.path[si:rbound]
@@ -289,7 +290,7 @@ def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0
# enter recursion
# ci - 1 as we want to count our current item as well
sha, _tree_entry_list = write_tree_from_cache(entries, odb, slice(ci - 1, xi), rbound + 1)
- tree_items_append((sha, S_IFDIR, base))
+ tree_items.append((sha, S_IFDIR, base))
# skip ahead
ci = xi
@@ -298,15 +299,14 @@ def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0
# finally create the tree
sio = BytesIO()
- tree_to_stream(tree_items, sio.write) # converts bytes of each item[0] to str
- tree_items_stringified = cast(List[Tuple[str, int, str]], tree_items)
+ tree_to_stream(tree_items, sio.write) # writes to stream as bytes, but doesnt change tree_items
sio.seek(0)
istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio))
- return (istream.binsha, tree_items_stringified)
+ return (istream.binsha, tree_items)
-def _tree_entry_to_baseindexentry(tree_entry: Tuple[str, int, str], stage: int) -> BaseIndexEntry:
+def _tree_entry_to_baseindexentry(tree_entry: 'TreeCacheTup', stage: int) -> BaseIndexEntry:
return BaseIndexEntry((tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2]))
@@ -319,14 +319,13 @@ def aggressive_tree_merge(odb, tree_shas: Sequence[bytes]) -> List[BaseIndexEntr
:param tree_shas: 1, 2 or 3 trees as identified by their binary 20 byte shas
If 1 or two, the entries will effectively correspond to the last given tree
If 3 are given, a 3 way merge is performed"""
- out = [] # type: List[BaseIndexEntry]
- out_append = out.append
+ out: List[BaseIndexEntry] = []
# one and two way is the same for us, as we don't have to handle an existing
# index, instrea
if len(tree_shas) in (1, 2):
for entry in traverse_tree_recursive(odb, tree_shas[-1], ''):
- out_append(_tree_entry_to_baseindexentry(entry, 0))
+ out.append(_tree_entry_to_baseindexentry(entry, 0))
# END for each entry
return out
# END handle single tree
@@ -347,23 +346,23 @@ def aggressive_tree_merge(odb, tree_shas: Sequence[bytes]) -> List[BaseIndexEntr
if(base[0] != ours[0] and base[0] != theirs[0] and ours[0] != theirs[0]) or \
(base[1] != ours[1] and base[1] != theirs[1] and ours[1] != theirs[1]):
# changed by both
- out_append(_tree_entry_to_baseindexentry(base, 1))
- out_append(_tree_entry_to_baseindexentry(ours, 2))
- out_append(_tree_entry_to_baseindexentry(theirs, 3))
+ out.append(_tree_entry_to_baseindexentry(base, 1))
+ out.append(_tree_entry_to_baseindexentry(ours, 2))
+ out.append(_tree_entry_to_baseindexentry(theirs, 3))
elif base[0] != ours[0] or base[1] != ours[1]:
# only we changed it
- out_append(_tree_entry_to_baseindexentry(ours, 0))
+ out.append(_tree_entry_to_baseindexentry(ours, 0))
else:
# either nobody changed it, or they did. In either
# case, use theirs
- out_append(_tree_entry_to_baseindexentry(theirs, 0))
+ out.append(_tree_entry_to_baseindexentry(theirs, 0))
# END handle modification
else:
if ours[0] != base[0] or ours[1] != base[1]:
# they deleted it, we changed it, conflict
- out_append(_tree_entry_to_baseindexentry(base, 1))
- out_append(_tree_entry_to_baseindexentry(ours, 2))
+ out.append(_tree_entry_to_baseindexentry(base, 1))
+ out.append(_tree_entry_to_baseindexentry(ours, 2))
# else:
# we didn't change it, ignore
# pass
@@ -376,8 +375,8 @@ def aggressive_tree_merge(odb, tree_shas: Sequence[bytes]) -> List[BaseIndexEntr
else:
if theirs[0] != base[0] or theirs[1] != base[1]:
# deleted in ours, changed theirs, conflict
- out_append(_tree_entry_to_baseindexentry(base, 1))
- out_append(_tree_entry_to_baseindexentry(theirs, 3))
+ out.append(_tree_entry_to_baseindexentry(base, 1))
+ out.append(_tree_entry_to_baseindexentry(theirs, 3))
# END theirs changed
# else:
# theirs didn't change
@@ -388,18 +387,19 @@ def aggressive_tree_merge(odb, tree_shas: Sequence[bytes]) -> List[BaseIndexEntr
# all three can't be None
if ours is None:
# added in their branch
- out_append(_tree_entry_to_baseindexentry(theirs, 0))
+ assert theirs is not None
+ out.append(_tree_entry_to_baseindexentry(theirs, 0))
elif theirs is None:
# added in our branch
- out_append(_tree_entry_to_baseindexentry(ours, 0))
+ out.append(_tree_entry_to_baseindexentry(ours, 0))
else:
# both have it, except for the base, see whether it changed
if ours[0] != theirs[0] or ours[1] != theirs[1]:
- out_append(_tree_entry_to_baseindexentry(ours, 2))
- out_append(_tree_entry_to_baseindexentry(theirs, 3))
+ out.append(_tree_entry_to_baseindexentry(ours, 2))
+ out.append(_tree_entry_to_baseindexentry(theirs, 3))
else:
# it was added the same in both
- out_append(_tree_entry_to_baseindexentry(ours, 0))
+ out.append(_tree_entry_to_baseindexentry(ours, 0))
# END handle two items
# END handle heads
# END handle base exists
diff --git a/git/index/util.py b/git/index/util.py
index 471e9262..4f8af553 100644
--- a/git/index/util.py
+++ b/git/index/util.py
@@ -11,9 +11,12 @@ import os.path as osp
# typing ----------------------------------------------------------------------
-from typing import (Any, Callable)
+from typing import (Any, Callable, TYPE_CHECKING)
-from git.types import PathLike
+from git.types import PathLike, _T
+
+if TYPE_CHECKING:
+ from git.index import IndexFile
# ---------------------------------------------------------------------------------
@@ -52,7 +55,7 @@ class TemporaryFileSwap(object):
#{ Decorators
-def post_clear_cache(func: Callable[..., Any]) -> Callable[..., Any]:
+def post_clear_cache(func: Callable[..., _T]) -> Callable[..., _T]:
"""Decorator for functions that alter the index using the git command. This would
invalidate our possibly existing entries dictionary which is why it must be
deleted to allow it to be lazily reread later.
@@ -63,7 +66,7 @@ def post_clear_cache(func: Callable[..., Any]) -> Callable[..., Any]:
"""
@wraps(func)
- def post_clear_cache_if_not_raised(self, *args: Any, **kwargs: Any) -> Any:
+ def post_clear_cache_if_not_raised(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T:
rval = func(self, *args, **kwargs)
self._delete_entries_cache()
return rval
@@ -72,13 +75,13 @@ def post_clear_cache(func: Callable[..., Any]) -> Callable[..., Any]:
return post_clear_cache_if_not_raised
-def default_index(func: Callable[..., Any]) -> Callable[..., Any]:
+def default_index(func: Callable[..., _T]) -> Callable[..., _T]:
"""Decorator assuring the wrapped method may only run if we are the default
repository index. This is as we rely on git commands that operate
on that index only. """
@wraps(func)
- def check_default_index(self, *args: Any, **kwargs: Any) -> Any:
+ def check_default_index(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T:
if self._file_path != self._index_path():
raise AssertionError(
"Cannot call %r on indices that do not represent the default git index" % func.__name__)
@@ -88,14 +91,14 @@ def default_index(func: Callable[..., Any]) -> Callable[..., Any]:
return check_default_index
-def git_working_dir(func: Callable[..., Any]) -> Callable[..., None]:
+def git_working_dir(func: Callable[..., _T]) -> Callable[..., _T]:
"""Decorator which changes the current working dir to the one of the git
repository in order to assure relative paths are handled correctly"""
@wraps(func)
- def set_git_working_dir(self, *args: Any, **kwargs: Any) -> None:
+ def set_git_working_dir(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T:
cur_wd = os.getcwd()
- os.chdir(self.repo.working_tree_dir)
+ os.chdir(str(self.repo.working_tree_dir))
try:
return func(self, *args, **kwargs)
finally:
diff --git a/git/objects/commit.py b/git/objects/commit.py
index 81978ae8..65a87591 100644
--- a/git/objects/commit.py
+++ b/git/objects/commit.py
@@ -80,7 +80,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
"message", "parents", "encoding", "gpgsig")
_id_attribute_ = "hexsha"
- def __init__(self, repo: 'Repo', binsha: bytes, tree: 'Tree' = None,
+ def __init__(self, repo: 'Repo', binsha: bytes, tree: Union['Tree', None] = None,
author: Union[Actor, None] = None,
authored_date: Union[int, None] = None,
author_tz_offset: Union[None, float] = None,
diff --git a/git/objects/fun.py b/git/objects/fun.py
index 339a53b8..fc2ea1e7 100644
--- a/git/objects/fun.py
+++ b/git/objects/fun.py
@@ -1,6 +1,7 @@
"""Module with functions which are supposed to be as fast as possible"""
from stat import S_ISDIR
+
from git.compat import (
safe_decode,
defenc
@@ -8,8 +9,14 @@ from git.compat import (
# typing ----------------------------------------------
-from typing import List, Tuple
+from typing import Callable, List, MutableSequence, Sequence, Tuple, TYPE_CHECKING, Union, overload
+
+if TYPE_CHECKING:
+ from _typeshed import ReadableBuffer
+ from git import GitCmdObjectDB
+EntryTup = Tuple[bytes, int, str] # same as TreeCacheTup in tree.py
+EntryTupOrNone = Union[EntryTup, None]
# ---------------------------------------------------
@@ -18,7 +25,7 @@ __all__ = ('tree_to_stream', 'tree_entries_from_data', 'traverse_trees_recursive
'traverse_tree_recursive')
-def tree_to_stream(entries, write):
+def tree_to_stream(entries: Sequence[EntryTup], write: Callable[['ReadableBuffer'], Union[int, None]]) -> None:
"""Write the give list of entries into a stream using its write method
:param entries: **sorted** list of tuples with (binsha, mode, name)
:param write: write method which takes a data string"""
@@ -42,12 +49,14 @@ def tree_to_stream(entries, write):
# According to my tests, this is exactly what git does, that is it just
# takes the input literally, which appears to be utf8 on linux.
if isinstance(name, str):
- name = name.encode(defenc)
- write(b''.join((mode_str, b' ', name, b'\0', binsha)))
+ name_bytes = name.encode(defenc)
+ else:
+ name_bytes = name
+ write(b''.join((mode_str, b' ', name_bytes, b'\0', binsha)))
# END for each item
-def tree_entries_from_data(data: bytes) -> List[Tuple[bytes, int, str]]:
+def tree_entries_from_data(data: bytes) -> List[EntryTup]:
"""Reads the binary representation of a tree and returns tuples of Tree items
:param data: data block with tree data (as bytes)
:return: list(tuple(binsha, mode, tree_relative_path), ...)"""
@@ -93,11 +102,13 @@ def tree_entries_from_data(data: bytes) -> List[Tuple[bytes, int, str]]:
return out
-def _find_by_name(tree_data, name, is_dir, start_at):
+def _find_by_name(tree_data: MutableSequence[EntryTupOrNone], name: str, is_dir: bool, start_at: int
+ ) -> EntryTupOrNone:
"""return data entry matching the given name and tree mode
or None.
Before the item is returned, the respective data item is set
None in the tree_data list to mark it done"""
+
try:
item = tree_data[start_at]
if item and item[2] == name and S_ISDIR(item[1]) == is_dir:
@@ -115,16 +126,27 @@ def _find_by_name(tree_data, name, is_dir, start_at):
return None
-def _to_full_path(item, path_prefix):
+@ overload
+def _to_full_path(item: None, path_prefix: str) -> None:
+ ...
+
+
+@ overload
+def _to_full_path(item: EntryTup, path_prefix: str) -> EntryTup:
+ ...
+
+
+def _to_full_path(item: EntryTupOrNone, path_prefix: str) -> EntryTupOrNone:
"""Rebuild entry with given path prefix"""
if not item:
return item
return (item[0], item[1], path_prefix + item[2])
-def traverse_trees_recursive(odb, tree_shas, path_prefix):
+def traverse_trees_recursive(odb: 'GitCmdObjectDB', tree_shas: Sequence[Union[bytes, None]],
+ path_prefix: str) -> List[Tuple[EntryTupOrNone, ...]]:
"""
- :return: list with entries according to the given binary tree-shas.
+ :return: list of list with entries according to the given binary tree-shas.
The result is encoded in a list
of n tuple|None per blob/commit, (n == len(tree_shas)), where
* [0] == 20 byte sha
@@ -137,28 +159,31 @@ def traverse_trees_recursive(odb, tree_shas, path_prefix):
:param path_prefix: a prefix to be added to the returned paths on this level,
set it '' for the first iteration
:note: The ordering of the returned items will be partially lost"""
- trees_data = []
+ trees_data: List[List[EntryTupOrNone]] = []
+
nt = len(tree_shas)
for tree_sha in tree_shas:
if tree_sha is None:
- data = []
+ data: List[EntryTupOrNone] = []
else:
- data = tree_entries_from_data(odb.stream(tree_sha).read())
+ # make new list for typing as list invariant
+ data = [x for x in tree_entries_from_data(odb.stream(tree_sha).read())]
# END handle muted trees
trees_data.append(data)
# END for each sha to get data for
- out = []
- out_append = out.append
+ out: List[Tuple[EntryTupOrNone, ...]] = []
# find all matching entries and recursively process them together if the match
# is a tree. If the match is a non-tree item, put it into the result.
# Processed items will be set None
for ti, tree_data in enumerate(trees_data):
+
for ii, item in enumerate(tree_data):
if not item:
continue
# END skip already done items
+ entries: List[EntryTupOrNone]
entries = [None for _ in range(nt)]
entries[ti] = item
_sha, mode, name = item
@@ -170,16 +195,16 @@ def traverse_trees_recursive(odb, tree_shas, path_prefix):
for tio in range(ti + 1, ti + nt):
tio = tio % nt
entries[tio] = _find_by_name(trees_data[tio], name, is_dir, ii)
- # END for each other item data
+ # END for each other item data
# if we are a directory, enter recursion
if is_dir:
out.extend(traverse_trees_recursive(
odb, [((ei and ei[0]) or None) for ei in entries], path_prefix + name + '/'))
else:
- out_append(tuple(_to_full_path(e, path_prefix) for e in entries))
- # END handle recursion
+ out.append(tuple(_to_full_path(e, path_prefix) for e in entries))
+ # END handle recursion
# finally mark it done
tree_data[ii] = None
# END for each item
@@ -190,7 +215,7 @@ def traverse_trees_recursive(odb, tree_shas, path_prefix):
return out
-def traverse_tree_recursive(odb, tree_sha, path_prefix):
+def traverse_tree_recursive(odb: 'GitCmdObjectDB', tree_sha: bytes, path_prefix: str) -> List[EntryTup]:
"""
:return: list of entries of the tree pointed to by the binary tree_sha. An entry
has the following format:
diff --git a/git/objects/submodule/base.py b/git/objects/submodule/base.py
index c95b66f2..b485dbf6 100644
--- a/git/objects/submodule/base.py
+++ b/git/objects/submodule/base.py
@@ -49,12 +49,13 @@ from .util import (
# typing ----------------------------------------------------------------------
-from typing import Dict, TYPE_CHECKING
+from typing import Callable, Dict, Mapping, Sequence, TYPE_CHECKING, cast
from typing import Any, Iterator, Union
-from git.types import Commit_ish, PathLike
+from git.types import Commit_ish, PathLike, TBD
if TYPE_CHECKING:
+ from git.index import IndexFile
from git.repo import Repo
@@ -114,7 +115,7 @@ class Submodule(IndexObject, TraversableIterableObj):
path: Union[PathLike, None] = None,
name: Union[str, None] = None,
parent_commit: Union[Commit_ish, None] = None,
- url: str = None,
+ url: Union[str, None] = None,
branch_path: Union[PathLike, None] = None
) -> None:
"""Initialize this instance with its attributes. We only document the ones
@@ -131,14 +132,14 @@ class Submodule(IndexObject, TraversableIterableObj):
if url is not None:
self._url = url
if branch_path is not None:
- assert isinstance(branch_path, str)
+ # assert isinstance(branch_path, str)
self._branch_path = branch_path
if name is not None:
self._name = name
def _set_cache_(self, attr: str) -> None:
if attr in ('path', '_url', '_branch_path'):
- reader = self.config_reader()
+ reader: SectionConstraint = self.config_reader()
# default submodule values
try:
self.path = reader.get('path')
@@ -226,7 +227,7 @@ class Submodule(IndexObject, TraversableIterableObj):
return SubmoduleConfigParser(fp_module, read_only=read_only)
- def _clear_cache(self):
+ def _clear_cache(self) -> None:
# clear the possibly changed values
for name in self._cache_attrs:
try:
@@ -246,7 +247,7 @@ class Submodule(IndexObject, TraversableIterableObj):
def _config_parser_constrained(self, read_only: bool) -> SectionConstraint:
""":return: Config Parser constrained to our submodule in read or write mode"""
try:
- pc = self.parent_commit
+ pc: Union['Commit_ish', None] = self.parent_commit
except ValueError:
pc = None
# end handle empty parent repository
@@ -255,10 +256,12 @@ class Submodule(IndexObject, TraversableIterableObj):
return SectionConstraint(parser, sm_section(self.name))
@classmethod
- def _module_abspath(cls, parent_repo, path, name):
+ def _module_abspath(cls, parent_repo: 'Repo', path: PathLike, name: str) -> PathLike:
if cls._need_gitfile_submodules(parent_repo.git):
return osp.join(parent_repo.git_dir, 'modules', name)
- return osp.join(parent_repo.working_tree_dir, path)
+ if parent_repo.working_tree_dir:
+ return osp.join(parent_repo.working_tree_dir, path)
+ raise NotADirectoryError()
# end
@classmethod
@@ -286,7 +289,7 @@ class Submodule(IndexObject, TraversableIterableObj):
return clone
@classmethod
- def _to_relative_path(cls, parent_repo, path):
+ def _to_relative_path(cls, parent_repo: 'Repo', path: PathLike) -> PathLike:
""":return: a path guaranteed to be relative to the given parent - repository
:raise ValueError: if path is not contained in the parent repository's working tree"""
path = to_native_path_linux(path)
@@ -294,7 +297,7 @@ class Submodule(IndexObject, TraversableIterableObj):
path = path[:-1]
# END handle trailing slash
- if osp.isabs(path):
+ if osp.isabs(path) and parent_repo.working_tree_dir:
working_tree_linux = to_native_path_linux(parent_repo.working_tree_dir)
if not path.startswith(working_tree_linux):
raise ValueError("Submodule checkout path '%s' needs to be within the parents repository at '%s'"
@@ -308,7 +311,7 @@ class Submodule(IndexObject, TraversableIterableObj):
return path
@classmethod
- def _write_git_file_and_module_config(cls, working_tree_dir, module_abspath):
+ def _write_git_file_and_module_config(cls, working_tree_dir: PathLike, module_abspath: PathLike) -> None:
"""Writes a .git file containing a(preferably) relative path to the actual git module repository.
It is an error if the module_abspath cannot be made into a relative path, relative to the working_tree_dir
:note: will overwrite existing files !
@@ -335,7 +338,8 @@ class Submodule(IndexObject, TraversableIterableObj):
@classmethod
def add(cls, repo: 'Repo', name: str, path: PathLike, url: Union[str, None] = None,
- branch=None, no_checkout: bool = False, depth=None, env=None, clone_multi_options=None
+ branch: Union[str, None] = None, no_checkout: bool = False, depth: Union[int, None] = None,
+ env: Union[Mapping[str, str], None] = None, clone_multi_options: Union[Sequence[TBD], None] = None
) -> 'Submodule':
"""Add a new submodule to the given repository. This will alter the index
as well as the .gitmodules file, but will not create a new commit.
@@ -391,7 +395,7 @@ class Submodule(IndexObject, TraversableIterableObj):
if sm.exists():
# reretrieve submodule from tree
try:
- sm = repo.head.commit.tree[path]
+ sm = repo.head.commit.tree[str(path)]
sm._name = name
return sm
except KeyError:
@@ -414,12 +418,14 @@ class Submodule(IndexObject, TraversableIterableObj):
# END check url
# END verify urls match
- mrepo = None
+ mrepo: Union[Repo, None] = None
+
if url is None:
if not has_module:
raise ValueError("A URL was not given and a repository did not exist at %s" % path)
# END check url
mrepo = sm.module()
+ # assert isinstance(mrepo, git.Repo)
urls = [r.url for r in mrepo.remotes]
if not urls:
raise ValueError("Didn't find any remote url in repository at %s" % sm.abspath)
@@ -427,7 +433,7 @@ class Submodule(IndexObject, TraversableIterableObj):
url = urls[0]
else:
# clone new repo
- kwargs: Dict[str, Union[bool, int]] = {'n': no_checkout}
+ kwargs: Dict[str, Union[bool, int, Sequence[TBD]]] = {'n': no_checkout}
if not branch_is_default:
kwargs['b'] = br.name
# END setup checkout-branch
@@ -451,6 +457,8 @@ class Submodule(IndexObject, TraversableIterableObj):
# otherwise there is a '-' character in front of the submodule listing
# a38efa84daef914e4de58d1905a500d8d14aaf45 mymodule (v0.9.0-1-ga38efa8)
# -a38efa84daef914e4de58d1905a500d8d14aaf45 submodules/intermediate/one
+ writer: Union[GitConfigParser, SectionConstraint]
+
with sm.repo.config_writer() as writer:
writer.set_value(sm_section(name), 'url', url)
@@ -467,13 +475,15 @@ class Submodule(IndexObject, TraversableIterableObj):
sm._branch_path = br.path
# we deliberately assume that our head matches our index !
- sm.binsha = mrepo.head.commit.binsha
+ sm.binsha = mrepo.head.commit.binsha # type: ignore
index.add([sm], write=True)
return sm
- def update(self, recursive=False, init=True, to_latest_revision=False, progress=None, dry_run=False,
- force=False, keep_going=False, env=None, clone_multi_options=None):
+ def update(self, recursive: bool = False, init: bool = True, to_latest_revision: bool = False,
+ progress: Union['UpdateProgress', None] = None, dry_run: bool = False,
+ force: bool = False, keep_going: bool = False, env: Union[Mapping[str, str], None] = None,
+ clone_multi_options: Union[Sequence[TBD], None] = None):
"""Update the repository of this submodule to point to the checkout
we point at with the binsha of this instance.
@@ -580,6 +590,7 @@ class Submodule(IndexObject, TraversableIterableObj):
if not dry_run:
# see whether we have a valid branch to checkout
try:
+ mrepo = cast('Repo', mrepo)
# find a remote which has our branch - we try to be flexible
remote_branch = find_first_remote_branch(mrepo.remotes, self.branch_name)
local_branch = mkhead(mrepo, self.branch_path)
@@ -640,7 +651,7 @@ class Submodule(IndexObject, TraversableIterableObj):
may_reset = True
if mrepo.head.commit.binsha != self.NULL_BIN_SHA:
base_commit = mrepo.merge_base(mrepo.head.commit, hexsha)
- if len(base_commit) == 0 or base_commit[0].hexsha == hexsha:
+ if len(base_commit) == 0 or (base_commit[0] is not None and base_commit[0].hexsha == hexsha):
if force:
msg = "Will force checkout or reset on local branch that is possibly in the future of"
msg += "the commit it will be checked out to, effectively 'forgetting' new commits"
@@ -807,7 +818,8 @@ class Submodule(IndexObject, TraversableIterableObj):
return self
@unbare_repo
- def remove(self, module=True, force=False, configuration=True, dry_run=False):
+ def remove(self, module: bool = True, force: bool = False,
+ configuration: bool = True, dry_run: bool = False) -> 'Submodule':
"""Remove this submodule from the repository. This will remove our entry
from the .gitmodules file and the entry in the .git / config file.
@@ -861,7 +873,7 @@ class Submodule(IndexObject, TraversableIterableObj):
# TODO: If we run into permission problems, we have a highly inconsistent
# state. Delete the .git folders last, start with the submodules first
mp = self.abspath
- method = None
+ method: Union[None, Callable[[PathLike], None]] = None
if osp.islink(mp):
method = os.remove
elif osp.isdir(mp):
@@ -914,7 +926,7 @@ class Submodule(IndexObject, TraversableIterableObj):
import gc
gc.collect()
try:
- rmtree(wtd)
+ rmtree(str(wtd))
except Exception as ex:
if HIDE_WINDOWS_KNOWN_ERRORS:
raise SkipTest("FIXME: fails with: PermissionError\n {}".format(ex)) from ex
@@ -928,7 +940,7 @@ class Submodule(IndexObject, TraversableIterableObj):
rmtree(git_dir)
except Exception as ex:
if HIDE_WINDOWS_KNOWN_ERRORS:
- raise SkipTest("FIXME: fails with: PermissionError\n %s", ex) from ex
+ raise SkipTest(f"FIXME: fails with: PermissionError\n {ex}") from ex
else:
raise
# end handle separate bare repository
@@ -952,6 +964,8 @@ class Submodule(IndexObject, TraversableIterableObj):
# now git config - need the config intact, otherwise we can't query
# information anymore
+ writer: Union[GitConfigParser, SectionConstraint]
+
with self.repo.config_writer() as writer:
writer.remove_section(sm_section(self.name))
@@ -961,7 +975,7 @@ class Submodule(IndexObject, TraversableIterableObj):
return self
- def set_parent_commit(self, commit: Union[Commit_ish, None], check=True):
+ def set_parent_commit(self, commit: Union[Commit_ish, None], check: bool = True) -> 'Submodule':
"""Set this instance to use the given commit whose tree is supposed to
contain the .gitmodules blob.
@@ -1009,7 +1023,7 @@ class Submodule(IndexObject, TraversableIterableObj):
return self
@unbare_repo
- def config_writer(self, index=None, write=True):
+ def config_writer(self, index: Union['IndexFile', None] = None, write: bool = True) -> SectionConstraint:
""":return: a config writer instance allowing you to read and write the data
belonging to this submodule into the .gitmodules file.
@@ -1030,7 +1044,7 @@ class Submodule(IndexObject, TraversableIterableObj):
return writer
@unbare_repo
- def rename(self, new_name):
+ def rename(self, new_name: str) -> 'Submodule':
"""Rename this submodule
:note: This method takes care of renaming the submodule in various places, such as
@@ -1065,13 +1079,14 @@ class Submodule(IndexObject, TraversableIterableObj):
destination_module_abspath = self._module_abspath(self.repo, self.path, new_name)
source_dir = mod.git_dir
# Let's be sure the submodule name is not so obviously tied to a directory
- if destination_module_abspath.startswith(mod.git_dir):
+ if str(destination_module_abspath).startswith(str(mod.git_dir)):
tmp_dir = self._module_abspath(self.repo, self.path, str(uuid.uuid4()))
os.renames(source_dir, tmp_dir)
source_dir = tmp_dir
# end handle self-containment
os.renames(source_dir, destination_module_abspath)
- self._write_git_file_and_module_config(mod.working_tree_dir, destination_module_abspath)
+ if mod.working_tree_dir:
+ self._write_git_file_and_module_config(mod.working_tree_dir, destination_module_abspath)
# end move separate git repository
return self
@@ -1081,7 +1096,7 @@ class Submodule(IndexObject, TraversableIterableObj):
#{ Query Interface
@unbare_repo
- def module(self):
+ def module(self) -> 'Repo':
""":return: Repo instance initialized from the repository at our submodule path
:raise InvalidGitRepositoryError: if a repository was not available. This could
also mean that it was not yet initialized"""
@@ -1098,7 +1113,7 @@ class Submodule(IndexObject, TraversableIterableObj):
raise InvalidGitRepositoryError("Repository at %r was not yet checked out" % module_checkout_abspath)
# END handle exceptions
- def module_exists(self):
+ def module_exists(self) -> bool:
""":return: True if our module exists and is a valid git repository. See module() method"""
try:
self.module()
@@ -1107,7 +1122,7 @@ class Submodule(IndexObject, TraversableIterableObj):
return False
# END handle exception
- def exists(self):
+ def exists(self) -> bool:
"""
:return: True if the submodule exists, False otherwise. Please note that
a submodule may exist ( in the .gitmodules file) even though its module
@@ -1148,26 +1163,26 @@ class Submodule(IndexObject, TraversableIterableObj):
return mkhead(self.module(), self._branch_path)
@property
- def branch_path(self):
+ def branch_path(self) -> PathLike:
"""
:return: full(relative) path as string to the branch we would checkout
from the remote and track"""
return self._branch_path
@property
- def branch_name(self):
+ def branch_name(self) -> str:
""":return: the name of the branch, which is the shortest possible branch name"""
# use an instance method, for this we create a temporary Head instance
# which uses a repository that is available at least ( it makes no difference )
return git.Head(self.repo, self._branch_path).name
@property
- def url(self):
+ def url(self) -> str:
""":return: The url to the repository which our module - repository refers to"""
return self._url
@property
- def parent_commit(self):
+ def parent_commit(self) -> 'Commit_ish':
""":return: Commit instance with the tree containing the .gitmodules file
:note: will always point to the current head's commit if it was not set explicitly"""
if self._parent_commit is None:
@@ -1175,7 +1190,7 @@ class Submodule(IndexObject, TraversableIterableObj):
return self._parent_commit
@property
- def name(self):
+ def name(self) -> str:
""":return: The name of this submodule. It is used to identify it within the
.gitmodules file.
:note: by default, the name is the path at which to find the submodule, but
diff --git a/git/objects/submodule/root.py b/git/objects/submodule/root.py
index 0af48710..bcac5419 100644
--- a/git/objects/submodule/root.py
+++ b/git/objects/submodule/root.py
@@ -10,6 +10,18 @@ import git
import logging
+# typing -------------------------------------------------------------------
+
+from typing import TYPE_CHECKING, Union
+
+from git.types import Commit_ish
+
+if TYPE_CHECKING:
+ from git.repo import Repo
+ from git.util import IterableList
+
+# ----------------------------------------------------------------------------
+
__all__ = ["RootModule", "RootUpdateProgress"]
log = logging.getLogger('git.objects.submodule.root')
@@ -42,7 +54,7 @@ class RootModule(Submodule):
k_root_name = '__ROOT__'
- def __init__(self, repo):
+ def __init__(self, repo: 'Repo'):
# repo, binsha, mode=None, path=None, name = None, parent_commit=None, url=None, ref=None)
super(RootModule, self).__init__(
repo,
@@ -55,15 +67,17 @@ class RootModule(Submodule):
branch_path=git.Head.to_full_path(self.k_head_default)
)
- def _clear_cache(self):
+ def _clear_cache(self) -> None:
"""May not do anything"""
pass
#{ Interface
- def update(self, previous_commit=None, recursive=True, force_remove=False, init=True,
- to_latest_revision=False, progress=None, dry_run=False, force_reset=False,
- keep_going=False):
+ def update(self, previous_commit: Union[Commit_ish, None] = None, # type: ignore[override]
+ recursive: bool = True, force_remove: bool = False, init: bool = True,
+ to_latest_revision: bool = False, progress: Union[None, 'RootUpdateProgress'] = None,
+ dry_run: bool = False, force_reset: bool = False, keep_going: bool = False
+ ) -> 'RootModule':
"""Update the submodules of this repository to the current HEAD commit.
This method behaves smartly by determining changes of the path of a submodules
repository, next to changes to the to-be-checked-out commit or the branch to be
@@ -128,8 +142,8 @@ class RootModule(Submodule):
previous_commit = repo.commit(previous_commit) # obtain commit object
# END handle previous commit
- psms = self.list_items(repo, parent_commit=previous_commit)
- sms = self.list_items(repo)
+ psms: 'IterableList[Submodule]' = self.list_items(repo, parent_commit=previous_commit)
+ sms: 'IterableList[Submodule]' = self.list_items(repo)
spsms = set(psms)
ssms = set(sms)
@@ -162,8 +176,8 @@ class RootModule(Submodule):
csms = (spsms & ssms)
len_csms = len(csms)
for i, csm in enumerate(csms):
- psm = psms[csm.name]
- sm = sms[csm.name]
+ psm: 'Submodule' = psms[csm.name]
+ sm: 'Submodule' = sms[csm.name]
# PATH CHANGES
##############
@@ -343,7 +357,7 @@ class RootModule(Submodule):
return self
- def module(self):
+ def module(self) -> 'Repo':
""":return: the actual repository containing the submodules"""
return self.repo
#} END interface
diff --git a/git/objects/submodule/util.py b/git/objects/submodule/util.py
index 5290000b..a776af88 100644
--- a/git/objects/submodule/util.py
+++ b/git/objects/submodule/util.py
@@ -5,11 +5,20 @@ from io import BytesIO
import weakref
-from typing import Any, TYPE_CHECKING, Union
+# typing -----------------------------------------------------------------------
+
+from typing import Any, Sequence, TYPE_CHECKING, Union
+
+from git.types import PathLike
if TYPE_CHECKING:
from .base import Submodule
from weakref import ReferenceType
+ from git.repo import Repo
+ from git.refs import Head
+ from git import Remote
+ from git.refs import RemoteReference
+
__all__ = ('sm_section', 'sm_name', 'mkhead', 'find_first_remote_branch',
'SubmoduleConfigParser')
@@ -17,23 +26,23 @@ __all__ = ('sm_section', 'sm_name', 'mkhead', 'find_first_remote_branch',
#{ Utilities
-def sm_section(name):
+def sm_section(name: str) -> str:
""":return: section title used in .gitmodules configuration file"""
- return 'submodule "%s"' % name
+ return f'submodule "{name}"'
-def sm_name(section):
+def sm_name(section: str) -> str:
""":return: name of the submodule as parsed from the section name"""
section = section.strip()
return section[11:-1]
-def mkhead(repo, path):
+def mkhead(repo: 'Repo', path: PathLike) -> 'Head':
""":return: New branch/head instance"""
return git.Head(repo, git.Head.to_full_path(path))
-def find_first_remote_branch(remotes, branch_name):
+def find_first_remote_branch(remotes: Sequence['Remote'], branch_name: str) -> 'RemoteReference':
"""Find the remote branch matching the name of the given branch or raise InvalidGitRepositoryError"""
for remote in remotes:
try:
@@ -92,7 +101,7 @@ class SubmoduleConfigParser(GitConfigParser):
#{ Overridden Methods
def write(self) -> None:
- rval = super(SubmoduleConfigParser, self).write()
+ rval: None = super(SubmoduleConfigParser, self).write()
self.flush_to_index()
return rval
# END overridden methods
diff --git a/git/objects/tree.py b/git/objects/tree.py
index 2e8d8a79..a9656c1d 100644
--- a/git/objects/tree.py
+++ b/git/objects/tree.py
@@ -4,8 +4,8 @@
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
-from git.util import join_path
-import git.diff as diff
+from git.util import IterableList, join_path
+import git.diff as git_diff
from git.util import to_bin_sha
from . import util
@@ -21,8 +21,8 @@ from .fun import (
# typing -------------------------------------------------
-from typing import (Callable, Dict, Generic, Iterable, Iterator, List,
- Tuple, Type, TypeVar, Union, cast, TYPE_CHECKING)
+from typing import (Any, Callable, Dict, Iterable, Iterator, List,
+ Tuple, Type, Union, cast, TYPE_CHECKING)
from git.types import PathLike, TypeGuard
@@ -30,10 +30,15 @@ if TYPE_CHECKING:
from git.repo import Repo
from io import BytesIO
-T_Tree_cache = TypeVar('T_Tree_cache', bound=Tuple[bytes, int, str])
+TreeCacheTup = Tuple[bytes, int, str]
+
TraversedTreeTup = Union[Tuple[Union['Tree', None], IndexObjUnion,
Tuple['Submodule', 'Submodule']]]
+
+def is_tree_cache(inp: Tuple[bytes, int, str]) -> TypeGuard[TreeCacheTup]:
+ return isinstance(inp[0], bytes) and isinstance(inp[1], int) and isinstance([inp], str)
+
#--------------------------------------------------------
@@ -42,9 +47,9 @@ cmp: Callable[[str, str], int] = lambda a, b: (a > b) - (a < b)
__all__ = ("TreeModifier", "Tree")
-def git_cmp(t1: T_Tree_cache, t2: T_Tree_cache) -> int:
+def git_cmp(t1: TreeCacheTup, t2: TreeCacheTup) -> int:
a, b = t1[2], t2[2]
- assert isinstance(a, str) and isinstance(b, str) # Need as mypy 9.0 cannot unpack TypeVar properly
+ # assert isinstance(a, str) and isinstance(b, str)
len_a, len_b = len(a), len(b)
min_len = min(len_a, len_b)
min_cmp = cmp(a[:min_len], b[:min_len])
@@ -55,8 +60,8 @@ def git_cmp(t1: T_Tree_cache, t2: T_Tree_cache) -> int:
return len_a - len_b
-def merge_sort(a: List[T_Tree_cache],
- cmp: Callable[[T_Tree_cache, T_Tree_cache], int]) -> None:
+def merge_sort(a: List[TreeCacheTup],
+ cmp: Callable[[TreeCacheTup, TreeCacheTup], int]) -> None:
if len(a) < 2:
return None
@@ -91,7 +96,7 @@ def merge_sort(a: List[T_Tree_cache],
k = k + 1
-class TreeModifier(Generic[T_Tree_cache], object):
+class TreeModifier(object):
"""A utility class providing methods to alter the underlying cache in a list-like fashion.
@@ -99,7 +104,7 @@ class TreeModifier(Generic[T_Tree_cache], object):
the cache of a tree, will be sorted. Assuring it will be in a serializable state"""
__slots__ = '_cache'
- def __init__(self, cache: List[T_Tree_cache]) -> None:
+ def __init__(self, cache: List[TreeCacheTup]) -> None:
self._cache = cache
def _index_by_name(self, name: str) -> int:
@@ -141,11 +146,8 @@ class TreeModifier(Generic[T_Tree_cache], object):
sha = to_bin_sha(sha)
index = self._index_by_name(name)
- def is_tree_cache(inp: Tuple[bytes, int, str]) -> TypeGuard[T_Tree_cache]:
- return isinstance(inp[0], bytes) and isinstance(inp[1], int) and isinstance([inp], str)
-
item = (sha, mode, name)
- assert is_tree_cache(item)
+ # assert is_tree_cache(item)
if index == -1:
self._cache.append(item)
@@ -167,7 +169,7 @@ class TreeModifier(Generic[T_Tree_cache], object):
For more information on the parameters, see ``add``
:param binsha: 20 byte binary sha"""
assert isinstance(binsha, bytes) and isinstance(mode, int) and isinstance(name, str)
- tree_cache = cast(T_Tree_cache, (binsha, mode, name))
+ tree_cache = (binsha, mode, name)
self._cache.append(tree_cache)
@@ -180,7 +182,7 @@ class TreeModifier(Generic[T_Tree_cache], object):
#} END mutators
-class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable):
+class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
"""Tree objects represent an ordered list of Blobs and other Trees.
@@ -216,7 +218,6 @@ class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable):
def _get_intermediate_items(cls, index_object: 'Tree',
) -> Union[Tuple['Tree', ...], Tuple[()]]:
if index_object.type == "tree":
- index_object = cast('Tree', index_object)
return tuple(index_object._iter_convert_to_object(index_object._cache))
return ()
@@ -224,12 +225,12 @@ class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable):
if attr == "_cache":
# Set the data when we need it
ostream = self.repo.odb.stream(self.binsha)
- self._cache: List[Tuple[bytes, int, str]] = tree_entries_from_data(ostream.read())
+ self._cache: List[TreeCacheTup] = tree_entries_from_data(ostream.read())
else:
super(Tree, self)._set_cache_(attr)
# END handle attribute
- def _iter_convert_to_object(self, iterable: Iterable[Tuple[bytes, int, str]]
+ def _iter_convert_to_object(self, iterable: Iterable[TreeCacheTup]
) -> Iterator[IndexObjUnion]:
"""Iterable yields tuples of (binsha, mode, name), which will be converted
to the respective object representation"""
@@ -324,6 +325,14 @@ class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable):
super(Tree, self).traverse(predicate, prune, depth, # type: ignore
branch_first, visit_once, ignore_self))
+ def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList[IndexObjUnion]:
+ """
+ :return: IterableList with the results of the traversal as produced by
+ traverse()
+ Tree -> IterableList[Union['Submodule', 'Tree', 'Blob']]
+ """
+ return super(Tree, self).list_traverse(* args, **kwargs)
+
# List protocol
def __getslice__(self, i: int, j: int) -> List[IndexObjUnion]:
diff --git a/git/objects/util.py b/git/objects/util.py
index 0b449b7b..fbe3d9de 100644
--- a/git/objects/util.py
+++ b/git/objects/util.py
@@ -23,7 +23,7 @@ from datetime import datetime, timedelta, tzinfo
from typing import (Any, Callable, Deque, Iterator, NamedTuple, overload, Sequence,
TYPE_CHECKING, Tuple, Type, TypeVar, Union, cast)
-from git.types import Literal, TypeGuard
+from git.types import Has_id_attribute, Literal
if TYPE_CHECKING:
from io import BytesIO, StringIO
@@ -32,8 +32,15 @@ if TYPE_CHECKING:
from .tag import TagObject
from .tree import Tree, TraversedTreeTup
from subprocess import Popen
+ from .submodule.base import Submodule
+
+
+class TraverseNT(NamedTuple):
+ depth: int
+ item: Union['Traversable', 'Blob']
+ src: Union['Traversable', None]
+
-
T_TIobj = TypeVar('T_TIobj', bound='TraversableIterableObj') # for TraversableIterableObj.traverse()
TraversedTup = Union[Tuple[Union['Traversable', None], 'Traversable'], # for commit, submodule
@@ -306,20 +313,29 @@ class Traversable(object):
"""
raise NotImplementedError("To be implemented in subclass")
- def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList['TraversableIterableObj']:
+ def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']]:
"""
:return: IterableList with the results of the traversal as produced by
traverse()
- List objects must be IterableObj and Traversable e.g. Commit, Submodule"""
-
- def is_TraversableIterableObj(inp: 'Traversable') -> TypeGuard['TraversableIterableObj']:
- # return isinstance(self, TraversableIterableObj)
- # Can it be anythin else?
- return isinstance(self, Traversable)
-
- assert is_TraversableIterableObj(self), f"{type(self)}"
- out: IterableList['TraversableIterableObj'] = IterableList(self._id_attribute_)
- out.extend(self.traverse(*args, **kwargs))
+ Commit -> IterableList['Commit']
+ Submodule -> IterableList['Submodule']
+ Tree -> IterableList[Union['Submodule', 'Tree', 'Blob']]
+ """
+ # Commit and Submodule have id.__attribute__ as IterableObj
+ # Tree has id.__attribute__ inherited from IndexObject
+ if isinstance(self, (TraversableIterableObj, Has_id_attribute)):
+ id = self._id_attribute_
+ else:
+ id = "" # shouldn't reach here, unless Traversable subclass created with no _id_attribute_
+ # could add _id_attribute_ to Traversable, or make all Traversable also Iterable?
+
+ out: IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']] = IterableList(id)
+ # overloads in subclasses (mypy does't allow typing self: subclass)
+ # Union[IterableList['Commit'], IterableList['Submodule'], IterableList[Union['Submodule', 'Tree', 'Blob']]]
+
+ # NOTE: if is_edge=True, self.traverse returns a Tuple, so should be prevented or flattened?
+ kwargs['as_edge'] = False
+ out.extend(self.traverse(*args, **kwargs)) # type: ignore
return out
def traverse(self,
@@ -364,15 +380,11 @@ class Traversable(object):
Submodule -> Iterator[Submodule, Tuple[Submodule, Submodule]]
Tree -> Iterator[Union[Blob, Tree, Submodule,
Tuple[Union[Submodule, Tree], Union[Blob, Tree, Submodule]]]
-
+
ignore_self=True is_edge=True -> Iterator[item]
ignore_self=True is_edge=False --> Iterator[item]
ignore_self=False is_edge=True -> Iterator[item] | Iterator[Tuple[src, item]]
ignore_self=False is_edge=False -> Iterator[Tuple[src, item]]"""
- class TraverseNT(NamedTuple):
- depth: int
- item: Union['Traversable', 'Blob']
- src: Union['Traversable', None]
visited = set()
stack = deque() # type: Deque[TraverseNT]
@@ -447,7 +459,10 @@ class TraversableIterableObj(Traversable, IterableObj):
TIobj_tuple = Tuple[Union[T_TIobj, None], T_TIobj]
- @overload # type: ignore
+ def list_traverse(self: T_TIobj, *args: Any, **kwargs: Any) -> IterableList[T_TIobj]: # type: ignore[override]
+ return super(TraversableIterableObj, self).list_traverse(* args, **kwargs)
+
+ @ overload # type: ignore
def traverse(self: T_TIobj,
predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
@@ -457,7 +472,7 @@ class TraversableIterableObj(Traversable, IterableObj):
) -> Iterator[T_TIobj]:
...
- @overload
+ @ overload
def traverse(self: T_TIobj,
predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
@@ -467,7 +482,7 @@ class TraversableIterableObj(Traversable, IterableObj):
) -> Iterator[Tuple[Union[T_TIobj, None], T_TIobj]]:
...
- @overload
+ @ overload
def traverse(self: T_TIobj,
predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool],
prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool],
diff --git a/git/refs/head.py b/git/refs/head.py
index c698004d..97c8e6a1 100644
--- a/git/refs/head.py
+++ b/git/refs/head.py
@@ -5,9 +5,13 @@ from git.exc import GitCommandError
from .symbolic import SymbolicReference
from .reference import Reference
-from typing import Union
+from typing import Union, TYPE_CHECKING
+
from git.types import Commit_ish
+if TYPE_CHECKING:
+ from git.repo import Repo
+
__all__ = ["HEAD", "Head"]
@@ -25,12 +29,13 @@ class HEAD(SymbolicReference):
_ORIG_HEAD_NAME = 'ORIG_HEAD'
__slots__ = ()
- def __init__(self, repo, path=_HEAD_NAME):
+ def __init__(self, repo: 'Repo', path=_HEAD_NAME):
if path != self._HEAD_NAME:
raise ValueError("HEAD instance must point to %r, got %r" % (self._HEAD_NAME, path))
super(HEAD, self).__init__(repo, path)
+ self.commit: 'Commit_ish'
- def orig_head(self):
+ def orig_head(self) -> 'SymbolicReference':
"""
:return: SymbolicReference pointing at the ORIG_HEAD, which is maintained
to contain the previous value of HEAD"""
diff --git a/git/refs/symbolic.py b/git/refs/symbolic.py
index ca0691d9..f0bd9316 100644
--- a/git/refs/symbolic.py
+++ b/git/refs/symbolic.py
@@ -1,3 +1,4 @@
+from git.types import PathLike
import os
from git.compat import defenc
@@ -408,7 +409,7 @@ class SymbolicReference(object):
return RefLog.entry_at(RefLog.path(self), index)
@classmethod
- def to_full_path(cls, path):
+ def to_full_path(cls, path) -> PathLike:
"""
:return: string with a full repository-relative path which can be used to initialize
a Reference instance, for instance by using ``Reference.from_path``"""
diff --git a/git/remote.py b/git/remote.py
index 0ef54ea7..f59b3245 100644
--- a/git/remote.py
+++ b/git/remote.py
@@ -42,6 +42,7 @@ from git.types import PathLike, Literal, TBD, TypeGuard, Commit_ish
if TYPE_CHECKING:
from git.repo.base import Repo
+ from git.objects.submodule.base import UpdateProgress
# from git.objects.commit import Commit
# from git.objects import Blob, Tree, TagObject
@@ -64,7 +65,9 @@ __all__ = ('RemoteProgress', 'PushInfo', 'FetchInfo', 'Remote')
#{ Utilities
-def add_progress(kwargs: Any, git: Git, progress: Union[Callable[..., Any], None]) -> Any:
+def add_progress(kwargs: Any, git: Git,
+ progress: Union[RemoteProgress, 'UpdateProgress', Callable[..., RemoteProgress], None]
+ ) -> Any:
"""Add the --progress flag to the given kwargs dict if supported by the
git command. If the actual progress in the given progress instance is not
given, we do not request any progress
@@ -446,8 +449,9 @@ class Remote(LazyMixin, IterableObj):
:param repo: The repository we are a remote of
:param name: the name of the remote, i.e. 'origin'"""
- self.repo = repo # type: 'Repo'
+ self.repo = repo
self.name = name
+ self.url: str
def __getattr__(self, attr: str) -> Any:
"""Allows to call this instance like
@@ -466,7 +470,7 @@ class Remote(LazyMixin, IterableObj):
def _config_section_name(self) -> str:
return 'remote "%s"' % self.name
- def _set_cache_(self, attr: str) -> Any:
+ def _set_cache_(self, attr: str) -> None:
if attr == "_config_reader":
# NOTE: This is cached as __getattr__ is overridden to return remote config values implicitly, such as
# in print(r.pushurl)
@@ -555,7 +559,7 @@ class Remote(LazyMixin, IterableObj):
"""
return self.set_url(url, delete=True)
- @property
+ @ property
def urls(self) -> Iterator[str]:
""":return: Iterator yielding all configured URL targets on a remote as strings"""
try:
@@ -588,7 +592,7 @@ class Remote(LazyMixin, IterableObj):
else:
raise ex
- @property
+ @ property
def refs(self) -> IterableList[RemoteReference]:
"""
:return:
@@ -599,7 +603,7 @@ class Remote(LazyMixin, IterableObj):
out_refs.extend(RemoteReference.list_items(self.repo, remote=self.name))
return out_refs
- @property
+ @ property
def stale_refs(self) -> IterableList[Reference]:
"""
:return:
@@ -633,7 +637,7 @@ class Remote(LazyMixin, IterableObj):
# END for each line
return out_refs
- @classmethod
+ @ classmethod
def create(cls, repo: 'Repo', name: str, url: str, **kwargs: Any) -> 'Remote':
"""Create a new remote to the given repository
:param repo: Repository instance that is to receive the new remote
@@ -650,7 +654,7 @@ class Remote(LazyMixin, IterableObj):
# add is an alias
add = create
- @classmethod
+ @ classmethod
def remove(cls, repo: 'Repo', name: str) -> str:
"""Remove the remote with the given name
:return: the passed remote name to remove
@@ -794,7 +798,7 @@ class Remote(LazyMixin, IterableObj):
config.release()
def fetch(self, refspec: Union[str, List[str], None] = None,
- progress: Union[Callable[..., Any], None] = None,
+ progress: Union[RemoteProgress, None, 'UpdateProgress'] = None,
verbose: bool = True, **kwargs: Any) -> IterableList[FetchInfo]:
"""Fetch the latest changes for this remote
@@ -841,7 +845,7 @@ class Remote(LazyMixin, IterableObj):
return res
def pull(self, refspec: Union[str, List[str], None] = None,
- progress: Union[Callable[..., Any], None] = None,
+ progress: Union[RemoteProgress, 'UpdateProgress', None] = None,
**kwargs: Any) -> IterableList[FetchInfo]:
"""Pull changes from the given branch, being the same as a fetch followed
by a merge of branch with your local branch.
@@ -862,7 +866,7 @@ class Remote(LazyMixin, IterableObj):
return res
def push(self, refspec: Union[str, List[str], None] = None,
- progress: Union[Callable[..., Any], None] = None,
+ progress: Union[RemoteProgress, 'UpdateProgress', Callable[..., RemoteProgress], None] = None,
**kwargs: Any) -> IterableList[PushInfo]:
"""Push changes from source branch in refspec to target branch in refspec.
diff --git a/git/repo/base.py b/git/repo/base.py
index d77b19c1..3214b528 100644
--- a/git/repo/base.py
+++ b/git/repo/base.py
@@ -36,16 +36,20 @@ import gitdb
# typing ------------------------------------------------------
-from git.types import TBD, PathLike, Lit_config_levels, Commit_ish, Tree_ish
+from git.types import TBD, PathLike, Lit_config_levels, Commit_ish, Tree_ish, is_config_level
from typing import (Any, BinaryIO, Callable, Dict,
Iterator, List, Mapping, Optional, Sequence,
TextIO, Tuple, Type, Union,
NamedTuple, cast, TYPE_CHECKING)
-if TYPE_CHECKING: # only needed for types
+from git.types import ConfigLevels_Tup
+
+if TYPE_CHECKING:
from git.util import IterableList
from git.refs.symbolic import SymbolicReference
from git.objects import Tree
+ from git.objects.submodule.base import UpdateProgress
+ from git.remote import RemoteProgress
# -----------------------------------------------------------
@@ -55,12 +59,11 @@ log = logging.getLogger(__name__)
__all__ = ('Repo',)
-BlameEntry = NamedTuple('BlameEntry', [
- ('commit', Dict[str, TBD]),
- ('linenos', range),
- ('orig_path', Optional[str]),
- ('orig_linenos', range)]
-)
+class BlameEntry(NamedTuple):
+ commit: Dict[str, 'Commit']
+ linenos: range
+ orig_path: Optional[str]
+ orig_linenos: range
class Repo(object):
@@ -95,7 +98,7 @@ class Repo(object):
# invariants
# represents the configuration level of a configuration file
- config_level = ("system", "user", "global", "repository") # type: Tuple[Lit_config_levels, ...]
+ config_level: ConfigLevels_Tup = ("system", "user", "global", "repository")
# Subclass configuration
# Subclasses may easily bring in their own custom types by placing a constructor or type here
@@ -495,7 +498,7 @@ class Repo(object):
unknown, instead the global path will be used."""
files = None
if config_level is None:
- files = [self._get_config_path(f) for f in self.config_level]
+ files = [self._get_config_path(f) for f in self.config_level if is_config_level(f)]
else:
files = [self._get_config_path(config_level)]
return GitConfigParser(files, read_only=True, repo=self)
@@ -574,7 +577,7 @@ class Repo(object):
return Commit.iter_items(self, rev, paths, **kwargs)
def merge_base(self, *rev: TBD, **kwargs: Any
- ) -> List[Union['SymbolicReference', Commit_ish, None]]:
+ ) -> List[Union[Commit_ish, None]]:
"""Find the closest common ancestor for the given revision (e.g. Commits, Tags, References, etc)
:param rev: At least two revs to find the common ancestor for.
@@ -587,7 +590,7 @@ class Repo(object):
raise ValueError("Please specify at least two revs, got only %i" % len(rev))
# end handle input
- res = [] # type: List[Union['SymbolicReference', Commit_ish, None]]
+ res = [] # type: List[Union[Commit_ish, None]]
try:
lines = self.git.merge_base(*rev, **kwargs).splitlines() # List[str]
except GitCommandError as err:
@@ -620,7 +623,7 @@ class Repo(object):
raise
return True
- def is_valid_object(self, sha: str, object_type: str = None) -> bool:
+ def is_valid_object(self, sha: str, object_type: Union[str, None] = None) -> bool:
try:
complete_sha = self.odb.partial_to_complete_sha_hex(sha)
object_info = self.odb.info(complete_sha)
@@ -801,7 +804,7 @@ class Repo(object):
should get a continuous range spanning all line numbers in the file.
"""
data = self.git.blame(rev, '--', file, p=True, incremental=True, stdout_as_string=False, **kwargs)
- commits = {} # type: Dict[str, TBD]
+ commits: Dict[str, Commit] = {}
stream = (line for line in data.split(b'\n') if line)
while True:
@@ -973,7 +976,7 @@ class Repo(object):
return blames
@classmethod
- def init(cls, path: PathLike = None, mkdir: bool = True, odbt: Type[GitCmdObjectDB] = GitCmdObjectDB,
+ def init(cls, path: Union[PathLike, None] = None, mkdir: bool = True, odbt: Type[GitCmdObjectDB] = GitCmdObjectDB,
expand_vars: bool = True, **kwargs: Any) -> 'Repo':
"""Initialize a git repository at the given path if specified
@@ -1013,7 +1016,8 @@ class Repo(object):
@classmethod
def _clone(cls, git: 'Git', url: PathLike, path: PathLike, odb_default_type: Type[GitCmdObjectDB],
- progress: Optional[Callable], multi_options: Optional[List[str]] = None, **kwargs: Any
+ progress: Union['RemoteProgress', 'UpdateProgress', Callable[..., 'RemoteProgress'], None] = None,
+ multi_options: Optional[List[str]] = None, **kwargs: Any
) -> 'Repo':
odbt = kwargs.pop('odbt', odb_default_type)
diff --git a/git/types.py b/git/types.py
index fb63f46e..9181e040 100644
--- a/git/types.py
+++ b/git/types.py
@@ -4,13 +4,16 @@
import os
import sys
-from typing import Dict, Union, Any, TYPE_CHECKING
+from typing import (Callable, Dict, NoReturn, Sequence, Tuple, Union, Any, Iterator, # noqa: F401
+ NamedTuple, TYPE_CHECKING, TypeVar) # noqa: F401
+if TYPE_CHECKING:
+ from git.repo import Repo
if sys.version_info[:2] >= (3, 8):
- from typing import Final, Literal, SupportsIndex, TypedDict, Protocol # noqa: F401
+ from typing import Final, Literal, SupportsIndex, TypedDict, Protocol, runtime_checkable # noqa: F401
else:
- from typing_extensions import Final, Literal, SupportsIndex, TypedDict, Protocol # noqa: F401
+ from typing_extensions import Final, Literal, SupportsIndex, TypedDict, Protocol, runtime_checkable # noqa: F401
if sys.version_info[:2] >= (3, 10):
from typing import TypeGuard # noqa: F401
@@ -29,13 +32,44 @@ if TYPE_CHECKING:
# from git.refs import SymbolicReference
TBD = Any
+_T = TypeVar('_T')
Tree_ish = Union['Commit', 'Tree']
Commit_ish = Union['Commit', 'TagObject', 'Blob', 'Tree']
+# Config_levels ---------------------------------------------------------
+
Lit_config_levels = Literal['system', 'global', 'user', 'repository']
+def is_config_level(inp: str) -> TypeGuard[Lit_config_levels]:
+ # return inp in get_args(Lit_config_level) # only py >= 3.8
+ return inp in ("system", "user", "global", "repository")
+
+
+ConfigLevels_Tup = Tuple[Literal['system'], Literal['user'], Literal['global'], Literal['repository']]
+
+#-----------------------------------------------------------------------------------
+
+
+def assert_never(inp: NoReturn, raise_error: bool = True, exc: Union[Exception, None] = None) -> None:
+ """For use in exhaustive checking of literal or Enum in if/else chain.
+ Should only be reached if all memebers not handled OR attempt to pass non-members through chain.
+
+ If all members handled, type is Empty. Otherwise, will cause mypy error.
+ If non-members given, should cause mypy error at variable creation.
+
+ If raise_error is True, will also raise AssertionError or the Exception passed to exc.
+ """
+ if raise_error:
+ if exc is None:
+ raise ValueError(f"An unhandled Literal ({inp}) in an if/else chain was found")
+ else:
+ raise exc
+ else:
+ pass
+
+
class Files_TD(TypedDict):
insertions: int
deletions: int
@@ -52,3 +86,13 @@ class Total_TD(TypedDict):
class HSH_TD(TypedDict):
total: Total_TD
files: Dict[PathLike, Files_TD]
+
+
+@runtime_checkable
+class Has_Repo(Protocol):
+ repo: 'Repo'
+
+
+@runtime_checkable
+class Has_id_attribute(Protocol):
+ _id_attribute_: str
diff --git a/git/util.py b/git/util.py
index abc82bd3..571e261e 100644
--- a/git/util.py
+++ b/git/util.py
@@ -36,11 +36,14 @@ if TYPE_CHECKING:
from git.remote import Remote
from git.repo.base import Repo
from git.config import GitConfigParser, SectionConstraint
+ # from git.objects.base import IndexObject
-from .types import (Literal, Protocol, SupportsIndex, # because behind py version guards
- PathLike, HSH_TD, Total_TD, Files_TD) # aliases
-T_IterableObj = TypeVar('T_IterableObj', bound='IterableObj', covariant=True)
+from .types import (Literal, SupportsIndex, # because behind py version guards
+ PathLike, HSH_TD, Total_TD, Files_TD, # aliases
+ Has_id_attribute)
+
+T_IterableObj = TypeVar('T_IterableObj', bound=Union['IterableObj', 'Has_id_attribute'], covariant=True)
# So IterableList[Head] is subtype of IterableList[IterableObj]
# ---------------------------------------------------------------------
@@ -80,15 +83,17 @@ log = logging.getLogger(__name__)
HIDE_WINDOWS_KNOWN_ERRORS = is_win and os.environ.get('HIDE_WINDOWS_KNOWN_ERRORS', True)
HIDE_WINDOWS_FREEZE_ERRORS = is_win and os.environ.get('HIDE_WINDOWS_FREEZE_ERRORS', True)
-#{ Utility Methods
+# { Utility Methods
+
+T = TypeVar('T')
-def unbare_repo(func: Callable) -> Callable:
+def unbare_repo(func: Callable[..., T]) -> Callable[..., T]:
"""Methods with this decorator raise InvalidGitRepositoryError if they
encounter a bare repository"""
@wraps(func)
- def wrapper(self: 'Remote', *args: Any, **kwargs: Any) -> Callable:
+ def wrapper(self: 'Remote', *args: Any, **kwargs: Any) -> T:
if self.repo.bare:
raise InvalidGitRepositoryError("Method '%s' cannot operate on bare repositories" % func.__name__)
# END bare method
@@ -243,7 +248,7 @@ def py_where(program: str, path: Optional[PathLike] = None) -> List[str]:
def _cygexpath(drive: Optional[str], path: str) -> str:
if osp.isabs(path) and not drive:
- ## Invoked from `cygpath()` directly with `D:Apps\123`?
+ # Invoked from `cygpath()` directly with `D:Apps\123`?
# It's an error, leave it alone just slashes)
p = path # convert to str if AnyPath given
else:
@@ -261,8 +266,8 @@ def _cygexpath(drive: Optional[str], path: str) -> str:
_cygpath_parsers = (
- ## See: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx
- ## and: https://www.cygwin.com/cygwin-ug-net/using.html#unc-paths
+ # See: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx
+ # and: https://www.cygwin.com/cygwin-ug-net/using.html#unc-paths
(re.compile(r"\\\\\?\\UNC\\([^\\]+)\\([^\\]+)(?:\\(.*))?"),
(lambda server, share, rest_path: '//%s/%s/%s' % (server, share, rest_path.replace('\\', '/'))),
False
@@ -293,7 +298,7 @@ _cygpath_parsers = (
def cygpath(path: str) -> str:
"""Use :meth:`git.cmd.Git.polish_url()` instead, that works on any environment."""
path = str(path) # ensure is str and not AnyPath.
- #Fix to use Paths when 3.5 dropped. or to be just str if only for urls?
+ # Fix to use Paths when 3.5 dropped. or to be just str if only for urls?
if not path.startswith(('/cygdrive', '//')):
for regex, parser, recurse in _cygpath_parsers:
match = regex.match(path)
@@ -353,7 +358,7 @@ def is_cygwin_git(git_executable: Union[None, PathLike]) -> bool:
res = py_where(git_executable)
git_dir = osp.dirname(res[0]) if res else ""
- ## Just a name given, not a real path.
+ # Just a name given, not a real path.
uname_cmd = osp.join(git_dir, 'uname')
process = subprocess.Popen([uname_cmd], stdout=subprocess.PIPE,
universal_newlines=True)
@@ -374,7 +379,7 @@ def get_user_id() -> str:
def finalize_process(proc: subprocess.Popen, **kwargs: Any) -> None:
"""Wait for the process (clone, fetch, pull or push) and handle its errors accordingly"""
- ## TODO: No close proc-streams??
+ # TODO: No close proc-streams??
proc.wait(**kwargs)
@@ -428,9 +433,9 @@ def remove_password_if_present(cmdline):
return new_cmdline
-#} END utilities
+# } END utilities
-#{ Classes
+# { Classes
class RemoteProgress(object):
@@ -980,7 +985,7 @@ class IterableList(List[T_IterableObj]):
return False
# END handle membership
- def __getattr__(self, attr: str) -> Any:
+ def __getattr__(self, attr: str) -> T_IterableObj:
attr = self._prefix + attr
for item in self:
if getattr(item, self._id_attr) == attr:
@@ -988,7 +993,7 @@ class IterableList(List[T_IterableObj]):
# END for each item
return list.__getattribute__(self, attr)
- def __getitem__(self, index: Union[SupportsIndex, int, slice, str]) -> Any:
+ def __getitem__(self, index: Union[SupportsIndex, int, slice, str]) -> 'T_IterableObj': # type: ignore
assert isinstance(index, (int, str, slice)), "Index of IterableList should be an int or str"
@@ -1003,7 +1008,7 @@ class IterableList(List[T_IterableObj]):
raise IndexError("No item found with id %r" % (self._prefix + index)) from e
# END handle getattr
- def __delitem__(self, index: Union[SupportsIndex, int, slice, str]) -> Any:
+ def __delitem__(self, index: Union[SupportsIndex, int, slice, str]) -> None:
assert isinstance(index, (int, str)), "Index of IterableList should be an int or str"
@@ -1066,7 +1071,7 @@ class Iterable(object):
raise NotImplementedError("To be implemented by Subclass")
-class IterableObj(Protocol):
+class IterableObj():
"""Defines an interface for iterable items which is to assure a uniform
way to retrieve and iterate items within the git repository
@@ -1097,7 +1102,7 @@ class IterableObj(Protocol):
:return: iterator yielding Items"""
raise NotImplementedError("To be implemented by Subclass")
-#} END classes
+# } END classes
class NullHandler(logging.Handler):
diff --git a/mypy.ini b/mypy.ini
index 8f86a6af..67397d40 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -3,6 +3,11 @@
# TODO: enable when we've fully annotated everything
# disallow_untyped_defs = True
+no_implicit_optional = True
+warn_redundant_casts = True
+# warn_unused_ignores = True
+# warn_unreachable = True
+pretty = True
# TODO: remove when 'gitdb' is fully annotated
[mypy-gitdb.*]