summaryrefslogtreecommitdiff
path: root/git/repo/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'git/repo/base.py')
-rw-r--r--git/repo/base.py103
1 files changed, 66 insertions, 37 deletions
diff --git a/git/repo/base.py b/git/repo/base.py
index b9399f62..bedd6a08 100644
--- a/git/repo/base.py
+++ b/git/repo/base.py
@@ -7,6 +7,7 @@ from __future__ import annotations
import logging
import os
import re
+from dataclasses import dataclass
import shlex
import warnings
from gitdb.db.loose import LooseObjectDB
@@ -41,7 +42,7 @@ import gitdb
from git.types import TBD, PathLike, Lit_config_levels, Commit_ish, Tree_ish, assert_never
from typing import (Any, BinaryIO, Callable, Dict,
Iterator, List, Mapping, Optional, Sequence,
- TextIO, Tuple, Type, Union,
+ TextIO, Tuple, Type, TypedDict, Union,
NamedTuple, cast, TYPE_CHECKING)
from git.types import ConfigLevels_Tup
@@ -53,7 +54,6 @@ if TYPE_CHECKING:
from git.objects.submodule.base import UpdateProgress
from git.remote import RemoteProgress
-
# -----------------------------------------------------------
log = logging.getLogger(__name__)
@@ -874,7 +874,7 @@ class Repo(object):
range(orig_lineno, orig_lineno + num_lines))
def blame(self, rev: Union[str, HEAD], file: str, incremental: bool = False, **kwargs: Any
- ) -> Union[List[List[Union[Optional['Commit'], List[str]]]], Optional[Iterator[BlameEntry]]]:
+ ) -> List[List[Commit | List[str | bytes] | None]] | Iterator[BlameEntry] | None:
"""The blame information for the given file at the given revision.
:param rev: revision specifier, see git-rev-parse for viable options.
@@ -886,25 +886,52 @@ class Repo(object):
if incremental:
return self.blame_incremental(rev, file, **kwargs)
- data = self.git.blame(rev, '--', file, p=True, stdout_as_string=False, **kwargs)
- commits: Dict[str, TBD] = {}
- blames: List[List[Union[Optional['Commit'], List[str]]]] = []
-
- info: Dict[str, TBD] = {} # use Any until TypedDict available
+ data: bytes = self.git.blame(rev, '--', file, p=True, stdout_as_string=False, **kwargs)
+ commits: Dict[str, Commit] = {}
+ blames: List[List[Commit | List[str | bytes] | None]] = []
+
+ class InfoTC(TypedDict, total=False):
+ sha: str
+ id: str
+ filename: str
+ summary: str
+ author: str
+ author_email: str
+ author_date: int
+ committer: str
+ committer_email: str
+ committer_date: int
+
+ @dataclass
+ class InfoDC(Dict[str, Union[str, int]]):
+ sha: str = ''
+ id: str = ''
+ filename: str = ''
+ summary: str = ''
+ author: str = ''
+ author_email: str = ''
+ author_date: int = 0
+ committer: str = ''
+ committer_email: str = ''
+ committer_date: int = 0
+
+ # info: InfoTD = {}
+ info = InfoDC()
keepends = True
- for line in data.splitlines(keepends):
+ for line_bytes in data.splitlines(keepends):
try:
- line = line.rstrip().decode(defenc)
+ line_str = line_bytes.rstrip().decode(defenc)
except UnicodeDecodeError:
firstpart = ''
+ parts = ['']
is_binary = True
else:
# As we don't have an idea when the binary data ends, as it could contain multiple newlines
# in the process. So we rely on being able to decode to tell us what is is.
# This can absolutely fail even on text files, but even if it does, we should be fine treating it
# as binary instead
- parts = self.re_whitespace.split(line, 1)
+ parts = self.re_whitespace.split(line_str, 1)
firstpart = parts[0]
is_binary = False
# end handle decode of line
@@ -916,10 +943,10 @@ class Repo(object):
# another line of blame with the same data
digits = parts[-1].split(" ")
if len(digits) == 3:
- info = {'id': firstpart}
+ info.id = firstpart
blames.append([None, []])
- elif info['id'] != firstpart:
- info = {'id': firstpart}
+ elif info.id != firstpart:
+ info.id = firstpart
blames.append([commits.get(firstpart), []])
# END blame data initialization
else:
@@ -936,9 +963,9 @@ class Repo(object):
# committer-tz -0700 - IGNORED BY US
role = m.group(0)
if firstpart.endswith('-mail'):
- info["%s_email" % role] = parts[-1]
+ info[f"{role}_email"] = parts[-1]
elif firstpart.endswith('-time'):
- info["%s_date" % role] = int(parts[-1])
+ info[f"{role}_date"] = int(parts[-1])
elif role == firstpart:
info[role] = parts[-1]
# END distinguish mail,time,name
@@ -953,38 +980,40 @@ class Repo(object):
info['summary'] = parts[-1]
elif firstpart == '':
if info:
- sha = info['id']
+ sha = info.id
c = commits.get(sha)
if c is None:
c = Commit(self, hex_to_bin(sha),
- author=Actor._from_string(info['author'] + ' ' + info['author_email']),
- authored_date=info['author_date'],
+ author=Actor._from_string(info.author + ' ' + info.author_email),
+ authored_date=info.author_date,
committer=Actor._from_string(
- info['committer'] + ' ' + info['committer_email']),
- committed_date=info['committer_date'])
+ info.committer + ' ' + info.committer_email),
+ committed_date=info.committer_date)
commits[sha] = c
+ blames[-1][0] = c
# END if commit objects needs initial creation
- if not is_binary:
- if line and line[0] == '\t':
- line = line[1:]
- else:
- # NOTE: We are actually parsing lines out of binary data, which can lead to the
- # binary being split up along the newline separator. We will append this to the blame
- # we are currently looking at, even though it should be concatenated with the last line
- # we have seen.
- pass
- # end handle line contents
- blames[-1][0] = c
if blames[-1][1] is not None:
- blames[-1][1].append(line)
- info = {'id': sha}
+ if not is_binary:
+ if line_str and line_str[0] == '\t':
+ line_str = line_str[1:]
+
+ blames[-1][1].append(line_str)
+ else:
+ # NOTE: We are actually parsing lines out of binary data, which can lead to the
+ # binary being split up along the newline separator. We will append this to the
+ # blame we are currently looking at, even though it should be concatenated with
+ # the last line we have seen.
+ blames[-1][1].append(line_bytes)
+ # end handle line contents
+
+ info.id = sha
# END if we collected commit info
# END distinguish filename,summary,rest
# END distinguish author|committer vs filename,summary,rest
# END distinguish hexsha vs other information
return blames
- @classmethod
+ @ classmethod
def init(cls, path: Union[PathLike, None] = None, mkdir: bool = True, odbt: Type[GitCmdObjectDB] = GitCmdObjectDB,
expand_vars: bool = True, **kwargs: Any) -> 'Repo':
"""Initialize a git repository at the given path if specified
@@ -1023,7 +1052,7 @@ class Repo(object):
git.init(**kwargs)
return cls(path, odbt=odbt)
- @classmethod
+ @ classmethod
def _clone(cls, git: 'Git', url: PathLike, path: PathLike, odb_default_type: Type[GitCmdObjectDB],
progress: Union['RemoteProgress', 'UpdateProgress', Callable[..., 'RemoteProgress'], None] = None,
multi_options: Optional[List[str]] = None, **kwargs: Any
@@ -1101,7 +1130,7 @@ class Repo(object):
:return: ``git.Repo`` (the newly cloned repo)"""
return self._clone(self.git, self.common_dir, path, type(self.odb), progress, multi_options, **kwargs)
- @classmethod
+ @ classmethod
def clone_from(cls, url: PathLike, to_path: PathLike, progress: Optional[Callable] = None,
env: Optional[Mapping[str, Any]] = None,
multi_options: Optional[List[str]] = None, **kwargs: Any) -> 'Repo':