summaryrefslogtreecommitdiff
path: root/git/repo
diff options
context:
space:
mode:
authorDominic <yobmod@gmail.com>2021-08-03 16:40:48 +0100
committerGitHub <noreply@github.com>2021-08-03 16:40:48 +0100
commitfe54118ec07a68d5dc6f6108510cffc55dfca643 (patch)
tree3025974ca54ef607ee3d4660da4dc242e184f8ea /git/repo
parentd8a639865d02a6bb3f93a233d3caa928d18bc622 (diff)
parent84232f7c71e41e56636f203eb26763a03ab6e945 (diff)
downloadgitpython-fe54118ec07a68d5dc6f6108510cffc55dfca643.tar.gz
Merge pull request #1311 from Yobmod/main
Drop 3.6, increase type strictness
Diffstat (limited to 'git/repo')
-rw-r--r--git/repo/__init__.py2
-rw-r--r--git/repo/base.py138
-rw-r--r--git/repo/fun.py27
3 files changed, 100 insertions, 67 deletions
diff --git a/git/repo/__init__.py b/git/repo/__init__.py
index 712df60d..23c18db8 100644
--- a/git/repo/__init__.py
+++ b/git/repo/__init__.py
@@ -1,3 +1,3 @@
"""Initialize the Repo package"""
# flake8: noqa
-from .base import *
+from .base import Repo as Repo
diff --git a/git/repo/base.py b/git/repo/base.py
index 5581233b..c0229a84 100644
--- a/git/repo/base.py
+++ b/git/repo/base.py
@@ -3,6 +3,7 @@
#
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
+from __future__ import annotations
import logging
import os
import re
@@ -37,13 +38,13 @@ import gitdb
# typing ------------------------------------------------------
-from git.types import TBD, PathLike, Lit_config_levels, Commit_ish, Tree_ish
+from git.types import TBD, PathLike, Lit_config_levels, Commit_ish, Tree_ish, assert_never
from typing import (Any, BinaryIO, Callable, Dict,
Iterator, List, Mapping, Optional, Sequence,
TextIO, Tuple, Type, Union,
NamedTuple, cast, TYPE_CHECKING)
-from git.types import ConfigLevels_Tup
+from git.types import ConfigLevels_Tup, TypedDict
if TYPE_CHECKING:
from git.util import IterableList
@@ -52,7 +53,6 @@ if TYPE_CHECKING:
from git.objects.submodule.base import UpdateProgress
from git.remote import RemoteProgress
-
# -----------------------------------------------------------
log = logging.getLogger(__name__)
@@ -200,7 +200,6 @@ class Repo(object):
# END while curpath
if self.git_dir is None:
- self.git_dir = cast(PathLike, self.git_dir)
raise InvalidGitRepositoryError(epath)
self._bare = False
@@ -235,7 +234,7 @@ class Repo(object):
def __enter__(self) -> 'Repo':
return self
- def __exit__(self, exc_type: TBD, exc_value: TBD, traceback: TBD) -> None:
+ def __exit__(self, *args: Any) -> None:
self.close()
def __del__(self) -> None:
@@ -385,13 +384,13 @@ class Repo(object):
:return: created submodules"""
return Submodule.add(self, *args, **kwargs)
- def iter_submodules(self, *args: Any, **kwargs: Any) -> Iterator:
+ def iter_submodules(self, *args: Any, **kwargs: Any) -> Iterator[Submodule]:
"""An iterator yielding Submodule instances, see Traversable interface
for a description of args and kwargs
:return: Iterator"""
return RootModule(self).traverse(*args, **kwargs)
- def submodule_update(self, *args: Any, **kwargs: Any) -> Iterator:
+ def submodule_update(self, *args: Any, **kwargs: Any) -> Iterator[Submodule]:
"""Update the submodules, keeping the repository consistent as it will
take the previous state into consideration. For more information, please
see the documentation of RootModule.update"""
@@ -445,7 +444,7 @@ class Repo(object):
:return: TagReference object """
return TagReference.create(self, path, ref, message, force, **kwargs)
- def delete_tag(self, *tags: TBD) -> None:
+ def delete_tag(self, *tags: TagReference) -> None:
"""Delete the given tag references"""
return TagReference.delete(self, *tags)
@@ -481,10 +480,12 @@ class Repo(object):
raise NotADirectoryError
else:
return osp.normpath(osp.join(repo_dir, "config"))
+ else:
- raise ValueError("Invalid configuration level: %r" % config_level)
+ assert_never(config_level, # type:ignore[unreachable]
+ ValueError(f"Invalid configuration level: {config_level!r}"))
- def config_reader(self, config_level: Optional[Lit_config_levels] = None
+ def config_reader(self, config_level: Optional[Lit_config_levels] = None,
) -> GitConfigParser:
"""
:return:
@@ -775,7 +776,7 @@ class Repo(object):
finalize_process(proc)
return untracked_files
- def ignored(self, *paths: PathLike) -> List[PathLike]:
+ def ignored(self, *paths: PathLike) -> List[str]:
"""Checks if paths are ignored via .gitignore
Doing so using the "git check-ignore" method.
@@ -783,7 +784,7 @@ class Repo(object):
:return: subset of those paths which are ignored
"""
try:
- proc = self.git.check_ignore(*paths)
+ proc: str = self.git.check_ignore(*paths)
except GitCommandError:
return []
return proc.replace("\\\\", "\\").replace('"', "").split("\n")
@@ -795,7 +796,7 @@ class Repo(object):
# reveal_type(self.head.reference) # => Reference
return self.head.reference
- def blame_incremental(self, rev: TBD, file: TBD, **kwargs: Any) -> Optional[Iterator['BlameEntry']]:
+ def blame_incremental(self, rev: str | HEAD, file: str, **kwargs: Any) -> Iterator['BlameEntry']:
"""Iterator for blame information for the given file at the given revision.
Unlike .blame(), this does not return the actual file's contents, only
@@ -809,8 +810,9 @@ class Repo(object):
If you combine all line number ranges outputted by this command, you
should get a continuous range spanning all line numbers in the file.
"""
- data = self.git.blame(rev, '--', file, p=True, incremental=True, stdout_as_string=False, **kwargs)
- commits: Dict[str, Commit] = {}
+
+ data: bytes = self.git.blame(rev, '--', file, p=True, incremental=True, stdout_as_string=False, **kwargs)
+ commits: Dict[bytes, Commit] = {}
stream = (line for line in data.split(b'\n') if line)
while True:
@@ -818,15 +820,15 @@ class Repo(object):
line = next(stream) # when exhausted, causes a StopIteration, terminating this function
except StopIteration:
return
- split_line: Tuple[str, str, str, str] = line.split()
- hexsha, orig_lineno_str, lineno_str, num_lines_str = split_line
- lineno = int(lineno_str)
- num_lines = int(num_lines_str)
- orig_lineno = int(orig_lineno_str)
+ split_line = line.split()
+ hexsha, orig_lineno_b, lineno_b, num_lines_b = split_line
+ lineno = int(lineno_b)
+ num_lines = int(num_lines_b)
+ orig_lineno = int(orig_lineno_b)
if hexsha not in commits:
# Now read the next few lines and build up a dict of properties
# for this commit
- props = {}
+ props: Dict[bytes, bytes] = {}
while True:
try:
line = next(stream)
@@ -870,8 +872,8 @@ class Repo(object):
safe_decode(orig_filename),
range(orig_lineno, orig_lineno + num_lines))
- def blame(self, rev: TBD, file: TBD, incremental: bool = False, **kwargs: Any
- ) -> Union[List[List[Union[Optional['Commit'], List[str]]]], Optional[Iterator[BlameEntry]]]:
+ def blame(self, rev: Union[str, HEAD], file: str, incremental: bool = False, **kwargs: Any
+ ) -> List[List[Commit | List[str | bytes] | None]] | Iterator[BlameEntry] | None:
"""The blame information for the given file at the given revision.
:param rev: revision specifier, see git-rev-parse for viable options.
@@ -883,25 +885,38 @@ class Repo(object):
if incremental:
return self.blame_incremental(rev, file, **kwargs)
- data = self.git.blame(rev, '--', file, p=True, stdout_as_string=False, **kwargs)
- commits: Dict[str, TBD] = {}
- blames: List[List[Union[Optional['Commit'], List[str]]]] = []
-
- info: Dict[str, TBD] = {} # use Any until TypedDict available
+ data: bytes = self.git.blame(rev, '--', file, p=True, stdout_as_string=False, **kwargs)
+ commits: Dict[str, Commit] = {}
+ blames: List[List[Commit | List[str | bytes] | None]] = []
+
+ class InfoTD(TypedDict, total=False):
+ sha: str
+ id: str
+ filename: str
+ summary: str
+ author: str
+ author_email: str
+ author_date: int
+ committer: str
+ committer_email: str
+ committer_date: int
+
+ info: InfoTD = {}
keepends = True
- for line in data.splitlines(keepends):
+ for line_bytes in data.splitlines(keepends):
try:
- line = line.rstrip().decode(defenc)
+ line_str = line_bytes.rstrip().decode(defenc)
except UnicodeDecodeError:
firstpart = ''
+ parts = []
is_binary = True
else:
# As we don't have an idea when the binary data ends, as it could contain multiple newlines
# in the process. So we rely on being able to decode to tell us what is is.
# This can absolutely fail even on text files, but even if it does, we should be fine treating it
# as binary instead
- parts = self.re_whitespace.split(line, 1)
+ parts = self.re_whitespace.split(line_str, 1)
firstpart = parts[0]
is_binary = False
# end handle decode of line
@@ -932,12 +947,20 @@ class Repo(object):
# committer-time 1192271832
# committer-tz -0700 - IGNORED BY US
role = m.group(0)
- if firstpart.endswith('-mail'):
- info["%s_email" % role] = parts[-1]
- elif firstpart.endswith('-time'):
- info["%s_date" % role] = int(parts[-1])
- elif role == firstpart:
- info[role] = parts[-1]
+ if role == 'author':
+ if firstpart.endswith('-mail'):
+ info["author_email"] = parts[-1]
+ elif firstpart.endswith('-time'):
+ info["author_date"] = int(parts[-1])
+ elif role == firstpart:
+ info["author"] = parts[-1]
+ elif role == 'committer':
+ if firstpart.endswith('-mail'):
+ info["committer_email"] = parts[-1]
+ elif firstpart.endswith('-time'):
+ info["committer_date"] = int(parts[-1])
+ elif role == firstpart:
+ info["committer"] = parts[-1]
# END distinguish mail,time,name
else:
# handle
@@ -954,26 +977,29 @@ class Repo(object):
c = commits.get(sha)
if c is None:
c = Commit(self, hex_to_bin(sha),
- author=Actor._from_string(info['author'] + ' ' + info['author_email']),
+ author=Actor._from_string(f"{info['author']} {info['author_email']}"),
authored_date=info['author_date'],
committer=Actor._from_string(
- info['committer'] + ' ' + info['committer_email']),
+ f"{info['committer']} {info['committer_email']}"),
committed_date=info['committer_date'])
commits[sha] = c
- # END if commit objects needs initial creation
- if not is_binary:
- if line and line[0] == '\t':
- line = line[1:]
- else:
- # NOTE: We are actually parsing lines out of binary data, which can lead to the
- # binary being split up along the newline separator. We will append this to the blame
- # we are currently looking at, even though it should be concatenated with the last line
- # we have seen.
- pass
- # end handle line contents
blames[-1][0] = c
+ # END if commit objects needs initial creation
+
if blames[-1][1] is not None:
+ line: str | bytes
+ if not is_binary:
+ if line_str and line_str[0] == '\t':
+ line_str = line_str[1:]
+ line = line_str
+ else:
+ line = line_bytes
+ # NOTE: We are actually parsing lines out of binary data, which can lead to the
+ # binary being split up along the newline separator. We will append this to the
+ # blame we are currently looking at, even though it should be concatenated with
+ # the last line we have seen.
blames[-1][1].append(line)
+
info = {'id': sha}
# END if we collected commit info
# END distinguish filename,summary,rest
@@ -981,7 +1007,7 @@ class Repo(object):
# END distinguish hexsha vs other information
return blames
- @classmethod
+ @ classmethod
def init(cls, path: Union[PathLike, None] = None, mkdir: bool = True, odbt: Type[GitCmdObjectDB] = GitCmdObjectDB,
expand_vars: bool = True, **kwargs: Any) -> 'Repo':
"""Initialize a git repository at the given path if specified
@@ -1020,7 +1046,7 @@ class Repo(object):
git.init(**kwargs)
return cls(path, odbt=odbt)
- @classmethod
+ @ classmethod
def _clone(cls, git: 'Git', url: PathLike, path: PathLike, odb_default_type: Type[GitCmdObjectDB],
progress: Union['RemoteProgress', 'UpdateProgress', Callable[..., 'RemoteProgress'], None] = None,
multi_options: Optional[List[str]] = None, **kwargs: Any
@@ -1098,9 +1124,9 @@ class Repo(object):
:return: ``git.Repo`` (the newly cloned repo)"""
return self._clone(self.git, self.common_dir, path, type(self.odb), progress, multi_options, **kwargs)
- @classmethod
+ @ classmethod
def clone_from(cls, url: PathLike, to_path: PathLike, progress: Optional[Callable] = None,
- env: Optional[Mapping[str, Any]] = None,
+ env: Optional[Mapping[str, str]] = None,
multi_options: Optional[List[str]] = None, **kwargs: Any) -> 'Repo':
"""Create a clone from the given URL
@@ -1122,7 +1148,7 @@ class Repo(object):
return cls._clone(git, url, to_path, GitCmdObjectDB, progress, multi_options, **kwargs)
def archive(self, ostream: Union[TextIO, BinaryIO], treeish: Optional[str] = None,
- prefix: Optional[str] = None, **kwargs: Any) -> 'Repo':
+ prefix: Optional[str] = None, **kwargs: Any) -> Repo:
"""Archive the tree at the given revision.
:param ostream: file compatible stream object to which the archive will be written as bytes
@@ -1169,7 +1195,7 @@ class Repo(object):
clazz = self.__class__
return '<%s.%s %r>' % (clazz.__module__, clazz.__name__, self.git_dir)
- def currently_rebasing_on(self) -> Union['SymbolicReference', Commit_ish, None]:
+ def currently_rebasing_on(self) -> Commit | None:
"""
:return: The commit which is currently being replayed while rebasing.
diff --git a/git/repo/fun.py b/git/repo/fun.py
index 7d5c7823..1a83dd3d 100644
--- a/git/repo/fun.py
+++ b/git/repo/fun.py
@@ -1,4 +1,5 @@
"""Package with general repository related functions"""
+from __future__ import annotations
import os
import stat
from string import digits
@@ -18,12 +19,13 @@ from git.cmd import Git
# Typing ----------------------------------------------------------------------
from typing import Union, Optional, cast, TYPE_CHECKING
-
+from git.types import Commit_ish
if TYPE_CHECKING:
from git.types import PathLike
from .base import Repo
from git.db import GitCmdObjectDB
+ from git.refs.reference import Reference
from git.objects import Commit, TagObject, Blob, Tree
from git.refs.tag import Tag
@@ -202,7 +204,7 @@ def rev_parse(repo: 'Repo', rev: str) -> Union['Commit', 'Tag', 'Tree', 'Blob']:
raise NotImplementedError("commit by message search ( regex )")
# END handle search
- obj = cast(Object, None) # not ideal. Should use guards
+ obj: Union[Commit_ish, 'Reference', None] = None
ref = None
output_type = "commit"
start = 0
@@ -222,14 +224,16 @@ def rev_parse(repo: 'Repo', rev: str) -> Union['Commit', 'Tag', 'Tree', 'Blob']:
ref = repo.head.ref
else:
if token == '@':
- ref = name_to_object(repo, rev[:start], return_ref=True)
+ ref = cast('Reference', name_to_object(repo, rev[:start], return_ref=True))
else:
- obj = name_to_object(repo, rev[:start])
+ obj = cast(Commit_ish, name_to_object(repo, rev[:start]))
# END handle token
# END handle refname
+ else:
+ assert obj is not None
if ref is not None:
- obj = ref.commit
+ obj = cast('Commit', ref.commit)
# END handle ref
# END initialize obj on first token
@@ -247,11 +251,13 @@ def rev_parse(repo: 'Repo', rev: str) -> Union['Commit', 'Tag', 'Tree', 'Blob']:
pass # default
elif output_type == 'tree':
try:
+ obj = cast(Commit_ish, obj)
obj = to_commit(obj).tree
except (AttributeError, ValueError):
pass # error raised later
# END exception handling
elif output_type in ('', 'blob'):
+ obj = cast('TagObject', obj)
if obj and obj.type == 'tag':
obj = deref_tag(obj)
else:
@@ -280,13 +286,13 @@ def rev_parse(repo: 'Repo', rev: str) -> Union['Commit', 'Tag', 'Tree', 'Blob']:
obj = Object.new_from_sha(repo, hex_to_bin(entry.newhexsha))
# make it pass the following checks
- output_type = None
+ output_type = ''
else:
raise ValueError("Invalid output type: %s ( in %s )" % (output_type, rev))
# END handle output type
# empty output types don't require any specific type, its just about dereferencing tags
- if output_type and obj.type != output_type:
+ if output_type and obj and obj.type != output_type:
raise ValueError("Could not accommodate requested object type %r, got %s" % (output_type, obj.type))
# END verify output type
@@ -319,6 +325,7 @@ def rev_parse(repo: 'Repo', rev: str) -> Union['Commit', 'Tag', 'Tree', 'Blob']:
parsed_to = start
# handle hierarchy walk
try:
+ obj = cast(Commit_ish, obj)
if token == "~":
obj = to_commit(obj)
for _ in range(num):
@@ -340,14 +347,14 @@ def rev_parse(repo: 'Repo', rev: str) -> Union['Commit', 'Tag', 'Tree', 'Blob']:
# END end handle tag
except (IndexError, AttributeError) as e:
raise BadName(
- "Invalid revision spec '%s' - not enough "
- "parent commits to reach '%s%i'" % (rev, token, num)) from e
+ f"Invalid revision spec '{rev}' - not enough "
+ f"parent commits to reach '{token}{int(num)}'") from e
# END exception handling
# END parse loop
# still no obj ? Its probably a simple name
if obj is None:
- obj = name_to_object(repo, rev)
+ obj = cast(Commit_ish, name_to_object(repo, rev))
parsed_to = lr
# END handle simple name