summaryrefslogtreecommitdiff
path: root/git/repo/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'git/repo/base.py')
-rw-r--r--git/repo/base.py138
1 files changed, 82 insertions, 56 deletions
diff --git a/git/repo/base.py b/git/repo/base.py
index 5581233b..c0229a84 100644
--- a/git/repo/base.py
+++ b/git/repo/base.py
@@ -3,6 +3,7 @@
#
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
+from __future__ import annotations
import logging
import os
import re
@@ -37,13 +38,13 @@ import gitdb
# typing ------------------------------------------------------
-from git.types import TBD, PathLike, Lit_config_levels, Commit_ish, Tree_ish
+from git.types import TBD, PathLike, Lit_config_levels, Commit_ish, Tree_ish, assert_never
from typing import (Any, BinaryIO, Callable, Dict,
Iterator, List, Mapping, Optional, Sequence,
TextIO, Tuple, Type, Union,
NamedTuple, cast, TYPE_CHECKING)
-from git.types import ConfigLevels_Tup
+from git.types import ConfigLevels_Tup, TypedDict
if TYPE_CHECKING:
from git.util import IterableList
@@ -52,7 +53,6 @@ if TYPE_CHECKING:
from git.objects.submodule.base import UpdateProgress
from git.remote import RemoteProgress
-
# -----------------------------------------------------------
log = logging.getLogger(__name__)
@@ -200,7 +200,6 @@ class Repo(object):
# END while curpath
if self.git_dir is None:
- self.git_dir = cast(PathLike, self.git_dir)
raise InvalidGitRepositoryError(epath)
self._bare = False
@@ -235,7 +234,7 @@ class Repo(object):
def __enter__(self) -> 'Repo':
return self
- def __exit__(self, exc_type: TBD, exc_value: TBD, traceback: TBD) -> None:
+ def __exit__(self, *args: Any) -> None:
self.close()
def __del__(self) -> None:
@@ -385,13 +384,13 @@ class Repo(object):
:return: created submodules"""
return Submodule.add(self, *args, **kwargs)
- def iter_submodules(self, *args: Any, **kwargs: Any) -> Iterator:
+ def iter_submodules(self, *args: Any, **kwargs: Any) -> Iterator[Submodule]:
"""An iterator yielding Submodule instances, see Traversable interface
for a description of args and kwargs
:return: Iterator"""
return RootModule(self).traverse(*args, **kwargs)
- def submodule_update(self, *args: Any, **kwargs: Any) -> Iterator:
+ def submodule_update(self, *args: Any, **kwargs: Any) -> Iterator[Submodule]:
"""Update the submodules, keeping the repository consistent as it will
take the previous state into consideration. For more information, please
see the documentation of RootModule.update"""
@@ -445,7 +444,7 @@ class Repo(object):
:return: TagReference object """
return TagReference.create(self, path, ref, message, force, **kwargs)
- def delete_tag(self, *tags: TBD) -> None:
+ def delete_tag(self, *tags: TagReference) -> None:
"""Delete the given tag references"""
return TagReference.delete(self, *tags)
@@ -481,10 +480,12 @@ class Repo(object):
raise NotADirectoryError
else:
return osp.normpath(osp.join(repo_dir, "config"))
+ else:
- raise ValueError("Invalid configuration level: %r" % config_level)
+ assert_never(config_level, # type:ignore[unreachable]
+ ValueError(f"Invalid configuration level: {config_level!r}"))
- def config_reader(self, config_level: Optional[Lit_config_levels] = None
+ def config_reader(self, config_level: Optional[Lit_config_levels] = None,
) -> GitConfigParser:
"""
:return:
@@ -775,7 +776,7 @@ class Repo(object):
finalize_process(proc)
return untracked_files
- def ignored(self, *paths: PathLike) -> List[PathLike]:
+ def ignored(self, *paths: PathLike) -> List[str]:
"""Checks if paths are ignored via .gitignore
Doing so using the "git check-ignore" method.
@@ -783,7 +784,7 @@ class Repo(object):
:return: subset of those paths which are ignored
"""
try:
- proc = self.git.check_ignore(*paths)
+ proc: str = self.git.check_ignore(*paths)
except GitCommandError:
return []
return proc.replace("\\\\", "\\").replace('"', "").split("\n")
@@ -795,7 +796,7 @@ class Repo(object):
# reveal_type(self.head.reference) # => Reference
return self.head.reference
- def blame_incremental(self, rev: TBD, file: TBD, **kwargs: Any) -> Optional[Iterator['BlameEntry']]:
+ def blame_incremental(self, rev: str | HEAD, file: str, **kwargs: Any) -> Iterator['BlameEntry']:
"""Iterator for blame information for the given file at the given revision.
Unlike .blame(), this does not return the actual file's contents, only
@@ -809,8 +810,9 @@ class Repo(object):
If you combine all line number ranges outputted by this command, you
should get a continuous range spanning all line numbers in the file.
"""
- data = self.git.blame(rev, '--', file, p=True, incremental=True, stdout_as_string=False, **kwargs)
- commits: Dict[str, Commit] = {}
+
+ data: bytes = self.git.blame(rev, '--', file, p=True, incremental=True, stdout_as_string=False, **kwargs)
+ commits: Dict[bytes, Commit] = {}
stream = (line for line in data.split(b'\n') if line)
while True:
@@ -818,15 +820,15 @@ class Repo(object):
line = next(stream) # when exhausted, causes a StopIteration, terminating this function
except StopIteration:
return
- split_line: Tuple[str, str, str, str] = line.split()
- hexsha, orig_lineno_str, lineno_str, num_lines_str = split_line
- lineno = int(lineno_str)
- num_lines = int(num_lines_str)
- orig_lineno = int(orig_lineno_str)
+ split_line = line.split()
+ hexsha, orig_lineno_b, lineno_b, num_lines_b = split_line
+ lineno = int(lineno_b)
+ num_lines = int(num_lines_b)
+ orig_lineno = int(orig_lineno_b)
if hexsha not in commits:
# Now read the next few lines and build up a dict of properties
# for this commit
- props = {}
+ props: Dict[bytes, bytes] = {}
while True:
try:
line = next(stream)
@@ -870,8 +872,8 @@ class Repo(object):
safe_decode(orig_filename),
range(orig_lineno, orig_lineno + num_lines))
- def blame(self, rev: TBD, file: TBD, incremental: bool = False, **kwargs: Any
- ) -> Union[List[List[Union[Optional['Commit'], List[str]]]], Optional[Iterator[BlameEntry]]]:
+ def blame(self, rev: Union[str, HEAD], file: str, incremental: bool = False, **kwargs: Any
+ ) -> List[List[Commit | List[str | bytes] | None]] | Iterator[BlameEntry] | None:
"""The blame information for the given file at the given revision.
:param rev: revision specifier, see git-rev-parse for viable options.
@@ -883,25 +885,38 @@ class Repo(object):
if incremental:
return self.blame_incremental(rev, file, **kwargs)
- data = self.git.blame(rev, '--', file, p=True, stdout_as_string=False, **kwargs)
- commits: Dict[str, TBD] = {}
- blames: List[List[Union[Optional['Commit'], List[str]]]] = []
-
- info: Dict[str, TBD] = {} # use Any until TypedDict available
+ data: bytes = self.git.blame(rev, '--', file, p=True, stdout_as_string=False, **kwargs)
+ commits: Dict[str, Commit] = {}
+ blames: List[List[Commit | List[str | bytes] | None]] = []
+
+ class InfoTD(TypedDict, total=False):
+ sha: str
+ id: str
+ filename: str
+ summary: str
+ author: str
+ author_email: str
+ author_date: int
+ committer: str
+ committer_email: str
+ committer_date: int
+
+ info: InfoTD = {}
keepends = True
- for line in data.splitlines(keepends):
+ for line_bytes in data.splitlines(keepends):
try:
- line = line.rstrip().decode(defenc)
+ line_str = line_bytes.rstrip().decode(defenc)
except UnicodeDecodeError:
firstpart = ''
+ parts = []
is_binary = True
else:
# As we don't have an idea when the binary data ends, as it could contain multiple newlines
# in the process. So we rely on being able to decode to tell us what is is.
# This can absolutely fail even on text files, but even if it does, we should be fine treating it
# as binary instead
- parts = self.re_whitespace.split(line, 1)
+ parts = self.re_whitespace.split(line_str, 1)
firstpart = parts[0]
is_binary = False
# end handle decode of line
@@ -932,12 +947,20 @@ class Repo(object):
# committer-time 1192271832
# committer-tz -0700 - IGNORED BY US
role = m.group(0)
- if firstpart.endswith('-mail'):
- info["%s_email" % role] = parts[-1]
- elif firstpart.endswith('-time'):
- info["%s_date" % role] = int(parts[-1])
- elif role == firstpart:
- info[role] = parts[-1]
+ if role == 'author':
+ if firstpart.endswith('-mail'):
+ info["author_email"] = parts[-1]
+ elif firstpart.endswith('-time'):
+ info["author_date"] = int(parts[-1])
+ elif role == firstpart:
+ info["author"] = parts[-1]
+ elif role == 'committer':
+ if firstpart.endswith('-mail'):
+ info["committer_email"] = parts[-1]
+ elif firstpart.endswith('-time'):
+ info["committer_date"] = int(parts[-1])
+ elif role == firstpart:
+ info["committer"] = parts[-1]
# END distinguish mail,time,name
else:
# handle
@@ -954,26 +977,29 @@ class Repo(object):
c = commits.get(sha)
if c is None:
c = Commit(self, hex_to_bin(sha),
- author=Actor._from_string(info['author'] + ' ' + info['author_email']),
+ author=Actor._from_string(f"{info['author']} {info['author_email']}"),
authored_date=info['author_date'],
committer=Actor._from_string(
- info['committer'] + ' ' + info['committer_email']),
+ f"{info['committer']} {info['committer_email']}"),
committed_date=info['committer_date'])
commits[sha] = c
- # END if commit objects needs initial creation
- if not is_binary:
- if line and line[0] == '\t':
- line = line[1:]
- else:
- # NOTE: We are actually parsing lines out of binary data, which can lead to the
- # binary being split up along the newline separator. We will append this to the blame
- # we are currently looking at, even though it should be concatenated with the last line
- # we have seen.
- pass
- # end handle line contents
blames[-1][0] = c
+ # END if commit objects needs initial creation
+
if blames[-1][1] is not None:
+ line: str | bytes
+ if not is_binary:
+ if line_str and line_str[0] == '\t':
+ line_str = line_str[1:]
+ line = line_str
+ else:
+ line = line_bytes
+ # NOTE: We are actually parsing lines out of binary data, which can lead to the
+ # binary being split up along the newline separator. We will append this to the
+ # blame we are currently looking at, even though it should be concatenated with
+ # the last line we have seen.
blames[-1][1].append(line)
+
info = {'id': sha}
# END if we collected commit info
# END distinguish filename,summary,rest
@@ -981,7 +1007,7 @@ class Repo(object):
# END distinguish hexsha vs other information
return blames
- @classmethod
+ @ classmethod
def init(cls, path: Union[PathLike, None] = None, mkdir: bool = True, odbt: Type[GitCmdObjectDB] = GitCmdObjectDB,
expand_vars: bool = True, **kwargs: Any) -> 'Repo':
"""Initialize a git repository at the given path if specified
@@ -1020,7 +1046,7 @@ class Repo(object):
git.init(**kwargs)
return cls(path, odbt=odbt)
- @classmethod
+ @ classmethod
def _clone(cls, git: 'Git', url: PathLike, path: PathLike, odb_default_type: Type[GitCmdObjectDB],
progress: Union['RemoteProgress', 'UpdateProgress', Callable[..., 'RemoteProgress'], None] = None,
multi_options: Optional[List[str]] = None, **kwargs: Any
@@ -1098,9 +1124,9 @@ class Repo(object):
:return: ``git.Repo`` (the newly cloned repo)"""
return self._clone(self.git, self.common_dir, path, type(self.odb), progress, multi_options, **kwargs)
- @classmethod
+ @ classmethod
def clone_from(cls, url: PathLike, to_path: PathLike, progress: Optional[Callable] = None,
- env: Optional[Mapping[str, Any]] = None,
+ env: Optional[Mapping[str, str]] = None,
multi_options: Optional[List[str]] = None, **kwargs: Any) -> 'Repo':
"""Create a clone from the given URL
@@ -1122,7 +1148,7 @@ class Repo(object):
return cls._clone(git, url, to_path, GitCmdObjectDB, progress, multi_options, **kwargs)
def archive(self, ostream: Union[TextIO, BinaryIO], treeish: Optional[str] = None,
- prefix: Optional[str] = None, **kwargs: Any) -> 'Repo':
+ prefix: Optional[str] = None, **kwargs: Any) -> Repo:
"""Archive the tree at the given revision.
:param ostream: file compatible stream object to which the archive will be written as bytes
@@ -1169,7 +1195,7 @@ class Repo(object):
clazz = self.__class__
return '<%s.%s %r>' % (clazz.__module__, clazz.__name__, self.git_dir)
- def currently_rebasing_on(self) -> Union['SymbolicReference', Commit_ish, None]:
+ def currently_rebasing_on(self) -> Commit | None:
"""
:return: The commit which is currently being replayed while rebasing.