diff options
author | Dominic <yobmod@gmail.com> | 2021-08-03 16:40:48 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-08-03 16:40:48 +0100 |
commit | fe54118ec07a68d5dc6f6108510cffc55dfca643 (patch) | |
tree | 3025974ca54ef607ee3d4660da4dc242e184f8ea /git/repo/base.py | |
parent | d8a639865d02a6bb3f93a233d3caa928d18bc622 (diff) | |
parent | 84232f7c71e41e56636f203eb26763a03ab6e945 (diff) | |
download | gitpython-fe54118ec07a68d5dc6f6108510cffc55dfca643.tar.gz |
Merge pull request #1311 from Yobmod/main
Drop 3.6, increase type strictness
Diffstat (limited to 'git/repo/base.py')
-rw-r--r-- | git/repo/base.py | 138 |
1 files changed, 82 insertions, 56 deletions
diff --git a/git/repo/base.py b/git/repo/base.py index 5581233b..c0229a84 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -3,6 +3,7 @@ # # This module is part of GitPython and is released under # the BSD License: http://www.opensource.org/licenses/bsd-license.php +from __future__ import annotations import logging import os import re @@ -37,13 +38,13 @@ import gitdb # typing ------------------------------------------------------ -from git.types import TBD, PathLike, Lit_config_levels, Commit_ish, Tree_ish +from git.types import TBD, PathLike, Lit_config_levels, Commit_ish, Tree_ish, assert_never from typing import (Any, BinaryIO, Callable, Dict, Iterator, List, Mapping, Optional, Sequence, TextIO, Tuple, Type, Union, NamedTuple, cast, TYPE_CHECKING) -from git.types import ConfigLevels_Tup +from git.types import ConfigLevels_Tup, TypedDict if TYPE_CHECKING: from git.util import IterableList @@ -52,7 +53,6 @@ if TYPE_CHECKING: from git.objects.submodule.base import UpdateProgress from git.remote import RemoteProgress - # ----------------------------------------------------------- log = logging.getLogger(__name__) @@ -200,7 +200,6 @@ class Repo(object): # END while curpath if self.git_dir is None: - self.git_dir = cast(PathLike, self.git_dir) raise InvalidGitRepositoryError(epath) self._bare = False @@ -235,7 +234,7 @@ class Repo(object): def __enter__(self) -> 'Repo': return self - def __exit__(self, exc_type: TBD, exc_value: TBD, traceback: TBD) -> None: + def __exit__(self, *args: Any) -> None: self.close() def __del__(self) -> None: @@ -385,13 +384,13 @@ class Repo(object): :return: created submodules""" return Submodule.add(self, *args, **kwargs) - def iter_submodules(self, *args: Any, **kwargs: Any) -> Iterator: + def iter_submodules(self, *args: Any, **kwargs: Any) -> Iterator[Submodule]: """An iterator yielding Submodule instances, see Traversable interface for a description of args and kwargs :return: Iterator""" return RootModule(self).traverse(*args, **kwargs) - def submodule_update(self, *args: Any, **kwargs: Any) -> Iterator: + def submodule_update(self, *args: Any, **kwargs: Any) -> Iterator[Submodule]: """Update the submodules, keeping the repository consistent as it will take the previous state into consideration. For more information, please see the documentation of RootModule.update""" @@ -445,7 +444,7 @@ class Repo(object): :return: TagReference object """ return TagReference.create(self, path, ref, message, force, **kwargs) - def delete_tag(self, *tags: TBD) -> None: + def delete_tag(self, *tags: TagReference) -> None: """Delete the given tag references""" return TagReference.delete(self, *tags) @@ -481,10 +480,12 @@ class Repo(object): raise NotADirectoryError else: return osp.normpath(osp.join(repo_dir, "config")) + else: - raise ValueError("Invalid configuration level: %r" % config_level) + assert_never(config_level, # type:ignore[unreachable] + ValueError(f"Invalid configuration level: {config_level!r}")) - def config_reader(self, config_level: Optional[Lit_config_levels] = None + def config_reader(self, config_level: Optional[Lit_config_levels] = None, ) -> GitConfigParser: """ :return: @@ -775,7 +776,7 @@ class Repo(object): finalize_process(proc) return untracked_files - def ignored(self, *paths: PathLike) -> List[PathLike]: + def ignored(self, *paths: PathLike) -> List[str]: """Checks if paths are ignored via .gitignore Doing so using the "git check-ignore" method. @@ -783,7 +784,7 @@ class Repo(object): :return: subset of those paths which are ignored """ try: - proc = self.git.check_ignore(*paths) + proc: str = self.git.check_ignore(*paths) except GitCommandError: return [] return proc.replace("\\\\", "\\").replace('"', "").split("\n") @@ -795,7 +796,7 @@ class Repo(object): # reveal_type(self.head.reference) # => Reference return self.head.reference - def blame_incremental(self, rev: TBD, file: TBD, **kwargs: Any) -> Optional[Iterator['BlameEntry']]: + def blame_incremental(self, rev: str | HEAD, file: str, **kwargs: Any) -> Iterator['BlameEntry']: """Iterator for blame information for the given file at the given revision. Unlike .blame(), this does not return the actual file's contents, only @@ -809,8 +810,9 @@ class Repo(object): If you combine all line number ranges outputted by this command, you should get a continuous range spanning all line numbers in the file. """ - data = self.git.blame(rev, '--', file, p=True, incremental=True, stdout_as_string=False, **kwargs) - commits: Dict[str, Commit] = {} + + data: bytes = self.git.blame(rev, '--', file, p=True, incremental=True, stdout_as_string=False, **kwargs) + commits: Dict[bytes, Commit] = {} stream = (line for line in data.split(b'\n') if line) while True: @@ -818,15 +820,15 @@ class Repo(object): line = next(stream) # when exhausted, causes a StopIteration, terminating this function except StopIteration: return - split_line: Tuple[str, str, str, str] = line.split() - hexsha, orig_lineno_str, lineno_str, num_lines_str = split_line - lineno = int(lineno_str) - num_lines = int(num_lines_str) - orig_lineno = int(orig_lineno_str) + split_line = line.split() + hexsha, orig_lineno_b, lineno_b, num_lines_b = split_line + lineno = int(lineno_b) + num_lines = int(num_lines_b) + orig_lineno = int(orig_lineno_b) if hexsha not in commits: # Now read the next few lines and build up a dict of properties # for this commit - props = {} + props: Dict[bytes, bytes] = {} while True: try: line = next(stream) @@ -870,8 +872,8 @@ class Repo(object): safe_decode(orig_filename), range(orig_lineno, orig_lineno + num_lines)) - def blame(self, rev: TBD, file: TBD, incremental: bool = False, **kwargs: Any - ) -> Union[List[List[Union[Optional['Commit'], List[str]]]], Optional[Iterator[BlameEntry]]]: + def blame(self, rev: Union[str, HEAD], file: str, incremental: bool = False, **kwargs: Any + ) -> List[List[Commit | List[str | bytes] | None]] | Iterator[BlameEntry] | None: """The blame information for the given file at the given revision. :param rev: revision specifier, see git-rev-parse for viable options. @@ -883,25 +885,38 @@ class Repo(object): if incremental: return self.blame_incremental(rev, file, **kwargs) - data = self.git.blame(rev, '--', file, p=True, stdout_as_string=False, **kwargs) - commits: Dict[str, TBD] = {} - blames: List[List[Union[Optional['Commit'], List[str]]]] = [] - - info: Dict[str, TBD] = {} # use Any until TypedDict available + data: bytes = self.git.blame(rev, '--', file, p=True, stdout_as_string=False, **kwargs) + commits: Dict[str, Commit] = {} + blames: List[List[Commit | List[str | bytes] | None]] = [] + + class InfoTD(TypedDict, total=False): + sha: str + id: str + filename: str + summary: str + author: str + author_email: str + author_date: int + committer: str + committer_email: str + committer_date: int + + info: InfoTD = {} keepends = True - for line in data.splitlines(keepends): + for line_bytes in data.splitlines(keepends): try: - line = line.rstrip().decode(defenc) + line_str = line_bytes.rstrip().decode(defenc) except UnicodeDecodeError: firstpart = '' + parts = [] is_binary = True else: # As we don't have an idea when the binary data ends, as it could contain multiple newlines # in the process. So we rely on being able to decode to tell us what is is. # This can absolutely fail even on text files, but even if it does, we should be fine treating it # as binary instead - parts = self.re_whitespace.split(line, 1) + parts = self.re_whitespace.split(line_str, 1) firstpart = parts[0] is_binary = False # end handle decode of line @@ -932,12 +947,20 @@ class Repo(object): # committer-time 1192271832 # committer-tz -0700 - IGNORED BY US role = m.group(0) - if firstpart.endswith('-mail'): - info["%s_email" % role] = parts[-1] - elif firstpart.endswith('-time'): - info["%s_date" % role] = int(parts[-1]) - elif role == firstpart: - info[role] = parts[-1] + if role == 'author': + if firstpart.endswith('-mail'): + info["author_email"] = parts[-1] + elif firstpart.endswith('-time'): + info["author_date"] = int(parts[-1]) + elif role == firstpart: + info["author"] = parts[-1] + elif role == 'committer': + if firstpart.endswith('-mail'): + info["committer_email"] = parts[-1] + elif firstpart.endswith('-time'): + info["committer_date"] = int(parts[-1]) + elif role == firstpart: + info["committer"] = parts[-1] # END distinguish mail,time,name else: # handle @@ -954,26 +977,29 @@ class Repo(object): c = commits.get(sha) if c is None: c = Commit(self, hex_to_bin(sha), - author=Actor._from_string(info['author'] + ' ' + info['author_email']), + author=Actor._from_string(f"{info['author']} {info['author_email']}"), authored_date=info['author_date'], committer=Actor._from_string( - info['committer'] + ' ' + info['committer_email']), + f"{info['committer']} {info['committer_email']}"), committed_date=info['committer_date']) commits[sha] = c - # END if commit objects needs initial creation - if not is_binary: - if line and line[0] == '\t': - line = line[1:] - else: - # NOTE: We are actually parsing lines out of binary data, which can lead to the - # binary being split up along the newline separator. We will append this to the blame - # we are currently looking at, even though it should be concatenated with the last line - # we have seen. - pass - # end handle line contents blames[-1][0] = c + # END if commit objects needs initial creation + if blames[-1][1] is not None: + line: str | bytes + if not is_binary: + if line_str and line_str[0] == '\t': + line_str = line_str[1:] + line = line_str + else: + line = line_bytes + # NOTE: We are actually parsing lines out of binary data, which can lead to the + # binary being split up along the newline separator. We will append this to the + # blame we are currently looking at, even though it should be concatenated with + # the last line we have seen. blames[-1][1].append(line) + info = {'id': sha} # END if we collected commit info # END distinguish filename,summary,rest @@ -981,7 +1007,7 @@ class Repo(object): # END distinguish hexsha vs other information return blames - @classmethod + @ classmethod def init(cls, path: Union[PathLike, None] = None, mkdir: bool = True, odbt: Type[GitCmdObjectDB] = GitCmdObjectDB, expand_vars: bool = True, **kwargs: Any) -> 'Repo': """Initialize a git repository at the given path if specified @@ -1020,7 +1046,7 @@ class Repo(object): git.init(**kwargs) return cls(path, odbt=odbt) - @classmethod + @ classmethod def _clone(cls, git: 'Git', url: PathLike, path: PathLike, odb_default_type: Type[GitCmdObjectDB], progress: Union['RemoteProgress', 'UpdateProgress', Callable[..., 'RemoteProgress'], None] = None, multi_options: Optional[List[str]] = None, **kwargs: Any @@ -1098,9 +1124,9 @@ class Repo(object): :return: ``git.Repo`` (the newly cloned repo)""" return self._clone(self.git, self.common_dir, path, type(self.odb), progress, multi_options, **kwargs) - @classmethod + @ classmethod def clone_from(cls, url: PathLike, to_path: PathLike, progress: Optional[Callable] = None, - env: Optional[Mapping[str, Any]] = None, + env: Optional[Mapping[str, str]] = None, multi_options: Optional[List[str]] = None, **kwargs: Any) -> 'Repo': """Create a clone from the given URL @@ -1122,7 +1148,7 @@ class Repo(object): return cls._clone(git, url, to_path, GitCmdObjectDB, progress, multi_options, **kwargs) def archive(self, ostream: Union[TextIO, BinaryIO], treeish: Optional[str] = None, - prefix: Optional[str] = None, **kwargs: Any) -> 'Repo': + prefix: Optional[str] = None, **kwargs: Any) -> Repo: """Archive the tree at the given revision. :param ostream: file compatible stream object to which the archive will be written as bytes @@ -1169,7 +1195,7 @@ class Repo(object): clazz = self.__class__ return '<%s.%s %r>' % (clazz.__module__, clazz.__name__, self.git_dir) - def currently_rebasing_on(self) -> Union['SymbolicReference', Commit_ish, None]: + def currently_rebasing_on(self) -> Commit | None: """ :return: The commit which is currently being replayed while rebasing. |