diff options
author | Yobmod <yobmod@gmail.com> | 2021-06-23 02:22:34 +0100 |
---|---|---|
committer | Yobmod <yobmod@gmail.com> | 2021-06-23 02:22:34 +0100 |
commit | 5b6fe83f4d817a3b73b44df16cfb4f96bd4d9904 (patch) | |
tree | e9030835ef3a199a650e9d948c03eea99a09696d /git/objects/util.py | |
parent | 7ca97dcef3131a11dd5ef41d674bb6bd36608608 (diff) | |
download | gitpython-5b6fe83f4d817a3b73b44df16cfb4f96bd4d9904.tar.gz |
Update typing-extensions version in requirements.txt
Diffstat (limited to 'git/objects/util.py')
-rw-r--r-- | git/objects/util.py | 129 |
1 files changed, 91 insertions, 38 deletions
diff --git a/git/objects/util.py b/git/objects/util.py index d15d83c3..087f0166 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -4,19 +4,34 @@ # This module is part of GitPython and is released under # the BSD License: http://www.opensource.org/licenses/bsd-license.php """Module for general utility functions""" + from git.util import ( IterableList, Actor ) import re -from collections import deque as Deque +from collections import deque from string import digits import time import calendar from datetime import datetime, timedelta, tzinfo +# typing ------------------------------------------------------------ +from typing import (Any, Callable, Deque, Iterator, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast, overload) + +if TYPE_CHECKING: + from io import BytesIO, StringIO + from .submodule.base import Submodule + from .commit import Commit + from .blob import Blob + from .tag import TagObject + from .tree import Tree + from subprocess import Popen + +# -------------------------------------------------------------------- + __all__ = ('get_object_type_by_name', 'parse_date', 'parse_actor_and_date', 'ProcessStreamAdapter', 'Traversable', 'altz_to_utctz_str', 'utctz_to_altz', 'verify_utctz', 'Actor', 'tzoffset', 'utc') @@ -26,7 +41,7 @@ ZERO = timedelta(0) #{ Functions -def mode_str_to_int(modestr): +def mode_str_to_int(modestr: Union[bytes, str]) -> int: """ :param modestr: string like 755 or 644 or 100644 - only the last 6 chars will be used :return: @@ -36,12 +51,14 @@ def mode_str_to_int(modestr): for example.""" mode = 0 for iteration, char in enumerate(reversed(modestr[-6:])): + char = cast(Union[str, int], char) mode += int(char) << iteration * 3 # END for each char return mode -def get_object_type_by_name(object_type_name): +def get_object_type_by_name(object_type_name: bytes + ) -> Union[Type['Commit'], Type['TagObject'], Type['Tree'], Type['Blob']]: """ :return: type suitable to handle the given object type name. Use the type to create new instances. @@ -62,10 +79,10 @@ def get_object_type_by_name(object_type_name): from . import tree return tree.Tree else: - raise ValueError("Cannot handle unknown object type: %s" % object_type_name) + raise ValueError("Cannot handle unknown object type: %s" % object_type_name.decode()) -def utctz_to_altz(utctz): +def utctz_to_altz(utctz: str) -> int: """we convert utctz to the timezone in seconds, it is the format time.altzone returns. Git stores it as UTC timezone which has the opposite sign as well, which explains the -1 * ( that was made explicit here ) @@ -73,7 +90,7 @@ def utctz_to_altz(utctz): return -1 * int(float(utctz) / 100 * 3600) -def altz_to_utctz_str(altz): +def altz_to_utctz_str(altz: int) -> str: """As above, but inverses the operation, returning a string that can be used in commit objects""" utci = -1 * int((float(altz) / 3600) * 100) @@ -83,7 +100,7 @@ def altz_to_utctz_str(altz): return prefix + utcs -def verify_utctz(offset): +def verify_utctz(offset: str) -> str: """:raise ValueError: if offset is incorrect :return: offset""" fmt_exc = ValueError("Invalid timezone offset format: %s" % offset) @@ -101,27 +118,28 @@ def verify_utctz(offset): class tzoffset(tzinfo): - def __init__(self, secs_west_of_utc, name=None): + + def __init__(self, secs_west_of_utc: float, name: Union[None, str] = None) -> None: self._offset = timedelta(seconds=-secs_west_of_utc) self._name = name or 'fixed' - def __reduce__(self): + def __reduce__(self) -> Tuple[Type['tzoffset'], Tuple[float, str]]: return tzoffset, (-self._offset.total_seconds(), self._name) - def utcoffset(self, dt): + def utcoffset(self, dt) -> timedelta: return self._offset - def tzname(self, dt): + def tzname(self, dt) -> str: return self._name - def dst(self, dt): + def dst(self, dt) -> timedelta: return ZERO utc = tzoffset(0, 'UTC') -def from_timestamp(timestamp, tz_offset): +def from_timestamp(timestamp, tz_offset: float) -> datetime: """Converts a timestamp + tz_offset into an aware datetime instance.""" utc_dt = datetime.fromtimestamp(timestamp, utc) try: @@ -131,7 +149,7 @@ def from_timestamp(timestamp, tz_offset): return utc_dt -def parse_date(string_date): +def parse_date(string_date: str) -> Tuple[int, int]: """ Parse the given date as one of the following @@ -152,18 +170,18 @@ def parse_date(string_date): # git time try: if string_date.count(' ') == 1 and string_date.rfind(':') == -1: - timestamp, offset = string_date.split() + timestamp, offset_str = string_date.split() if timestamp.startswith('@'): timestamp = timestamp[1:] - timestamp = int(timestamp) - return timestamp, utctz_to_altz(verify_utctz(offset)) + timestamp_int = int(timestamp) + return timestamp_int, utctz_to_altz(verify_utctz(offset_str)) else: - offset = "+0000" # local time by default + offset_str = "+0000" # local time by default if string_date[-5] in '-+': - offset = verify_utctz(string_date[-5:]) + offset_str = verify_utctz(string_date[-5:]) string_date = string_date[:-6] # skip space as well # END split timezone info - offset = utctz_to_altz(offset) + offset = utctz_to_altz(offset_str) # now figure out the date and time portion - split time date_formats = [] @@ -218,13 +236,13 @@ _re_actor_epoch = re.compile(r'^.+? (.*) (\d+) ([+-]\d+).*$') _re_only_actor = re.compile(r'^.+? (.*)$') -def parse_actor_and_date(line): +def parse_actor_and_date(line: str) -> Tuple[Actor, int, int]: """Parse out the actor (author or committer) info from a line like:: author Tom Preston-Werner <tom@mojombo.com> 1191999972 -0700 :return: [Actor, int_seconds_since_epoch, int_timezone_offset]""" - actor, epoch, offset = '', 0, 0 + actor, epoch, offset = '', '0', '0' m = _re_actor_epoch.search(line) if m: actor, epoch, offset = m.groups() @@ -247,11 +265,11 @@ class ProcessStreamAdapter(object): it if the instance goes out of scope.""" __slots__ = ("_proc", "_stream") - def __init__(self, process, stream_name): + def __init__(self, process: 'Popen', stream_name: str) -> None: self._proc = process - self._stream = getattr(process, stream_name) + self._stream = getattr(process, stream_name) # type: StringIO ## guess - def __getattr__(self, attr): + def __getattr__(self, attr: str) -> Any: return getattr(self._stream, attr) @@ -260,29 +278,61 @@ class Traversable(object): """Simple interface to perform depth-first or breadth-first traversals into one direction. Subclasses only need to implement one function. - Instances of the Subclass must be hashable""" + Instances of the Subclass must be hashable + + Defined subclasses = [Commit, Tree, SubModule] + """ __slots__ = () + @overload + @classmethod + def _get_intermediate_items(cls, item: 'Commit') -> Tuple['Commit', ...]: + ... + + @overload + @classmethod + def _get_intermediate_items(cls, item: 'Submodule') -> Tuple['Submodule', ...]: + ... + + @overload + @classmethod + def _get_intermediate_items(cls, item: 'Tree') -> Tuple['Tree', ...]: + ... + + @overload + @classmethod + def _get_intermediate_items(cls, item: 'Traversable') -> Tuple['Traversable', ...]: + ... + @classmethod - def _get_intermediate_items(cls, item): + def _get_intermediate_items(cls, item: 'Traversable' + ) -> Sequence['Traversable']: """ Returns: - List of items connected to the given item. + Tuple of items connected to the given item. Must be implemented in subclass + + class Commit:: (cls, Commit) -> Tuple[Commit, ...] + class Submodule:: (cls, Submodule) -> Iterablelist[Submodule] + class Tree:: (cls, Tree) -> Tuple[Tree, ...] """ raise NotImplementedError("To be implemented in subclass") - def list_traverse(self, *args, **kwargs): + def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList: """ :return: IterableList with the results of the traversal as produced by traverse()""" - out = IterableList(self._id_attribute_) + out = IterableList(self._id_attribute_) # type: ignore[attr-defined] # defined in sublcasses out.extend(self.traverse(*args, **kwargs)) return out - def traverse(self, predicate=lambda i, d: True, - prune=lambda i, d: False, depth=-1, branch_first=True, - visit_once=True, ignore_self=1, as_edge=False): + def traverse(self, + predicate: Callable[[object, int], bool] = lambda i, d: True, + prune: Callable[[object, int], bool] = lambda i, d: False, + depth: int = -1, + branch_first: bool = True, + visit_once: bool = True, ignore_self: int = 1, as_edge: bool = False + ) -> Union[Iterator['Traversable'], Iterator[Tuple['Traversable', 'Traversable']]]: """:return: iterator yielding of items found when traversing self :param predicate: f(i,d) returns False if item i at depth d should not be included in the result @@ -314,13 +364,16 @@ class Traversable(object): destination, i.e. tuple(src, dest) with the edge spanning from source to destination""" visited = set() - stack = Deque() + stack = deque() # type: Deque[Tuple[int, Traversable, Union[Traversable, None]]] stack.append((0, self, None)) # self is always depth level 0 - def addToStack(stack, item, branch_first, depth): + def addToStack(stack: Deque[Tuple[int, 'Traversable', Union['Traversable', None]]], + item: 'Traversable', + branch_first: bool, + depth) -> None: lst = self._get_intermediate_items(item) if not lst: - return + return None if branch_first: stack.extendleft((depth, i, item) for i in lst) else: @@ -359,14 +412,14 @@ class Serializable(object): """Defines methods to serialize and deserialize objects from and into a data stream""" __slots__ = () - def _serialize(self, stream): + def _serialize(self, stream: 'BytesIO') -> 'Serializable': """Serialize the data of this object into the given data stream :note: a serialized object would ``_deserialize`` into the same object :param stream: a file-like object :return: self""" raise NotImplementedError("To be implemented in subclass") - def _deserialize(self, stream): + def _deserialize(self, stream: 'BytesIO') -> 'Serializable': """Deserialize all information regarding this object from the stream :param stream: a file-like object :return: self""" |