summaryrefslogtreecommitdiff
path: root/git/objects/util.py
diff options
context:
space:
mode:
authorYobmod <yobmod@gmail.com>2021-06-23 02:22:34 +0100
committerYobmod <yobmod@gmail.com>2021-06-23 02:22:34 +0100
commit5b6fe83f4d817a3b73b44df16cfb4f96bd4d9904 (patch)
treee9030835ef3a199a650e9d948c03eea99a09696d /git/objects/util.py
parent7ca97dcef3131a11dd5ef41d674bb6bd36608608 (diff)
downloadgitpython-5b6fe83f4d817a3b73b44df16cfb4f96bd4d9904.tar.gz
Update typing-extensions version in requirements.txt
Diffstat (limited to 'git/objects/util.py')
-rw-r--r--git/objects/util.py129
1 files changed, 91 insertions, 38 deletions
diff --git a/git/objects/util.py b/git/objects/util.py
index d15d83c3..087f0166 100644
--- a/git/objects/util.py
+++ b/git/objects/util.py
@@ -4,19 +4,34 @@
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Module for general utility functions"""
+
from git.util import (
IterableList,
Actor
)
import re
-from collections import deque as Deque
+from collections import deque
from string import digits
import time
import calendar
from datetime import datetime, timedelta, tzinfo
+# typing ------------------------------------------------------------
+from typing import (Any, Callable, Deque, Iterator, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast, overload)
+
+if TYPE_CHECKING:
+ from io import BytesIO, StringIO
+ from .submodule.base import Submodule
+ from .commit import Commit
+ from .blob import Blob
+ from .tag import TagObject
+ from .tree import Tree
+ from subprocess import Popen
+
+# --------------------------------------------------------------------
+
__all__ = ('get_object_type_by_name', 'parse_date', 'parse_actor_and_date',
'ProcessStreamAdapter', 'Traversable', 'altz_to_utctz_str', 'utctz_to_altz',
'verify_utctz', 'Actor', 'tzoffset', 'utc')
@@ -26,7 +41,7 @@ ZERO = timedelta(0)
#{ Functions
-def mode_str_to_int(modestr):
+def mode_str_to_int(modestr: Union[bytes, str]) -> int:
"""
:param modestr: string like 755 or 644 or 100644 - only the last 6 chars will be used
:return:
@@ -36,12 +51,14 @@ def mode_str_to_int(modestr):
for example."""
mode = 0
for iteration, char in enumerate(reversed(modestr[-6:])):
+ char = cast(Union[str, int], char)
mode += int(char) << iteration * 3
# END for each char
return mode
-def get_object_type_by_name(object_type_name):
+def get_object_type_by_name(object_type_name: bytes
+ ) -> Union[Type['Commit'], Type['TagObject'], Type['Tree'], Type['Blob']]:
"""
:return: type suitable to handle the given object type name.
Use the type to create new instances.
@@ -62,10 +79,10 @@ def get_object_type_by_name(object_type_name):
from . import tree
return tree.Tree
else:
- raise ValueError("Cannot handle unknown object type: %s" % object_type_name)
+ raise ValueError("Cannot handle unknown object type: %s" % object_type_name.decode())
-def utctz_to_altz(utctz):
+def utctz_to_altz(utctz: str) -> int:
"""we convert utctz to the timezone in seconds, it is the format time.altzone
returns. Git stores it as UTC timezone which has the opposite sign as well,
which explains the -1 * ( that was made explicit here )
@@ -73,7 +90,7 @@ def utctz_to_altz(utctz):
return -1 * int(float(utctz) / 100 * 3600)
-def altz_to_utctz_str(altz):
+def altz_to_utctz_str(altz: int) -> str:
"""As above, but inverses the operation, returning a string that can be used
in commit objects"""
utci = -1 * int((float(altz) / 3600) * 100)
@@ -83,7 +100,7 @@ def altz_to_utctz_str(altz):
return prefix + utcs
-def verify_utctz(offset):
+def verify_utctz(offset: str) -> str:
""":raise ValueError: if offset is incorrect
:return: offset"""
fmt_exc = ValueError("Invalid timezone offset format: %s" % offset)
@@ -101,27 +118,28 @@ def verify_utctz(offset):
class tzoffset(tzinfo):
- def __init__(self, secs_west_of_utc, name=None):
+
+ def __init__(self, secs_west_of_utc: float, name: Union[None, str] = None) -> None:
self._offset = timedelta(seconds=-secs_west_of_utc)
self._name = name or 'fixed'
- def __reduce__(self):
+ def __reduce__(self) -> Tuple[Type['tzoffset'], Tuple[float, str]]:
return tzoffset, (-self._offset.total_seconds(), self._name)
- def utcoffset(self, dt):
+ def utcoffset(self, dt) -> timedelta:
return self._offset
- def tzname(self, dt):
+ def tzname(self, dt) -> str:
return self._name
- def dst(self, dt):
+ def dst(self, dt) -> timedelta:
return ZERO
utc = tzoffset(0, 'UTC')
-def from_timestamp(timestamp, tz_offset):
+def from_timestamp(timestamp, tz_offset: float) -> datetime:
"""Converts a timestamp + tz_offset into an aware datetime instance."""
utc_dt = datetime.fromtimestamp(timestamp, utc)
try:
@@ -131,7 +149,7 @@ def from_timestamp(timestamp, tz_offset):
return utc_dt
-def parse_date(string_date):
+def parse_date(string_date: str) -> Tuple[int, int]:
"""
Parse the given date as one of the following
@@ -152,18 +170,18 @@ def parse_date(string_date):
# git time
try:
if string_date.count(' ') == 1 and string_date.rfind(':') == -1:
- timestamp, offset = string_date.split()
+ timestamp, offset_str = string_date.split()
if timestamp.startswith('@'):
timestamp = timestamp[1:]
- timestamp = int(timestamp)
- return timestamp, utctz_to_altz(verify_utctz(offset))
+ timestamp_int = int(timestamp)
+ return timestamp_int, utctz_to_altz(verify_utctz(offset_str))
else:
- offset = "+0000" # local time by default
+ offset_str = "+0000" # local time by default
if string_date[-5] in '-+':
- offset = verify_utctz(string_date[-5:])
+ offset_str = verify_utctz(string_date[-5:])
string_date = string_date[:-6] # skip space as well
# END split timezone info
- offset = utctz_to_altz(offset)
+ offset = utctz_to_altz(offset_str)
# now figure out the date and time portion - split time
date_formats = []
@@ -218,13 +236,13 @@ _re_actor_epoch = re.compile(r'^.+? (.*) (\d+) ([+-]\d+).*$')
_re_only_actor = re.compile(r'^.+? (.*)$')
-def parse_actor_and_date(line):
+def parse_actor_and_date(line: str) -> Tuple[Actor, int, int]:
"""Parse out the actor (author or committer) info from a line like::
author Tom Preston-Werner <tom@mojombo.com> 1191999972 -0700
:return: [Actor, int_seconds_since_epoch, int_timezone_offset]"""
- actor, epoch, offset = '', 0, 0
+ actor, epoch, offset = '', '0', '0'
m = _re_actor_epoch.search(line)
if m:
actor, epoch, offset = m.groups()
@@ -247,11 +265,11 @@ class ProcessStreamAdapter(object):
it if the instance goes out of scope."""
__slots__ = ("_proc", "_stream")
- def __init__(self, process, stream_name):
+ def __init__(self, process: 'Popen', stream_name: str) -> None:
self._proc = process
- self._stream = getattr(process, stream_name)
+ self._stream = getattr(process, stream_name) # type: StringIO ## guess
- def __getattr__(self, attr):
+ def __getattr__(self, attr: str) -> Any:
return getattr(self._stream, attr)
@@ -260,29 +278,61 @@ class Traversable(object):
"""Simple interface to perform depth-first or breadth-first traversals
into one direction.
Subclasses only need to implement one function.
- Instances of the Subclass must be hashable"""
+ Instances of the Subclass must be hashable
+
+ Defined subclasses = [Commit, Tree, SubModule]
+ """
__slots__ = ()
+ @overload
+ @classmethod
+ def _get_intermediate_items(cls, item: 'Commit') -> Tuple['Commit', ...]:
+ ...
+
+ @overload
+ @classmethod
+ def _get_intermediate_items(cls, item: 'Submodule') -> Tuple['Submodule', ...]:
+ ...
+
+ @overload
+ @classmethod
+ def _get_intermediate_items(cls, item: 'Tree') -> Tuple['Tree', ...]:
+ ...
+
+ @overload
+ @classmethod
+ def _get_intermediate_items(cls, item: 'Traversable') -> Tuple['Traversable', ...]:
+ ...
+
@classmethod
- def _get_intermediate_items(cls, item):
+ def _get_intermediate_items(cls, item: 'Traversable'
+ ) -> Sequence['Traversable']:
"""
Returns:
- List of items connected to the given item.
+ Tuple of items connected to the given item.
Must be implemented in subclass
+
+ class Commit:: (cls, Commit) -> Tuple[Commit, ...]
+ class Submodule:: (cls, Submodule) -> Iterablelist[Submodule]
+ class Tree:: (cls, Tree) -> Tuple[Tree, ...]
"""
raise NotImplementedError("To be implemented in subclass")
- def list_traverse(self, *args, **kwargs):
+ def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList:
"""
:return: IterableList with the results of the traversal as produced by
traverse()"""
- out = IterableList(self._id_attribute_)
+ out = IterableList(self._id_attribute_) # type: ignore[attr-defined] # defined in sublcasses
out.extend(self.traverse(*args, **kwargs))
return out
- def traverse(self, predicate=lambda i, d: True,
- prune=lambda i, d: False, depth=-1, branch_first=True,
- visit_once=True, ignore_self=1, as_edge=False):
+ def traverse(self,
+ predicate: Callable[[object, int], bool] = lambda i, d: True,
+ prune: Callable[[object, int], bool] = lambda i, d: False,
+ depth: int = -1,
+ branch_first: bool = True,
+ visit_once: bool = True, ignore_self: int = 1, as_edge: bool = False
+ ) -> Union[Iterator['Traversable'], Iterator[Tuple['Traversable', 'Traversable']]]:
""":return: iterator yielding of items found when traversing self
:param predicate: f(i,d) returns False if item i at depth d should not be included in the result
@@ -314,13 +364,16 @@ class Traversable(object):
destination, i.e. tuple(src, dest) with the edge spanning from
source to destination"""
visited = set()
- stack = Deque()
+ stack = deque() # type: Deque[Tuple[int, Traversable, Union[Traversable, None]]]
stack.append((0, self, None)) # self is always depth level 0
- def addToStack(stack, item, branch_first, depth):
+ def addToStack(stack: Deque[Tuple[int, 'Traversable', Union['Traversable', None]]],
+ item: 'Traversable',
+ branch_first: bool,
+ depth) -> None:
lst = self._get_intermediate_items(item)
if not lst:
- return
+ return None
if branch_first:
stack.extendleft((depth, i, item) for i in lst)
else:
@@ -359,14 +412,14 @@ class Serializable(object):
"""Defines methods to serialize and deserialize objects from and into a data stream"""
__slots__ = ()
- def _serialize(self, stream):
+ def _serialize(self, stream: 'BytesIO') -> 'Serializable':
"""Serialize the data of this object into the given data stream
:note: a serialized object would ``_deserialize`` into the same object
:param stream: a file-like object
:return: self"""
raise NotImplementedError("To be implemented in subclass")
- def _deserialize(self, stream):
+ def _deserialize(self, stream: 'BytesIO') -> 'Serializable':
"""Deserialize all information regarding this object from the stream
:param stream: a file-like object
:return: self"""