diff options
author | Sebastian Thiel <sebastian.thiel@icloud.com> | 2022-05-18 07:43:53 +0800 |
---|---|---|
committer | Sebastian Thiel <sebastian.thiel@icloud.com> | 2022-05-18 07:43:53 +0800 |
commit | 21ec529987d10e0010badd37f8da3274167d436f (patch) | |
tree | a3394cfe902ce7edd07c89420c21c13274a2d295 /git/objects/util.py | |
parent | b30720ee4d9762a03eae4fa7cfa4b0190d81784d (diff) | |
download | gitpython-21ec529987d10e0010badd37f8da3274167d436f.tar.gz |
Run everything through 'black'
That way people who use it won't be deterred, while it unifies style
everywhere.
Diffstat (limited to 'git/objects/util.py')
-rw-r--r-- | git/objects/util.py | 362 |
1 files changed, 230 insertions, 132 deletions
diff --git a/git/objects/util.py b/git/objects/util.py index 800eccdf..4ba59c8a 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -7,11 +7,7 @@ from abc import ABC, abstractmethod import warnings -from git.util import ( - IterableList, - IterableObj, - Actor -) +from git.util import IterableList, IterableObj, Actor import re from collections import deque @@ -22,10 +18,24 @@ import calendar from datetime import datetime, timedelta, tzinfo # typing ------------------------------------------------------------ -from typing import (Any, Callable, Deque, Iterator, Generic, NamedTuple, overload, Sequence, # NOQA: F401 - TYPE_CHECKING, Tuple, Type, TypeVar, Union, cast) +from typing import ( + Any, + Callable, + Deque, + Iterator, + Generic, + NamedTuple, + overload, + Sequence, # NOQA: F401 + TYPE_CHECKING, + Tuple, + Type, + TypeVar, + Union, + cast, +) -from git.types import Has_id_attribute, Literal, _T # NOQA: F401 +from git.types import Has_id_attribute, Literal, _T # NOQA: F401 if TYPE_CHECKING: from io import BytesIO, StringIO @@ -46,24 +56,38 @@ else: class TraverseNT(NamedTuple): depth: int - item: Union['Traversable', 'Blob'] - src: Union['Traversable', None] + item: Union["Traversable", "Blob"] + src: Union["Traversable", None] -T_TIobj = TypeVar('T_TIobj', bound='TraversableIterableObj') # for TraversableIterableObj.traverse() +T_TIobj = TypeVar( + "T_TIobj", bound="TraversableIterableObj" +) # for TraversableIterableObj.traverse() -TraversedTup = Union[Tuple[Union['Traversable', None], 'Traversable'], # for commit, submodule - 'TraversedTreeTup'] # for tree.traverse() +TraversedTup = Union[ + Tuple[Union["Traversable", None], "Traversable"], # for commit, submodule + "TraversedTreeTup", +] # for tree.traverse() # -------------------------------------------------------------------- -__all__ = ('get_object_type_by_name', 'parse_date', 'parse_actor_and_date', - 'ProcessStreamAdapter', 'Traversable', 'altz_to_utctz_str', 'utctz_to_altz', - 'verify_utctz', 'Actor', 'tzoffset', 'utc') +__all__ = ( + "get_object_type_by_name", + "parse_date", + "parse_actor_and_date", + "ProcessStreamAdapter", + "Traversable", + "altz_to_utctz_str", + "utctz_to_altz", + "verify_utctz", + "Actor", + "tzoffset", + "utc", +) ZERO = timedelta(0) -#{ Functions +# { Functions def mode_str_to_int(modestr: Union[bytes, str]) -> int: @@ -82,8 +106,9 @@ def mode_str_to_int(modestr: Union[bytes, str]) -> int: return mode -def get_object_type_by_name(object_type_name: bytes - ) -> Union[Type['Commit'], Type['TagObject'], Type['Tree'], Type['Blob']]: +def get_object_type_by_name( + object_type_name: bytes, +) -> Union[Type["Commit"], Type["TagObject"], Type["Tree"], Type["Blob"]]: """ :return: type suitable to handle the given object type name. Use the type to create new instances. @@ -93,18 +118,24 @@ def get_object_type_by_name(object_type_name: bytes :raise ValueError: In case object_type_name is unknown""" if object_type_name == b"commit": from . import commit + return commit.Commit elif object_type_name == b"tag": from . import tag + return tag.TagObject elif object_type_name == b"blob": from . import blob + return blob.Blob elif object_type_name == b"tree": from . import tree + return tree.Tree else: - raise ValueError("Cannot handle unknown object type: %s" % object_type_name.decode()) + raise ValueError( + "Cannot handle unknown object type: %s" % object_type_name.decode() + ) def utctz_to_altz(utctz: str) -> int: @@ -121,7 +152,7 @@ def altz_to_utctz_str(altz: float) -> str: utci = -1 * int((float(altz) / 3600) * 100) utcs = str(abs(utci)) utcs = "0" * (4 - len(utcs)) + utcs - prefix = (utci < 0 and '-') or '+' + prefix = (utci < 0 and "-") or "+" return prefix + utcs @@ -133,22 +164,23 @@ def verify_utctz(offset: str) -> str: raise fmt_exc if offset[0] not in "+-": raise fmt_exc - if offset[1] not in digits or\ - offset[2] not in digits or\ - offset[3] not in digits or\ - offset[4] not in digits: + if ( + offset[1] not in digits + or offset[2] not in digits + or offset[3] not in digits + or offset[4] not in digits + ): raise fmt_exc # END for each char return offset class tzoffset(tzinfo): - def __init__(self, secs_west_of_utc: float, name: Union[None, str] = None) -> None: self._offset = timedelta(seconds=-secs_west_of_utc) - self._name = name or 'fixed' + self._name = name or "fixed" - def __reduce__(self) -> Tuple[Type['tzoffset'], Tuple[float, str]]: + def __reduce__(self) -> Tuple[Type["tzoffset"], Tuple[float, str]]: return tzoffset, (-self._offset.total_seconds(), self._name) def utcoffset(self, dt: Union[datetime, None]) -> timedelta: @@ -161,7 +193,7 @@ class tzoffset(tzinfo): return ZERO -utc = tzoffset(0, 'UTC') +utc = tzoffset(0, "UTC") def from_timestamp(timestamp: float, tz_offset: float) -> datetime: @@ -190,23 +222,27 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: """ if isinstance(string_date, datetime): if string_date.tzinfo: - utcoffset = cast(timedelta, string_date.utcoffset()) # typeguard, if tzinfoand is not None + utcoffset = cast( + timedelta, string_date.utcoffset() + ) # typeguard, if tzinfoand is not None offset = -int(utcoffset.total_seconds()) return int(string_date.astimezone(utc).timestamp()), offset else: - raise ValueError(f"string_date datetime object without tzinfo, {string_date}") + raise ValueError( + f"string_date datetime object without tzinfo, {string_date}" + ) # git time try: - if string_date.count(' ') == 1 and string_date.rfind(':') == -1: + if string_date.count(" ") == 1 and string_date.rfind(":") == -1: timestamp, offset_str = string_date.split() - if timestamp.startswith('@'): + if timestamp.startswith("@"): timestamp = timestamp[1:] timestamp_int = int(timestamp) return timestamp_int, utctz_to_altz(verify_utctz(offset_str)) else: - offset_str = "+0000" # local time by default - if string_date[-5] in '-+': + offset_str = "+0000" # local time by default + if string_date[-5] in "-+": offset_str = verify_utctz(string_date[-5:]) string_date = string_date[:-6] # skip space as well # END split timezone info @@ -215,9 +251,9 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: # now figure out the date and time portion - split time date_formats = [] splitter = -1 - if ',' in string_date: + if "," in string_date: date_formats.append("%a, %d %b %Y") - splitter = string_date.rfind(' ') + splitter = string_date.rfind(" ") else: # iso plus additional date_formats.append("%Y-%m-%d") @@ -225,16 +261,16 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: date_formats.append("%m/%d/%Y") date_formats.append("%d.%m.%Y") - splitter = string_date.rfind('T') + splitter = string_date.rfind("T") if splitter == -1: - splitter = string_date.rfind(' ') + splitter = string_date.rfind(" ") # END handle 'T' and ' ' # END handle rfc or iso assert splitter > -1 # split date and time - time_part = string_date[splitter + 1:] # skip space + time_part = string_date[splitter + 1 :] # skip space date_part = string_date[:splitter] # parse time @@ -243,9 +279,19 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: for fmt in date_formats: try: dtstruct = time.strptime(date_part, fmt) - utctime = calendar.timegm((dtstruct.tm_year, dtstruct.tm_mon, dtstruct.tm_mday, - tstruct.tm_hour, tstruct.tm_min, tstruct.tm_sec, - dtstruct.tm_wday, dtstruct.tm_yday, tstruct.tm_isdst)) + utctime = calendar.timegm( + ( + dtstruct.tm_year, + dtstruct.tm_mon, + dtstruct.tm_mday, + tstruct.tm_hour, + tstruct.tm_min, + tstruct.tm_sec, + dtstruct.tm_wday, + dtstruct.tm_yday, + tstruct.tm_isdst, + ) + ) return int(utctime), offset except ValueError: continue @@ -256,13 +302,15 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: raise ValueError("no format matched") # END handle format except Exception as e: - raise ValueError(f"Unsupported date format or type: {string_date}, type={type(string_date)}") from e + raise ValueError( + f"Unsupported date format or type: {string_date}, type={type(string_date)}" + ) from e # END handle exceptions # precompiled regex -_re_actor_epoch = re.compile(r'^.+? (.*) (\d+) ([+-]\d+).*$') -_re_only_actor = re.compile(r'^.+? (.*)$') +_re_actor_epoch = re.compile(r"^.+? (.*) (\d+) ([+-]\d+).*$") +_re_only_actor = re.compile(r"^.+? (.*)$") def parse_actor_and_date(line: str) -> Tuple[Actor, int, int]: @@ -271,19 +319,21 @@ def parse_actor_and_date(line: str) -> Tuple[Actor, int, int]: author Tom Preston-Werner <tom@mojombo.com> 1191999972 -0700 :return: [Actor, int_seconds_since_epoch, int_timezone_offset]""" - actor, epoch, offset = '', '0', '0' + actor, epoch, offset = "", "0", "0" m = _re_actor_epoch.search(line) if m: actor, epoch, offset = m.groups() else: m = _re_only_actor.search(line) - actor = m.group(1) if m else line or '' + actor = m.group(1) if m else line or "" return (Actor._from_string(actor), int(epoch), utctz_to_altz(offset)) -#} END functions + +# } END functions -#{ Classes +# { Classes + class ProcessStreamAdapter(object): @@ -292,9 +342,10 @@ class ProcessStreamAdapter(object): Use this type to hide the underlying process to provide access only to a specified stream. The process is usually wrapped into an AutoInterrupt class to kill it if the instance goes out of scope.""" + __slots__ = ("_proc", "_stream") - def __init__(self, process: 'Popen', stream_name: str) -> None: + def __init__(self, process: "Popen", stream_name: str) -> None: self._proc = process self._stream: StringIO = getattr(process, stream_name) # guessed type @@ -312,11 +363,12 @@ class Traversable(Protocol): Defined subclasses = [Commit, Tree, SubModule] """ + __slots__ = () @classmethod @abstractmethod - def _get_intermediate_items(cls, item: Any) -> Sequence['Traversable']: + def _get_intermediate_items(cls, item: Any) -> Sequence["Traversable"]: """ Returns: Tuple of items connected to the given item. @@ -331,15 +383,18 @@ class Traversable(Protocol): @abstractmethod def list_traverse(self, *args: Any, **kwargs: Any) -> Any: """ """ - warnings.warn("list_traverse() method should only be called from subclasses." - "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20" - "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit", - DeprecationWarning, - stacklevel=2) + warnings.warn( + "list_traverse() method should only be called from subclasses." + "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20" + "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit", + DeprecationWarning, + stacklevel=2, + ) return self._list_traverse(*args, **kwargs) - def _list_traverse(self, as_edge: bool = False, *args: Any, **kwargs: Any - ) -> IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']]: + def _list_traverse( + self, as_edge: bool = False, *args: Any, **kwargs: Any + ) -> IterableList[Union["Commit", "Submodule", "Tree", "Blob"]]: """ :return: IterableList with the results of the traversal as produced by traverse() @@ -352,11 +407,13 @@ class Traversable(Protocol): if isinstance(self, Has_id_attribute): id = self._id_attribute_ else: - id = "" # shouldn't reach here, unless Traversable subclass created with no _id_attribute_ + id = "" # shouldn't reach here, unless Traversable subclass created with no _id_attribute_ # could add _id_attribute_ to Traversable, or make all Traversable also Iterable? if not as_edge: - out: IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']] = IterableList(id) + out: IterableList[ + Union["Commit", "Submodule", "Tree", "Blob"] + ] = IterableList(id) out.extend(self.traverse(as_edge=as_edge, *args, **kwargs)) return out # overloads in subclasses (mypy doesn't allow typing self: subclass) @@ -366,23 +423,32 @@ class Traversable(Protocol): out_list: IterableList = IterableList(self.traverse(*args, **kwargs)) return out_list - @ abstractmethod + @abstractmethod def traverse(self, *args: Any, **kwargs: Any) -> Any: """ """ - warnings.warn("traverse() method should only be called from subclasses." - "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20" - "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit", - DeprecationWarning, - stacklevel=2) + warnings.warn( + "traverse() method should only be called from subclasses." + "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20" + "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit", + DeprecationWarning, + stacklevel=2, + ) return self._traverse(*args, **kwargs) - def _traverse(self, - predicate: Callable[[Union['Traversable', 'Blob', TraversedTup], int], bool] = lambda i, d: True, - prune: Callable[[Union['Traversable', 'Blob', TraversedTup], int], bool] = lambda i, d: False, - depth: int = -1, branch_first: bool = True, visit_once: bool = True, - ignore_self: int = 1, as_edge: bool = False - ) -> Union[Iterator[Union['Traversable', 'Blob']], - Iterator[TraversedTup]]: + def _traverse( + self, + predicate: Callable[ + [Union["Traversable", "Blob", TraversedTup], int], bool + ] = lambda i, d: True, + prune: Callable[ + [Union["Traversable", "Blob", TraversedTup], int], bool + ] = lambda i, d: False, + depth: int = -1, + branch_first: bool = True, + visit_once: bool = True, + ignore_self: int = 1, + as_edge: bool = False, + ) -> Union[Iterator[Union["Traversable", "Blob"]], Iterator[TraversedTup]]: """:return: iterator yielding of items found when traversing self :param predicate: f(i,d) returns False if item i at depth d should not be included in the result @@ -426,24 +492,30 @@ class Traversable(Protocol): visited = set() stack: Deque[TraverseNT] = deque() - stack.append(TraverseNT(0, self, None)) # self is always depth level 0 - - def addToStack(stack: Deque[TraverseNT], - src_item: 'Traversable', - branch_first: bool, - depth: int) -> None: + stack.append(TraverseNT(0, self, None)) # self is always depth level 0 + + def addToStack( + stack: Deque[TraverseNT], + src_item: "Traversable", + branch_first: bool, + depth: int, + ) -> None: lst = self._get_intermediate_items(item) - if not lst: # empty list + if not lst: # empty list return None if branch_first: stack.extendleft(TraverseNT(depth, i, src_item) for i in lst) else: - reviter = (TraverseNT(depth, lst[i], src_item) for i in range(len(lst) - 1, -1, -1)) + reviter = ( + TraverseNT(depth, lst[i], src_item) + for i in range(len(lst) - 1, -1, -1) + ) stack.extend(reviter) + # END addToStack local method while stack: - d, item, src = stack.pop() # depth of item, item, item_source + d, item, src = stack.pop() # depth of item, item, item_source if visit_once and item in visited: continue @@ -451,8 +523,10 @@ class Traversable(Protocol): if visit_once: visited.add(item) - rval: Union[TraversedTup, 'Traversable', 'Blob'] - if as_edge: # if as_edge return (src, item) unless rrc is None (e.g. for first item) + rval: Union[TraversedTup, "Traversable", "Blob"] + if ( + as_edge + ): # if as_edge return (src, item) unless rrc is None (e.g. for first item) rval = (src, item) else: rval = item @@ -473,14 +547,15 @@ class Traversable(Protocol): # END for each item on work stack -@ runtime_checkable +@runtime_checkable class Serializable(Protocol): """Defines methods to serialize and deserialize objects from and into a data stream""" + __slots__ = () # @abstractmethod - def _serialize(self, stream: 'BytesIO') -> 'Serializable': + def _serialize(self, stream: "BytesIO") -> "Serializable": """Serialize the data of this object into the given data stream :note: a serialized object would ``_deserialize`` into the same object :param stream: a file-like object @@ -488,7 +563,7 @@ class Serializable(Protocol): raise NotImplementedError("To be implemented in subclass") # @abstractmethod - def _deserialize(self, stream: 'BytesIO') -> 'Serializable': + def _deserialize(self, stream: "BytesIO") -> "Serializable": """Deserialize all information regarding this object from the stream :param stream: a file-like object :return: self""" @@ -500,54 +575,76 @@ class TraversableIterableObj(IterableObj, Traversable): TIobj_tuple = Tuple[Union[T_TIobj, None], T_TIobj] - def list_traverse(self: T_TIobj, *args: Any, **kwargs: Any) -> IterableList[T_TIobj]: - return super(TraversableIterableObj, self)._list_traverse(* args, **kwargs) + def list_traverse( + self: T_TIobj, *args: Any, **kwargs: Any + ) -> IterableList[T_TIobj]: + return super(TraversableIterableObj, self)._list_traverse(*args, **kwargs) - @ overload # type: ignore - def traverse(self: T_TIobj - ) -> Iterator[T_TIobj]: + @overload # type: ignore + def traverse(self: T_TIobj) -> Iterator[T_TIobj]: ... - @ overload - def traverse(self: T_TIobj, - predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], - prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], - depth: int, branch_first: bool, visit_once: bool, - ignore_self: Literal[True], - as_edge: Literal[False], - ) -> Iterator[T_TIobj]: + @overload + def traverse( + self: T_TIobj, + predicate: Callable[ + [Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool + ], + prune: Callable[ + [Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool + ], + depth: int, + branch_first: bool, + visit_once: bool, + ignore_self: Literal[True], + as_edge: Literal[False], + ) -> Iterator[T_TIobj]: ... - @ overload - def traverse(self: T_TIobj, - predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], - prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], - depth: int, branch_first: bool, visit_once: bool, - ignore_self: Literal[False], - as_edge: Literal[True], - ) -> Iterator[Tuple[Union[T_TIobj, None], T_TIobj]]: + @overload + def traverse( + self: T_TIobj, + predicate: Callable[ + [Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool + ], + prune: Callable[ + [Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool + ], + depth: int, + branch_first: bool, + visit_once: bool, + ignore_self: Literal[False], + as_edge: Literal[True], + ) -> Iterator[Tuple[Union[T_TIobj, None], T_TIobj]]: ... - @ overload - def traverse(self: T_TIobj, - predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], - prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], - depth: int, branch_first: bool, visit_once: bool, - ignore_self: Literal[True], - as_edge: Literal[True], - ) -> Iterator[Tuple[T_TIobj, T_TIobj]]: + @overload + def traverse( + self: T_TIobj, + predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], + prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], + depth: int, + branch_first: bool, + visit_once: bool, + ignore_self: Literal[True], + as_edge: Literal[True], + ) -> Iterator[Tuple[T_TIobj, T_TIobj]]: ... - def traverse(self: T_TIobj, - predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], - bool] = lambda i, d: True, - prune: Callable[[Union[T_TIobj, TIobj_tuple], int], - bool] = lambda i, d: False, - depth: int = -1, branch_first: bool = True, visit_once: bool = True, - ignore_self: int = 1, as_edge: bool = False - ) -> Union[Iterator[T_TIobj], - Iterator[Tuple[T_TIobj, T_TIobj]], - Iterator[TIobj_tuple]]: + def traverse( + self: T_TIobj, + predicate: Callable[ + [Union[T_TIobj, TIobj_tuple], int], bool + ] = lambda i, d: True, + prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool] = lambda i, d: False, + depth: int = -1, + branch_first: bool = True, + visit_once: bool = True, + ignore_self: int = 1, + as_edge: bool = False, + ) -> Union[ + Iterator[T_TIobj], Iterator[Tuple[T_TIobj, T_TIobj]], Iterator[TIobj_tuple] + ]: """For documentation, see util.Traversable._traverse()""" """ @@ -566,8 +663,9 @@ class TraversableIterableObj(IterableObj, Traversable): assert is_commit_traversed(ret_tup), f"{[type(x) for x in list(ret_tup[0])]}" return ret_tup[0] """ - return cast(Union[Iterator[T_TIobj], - Iterator[Tuple[Union[None, T_TIobj], T_TIobj]]], - super(TraversableIterableObj, self)._traverse( - predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge # type: ignore - )) + return cast( + Union[Iterator[T_TIobj], Iterator[Tuple[Union[None, T_TIobj], T_TIobj]]], + super(TraversableIterableObj, self)._traverse( + predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge # type: ignore + ), + ) |