From 39d37d550963a6a64e66ba3d6b9f4b077270a3ad Mon Sep 17 00:00:00 2001 From: Yobmod Date: Sat, 31 Jul 2021 22:26:20 +0100 Subject: replace some TBDs wiht runtime types --- git/cmd.py | 4 ++-- git/compat.py | 18 ------------------ git/diff.py | 5 +++-- git/index/base.py | 11 ++++++----- git/objects/submodule/base.py | 3 ++- git/refs/reference.py | 2 +- git/refs/symbolic.py | 4 ++-- git/util.py | 5 +++-- 8 files changed, 19 insertions(+), 33 deletions(-) (limited to 'git') diff --git a/git/cmd.py b/git/cmd.py index f8212745..cbfde74c 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -68,7 +68,7 @@ __all__ = ('Git',) # Documentation ## @{ -def handle_process_output(process: subprocess.Popen, +def handle_process_output(process: Union[subprocess.Popen, 'Git.AutoInterrupt'], stdout_handler: Union[None, Callable[[AnyStr], None], Callable[[List[AnyStr]], None], @@ -77,7 +77,7 @@ def handle_process_output(process: subprocess.Popen, Callable[[AnyStr], None], Callable[[List[AnyStr]], None]], finalizer: Union[None, - Callable[[subprocess.Popen], None]] = None, + Callable[[Union[subprocess.Popen, 'Git.AutoInterrupt']], None]] = None, decode_streams: bool = True) -> None: """Registers for notifications to learn that process output is ready to read, and dispatches lines to the respective line handlers. diff --git a/git/compat.py b/git/compat.py index 7a0a15d2..988c04ef 100644 --- a/git/compat.py +++ b/git/compat.py @@ -29,8 +29,6 @@ from typing import ( Union, overload, ) -from git.types import TBD - # --------------------------------------------------------------------------- @@ -97,19 +95,3 @@ def win_encode(s: Optional[AnyStr]) -> Optional[bytes]: elif s is not None: raise TypeError('Expected bytes or text, but got %r' % (s,)) return None - - -# type: ignore ## mypy cannot understand dynamic class creation -def with_metaclass(meta: Type[Any], *bases: Any) -> TBD: - """copied from https://github.com/Byron/bcore/blob/master/src/python/butility/future.py#L15""" - - class metaclass(meta): # type: ignore - __call__ = type.__call__ - __init__ = type.__init__ # type: ignore - - def __new__(cls, name: str, nbases: Optional[Tuple[int, ...]], d: Dict[str, Any]) -> TBD: - if nbases is None: - return type.__new__(cls, name, (), d) - return meta(name, bases, d) - - return metaclass(meta.__name__ + 'Helper', None, {}) # type: ignore diff --git a/git/diff.py b/git/diff.py index 74ca0b64..fc16b73e 100644 --- a/git/diff.py +++ b/git/diff.py @@ -16,7 +16,7 @@ from .objects.util import mode_str_to_int # typing ------------------------------------------------------------------ from typing import Any, Iterator, List, Match, Optional, Tuple, Type, TypeVar, Union, TYPE_CHECKING, cast -from git.types import PathLike, TBD, Literal +from git.types import PathLike, Literal if TYPE_CHECKING: from .objects.tree import Tree @@ -24,6 +24,7 @@ if TYPE_CHECKING: from git.repo.base import Repo from git.objects.base import IndexObject from subprocess import Popen + from git import Git Lit_change_type = Literal['A', 'D', 'C', 'M', 'R', 'T', 'U'] @@ -442,7 +443,7 @@ class Diff(object): return None @ classmethod - def _index_from_patch_format(cls, repo: 'Repo', proc: TBD) -> DiffIndex: + def _index_from_patch_format(cls, repo: 'Repo', proc: Union['Popen', 'Git.AutoInterrupt']) -> DiffIndex: """Create a new DiffIndex from the given text which must be in patch format :param repo: is the repository we are operating on - it is required :param stream: result of 'git diff' as a stream (supporting file protocol) diff --git a/git/index/base.py b/git/index/base.py index 6452419c..4c8b923a 100644 --- a/git/index/base.py +++ b/git/index/base.py @@ -70,7 +70,7 @@ from .util import ( from typing import (Any, BinaryIO, Callable, Dict, IO, Iterable, Iterator, List, NoReturn, Sequence, TYPE_CHECKING, Tuple, Type, Union) -from git.types import Commit_ish, PathLike, TBD +from git.types import Commit_ish, PathLike if TYPE_CHECKING: from subprocess import Popen @@ -181,7 +181,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): self.version, self.entries, self._extension_data, _conten_sha = read_cache(stream) return self - def _entries_sorted(self) -> List[TBD]: + def _entries_sorted(self) -> List[IndexEntry]: """:return: list of entries, in a sorted fashion, first by path, then by stage""" return sorted(self.entries.values(), key=lambda e: (e.path, e.stage)) @@ -427,8 +427,8 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): # END path exception handling # END for each path - def _write_path_to_stdin(self, proc: 'Popen', filepath: PathLike, item: TBD, fmakeexc: Callable[..., GitError], - fprogress: Callable[[PathLike, bool, TBD], None], + def _write_path_to_stdin(self, proc: 'Popen', filepath: PathLike, item: PathLike, fmakeexc: Callable[..., GitError], + fprogress: Callable[[PathLike, bool, PathLike], None], read_from_stdout: bool = True) -> Union[None, str]: """Write path to proc.stdin and make sure it processes the item, including progress. @@ -492,12 +492,13 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): are at stage 3 will not have a stage 3 entry. """ is_unmerged_blob = lambda t: t[0] != 0 - path_map: Dict[PathLike, List[Tuple[TBD, Blob]]] = {} + path_map: Dict[PathLike, List[Tuple[StageType, Blob]]] = {} for stage, blob in self.iter_blobs(is_unmerged_blob): path_map.setdefault(blob.path, []).append((stage, blob)) # END for each unmerged blob for line in path_map.values(): line.sort() + return path_map @ classmethod diff --git a/git/objects/submodule/base.py b/git/objects/submodule/base.py index 559d2585..d306c91d 100644 --- a/git/objects/submodule/base.py +++ b/git/objects/submodule/base.py @@ -379,6 +379,7 @@ class Submodule(IndexObject, TraversableIterableObj): :return: The newly created submodule instance :note: works atomically, such that no change will be done if the repository update fails for instance""" + if repo.bare: raise InvalidGitRepositoryError("Cannot add submodules to bare repositories") # END handle bare repos @@ -434,7 +435,7 @@ class Submodule(IndexObject, TraversableIterableObj): url = urls[0] else: # clone new repo - kwargs: Dict[str, Union[bool, int, Sequence[TBD]]] = {'n': no_checkout} + kwargs: Dict[str, Union[bool, int, str, Sequence[TBD]]] = {'n': no_checkout} if not branch_is_default: kwargs['b'] = br.name # END setup checkout-branch diff --git a/git/refs/reference.py b/git/refs/reference.py index a3647fb3..2a33fbff 100644 --- a/git/refs/reference.py +++ b/git/refs/reference.py @@ -8,7 +8,7 @@ from .symbolic import SymbolicReference, T_References # typing ------------------------------------------------------------------ from typing import Any, Callable, Iterator, Type, Union, TYPE_CHECKING # NOQA -from git.types import Commit_ish, PathLike, TBD, Literal, _T # NOQA +from git.types import Commit_ish, PathLike, _T # NOQA if TYPE_CHECKING: from git.repo import Repo diff --git a/git/refs/symbolic.py b/git/refs/symbolic.py index b4a933aa..1c56c043 100644 --- a/git/refs/symbolic.py +++ b/git/refs/symbolic.py @@ -21,8 +21,8 @@ from .log import RefLog # typing ------------------------------------------------------------------ -from typing import Any, Iterator, List, Match, Optional, Tuple, Type, TypeVar, Union, TYPE_CHECKING, cast # NOQA -from git.types import Commit_ish, PathLike, TBD, Literal # NOQA +from typing import Any, Iterator, List, Tuple, Type, TypeVar, Union, TYPE_CHECKING, cast # NOQA +from git.types import Commit_ish, PathLike # NOQA if TYPE_CHECKING: from git.repo import Repo diff --git a/git/util.py b/git/util.py index 92d95379..8056804a 100644 --- a/git/util.py +++ b/git/util.py @@ -38,6 +38,7 @@ if TYPE_CHECKING: from git.remote import Remote from git.repo.base import Repo from git.config import GitConfigParser, SectionConstraint + from git import Git # from git.objects.base import IndexObject @@ -379,7 +380,7 @@ def get_user_id() -> str: return "%s@%s" % (getpass.getuser(), platform.node()) -def finalize_process(proc: subprocess.Popen, **kwargs: Any) -> None: +def finalize_process(proc: Union[subprocess.Popen, 'Git.AutoInterrupt'], **kwargs: Any) -> None: """Wait for the process (clone, fetch, pull or push) and handle its errors accordingly""" # TODO: No close proc-streams?? proc.wait(**kwargs) @@ -1033,7 +1034,7 @@ class IterableList(List[T_IterableObj]): class IterableClassWatcher(type): """ Metaclass that watches """ - def __init__(cls, name: str, bases: List, clsdict: Dict) -> None: + def __init__(cls, name: str, bases: Tuple, clsdict: Dict) -> None: for base in bases: if type(base) == IterableClassWatcher: warnings.warn(f"GitPython Iterable subclassed by {name}. " -- cgit v1.2.1 From c878771e3a31c983a0c3468396ed33a532f87e98 Mon Sep 17 00:00:00 2001 From: Yobmod Date: Sat, 31 Jul 2021 22:59:11 +0100 Subject: replace more TBDs wiht runtime types --- git/cmd.py | 12 ++++++------ git/config.py | 11 ++++++----- git/remote.py | 10 +++++----- git/repo/base.py | 9 +++++---- 4 files changed, 22 insertions(+), 20 deletions(-) (limited to 'git') diff --git a/git/cmd.py b/git/cmd.py index cbfde74c..85a5fbe9 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -421,15 +421,15 @@ class Git(LazyMixin): return getattr(self.proc, attr) # TODO: Bad choice to mimic `proc.wait()` but with different args. - def wait(self, stderr: Union[None, bytes] = b'') -> int: + def wait(self, stderr: Union[None, str, bytes] = b'') -> int: """Wait for the process and return its status code. :param stderr: Previously read value of stderr, in case stderr is already closed. :warn: may deadlock if output or error pipes are used and not handled separately. :raise GitCommandError: if the return status is not 0""" if stderr is None: - stderr = b'' - stderr = force_bytes(data=stderr, encoding='utf-8') + stderr_b = b'' + stderr_b = force_bytes(data=stderr, encoding='utf-8') if self.proc is not None: status = self.proc.wait() @@ -437,11 +437,11 @@ class Git(LazyMixin): def read_all_from_possibly_closed_stream(stream: Union[IO[bytes], None]) -> bytes: if stream: try: - return stderr + force_bytes(stream.read()) + return stderr_b + force_bytes(stream.read()) except ValueError: - return stderr or b'' + return stderr_b or b'' else: - return stderr or b'' + return stderr_b or b'' if status != 0: errstr = read_all_from_possibly_closed_stream(self.proc.stderr) diff --git a/git/config.py b/git/config.py index 011d0e0b..3565eece 100644 --- a/git/config.py +++ b/git/config.py @@ -33,7 +33,7 @@ import configparser as cp from typing import (Any, Callable, Generic, IO, List, Dict, Sequence, TYPE_CHECKING, Tuple, TypeVar, Union, cast, overload) -from git.types import Lit_config_levels, ConfigLevels_Tup, PathLike, TBD, assert_never, _T +from git.types import Lit_config_levels, ConfigLevels_Tup, PathLike, assert_never, _T if TYPE_CHECKING: from git.repo.base import Repo @@ -72,7 +72,7 @@ CONDITIONAL_INCLUDE_REGEXP = re.compile(r"(?<=includeIf )\"(gitdir|gitdir/i|onbr class MetaParserBuilder(abc.ABCMeta): """Utlity class wrapping base-class methods into decorators that assure read-only properties""" - def __new__(cls, name: str, bases: TBD, clsdict: Dict[str, Any]) -> TBD: + def __new__(cls, name: str, bases: Tuple, clsdict: Dict[str, Any]) -> 'MetaParserBuilder': """ Equip all base-class methods with a needs_values decorator, and all non-const methods with a set_dirty_and_flush_changes decorator in addition to that.""" @@ -617,12 +617,12 @@ class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder): def write_section(name: str, section_dict: _OMD) -> None: fp.write(("[%s]\n" % name).encode(defenc)) - values: Sequence[Union[str, bytes, int, float, bool]] + values: Sequence[str] # runtime only gets str in tests, but should be whatever _OMD stores + v: str for (key, values) in section_dict.items_all(): if key == "__name__": continue - v: Union[str, bytes, int, float, bool] for v in values: fp.write(("\t%s = %s\n" % (key, self._value_to_string(v).replace('\n', '\n\t'))).encode(defenc)) # END if key is not __name__ @@ -630,7 +630,8 @@ class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder): if self._defaults: write_section(cp.DEFAULTSECT, self._defaults) - value: TBD + value: _OMD + for name, value in self._sections.items(): write_section(name, value) diff --git a/git/remote.py b/git/remote.py index 11007cb6..c141519a 100644 --- a/git/remote.py +++ b/git/remote.py @@ -37,10 +37,10 @@ from .refs import ( # typing------------------------------------------------------- -from typing import (Any, Callable, Dict, Iterator, List, NoReturn, Optional, Sequence, # NOQA[TC002] +from typing import (Any, Callable, Dict, Iterator, List, NoReturn, Optional, Sequence, TYPE_CHECKING, Type, Union, cast, overload) -from git.types import PathLike, Literal, TBD, Commit_ish # NOQA[TC002] +from git.types import PathLike, Literal, Commit_ish if TYPE_CHECKING: from git.repo.base import Repo @@ -50,7 +50,6 @@ if TYPE_CHECKING: flagKeyLiteral = Literal[' ', '!', '+', '-', '*', '=', 't', '?'] - # def is_flagKeyLiteral(inp: str) -> TypeGuard[flagKeyLiteral]: # return inp in [' ', '!', '+', '-', '=', '*', 't', '?'] @@ -707,9 +706,10 @@ class Remote(LazyMixin, IterableObj): self.repo.git.remote(scmd, self.name, **kwargs) return self - def _get_fetch_info_from_stderr(self, proc: TBD, + def _get_fetch_info_from_stderr(self, proc: 'Git.AutoInterrupt', progress: Union[Callable[..., Any], RemoteProgress, None] ) -> IterableList['FetchInfo']: + progress = to_progress_instance(progress) # skip first line as it is some remote info we are not interested in @@ -768,7 +768,7 @@ class Remote(LazyMixin, IterableObj): log.warning("Git informed while fetching: %s", err_line.strip()) return output - def _get_push_info(self, proc: TBD, + def _get_push_info(self, proc: 'Git.AutoInterrupt', progress: Union[Callable[..., Any], RemoteProgress, None]) -> IterableList[PushInfo]: progress = to_progress_instance(progress) diff --git a/git/repo/base.py b/git/repo/base.py index 5581233b..07cf7adf 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -235,7 +235,7 @@ class Repo(object): def __enter__(self) -> 'Repo': return self - def __exit__(self, exc_type: TBD, exc_value: TBD, traceback: TBD) -> None: + def __exit__(self, *args: Any) -> None: self.close() def __del__(self) -> None: @@ -445,7 +445,7 @@ class Repo(object): :return: TagReference object """ return TagReference.create(self, path, ref, message, force, **kwargs) - def delete_tag(self, *tags: TBD) -> None: + def delete_tag(self, *tags: TagReference) -> None: """Delete the given tag references""" return TagReference.delete(self, *tags) @@ -795,7 +795,7 @@ class Repo(object): # reveal_type(self.head.reference) # => Reference return self.head.reference - def blame_incremental(self, rev: TBD, file: TBD, **kwargs: Any) -> Optional[Iterator['BlameEntry']]: + def blame_incremental(self, rev: Union[str, HEAD], file: str, **kwargs: Any) -> Iterator['BlameEntry']: """Iterator for blame information for the given file at the given revision. Unlike .blame(), this does not return the actual file's contents, only @@ -809,6 +809,7 @@ class Repo(object): If you combine all line number ranges outputted by this command, you should get a continuous range spanning all line numbers in the file. """ + data = self.git.blame(rev, '--', file, p=True, incremental=True, stdout_as_string=False, **kwargs) commits: Dict[str, Commit] = {} @@ -870,7 +871,7 @@ class Repo(object): safe_decode(orig_filename), range(orig_lineno, orig_lineno + num_lines)) - def blame(self, rev: TBD, file: TBD, incremental: bool = False, **kwargs: Any + def blame(self, rev: Union[str, HEAD], file: str, incremental: bool = False, **kwargs: Any ) -> Union[List[List[Union[Optional['Commit'], List[str]]]], Optional[Iterator[BlameEntry]]]: """The blame information for the given file at the given revision. -- cgit v1.2.1 From 2163322ef62fa97573ac94298261161fd9721993 Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 14:00:33 +0100 Subject: increase mypy strictness (warn unused ignored) --- git/cmd.py | 2 +- git/config.py | 16 ++++++++-------- git/objects/util.py | 2 +- git/util.py | 4 ++-- 4 files changed, 12 insertions(+), 12 deletions(-) (limited to 'git') diff --git a/git/cmd.py b/git/cmd.py index 85a5fbe9..9d070367 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -164,7 +164,7 @@ CREATE_NO_WINDOW = 0x08000000 ## CREATE_NEW_PROCESS_GROUP is needed to allow killing it afterwards, # see https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal -PROC_CREATIONFLAGS = (CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP # type: ignore[attr-defined] +PROC_CREATIONFLAGS = (CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP if is_win else 0) diff --git a/git/config.py b/git/config.py index 3565eece..91bf65d3 100644 --- a/git/config.py +++ b/git/config.py @@ -44,10 +44,10 @@ T_OMD_value = TypeVar('T_OMD_value', str, bytes, int, float, bool) if sys.version_info[:3] < (3, 7, 2): # typing.Ordereddict not added until py 3.7.2 - from collections import OrderedDict # type: ignore # until 3.6 dropped - OrderedDict_OMD = OrderedDict # type: ignore # until 3.6 dropped + from collections import OrderedDict + OrderedDict_OMD = OrderedDict else: - from typing import OrderedDict # type: ignore # until 3.6 dropped + from typing import OrderedDict OrderedDict_OMD = OrderedDict[str, List[T_OMD_value]] # type: ignore[assignment, misc] # ------------------------------------------------------------- @@ -177,7 +177,7 @@ class SectionConstraint(Generic[T_ConfigParser]): class _OMD(OrderedDict_OMD): """Ordered multi-dict.""" - def __setitem__(self, key: str, value: _T) -> None: # type: ignore[override] + def __setitem__(self, key: str, value: _T) -> None: super(_OMD, self).__setitem__(key, [value]) def add(self, key: str, value: Any) -> None: @@ -203,8 +203,8 @@ class _OMD(OrderedDict_OMD): prior = super(_OMD, self).__getitem__(key) prior[-1] = value - def get(self, key: str, default: Union[_T, None] = None) -> Union[_T, None]: # type: ignore - return super(_OMD, self).get(key, [default])[-1] # type: ignore + def get(self, key: str, default: Union[_T, None] = None) -> Union[_T, None]: + return super(_OMD, self).get(key, [default])[-1] def getall(self, key: str) -> List[_T]: return super(_OMD, self).__getitem__(key) @@ -299,9 +299,9 @@ class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder): :param repo: Reference to repository to use if [includeIf] sections are found in configuration files. """ - cp.RawConfigParser.__init__(self, dict_type=_OMD) # type: ignore[arg-type] + cp.RawConfigParser.__init__(self, dict_type=_OMD) self._dict: Callable[..., _OMD] # type: ignore[assignment] # mypy/typeshed bug - self._defaults: _OMD # type: ignore[assignment] # mypy/typeshed bug + self._defaults: _OMD self._sections: _OMD # type: ignore[assignment] # mypy/typeshed bug # Used in python 3, needs to stay in sync with sections for underlying implementation to work diff --git a/git/objects/util.py b/git/objects/util.py index f627211e..d3842cfb 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -346,7 +346,7 @@ class Traversable(Protocol): if not as_edge: out: IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']] = IterableList(id) - out.extend(self.traverse(as_edge=as_edge, *args, **kwargs)) # type: ignore + out.extend(self.traverse(as_edge=as_edge, *args, **kwargs)) return out # overloads in subclasses (mypy does't allow typing self: subclass) # Union[IterableList['Commit'], IterableList['Submodule'], IterableList[Union['Submodule', 'Tree', 'Blob']]] diff --git a/git/util.py b/git/util.py index 8056804a..63060530 100644 --- a/git/util.py +++ b/git/util.py @@ -403,8 +403,8 @@ def expand_path(p: Union[None, PathLike], expand_vars: bool = True) -> Optional[ try: p = osp.expanduser(p) # type: ignore if expand_vars: - p = osp.expandvars(p) # type: ignore - return osp.normpath(osp.abspath(p)) # type: ignore + p = osp.expandvars(p) + return osp.normpath(osp.abspath(p)) except Exception: return None -- cgit v1.2.1 From 91fce331de16de6039c94cd4d7314184a5763e61 Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 14:56:03 +0100 Subject: increase mypy strictness (warn unused ignored and warn unreachable) --- git/cmd.py | 8 +++----- git/config.py | 3 ++- git/diff.py | 4 ++-- git/index/base.py | 1 - git/objects/fun.py | 2 +- git/objects/tree.py | 2 +- git/objects/util.py | 8 +++++--- git/repo/base.py | 1 - git/repo/fun.py | 21 ++++++++++++++------- 9 files changed, 28 insertions(+), 22 deletions(-) (limited to 'git') diff --git a/git/cmd.py b/git/cmd.py index 9d070367..e690dc12 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -42,7 +42,7 @@ from .util import ( from typing import (Any, AnyStr, BinaryIO, Callable, Dict, IO, Iterator, List, Mapping, Sequence, TYPE_CHECKING, TextIO, Tuple, Union, cast, overload) -from git.types import PathLike, Literal +from git.types import PathLike, Literal, TBD if TYPE_CHECKING: from git.repo.base import Repo @@ -575,8 +575,8 @@ class Git(LazyMixin): self._environment: Dict[str, str] = {} # cached command slots - self.cat_file_header = None - self.cat_file_all = None + self.cat_file_header: Union[None, TBD] = None + self.cat_file_all: Union[None, TBD] = None def __getattr__(self, name: str) -> Any: """A convenience method as it allows to call the command as if it was @@ -1012,8 +1012,6 @@ class Git(LazyMixin): @classmethod def __unpack_args(cls, arg_list: Sequence[str]) -> List[str]: - if not isinstance(arg_list, (list, tuple)): - return [str(arg_list)] outlist = [] for arg in arg_list: diff --git a/git/config.py b/git/config.py index 91bf65d3..2a5aa142 100644 --- a/git/config.py +++ b/git/config.py @@ -236,7 +236,8 @@ def get_config_path(config_level: Lit_config_levels) -> str: raise ValueError("No repo to get repository configuration from. Use Repo._get_config_path") else: # Should not reach here. Will raise ValueError if does. Static typing will warn missing elifs - assert_never(config_level, ValueError(f"Invalid configuration level: {config_level!r}")) + assert_never(config_level, # type: ignore[unreachable] + ValueError(f"Invalid configuration level: {config_level!r}")) class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder): diff --git a/git/diff.py b/git/diff.py index fc16b73e..cea66d7e 100644 --- a/git/diff.py +++ b/git/diff.py @@ -456,8 +456,8 @@ class Diff(object): # for now, we have to bake the stream text = b''.join(text_list) index: 'DiffIndex' = DiffIndex() - previous_header = None - header = None + previous_header: Union[Match[bytes], None] = None + header: Union[Match[bytes], None] = None a_path, b_path = None, None # for mypy a_mode, b_mode = None, None # for mypy for _header in cls.re_header.finditer(text): diff --git a/git/index/base.py b/git/index/base.py index 4c8b923a..102703e6 100644 --- a/git/index/base.py +++ b/git/index/base.py @@ -1202,7 +1202,6 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): handle_stderr(proc, checked_out_files) return checked_out_files # END paths handling - assert "Should not reach this point" @ default_index def reset(self, commit: Union[Commit, 'Reference', str] = 'HEAD', working_tree: bool = False, diff --git a/git/objects/fun.py b/git/objects/fun.py index d6cdafe1..19b4e525 100644 --- a/git/objects/fun.py +++ b/git/objects/fun.py @@ -51,7 +51,7 @@ def tree_to_stream(entries: Sequence[EntryTup], write: Callable[['ReadableBuffer if isinstance(name, str): name_bytes = name.encode(defenc) else: - name_bytes = name + name_bytes = name # type: ignore[unreachable] # check runtime types - is always str? write(b''.join((mode_str, b' ', name_bytes, b'\0', binsha))) # END for each item diff --git a/git/objects/tree.py b/git/objects/tree.py index 0cceb59a..22531895 100644 --- a/git/objects/tree.py +++ b/git/objects/tree.py @@ -215,7 +215,7 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable): super(Tree, self).__init__(repo, binsha, mode, path) @ classmethod - def _get_intermediate_items(cls, index_object: 'Tree', + def _get_intermediate_items(cls, index_object: IndexObjUnion, ) -> Union[Tuple['Tree', ...], Tuple[()]]: if index_object.type == "tree": return tuple(index_object._iter_convert_to_object(index_object._cache)) diff --git a/git/objects/util.py b/git/objects/util.py index d3842cfb..9c9ce773 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -167,7 +167,7 @@ def from_timestamp(timestamp: float, tz_offset: float) -> datetime: return utc_dt -def parse_date(string_date: str) -> Tuple[int, int]: +def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: """ Parse the given date as one of the following @@ -182,8 +182,10 @@ def parse_date(string_date: str) -> Tuple[int, int]: :note: Date can also be YYYY.MM.DD, MM/DD/YYYY and DD.MM.YYYY. """ if isinstance(string_date, datetime) and string_date.tzinfo: - offset = -int(string_date.utcoffset().total_seconds()) + offset = -int(string_date.utcoffset().total_seconds()) # type: ignore[union-attr] return int(string_date.astimezone(utc).timestamp()), offset + else: + assert isinstance(string_date, str) # for mypy # git time try: @@ -338,7 +340,7 @@ class Traversable(Protocol): """ # Commit and Submodule have id.__attribute__ as IterableObj # Tree has id.__attribute__ inherited from IndexObject - if isinstance(self, (TraversableIterableObj, Has_id_attribute)): + if isinstance(self, Has_id_attribute): id = self._id_attribute_ else: id = "" # shouldn't reach here, unless Traversable subclass created with no _id_attribute_ diff --git a/git/repo/base.py b/git/repo/base.py index 07cf7adf..2609bf55 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -200,7 +200,6 @@ class Repo(object): # END while curpath if self.git_dir is None: - self.git_dir = cast(PathLike, self.git_dir) raise InvalidGitRepositoryError(epath) self._bare = False diff --git a/git/repo/fun.py b/git/repo/fun.py index 7d5c7823..b1b330c4 100644 --- a/git/repo/fun.py +++ b/git/repo/fun.py @@ -1,4 +1,6 @@ """Package with general repository related functions""" +from git.refs.reference import Reference +from git.types import Commit_ish import os import stat from string import digits @@ -202,7 +204,7 @@ def rev_parse(repo: 'Repo', rev: str) -> Union['Commit', 'Tag', 'Tree', 'Blob']: raise NotImplementedError("commit by message search ( regex )") # END handle search - obj = cast(Object, None) # not ideal. Should use guards + obj: Union[Commit_ish, Reference, None] = None ref = None output_type = "commit" start = 0 @@ -222,14 +224,16 @@ def rev_parse(repo: 'Repo', rev: str) -> Union['Commit', 'Tag', 'Tree', 'Blob']: ref = repo.head.ref else: if token == '@': - ref = name_to_object(repo, rev[:start], return_ref=True) + ref = cast(Reference, name_to_object(repo, rev[:start], return_ref=True)) else: - obj = name_to_object(repo, rev[:start]) + obj = cast(Commit_ish, name_to_object(repo, rev[:start])) # END handle token # END handle refname + else: + assert obj is not None if ref is not None: - obj = ref.commit + obj = cast(Commit, ref.commit) # END handle ref # END initialize obj on first token @@ -247,11 +251,13 @@ def rev_parse(repo: 'Repo', rev: str) -> Union['Commit', 'Tag', 'Tree', 'Blob']: pass # default elif output_type == 'tree': try: + obj = cast(Object, obj) obj = to_commit(obj).tree except (AttributeError, ValueError): pass # error raised later # END exception handling elif output_type in ('', 'blob'): + obj = cast(TagObject, obj) if obj and obj.type == 'tag': obj = deref_tag(obj) else: @@ -280,13 +286,13 @@ def rev_parse(repo: 'Repo', rev: str) -> Union['Commit', 'Tag', 'Tree', 'Blob']: obj = Object.new_from_sha(repo, hex_to_bin(entry.newhexsha)) # make it pass the following checks - output_type = None + output_type = '' else: raise ValueError("Invalid output type: %s ( in %s )" % (output_type, rev)) # END handle output type # empty output types don't require any specific type, its just about dereferencing tags - if output_type and obj.type != output_type: + if output_type and obj and obj.type != output_type: raise ValueError("Could not accommodate requested object type %r, got %s" % (output_type, obj.type)) # END verify output type @@ -319,6 +325,7 @@ def rev_parse(repo: 'Repo', rev: str) -> Union['Commit', 'Tag', 'Tree', 'Blob']: parsed_to = start # handle hierarchy walk try: + obj = cast(Commit_ish, obj) if token == "~": obj = to_commit(obj) for _ in range(num): @@ -347,7 +354,7 @@ def rev_parse(repo: 'Repo', rev: str) -> Union['Commit', 'Tag', 'Tree', 'Blob']: # still no obj ? Its probably a simple name if obj is None: - obj = name_to_object(repo, rev) + obj = cast(Commit_ish, name_to_object(repo, rev)) parsed_to = lr # END handle simple name -- cgit v1.2.1 From 15ace876d98d70c48a354ec8f526d6c8ad6b8d97 Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 15:30:15 +0100 Subject: rmv 3.6 from CI matrix --- git/cmd.py | 3 +-- git/config.py | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) (limited to 'git') diff --git a/git/cmd.py b/git/cmd.py index e690dc12..78fa3c9d 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -164,8 +164,7 @@ CREATE_NO_WINDOW = 0x08000000 ## CREATE_NEW_PROCESS_GROUP is needed to allow killing it afterwards, # see https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal -PROC_CREATIONFLAGS = (CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP - if is_win else 0) +PROC_CREATIONFLAGS = (CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP if is_win else 0) class Git(LazyMixin): diff --git a/git/config.py b/git/config.py index 2a5aa142..293281d2 100644 --- a/git/config.py +++ b/git/config.py @@ -301,9 +301,9 @@ class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder): """ cp.RawConfigParser.__init__(self, dict_type=_OMD) - self._dict: Callable[..., _OMD] # type: ignore[assignment] # mypy/typeshed bug + self._dict: Callable[..., _OMD] # type: ignore # mypy/typeshed bug? self._defaults: _OMD - self._sections: _OMD # type: ignore[assignment] # mypy/typeshed bug + self._sections: _OMD # type: ignore # mypy/typeshed bug? # Used in python 3, needs to stay in sync with sections for underlying implementation to work if not hasattr(self, '_proxies'): -- cgit v1.2.1 From bef218246c9935f0c31b23a17d1a02ac3810301d Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 15:41:17 +0100 Subject: rmv 3.6 from setup.py --- git/cmd.py | 3 ++- git/util.py | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) (limited to 'git') diff --git a/git/cmd.py b/git/cmd.py index 78fa3c9d..18a2bec1 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -164,7 +164,8 @@ CREATE_NO_WINDOW = 0x08000000 ## CREATE_NEW_PROCESS_GROUP is needed to allow killing it afterwards, # see https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal -PROC_CREATIONFLAGS = (CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP if is_win else 0) +PROC_CREATIONFLAGS = (CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP # type: ignore[attr-defined] + if is_win else 0) # mypy error if not windows class Git(LazyMixin): diff --git a/git/util.py b/git/util.py index 63060530..c20be6eb 100644 --- a/git/util.py +++ b/git/util.py @@ -403,8 +403,8 @@ def expand_path(p: Union[None, PathLike], expand_vars: bool = True) -> Optional[ try: p = osp.expanduser(p) # type: ignore if expand_vars: - p = osp.expandvars(p) - return osp.normpath(osp.abspath(p)) + p = osp.expandvars(p) # type: ignore + return osp.normpath(osp.abspath(p)) # type: ignore except Exception: return None -- cgit v1.2.1 From 270c3d7d4bbe4c606049bfd8af53da1bc3df4ad4 Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 15:48:17 +0100 Subject: rmv 3.6 README --- git/cmd.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) (limited to 'git') diff --git a/git/cmd.py b/git/cmd.py index 18a2bec1..b84c43df 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -1014,13 +1014,12 @@ class Git(LazyMixin): def __unpack_args(cls, arg_list: Sequence[str]) -> List[str]: outlist = [] - for arg in arg_list: - if isinstance(arg_list, (list, tuple)): + if isinstance(arg_list, (list, tuple)): + for arg in arg_list: outlist.extend(cls.__unpack_args(arg)) - # END recursion - else: - outlist.append(str(arg)) - # END for each arg + else: + outlist.append(str(arg_list)) + return outlist def __call__(self, **kwargs: Any) -> 'Git': -- cgit v1.2.1 From c3f3501f0fe00572d2692948ebb5ce25da8bb418 Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 15:53:22 +0100 Subject: Add __future__.annotations to cmd.py --- git/repo/fun.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'git') diff --git a/git/repo/fun.py b/git/repo/fun.py index b1b330c4..36c8b797 100644 --- a/git/repo/fun.py +++ b/git/repo/fun.py @@ -1,6 +1,5 @@ """Package with general repository related functions""" -from git.refs.reference import Reference -from git.types import Commit_ish +from __future__ import annotations import os import stat from string import digits @@ -20,12 +19,13 @@ from git.cmd import Git # Typing ---------------------------------------------------------------------- from typing import Union, Optional, cast, TYPE_CHECKING - +from git.types import Commit_ish if TYPE_CHECKING: from git.types import PathLike from .base import Repo from git.db import GitCmdObjectDB + from git.refs.reference import Reference from git.objects import Commit, TagObject, Blob, Tree from git.refs.tag import Tag @@ -204,7 +204,7 @@ def rev_parse(repo: 'Repo', rev: str) -> Union['Commit', 'Tag', 'Tree', 'Blob']: raise NotImplementedError("commit by message search ( regex )") # END handle search - obj: Union[Commit_ish, Reference, None] = None + obj: Union[Commit_ish, 'Reference', None] = None ref = None output_type = "commit" start = 0 @@ -224,7 +224,7 @@ def rev_parse(repo: 'Repo', rev: str) -> Union['Commit', 'Tag', 'Tree', 'Blob']: ref = repo.head.ref else: if token == '@': - ref = cast(Reference, name_to_object(repo, rev[:start], return_ref=True)) + ref = cast('Reference', name_to_object(repo, rev[:start], return_ref=True)) else: obj = cast(Commit_ish, name_to_object(repo, rev[:start])) # END handle token @@ -251,13 +251,13 @@ def rev_parse(repo: 'Repo', rev: str) -> Union['Commit', 'Tag', 'Tree', 'Blob']: pass # default elif output_type == 'tree': try: - obj = cast(Object, obj) + obj = cast(Commit_ish, obj) obj = to_commit(obj).tree except (AttributeError, ValueError): pass # error raised later # END exception handling elif output_type in ('', 'blob'): - obj = cast(TagObject, obj) + obj = cast('TagObject', obj) if obj and obj.type == 'tag': obj = deref_tag(obj) else: -- cgit v1.2.1 From 829142d4c30886db2a2622605092072e979afcc9 Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 16:04:08 +0100 Subject: Add __future__.annotations to cmd.py2 --- git/repo/fun.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'git') diff --git a/git/repo/fun.py b/git/repo/fun.py index 36c8b797..1a83dd3d 100644 --- a/git/repo/fun.py +++ b/git/repo/fun.py @@ -233,7 +233,7 @@ def rev_parse(repo: 'Repo', rev: str) -> Union['Commit', 'Tag', 'Tree', 'Blob']: assert obj is not None if ref is not None: - obj = cast(Commit, ref.commit) + obj = cast('Commit', ref.commit) # END handle ref # END initialize obj on first token @@ -347,8 +347,8 @@ def rev_parse(repo: 'Repo', rev: str) -> Union['Commit', 'Tag', 'Tree', 'Blob']: # END end handle tag except (IndexError, AttributeError) as e: raise BadName( - "Invalid revision spec '%s' - not enough " - "parent commits to reach '%s%i'" % (rev, token, num)) from e + f"Invalid revision spec '{rev}' - not enough " + f"parent commits to reach '{token}{int(num)}'") from e # END exception handling # END parse loop -- cgit v1.2.1 From 13e0730b449e8ace2c7aa691d588febb4bed510c Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 16:16:11 +0100 Subject: Fix parse_date typing --- git/objects/util.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'git') diff --git a/git/objects/util.py b/git/objects/util.py index 9c9ce773..d7472b5d 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -181,11 +181,13 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: :raise ValueError: If the format could not be understood :note: Date can also be YYYY.MM.DD, MM/DD/YYYY and DD.MM.YYYY. """ - if isinstance(string_date, datetime) and string_date.tzinfo: - offset = -int(string_date.utcoffset().total_seconds()) # type: ignore[union-attr] + if isinstance(string_date, datetime): + if string_date.tzinfo: + utcoffset = string_date.utcoffset() + offset = -int(utcoffset.total_seconds()) if utcoffset else 0 return int(string_date.astimezone(utc).timestamp()), offset else: - assert isinstance(string_date, str) # for mypy + assert isinstance(string_date, str), f"string_date={string_date}, type={type(string_date)}" # for mypy # git time try: -- cgit v1.2.1 From 730f11936364314b76738ed06bdd9222dc9de2ac Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 16:30:32 +0100 Subject: Fix parse_date typing 2 --- git/objects/util.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'git') diff --git a/git/objects/util.py b/git/objects/util.py index d7472b5d..e3e7d3ba 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -182,9 +182,11 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: :note: Date can also be YYYY.MM.DD, MM/DD/YYYY and DD.MM.YYYY. """ if isinstance(string_date, datetime): - if string_date.tzinfo: + if string_date.tzinfo and string_date.utcoffset(): utcoffset = string_date.utcoffset() offset = -int(utcoffset.total_seconds()) if utcoffset else 0 + else: + offset = 0 return int(string_date.astimezone(utc).timestamp()), offset else: assert isinstance(string_date, str), f"string_date={string_date}, type={type(string_date)}" # for mypy -- cgit v1.2.1 From 2fe13cad9c889b8628119ab5ee139038b0c164fd Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 16:54:31 +0100 Subject: Fix parse_date typing 3 --- git/objects/util.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) (limited to 'git') diff --git a/git/objects/util.py b/git/objects/util.py index e3e7d3ba..6e3f688e 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -181,13 +181,11 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: :raise ValueError: If the format could not be understood :note: Date can also be YYYY.MM.DD, MM/DD/YYYY and DD.MM.YYYY. """ - if isinstance(string_date, datetime): - if string_date.tzinfo and string_date.utcoffset(): - utcoffset = string_date.utcoffset() - offset = -int(utcoffset.total_seconds()) if utcoffset else 0 - else: - offset = 0 + if isinstance(string_date, datetime) and string_date.tzinfo: + utcoffset = string_date.utcoffset() + offset = -int(utcoffset.total_seconds()) if utcoffset else 0 return int(string_date.astimezone(utc).timestamp()), offset + else: assert isinstance(string_date, str), f"string_date={string_date}, type={type(string_date)}" # for mypy -- cgit v1.2.1 From 024b69669811dc3aa5a018eb3df5535202edf5f9 Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 17:09:03 +0100 Subject: Fix parse_date typing 4 --- git/objects/util.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) (limited to 'git') diff --git a/git/objects/util.py b/git/objects/util.py index 6e3f688e..1ca6f050 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -181,16 +181,15 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: :raise ValueError: If the format could not be understood :note: Date can also be YYYY.MM.DD, MM/DD/YYYY and DD.MM.YYYY. """ - if isinstance(string_date, datetime) and string_date.tzinfo: - utcoffset = string_date.utcoffset() - offset = -int(utcoffset.total_seconds()) if utcoffset else 0 - return int(string_date.astimezone(utc).timestamp()), offset + if isinstance(string_date, datetime): + if string_date.tzinfo: + utcoffset = cast(timedelta, string_date.utcoffset()) # typeguard, if tzinfoand is not None + offset = -int(utcoffset.total_seconds()) + return int(string_date.astimezone(utc).timestamp()), offset + else: + raise ValueError(f"Unsupported date format or type: {string_date}" % string_date) else: - assert isinstance(string_date, str), f"string_date={string_date}, type={type(string_date)}" # for mypy - - # git time - try: if string_date.count(' ') == 1 and string_date.rfind(':') == -1: timestamp, offset_str = string_date.split() if timestamp.startswith('@'): @@ -244,13 +243,9 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: continue # END exception handling # END for each fmt - # still here ? fail raise ValueError("no format matched") # END handle format - except Exception as e: - raise ValueError("Unsupported date format: %s" % string_date) from e - # END handle exceptions # precompiled regex -- cgit v1.2.1 From e2f8367b53b14acb8e1a86f33334f92a5a306878 Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 17:15:06 +0100 Subject: Fix parse_date typing 5 --- git/objects/util.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'git') diff --git a/git/objects/util.py b/git/objects/util.py index 1ca6f050..d3d8d38b 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -187,9 +187,10 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: offset = -int(utcoffset.total_seconds()) return int(string_date.astimezone(utc).timestamp()), offset else: - raise ValueError(f"Unsupported date format or type: {string_date}" % string_date) + raise ValueError(f"string_date datetime object without tzinfo, {string_date}") - else: + # git time + try: if string_date.count(' ') == 1 and string_date.rfind(':') == -1: timestamp, offset_str = string_date.split() if timestamp.startswith('@'): @@ -243,9 +244,13 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: continue # END exception handling # END for each fmt + # still here ? fail raise ValueError("no format matched") # END handle format + except Exception as e: + raise ValueError(f"Unsupported date format or type: {string_date}" % string_date) from e + # END handle exceptions # precompiled regex -- cgit v1.2.1 From d30bc0722ee32c501c021bde511640ff6620a203 Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 17:19:33 +0100 Subject: Fix parse_date typing 6 --- git/objects/util.py | 2 +- .../util.py.97f6472e9bbb12cad7bbab8f367a99fe.tmp | 566 +++++++++++++++++++++ 2 files changed, 567 insertions(+), 1 deletion(-) create mode 100644 git/objects/util.py.97f6472e9bbb12cad7bbab8f367a99fe.tmp (limited to 'git') diff --git a/git/objects/util.py b/git/objects/util.py index d3d8d38b..16d4c0ac 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -249,7 +249,7 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: raise ValueError("no format matched") # END handle format except Exception as e: - raise ValueError(f"Unsupported date format or type: {string_date}" % string_date) from e + raise ValueError(f"Unsupported date format or type: {string_date}, type={type(string_date)}") from e # END handle exceptions diff --git a/git/objects/util.py.97f6472e9bbb12cad7bbab8f367a99fe.tmp b/git/objects/util.py.97f6472e9bbb12cad7bbab8f367a99fe.tmp new file mode 100644 index 00000000..16d4c0ac --- /dev/null +++ b/git/objects/util.py.97f6472e9bbb12cad7bbab8f367a99fe.tmp @@ -0,0 +1,566 @@ +# util.py +# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors +# +# This module is part of GitPython and is released under +# the BSD License: http://www.opensource.org/licenses/bsd-license.php +"""Module for general utility functions""" + +from abc import abstractmethod +import warnings +from git.util import ( + IterableList, + IterableObj, + Actor +) + +import re +from collections import deque + +from string import digits +import time +import calendar +from datetime import datetime, timedelta, tzinfo + +# typing ------------------------------------------------------------ +from typing import (Any, Callable, Deque, Iterator, NamedTuple, overload, Sequence, + TYPE_CHECKING, Tuple, Type, TypeVar, Union, cast) + +from git.types import Has_id_attribute, Literal, Protocol, runtime_checkable + +if TYPE_CHECKING: + from io import BytesIO, StringIO + from .commit import Commit + from .blob import Blob + from .tag import TagObject + from .tree import Tree, TraversedTreeTup + from subprocess import Popen + from .submodule.base import Submodule + + +class TraverseNT(NamedTuple): + depth: int + item: Union['Traversable', 'Blob'] + src: Union['Traversable', None] + + +T_TIobj = TypeVar('T_TIobj', bound='TraversableIterableObj') # for TraversableIterableObj.traverse() + +TraversedTup = Union[Tuple[Union['Traversable', None], 'Traversable'], # for commit, submodule + 'TraversedTreeTup'] # for tree.traverse() + +# -------------------------------------------------------------------- + +__all__ = ('get_object_type_by_name', 'parse_date', 'parse_actor_and_date', + 'ProcessStreamAdapter', 'Traversable', 'altz_to_utctz_str', 'utctz_to_altz', + 'verify_utctz', 'Actor', 'tzoffset', 'utc') + +ZERO = timedelta(0) + +#{ Functions + + +def mode_str_to_int(modestr: Union[bytes, str]) -> int: + """ + :param modestr: string like 755 or 644 or 100644 - only the last 6 chars will be used + :return: + String identifying a mode compatible to the mode methods ids of the + stat module regarding the rwx permissions for user, group and other, + special flags and file system flags, i.e. whether it is a symlink + for example.""" + mode = 0 + for iteration, char in enumerate(reversed(modestr[-6:])): + char = cast(Union[str, int], char) + mode += int(char) << iteration * 3 + # END for each char + return mode + + +def get_object_type_by_name(object_type_name: bytes + ) -> Union[Type['Commit'], Type['TagObject'], Type['Tree'], Type['Blob']]: + """ + :return: type suitable to handle the given object type name. + Use the type to create new instances. + + :param object_type_name: Member of TYPES + + :raise ValueError: In case object_type_name is unknown""" + if object_type_name == b"commit": + from . import commit + return commit.Commit + elif object_type_name == b"tag": + from . import tag + return tag.TagObject + elif object_type_name == b"blob": + from . import blob + return blob.Blob + elif object_type_name == b"tree": + from . import tree + return tree.Tree + else: + raise ValueError("Cannot handle unknown object type: %s" % object_type_name.decode()) + + +def utctz_to_altz(utctz: str) -> int: + """we convert utctz to the timezone in seconds, it is the format time.altzone + returns. Git stores it as UTC timezone which has the opposite sign as well, + which explains the -1 * ( that was made explicit here ) + :param utctz: git utc timezone string, i.e. +0200""" + return -1 * int(float(utctz) / 100 * 3600) + + +def altz_to_utctz_str(altz: float) -> str: + """As above, but inverses the operation, returning a string that can be used + in commit objects""" + utci = -1 * int((float(altz) / 3600) * 100) + utcs = str(abs(utci)) + utcs = "0" * (4 - len(utcs)) + utcs + prefix = (utci < 0 and '-') or '+' + return prefix + utcs + + +def verify_utctz(offset: str) -> str: + """:raise ValueError: if offset is incorrect + :return: offset""" + fmt_exc = ValueError("Invalid timezone offset format: %s" % offset) + if len(offset) != 5: + raise fmt_exc + if offset[0] not in "+-": + raise fmt_exc + if offset[1] not in digits or\ + offset[2] not in digits or\ + offset[3] not in digits or\ + offset[4] not in digits: + raise fmt_exc + # END for each char + return offset + + +class tzoffset(tzinfo): + + def __init__(self, secs_west_of_utc: float, name: Union[None, str] = None) -> None: + self._offset = timedelta(seconds=-secs_west_of_utc) + self._name = name or 'fixed' + + def __reduce__(self) -> Tuple[Type['tzoffset'], Tuple[float, str]]: + return tzoffset, (-self._offset.total_seconds(), self._name) + + def utcoffset(self, dt: Union[datetime, None]) -> timedelta: + return self._offset + + def tzname(self, dt: Union[datetime, None]) -> str: + return self._name + + def dst(self, dt: Union[datetime, None]) -> timedelta: + return ZERO + + +utc = tzoffset(0, 'UTC') + + +def from_timestamp(timestamp: float, tz_offset: float) -> datetime: + """Converts a timestamp + tz_offset into an aware datetime instance.""" + utc_dt = datetime.fromtimestamp(timestamp, utc) + try: + local_dt = utc_dt.astimezone(tzoffset(tz_offset)) + return local_dt + except ValueError: + return utc_dt + + +def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: + """ + Parse the given date as one of the following + + * aware datetime instance + * Git internal format: timestamp offset + * RFC 2822: Thu, 07 Apr 2005 22:13:13 +0200. + * ISO 8601 2005-04-07T22:13:13 + The T can be a space as well + + :return: Tuple(int(timestamp_UTC), int(offset)), both in seconds since epoch + :raise ValueError: If the format could not be understood + :note: Date can also be YYYY.MM.DD, MM/DD/YYYY and DD.MM.YYYY. + """ + if isinstance(string_date, datetime): + if string_date.tzinfo: + utcoffset = cast(timedelta, string_date.utcoffset()) # typeguard, if tzinfoand is not None + offset = -int(utcoffset.total_seconds()) + return int(string_date.astimezone(utc).timestamp()), offset + else: + raise ValueError(f"string_date datetime object without tzinfo, {string_date}") + + # git time + try: + if string_date.count(' ') == 1 and string_date.rfind(':') == -1: + timestamp, offset_str = string_date.split() + if timestamp.startswith('@'): + timestamp = timestamp[1:] + timestamp_int = int(timestamp) + return timestamp_int, utctz_to_altz(verify_utctz(offset_str)) + else: + offset_str = "+0000" # local time by default + if string_date[-5] in '-+': + offset_str = verify_utctz(string_date[-5:]) + string_date = string_date[:-6] # skip space as well + # END split timezone info + offset = utctz_to_altz(offset_str) + + # now figure out the date and time portion - split time + date_formats = [] + splitter = -1 + if ',' in string_date: + date_formats.append("%a, %d %b %Y") + splitter = string_date.rfind(' ') + else: + # iso plus additional + date_formats.append("%Y-%m-%d") + date_formats.append("%Y.%m.%d") + date_formats.append("%m/%d/%Y") + date_formats.append("%d.%m.%Y") + + splitter = string_date.rfind('T') + if splitter == -1: + splitter = string_date.rfind(' ') + # END handle 'T' and ' ' + # END handle rfc or iso + + assert splitter > -1 + + # split date and time + time_part = string_date[splitter + 1:] # skip space + date_part = string_date[:splitter] + + # parse time + tstruct = time.strptime(time_part, "%H:%M:%S") + + for fmt in date_formats: + try: + dtstruct = time.strptime(date_part, fmt) + utctime = calendar.timegm((dtstruct.tm_year, dtstruct.tm_mon, dtstruct.tm_mday, + tstruct.tm_hour, tstruct.tm_min, tstruct.tm_sec, + dtstruct.tm_wday, dtstruct.tm_yday, tstruct.tm_isdst)) + return int(utctime), offset + except ValueError: + continue + # END exception handling + # END for each fmt + + # still here ? fail + raise ValueError("no format matched") + # END handle format + except Exception as e: + raise ValueError(f"Unsupported date format or type: {string_date}, type={type(string_date)}") from e + # END handle exceptions + + +# precompiled regex +_re_actor_epoch = re.compile(r'^.+? (.*) (\d+) ([+-]\d+).*$') +_re_only_actor = re.compile(r'^.+? (.*)$') + + +def parse_actor_and_date(line: str) -> Tuple[Actor, int, int]: + """Parse out the actor (author or committer) info from a line like:: + + author Tom Preston-Werner 1191999972 -0700 + + :return: [Actor, int_seconds_since_epoch, int_timezone_offset]""" + actor, epoch, offset = '', '0', '0' + m = _re_actor_epoch.search(line) + if m: + actor, epoch, offset = m.groups() + else: + m = _re_only_actor.search(line) + actor = m.group(1) if m else line or '' + return (Actor._from_string(actor), int(epoch), utctz_to_altz(offset)) + +#} END functions + + +#{ Classes + +class ProcessStreamAdapter(object): + + """Class wireing all calls to the contained Process instance. + + Use this type to hide the underlying process to provide access only to a specified + stream. The process is usually wrapped into an AutoInterrupt class to kill + it if the instance goes out of scope.""" + __slots__ = ("_proc", "_stream") + + def __init__(self, process: 'Popen', stream_name: str) -> None: + self._proc = process + self._stream: StringIO = getattr(process, stream_name) # guessed type + + def __getattr__(self, attr: str) -> Any: + return getattr(self._stream, attr) + + +@runtime_checkable +class Traversable(Protocol): + + """Simple interface to perform depth-first or breadth-first traversals + into one direction. + Subclasses only need to implement one function. + Instances of the Subclass must be hashable + + Defined subclasses = [Commit, Tree, SubModule] + """ + __slots__ = () + + @classmethod + @abstractmethod + def _get_intermediate_items(cls, item: Any) -> Sequence['Traversable']: + """ + Returns: + Tuple of items connected to the given item. + Must be implemented in subclass + + class Commit:: (cls, Commit) -> Tuple[Commit, ...] + class Submodule:: (cls, Submodule) -> Iterablelist[Submodule] + class Tree:: (cls, Tree) -> Tuple[Tree, ...] + """ + raise NotImplementedError("To be implemented in subclass") + + @abstractmethod + def list_traverse(self, *args: Any, **kwargs: Any) -> Any: + """ """ + warnings.warn("list_traverse() method should only be called from subclasses." + "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20" + "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit", + DeprecationWarning, + stacklevel=2) + return self._list_traverse(*args, **kwargs) + + def _list_traverse(self, as_edge: bool = False, *args: Any, **kwargs: Any + ) -> IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']]: + """ + :return: IterableList with the results of the traversal as produced by + traverse() + Commit -> IterableList['Commit'] + Submodule -> IterableList['Submodule'] + Tree -> IterableList[Union['Submodule', 'Tree', 'Blob']] + """ + # Commit and Submodule have id.__attribute__ as IterableObj + # Tree has id.__attribute__ inherited from IndexObject + if isinstance(self, Has_id_attribute): + id = self._id_attribute_ + else: + id = "" # shouldn't reach here, unless Traversable subclass created with no _id_attribute_ + # could add _id_attribute_ to Traversable, or make all Traversable also Iterable? + + if not as_edge: + out: IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']] = IterableList(id) + out.extend(self.traverse(as_edge=as_edge, *args, **kwargs)) + return out + # overloads in subclasses (mypy does't allow typing self: subclass) + # Union[IterableList['Commit'], IterableList['Submodule'], IterableList[Union['Submodule', 'Tree', 'Blob']]] + else: + # Raise deprecationwarning, doesn't make sense to use this + out_list: IterableList = IterableList(self.traverse(*args, **kwargs)) + return out_list + + @ abstractmethod + def traverse(self, *args: Any, **kwargs: Any) -> Any: + """ """ + warnings.warn("traverse() method should only be called from subclasses." + "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20" + "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit", + DeprecationWarning, + stacklevel=2) + return self._traverse(*args, **kwargs) + + def _traverse(self, + predicate: Callable[[Union['Traversable', 'Blob', TraversedTup], int], bool] = lambda i, d: True, + prune: Callable[[Union['Traversable', 'Blob', TraversedTup], int], bool] = lambda i, d: False, + depth: int = -1, branch_first: bool = True, visit_once: bool = True, + ignore_self: int = 1, as_edge: bool = False + ) -> Union[Iterator[Union['Traversable', 'Blob']], + Iterator[TraversedTup]]: + """:return: iterator yielding of items found when traversing self + :param predicate: f(i,d) returns False if item i at depth d should not be included in the result + + :param prune: + f(i,d) return True if the search should stop at item i at depth d. + Item i will not be returned. + + :param depth: + define at which level the iteration should not go deeper + if -1, there is no limit + if 0, you would effectively only get self, the root of the iteration + i.e. if 1, you would only get the first level of predecessors/successors + + :param branch_first: + if True, items will be returned branch first, otherwise depth first + + :param visit_once: + if True, items will only be returned once, although they might be encountered + several times. Loops are prevented that way. + + :param ignore_self: + if True, self will be ignored and automatically pruned from + the result. Otherwise it will be the first item to be returned. + If as_edge is True, the source of the first edge is None + + :param as_edge: + if True, return a pair of items, first being the source, second the + destination, i.e. tuple(src, dest) with the edge spanning from + source to destination""" + + """ + Commit -> Iterator[Union[Commit, Tuple[Commit, Commit]] + Submodule -> Iterator[Submodule, Tuple[Submodule, Submodule]] + Tree -> Iterator[Union[Blob, Tree, Submodule, + Tuple[Union[Submodule, Tree], Union[Blob, Tree, Submodule]]] + + ignore_self=True is_edge=True -> Iterator[item] + ignore_self=True is_edge=False --> Iterator[item] + ignore_self=False is_edge=True -> Iterator[item] | Iterator[Tuple[src, item]] + ignore_self=False is_edge=False -> Iterator[Tuple[src, item]]""" + + visited = set() + stack: Deque[TraverseNT] = deque() + stack.append(TraverseNT(0, self, None)) # self is always depth level 0 + + def addToStack(stack: Deque[TraverseNT], + src_item: 'Traversable', + branch_first: bool, + depth: int) -> None: + lst = self._get_intermediate_items(item) + if not lst: # empty list + return None + if branch_first: + stack.extendleft(TraverseNT(depth, i, src_item) for i in lst) + else: + reviter = (TraverseNT(depth, lst[i], src_item) for i in range(len(lst) - 1, -1, -1)) + stack.extend(reviter) + # END addToStack local method + + while stack: + d, item, src = stack.pop() # depth of item, item, item_source + + if visit_once and item in visited: + continue + + if visit_once: + visited.add(item) + + rval: Union[TraversedTup, 'Traversable', 'Blob'] + if as_edge: # if as_edge return (src, item) unless rrc is None (e.g. for first item) + rval = (src, item) + else: + rval = item + + if prune(rval, d): + continue + + skipStartItem = ignore_self and (item is self) + if not skipStartItem and predicate(rval, d): + yield rval + + # only continue to next level if this is appropriate ! + nd = d + 1 + if depth > -1 and nd > depth: + continue + + addToStack(stack, item, branch_first, nd) + # END for each item on work stack + + +@ runtime_checkable +class Serializable(Protocol): + + """Defines methods to serialize and deserialize objects from and into a data stream""" + __slots__ = () + + # @abstractmethod + def _serialize(self, stream: 'BytesIO') -> 'Serializable': + """Serialize the data of this object into the given data stream + :note: a serialized object would ``_deserialize`` into the same object + :param stream: a file-like object + :return: self""" + raise NotImplementedError("To be implemented in subclass") + + # @abstractmethod + def _deserialize(self, stream: 'BytesIO') -> 'Serializable': + """Deserialize all information regarding this object from the stream + :param stream: a file-like object + :return: self""" + raise NotImplementedError("To be implemented in subclass") + + +class TraversableIterableObj(IterableObj, Traversable): + __slots__ = () + + TIobj_tuple = Tuple[Union[T_TIobj, None], T_TIobj] + + def list_traverse(self: T_TIobj, *args: Any, **kwargs: Any) -> IterableList[T_TIobj]: + return super(TraversableIterableObj, self)._list_traverse(* args, **kwargs) + + @ overload # type: ignore + def traverse(self: T_TIobj + ) -> Iterator[T_TIobj]: + ... + + @ overload + def traverse(self: T_TIobj, + predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], + prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], + depth: int, branch_first: bool, visit_once: bool, + ignore_self: Literal[True], + as_edge: Literal[False], + ) -> Iterator[T_TIobj]: + ... + + @ overload + def traverse(self: T_TIobj, + predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], + prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], + depth: int, branch_first: bool, visit_once: bool, + ignore_self: Literal[False], + as_edge: Literal[True], + ) -> Iterator[Tuple[Union[T_TIobj, None], T_TIobj]]: + ... + + @ overload + def traverse(self: T_TIobj, + predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], + prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], + depth: int, branch_first: bool, visit_once: bool, + ignore_self: Literal[True], + as_edge: Literal[True], + ) -> Iterator[Tuple[T_TIobj, T_TIobj]]: + ... + + def traverse(self: T_TIobj, + predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], + bool] = lambda i, d: True, + prune: Callable[[Union[T_TIobj, TIobj_tuple], int], + bool] = lambda i, d: False, + depth: int = -1, branch_first: bool = True, visit_once: bool = True, + ignore_self: int = 1, as_edge: bool = False + ) -> Union[Iterator[T_TIobj], + Iterator[Tuple[T_TIobj, T_TIobj]], + Iterator[TIobj_tuple]]: + """For documentation, see util.Traversable._traverse()""" + + """ + # To typecheck instead of using cast. + import itertools + from git.types import TypeGuard + def is_commit_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Tuple['Commit', 'Commit']]]]: + for x in inp[1]: + if not isinstance(x, tuple) and len(x) != 2: + if all(isinstance(inner, Commit) for inner in x): + continue + return True + + ret = super(Commit, self).traverse(predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge) + ret_tup = itertools.tee(ret, 2) + assert is_commit_traversed(ret_tup), f"{[type(x) for x in list(ret_tup[0])]}" + return ret_tup[0] + """ + return cast(Union[Iterator[T_TIobj], + Iterator[Tuple[Union[None, T_TIobj], T_TIobj]]], + super(TraversableIterableObj, self)._traverse( + predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge # type: ignore + )) -- cgit v1.2.1 From 6470ad4a413fb7fbd9f2d3b9da1720c13ffc92bb Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 17:24:18 +0100 Subject: Fix parse_date typing 7 --- git/objects/util.py | 4 +- .../util.py.97f6472e9bbb12cad7bbab8f367a99fe.tmp | 566 --------------------- 2 files changed, 3 insertions(+), 567 deletions(-) delete mode 100644 git/objects/util.py.97f6472e9bbb12cad7bbab8f367a99fe.tmp (limited to 'git') diff --git a/git/objects/util.py b/git/objects/util.py index 16d4c0ac..0b843301 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -187,7 +187,9 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: offset = -int(utcoffset.total_seconds()) return int(string_date.astimezone(utc).timestamp()), offset else: - raise ValueError(f"string_date datetime object without tzinfo, {string_date}") + # should just return timestamp, 0? + return int(string_date.astimezone(utc).timestamp()), 0 + # raise ValueError(f"string_date datetime object without tzinfo, {string_date}") # git time try: diff --git a/git/objects/util.py.97f6472e9bbb12cad7bbab8f367a99fe.tmp b/git/objects/util.py.97f6472e9bbb12cad7bbab8f367a99fe.tmp deleted file mode 100644 index 16d4c0ac..00000000 --- a/git/objects/util.py.97f6472e9bbb12cad7bbab8f367a99fe.tmp +++ /dev/null @@ -1,566 +0,0 @@ -# util.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php -"""Module for general utility functions""" - -from abc import abstractmethod -import warnings -from git.util import ( - IterableList, - IterableObj, - Actor -) - -import re -from collections import deque - -from string import digits -import time -import calendar -from datetime import datetime, timedelta, tzinfo - -# typing ------------------------------------------------------------ -from typing import (Any, Callable, Deque, Iterator, NamedTuple, overload, Sequence, - TYPE_CHECKING, Tuple, Type, TypeVar, Union, cast) - -from git.types import Has_id_attribute, Literal, Protocol, runtime_checkable - -if TYPE_CHECKING: - from io import BytesIO, StringIO - from .commit import Commit - from .blob import Blob - from .tag import TagObject - from .tree import Tree, TraversedTreeTup - from subprocess import Popen - from .submodule.base import Submodule - - -class TraverseNT(NamedTuple): - depth: int - item: Union['Traversable', 'Blob'] - src: Union['Traversable', None] - - -T_TIobj = TypeVar('T_TIobj', bound='TraversableIterableObj') # for TraversableIterableObj.traverse() - -TraversedTup = Union[Tuple[Union['Traversable', None], 'Traversable'], # for commit, submodule - 'TraversedTreeTup'] # for tree.traverse() - -# -------------------------------------------------------------------- - -__all__ = ('get_object_type_by_name', 'parse_date', 'parse_actor_and_date', - 'ProcessStreamAdapter', 'Traversable', 'altz_to_utctz_str', 'utctz_to_altz', - 'verify_utctz', 'Actor', 'tzoffset', 'utc') - -ZERO = timedelta(0) - -#{ Functions - - -def mode_str_to_int(modestr: Union[bytes, str]) -> int: - """ - :param modestr: string like 755 or 644 or 100644 - only the last 6 chars will be used - :return: - String identifying a mode compatible to the mode methods ids of the - stat module regarding the rwx permissions for user, group and other, - special flags and file system flags, i.e. whether it is a symlink - for example.""" - mode = 0 - for iteration, char in enumerate(reversed(modestr[-6:])): - char = cast(Union[str, int], char) - mode += int(char) << iteration * 3 - # END for each char - return mode - - -def get_object_type_by_name(object_type_name: bytes - ) -> Union[Type['Commit'], Type['TagObject'], Type['Tree'], Type['Blob']]: - """ - :return: type suitable to handle the given object type name. - Use the type to create new instances. - - :param object_type_name: Member of TYPES - - :raise ValueError: In case object_type_name is unknown""" - if object_type_name == b"commit": - from . import commit - return commit.Commit - elif object_type_name == b"tag": - from . import tag - return tag.TagObject - elif object_type_name == b"blob": - from . import blob - return blob.Blob - elif object_type_name == b"tree": - from . import tree - return tree.Tree - else: - raise ValueError("Cannot handle unknown object type: %s" % object_type_name.decode()) - - -def utctz_to_altz(utctz: str) -> int: - """we convert utctz to the timezone in seconds, it is the format time.altzone - returns. Git stores it as UTC timezone which has the opposite sign as well, - which explains the -1 * ( that was made explicit here ) - :param utctz: git utc timezone string, i.e. +0200""" - return -1 * int(float(utctz) / 100 * 3600) - - -def altz_to_utctz_str(altz: float) -> str: - """As above, but inverses the operation, returning a string that can be used - in commit objects""" - utci = -1 * int((float(altz) / 3600) * 100) - utcs = str(abs(utci)) - utcs = "0" * (4 - len(utcs)) + utcs - prefix = (utci < 0 and '-') or '+' - return prefix + utcs - - -def verify_utctz(offset: str) -> str: - """:raise ValueError: if offset is incorrect - :return: offset""" - fmt_exc = ValueError("Invalid timezone offset format: %s" % offset) - if len(offset) != 5: - raise fmt_exc - if offset[0] not in "+-": - raise fmt_exc - if offset[1] not in digits or\ - offset[2] not in digits or\ - offset[3] not in digits or\ - offset[4] not in digits: - raise fmt_exc - # END for each char - return offset - - -class tzoffset(tzinfo): - - def __init__(self, secs_west_of_utc: float, name: Union[None, str] = None) -> None: - self._offset = timedelta(seconds=-secs_west_of_utc) - self._name = name or 'fixed' - - def __reduce__(self) -> Tuple[Type['tzoffset'], Tuple[float, str]]: - return tzoffset, (-self._offset.total_seconds(), self._name) - - def utcoffset(self, dt: Union[datetime, None]) -> timedelta: - return self._offset - - def tzname(self, dt: Union[datetime, None]) -> str: - return self._name - - def dst(self, dt: Union[datetime, None]) -> timedelta: - return ZERO - - -utc = tzoffset(0, 'UTC') - - -def from_timestamp(timestamp: float, tz_offset: float) -> datetime: - """Converts a timestamp + tz_offset into an aware datetime instance.""" - utc_dt = datetime.fromtimestamp(timestamp, utc) - try: - local_dt = utc_dt.astimezone(tzoffset(tz_offset)) - return local_dt - except ValueError: - return utc_dt - - -def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: - """ - Parse the given date as one of the following - - * aware datetime instance - * Git internal format: timestamp offset - * RFC 2822: Thu, 07 Apr 2005 22:13:13 +0200. - * ISO 8601 2005-04-07T22:13:13 - The T can be a space as well - - :return: Tuple(int(timestamp_UTC), int(offset)), both in seconds since epoch - :raise ValueError: If the format could not be understood - :note: Date can also be YYYY.MM.DD, MM/DD/YYYY and DD.MM.YYYY. - """ - if isinstance(string_date, datetime): - if string_date.tzinfo: - utcoffset = cast(timedelta, string_date.utcoffset()) # typeguard, if tzinfoand is not None - offset = -int(utcoffset.total_seconds()) - return int(string_date.astimezone(utc).timestamp()), offset - else: - raise ValueError(f"string_date datetime object without tzinfo, {string_date}") - - # git time - try: - if string_date.count(' ') == 1 and string_date.rfind(':') == -1: - timestamp, offset_str = string_date.split() - if timestamp.startswith('@'): - timestamp = timestamp[1:] - timestamp_int = int(timestamp) - return timestamp_int, utctz_to_altz(verify_utctz(offset_str)) - else: - offset_str = "+0000" # local time by default - if string_date[-5] in '-+': - offset_str = verify_utctz(string_date[-5:]) - string_date = string_date[:-6] # skip space as well - # END split timezone info - offset = utctz_to_altz(offset_str) - - # now figure out the date and time portion - split time - date_formats = [] - splitter = -1 - if ',' in string_date: - date_formats.append("%a, %d %b %Y") - splitter = string_date.rfind(' ') - else: - # iso plus additional - date_formats.append("%Y-%m-%d") - date_formats.append("%Y.%m.%d") - date_formats.append("%m/%d/%Y") - date_formats.append("%d.%m.%Y") - - splitter = string_date.rfind('T') - if splitter == -1: - splitter = string_date.rfind(' ') - # END handle 'T' and ' ' - # END handle rfc or iso - - assert splitter > -1 - - # split date and time - time_part = string_date[splitter + 1:] # skip space - date_part = string_date[:splitter] - - # parse time - tstruct = time.strptime(time_part, "%H:%M:%S") - - for fmt in date_formats: - try: - dtstruct = time.strptime(date_part, fmt) - utctime = calendar.timegm((dtstruct.tm_year, dtstruct.tm_mon, dtstruct.tm_mday, - tstruct.tm_hour, tstruct.tm_min, tstruct.tm_sec, - dtstruct.tm_wday, dtstruct.tm_yday, tstruct.tm_isdst)) - return int(utctime), offset - except ValueError: - continue - # END exception handling - # END for each fmt - - # still here ? fail - raise ValueError("no format matched") - # END handle format - except Exception as e: - raise ValueError(f"Unsupported date format or type: {string_date}, type={type(string_date)}") from e - # END handle exceptions - - -# precompiled regex -_re_actor_epoch = re.compile(r'^.+? (.*) (\d+) ([+-]\d+).*$') -_re_only_actor = re.compile(r'^.+? (.*)$') - - -def parse_actor_and_date(line: str) -> Tuple[Actor, int, int]: - """Parse out the actor (author or committer) info from a line like:: - - author Tom Preston-Werner 1191999972 -0700 - - :return: [Actor, int_seconds_since_epoch, int_timezone_offset]""" - actor, epoch, offset = '', '0', '0' - m = _re_actor_epoch.search(line) - if m: - actor, epoch, offset = m.groups() - else: - m = _re_only_actor.search(line) - actor = m.group(1) if m else line or '' - return (Actor._from_string(actor), int(epoch), utctz_to_altz(offset)) - -#} END functions - - -#{ Classes - -class ProcessStreamAdapter(object): - - """Class wireing all calls to the contained Process instance. - - Use this type to hide the underlying process to provide access only to a specified - stream. The process is usually wrapped into an AutoInterrupt class to kill - it if the instance goes out of scope.""" - __slots__ = ("_proc", "_stream") - - def __init__(self, process: 'Popen', stream_name: str) -> None: - self._proc = process - self._stream: StringIO = getattr(process, stream_name) # guessed type - - def __getattr__(self, attr: str) -> Any: - return getattr(self._stream, attr) - - -@runtime_checkable -class Traversable(Protocol): - - """Simple interface to perform depth-first or breadth-first traversals - into one direction. - Subclasses only need to implement one function. - Instances of the Subclass must be hashable - - Defined subclasses = [Commit, Tree, SubModule] - """ - __slots__ = () - - @classmethod - @abstractmethod - def _get_intermediate_items(cls, item: Any) -> Sequence['Traversable']: - """ - Returns: - Tuple of items connected to the given item. - Must be implemented in subclass - - class Commit:: (cls, Commit) -> Tuple[Commit, ...] - class Submodule:: (cls, Submodule) -> Iterablelist[Submodule] - class Tree:: (cls, Tree) -> Tuple[Tree, ...] - """ - raise NotImplementedError("To be implemented in subclass") - - @abstractmethod - def list_traverse(self, *args: Any, **kwargs: Any) -> Any: - """ """ - warnings.warn("list_traverse() method should only be called from subclasses." - "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20" - "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit", - DeprecationWarning, - stacklevel=2) - return self._list_traverse(*args, **kwargs) - - def _list_traverse(self, as_edge: bool = False, *args: Any, **kwargs: Any - ) -> IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']]: - """ - :return: IterableList with the results of the traversal as produced by - traverse() - Commit -> IterableList['Commit'] - Submodule -> IterableList['Submodule'] - Tree -> IterableList[Union['Submodule', 'Tree', 'Blob']] - """ - # Commit and Submodule have id.__attribute__ as IterableObj - # Tree has id.__attribute__ inherited from IndexObject - if isinstance(self, Has_id_attribute): - id = self._id_attribute_ - else: - id = "" # shouldn't reach here, unless Traversable subclass created with no _id_attribute_ - # could add _id_attribute_ to Traversable, or make all Traversable also Iterable? - - if not as_edge: - out: IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']] = IterableList(id) - out.extend(self.traverse(as_edge=as_edge, *args, **kwargs)) - return out - # overloads in subclasses (mypy does't allow typing self: subclass) - # Union[IterableList['Commit'], IterableList['Submodule'], IterableList[Union['Submodule', 'Tree', 'Blob']]] - else: - # Raise deprecationwarning, doesn't make sense to use this - out_list: IterableList = IterableList(self.traverse(*args, **kwargs)) - return out_list - - @ abstractmethod - def traverse(self, *args: Any, **kwargs: Any) -> Any: - """ """ - warnings.warn("traverse() method should only be called from subclasses." - "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20" - "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit", - DeprecationWarning, - stacklevel=2) - return self._traverse(*args, **kwargs) - - def _traverse(self, - predicate: Callable[[Union['Traversable', 'Blob', TraversedTup], int], bool] = lambda i, d: True, - prune: Callable[[Union['Traversable', 'Blob', TraversedTup], int], bool] = lambda i, d: False, - depth: int = -1, branch_first: bool = True, visit_once: bool = True, - ignore_self: int = 1, as_edge: bool = False - ) -> Union[Iterator[Union['Traversable', 'Blob']], - Iterator[TraversedTup]]: - """:return: iterator yielding of items found when traversing self - :param predicate: f(i,d) returns False if item i at depth d should not be included in the result - - :param prune: - f(i,d) return True if the search should stop at item i at depth d. - Item i will not be returned. - - :param depth: - define at which level the iteration should not go deeper - if -1, there is no limit - if 0, you would effectively only get self, the root of the iteration - i.e. if 1, you would only get the first level of predecessors/successors - - :param branch_first: - if True, items will be returned branch first, otherwise depth first - - :param visit_once: - if True, items will only be returned once, although they might be encountered - several times. Loops are prevented that way. - - :param ignore_self: - if True, self will be ignored and automatically pruned from - the result. Otherwise it will be the first item to be returned. - If as_edge is True, the source of the first edge is None - - :param as_edge: - if True, return a pair of items, first being the source, second the - destination, i.e. tuple(src, dest) with the edge spanning from - source to destination""" - - """ - Commit -> Iterator[Union[Commit, Tuple[Commit, Commit]] - Submodule -> Iterator[Submodule, Tuple[Submodule, Submodule]] - Tree -> Iterator[Union[Blob, Tree, Submodule, - Tuple[Union[Submodule, Tree], Union[Blob, Tree, Submodule]]] - - ignore_self=True is_edge=True -> Iterator[item] - ignore_self=True is_edge=False --> Iterator[item] - ignore_self=False is_edge=True -> Iterator[item] | Iterator[Tuple[src, item]] - ignore_self=False is_edge=False -> Iterator[Tuple[src, item]]""" - - visited = set() - stack: Deque[TraverseNT] = deque() - stack.append(TraverseNT(0, self, None)) # self is always depth level 0 - - def addToStack(stack: Deque[TraverseNT], - src_item: 'Traversable', - branch_first: bool, - depth: int) -> None: - lst = self._get_intermediate_items(item) - if not lst: # empty list - return None - if branch_first: - stack.extendleft(TraverseNT(depth, i, src_item) for i in lst) - else: - reviter = (TraverseNT(depth, lst[i], src_item) for i in range(len(lst) - 1, -1, -1)) - stack.extend(reviter) - # END addToStack local method - - while stack: - d, item, src = stack.pop() # depth of item, item, item_source - - if visit_once and item in visited: - continue - - if visit_once: - visited.add(item) - - rval: Union[TraversedTup, 'Traversable', 'Blob'] - if as_edge: # if as_edge return (src, item) unless rrc is None (e.g. for first item) - rval = (src, item) - else: - rval = item - - if prune(rval, d): - continue - - skipStartItem = ignore_self and (item is self) - if not skipStartItem and predicate(rval, d): - yield rval - - # only continue to next level if this is appropriate ! - nd = d + 1 - if depth > -1 and nd > depth: - continue - - addToStack(stack, item, branch_first, nd) - # END for each item on work stack - - -@ runtime_checkable -class Serializable(Protocol): - - """Defines methods to serialize and deserialize objects from and into a data stream""" - __slots__ = () - - # @abstractmethod - def _serialize(self, stream: 'BytesIO') -> 'Serializable': - """Serialize the data of this object into the given data stream - :note: a serialized object would ``_deserialize`` into the same object - :param stream: a file-like object - :return: self""" - raise NotImplementedError("To be implemented in subclass") - - # @abstractmethod - def _deserialize(self, stream: 'BytesIO') -> 'Serializable': - """Deserialize all information regarding this object from the stream - :param stream: a file-like object - :return: self""" - raise NotImplementedError("To be implemented in subclass") - - -class TraversableIterableObj(IterableObj, Traversable): - __slots__ = () - - TIobj_tuple = Tuple[Union[T_TIobj, None], T_TIobj] - - def list_traverse(self: T_TIobj, *args: Any, **kwargs: Any) -> IterableList[T_TIobj]: - return super(TraversableIterableObj, self)._list_traverse(* args, **kwargs) - - @ overload # type: ignore - def traverse(self: T_TIobj - ) -> Iterator[T_TIobj]: - ... - - @ overload - def traverse(self: T_TIobj, - predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], - prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], - depth: int, branch_first: bool, visit_once: bool, - ignore_self: Literal[True], - as_edge: Literal[False], - ) -> Iterator[T_TIobj]: - ... - - @ overload - def traverse(self: T_TIobj, - predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], - prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], - depth: int, branch_first: bool, visit_once: bool, - ignore_self: Literal[False], - as_edge: Literal[True], - ) -> Iterator[Tuple[Union[T_TIobj, None], T_TIobj]]: - ... - - @ overload - def traverse(self: T_TIobj, - predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], - prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], - depth: int, branch_first: bool, visit_once: bool, - ignore_self: Literal[True], - as_edge: Literal[True], - ) -> Iterator[Tuple[T_TIobj, T_TIobj]]: - ... - - def traverse(self: T_TIobj, - predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], - bool] = lambda i, d: True, - prune: Callable[[Union[T_TIobj, TIobj_tuple], int], - bool] = lambda i, d: False, - depth: int = -1, branch_first: bool = True, visit_once: bool = True, - ignore_self: int = 1, as_edge: bool = False - ) -> Union[Iterator[T_TIobj], - Iterator[Tuple[T_TIobj, T_TIobj]], - Iterator[TIobj_tuple]]: - """For documentation, see util.Traversable._traverse()""" - - """ - # To typecheck instead of using cast. - import itertools - from git.types import TypeGuard - def is_commit_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Tuple['Commit', 'Commit']]]]: - for x in inp[1]: - if not isinstance(x, tuple) and len(x) != 2: - if all(isinstance(inner, Commit) for inner in x): - continue - return True - - ret = super(Commit, self).traverse(predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge) - ret_tup = itertools.tee(ret, 2) - assert is_commit_traversed(ret_tup), f"{[type(x) for x in list(ret_tup[0])]}" - return ret_tup[0] - """ - return cast(Union[Iterator[T_TIobj], - Iterator[Tuple[Union[None, T_TIobj], T_TIobj]]], - super(TraversableIterableObj, self)._traverse( - predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge # type: ignore - )) -- cgit v1.2.1 From 481f672baab666d6e2f81e9288a5f3c42c884a8e Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 17:56:06 +0100 Subject: Add __future__.annotations to repo/base.py --- git/objects/util.py | 4 +--- git/refs/symbolic.py | 12 ++++++++---- git/repo/__init__.py | 2 +- git/repo/base.py | 9 +++++---- git/util.py | 2 +- 5 files changed, 16 insertions(+), 13 deletions(-) (limited to 'git') diff --git a/git/objects/util.py b/git/objects/util.py index 0b843301..16d4c0ac 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -187,9 +187,7 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: offset = -int(utcoffset.total_seconds()) return int(string_date.astimezone(utc).timestamp()), offset else: - # should just return timestamp, 0? - return int(string_date.astimezone(utc).timestamp()), 0 - # raise ValueError(f"string_date datetime object without tzinfo, {string_date}") + raise ValueError(f"string_date datetime object without tzinfo, {string_date}") # git time try: diff --git a/git/refs/symbolic.py b/git/refs/symbolic.py index 1c56c043..238be839 100644 --- a/git/refs/symbolic.py +++ b/git/refs/symbolic.py @@ -72,12 +72,13 @@ class SymbolicReference(object): def __repr__(self) -> str: return '' % (self.__class__.__name__, self.path) - def __eq__(self, other: Any) -> bool: + def __eq__(self, other: object) -> bool: if hasattr(other, 'path'): + other = cast(SymbolicReference, other) return self.path == other.path return False - def __ne__(self, other: Any) -> bool: + def __ne__(self, other: object) -> bool: return not (self == other) def __hash__(self) -> int: @@ -364,8 +365,9 @@ class SymbolicReference(object): return self # aliased reference + reference: Union['Head', 'TagReference', 'RemoteReference', 'Reference'] reference = property(_get_reference, set_reference, doc="Returns the Reference we point to") # type: ignore - ref: Union['Head', 'TagReference', 'RemoteReference', 'Reference'] = reference # type: ignore + ref = reference def is_valid(self) -> bool: """ @@ -699,7 +701,9 @@ class SymbolicReference(object): instance = ref_type(repo, path) if instance.__class__ == SymbolicReference and instance.is_detached: raise ValueError("SymbolRef was detached, we drop it") - return instance + else: + assert isinstance(instance, Reference), "instance should be Reference or subtype" + return instance except ValueError: pass # END exception handling diff --git a/git/repo/__init__.py b/git/repo/__init__.py index 712df60d..23c18db8 100644 --- a/git/repo/__init__.py +++ b/git/repo/__init__.py @@ -1,3 +1,3 @@ """Initialize the Repo package""" # flake8: noqa -from .base import * +from .base import Repo as Repo diff --git a/git/repo/base.py b/git/repo/base.py index 2609bf55..6708872e 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -3,6 +3,7 @@ # # This module is part of GitPython and is released under # the BSD License: http://www.opensource.org/licenses/bsd-license.php +from __future__ import annotations import logging import os import re @@ -384,13 +385,13 @@ class Repo(object): :return: created submodules""" return Submodule.add(self, *args, **kwargs) - def iter_submodules(self, *args: Any, **kwargs: Any) -> Iterator: + def iter_submodules(self, *args: Any, **kwargs: Any) -> Iterator[Submodule]: """An iterator yielding Submodule instances, see Traversable interface for a description of args and kwargs :return: Iterator""" return RootModule(self).traverse(*args, **kwargs) - def submodule_update(self, *args: Any, **kwargs: Any) -> Iterator: + def submodule_update(self, *args: Any, **kwargs: Any) -> Iterator[Submodule]: """Update the submodules, keeping the repository consistent as it will take the previous state into consideration. For more information, please see the documentation of RootModule.update""" @@ -774,7 +775,7 @@ class Repo(object): finalize_process(proc) return untracked_files - def ignored(self, *paths: PathLike) -> List[PathLike]: + def ignored(self, *paths: PathLike) -> List[str]: """Checks if paths are ignored via .gitignore Doing so using the "git check-ignore" method. @@ -782,7 +783,7 @@ class Repo(object): :return: subset of those paths which are ignored """ try: - proc = self.git.check_ignore(*paths) + proc: str = self.git.check_ignore(*paths) except GitCommandError: return [] return proc.replace("\\\\", "\\").replace('"', "").split("\n") diff --git a/git/util.py b/git/util.py index c20be6eb..4f82219e 100644 --- a/git/util.py +++ b/git/util.py @@ -70,7 +70,7 @@ from gitdb.util import ( # NOQA @IgnorePep8 # Most of these are unused here, but are for use by git-python modules so these # don't see gitdb all the time. Flake of course doesn't like it. __all__ = ["stream_copy", "join_path", "to_native_path_linux", - "join_path_native", "Stats", "IndexFileSHA1Writer", "Iterable", "IterableList", + "join_path_native", "Stats", "IndexFileSHA1Writer", "IterableObj", "IterableList", "BlockingLockFile", "LockFile", 'Actor', 'get_user_id', 'assure_directory_exists', 'RemoteProgress', 'CallableRemoteProgress', 'rmtree', 'unbare_repo', 'HIDE_WINDOWS_KNOWN_ERRORS'] -- cgit v1.2.1 From 9de7310f1a2bfcb90ca5c119321037d5ea97b24e Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 18:14:40 +0100 Subject: Minor type fixes --- git/refs/symbolic.py | 7 ++++--- git/remote.py | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) (limited to 'git') diff --git a/git/refs/symbolic.py b/git/refs/symbolic.py index 238be839..0c0fa404 100644 --- a/git/refs/symbolic.py +++ b/git/refs/symbolic.py @@ -285,7 +285,7 @@ class SymbolicReference(object): commit = property(_get_commit, set_commit, doc="Query or set commits directly") # type: ignore object = property(_get_object, set_object, doc="Return the object our ref currently refers to") # type: ignore - def _get_reference(self) -> 'Reference': + def _get_reference(self) -> 'SymbolicReference': """:return: Reference Object we point to :raise TypeError: If this symbolic reference is detached, hence it doesn't point to a reference, but to a commit""" @@ -683,7 +683,7 @@ class SymbolicReference(object): return (r for r in cls._iter_items(repo, common_path) if r.__class__ == SymbolicReference or not r.is_detached) @classmethod - def from_path(cls, repo: 'Repo', path: PathLike) -> Union['Head', 'TagReference', 'Reference']: + def from_path(cls: Type[T_References], repo: 'Repo', path: PathLike) -> T_References: """ :param path: full .git-directory-relative path name to the Reference to instantiate :note: use to_full_path() if you only have a partial path of a known Reference Type @@ -698,12 +698,13 @@ class SymbolicReference(object): from . import HEAD, Head, RemoteReference, TagReference, Reference for ref_type in (HEAD, Head, RemoteReference, TagReference, Reference, SymbolicReference): try: + instance: T_References instance = ref_type(repo, path) if instance.__class__ == SymbolicReference and instance.is_detached: raise ValueError("SymbolRef was detached, we drop it") else: - assert isinstance(instance, Reference), "instance should be Reference or subtype" return instance + except ValueError: pass # END exception handling diff --git a/git/remote.py b/git/remote.py index c141519a..3888506f 100644 --- a/git/remote.py +++ b/git/remote.py @@ -632,7 +632,7 @@ class Remote(LazyMixin, IterableObj): as well. This is a fix for the issue described here: https://github.com/gitpython-developers/GitPython/issues/260 """ - out_refs: IterableList[RemoteReference] = IterableList(RemoteReference._id_attribute_, "%s/" % self.name) + out_refs: IterableList[Reference] = IterableList(RemoteReference._id_attribute_, "%s/" % self.name) for line in self.repo.git.remote("prune", "--dry-run", self).splitlines()[2:]: # expecting # * [would prune] origin/new_branch @@ -642,7 +642,7 @@ class Remote(LazyMixin, IterableObj): ref_name = line.replace(token, "") # sometimes, paths start with a full ref name, like refs/tags/foo, see #260 if ref_name.startswith(Reference._common_path_default + '/'): - out_refs.append(SymbolicReference.from_path(self.repo, ref_name)) + out_refs.append(Reference.from_path(self.repo, ref_name)) else: fqhn = "%s/%s" % (RemoteReference._common_path_default, ref_name) out_refs.append(RemoteReference(self.repo, fqhn)) -- cgit v1.2.1 From f34a39f206b5e2d408d4d47b0cc2012775d00917 Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 18:25:20 +0100 Subject: Test new union syntax (Pep604) --- git/repo/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'git') diff --git a/git/repo/base.py b/git/repo/base.py index 6708872e..b7eecbc3 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -795,7 +795,7 @@ class Repo(object): # reveal_type(self.head.reference) # => Reference return self.head.reference - def blame_incremental(self, rev: Union[str, HEAD], file: str, **kwargs: Any) -> Iterator['BlameEntry']: + def blame_incremental(self, rev: str | HEAD, file: str, **kwargs: Any) -> Iterator['BlameEntry']: """Iterator for blame information for the given file at the given revision. Unlike .blame(), this does not return the actual file's contents, only -- cgit v1.2.1 From 4dd06c3e43bf3ccaf592ffa30120501ab4e8b58c Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 18:33:26 +0100 Subject: Test trailing comma in args (>py3.6?) --- git/repo/base.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'git') diff --git a/git/repo/base.py b/git/repo/base.py index b7eecbc3..b9399f62 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -38,7 +38,7 @@ import gitdb # typing ------------------------------------------------------ -from git.types import TBD, PathLike, Lit_config_levels, Commit_ish, Tree_ish +from git.types import TBD, PathLike, Lit_config_levels, Commit_ish, Tree_ish, assert_never from typing import (Any, BinaryIO, Callable, Dict, Iterator, List, Mapping, Optional, Sequence, TextIO, Tuple, Type, Union, @@ -481,10 +481,12 @@ class Repo(object): raise NotADirectoryError else: return osp.normpath(osp.join(repo_dir, "config")) + else: - raise ValueError("Invalid configuration level: %r" % config_level) + assert_never(config_level, # type:ignore[unreachable] + ValueError(f"Invalid configuration level: {config_level!r}")) - def config_reader(self, config_level: Optional[Lit_config_levels] = None + def config_reader(self, config_level: Optional[Lit_config_levels] = None, ) -> GitConfigParser: """ :return: -- cgit v1.2.1 From 94ae0c5839cf8de3b67c8dfd449ad9cef696aefb Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 22:15:18 +0100 Subject: Test Dataclass in repo.base.blame() --- git/repo/base.py | 103 +++++++++++++++++++++++++++++++++++-------------------- git/types.py | 2 +- 2 files changed, 67 insertions(+), 38 deletions(-) (limited to 'git') diff --git a/git/repo/base.py b/git/repo/base.py index b9399f62..bedd6a08 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -7,6 +7,7 @@ from __future__ import annotations import logging import os import re +from dataclasses import dataclass import shlex import warnings from gitdb.db.loose import LooseObjectDB @@ -41,7 +42,7 @@ import gitdb from git.types import TBD, PathLike, Lit_config_levels, Commit_ish, Tree_ish, assert_never from typing import (Any, BinaryIO, Callable, Dict, Iterator, List, Mapping, Optional, Sequence, - TextIO, Tuple, Type, Union, + TextIO, Tuple, Type, TypedDict, Union, NamedTuple, cast, TYPE_CHECKING) from git.types import ConfigLevels_Tup @@ -53,7 +54,6 @@ if TYPE_CHECKING: from git.objects.submodule.base import UpdateProgress from git.remote import RemoteProgress - # ----------------------------------------------------------- log = logging.getLogger(__name__) @@ -874,7 +874,7 @@ class Repo(object): range(orig_lineno, orig_lineno + num_lines)) def blame(self, rev: Union[str, HEAD], file: str, incremental: bool = False, **kwargs: Any - ) -> Union[List[List[Union[Optional['Commit'], List[str]]]], Optional[Iterator[BlameEntry]]]: + ) -> List[List[Commit | List[str | bytes] | None]] | Iterator[BlameEntry] | None: """The blame information for the given file at the given revision. :param rev: revision specifier, see git-rev-parse for viable options. @@ -886,25 +886,52 @@ class Repo(object): if incremental: return self.blame_incremental(rev, file, **kwargs) - data = self.git.blame(rev, '--', file, p=True, stdout_as_string=False, **kwargs) - commits: Dict[str, TBD] = {} - blames: List[List[Union[Optional['Commit'], List[str]]]] = [] - - info: Dict[str, TBD] = {} # use Any until TypedDict available + data: bytes = self.git.blame(rev, '--', file, p=True, stdout_as_string=False, **kwargs) + commits: Dict[str, Commit] = {} + blames: List[List[Commit | List[str | bytes] | None]] = [] + + class InfoTC(TypedDict, total=False): + sha: str + id: str + filename: str + summary: str + author: str + author_email: str + author_date: int + committer: str + committer_email: str + committer_date: int + + @dataclass + class InfoDC(Dict[str, Union[str, int]]): + sha: str = '' + id: str = '' + filename: str = '' + summary: str = '' + author: str = '' + author_email: str = '' + author_date: int = 0 + committer: str = '' + committer_email: str = '' + committer_date: int = 0 + + # info: InfoTD = {} + info = InfoDC() keepends = True - for line in data.splitlines(keepends): + for line_bytes in data.splitlines(keepends): try: - line = line.rstrip().decode(defenc) + line_str = line_bytes.rstrip().decode(defenc) except UnicodeDecodeError: firstpart = '' + parts = [''] is_binary = True else: # As we don't have an idea when the binary data ends, as it could contain multiple newlines # in the process. So we rely on being able to decode to tell us what is is. # This can absolutely fail even on text files, but even if it does, we should be fine treating it # as binary instead - parts = self.re_whitespace.split(line, 1) + parts = self.re_whitespace.split(line_str, 1) firstpart = parts[0] is_binary = False # end handle decode of line @@ -916,10 +943,10 @@ class Repo(object): # another line of blame with the same data digits = parts[-1].split(" ") if len(digits) == 3: - info = {'id': firstpart} + info.id = firstpart blames.append([None, []]) - elif info['id'] != firstpart: - info = {'id': firstpart} + elif info.id != firstpart: + info.id = firstpart blames.append([commits.get(firstpart), []]) # END blame data initialization else: @@ -936,9 +963,9 @@ class Repo(object): # committer-tz -0700 - IGNORED BY US role = m.group(0) if firstpart.endswith('-mail'): - info["%s_email" % role] = parts[-1] + info[f"{role}_email"] = parts[-1] elif firstpart.endswith('-time'): - info["%s_date" % role] = int(parts[-1]) + info[f"{role}_date"] = int(parts[-1]) elif role == firstpart: info[role] = parts[-1] # END distinguish mail,time,name @@ -953,38 +980,40 @@ class Repo(object): info['summary'] = parts[-1] elif firstpart == '': if info: - sha = info['id'] + sha = info.id c = commits.get(sha) if c is None: c = Commit(self, hex_to_bin(sha), - author=Actor._from_string(info['author'] + ' ' + info['author_email']), - authored_date=info['author_date'], + author=Actor._from_string(info.author + ' ' + info.author_email), + authored_date=info.author_date, committer=Actor._from_string( - info['committer'] + ' ' + info['committer_email']), - committed_date=info['committer_date']) + info.committer + ' ' + info.committer_email), + committed_date=info.committer_date) commits[sha] = c + blames[-1][0] = c # END if commit objects needs initial creation - if not is_binary: - if line and line[0] == '\t': - line = line[1:] - else: - # NOTE: We are actually parsing lines out of binary data, which can lead to the - # binary being split up along the newline separator. We will append this to the blame - # we are currently looking at, even though it should be concatenated with the last line - # we have seen. - pass - # end handle line contents - blames[-1][0] = c if blames[-1][1] is not None: - blames[-1][1].append(line) - info = {'id': sha} + if not is_binary: + if line_str and line_str[0] == '\t': + line_str = line_str[1:] + + blames[-1][1].append(line_str) + else: + # NOTE: We are actually parsing lines out of binary data, which can lead to the + # binary being split up along the newline separator. We will append this to the + # blame we are currently looking at, even though it should be concatenated with + # the last line we have seen. + blames[-1][1].append(line_bytes) + # end handle line contents + + info.id = sha # END if we collected commit info # END distinguish filename,summary,rest # END distinguish author|committer vs filename,summary,rest # END distinguish hexsha vs other information return blames - @classmethod + @ classmethod def init(cls, path: Union[PathLike, None] = None, mkdir: bool = True, odbt: Type[GitCmdObjectDB] = GitCmdObjectDB, expand_vars: bool = True, **kwargs: Any) -> 'Repo': """Initialize a git repository at the given path if specified @@ -1023,7 +1052,7 @@ class Repo(object): git.init(**kwargs) return cls(path, odbt=odbt) - @classmethod + @ classmethod def _clone(cls, git: 'Git', url: PathLike, path: PathLike, odb_default_type: Type[GitCmdObjectDB], progress: Union['RemoteProgress', 'UpdateProgress', Callable[..., 'RemoteProgress'], None] = None, multi_options: Optional[List[str]] = None, **kwargs: Any @@ -1101,7 +1130,7 @@ class Repo(object): :return: ``git.Repo`` (the newly cloned repo)""" return self._clone(self.git, self.common_dir, path, type(self.odb), progress, multi_options, **kwargs) - @classmethod + @ classmethod def clone_from(cls, url: PathLike, to_path: PathLike, progress: Optional[Callable] = None, env: Optional[Mapping[str, Any]] = None, multi_options: Optional[List[str]] = None, **kwargs: Any) -> 'Repo': diff --git a/git/types.py b/git/types.py index ccaffef3..64bf3d96 100644 --- a/git/types.py +++ b/git/types.py @@ -23,7 +23,7 @@ if sys.version_info[:2] < (3, 9): PathLike = Union[str, os.PathLike] elif sys.version_info[:2] >= (3, 9): # os.PathLike only becomes subscriptable from Python 3.9 onwards - PathLike = Union[str, 'os.PathLike[str]'] # forward ref as pylance complains unless editing with py3.9+ + PathLike = Union[str, os.PathLike] if TYPE_CHECKING: from git.repo import Repo -- cgit v1.2.1 From a3f5b1308f3340375832f1f2254b41c1ecbbc17e Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 22:18:28 +0100 Subject: Test Dataclass in repo.base.blame() 2 --- git/repo/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'git') diff --git a/git/repo/base.py b/git/repo/base.py index bedd6a08..a9d2e5be 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -42,10 +42,10 @@ import gitdb from git.types import TBD, PathLike, Lit_config_levels, Commit_ish, Tree_ish, assert_never from typing import (Any, BinaryIO, Callable, Dict, Iterator, List, Mapping, Optional, Sequence, - TextIO, Tuple, Type, TypedDict, Union, + TextIO, Tuple, Type, Union, NamedTuple, cast, TYPE_CHECKING) -from git.types import ConfigLevels_Tup +from git.types import ConfigLevels_Tup, TypedDict if TYPE_CHECKING: from git.util import IterableList -- cgit v1.2.1 From a2a36e06942d7a146d6640f275d4a4ec84e187c0 Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 22:26:14 +0100 Subject: Test Dataclass in repo.base.blame() 3 --- git/repo/base.py | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) (limited to 'git') diff --git a/git/repo/base.py b/git/repo/base.py index a9d2e5be..0f231e5f 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -890,7 +890,7 @@ class Repo(object): commits: Dict[str, Commit] = {} blames: List[List[Commit | List[str | bytes] | None]] = [] - class InfoTC(TypedDict, total=False): + class InfoTD(TypedDict, total=False): sha: str id: str filename: str @@ -992,19 +992,20 @@ class Repo(object): commits[sha] = c blames[-1][0] = c # END if commit objects needs initial creation - if blames[-1][1] is not None: - if not is_binary: - if line_str and line_str[0] == '\t': - line_str = line_str[1:] - - blames[-1][1].append(line_str) - else: - # NOTE: We are actually parsing lines out of binary data, which can lead to the - # binary being split up along the newline separator. We will append this to the - # blame we are currently looking at, even though it should be concatenated with - # the last line we have seen. - blames[-1][1].append(line_bytes) + if not is_binary: + if line_str and line_str[0] == '\t': + line_str = line_str[1:] + line_AnyStr: str | bytes = line_str + else: + line_AnyStr = line_bytes + # NOTE: We are actually parsing lines out of binary data, which can lead to the + # binary being split up along the newline separator. We will append this to the + # blame we are currently looking at, even though it should be concatenated with + # the last line we have seen. + # end handle line contents + if blames[-1][1] is not None: + blames[-1][1].append(line_AnyStr) info.id = sha # END if we collected commit info -- cgit v1.2.1 From ed137cbddf69ae11e5287a9e96e1df1a6e71250d Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 22:35:03 +0100 Subject: Test TypedDict in repo.base.blame() 2 --- git/repo/base.py | 80 +++++++++++++++++++++++++------------------------------- 1 file changed, 36 insertions(+), 44 deletions(-) (limited to 'git') diff --git a/git/repo/base.py b/git/repo/base.py index 0f231e5f..0a12d959 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -7,7 +7,6 @@ from __future__ import annotations import logging import os import re -from dataclasses import dataclass import shlex import warnings from gitdb.db.loose import LooseObjectDB @@ -902,21 +901,7 @@ class Repo(object): committer_email: str committer_date: int - @dataclass - class InfoDC(Dict[str, Union[str, int]]): - sha: str = '' - id: str = '' - filename: str = '' - summary: str = '' - author: str = '' - author_email: str = '' - author_date: int = 0 - committer: str = '' - committer_email: str = '' - committer_date: int = 0 - - # info: InfoTD = {} - info = InfoDC() + info: InfoTD = {} keepends = True for line_bytes in data.splitlines(keepends): @@ -943,10 +928,10 @@ class Repo(object): # another line of blame with the same data digits = parts[-1].split(" ") if len(digits) == 3: - info.id = firstpart + info = {'id': firstpart} blames.append([None, []]) - elif info.id != firstpart: - info.id = firstpart + elif info['id'] != firstpart: + info = {'id': firstpart} blames.append([commits.get(firstpart), []]) # END blame data initialization else: @@ -962,12 +947,20 @@ class Repo(object): # committer-time 1192271832 # committer-tz -0700 - IGNORED BY US role = m.group(0) - if firstpart.endswith('-mail'): - info[f"{role}_email"] = parts[-1] - elif firstpart.endswith('-time'): - info[f"{role}_date"] = int(parts[-1]) - elif role == firstpart: - info[role] = parts[-1] + if role == 'author': + if firstpart.endswith('-mail'): + info["author_email"] = parts[-1] + elif firstpart.endswith('-time'): + info["author_date"] = int(parts[-1]) + elif role == firstpart: + info["author"] = parts[-1] + elif role == 'committer': + if firstpart.endswith('-mail'): + info["committer_email"] = parts[-1] + elif firstpart.endswith('-time'): + info["committer_date"] = int(parts[-1]) + elif role == firstpart: + info["committer"] = parts[-1] # END distinguish mail,time,name else: # handle @@ -980,34 +973,33 @@ class Repo(object): info['summary'] = parts[-1] elif firstpart == '': if info: - sha = info.id + sha = info['id'] c = commits.get(sha) if c is None: c = Commit(self, hex_to_bin(sha), - author=Actor._from_string(info.author + ' ' + info.author_email), - authored_date=info.author_date, + author=Actor._from_string(info['author'] + ' ' + info['author_email']), + authored_date=info['author_date'], committer=Actor._from_string( - info.committer + ' ' + info.committer_email), - committed_date=info.committer_date) + info['committer'] + ' ' + info['committer_email']), + committed_date=info['committer_date']) commits[sha] = c blames[-1][0] = c # END if commit objects needs initial creation - if not is_binary: - if line_str and line_str[0] == '\t': - line_str = line_str[1:] - line_AnyStr: str | bytes = line_str - else: - line_AnyStr = line_bytes - # NOTE: We are actually parsing lines out of binary data, which can lead to the - # binary being split up along the newline separator. We will append this to the - # blame we are currently looking at, even though it should be concatenated with - # the last line we have seen. - - # end handle line contents if blames[-1][1] is not None: - blames[-1][1].append(line_AnyStr) + if not is_binary: + if line_str and line_str[0] == '\t': + line_str = line_str[1:] + + blames[-1][1].append(line_str) + else: + # NOTE: We are actually parsing lines out of binary data, which can lead to the + # binary being split up along the newline separator. We will append this to the + # blame we are currently looking at, even though it should be concatenated with + # the last line we have seen. + blames[-1][1].append(line_bytes) + # end handle line contents - info.id = sha + info = {'id': sha} # END if we collected commit info # END distinguish filename,summary,rest # END distinguish author|committer vs filename,summary,rest -- cgit v1.2.1 From e4761ff67ef14df27026bbe9e215b9ddf5e5b3a5 Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 22:45:19 +0100 Subject: Test TypedDict in repo.base.blame() 1 --- git/repo/base.py | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) (limited to 'git') diff --git a/git/repo/base.py b/git/repo/base.py index 0a12d959..58b9d5c2 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -909,7 +909,7 @@ class Repo(object): line_str = line_bytes.rstrip().decode(defenc) except UnicodeDecodeError: firstpart = '' - parts = [''] + parts = [] is_binary = True else: # As we don't have an idea when the binary data ends, as it could contain multiple newlines @@ -983,20 +983,21 @@ class Repo(object): info['committer'] + ' ' + info['committer_email']), committed_date=info['committer_date']) commits[sha] = c - blames[-1][0] = c + blames[-1][0] = c # END if commit objects needs initial creation + if not is_binary: + if line_str and line_str[0] == '\t': + line_str = line_str[1:] + else: + pass + # NOTE: We are actually parsing lines out of binary data, which can lead to the + # binary being split up along the newline separator. We will append this to the + # blame we are currently looking at, even though it should be concatenated with + # the last line we have seen. + if blames[-1][1] is not None: - if not is_binary: - if line_str and line_str[0] == '\t': - line_str = line_str[1:] - - blames[-1][1].append(line_str) - else: - # NOTE: We are actually parsing lines out of binary data, which can lead to the - # binary being split up along the newline separator. We will append this to the - # blame we are currently looking at, even though it should be concatenated with - # the last line we have seen. - blames[-1][1].append(line_bytes) + blames[-1][1].append(line_str) + info = {'id': sha} # end handle line contents info = {'id': sha} -- cgit v1.2.1 From 1aaa7048ddecb4509e1c279e28de5ef71477e71f Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 22:50:11 +0100 Subject: Test Dataclass in repo.base.blame() 4 --- git/repo/base.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) (limited to 'git') diff --git a/git/repo/base.py b/git/repo/base.py index 58b9d5c2..2bfc4677 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -985,22 +985,21 @@ class Repo(object): commits[sha] = c blames[-1][0] = c # END if commit objects needs initial creation - if not is_binary: - if line_str and line_str[0] == '\t': - line_str = line_str[1:] - else: - pass - # NOTE: We are actually parsing lines out of binary data, which can lead to the - # binary being split up along the newline separator. We will append this to the - # blame we are currently looking at, even though it should be concatenated with - # the last line we have seen. if blames[-1][1] is not None: - blames[-1][1].append(line_str) + if not is_binary: + if line_str and line_str[0] == '\t': + line_str = line_str[1:] + blames[-1][1].append(line_str) + else: + blames[-1][1].append(line_bytes) + # NOTE: We are actually parsing lines out of binary data, which can lead to the + # binary being split up along the newline separator. We will append this to the + # blame we are currently looking at, even though it should be concatenated with + # the last line we have seen. info = {'id': sha} # end handle line contents - info = {'id': sha} # END if we collected commit info # END distinguish filename,summary,rest # END distinguish author|committer vs filename,summary,rest -- cgit v1.2.1 From bc9bcf51ef68385895d8cdbc76098d6b493cd1b6 Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 22:52:10 +0100 Subject: Test Dataclass in repo.base.blame() 5 --- git/repo/base.py | 67 +++++++++++++++++++++++++++++++------------------------- 1 file changed, 37 insertions(+), 30 deletions(-) (limited to 'git') diff --git a/git/repo/base.py b/git/repo/base.py index 2bfc4677..a0aee322 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -7,6 +7,7 @@ from __future__ import annotations import logging import os import re +from dataclasses import dataclass import shlex import warnings from gitdb.db.loose import LooseObjectDB @@ -41,10 +42,10 @@ import gitdb from git.types import TBD, PathLike, Lit_config_levels, Commit_ish, Tree_ish, assert_never from typing import (Any, BinaryIO, Callable, Dict, Iterator, List, Mapping, Optional, Sequence, - TextIO, Tuple, Type, Union, + TextIO, Tuple, Type, TypedDict, Union, NamedTuple, cast, TYPE_CHECKING) -from git.types import ConfigLevels_Tup, TypedDict +from git.types import ConfigLevels_Tup if TYPE_CHECKING: from git.util import IterableList @@ -889,7 +890,7 @@ class Repo(object): commits: Dict[str, Commit] = {} blames: List[List[Commit | List[str | bytes] | None]] = [] - class InfoTD(TypedDict, total=False): + class InfoTC(TypedDict, total=False): sha: str id: str filename: str @@ -901,7 +902,21 @@ class Repo(object): committer_email: str committer_date: int - info: InfoTD = {} + @dataclass + class InfoDC(Dict[str, Union[str, int]]): + sha: str = '' + id: str = '' + filename: str = '' + summary: str = '' + author: str = '' + author_email: str = '' + author_date: int = 0 + committer: str = '' + committer_email: str = '' + committer_date: int = 0 + + # info: InfoTD = {} + info = InfoDC() keepends = True for line_bytes in data.splitlines(keepends): @@ -909,7 +924,7 @@ class Repo(object): line_str = line_bytes.rstrip().decode(defenc) except UnicodeDecodeError: firstpart = '' - parts = [] + parts = [''] is_binary = True else: # As we don't have an idea when the binary data ends, as it could contain multiple newlines @@ -928,10 +943,10 @@ class Repo(object): # another line of blame with the same data digits = parts[-1].split(" ") if len(digits) == 3: - info = {'id': firstpart} + info.id = firstpart blames.append([None, []]) - elif info['id'] != firstpart: - info = {'id': firstpart} + elif info.id != firstpart: + info.id = firstpart blames.append([commits.get(firstpart), []]) # END blame data initialization else: @@ -947,20 +962,12 @@ class Repo(object): # committer-time 1192271832 # committer-tz -0700 - IGNORED BY US role = m.group(0) - if role == 'author': - if firstpart.endswith('-mail'): - info["author_email"] = parts[-1] - elif firstpart.endswith('-time'): - info["author_date"] = int(parts[-1]) - elif role == firstpart: - info["author"] = parts[-1] - elif role == 'committer': - if firstpart.endswith('-mail'): - info["committer_email"] = parts[-1] - elif firstpart.endswith('-time'): - info["committer_date"] = int(parts[-1]) - elif role == firstpart: - info["committer"] = parts[-1] + if firstpart.endswith('-mail'): + info[f"{role}_email"] = parts[-1] + elif firstpart.endswith('-time'): + info[f"{role}_date"] = int(parts[-1]) + elif role == firstpart: + info[role] = parts[-1] # END distinguish mail,time,name else: # handle @@ -973,33 +980,33 @@ class Repo(object): info['summary'] = parts[-1] elif firstpart == '': if info: - sha = info['id'] + sha = info.id c = commits.get(sha) if c is None: c = Commit(self, hex_to_bin(sha), - author=Actor._from_string(info['author'] + ' ' + info['author_email']), - authored_date=info['author_date'], + author=Actor._from_string(info.author + ' ' + info.author_email), + authored_date=info.author_date, committer=Actor._from_string( - info['committer'] + ' ' + info['committer_email']), - committed_date=info['committer_date']) + info.committer + ' ' + info.committer_email), + committed_date=info.committer_date) commits[sha] = c blames[-1][0] = c # END if commit objects needs initial creation - if blames[-1][1] is not None: if not is_binary: if line_str and line_str[0] == '\t': line_str = line_str[1:] + blames[-1][1].append(line_str) else: - blames[-1][1].append(line_bytes) # NOTE: We are actually parsing lines out of binary data, which can lead to the # binary being split up along the newline separator. We will append this to the # blame we are currently looking at, even though it should be concatenated with # the last line we have seen. - info = {'id': sha} + blames[-1][1].append(line_bytes) # end handle line contents + info.id = sha # END if we collected commit info # END distinguish filename,summary,rest # END distinguish author|committer vs filename,summary,rest -- cgit v1.2.1 From ad417ba77c98a39c2d5b3b3a74eb0a1ca17f0ccc Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 22:54:31 +0100 Subject: Test Dataclass in repo.base.blame() 6 --- git/repo/base.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) (limited to 'git') diff --git a/git/repo/base.py b/git/repo/base.py index a0aee322..54409b6a 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -42,10 +42,10 @@ import gitdb from git.types import TBD, PathLike, Lit_config_levels, Commit_ish, Tree_ish, assert_never from typing import (Any, BinaryIO, Callable, Dict, Iterator, List, Mapping, Optional, Sequence, - TextIO, Tuple, Type, TypedDict, Union, + TextIO, Tuple, Type, Union, NamedTuple, cast, TYPE_CHECKING) -from git.types import ConfigLevels_Tup +from git.types import ConfigLevels_Tup, TypedDict if TYPE_CHECKING: from git.util import IterableList @@ -984,11 +984,10 @@ class Repo(object): c = commits.get(sha) if c is None: c = Commit(self, hex_to_bin(sha), - author=Actor._from_string(info.author + ' ' + info.author_email), + author=Actor._from_string(f"{info.author} {info.author_email}"), authored_date=info.author_date, - committer=Actor._from_string( - info.committer + ' ' + info.committer_email), - committed_date=info.committer_date) + committer=Actor._from_string(f"{info.committer} {info.committer_email}"), + committed_date=info.committer_date) commits[sha] = c blames[-1][0] = c # END if commit objects needs initial creation -- cgit v1.2.1 From ecb1f79cdb5198a10e099c2b7cd27aff69105ea9 Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 23:04:43 +0100 Subject: Choose TypedDict! --- git/repo/base.py | 69 ++++++++++++++++++++++++++------------------------------ 1 file changed, 32 insertions(+), 37 deletions(-) (limited to 'git') diff --git a/git/repo/base.py b/git/repo/base.py index 54409b6a..e06e4eac 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -7,7 +7,6 @@ from __future__ import annotations import logging import os import re -from dataclasses import dataclass import shlex import warnings from gitdb.db.loose import LooseObjectDB @@ -890,7 +889,7 @@ class Repo(object): commits: Dict[str, Commit] = {} blames: List[List[Commit | List[str | bytes] | None]] = [] - class InfoTC(TypedDict, total=False): + class InfoTD(TypedDict, total=False): sha: str id: str filename: str @@ -902,21 +901,7 @@ class Repo(object): committer_email: str committer_date: int - @dataclass - class InfoDC(Dict[str, Union[str, int]]): - sha: str = '' - id: str = '' - filename: str = '' - summary: str = '' - author: str = '' - author_email: str = '' - author_date: int = 0 - committer: str = '' - committer_email: str = '' - committer_date: int = 0 - - # info: InfoTD = {} - info = InfoDC() + info: InfoTD = {} keepends = True for line_bytes in data.splitlines(keepends): @@ -924,7 +909,7 @@ class Repo(object): line_str = line_bytes.rstrip().decode(defenc) except UnicodeDecodeError: firstpart = '' - parts = [''] + parts = [] is_binary = True else: # As we don't have an idea when the binary data ends, as it could contain multiple newlines @@ -943,10 +928,10 @@ class Repo(object): # another line of blame with the same data digits = parts[-1].split(" ") if len(digits) == 3: - info.id = firstpart + info = {'id': firstpart} blames.append([None, []]) - elif info.id != firstpart: - info.id = firstpart + elif info['id'] != firstpart: + info = {'id': firstpart} blames.append([commits.get(firstpart), []]) # END blame data initialization else: @@ -962,12 +947,20 @@ class Repo(object): # committer-time 1192271832 # committer-tz -0700 - IGNORED BY US role = m.group(0) - if firstpart.endswith('-mail'): - info[f"{role}_email"] = parts[-1] - elif firstpart.endswith('-time'): - info[f"{role}_date"] = int(parts[-1]) - elif role == firstpart: - info[role] = parts[-1] + if role == 'author': + if firstpart.endswith('-mail'): + info["author_email"] = parts[-1] + elif firstpart.endswith('-time'): + info["author_date"] = int(parts[-1]) + elif role == firstpart: + info["author"] = parts[-1] + elif role == 'committer': + if firstpart.endswith('-mail'): + info["committer_email"] = parts[-1] + elif firstpart.endswith('-time'): + info["committer_date"] = int(parts[-1]) + elif role == firstpart: + info["committer"] = parts[-1] # END distinguish mail,time,name else: # handle @@ -980,32 +973,34 @@ class Repo(object): info['summary'] = parts[-1] elif firstpart == '': if info: - sha = info.id + sha = info['id'] c = commits.get(sha) if c is None: c = Commit(self, hex_to_bin(sha), - author=Actor._from_string(f"{info.author} {info.author_email}"), - authored_date=info.author_date, - committer=Actor._from_string(f"{info.committer} {info.committer_email}"), - committed_date=info.committer_date) + author=Actor._from_string(f"{info['author']} {info['author_email']}"), + authored_date=info['author_date'], + committer=Actor._from_string( + f"{info['committer']} {info['committer_email']}"), + committed_date=info['committer_date']) commits[sha] = c blames[-1][0] = c # END if commit objects needs initial creation + if blames[-1][1] is not None: + line: str | bytes if not is_binary: if line_str and line_str[0] == '\t': line_str = line_str[1:] - - blames[-1][1].append(line_str) + line = line_str else: + line = line_bytes # NOTE: We are actually parsing lines out of binary data, which can lead to the # binary being split up along the newline separator. We will append this to the # blame we are currently looking at, even though it should be concatenated with # the last line we have seen. - blames[-1][1].append(line_bytes) - # end handle line contents + blames[-1][1].append(line) - info.id = sha + info = {'id': sha} # END if we collected commit info # END distinguish filename,summary,rest # END distinguish author|committer vs filename,summary,rest -- cgit v1.2.1 From 5aa8c3401a860974db0126dc030e74bbddf217eb Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 23:22:04 +0100 Subject: Improve type of repo.blame_incremental() --- git/repo/base.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) (limited to 'git') diff --git a/git/repo/base.py b/git/repo/base.py index e06e4eac..344e8a71 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -811,8 +811,8 @@ class Repo(object): should get a continuous range spanning all line numbers in the file. """ - data = self.git.blame(rev, '--', file, p=True, incremental=True, stdout_as_string=False, **kwargs) - commits: Dict[str, Commit] = {} + data: bytes = self.git.blame(rev, '--', file, p=True, incremental=True, stdout_as_string=False, **kwargs) + commits: Dict[bytes, Commit] = {} stream = (line for line in data.split(b'\n') if line) while True: @@ -820,15 +820,15 @@ class Repo(object): line = next(stream) # when exhausted, causes a StopIteration, terminating this function except StopIteration: return - split_line: Tuple[str, str, str, str] = line.split() - hexsha, orig_lineno_str, lineno_str, num_lines_str = split_line - lineno = int(lineno_str) - num_lines = int(num_lines_str) - orig_lineno = int(orig_lineno_str) + split_line = line.split() + hexsha, orig_lineno_b, lineno_b, num_lines_b = split_line + lineno = int(lineno_b) + num_lines = int(num_lines_b) + orig_lineno = int(orig_lineno_b) if hexsha not in commits: # Now read the next few lines and build up a dict of properties # for this commit - props = {} + props: Dict[bytes, bytes] = {} while True: try: line = next(stream) @@ -1126,7 +1126,7 @@ class Repo(object): @ classmethod def clone_from(cls, url: PathLike, to_path: PathLike, progress: Optional[Callable] = None, - env: Optional[Mapping[str, Any]] = None, + env: Optional[Mapping[str, str]] = None, multi_options: Optional[List[str]] = None, **kwargs: Any) -> 'Repo': """Create a clone from the given URL -- cgit v1.2.1 From 8b8aa16ee247c6ce403db7178d6c0f9c4ccd529c Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 23:30:27 +0100 Subject: Improve type of repo.currently_rebasing_on() --- git/repo/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'git') diff --git a/git/repo/base.py b/git/repo/base.py index 344e8a71..c0229a84 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -1148,7 +1148,7 @@ class Repo(object): return cls._clone(git, url, to_path, GitCmdObjectDB, progress, multi_options, **kwargs) def archive(self, ostream: Union[TextIO, BinaryIO], treeish: Optional[str] = None, - prefix: Optional[str] = None, **kwargs: Any) -> 'Repo': + prefix: Optional[str] = None, **kwargs: Any) -> Repo: """Archive the tree at the given revision. :param ostream: file compatible stream object to which the archive will be written as bytes @@ -1195,7 +1195,7 @@ class Repo(object): clazz = self.__class__ return '<%s.%s %r>' % (clazz.__module__, clazz.__name__, self.git_dir) - def currently_rebasing_on(self) -> Union['SymbolicReference', Commit_ish, None]: + def currently_rebasing_on(self) -> Commit | None: """ :return: The commit which is currently being replayed while rebasing. -- cgit v1.2.1