From 39d37d550963a6a64e66ba3d6b9f4b077270a3ad Mon Sep 17 00:00:00 2001 From: Yobmod Date: Sat, 31 Jul 2021 22:26:20 +0100 Subject: replace some TBDs wiht runtime types --- git/objects/submodule/base.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'git/objects') diff --git a/git/objects/submodule/base.py b/git/objects/submodule/base.py index 559d2585..d306c91d 100644 --- a/git/objects/submodule/base.py +++ b/git/objects/submodule/base.py @@ -379,6 +379,7 @@ class Submodule(IndexObject, TraversableIterableObj): :return: The newly created submodule instance :note: works atomically, such that no change will be done if the repository update fails for instance""" + if repo.bare: raise InvalidGitRepositoryError("Cannot add submodules to bare repositories") # END handle bare repos @@ -434,7 +435,7 @@ class Submodule(IndexObject, TraversableIterableObj): url = urls[0] else: # clone new repo - kwargs: Dict[str, Union[bool, int, Sequence[TBD]]] = {'n': no_checkout} + kwargs: Dict[str, Union[bool, int, str, Sequence[TBD]]] = {'n': no_checkout} if not branch_is_default: kwargs['b'] = br.name # END setup checkout-branch -- cgit v1.2.1 From 2163322ef62fa97573ac94298261161fd9721993 Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 14:00:33 +0100 Subject: increase mypy strictness (warn unused ignored) --- git/objects/util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'git/objects') diff --git a/git/objects/util.py b/git/objects/util.py index f627211e..d3842cfb 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -346,7 +346,7 @@ class Traversable(Protocol): if not as_edge: out: IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']] = IterableList(id) - out.extend(self.traverse(as_edge=as_edge, *args, **kwargs)) # type: ignore + out.extend(self.traverse(as_edge=as_edge, *args, **kwargs)) return out # overloads in subclasses (mypy does't allow typing self: subclass) # Union[IterableList['Commit'], IterableList['Submodule'], IterableList[Union['Submodule', 'Tree', 'Blob']]] -- cgit v1.2.1 From 91fce331de16de6039c94cd4d7314184a5763e61 Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 14:56:03 +0100 Subject: increase mypy strictness (warn unused ignored and warn unreachable) --- git/objects/fun.py | 2 +- git/objects/tree.py | 2 +- git/objects/util.py | 8 +++++--- 3 files changed, 7 insertions(+), 5 deletions(-) (limited to 'git/objects') diff --git a/git/objects/fun.py b/git/objects/fun.py index d6cdafe1..19b4e525 100644 --- a/git/objects/fun.py +++ b/git/objects/fun.py @@ -51,7 +51,7 @@ def tree_to_stream(entries: Sequence[EntryTup], write: Callable[['ReadableBuffer if isinstance(name, str): name_bytes = name.encode(defenc) else: - name_bytes = name + name_bytes = name # type: ignore[unreachable] # check runtime types - is always str? write(b''.join((mode_str, b' ', name_bytes, b'\0', binsha))) # END for each item diff --git a/git/objects/tree.py b/git/objects/tree.py index 0cceb59a..22531895 100644 --- a/git/objects/tree.py +++ b/git/objects/tree.py @@ -215,7 +215,7 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable): super(Tree, self).__init__(repo, binsha, mode, path) @ classmethod - def _get_intermediate_items(cls, index_object: 'Tree', + def _get_intermediate_items(cls, index_object: IndexObjUnion, ) -> Union[Tuple['Tree', ...], Tuple[()]]: if index_object.type == "tree": return tuple(index_object._iter_convert_to_object(index_object._cache)) diff --git a/git/objects/util.py b/git/objects/util.py index d3842cfb..9c9ce773 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -167,7 +167,7 @@ def from_timestamp(timestamp: float, tz_offset: float) -> datetime: return utc_dt -def parse_date(string_date: str) -> Tuple[int, int]: +def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: """ Parse the given date as one of the following @@ -182,8 +182,10 @@ def parse_date(string_date: str) -> Tuple[int, int]: :note: Date can also be YYYY.MM.DD, MM/DD/YYYY and DD.MM.YYYY. """ if isinstance(string_date, datetime) and string_date.tzinfo: - offset = -int(string_date.utcoffset().total_seconds()) + offset = -int(string_date.utcoffset().total_seconds()) # type: ignore[union-attr] return int(string_date.astimezone(utc).timestamp()), offset + else: + assert isinstance(string_date, str) # for mypy # git time try: @@ -338,7 +340,7 @@ class Traversable(Protocol): """ # Commit and Submodule have id.__attribute__ as IterableObj # Tree has id.__attribute__ inherited from IndexObject - if isinstance(self, (TraversableIterableObj, Has_id_attribute)): + if isinstance(self, Has_id_attribute): id = self._id_attribute_ else: id = "" # shouldn't reach here, unless Traversable subclass created with no _id_attribute_ -- cgit v1.2.1 From 13e0730b449e8ace2c7aa691d588febb4bed510c Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 16:16:11 +0100 Subject: Fix parse_date typing --- git/objects/util.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'git/objects') diff --git a/git/objects/util.py b/git/objects/util.py index 9c9ce773..d7472b5d 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -181,11 +181,13 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: :raise ValueError: If the format could not be understood :note: Date can also be YYYY.MM.DD, MM/DD/YYYY and DD.MM.YYYY. """ - if isinstance(string_date, datetime) and string_date.tzinfo: - offset = -int(string_date.utcoffset().total_seconds()) # type: ignore[union-attr] + if isinstance(string_date, datetime): + if string_date.tzinfo: + utcoffset = string_date.utcoffset() + offset = -int(utcoffset.total_seconds()) if utcoffset else 0 return int(string_date.astimezone(utc).timestamp()), offset else: - assert isinstance(string_date, str) # for mypy + assert isinstance(string_date, str), f"string_date={string_date}, type={type(string_date)}" # for mypy # git time try: -- cgit v1.2.1 From 730f11936364314b76738ed06bdd9222dc9de2ac Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 16:30:32 +0100 Subject: Fix parse_date typing 2 --- git/objects/util.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'git/objects') diff --git a/git/objects/util.py b/git/objects/util.py index d7472b5d..e3e7d3ba 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -182,9 +182,11 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: :note: Date can also be YYYY.MM.DD, MM/DD/YYYY and DD.MM.YYYY. """ if isinstance(string_date, datetime): - if string_date.tzinfo: + if string_date.tzinfo and string_date.utcoffset(): utcoffset = string_date.utcoffset() offset = -int(utcoffset.total_seconds()) if utcoffset else 0 + else: + offset = 0 return int(string_date.astimezone(utc).timestamp()), offset else: assert isinstance(string_date, str), f"string_date={string_date}, type={type(string_date)}" # for mypy -- cgit v1.2.1 From 2fe13cad9c889b8628119ab5ee139038b0c164fd Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 16:54:31 +0100 Subject: Fix parse_date typing 3 --- git/objects/util.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) (limited to 'git/objects') diff --git a/git/objects/util.py b/git/objects/util.py index e3e7d3ba..6e3f688e 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -181,13 +181,11 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: :raise ValueError: If the format could not be understood :note: Date can also be YYYY.MM.DD, MM/DD/YYYY and DD.MM.YYYY. """ - if isinstance(string_date, datetime): - if string_date.tzinfo and string_date.utcoffset(): - utcoffset = string_date.utcoffset() - offset = -int(utcoffset.total_seconds()) if utcoffset else 0 - else: - offset = 0 + if isinstance(string_date, datetime) and string_date.tzinfo: + utcoffset = string_date.utcoffset() + offset = -int(utcoffset.total_seconds()) if utcoffset else 0 return int(string_date.astimezone(utc).timestamp()), offset + else: assert isinstance(string_date, str), f"string_date={string_date}, type={type(string_date)}" # for mypy -- cgit v1.2.1 From 024b69669811dc3aa5a018eb3df5535202edf5f9 Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 17:09:03 +0100 Subject: Fix parse_date typing 4 --- git/objects/util.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) (limited to 'git/objects') diff --git a/git/objects/util.py b/git/objects/util.py index 6e3f688e..1ca6f050 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -181,16 +181,15 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: :raise ValueError: If the format could not be understood :note: Date can also be YYYY.MM.DD, MM/DD/YYYY and DD.MM.YYYY. """ - if isinstance(string_date, datetime) and string_date.tzinfo: - utcoffset = string_date.utcoffset() - offset = -int(utcoffset.total_seconds()) if utcoffset else 0 - return int(string_date.astimezone(utc).timestamp()), offset + if isinstance(string_date, datetime): + if string_date.tzinfo: + utcoffset = cast(timedelta, string_date.utcoffset()) # typeguard, if tzinfoand is not None + offset = -int(utcoffset.total_seconds()) + return int(string_date.astimezone(utc).timestamp()), offset + else: + raise ValueError(f"Unsupported date format or type: {string_date}" % string_date) else: - assert isinstance(string_date, str), f"string_date={string_date}, type={type(string_date)}" # for mypy - - # git time - try: if string_date.count(' ') == 1 and string_date.rfind(':') == -1: timestamp, offset_str = string_date.split() if timestamp.startswith('@'): @@ -244,13 +243,9 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: continue # END exception handling # END for each fmt - # still here ? fail raise ValueError("no format matched") # END handle format - except Exception as e: - raise ValueError("Unsupported date format: %s" % string_date) from e - # END handle exceptions # precompiled regex -- cgit v1.2.1 From e2f8367b53b14acb8e1a86f33334f92a5a306878 Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 17:15:06 +0100 Subject: Fix parse_date typing 5 --- git/objects/util.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'git/objects') diff --git a/git/objects/util.py b/git/objects/util.py index 1ca6f050..d3d8d38b 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -187,9 +187,10 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: offset = -int(utcoffset.total_seconds()) return int(string_date.astimezone(utc).timestamp()), offset else: - raise ValueError(f"Unsupported date format or type: {string_date}" % string_date) + raise ValueError(f"string_date datetime object without tzinfo, {string_date}") - else: + # git time + try: if string_date.count(' ') == 1 and string_date.rfind(':') == -1: timestamp, offset_str = string_date.split() if timestamp.startswith('@'): @@ -243,9 +244,13 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: continue # END exception handling # END for each fmt + # still here ? fail raise ValueError("no format matched") # END handle format + except Exception as e: + raise ValueError(f"Unsupported date format or type: {string_date}" % string_date) from e + # END handle exceptions # precompiled regex -- cgit v1.2.1 From d30bc0722ee32c501c021bde511640ff6620a203 Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 17:19:33 +0100 Subject: Fix parse_date typing 6 --- git/objects/util.py | 2 +- .../util.py.97f6472e9bbb12cad7bbab8f367a99fe.tmp | 566 +++++++++++++++++++++ 2 files changed, 567 insertions(+), 1 deletion(-) create mode 100644 git/objects/util.py.97f6472e9bbb12cad7bbab8f367a99fe.tmp (limited to 'git/objects') diff --git a/git/objects/util.py b/git/objects/util.py index d3d8d38b..16d4c0ac 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -249,7 +249,7 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: raise ValueError("no format matched") # END handle format except Exception as e: - raise ValueError(f"Unsupported date format or type: {string_date}" % string_date) from e + raise ValueError(f"Unsupported date format or type: {string_date}, type={type(string_date)}") from e # END handle exceptions diff --git a/git/objects/util.py.97f6472e9bbb12cad7bbab8f367a99fe.tmp b/git/objects/util.py.97f6472e9bbb12cad7bbab8f367a99fe.tmp new file mode 100644 index 00000000..16d4c0ac --- /dev/null +++ b/git/objects/util.py.97f6472e9bbb12cad7bbab8f367a99fe.tmp @@ -0,0 +1,566 @@ +# util.py +# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors +# +# This module is part of GitPython and is released under +# the BSD License: http://www.opensource.org/licenses/bsd-license.php +"""Module for general utility functions""" + +from abc import abstractmethod +import warnings +from git.util import ( + IterableList, + IterableObj, + Actor +) + +import re +from collections import deque + +from string import digits +import time +import calendar +from datetime import datetime, timedelta, tzinfo + +# typing ------------------------------------------------------------ +from typing import (Any, Callable, Deque, Iterator, NamedTuple, overload, Sequence, + TYPE_CHECKING, Tuple, Type, TypeVar, Union, cast) + +from git.types import Has_id_attribute, Literal, Protocol, runtime_checkable + +if TYPE_CHECKING: + from io import BytesIO, StringIO + from .commit import Commit + from .blob import Blob + from .tag import TagObject + from .tree import Tree, TraversedTreeTup + from subprocess import Popen + from .submodule.base import Submodule + + +class TraverseNT(NamedTuple): + depth: int + item: Union['Traversable', 'Blob'] + src: Union['Traversable', None] + + +T_TIobj = TypeVar('T_TIobj', bound='TraversableIterableObj') # for TraversableIterableObj.traverse() + +TraversedTup = Union[Tuple[Union['Traversable', None], 'Traversable'], # for commit, submodule + 'TraversedTreeTup'] # for tree.traverse() + +# -------------------------------------------------------------------- + +__all__ = ('get_object_type_by_name', 'parse_date', 'parse_actor_and_date', + 'ProcessStreamAdapter', 'Traversable', 'altz_to_utctz_str', 'utctz_to_altz', + 'verify_utctz', 'Actor', 'tzoffset', 'utc') + +ZERO = timedelta(0) + +#{ Functions + + +def mode_str_to_int(modestr: Union[bytes, str]) -> int: + """ + :param modestr: string like 755 or 644 or 100644 - only the last 6 chars will be used + :return: + String identifying a mode compatible to the mode methods ids of the + stat module regarding the rwx permissions for user, group and other, + special flags and file system flags, i.e. whether it is a symlink + for example.""" + mode = 0 + for iteration, char in enumerate(reversed(modestr[-6:])): + char = cast(Union[str, int], char) + mode += int(char) << iteration * 3 + # END for each char + return mode + + +def get_object_type_by_name(object_type_name: bytes + ) -> Union[Type['Commit'], Type['TagObject'], Type['Tree'], Type['Blob']]: + """ + :return: type suitable to handle the given object type name. + Use the type to create new instances. + + :param object_type_name: Member of TYPES + + :raise ValueError: In case object_type_name is unknown""" + if object_type_name == b"commit": + from . import commit + return commit.Commit + elif object_type_name == b"tag": + from . import tag + return tag.TagObject + elif object_type_name == b"blob": + from . import blob + return blob.Blob + elif object_type_name == b"tree": + from . import tree + return tree.Tree + else: + raise ValueError("Cannot handle unknown object type: %s" % object_type_name.decode()) + + +def utctz_to_altz(utctz: str) -> int: + """we convert utctz to the timezone in seconds, it is the format time.altzone + returns. Git stores it as UTC timezone which has the opposite sign as well, + which explains the -1 * ( that was made explicit here ) + :param utctz: git utc timezone string, i.e. +0200""" + return -1 * int(float(utctz) / 100 * 3600) + + +def altz_to_utctz_str(altz: float) -> str: + """As above, but inverses the operation, returning a string that can be used + in commit objects""" + utci = -1 * int((float(altz) / 3600) * 100) + utcs = str(abs(utci)) + utcs = "0" * (4 - len(utcs)) + utcs + prefix = (utci < 0 and '-') or '+' + return prefix + utcs + + +def verify_utctz(offset: str) -> str: + """:raise ValueError: if offset is incorrect + :return: offset""" + fmt_exc = ValueError("Invalid timezone offset format: %s" % offset) + if len(offset) != 5: + raise fmt_exc + if offset[0] not in "+-": + raise fmt_exc + if offset[1] not in digits or\ + offset[2] not in digits or\ + offset[3] not in digits or\ + offset[4] not in digits: + raise fmt_exc + # END for each char + return offset + + +class tzoffset(tzinfo): + + def __init__(self, secs_west_of_utc: float, name: Union[None, str] = None) -> None: + self._offset = timedelta(seconds=-secs_west_of_utc) + self._name = name or 'fixed' + + def __reduce__(self) -> Tuple[Type['tzoffset'], Tuple[float, str]]: + return tzoffset, (-self._offset.total_seconds(), self._name) + + def utcoffset(self, dt: Union[datetime, None]) -> timedelta: + return self._offset + + def tzname(self, dt: Union[datetime, None]) -> str: + return self._name + + def dst(self, dt: Union[datetime, None]) -> timedelta: + return ZERO + + +utc = tzoffset(0, 'UTC') + + +def from_timestamp(timestamp: float, tz_offset: float) -> datetime: + """Converts a timestamp + tz_offset into an aware datetime instance.""" + utc_dt = datetime.fromtimestamp(timestamp, utc) + try: + local_dt = utc_dt.astimezone(tzoffset(tz_offset)) + return local_dt + except ValueError: + return utc_dt + + +def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: + """ + Parse the given date as one of the following + + * aware datetime instance + * Git internal format: timestamp offset + * RFC 2822: Thu, 07 Apr 2005 22:13:13 +0200. + * ISO 8601 2005-04-07T22:13:13 + The T can be a space as well + + :return: Tuple(int(timestamp_UTC), int(offset)), both in seconds since epoch + :raise ValueError: If the format could not be understood + :note: Date can also be YYYY.MM.DD, MM/DD/YYYY and DD.MM.YYYY. + """ + if isinstance(string_date, datetime): + if string_date.tzinfo: + utcoffset = cast(timedelta, string_date.utcoffset()) # typeguard, if tzinfoand is not None + offset = -int(utcoffset.total_seconds()) + return int(string_date.astimezone(utc).timestamp()), offset + else: + raise ValueError(f"string_date datetime object without tzinfo, {string_date}") + + # git time + try: + if string_date.count(' ') == 1 and string_date.rfind(':') == -1: + timestamp, offset_str = string_date.split() + if timestamp.startswith('@'): + timestamp = timestamp[1:] + timestamp_int = int(timestamp) + return timestamp_int, utctz_to_altz(verify_utctz(offset_str)) + else: + offset_str = "+0000" # local time by default + if string_date[-5] in '-+': + offset_str = verify_utctz(string_date[-5:]) + string_date = string_date[:-6] # skip space as well + # END split timezone info + offset = utctz_to_altz(offset_str) + + # now figure out the date and time portion - split time + date_formats = [] + splitter = -1 + if ',' in string_date: + date_formats.append("%a, %d %b %Y") + splitter = string_date.rfind(' ') + else: + # iso plus additional + date_formats.append("%Y-%m-%d") + date_formats.append("%Y.%m.%d") + date_formats.append("%m/%d/%Y") + date_formats.append("%d.%m.%Y") + + splitter = string_date.rfind('T') + if splitter == -1: + splitter = string_date.rfind(' ') + # END handle 'T' and ' ' + # END handle rfc or iso + + assert splitter > -1 + + # split date and time + time_part = string_date[splitter + 1:] # skip space + date_part = string_date[:splitter] + + # parse time + tstruct = time.strptime(time_part, "%H:%M:%S") + + for fmt in date_formats: + try: + dtstruct = time.strptime(date_part, fmt) + utctime = calendar.timegm((dtstruct.tm_year, dtstruct.tm_mon, dtstruct.tm_mday, + tstruct.tm_hour, tstruct.tm_min, tstruct.tm_sec, + dtstruct.tm_wday, dtstruct.tm_yday, tstruct.tm_isdst)) + return int(utctime), offset + except ValueError: + continue + # END exception handling + # END for each fmt + + # still here ? fail + raise ValueError("no format matched") + # END handle format + except Exception as e: + raise ValueError(f"Unsupported date format or type: {string_date}, type={type(string_date)}") from e + # END handle exceptions + + +# precompiled regex +_re_actor_epoch = re.compile(r'^.+? (.*) (\d+) ([+-]\d+).*$') +_re_only_actor = re.compile(r'^.+? (.*)$') + + +def parse_actor_and_date(line: str) -> Tuple[Actor, int, int]: + """Parse out the actor (author or committer) info from a line like:: + + author Tom Preston-Werner 1191999972 -0700 + + :return: [Actor, int_seconds_since_epoch, int_timezone_offset]""" + actor, epoch, offset = '', '0', '0' + m = _re_actor_epoch.search(line) + if m: + actor, epoch, offset = m.groups() + else: + m = _re_only_actor.search(line) + actor = m.group(1) if m else line or '' + return (Actor._from_string(actor), int(epoch), utctz_to_altz(offset)) + +#} END functions + + +#{ Classes + +class ProcessStreamAdapter(object): + + """Class wireing all calls to the contained Process instance. + + Use this type to hide the underlying process to provide access only to a specified + stream. The process is usually wrapped into an AutoInterrupt class to kill + it if the instance goes out of scope.""" + __slots__ = ("_proc", "_stream") + + def __init__(self, process: 'Popen', stream_name: str) -> None: + self._proc = process + self._stream: StringIO = getattr(process, stream_name) # guessed type + + def __getattr__(self, attr: str) -> Any: + return getattr(self._stream, attr) + + +@runtime_checkable +class Traversable(Protocol): + + """Simple interface to perform depth-first or breadth-first traversals + into one direction. + Subclasses only need to implement one function. + Instances of the Subclass must be hashable + + Defined subclasses = [Commit, Tree, SubModule] + """ + __slots__ = () + + @classmethod + @abstractmethod + def _get_intermediate_items(cls, item: Any) -> Sequence['Traversable']: + """ + Returns: + Tuple of items connected to the given item. + Must be implemented in subclass + + class Commit:: (cls, Commit) -> Tuple[Commit, ...] + class Submodule:: (cls, Submodule) -> Iterablelist[Submodule] + class Tree:: (cls, Tree) -> Tuple[Tree, ...] + """ + raise NotImplementedError("To be implemented in subclass") + + @abstractmethod + def list_traverse(self, *args: Any, **kwargs: Any) -> Any: + """ """ + warnings.warn("list_traverse() method should only be called from subclasses." + "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20" + "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit", + DeprecationWarning, + stacklevel=2) + return self._list_traverse(*args, **kwargs) + + def _list_traverse(self, as_edge: bool = False, *args: Any, **kwargs: Any + ) -> IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']]: + """ + :return: IterableList with the results of the traversal as produced by + traverse() + Commit -> IterableList['Commit'] + Submodule -> IterableList['Submodule'] + Tree -> IterableList[Union['Submodule', 'Tree', 'Blob']] + """ + # Commit and Submodule have id.__attribute__ as IterableObj + # Tree has id.__attribute__ inherited from IndexObject + if isinstance(self, Has_id_attribute): + id = self._id_attribute_ + else: + id = "" # shouldn't reach here, unless Traversable subclass created with no _id_attribute_ + # could add _id_attribute_ to Traversable, or make all Traversable also Iterable? + + if not as_edge: + out: IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']] = IterableList(id) + out.extend(self.traverse(as_edge=as_edge, *args, **kwargs)) + return out + # overloads in subclasses (mypy does't allow typing self: subclass) + # Union[IterableList['Commit'], IterableList['Submodule'], IterableList[Union['Submodule', 'Tree', 'Blob']]] + else: + # Raise deprecationwarning, doesn't make sense to use this + out_list: IterableList = IterableList(self.traverse(*args, **kwargs)) + return out_list + + @ abstractmethod + def traverse(self, *args: Any, **kwargs: Any) -> Any: + """ """ + warnings.warn("traverse() method should only be called from subclasses." + "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20" + "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit", + DeprecationWarning, + stacklevel=2) + return self._traverse(*args, **kwargs) + + def _traverse(self, + predicate: Callable[[Union['Traversable', 'Blob', TraversedTup], int], bool] = lambda i, d: True, + prune: Callable[[Union['Traversable', 'Blob', TraversedTup], int], bool] = lambda i, d: False, + depth: int = -1, branch_first: bool = True, visit_once: bool = True, + ignore_self: int = 1, as_edge: bool = False + ) -> Union[Iterator[Union['Traversable', 'Blob']], + Iterator[TraversedTup]]: + """:return: iterator yielding of items found when traversing self + :param predicate: f(i,d) returns False if item i at depth d should not be included in the result + + :param prune: + f(i,d) return True if the search should stop at item i at depth d. + Item i will not be returned. + + :param depth: + define at which level the iteration should not go deeper + if -1, there is no limit + if 0, you would effectively only get self, the root of the iteration + i.e. if 1, you would only get the first level of predecessors/successors + + :param branch_first: + if True, items will be returned branch first, otherwise depth first + + :param visit_once: + if True, items will only be returned once, although they might be encountered + several times. Loops are prevented that way. + + :param ignore_self: + if True, self will be ignored and automatically pruned from + the result. Otherwise it will be the first item to be returned. + If as_edge is True, the source of the first edge is None + + :param as_edge: + if True, return a pair of items, first being the source, second the + destination, i.e. tuple(src, dest) with the edge spanning from + source to destination""" + + """ + Commit -> Iterator[Union[Commit, Tuple[Commit, Commit]] + Submodule -> Iterator[Submodule, Tuple[Submodule, Submodule]] + Tree -> Iterator[Union[Blob, Tree, Submodule, + Tuple[Union[Submodule, Tree], Union[Blob, Tree, Submodule]]] + + ignore_self=True is_edge=True -> Iterator[item] + ignore_self=True is_edge=False --> Iterator[item] + ignore_self=False is_edge=True -> Iterator[item] | Iterator[Tuple[src, item]] + ignore_self=False is_edge=False -> Iterator[Tuple[src, item]]""" + + visited = set() + stack: Deque[TraverseNT] = deque() + stack.append(TraverseNT(0, self, None)) # self is always depth level 0 + + def addToStack(stack: Deque[TraverseNT], + src_item: 'Traversable', + branch_first: bool, + depth: int) -> None: + lst = self._get_intermediate_items(item) + if not lst: # empty list + return None + if branch_first: + stack.extendleft(TraverseNT(depth, i, src_item) for i in lst) + else: + reviter = (TraverseNT(depth, lst[i], src_item) for i in range(len(lst) - 1, -1, -1)) + stack.extend(reviter) + # END addToStack local method + + while stack: + d, item, src = stack.pop() # depth of item, item, item_source + + if visit_once and item in visited: + continue + + if visit_once: + visited.add(item) + + rval: Union[TraversedTup, 'Traversable', 'Blob'] + if as_edge: # if as_edge return (src, item) unless rrc is None (e.g. for first item) + rval = (src, item) + else: + rval = item + + if prune(rval, d): + continue + + skipStartItem = ignore_self and (item is self) + if not skipStartItem and predicate(rval, d): + yield rval + + # only continue to next level if this is appropriate ! + nd = d + 1 + if depth > -1 and nd > depth: + continue + + addToStack(stack, item, branch_first, nd) + # END for each item on work stack + + +@ runtime_checkable +class Serializable(Protocol): + + """Defines methods to serialize and deserialize objects from and into a data stream""" + __slots__ = () + + # @abstractmethod + def _serialize(self, stream: 'BytesIO') -> 'Serializable': + """Serialize the data of this object into the given data stream + :note: a serialized object would ``_deserialize`` into the same object + :param stream: a file-like object + :return: self""" + raise NotImplementedError("To be implemented in subclass") + + # @abstractmethod + def _deserialize(self, stream: 'BytesIO') -> 'Serializable': + """Deserialize all information regarding this object from the stream + :param stream: a file-like object + :return: self""" + raise NotImplementedError("To be implemented in subclass") + + +class TraversableIterableObj(IterableObj, Traversable): + __slots__ = () + + TIobj_tuple = Tuple[Union[T_TIobj, None], T_TIobj] + + def list_traverse(self: T_TIobj, *args: Any, **kwargs: Any) -> IterableList[T_TIobj]: + return super(TraversableIterableObj, self)._list_traverse(* args, **kwargs) + + @ overload # type: ignore + def traverse(self: T_TIobj + ) -> Iterator[T_TIobj]: + ... + + @ overload + def traverse(self: T_TIobj, + predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], + prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], + depth: int, branch_first: bool, visit_once: bool, + ignore_self: Literal[True], + as_edge: Literal[False], + ) -> Iterator[T_TIobj]: + ... + + @ overload + def traverse(self: T_TIobj, + predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], + prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], + depth: int, branch_first: bool, visit_once: bool, + ignore_self: Literal[False], + as_edge: Literal[True], + ) -> Iterator[Tuple[Union[T_TIobj, None], T_TIobj]]: + ... + + @ overload + def traverse(self: T_TIobj, + predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], + prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], + depth: int, branch_first: bool, visit_once: bool, + ignore_self: Literal[True], + as_edge: Literal[True], + ) -> Iterator[Tuple[T_TIobj, T_TIobj]]: + ... + + def traverse(self: T_TIobj, + predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], + bool] = lambda i, d: True, + prune: Callable[[Union[T_TIobj, TIobj_tuple], int], + bool] = lambda i, d: False, + depth: int = -1, branch_first: bool = True, visit_once: bool = True, + ignore_self: int = 1, as_edge: bool = False + ) -> Union[Iterator[T_TIobj], + Iterator[Tuple[T_TIobj, T_TIobj]], + Iterator[TIobj_tuple]]: + """For documentation, see util.Traversable._traverse()""" + + """ + # To typecheck instead of using cast. + import itertools + from git.types import TypeGuard + def is_commit_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Tuple['Commit', 'Commit']]]]: + for x in inp[1]: + if not isinstance(x, tuple) and len(x) != 2: + if all(isinstance(inner, Commit) for inner in x): + continue + return True + + ret = super(Commit, self).traverse(predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge) + ret_tup = itertools.tee(ret, 2) + assert is_commit_traversed(ret_tup), f"{[type(x) for x in list(ret_tup[0])]}" + return ret_tup[0] + """ + return cast(Union[Iterator[T_TIobj], + Iterator[Tuple[Union[None, T_TIobj], T_TIobj]]], + super(TraversableIterableObj, self)._traverse( + predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge # type: ignore + )) -- cgit v1.2.1 From 6470ad4a413fb7fbd9f2d3b9da1720c13ffc92bb Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 17:24:18 +0100 Subject: Fix parse_date typing 7 --- git/objects/util.py | 4 +- .../util.py.97f6472e9bbb12cad7bbab8f367a99fe.tmp | 566 --------------------- 2 files changed, 3 insertions(+), 567 deletions(-) delete mode 100644 git/objects/util.py.97f6472e9bbb12cad7bbab8f367a99fe.tmp (limited to 'git/objects') diff --git a/git/objects/util.py b/git/objects/util.py index 16d4c0ac..0b843301 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -187,7 +187,9 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: offset = -int(utcoffset.total_seconds()) return int(string_date.astimezone(utc).timestamp()), offset else: - raise ValueError(f"string_date datetime object without tzinfo, {string_date}") + # should just return timestamp, 0? + return int(string_date.astimezone(utc).timestamp()), 0 + # raise ValueError(f"string_date datetime object without tzinfo, {string_date}") # git time try: diff --git a/git/objects/util.py.97f6472e9bbb12cad7bbab8f367a99fe.tmp b/git/objects/util.py.97f6472e9bbb12cad7bbab8f367a99fe.tmp deleted file mode 100644 index 16d4c0ac..00000000 --- a/git/objects/util.py.97f6472e9bbb12cad7bbab8f367a99fe.tmp +++ /dev/null @@ -1,566 +0,0 @@ -# util.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php -"""Module for general utility functions""" - -from abc import abstractmethod -import warnings -from git.util import ( - IterableList, - IterableObj, - Actor -) - -import re -from collections import deque - -from string import digits -import time -import calendar -from datetime import datetime, timedelta, tzinfo - -# typing ------------------------------------------------------------ -from typing import (Any, Callable, Deque, Iterator, NamedTuple, overload, Sequence, - TYPE_CHECKING, Tuple, Type, TypeVar, Union, cast) - -from git.types import Has_id_attribute, Literal, Protocol, runtime_checkable - -if TYPE_CHECKING: - from io import BytesIO, StringIO - from .commit import Commit - from .blob import Blob - from .tag import TagObject - from .tree import Tree, TraversedTreeTup - from subprocess import Popen - from .submodule.base import Submodule - - -class TraverseNT(NamedTuple): - depth: int - item: Union['Traversable', 'Blob'] - src: Union['Traversable', None] - - -T_TIobj = TypeVar('T_TIobj', bound='TraversableIterableObj') # for TraversableIterableObj.traverse() - -TraversedTup = Union[Tuple[Union['Traversable', None], 'Traversable'], # for commit, submodule - 'TraversedTreeTup'] # for tree.traverse() - -# -------------------------------------------------------------------- - -__all__ = ('get_object_type_by_name', 'parse_date', 'parse_actor_and_date', - 'ProcessStreamAdapter', 'Traversable', 'altz_to_utctz_str', 'utctz_to_altz', - 'verify_utctz', 'Actor', 'tzoffset', 'utc') - -ZERO = timedelta(0) - -#{ Functions - - -def mode_str_to_int(modestr: Union[bytes, str]) -> int: - """ - :param modestr: string like 755 or 644 or 100644 - only the last 6 chars will be used - :return: - String identifying a mode compatible to the mode methods ids of the - stat module regarding the rwx permissions for user, group and other, - special flags and file system flags, i.e. whether it is a symlink - for example.""" - mode = 0 - for iteration, char in enumerate(reversed(modestr[-6:])): - char = cast(Union[str, int], char) - mode += int(char) << iteration * 3 - # END for each char - return mode - - -def get_object_type_by_name(object_type_name: bytes - ) -> Union[Type['Commit'], Type['TagObject'], Type['Tree'], Type['Blob']]: - """ - :return: type suitable to handle the given object type name. - Use the type to create new instances. - - :param object_type_name: Member of TYPES - - :raise ValueError: In case object_type_name is unknown""" - if object_type_name == b"commit": - from . import commit - return commit.Commit - elif object_type_name == b"tag": - from . import tag - return tag.TagObject - elif object_type_name == b"blob": - from . import blob - return blob.Blob - elif object_type_name == b"tree": - from . import tree - return tree.Tree - else: - raise ValueError("Cannot handle unknown object type: %s" % object_type_name.decode()) - - -def utctz_to_altz(utctz: str) -> int: - """we convert utctz to the timezone in seconds, it is the format time.altzone - returns. Git stores it as UTC timezone which has the opposite sign as well, - which explains the -1 * ( that was made explicit here ) - :param utctz: git utc timezone string, i.e. +0200""" - return -1 * int(float(utctz) / 100 * 3600) - - -def altz_to_utctz_str(altz: float) -> str: - """As above, but inverses the operation, returning a string that can be used - in commit objects""" - utci = -1 * int((float(altz) / 3600) * 100) - utcs = str(abs(utci)) - utcs = "0" * (4 - len(utcs)) + utcs - prefix = (utci < 0 and '-') or '+' - return prefix + utcs - - -def verify_utctz(offset: str) -> str: - """:raise ValueError: if offset is incorrect - :return: offset""" - fmt_exc = ValueError("Invalid timezone offset format: %s" % offset) - if len(offset) != 5: - raise fmt_exc - if offset[0] not in "+-": - raise fmt_exc - if offset[1] not in digits or\ - offset[2] not in digits or\ - offset[3] not in digits or\ - offset[4] not in digits: - raise fmt_exc - # END for each char - return offset - - -class tzoffset(tzinfo): - - def __init__(self, secs_west_of_utc: float, name: Union[None, str] = None) -> None: - self._offset = timedelta(seconds=-secs_west_of_utc) - self._name = name or 'fixed' - - def __reduce__(self) -> Tuple[Type['tzoffset'], Tuple[float, str]]: - return tzoffset, (-self._offset.total_seconds(), self._name) - - def utcoffset(self, dt: Union[datetime, None]) -> timedelta: - return self._offset - - def tzname(self, dt: Union[datetime, None]) -> str: - return self._name - - def dst(self, dt: Union[datetime, None]) -> timedelta: - return ZERO - - -utc = tzoffset(0, 'UTC') - - -def from_timestamp(timestamp: float, tz_offset: float) -> datetime: - """Converts a timestamp + tz_offset into an aware datetime instance.""" - utc_dt = datetime.fromtimestamp(timestamp, utc) - try: - local_dt = utc_dt.astimezone(tzoffset(tz_offset)) - return local_dt - except ValueError: - return utc_dt - - -def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: - """ - Parse the given date as one of the following - - * aware datetime instance - * Git internal format: timestamp offset - * RFC 2822: Thu, 07 Apr 2005 22:13:13 +0200. - * ISO 8601 2005-04-07T22:13:13 - The T can be a space as well - - :return: Tuple(int(timestamp_UTC), int(offset)), both in seconds since epoch - :raise ValueError: If the format could not be understood - :note: Date can also be YYYY.MM.DD, MM/DD/YYYY and DD.MM.YYYY. - """ - if isinstance(string_date, datetime): - if string_date.tzinfo: - utcoffset = cast(timedelta, string_date.utcoffset()) # typeguard, if tzinfoand is not None - offset = -int(utcoffset.total_seconds()) - return int(string_date.astimezone(utc).timestamp()), offset - else: - raise ValueError(f"string_date datetime object without tzinfo, {string_date}") - - # git time - try: - if string_date.count(' ') == 1 and string_date.rfind(':') == -1: - timestamp, offset_str = string_date.split() - if timestamp.startswith('@'): - timestamp = timestamp[1:] - timestamp_int = int(timestamp) - return timestamp_int, utctz_to_altz(verify_utctz(offset_str)) - else: - offset_str = "+0000" # local time by default - if string_date[-5] in '-+': - offset_str = verify_utctz(string_date[-5:]) - string_date = string_date[:-6] # skip space as well - # END split timezone info - offset = utctz_to_altz(offset_str) - - # now figure out the date and time portion - split time - date_formats = [] - splitter = -1 - if ',' in string_date: - date_formats.append("%a, %d %b %Y") - splitter = string_date.rfind(' ') - else: - # iso plus additional - date_formats.append("%Y-%m-%d") - date_formats.append("%Y.%m.%d") - date_formats.append("%m/%d/%Y") - date_formats.append("%d.%m.%Y") - - splitter = string_date.rfind('T') - if splitter == -1: - splitter = string_date.rfind(' ') - # END handle 'T' and ' ' - # END handle rfc or iso - - assert splitter > -1 - - # split date and time - time_part = string_date[splitter + 1:] # skip space - date_part = string_date[:splitter] - - # parse time - tstruct = time.strptime(time_part, "%H:%M:%S") - - for fmt in date_formats: - try: - dtstruct = time.strptime(date_part, fmt) - utctime = calendar.timegm((dtstruct.tm_year, dtstruct.tm_mon, dtstruct.tm_mday, - tstruct.tm_hour, tstruct.tm_min, tstruct.tm_sec, - dtstruct.tm_wday, dtstruct.tm_yday, tstruct.tm_isdst)) - return int(utctime), offset - except ValueError: - continue - # END exception handling - # END for each fmt - - # still here ? fail - raise ValueError("no format matched") - # END handle format - except Exception as e: - raise ValueError(f"Unsupported date format or type: {string_date}, type={type(string_date)}") from e - # END handle exceptions - - -# precompiled regex -_re_actor_epoch = re.compile(r'^.+? (.*) (\d+) ([+-]\d+).*$') -_re_only_actor = re.compile(r'^.+? (.*)$') - - -def parse_actor_and_date(line: str) -> Tuple[Actor, int, int]: - """Parse out the actor (author or committer) info from a line like:: - - author Tom Preston-Werner 1191999972 -0700 - - :return: [Actor, int_seconds_since_epoch, int_timezone_offset]""" - actor, epoch, offset = '', '0', '0' - m = _re_actor_epoch.search(line) - if m: - actor, epoch, offset = m.groups() - else: - m = _re_only_actor.search(line) - actor = m.group(1) if m else line or '' - return (Actor._from_string(actor), int(epoch), utctz_to_altz(offset)) - -#} END functions - - -#{ Classes - -class ProcessStreamAdapter(object): - - """Class wireing all calls to the contained Process instance. - - Use this type to hide the underlying process to provide access only to a specified - stream. The process is usually wrapped into an AutoInterrupt class to kill - it if the instance goes out of scope.""" - __slots__ = ("_proc", "_stream") - - def __init__(self, process: 'Popen', stream_name: str) -> None: - self._proc = process - self._stream: StringIO = getattr(process, stream_name) # guessed type - - def __getattr__(self, attr: str) -> Any: - return getattr(self._stream, attr) - - -@runtime_checkable -class Traversable(Protocol): - - """Simple interface to perform depth-first or breadth-first traversals - into one direction. - Subclasses only need to implement one function. - Instances of the Subclass must be hashable - - Defined subclasses = [Commit, Tree, SubModule] - """ - __slots__ = () - - @classmethod - @abstractmethod - def _get_intermediate_items(cls, item: Any) -> Sequence['Traversable']: - """ - Returns: - Tuple of items connected to the given item. - Must be implemented in subclass - - class Commit:: (cls, Commit) -> Tuple[Commit, ...] - class Submodule:: (cls, Submodule) -> Iterablelist[Submodule] - class Tree:: (cls, Tree) -> Tuple[Tree, ...] - """ - raise NotImplementedError("To be implemented in subclass") - - @abstractmethod - def list_traverse(self, *args: Any, **kwargs: Any) -> Any: - """ """ - warnings.warn("list_traverse() method should only be called from subclasses." - "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20" - "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit", - DeprecationWarning, - stacklevel=2) - return self._list_traverse(*args, **kwargs) - - def _list_traverse(self, as_edge: bool = False, *args: Any, **kwargs: Any - ) -> IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']]: - """ - :return: IterableList with the results of the traversal as produced by - traverse() - Commit -> IterableList['Commit'] - Submodule -> IterableList['Submodule'] - Tree -> IterableList[Union['Submodule', 'Tree', 'Blob']] - """ - # Commit and Submodule have id.__attribute__ as IterableObj - # Tree has id.__attribute__ inherited from IndexObject - if isinstance(self, Has_id_attribute): - id = self._id_attribute_ - else: - id = "" # shouldn't reach here, unless Traversable subclass created with no _id_attribute_ - # could add _id_attribute_ to Traversable, or make all Traversable also Iterable? - - if not as_edge: - out: IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']] = IterableList(id) - out.extend(self.traverse(as_edge=as_edge, *args, **kwargs)) - return out - # overloads in subclasses (mypy does't allow typing self: subclass) - # Union[IterableList['Commit'], IterableList['Submodule'], IterableList[Union['Submodule', 'Tree', 'Blob']]] - else: - # Raise deprecationwarning, doesn't make sense to use this - out_list: IterableList = IterableList(self.traverse(*args, **kwargs)) - return out_list - - @ abstractmethod - def traverse(self, *args: Any, **kwargs: Any) -> Any: - """ """ - warnings.warn("traverse() method should only be called from subclasses." - "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20" - "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit", - DeprecationWarning, - stacklevel=2) - return self._traverse(*args, **kwargs) - - def _traverse(self, - predicate: Callable[[Union['Traversable', 'Blob', TraversedTup], int], bool] = lambda i, d: True, - prune: Callable[[Union['Traversable', 'Blob', TraversedTup], int], bool] = lambda i, d: False, - depth: int = -1, branch_first: bool = True, visit_once: bool = True, - ignore_self: int = 1, as_edge: bool = False - ) -> Union[Iterator[Union['Traversable', 'Blob']], - Iterator[TraversedTup]]: - """:return: iterator yielding of items found when traversing self - :param predicate: f(i,d) returns False if item i at depth d should not be included in the result - - :param prune: - f(i,d) return True if the search should stop at item i at depth d. - Item i will not be returned. - - :param depth: - define at which level the iteration should not go deeper - if -1, there is no limit - if 0, you would effectively only get self, the root of the iteration - i.e. if 1, you would only get the first level of predecessors/successors - - :param branch_first: - if True, items will be returned branch first, otherwise depth first - - :param visit_once: - if True, items will only be returned once, although they might be encountered - several times. Loops are prevented that way. - - :param ignore_self: - if True, self will be ignored and automatically pruned from - the result. Otherwise it will be the first item to be returned. - If as_edge is True, the source of the first edge is None - - :param as_edge: - if True, return a pair of items, first being the source, second the - destination, i.e. tuple(src, dest) with the edge spanning from - source to destination""" - - """ - Commit -> Iterator[Union[Commit, Tuple[Commit, Commit]] - Submodule -> Iterator[Submodule, Tuple[Submodule, Submodule]] - Tree -> Iterator[Union[Blob, Tree, Submodule, - Tuple[Union[Submodule, Tree], Union[Blob, Tree, Submodule]]] - - ignore_self=True is_edge=True -> Iterator[item] - ignore_self=True is_edge=False --> Iterator[item] - ignore_self=False is_edge=True -> Iterator[item] | Iterator[Tuple[src, item]] - ignore_self=False is_edge=False -> Iterator[Tuple[src, item]]""" - - visited = set() - stack: Deque[TraverseNT] = deque() - stack.append(TraverseNT(0, self, None)) # self is always depth level 0 - - def addToStack(stack: Deque[TraverseNT], - src_item: 'Traversable', - branch_first: bool, - depth: int) -> None: - lst = self._get_intermediate_items(item) - if not lst: # empty list - return None - if branch_first: - stack.extendleft(TraverseNT(depth, i, src_item) for i in lst) - else: - reviter = (TraverseNT(depth, lst[i], src_item) for i in range(len(lst) - 1, -1, -1)) - stack.extend(reviter) - # END addToStack local method - - while stack: - d, item, src = stack.pop() # depth of item, item, item_source - - if visit_once and item in visited: - continue - - if visit_once: - visited.add(item) - - rval: Union[TraversedTup, 'Traversable', 'Blob'] - if as_edge: # if as_edge return (src, item) unless rrc is None (e.g. for first item) - rval = (src, item) - else: - rval = item - - if prune(rval, d): - continue - - skipStartItem = ignore_self and (item is self) - if not skipStartItem and predicate(rval, d): - yield rval - - # only continue to next level if this is appropriate ! - nd = d + 1 - if depth > -1 and nd > depth: - continue - - addToStack(stack, item, branch_first, nd) - # END for each item on work stack - - -@ runtime_checkable -class Serializable(Protocol): - - """Defines methods to serialize and deserialize objects from and into a data stream""" - __slots__ = () - - # @abstractmethod - def _serialize(self, stream: 'BytesIO') -> 'Serializable': - """Serialize the data of this object into the given data stream - :note: a serialized object would ``_deserialize`` into the same object - :param stream: a file-like object - :return: self""" - raise NotImplementedError("To be implemented in subclass") - - # @abstractmethod - def _deserialize(self, stream: 'BytesIO') -> 'Serializable': - """Deserialize all information regarding this object from the stream - :param stream: a file-like object - :return: self""" - raise NotImplementedError("To be implemented in subclass") - - -class TraversableIterableObj(IterableObj, Traversable): - __slots__ = () - - TIobj_tuple = Tuple[Union[T_TIobj, None], T_TIobj] - - def list_traverse(self: T_TIobj, *args: Any, **kwargs: Any) -> IterableList[T_TIobj]: - return super(TraversableIterableObj, self)._list_traverse(* args, **kwargs) - - @ overload # type: ignore - def traverse(self: T_TIobj - ) -> Iterator[T_TIobj]: - ... - - @ overload - def traverse(self: T_TIobj, - predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], - prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], - depth: int, branch_first: bool, visit_once: bool, - ignore_self: Literal[True], - as_edge: Literal[False], - ) -> Iterator[T_TIobj]: - ... - - @ overload - def traverse(self: T_TIobj, - predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], - prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], - depth: int, branch_first: bool, visit_once: bool, - ignore_self: Literal[False], - as_edge: Literal[True], - ) -> Iterator[Tuple[Union[T_TIobj, None], T_TIobj]]: - ... - - @ overload - def traverse(self: T_TIobj, - predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], - prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], - depth: int, branch_first: bool, visit_once: bool, - ignore_self: Literal[True], - as_edge: Literal[True], - ) -> Iterator[Tuple[T_TIobj, T_TIobj]]: - ... - - def traverse(self: T_TIobj, - predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], - bool] = lambda i, d: True, - prune: Callable[[Union[T_TIobj, TIobj_tuple], int], - bool] = lambda i, d: False, - depth: int = -1, branch_first: bool = True, visit_once: bool = True, - ignore_self: int = 1, as_edge: bool = False - ) -> Union[Iterator[T_TIobj], - Iterator[Tuple[T_TIobj, T_TIobj]], - Iterator[TIobj_tuple]]: - """For documentation, see util.Traversable._traverse()""" - - """ - # To typecheck instead of using cast. - import itertools - from git.types import TypeGuard - def is_commit_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Tuple['Commit', 'Commit']]]]: - for x in inp[1]: - if not isinstance(x, tuple) and len(x) != 2: - if all(isinstance(inner, Commit) for inner in x): - continue - return True - - ret = super(Commit, self).traverse(predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge) - ret_tup = itertools.tee(ret, 2) - assert is_commit_traversed(ret_tup), f"{[type(x) for x in list(ret_tup[0])]}" - return ret_tup[0] - """ - return cast(Union[Iterator[T_TIobj], - Iterator[Tuple[Union[None, T_TIobj], T_TIobj]]], - super(TraversableIterableObj, self)._traverse( - predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge # type: ignore - )) -- cgit v1.2.1 From 481f672baab666d6e2f81e9288a5f3c42c884a8e Mon Sep 17 00:00:00 2001 From: Yobmod Date: Mon, 2 Aug 2021 17:56:06 +0100 Subject: Add __future__.annotations to repo/base.py --- git/objects/util.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'git/objects') diff --git a/git/objects/util.py b/git/objects/util.py index 0b843301..16d4c0ac 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -187,9 +187,7 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: offset = -int(utcoffset.total_seconds()) return int(string_date.astimezone(utc).timestamp()), offset else: - # should just return timestamp, 0? - return int(string_date.astimezone(utc).timestamp()), 0 - # raise ValueError(f"string_date datetime object without tzinfo, {string_date}") + raise ValueError(f"string_date datetime object without tzinfo, {string_date}") # git time try: -- cgit v1.2.1