From 217ba50d150c1ee85b8b7dcb8fbedd93ed5ebd60 Mon Sep 17 00:00:00 2001 From: David Hotham Date: Fri, 30 Jul 2021 12:14:26 +0100 Subject: Fix typing of Head.create_head --- git/repo/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'git') diff --git a/git/repo/base.py b/git/repo/base.py index f8a1689a..bb8ddf13 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -422,7 +422,7 @@ class Repo(object): def create_head(self, path: PathLike, commit: str = 'HEAD', force: bool = False, logmsg: Optional[str] = None - ) -> 'SymbolicReference': + ) -> Head: """Create a new head within the repository. For more documentation, please see the Head.create method. -- cgit v1.2.1 From ee2704b463cc7592df2be295150824260e83e491 Mon Sep 17 00:00:00 2001 From: Dominic Date: Sat, 31 Jul 2021 11:45:14 +0100 Subject: Update util.py --- git/objects/util.py | 634 ++++++++-------------------------------------------- 1 file changed, 94 insertions(+), 540 deletions(-) (limited to 'git') diff --git a/git/objects/util.py b/git/objects/util.py index ef1ae77b..4f8af553 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -1,557 +1,111 @@ -# util.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php -"""Module for general utility functions""" - -from abc import abstractmethod -import warnings -from git.util import ( - IterableList, - IterableObj, - Actor -) - -import re -from collections import deque - -from string import digits -import time -import calendar -from datetime import datetime, timedelta, tzinfo - -# typing ------------------------------------------------------------ -from typing import (Any, Callable, Deque, Iterator, NamedTuple, overload, Sequence, - TYPE_CHECKING, Tuple, Type, TypeVar, Union, cast) - -from git.types import Has_id_attribute, Literal, Protocol, runtime_checkable +"""Module containing index utilities""" +from functools import wraps +import os +import struct +import tempfile + +from git.compat import is_win + +import os.path as osp + + +# typing ---------------------------------------------------------------------- + +from typing import (Any, Callable, TYPE_CHECKING) + +from git.types import PathLike, _T if TYPE_CHECKING: - from io import BytesIO, StringIO - from .commit import Commit - from .blob import Blob - from .tag import TagObject - from .tree import Tree, TraversedTreeTup - from subprocess import Popen - from .submodule.base import Submodule + from git.index import IndexFile +# --------------------------------------------------------------------------------- -class TraverseNT(NamedTuple): - depth: int - item: Union['Traversable', 'Blob'] - src: Union['Traversable', None] +__all__ = ('TemporaryFileSwap', 'post_clear_cache', 'default_index', 'git_working_dir') -T_TIobj = TypeVar('T_TIobj', bound='TraversableIterableObj') # for TraversableIterableObj.traverse() +#{ Aliases +pack = struct.pack +unpack = struct.unpack -TraversedTup = Union[Tuple[Union['Traversable', None], 'Traversable'], # for commit, submodule - 'TraversedTreeTup'] # for tree.traverse() -# -------------------------------------------------------------------- +#} END aliases -__all__ = ('get_object_type_by_name', 'parse_date', 'parse_actor_and_date', - 'ProcessStreamAdapter', 'Traversable', 'altz_to_utctz_str', 'utctz_to_altz', - 'verify_utctz', 'Actor', 'tzoffset', 'utc') +class TemporaryFileSwap(object): -ZERO = timedelta(0) + """Utility class moving a file to a temporary location within the same directory + and moving it back on to where on object deletion.""" + __slots__ = ("file_path", "tmp_file_path") -#{ Functions + def __init__(self, file_path: PathLike) -> None: + self.file_path = file_path + self.tmp_file_path = str(self.file_path) + tempfile.mktemp('', '', '') + # it may be that the source does not exist + try: + os.rename(self.file_path, self.tmp_file_path) + except OSError: + pass + def __del__(self) -> None: + if osp.isfile(self.tmp_file_path): + if is_win and osp.exists(self.file_path): + os.remove(self.file_path) + os.rename(self.tmp_file_path, self.file_path) + # END temp file exists -def mode_str_to_int(modestr: Union[bytes, str]) -> int: - """ - :param modestr: string like 755 or 644 or 100644 - only the last 6 chars will be used - :return: - String identifying a mode compatible to the mode methods ids of the - stat module regarding the rwx permissions for user, group and other, - special flags and file system flags, i.e. whether it is a symlink - for example.""" - mode = 0 - for iteration, char in enumerate(reversed(modestr[-6:])): - char = cast(Union[str, int], char) - mode += int(char) << iteration * 3 - # END for each char - return mode - - -def get_object_type_by_name(object_type_name: bytes - ) -> Union[Type['Commit'], Type['TagObject'], Type['Tree'], Type['Blob']]: - """ - :return: type suitable to handle the given object type name. - Use the type to create new instances. - - :param object_type_name: Member of TYPES - - :raise ValueError: In case object_type_name is unknown""" - if object_type_name == b"commit": - from . import commit - return commit.Commit - elif object_type_name == b"tag": - from . import tag - return tag.TagObject - elif object_type_name == b"blob": - from . import blob - return blob.Blob - elif object_type_name == b"tree": - from . import tree - return tree.Tree - else: - raise ValueError("Cannot handle unknown object type: %s" % object_type_name.decode()) - - -def utctz_to_altz(utctz: str) -> int: - """we convert utctz to the timezone in seconds, it is the format time.altzone - returns. Git stores it as UTC timezone which has the opposite sign as well, - which explains the -1 * ( that was made explicit here ) - :param utctz: git utc timezone string, i.e. +0200""" - return -1 * int(float(utctz) / 100 * 3600) - - -def altz_to_utctz_str(altz: float) -> str: - """As above, but inverses the operation, returning a string that can be used - in commit objects""" - utci = -1 * int((float(altz) / 3600) * 100) - utcs = str(abs(utci)) - utcs = "0" * (4 - len(utcs)) + utcs - prefix = (utci < 0 and '-') or '+' - return prefix + utcs - - -def verify_utctz(offset: str) -> str: - """:raise ValueError: if offset is incorrect - :return: offset""" - fmt_exc = ValueError("Invalid timezone offset format: %s" % offset) - if len(offset) != 5: - raise fmt_exc - if offset[0] not in "+-": - raise fmt_exc - if offset[1] not in digits or\ - offset[2] not in digits or\ - offset[3] not in digits or\ - offset[4] not in digits: - raise fmt_exc - # END for each char - return offset - - -class tzoffset(tzinfo): - - def __init__(self, secs_west_of_utc: float, name: Union[None, str] = None) -> None: - self._offset = timedelta(seconds=-secs_west_of_utc) - self._name = name or 'fixed' - - def __reduce__(self) -> Tuple[Type['tzoffset'], Tuple[float, str]]: - return tzoffset, (-self._offset.total_seconds(), self._name) - - def utcoffset(self, dt) -> timedelta: - return self._offset - - def tzname(self, dt) -> str: - return self._name - - def dst(self, dt) -> timedelta: - return ZERO - - -utc = tzoffset(0, 'UTC') - - -def from_timestamp(timestamp, tz_offset: float) -> datetime: - """Converts a timestamp + tz_offset into an aware datetime instance.""" - utc_dt = datetime.fromtimestamp(timestamp, utc) - try: - local_dt = utc_dt.astimezone(tzoffset(tz_offset)) - return local_dt - except ValueError: - return utc_dt - - -def parse_date(string_date: str) -> Tuple[int, int]: - """ - Parse the given date as one of the following - * aware datetime instance - * Git internal format: timestamp offset - * RFC 2822: Thu, 07 Apr 2005 22:13:13 +0200. - * ISO 8601 2005-04-07T22:13:13 - The T can be a space as well +#{ Decorators - :return: Tuple(int(timestamp_UTC), int(offset)), both in seconds since epoch - :raise ValueError: If the format could not be understood - :note: Date can also be YYYY.MM.DD, MM/DD/YYYY and DD.MM.YYYY. - """ - if isinstance(string_date, datetime) and string_date.tzinfo: - offset = -int(string_date.utcoffset().total_seconds()) - return int(string_date.astimezone(utc).timestamp()), offset - - # git time - try: - if string_date.count(' ') == 1 and string_date.rfind(':') == -1: - timestamp, offset_str = string_date.split() - if timestamp.startswith('@'): - timestamp = timestamp[1:] - timestamp_int = int(timestamp) - return timestamp_int, utctz_to_altz(verify_utctz(offset_str)) - else: - offset_str = "+0000" # local time by default - if string_date[-5] in '-+': - offset_str = verify_utctz(string_date[-5:]) - string_date = string_date[:-6] # skip space as well - # END split timezone info - offset = utctz_to_altz(offset_str) - - # now figure out the date and time portion - split time - date_formats = [] - splitter = -1 - if ',' in string_date: - date_formats.append("%a, %d %b %Y") - splitter = string_date.rfind(' ') - else: - # iso plus additional - date_formats.append("%Y-%m-%d") - date_formats.append("%Y.%m.%d") - date_formats.append("%m/%d/%Y") - date_formats.append("%d.%m.%Y") - - splitter = string_date.rfind('T') - if splitter == -1: - splitter = string_date.rfind(' ') - # END handle 'T' and ' ' - # END handle rfc or iso - - assert splitter > -1 - - # split date and time - time_part = string_date[splitter + 1:] # skip space - date_part = string_date[:splitter] - - # parse time - tstruct = time.strptime(time_part, "%H:%M:%S") - - for fmt in date_formats: - try: - dtstruct = time.strptime(date_part, fmt) - utctime = calendar.timegm((dtstruct.tm_year, dtstruct.tm_mon, dtstruct.tm_mday, - tstruct.tm_hour, tstruct.tm_min, tstruct.tm_sec, - dtstruct.tm_wday, dtstruct.tm_yday, tstruct.tm_isdst)) - return int(utctime), offset - except ValueError: - continue - # END exception handling - # END for each fmt - - # still here ? fail - raise ValueError("no format matched") - # END handle format - except Exception as e: - raise ValueError("Unsupported date format: %s" % string_date) from e - # END handle exceptions - - -# precompiled regex -_re_actor_epoch = re.compile(r'^.+? (.*) (\d+) ([+-]\d+).*$') -_re_only_actor = re.compile(r'^.+? (.*)$') - - -def parse_actor_and_date(line: str) -> Tuple[Actor, int, int]: - """Parse out the actor (author or committer) info from a line like:: - - author Tom Preston-Werner 1191999972 -0700 - - :return: [Actor, int_seconds_since_epoch, int_timezone_offset]""" - actor, epoch, offset = '', '0', '0' - m = _re_actor_epoch.search(line) - if m: - actor, epoch, offset = m.groups() - else: - m = _re_only_actor.search(line) - actor = m.group(1) if m else line or '' - return (Actor._from_string(actor), int(epoch), utctz_to_altz(offset)) - -#} END functions - - -#{ Classes - -class ProcessStreamAdapter(object): - - """Class wireing all calls to the contained Process instance. - - Use this type to hide the underlying process to provide access only to a specified - stream. The process is usually wrapped into an AutoInterrupt class to kill - it if the instance goes out of scope.""" - __slots__ = ("_proc", "_stream") - - def __init__(self, process: 'Popen', stream_name: str) -> None: - self._proc = process - self._stream: StringIO = getattr(process, stream_name) # guessed type - - def __getattr__(self, attr: str) -> Any: - return getattr(self._stream, attr) - - -@runtime_checkable -class Traversable(Protocol): - - """Simple interface to perform depth-first or breadth-first traversals - into one direction. - Subclasses only need to implement one function. - Instances of the Subclass must be hashable +def post_clear_cache(func: Callable[..., _T]) -> Callable[..., _T]: + """Decorator for functions that alter the index using the git command. This would + invalidate our possibly existing entries dictionary which is why it must be + deleted to allow it to be lazily reread later. - Defined subclasses = [Commit, Tree, SubModule] + :note: + This decorator will not be required once all functions are implemented + natively which in fact is possible, but probably not feasible performance wise. """ - __slots__ = () - - @classmethod - @abstractmethod - def _get_intermediate_items(cls, item) -> Sequence['Traversable']: - """ - Returns: - Tuple of items connected to the given item. - Must be implemented in subclass - - class Commit:: (cls, Commit) -> Tuple[Commit, ...] - class Submodule:: (cls, Submodule) -> Iterablelist[Submodule] - class Tree:: (cls, Tree) -> Tuple[Tree, ...] - """ - raise NotImplementedError("To be implemented in subclass") - - @abstractmethod - def list_traverse(self, *args: Any, **kwargs: Any) -> Any: - """ """ - warnings.warn("list_traverse() method should only be called from subclasses." - "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20" - "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit", - DeprecationWarning, - stacklevel=2) - return self._list_traverse(*args, **kwargs) - - def _list_traverse(self, as_edge=False, *args: Any, **kwargs: Any - ) -> IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']]: - """ - :return: IterableList with the results of the traversal as produced by - traverse() - Commit -> IterableList['Commit'] - Submodule -> IterableList['Submodule'] - Tree -> IterableList[Union['Submodule', 'Tree', 'Blob']] - """ - # Commit and Submodule have id.__attribute__ as IterableObj - # Tree has id.__attribute__ inherited from IndexObject - if isinstance(self, (TraversableIterableObj, Has_id_attribute)): - id = self._id_attribute_ - else: - id = "" # shouldn't reach here, unless Traversable subclass created with no _id_attribute_ - # could add _id_attribute_ to Traversable, or make all Traversable also Iterable? - - if not as_edge: - out: IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']] = IterableList(id) - out.extend(self.traverse(as_edge=as_edge, *args, **kwargs)) # type: ignore - return out - # overloads in subclasses (mypy does't allow typing self: subclass) - # Union[IterableList['Commit'], IterableList['Submodule'], IterableList[Union['Submodule', 'Tree', 'Blob']]] - else: - # Raise deprecationwarning, doesn't make sense to use this - out_list: IterableList = IterableList(self.traverse(*args, **kwargs)) - return out_list - - @ abstractmethod - def traverse(self, *args: Any, **kwargs: Any) -> Any: - """ """ - warnings.warn("traverse() method should only be called from subclasses." - "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20" - "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit", - DeprecationWarning, - stacklevel=2) - return self._traverse(*args, **kwargs) - - def _traverse(self, - predicate: Callable[[Union['Traversable', 'Blob', TraversedTup], int], bool] = lambda i, d: True, - prune: Callable[[Union['Traversable', 'Blob', TraversedTup], int], bool] = lambda i, d: False, - depth: int = -1, branch_first: bool = True, visit_once: bool = True, - ignore_self: int = 1, as_edge: bool = False - ) -> Union[Iterator[Union['Traversable', 'Blob']], - Iterator[TraversedTup]]: - """:return: iterator yielding of items found when traversing self - :param predicate: f(i,d) returns False if item i at depth d should not be included in the result - - :param prune: - f(i,d) return True if the search should stop at item i at depth d. - Item i will not be returned. - - :param depth: - define at which level the iteration should not go deeper - if -1, there is no limit - if 0, you would effectively only get self, the root of the iteration - i.e. if 1, you would only get the first level of predecessors/successors - - :param branch_first: - if True, items will be returned branch first, otherwise depth first - - :param visit_once: - if True, items will only be returned once, although they might be encountered - several times. Loops are prevented that way. - - :param ignore_self: - if True, self will be ignored and automatically pruned from - the result. Otherwise it will be the first item to be returned. - If as_edge is True, the source of the first edge is None - - :param as_edge: - if True, return a pair of items, first being the source, second the - destination, i.e. tuple(src, dest) with the edge spanning from - source to destination""" - - """ - Commit -> Iterator[Union[Commit, Tuple[Commit, Commit]] - Submodule -> Iterator[Submodule, Tuple[Submodule, Submodule]] - Tree -> Iterator[Union[Blob, Tree, Submodule, - Tuple[Union[Submodule, Tree], Union[Blob, Tree, Submodule]]] - - ignore_self=True is_edge=True -> Iterator[item] - ignore_self=True is_edge=False --> Iterator[item] - ignore_self=False is_edge=True -> Iterator[item] | Iterator[Tuple[src, item]] - ignore_self=False is_edge=False -> Iterator[Tuple[src, item]]""" - - visited = set() - stack: Deque[TraverseNT] = deque() - stack.append(TraverseNT(0, self, None)) # self is always depth level 0 - - def addToStack(stack: Deque[TraverseNT], - src_item: 'Traversable', - branch_first: bool, - depth: int) -> None: - lst = self._get_intermediate_items(item) - if not lst: # empty list - return None - if branch_first: - stack.extendleft(TraverseNT(depth, i, src_item) for i in lst) - else: - reviter = (TraverseNT(depth, lst[i], src_item) for i in range(len(lst) - 1, -1, -1)) - stack.extend(reviter) - # END addToStack local method - - while stack: - d, item, src = stack.pop() # depth of item, item, item_source - - if visit_once and item in visited: - continue - - if visit_once: - visited.add(item) - - rval: Union[TraversedTup, 'Traversable', 'Blob'] - if as_edge: # if as_edge return (src, item) unless rrc is None (e.g. for first item) - rval = (src, item) - else: - rval = item - - if prune(rval, d): - continue - - skipStartItem = ignore_self and (item is self) - if not skipStartItem and predicate(rval, d): - yield rval - - # only continue to next level if this is appropriate ! - nd = d + 1 - if depth > -1 and nd > depth: - continue - - addToStack(stack, item, branch_first, nd) - # END for each item on work stack - - -@ runtime_checkable -class Serializable(Protocol): - - """Defines methods to serialize and deserialize objects from and into a data stream""" - __slots__ = () - - # @abstractmethod - def _serialize(self, stream: 'BytesIO') -> 'Serializable': - """Serialize the data of this object into the given data stream - :note: a serialized object would ``_deserialize`` into the same object - :param stream: a file-like object - :return: self""" - raise NotImplementedError("To be implemented in subclass") - - # @abstractmethod - def _deserialize(self, stream: 'BytesIO') -> 'Serializable': - """Deserialize all information regarding this object from the stream - :param stream: a file-like object - :return: self""" - raise NotImplementedError("To be implemented in subclass") - - -class TraversableIterableObj(IterableObj, Traversable): - __slots__ = () - - TIobj_tuple = Tuple[Union[T_TIobj, None], T_TIobj] - - def list_traverse(self: T_TIobj, *args: Any, **kwargs: Any) -> IterableList[T_TIobj]: - return super(TraversableIterableObj, self)._list_traverse(* args, **kwargs) - - @ overload # type: ignore - def traverse(self: T_TIobj, - predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], - prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], - depth: int, branch_first: bool, visit_once: bool, - ignore_self: Literal[True], - as_edge: Literal[False], - ) -> Iterator[T_TIobj]: - ... - - @ overload - def traverse(self: T_TIobj, - predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], - prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], - depth: int, branch_first: bool, visit_once: bool, - ignore_self: Literal[False], - as_edge: Literal[True], - ) -> Iterator[Tuple[Union[T_TIobj, None], T_TIobj]]: - ... - - @ overload - def traverse(self: T_TIobj, - predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], - prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], - depth: int, branch_first: bool, visit_once: bool, - ignore_self: Literal[True], - as_edge: Literal[True], - ) -> Iterator[Tuple[T_TIobj, T_TIobj]]: - ... - - def traverse(self: T_TIobj, - predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], - bool] = lambda i, d: True, - prune: Callable[[Union[T_TIobj, TIobj_tuple], int], - bool] = lambda i, d: False, - depth: int = -1, branch_first: bool = True, visit_once: bool = True, - ignore_self: int = 1, as_edge: bool = False - ) -> Union[Iterator[T_TIobj], - Iterator[Tuple[T_TIobj, T_TIobj]], - Iterator[TIobj_tuple]]: - """For documentation, see util.Traversable._traverse()""" - - """ - # To typecheck instead of using cast. - import itertools - from git.types import TypeGuard - def is_commit_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Tuple['Commit', 'Commit']]]]: - for x in inp[1]: - if not isinstance(x, tuple) and len(x) != 2: - if all(isinstance(inner, Commit) for inner in x): - continue - return True - - ret = super(Commit, self).traverse(predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge) - ret_tup = itertools.tee(ret, 2) - assert is_commit_traversed(ret_tup), f"{[type(x) for x in list(ret_tup[0])]}" - return ret_tup[0] - """ - return cast(Union[Iterator[T_TIobj], - Iterator[Tuple[Union[None, T_TIobj], T_TIobj]]], - super(TraversableIterableObj, self)._traverse( - predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge # type: ignore - )) + + @wraps(func) + def post_clear_cache_if_not_raised(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T: + rval = func(self, *args, **kwargs) + self._delete_entries_cache() + return rval + # END wrapper method + + return post_clear_cache_if_not_raised + + +def default_index(func: Callable[..., _T]) -> Callable[..., _T]: + """Decorator assuring the wrapped method may only run if we are the default + repository index. This is as we rely on git commands that operate + on that index only. """ + + @wraps(func) + def check_default_index(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T: + if self._file_path != self._index_path(): + raise AssertionError( + "Cannot call %r on indices that do not represent the default git index" % func.__name__) + return func(self, *args, **kwargs) + # END wrapper method + + return check_default_index + + +def git_working_dir(func: Callable[..., _T]) -> Callable[..., _T]: + """Decorator which changes the current working dir to the one of the git + repository in order to assure relative paths are handled correctly""" + + @wraps(func) + def set_git_working_dir(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T: + cur_wd = os.getcwd() + os.chdir(str(self.repo.working_tree_dir)) + try: + return func(self, *args, **kwargs) + finally: + os.chdir(cur_wd) + # END handle working dir + # END wrapper + + return set_git_working_dir + +#} END decorators -- cgit v1.2.1 From 9b9bfc2af1be03f5c006c2c79ec2d21e4f66f468 Mon Sep 17 00:00:00 2001 From: Dominic Date: Sat, 31 Jul 2021 11:45:50 +0100 Subject: Update util.py --- git/objects/util.py | 639 ++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 545 insertions(+), 94 deletions(-) (limited to 'git') diff --git a/git/objects/util.py b/git/objects/util.py index 4f8af553..db7807c2 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -1,111 +1,562 @@ -"""Module containing index utilities""" -from functools import wraps -import os -import struct -import tempfile - -from git.compat import is_win - -import os.path as osp - - -# typing ---------------------------------------------------------------------- - -from typing import (Any, Callable, TYPE_CHECKING) - -from git.types import PathLike, _T +# util.py +# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors +# +# This module is part of GitPython and is released under +# the BSD License: http://www.opensource.org/licenses/bsd-license.php +"""Module for general utility functions""" + +from abc import abstractmethod +import warnings +from git.util import ( + IterableList, + IterableObj, + Actor +) + +import re +from collections import deque + +from string import digits +import time +import calendar +from datetime import datetime, timedelta, tzinfo + +# typing ------------------------------------------------------------ +from typing import (Any, Callable, Deque, Iterator, NamedTuple, overload, Sequence, + TYPE_CHECKING, Tuple, Type, TypeVar, Union, cast) + +from git.types import Has_id_attribute, Literal, Protocol, runtime_checkable if TYPE_CHECKING: - from git.index import IndexFile + from io import BytesIO, StringIO + from .commit import Commit + from .blob import Blob + from .tag import TagObject + from .tree import Tree, TraversedTreeTup + from subprocess import Popen + from .submodule.base import Submodule -# --------------------------------------------------------------------------------- +class TraverseNT(NamedTuple): + depth: int + item: Union['Traversable', 'Blob'] + src: Union['Traversable', None] -__all__ = ('TemporaryFileSwap', 'post_clear_cache', 'default_index', 'git_working_dir') -#{ Aliases -pack = struct.pack -unpack = struct.unpack +T_TIobj = TypeVar('T_TIobj', bound='TraversableIterableObj') # for TraversableIterableObj.traverse() +TraversedTup = Union[Tuple[Union['Traversable', None], 'Traversable'], # for commit, submodule + 'TraversedTreeTup'] # for tree.traverse() -#} END aliases +# -------------------------------------------------------------------- -class TemporaryFileSwap(object): +__all__ = ('get_object_type_by_name', 'parse_date', 'parse_actor_and_date', + 'ProcessStreamAdapter', 'Traversable', 'altz_to_utctz_str', 'utctz_to_altz', + 'verify_utctz', 'Actor', 'tzoffset', 'utc') - """Utility class moving a file to a temporary location within the same directory - and moving it back on to where on object deletion.""" - __slots__ = ("file_path", "tmp_file_path") +ZERO = timedelta(0) - def __init__(self, file_path: PathLike) -> None: - self.file_path = file_path - self.tmp_file_path = str(self.file_path) + tempfile.mktemp('', '', '') - # it may be that the source does not exist - try: - os.rename(self.file_path, self.tmp_file_path) - except OSError: - pass +#{ Functions - def __del__(self) -> None: - if osp.isfile(self.tmp_file_path): - if is_win and osp.exists(self.file_path): - os.remove(self.file_path) - os.rename(self.tmp_file_path, self.file_path) - # END temp file exists - -#{ Decorators - -def post_clear_cache(func: Callable[..., _T]) -> Callable[..., _T]: - """Decorator for functions that alter the index using the git command. This would - invalidate our possibly existing entries dictionary which is why it must be - deleted to allow it to be lazily reread later. - - :note: - This decorator will not be required once all functions are implemented - natively which in fact is possible, but probably not feasible performance wise. +def mode_str_to_int(modestr: Union[bytes, str]) -> int: """ + :param modestr: string like 755 or 644 or 100644 - only the last 6 chars will be used + :return: + String identifying a mode compatible to the mode methods ids of the + stat module regarding the rwx permissions for user, group and other, + special flags and file system flags, i.e. whether it is a symlink + for example.""" + mode = 0 + for iteration, char in enumerate(reversed(modestr[-6:])): + char = cast(Union[str, int], char) + mode += int(char) << iteration * 3 + # END for each char + return mode + + +def get_object_type_by_name(object_type_name: bytes + ) -> Union[Type['Commit'], Type['TagObject'], Type['Tree'], Type['Blob']]: + """ + :return: type suitable to handle the given object type name. + Use the type to create new instances. + + :param object_type_name: Member of TYPES + + :raise ValueError: In case object_type_name is unknown""" + if object_type_name == b"commit": + from . import commit + return commit.Commit + elif object_type_name == b"tag": + from . import tag + return tag.TagObject + elif object_type_name == b"blob": + from . import blob + return blob.Blob + elif object_type_name == b"tree": + from . import tree + return tree.Tree + else: + raise ValueError("Cannot handle unknown object type: %s" % object_type_name.decode()) + + +def utctz_to_altz(utctz: str) -> int: + """we convert utctz to the timezone in seconds, it is the format time.altzone + returns. Git stores it as UTC timezone which has the opposite sign as well, + which explains the -1 * ( that was made explicit here ) + :param utctz: git utc timezone string, i.e. +0200""" + return -1 * int(float(utctz) / 100 * 3600) + + +def altz_to_utctz_str(altz: float) -> str: + """As above, but inverses the operation, returning a string that can be used + in commit objects""" + utci = -1 * int((float(altz) / 3600) * 100) + utcs = str(abs(utci)) + utcs = "0" * (4 - len(utcs)) + utcs + prefix = (utci < 0 and '-') or '+' + return prefix + utcs + + +def verify_utctz(offset: str) -> str: + """:raise ValueError: if offset is incorrect + :return: offset""" + fmt_exc = ValueError("Invalid timezone offset format: %s" % offset) + if len(offset) != 5: + raise fmt_exc + if offset[0] not in "+-": + raise fmt_exc + if offset[1] not in digits or\ + offset[2] not in digits or\ + offset[3] not in digits or\ + offset[4] not in digits: + raise fmt_exc + # END for each char + return offset + + +class tzoffset(tzinfo): + + def __init__(self, secs_west_of_utc: float, name: Union[None, str] = None) -> None: + self._offset = timedelta(seconds=-secs_west_of_utc) + self._name = name or 'fixed' + + def __reduce__(self) -> Tuple[Type['tzoffset'], Tuple[float, str]]: + return tzoffset, (-self._offset.total_seconds(), self._name) + + def utcoffset(self, dt) -> timedelta: + return self._offset + + def tzname(self, dt) -> str: + return self._name + + def dst(self, dt) -> timedelta: + return ZERO + + +utc = tzoffset(0, 'UTC') + + +def from_timestamp(timestamp, tz_offset: float) -> datetime: + """Converts a timestamp + tz_offset into an aware datetime instance.""" + utc_dt = datetime.fromtimestamp(timestamp, utc) + try: + local_dt = utc_dt.astimezone(tzoffset(tz_offset)) + return local_dt + except ValueError: + return utc_dt + + +def parse_date(string_date: str) -> Tuple[int, int]: + """ + Parse the given date as one of the following - @wraps(func) - def post_clear_cache_if_not_raised(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T: - rval = func(self, *args, **kwargs) - self._delete_entries_cache() - return rval - # END wrapper method - - return post_clear_cache_if_not_raised - - -def default_index(func: Callable[..., _T]) -> Callable[..., _T]: - """Decorator assuring the wrapped method may only run if we are the default - repository index. This is as we rely on git commands that operate - on that index only. """ - - @wraps(func) - def check_default_index(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T: - if self._file_path != self._index_path(): - raise AssertionError( - "Cannot call %r on indices that do not represent the default git index" % func.__name__) - return func(self, *args, **kwargs) - # END wrapper method - - return check_default_index - - -def git_working_dir(func: Callable[..., _T]) -> Callable[..., _T]: - """Decorator which changes the current working dir to the one of the git - repository in order to assure relative paths are handled correctly""" - - @wraps(func) - def set_git_working_dir(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T: - cur_wd = os.getcwd() - os.chdir(str(self.repo.working_tree_dir)) - try: - return func(self, *args, **kwargs) - finally: - os.chdir(cur_wd) - # END handle working dir - # END wrapper + * aware datetime instance + * Git internal format: timestamp offset + * RFC 2822: Thu, 07 Apr 2005 22:13:13 +0200. + * ISO 8601 2005-04-07T22:13:13 + The T can be a space as well - return set_git_working_dir + :return: Tuple(int(timestamp_UTC), int(offset)), both in seconds since epoch + :raise ValueError: If the format could not be understood + :note: Date can also be YYYY.MM.DD, MM/DD/YYYY and DD.MM.YYYY. + """ + if isinstance(string_date, datetime) and string_date.tzinfo: + offset = -int(string_date.utcoffset().total_seconds()) + return int(string_date.astimezone(utc).timestamp()), offset + + # git time + try: + if string_date.count(' ') == 1 and string_date.rfind(':') == -1: + timestamp, offset_str = string_date.split() + if timestamp.startswith('@'): + timestamp = timestamp[1:] + timestamp_int = int(timestamp) + return timestamp_int, utctz_to_altz(verify_utctz(offset_str)) + else: + offset_str = "+0000" # local time by default + if string_date[-5] in '-+': + offset_str = verify_utctz(string_date[-5:]) + string_date = string_date[:-6] # skip space as well + # END split timezone info + offset = utctz_to_altz(offset_str) + + # now figure out the date and time portion - split time + date_formats = [] + splitter = -1 + if ',' in string_date: + date_formats.append("%a, %d %b %Y") + splitter = string_date.rfind(' ') + else: + # iso plus additional + date_formats.append("%Y-%m-%d") + date_formats.append("%Y.%m.%d") + date_formats.append("%m/%d/%Y") + date_formats.append("%d.%m.%Y") + + splitter = string_date.rfind('T') + if splitter == -1: + splitter = string_date.rfind(' ') + # END handle 'T' and ' ' + # END handle rfc or iso + + assert splitter > -1 + + # split date and time + time_part = string_date[splitter + 1:] # skip space + date_part = string_date[:splitter] + + # parse time + tstruct = time.strptime(time_part, "%H:%M:%S") + + for fmt in date_formats: + try: + dtstruct = time.strptime(date_part, fmt) + utctime = calendar.timegm((dtstruct.tm_year, dtstruct.tm_mon, dtstruct.tm_mday, + tstruct.tm_hour, tstruct.tm_min, tstruct.tm_sec, + dtstruct.tm_wday, dtstruct.tm_yday, tstruct.tm_isdst)) + return int(utctime), offset + except ValueError: + continue + # END exception handling + # END for each fmt + + # still here ? fail + raise ValueError("no format matched") + # END handle format + except Exception as e: + raise ValueError("Unsupported date format: %s" % string_date) from e + # END handle exceptions + + +# precompiled regex +_re_actor_epoch = re.compile(r'^.+? (.*) (\d+) ([+-]\d+).*$') +_re_only_actor = re.compile(r'^.+? (.*)$') + + +def parse_actor_and_date(line: str) -> Tuple[Actor, int, int]: + """Parse out the actor (author or committer) info from a line like:: + + author Tom Preston-Werner 1191999972 -0700 + + :return: [Actor, int_seconds_since_epoch, int_timezone_offset]""" + actor, epoch, offset = '', '0', '0' + m = _re_actor_epoch.search(line) + if m: + actor, epoch, offset = m.groups() + else: + m = _re_only_actor.search(line) + actor = m.group(1) if m else line or '' + return (Actor._from_string(actor), int(epoch), utctz_to_altz(offset)) + +#} END functions + + +#{ Classes + +class ProcessStreamAdapter(object): + + """Class wireing all calls to the contained Process instance. + + Use this type to hide the underlying process to provide access only to a specified + stream. The process is usually wrapped into an AutoInterrupt class to kill + it if the instance goes out of scope.""" + __slots__ = ("_proc", "_stream") + + def __init__(self, process: 'Popen', stream_name: str) -> None: + self._proc = process + self._stream: StringIO = getattr(process, stream_name) # guessed type + + def __getattr__(self, attr: str) -> Any: + return getattr(self._stream, attr) + + +@runtime_checkable +class Traversable(Protocol): + + """Simple interface to perform depth-first or breadth-first traversals + into one direction. + Subclasses only need to implement one function. + Instances of the Subclass must be hashable -#} END decorators + Defined subclasses = [Commit, Tree, SubModule] + """ + __slots__ = () + + @classmethod + @abstractmethod + def _get_intermediate_items(cls, item) -> Sequence['Traversable']: + """ + Returns: + Tuple of items connected to the given item. + Must be implemented in subclass + + class Commit:: (cls, Commit) -> Tuple[Commit, ...] + class Submodule:: (cls, Submodule) -> Iterablelist[Submodule] + class Tree:: (cls, Tree) -> Tuple[Tree, ...] + """ + raise NotImplementedError("To be implemented in subclass") + + @abstractmethod + def list_traverse(self, *args: Any, **kwargs: Any) -> Any: + """ """ + warnings.warn("list_traverse() method should only be called from subclasses." + "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20" + "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit", + DeprecationWarning, + stacklevel=2) + return self._list_traverse(*args, **kwargs) + + def _list_traverse(self, as_edge=False, *args: Any, **kwargs: Any + ) -> IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']]: + """ + :return: IterableList with the results of the traversal as produced by + traverse() + Commit -> IterableList['Commit'] + Submodule -> IterableList['Submodule'] + Tree -> IterableList[Union['Submodule', 'Tree', 'Blob']] + """ + # Commit and Submodule have id.__attribute__ as IterableObj + # Tree has id.__attribute__ inherited from IndexObject + if isinstance(self, (TraversableIterableObj, Has_id_attribute)): + id = self._id_attribute_ + else: + id = "" # shouldn't reach here, unless Traversable subclass created with no _id_attribute_ + # could add _id_attribute_ to Traversable, or make all Traversable also Iterable? + + if not as_edge: + out: IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']] = IterableList(id) + out.extend(self.traverse(as_edge=as_edge, *args, **kwargs)) # type: ignore + return out + # overloads in subclasses (mypy does't allow typing self: subclass) + # Union[IterableList['Commit'], IterableList['Submodule'], IterableList[Union['Submodule', 'Tree', 'Blob']]] + else: + # Raise deprecationwarning, doesn't make sense to use this + out_list: IterableList = IterableList(self.traverse(*args, **kwargs)) + return out_list + + @ abstractmethod + def traverse(self, *args: Any, **kwargs: Any) -> Any: + """ """ + warnings.warn("traverse() method should only be called from subclasses." + "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20" + "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit", + DeprecationWarning, + stacklevel=2) + return self._traverse(*args, **kwargs) + + def _traverse(self, + predicate: Callable[[Union['Traversable', 'Blob', TraversedTup], int], bool] = lambda i, d: True, + prune: Callable[[Union['Traversable', 'Blob', TraversedTup], int], bool] = lambda i, d: False, + depth: int = -1, branch_first: bool = True, visit_once: bool = True, + ignore_self: int = 1, as_edge: bool = False + ) -> Union[Iterator[Union['Traversable', 'Blob']], + Iterator[TraversedTup]]: + """:return: iterator yielding of items found when traversing self + :param predicate: f(i,d) returns False if item i at depth d should not be included in the result + + :param prune: + f(i,d) return True if the search should stop at item i at depth d. + Item i will not be returned. + + :param depth: + define at which level the iteration should not go deeper + if -1, there is no limit + if 0, you would effectively only get self, the root of the iteration + i.e. if 1, you would only get the first level of predecessors/successors + + :param branch_first: + if True, items will be returned branch first, otherwise depth first + + :param visit_once: + if True, items will only be returned once, although they might be encountered + several times. Loops are prevented that way. + + :param ignore_self: + if True, self will be ignored and automatically pruned from + the result. Otherwise it will be the first item to be returned. + If as_edge is True, the source of the first edge is None + + :param as_edge: + if True, return a pair of items, first being the source, second the + destination, i.e. tuple(src, dest) with the edge spanning from + source to destination""" + + """ + Commit -> Iterator[Union[Commit, Tuple[Commit, Commit]] + Submodule -> Iterator[Submodule, Tuple[Submodule, Submodule]] + Tree -> Iterator[Union[Blob, Tree, Submodule, + Tuple[Union[Submodule, Tree], Union[Blob, Tree, Submodule]]] + + ignore_self=True is_edge=True -> Iterator[item] + ignore_self=True is_edge=False --> Iterator[item] + ignore_self=False is_edge=True -> Iterator[item] | Iterator[Tuple[src, item]] + ignore_self=False is_edge=False -> Iterator[Tuple[src, item]]""" + + visited = set() + stack: Deque[TraverseNT] = deque() + stack.append(TraverseNT(0, self, None)) # self is always depth level 0 + + def addToStack(stack: Deque[TraverseNT], + src_item: 'Traversable', + branch_first: bool, + depth: int) -> None: + lst = self._get_intermediate_items(item) + if not lst: # empty list + return None + if branch_first: + stack.extendleft(TraverseNT(depth, i, src_item) for i in lst) + else: + reviter = (TraverseNT(depth, lst[i], src_item) for i in range(len(lst) - 1, -1, -1)) + stack.extend(reviter) + # END addToStack local method + + while stack: + d, item, src = stack.pop() # depth of item, item, item_source + + if visit_once and item in visited: + continue + + if visit_once: + visited.add(item) + + rval: Union[TraversedTup, 'Traversable', 'Blob'] + if as_edge: # if as_edge return (src, item) unless rrc is None (e.g. for first item) + rval = (src, item) + else: + rval = item + + if prune(rval, d): + continue + + skipStartItem = ignore_self and (item is self) + if not skipStartItem and predicate(rval, d): + yield rval + + # only continue to next level if this is appropriate ! + nd = d + 1 + if depth > -1 and nd > depth: + continue + + addToStack(stack, item, branch_first, nd) + # END for each item on work stack + + +@ runtime_checkable +class Serializable(Protocol): + + """Defines methods to serialize and deserialize objects from and into a data stream""" + __slots__ = () + + # @abstractmethod + def _serialize(self, stream: 'BytesIO') -> 'Serializable': + """Serialize the data of this object into the given data stream + :note: a serialized object would ``_deserialize`` into the same object + :param stream: a file-like object + :return: self""" + raise NotImplementedError("To be implemented in subclass") + + # @abstractmethod + def _deserialize(self, stream: 'BytesIO') -> 'Serializable': + """Deserialize all information regarding this object from the stream + :param stream: a file-like object + :return: self""" + raise NotImplementedError("To be implemented in subclass") + + +class TraversableIterableObj(IterableObj, Traversable): + __slots__ = () + + TIobj_tuple = Tuple[Union[T_TIobj, None], T_TIobj] + + def list_traverse(self: T_TIobj, *args: Any, **kwargs: Any) -> IterableList[T_TIobj]: + return super(TraversableIterableObj, self)._list_traverse(* args, **kwargs) + + @ overload # type: ignore + def traverse(self: T_TIobj + ) -> Iterator[T_TIobj]: + ... + + @ overload + def traverse(self: T_TIobj, + predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], + prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], + depth: int, branch_first: bool, visit_once: bool, + ignore_self: Literal[True], + as_edge: Literal[False], + ) -> Iterator[T_TIobj]: + ... + + @ overload + def traverse(self: T_TIobj, + predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], + prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], + depth: int, branch_first: bool, visit_once: bool, + ignore_self: Literal[False], + as_edge: Literal[True], + ) -> Iterator[Tuple[Union[T_TIobj, None], T_TIobj]]: + ... + + @ overload + def traverse(self: T_TIobj, + predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], + prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], + depth: int, branch_first: bool, visit_once: bool, + ignore_self: Literal[True], + as_edge: Literal[True], + ) -> Iterator[Tuple[T_TIobj, T_TIobj]]: + ... + + def traverse(self: T_TIobj, + predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], + bool] = lambda i, d: True, + prune: Callable[[Union[T_TIobj, TIobj_tuple], int], + bool] = lambda i, d: False, + depth: int = -1, branch_first: bool = True, visit_once: bool = True, + ignore_self: int = 1, as_edge: bool = False + ) -> Union[Iterator[T_TIobj], + Iterator[Tuple[T_TIobj, T_TIobj]], + Iterator[TIobj_tuple]]: + """For documentation, see util.Traversable._traverse()""" + + """ + # To typecheck instead of using cast. + import itertools + from git.types import TypeGuard + def is_commit_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Tuple['Commit', 'Commit']]]]: + for x in inp[1]: + if not isinstance(x, tuple) and len(x) != 2: + if all(isinstance(inner, Commit) for inner in x): + continue + return True + + ret = super(Commit, self).traverse(predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge) + ret_tup = itertools.tee(ret, 2) + assert is_commit_traversed(ret_tup), f"{[type(x) for x in list(ret_tup[0])]}" + return ret_tup[0] + """ + return cast(Union[Iterator[T_TIobj], + Iterator[Tuple[Union[None, T_TIobj], T_TIobj]]], + super(TraversableIterableObj, self)._traverse( + predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge # type: ignore + )) -- cgit v1.2.1 From b76b99184e8f0e16ba66a730846f3d61c72061fe Mon Sep 17 00:00:00 2001 From: Dominic Date: Sat, 31 Jul 2021 13:02:02 +0100 Subject: Update base.py --- git/repo/base.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'git') diff --git a/git/repo/base.py b/git/repo/base.py index bb8ddf13..355f9399 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -422,14 +422,14 @@ class Repo(object): def create_head(self, path: PathLike, commit: str = 'HEAD', force: bool = False, logmsg: Optional[str] = None - ) -> Head: + ) -> 'Head': """Create a new head within the repository. For more documentation, please see the Head.create method. :return: newly created Head Reference""" return Head.create(self, path, commit, logmsg, force) - def delete_head(self, *heads: 'SymbolicReference', **kwargs: Any) -> None: + def delete_head(self, *heads: 'Head', **kwargs: Any) -> None: """Delete the given heads :param kwargs: Additional keyword arguments to be passed to git-branch""" @@ -788,10 +788,10 @@ class Repo(object): return proc.replace("\\\\", "\\").replace('"', "").split("\n") @property - def active_branch(self) -> 'SymbolicReference': + def active_branch(self) -> Head: """The name of the currently active branch. - :return: Head to the active branch""" + # reveal_type(self.head.reference) # => Reference return self.head.reference def blame_incremental(self, rev: TBD, file: TBD, **kwargs: Any) -> Optional[Iterator['BlameEntry']]: -- cgit v1.2.1 From 464848e3c5961a2840533c6de58cb3a5d253711b Mon Sep 17 00:00:00 2001 From: Dominic Date: Sat, 31 Jul 2021 13:08:02 +0100 Subject: Update config.py --- git/config.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) (limited to 'git') diff --git a/git/config.py b/git/config.py index c4b26ba6..ad02b437 100644 --- a/git/config.py +++ b/git/config.py @@ -41,12 +41,13 @@ if TYPE_CHECKING: T_ConfigParser = TypeVar('T_ConfigParser', bound='GitConfigParser') -if sys.version_info[:2] < (3, 7): - from collections import OrderedDict - OrderedDict_OMD = OrderedDict +if sys.version_info[:3] < (3, 7, 2): + # typing.Ordereddict not added until py 3.7.2 + from collections import OrderedDict # type: ignore # until 3.6 dropped + OrderedDict_OMD = OrderedDict # type: ignore # until 3.6 dropped else: - from typing import OrderedDict - OrderedDict_OMD = OrderedDict[str, List[_T]] + from typing import OrderedDict # type: ignore # until 3.6 dropped + OrderedDict_OMD = OrderedDict[str, List[_T]] # type: ignore[assignment, misc] # ------------------------------------------------------------- -- cgit v1.2.1 From b833eebece8d0c6cb1c79bc06e8ff9f26b994bb6 Mon Sep 17 00:00:00 2001 From: Dominic Date: Sat, 31 Jul 2021 13:10:21 +0100 Subject: Update tag.py --- git/refs/tag.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) (limited to 'git') diff --git a/git/refs/tag.py b/git/refs/tag.py index 281ce09a..edfab33d 100644 --- a/git/refs/tag.py +++ b/git/refs/tag.py @@ -4,13 +4,14 @@ __all__ = ["TagReference", "Tag"] # typing ------------------------------------------------------------------ -from typing import Any, Union, TYPE_CHECKING +from typing import Any, Type, Union, TYPE_CHECKING from git.types import Commit_ish, PathLike if TYPE_CHECKING: from git.repo import Repo from git.objects import Commit from git.objects import TagObject + from git.refs import SymbolicReference # ------------------------------------------------------------------------------ @@ -68,7 +69,8 @@ class TagReference(Reference): return Reference._get_object(self) @classmethod - def create(cls, repo: 'Repo', path: PathLike, reference: Union[Commit_ish, str] = 'HEAD', + def create(cls: Type['TagReference'], repo: 'Repo', path: PathLike, + reference: Union[str, 'SymbolicReference'] = 'HEAD', logmsg: Union[str, None] = None, force: bool = False, **kwargs: Any) -> 'TagReference': """Create a new tag reference. @@ -78,7 +80,7 @@ class TagReference(Reference): The prefix refs/tags is implied :param ref: - A reference to the object you want to tag. It can be a commit, tree or + A reference to the Object you want to tag. The Object can be a commit, tree or blob. :param logmsg: @@ -98,7 +100,9 @@ class TagReference(Reference): Additional keyword arguments to be passed to git-tag :return: A new TagReference""" - args = (path, reference) + if 'ref' in kwargs and kwargs['ref']: + reference = kwargs['ref'] + if logmsg: kwargs['m'] = logmsg elif 'message' in kwargs and kwargs['message']: @@ -107,11 +111,13 @@ class TagReference(Reference): if force: kwargs['f'] = True + args = (path, reference) + repo.git.tag(*args, **kwargs) return TagReference(repo, "%s/%s" % (cls._common_path_default, path)) @classmethod - def delete(cls, repo: 'Repo', *tags: 'TagReference') -> None: + def delete(cls, repo: 'Repo', *tags: 'TagReference') -> None: # type: ignore[override] """Delete the given existing tag or tags""" repo.git.tag("-d", *tags) -- cgit v1.2.1 From e2d5e0e42a7bb664560133d1c3efeb7b4686f7c7 Mon Sep 17 00:00:00 2001 From: Dominic Date: Sat, 31 Jul 2021 13:15:02 +0100 Subject: Update head.py --- git/refs/head.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) (limited to 'git') diff --git a/git/refs/head.py b/git/refs/head.py index 338efce9..260bf5e7 100644 --- a/git/refs/head.py +++ b/git/refs/head.py @@ -1,4 +1,4 @@ -from git.config import SectionConstraint +from git.config import GitConfigParser, SectionConstraint from git.util import join_path from git.exc import GitCommandError @@ -142,7 +142,7 @@ class Head(Reference): flag = "-D" repo.git.branch(flag, *heads) - def set_tracking_branch(self, remote_reference: 'RemoteReference') -> 'Head': + def set_tracking_branch(self, remote_reference: Union['RemoteReference', None]) -> 'Head': """ Configure this branch to track the given remote reference. This will alter this branch's configuration accordingly. @@ -203,7 +203,7 @@ class Head(Reference): self.path = "%s/%s" % (self._common_path_default, new_path) return self - def checkout(self, force: bool = False, **kwargs: Any): + def checkout(self, force: bool = False, **kwargs: Any) -> Union['HEAD', 'Head']: """Checkout this head by setting the HEAD to this reference, by updating the index to reflect the tree we point to and by updating the working tree to reflect the latest index. @@ -235,10 +235,11 @@ class Head(Reference): self.repo.git.checkout(self, **kwargs) if self.repo.head.is_detached: return self.repo.head - return self.repo.active_branch + else: + return self.repo.active_branch #{ Configuration - def _config_parser(self, read_only: bool) -> SectionConstraint: + def _config_parser(self, read_only: bool) -> SectionConstraint[GitConfigParser]: if read_only: parser = self.repo.config_reader() else: @@ -247,13 +248,13 @@ class Head(Reference): return SectionConstraint(parser, 'branch "%s"' % self.name) - def config_reader(self) -> SectionConstraint: + def config_reader(self) -> SectionConstraint[GitConfigParser]: """ :return: A configuration parser instance constrained to only read this instance's values""" return self._config_parser(read_only=True) - def config_writer(self) -> SectionConstraint: + def config_writer(self) -> SectionConstraint[GitConfigParser]: """ :return: A configuration writer instance with read-and write access to options of this head""" -- cgit v1.2.1 From 995547aa9b2ca1f1d7795d91a916f83c5d1a96f9 Mon Sep 17 00:00:00 2001 From: Dominic Date: Sat, 31 Jul 2021 13:18:32 +0100 Subject: Update reference.py --- git/refs/reference.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'git') diff --git a/git/refs/reference.py b/git/refs/reference.py index 64662281..bc2c6e80 100644 --- a/git/refs/reference.py +++ b/git/refs/reference.py @@ -62,7 +62,9 @@ class Reference(SymbolicReference, LazyMixin, IterableObj): #{ Interface - def set_object(self, object: Commit_ish, logmsg: Union[str, None] = None) -> 'Reference': # @ReservedAssignment + # @ReservedAssignment + def set_object(self, object: Union[Commit_ish, 'SymbolicReference'], logmsg: Union[str, None] = None + ) -> 'SymbolicReference': """Special version which checks if the head-log needs an update as well :return: self""" oldbinsha = None -- cgit v1.2.1 From c081f51b6d4ea6020a411e4ec5c2f90a48e10cad Mon Sep 17 00:00:00 2001 From: Dominic Date: Sat, 31 Jul 2021 13:27:55 +0100 Subject: update types commit.py --- git/objects/commit.py | 1 + 1 file changed, 1 insertion(+) (limited to 'git') diff --git a/git/objects/commit.py b/git/objects/commit.py index 884f6522..9d709656 100644 --- a/git/objects/commit.py +++ b/git/objects/commit.py @@ -128,6 +128,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): as what time.altzone returns. The sign is inverted compared to git's UTC timezone.""" super(Commit, self).__init__(repo, binsha) + self.binsha = binsha if tree is not None: assert isinstance(tree, Tree), "Tree needs to be a Tree instance, was %s" % type(tree) if tree is not None: -- cgit v1.2.1 From 0affa33e449db5ba3e00e4c606e6d0c78ce228cf Mon Sep 17 00:00:00 2001 From: Dominic Date: Sat, 31 Jul 2021 13:28:39 +0100 Subject: update types submodule/base.py --- git/objects/submodule/base.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'git') diff --git a/git/objects/submodule/base.py b/git/objects/submodule/base.py index 29212167..14351190 100644 --- a/git/objects/submodule/base.py +++ b/git/objects/submodule/base.py @@ -563,6 +563,7 @@ class Submodule(IndexObject, TraversableIterableObj): progress.update(op, i, len_rmts, prefix + "Done fetching remote of submodule %r" % self.name) # END fetch new data except InvalidGitRepositoryError: + mrepo = None if not init: return self # END early abort if init is not allowed @@ -603,7 +604,7 @@ class Submodule(IndexObject, TraversableIterableObj): # make sure HEAD is not detached mrepo.head.set_reference(local_branch, logmsg="submodule: attaching head to %s" % local_branch) - mrepo.head.ref.set_tracking_branch(remote_branch) + mrepo.head.reference.set_tracking_branch(remote_branch) except (IndexError, InvalidGitRepositoryError): log.warning("Failed to checkout tracking branch %s", self.branch_path) # END handle tracking branch @@ -629,13 +630,14 @@ class Submodule(IndexObject, TraversableIterableObj): if mrepo is not None and to_latest_revision: msg_base = "Cannot update to latest revision in repository at %r as " % mrepo.working_dir if not is_detached: - rref = mrepo.head.ref.tracking_branch() + rref = mrepo.head.reference.tracking_branch() if rref is not None: rcommit = rref.commit binsha = rcommit.binsha hexsha = rcommit.hexsha else: - log.error("%s a tracking branch was not set for local branch '%s'", msg_base, mrepo.head.ref) + log.error("%s a tracking branch was not set for local branch '%s'", + msg_base, mrepo.head.reference) # END handle remote ref else: log.error("%s there was no local tracking branch", msg_base) -- cgit v1.2.1 From a3cd08a402f9517583b263152dddfa0005934015 Mon Sep 17 00:00:00 2001 From: Dominic Date: Sat, 31 Jul 2021 13:29:15 +0100 Subject: update types submodule/root.py --- git/objects/submodule/root.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'git') diff --git a/git/objects/submodule/root.py b/git/objects/submodule/root.py index bcac5419..5e84d161 100644 --- a/git/objects/submodule/root.py +++ b/git/objects/submodule/root.py @@ -2,9 +2,7 @@ from .base import ( Submodule, UpdateProgress ) -from .util import ( - find_first_remote_branch -) +from .util import find_first_remote_branch from git.exc import InvalidGitRepositoryError import git -- cgit v1.2.1 From 03190098bdf0f4c0cbb90e39f464c3dd67b0ee1d Mon Sep 17 00:00:00 2001 From: Dominic Date: Sat, 31 Jul 2021 13:34:22 +0100 Subject: update types sybmbolic.py --- git/refs/symbolic.py | 172 +++++++++++++++++++++++++++++---------------------- 1 file changed, 98 insertions(+), 74 deletions(-) (limited to 'git') diff --git a/git/refs/symbolic.py b/git/refs/symbolic.py index 0e9dad5c..4171fe23 100644 --- a/git/refs/symbolic.py +++ b/git/refs/symbolic.py @@ -1,4 +1,3 @@ -from git.types import PathLike import os from git.compat import defenc @@ -17,17 +16,18 @@ from gitdb.exc import ( BadName ) -import os.path as osp - -from .log import RefLog +from .log import RefLog, RefLogEntry # typing ------------------------------------------------------------------ -from typing import Any, Iterator, List, Match, Optional, Tuple, Type, TypeVar, Union, TYPE_CHECKING # NOQA +from typing import Any, Iterator, List, Match, Optional, Tuple, Type, TypeVar, Union, TYPE_CHECKING, cast # NOQA from git.types import Commit_ish, PathLike, TBD, Literal # NOQA if TYPE_CHECKING: from git.repo import Repo + from git.refs import Reference, Head, TagReference, RemoteReference + from git.config import GitConfigParser + from git.objects.commit import Actor T_References = TypeVar('T_References', bound='SymbolicReference') @@ -37,9 +37,9 @@ T_References = TypeVar('T_References', bound='SymbolicReference') __all__ = ["SymbolicReference"] -def _git_dir(repo, path): +def _git_dir(repo: 'Repo', path: PathLike) -> PathLike: """ Find the git dir that's appropriate for the path""" - name = "%s" % (path,) + name = f"{path}" if name in ['HEAD', 'ORIG_HEAD', 'FETCH_HEAD', 'index', 'logs']: return repo.git_dir return repo.common_dir @@ -59,46 +59,47 @@ class SymbolicReference(object): _remote_common_path_default = "refs/remotes" _id_attribute_ = "name" - def __init__(self, repo: 'Repo', path: PathLike, check_path: bool = False): + def __init__(self, repo: 'Repo', path: PathLike, check_path: bool = False) -> None: self.repo = repo self.path = str(path) + self.ref = self._get_reference() def __str__(self) -> str: return self.path - def __repr__(self): + def __repr__(self) -> str: return '' % (self.__class__.__name__, self.path) - def __eq__(self, other): + def __eq__(self, other) -> bool: if hasattr(other, 'path'): return self.path == other.path return False - def __ne__(self, other): + def __ne__(self, other) -> bool: return not (self == other) def __hash__(self): return hash(self.path) @property - def name(self): + def name(self) -> str: """ :return: In case of symbolic references, the shortest assumable name is the path itself.""" - return self.path + return str(self.path) @property - def abspath(self): + def abspath(self) -> PathLike: return join_path_native(_git_dir(self.repo, self.path), self.path) @classmethod - def _get_packed_refs_path(cls, repo): - return osp.join(repo.common_dir, 'packed-refs') + def _get_packed_refs_path(cls, repo: 'Repo') -> str: + return os.path.join(repo.common_dir, 'packed-refs') @classmethod - def _iter_packed_refs(cls, repo): - """Returns an iterator yielding pairs of sha1/path pairs (as bytes) for the corresponding refs. + def _iter_packed_refs(cls, repo: 'Repo') -> Iterator[Tuple[str, str]]: + """Returns an iterator yielding pairs of sha1/path pairs for the corresponding refs. :note: The packed refs file will be kept open as long as we iterate""" try: with open(cls._get_packed_refs_path(repo), 'rt', encoding='UTF-8') as fp: @@ -126,7 +127,7 @@ class SymbolicReference(object): if line[0] == '^': continue - yield tuple(line.split(' ', 1)) + yield cast(Tuple[str, str], tuple(line.split(' ', 1))) # END for each line except OSError: return None @@ -137,26 +138,26 @@ class SymbolicReference(object): # alright. @classmethod - def dereference_recursive(cls, repo, ref_path): + def dereference_recursive(cls, repo: 'Repo', ref_path: PathLike) -> str: """ :return: hexsha stored in the reference at the given ref_path, recursively dereferencing all intermediate references as required :param repo: the repository containing the reference at ref_path""" while True: - hexsha, ref_path = cls._get_ref_info(repo, ref_path) + hexsha, _ref_path_out = cls._get_ref_info(repo, ref_path) if hexsha is not None: return hexsha # END recursive dereferencing @classmethod - def _get_ref_info_helper(cls, repo, ref_path): + def _get_ref_info_helper(cls, repo: 'Repo', ref_path: PathLike) -> Union[Tuple[str, None], Tuple[None, PathLike]]: """Return: (str(sha), str(target_ref_path)) if available, the sha the file at rela_path points to, or None. target_ref_path is the reference we point to, or None""" - tokens = None + tokens: Union[List[str], Tuple[str, str], None] = None repodir = _git_dir(repo, ref_path) try: - with open(osp.join(repodir, ref_path), 'rt', encoding='UTF-8') as fp: + with open(os.path.join(repodir, ref_path), 'rt', encoding='UTF-8') as fp: value = fp.read().rstrip() # Don't only split on spaces, but on whitespace, which allows to parse lines like # 60b64ef992065e2600bfef6187a97f92398a9144 branch 'master' of git-server:/path/to/repo @@ -188,13 +189,14 @@ class SymbolicReference(object): raise ValueError("Failed to parse reference information from %r" % ref_path) @classmethod - def _get_ref_info(cls, repo, ref_path): + def _get_ref_info(cls, repo: 'Repo', ref_path: PathLike + ) -> Union[Tuple[str, None], Tuple[None, PathLike]]: """Return: (str(sha), str(target_ref_path)) if available, the sha the file at rela_path points to, or None. target_ref_path is the reference we point to, or None""" return cls._get_ref_info_helper(repo, ref_path) - def _get_object(self): + def _get_object(self) -> Commit_ish: """ :return: The object our ref currently refers to. Refs can be cached, they will @@ -203,7 +205,7 @@ class SymbolicReference(object): # Our path will be resolved to the hexsha which will be used accordingly return Object.new_from_sha(self.repo, hex_to_bin(self.dereference_recursive(self.repo, self.path))) - def _get_commit(self): + def _get_commit(self) -> 'Commit': """ :return: Commit object we point to, works for detached and non-detached @@ -218,7 +220,8 @@ class SymbolicReference(object): # END handle type return obj - def set_commit(self, commit: Union[Commit, 'SymbolicReference', str], logmsg=None): + def set_commit(self, commit: Union[Commit, 'SymbolicReference', str], + logmsg: Union[str, None] = None) -> None: """As set_object, but restricts the type of object to be a Commit :raise ValueError: If commit is not a Commit object or doesn't point to @@ -228,11 +231,13 @@ class SymbolicReference(object): invalid_type = False if isinstance(commit, Object): invalid_type = commit.type != Commit.type + commit = cast('Commit', commit) elif isinstance(commit, SymbolicReference): invalid_type = commit.object.type != Commit.type else: try: - invalid_type = self.repo.rev_parse(commit).type != Commit.type + commit = self.repo.rev_parse(commit) + invalid_type = commit.type != Commit.type except (BadObject, BadName) as e: raise ValueError("Invalid object: %s" % commit) from e # END handle exception @@ -245,9 +250,12 @@ class SymbolicReference(object): # we leave strings to the rev-parse method below self.set_object(commit, logmsg) - return self + # return self + return None - def set_object(self, object, logmsg=None): # @ReservedAssignment + def set_object(self, object: Union[Commit_ish, 'SymbolicReference'], + logmsg: Union[str, None] = None + ) -> 'SymbolicReference': # @ReservedAssignment """Set the object we point to, possibly dereference our symbolic reference first. If the reference does not exist, it will be created @@ -274,10 +282,11 @@ class SymbolicReference(object): # set the commit on our reference return self._get_reference().set_object(object, logmsg) - commit = property(_get_commit, set_commit, doc="Query or set commits directly") - object = property(_get_object, set_object, doc="Return the object our ref currently refers to") + commit = cast('Commit', property(_get_commit, set_commit, doc="Query or set commits directly")) + object = property(_get_object, set_object, doc="Return the object our ref currently refers to") # type: ignore - def _get_reference(self): + def _get_reference(self + ) -> Union['Head', 'RemoteReference', 'TagReference', 'Reference']: """:return: Reference Object we point to :raise TypeError: If this symbolic reference is detached, hence it doesn't point to a reference, but to a commit""" @@ -286,7 +295,8 @@ class SymbolicReference(object): raise TypeError("%s is a detached symbolic reference as it points to %r" % (self, sha)) return self.from_path(self.repo, target_ref_path) - def set_reference(self, ref, logmsg=None): + def set_reference(self, ref: Union[str, Commit_ish, 'SymbolicReference'], logmsg: Union[str, None] = None + ) -> 'SymbolicReference': """Set ourselves to the given ref. It will stay a symbol if the ref is a Reference. Otherwise an Object, given as Object instance or refspec, is assumed and if valid, will be set which effectively detaches the refererence if it was a purely @@ -327,7 +337,7 @@ class SymbolicReference(object): raise TypeError("Require commit, got %r" % obj) # END verify type - oldbinsha = None + oldbinsha: bytes = b'' if logmsg is not None: try: oldbinsha = self.commit.binsha @@ -355,11 +365,16 @@ class SymbolicReference(object): return self - # aliased reference - reference = property(_get_reference, set_reference, doc="Returns the Reference we point to") - ref: Union[Commit_ish] = reference # type: ignore # Union[str, Commit_ish, SymbolicReference] + @ property + def reference(self) -> Union['Head', 'RemoteReference', 'TagReference', 'Reference']: + return self._get_reference() - def is_valid(self): + @ reference.setter + def reference(self, ref: Union[str, Commit_ish, 'SymbolicReference'], logmsg: Union[str, None] = None + ) -> 'SymbolicReference': + return self.set_reference(ref=ref, logmsg=logmsg) + + def is_valid(self) -> bool: """ :return: True if the reference is valid, hence it can be read and points to @@ -371,7 +386,7 @@ class SymbolicReference(object): else: return True - @property + @ property def is_detached(self): """ :return: @@ -383,7 +398,7 @@ class SymbolicReference(object): except TypeError: return True - def log(self): + def log(self) -> 'RefLog': """ :return: RefLog for this reference. Its last entry reflects the latest change applied to this reference @@ -392,7 +407,8 @@ class SymbolicReference(object): instead of calling this method repeatedly. It should be considered read-only.""" return RefLog.from_file(RefLog.path(self)) - def log_append(self, oldbinsha, message, newbinsha=None): + def log_append(self, oldbinsha: bytes, message: Union[str, None], + newbinsha: Union[bytes, None] = None) -> 'RefLogEntry': """Append a logentry to the logfile of this ref :param oldbinsha: binary sha this ref used to point to @@ -404,15 +420,19 @@ class SymbolicReference(object): # correct to allow overriding the committer on a per-commit level. # See https://github.com/gitpython-developers/GitPython/pull/146 try: - committer_or_reader = self.commit.committer + committer_or_reader: Union['Actor', 'GitConfigParser'] = self.commit.committer except ValueError: committer_or_reader = self.repo.config_reader() # end handle newly cloned repositories - return RefLog.append_entry(committer_or_reader, RefLog.path(self), oldbinsha, - (newbinsha is None and self.commit.binsha) or newbinsha, - message) + if newbinsha is None: + newbinsha = self.commit.binsha + + if message is None: + message = '' - def log_entry(self, index): + return RefLog.append_entry(committer_or_reader, RefLog.path(self), oldbinsha, newbinsha, message) + + def log_entry(self, index: int) -> RefLogEntry: """:return: RefLogEntry at the given index :param index: python list compatible positive or negative index @@ -421,22 +441,23 @@ class SymbolicReference(object): In that case, it will be faster than the ``log()`` method""" return RefLog.entry_at(RefLog.path(self), index) - @classmethod - def to_full_path(cls, path) -> PathLike: + @ classmethod + def to_full_path(cls, path: Union[PathLike, 'SymbolicReference']) -> str: """ :return: string with a full repository-relative path which can be used to initialize a Reference instance, for instance by using ``Reference.from_path``""" if isinstance(path, SymbolicReference): path = path.path - full_ref_path = path + full_ref_path = str(path) if not cls._common_path_default: return full_ref_path - if not path.startswith(cls._common_path_default + "/"): + + if not str(path).startswith(cls._common_path_default + "/"): full_ref_path = '%s/%s' % (cls._common_path_default, path) return full_ref_path - @classmethod - def delete(cls, repo, path): + @ classmethod + def delete(cls, repo: 'Repo', path: PathLike) -> None: """Delete the reference at the given path :param repo: @@ -447,8 +468,8 @@ class SymbolicReference(object): or just "myreference", hence 'refs/' is implied. Alternatively the symbolic reference to be deleted""" full_ref_path = cls.to_full_path(path) - abs_path = osp.join(repo.common_dir, full_ref_path) - if osp.exists(abs_path): + abs_path = os.path.join(repo.common_dir, full_ref_path) + if os.path.exists(abs_path): os.remove(abs_path) else: # check packed refs @@ -458,8 +479,8 @@ class SymbolicReference(object): new_lines = [] made_change = False dropped_last_line = False - for line in reader: - line = line.decode(defenc) + for line_bytes in reader: + line = line_bytes.decode(defenc) _, _, line_ref = line.partition(' ') line_ref = line_ref.strip() # keep line if it is a comment or if the ref to delete is not @@ -489,12 +510,14 @@ class SymbolicReference(object): # delete the reflog reflog_path = RefLog.path(cls(repo, full_ref_path)) - if osp.isfile(reflog_path): + if os.path.isfile(reflog_path): os.remove(reflog_path) # END remove reflog - @classmethod - def _create(cls, repo, path, resolve, reference, force, logmsg=None): + @ classmethod + def _create(cls: Type[T_References], repo: 'Repo', path: PathLike, resolve: bool, + reference: Union[str, 'SymbolicReference'], + force: bool, logmsg: Union[str, None] = None) -> T_References: """internal method used to create a new symbolic reference. If resolve is False, the reference will be taken as is, creating a proper symbolic reference. Otherwise it will be resolved to the @@ -502,14 +525,14 @@ class SymbolicReference(object): instead""" git_dir = _git_dir(repo, path) full_ref_path = cls.to_full_path(path) - abs_ref_path = osp.join(git_dir, full_ref_path) + abs_ref_path = os.path.join(git_dir, full_ref_path) # figure out target data target = reference if resolve: target = repo.rev_parse(str(reference)) - if not force and osp.isfile(abs_ref_path): + if not force and os.path.isfile(abs_ref_path): target_data = str(target) if isinstance(target, SymbolicReference): target_data = target.path @@ -527,8 +550,9 @@ class SymbolicReference(object): return ref @classmethod - def create(cls, repo: 'Repo', path: PathLike, reference: Union[Commit_ish, str] = 'HEAD', - logmsg: Union[str, None] = None, force: bool = False, **kwargs: Any): + def create(cls: Type[T_References], repo: 'Repo', path: PathLike, + reference: Union[str, 'SymbolicReference'] = 'SymbolicReference', + logmsg: Union[str, None] = None, force: bool = False, **kwargs: Any) -> T_References: """Create a new symbolic reference, hence a reference pointing , to another reference. :param repo: @@ -540,7 +564,7 @@ class SymbolicReference(object): :param reference: The reference to which the new symbolic reference should point to. - If it is a commit'ish, the symbolic ref will be detached. + If it is a ref to a commit'ish, the symbolic ref will be detached. :param force: if True, force creation even if a symbolic reference with that name already exists. @@ -559,7 +583,7 @@ class SymbolicReference(object): :note: This does not alter the current HEAD, index or Working Tree""" return cls._create(repo, path, cls._resolve_ref_on_create, reference, force, logmsg) - def rename(self, new_path, force=False): + def rename(self, new_path: str, force: bool = False) -> 'SymbolicReference': """Rename self to a new path :param new_path: @@ -577,9 +601,9 @@ class SymbolicReference(object): if self.path == new_path: return self - new_abs_path = osp.join(_git_dir(self.repo, new_path), new_path) - cur_abs_path = osp.join(_git_dir(self.repo, self.path), self.path) - if osp.isfile(new_abs_path): + new_abs_path = os.path.join(_git_dir(self.repo, new_path), new_path) + cur_abs_path = os.path.join(_git_dir(self.repo, self.path), self.path) + if os.path.isfile(new_abs_path): if not force: # if they point to the same file, its not an error with open(new_abs_path, 'rb') as fd1: @@ -594,8 +618,8 @@ class SymbolicReference(object): os.remove(new_abs_path) # END handle existing target file - dname = osp.dirname(new_abs_path) - if not osp.isdir(dname): + dname = os.path.dirname(new_abs_path) + if not os.path.isdir(dname): os.makedirs(dname) # END create directory @@ -630,7 +654,7 @@ class SymbolicReference(object): # read packed refs for _sha, rela_path in cls._iter_packed_refs(repo): - if rela_path.startswith(common_path): + if rela_path.startswith(str(common_path)): rela_paths.add(rela_path) # END relative path matches common path # END packed refs reading @@ -665,7 +689,7 @@ class SymbolicReference(object): return (r for r in cls._iter_items(repo, common_path) if r.__class__ == SymbolicReference or not r.is_detached) @classmethod - def from_path(cls, repo, path): + def from_path(cls, repo: 'Repo', path: PathLike) -> Union['Head', 'RemoteReference', 'TagReference', 'Reference']: """ :param path: full .git-directory-relative path name to the Reference to instantiate :note: use to_full_path() if you only have a partial path of a known Reference Type -- cgit v1.2.1 From a5a05d153ecdbd9b9bdb285a77f5b14d9c0344b0 Mon Sep 17 00:00:00 2001 From: Dominic Date: Sat, 31 Jul 2021 13:50:39 +0100 Subject: rvrt symbolic.py types --- git/refs/symbolic.py | 172 ++++++++++++++++++++++----------------------------- 1 file changed, 74 insertions(+), 98 deletions(-) (limited to 'git') diff --git a/git/refs/symbolic.py b/git/refs/symbolic.py index 4171fe23..0e9dad5c 100644 --- a/git/refs/symbolic.py +++ b/git/refs/symbolic.py @@ -1,3 +1,4 @@ +from git.types import PathLike import os from git.compat import defenc @@ -16,18 +17,17 @@ from gitdb.exc import ( BadName ) -from .log import RefLog, RefLogEntry +import os.path as osp + +from .log import RefLog # typing ------------------------------------------------------------------ -from typing import Any, Iterator, List, Match, Optional, Tuple, Type, TypeVar, Union, TYPE_CHECKING, cast # NOQA +from typing import Any, Iterator, List, Match, Optional, Tuple, Type, TypeVar, Union, TYPE_CHECKING # NOQA from git.types import Commit_ish, PathLike, TBD, Literal # NOQA if TYPE_CHECKING: from git.repo import Repo - from git.refs import Reference, Head, TagReference, RemoteReference - from git.config import GitConfigParser - from git.objects.commit import Actor T_References = TypeVar('T_References', bound='SymbolicReference') @@ -37,9 +37,9 @@ T_References = TypeVar('T_References', bound='SymbolicReference') __all__ = ["SymbolicReference"] -def _git_dir(repo: 'Repo', path: PathLike) -> PathLike: +def _git_dir(repo, path): """ Find the git dir that's appropriate for the path""" - name = f"{path}" + name = "%s" % (path,) if name in ['HEAD', 'ORIG_HEAD', 'FETCH_HEAD', 'index', 'logs']: return repo.git_dir return repo.common_dir @@ -59,47 +59,46 @@ class SymbolicReference(object): _remote_common_path_default = "refs/remotes" _id_attribute_ = "name" - def __init__(self, repo: 'Repo', path: PathLike, check_path: bool = False) -> None: + def __init__(self, repo: 'Repo', path: PathLike, check_path: bool = False): self.repo = repo self.path = str(path) - self.ref = self._get_reference() def __str__(self) -> str: return self.path - def __repr__(self) -> str: + def __repr__(self): return '' % (self.__class__.__name__, self.path) - def __eq__(self, other) -> bool: + def __eq__(self, other): if hasattr(other, 'path'): return self.path == other.path return False - def __ne__(self, other) -> bool: + def __ne__(self, other): return not (self == other) def __hash__(self): return hash(self.path) @property - def name(self) -> str: + def name(self): """ :return: In case of symbolic references, the shortest assumable name is the path itself.""" - return str(self.path) + return self.path @property - def abspath(self) -> PathLike: + def abspath(self): return join_path_native(_git_dir(self.repo, self.path), self.path) @classmethod - def _get_packed_refs_path(cls, repo: 'Repo') -> str: - return os.path.join(repo.common_dir, 'packed-refs') + def _get_packed_refs_path(cls, repo): + return osp.join(repo.common_dir, 'packed-refs') @classmethod - def _iter_packed_refs(cls, repo: 'Repo') -> Iterator[Tuple[str, str]]: - """Returns an iterator yielding pairs of sha1/path pairs for the corresponding refs. + def _iter_packed_refs(cls, repo): + """Returns an iterator yielding pairs of sha1/path pairs (as bytes) for the corresponding refs. :note: The packed refs file will be kept open as long as we iterate""" try: with open(cls._get_packed_refs_path(repo), 'rt', encoding='UTF-8') as fp: @@ -127,7 +126,7 @@ class SymbolicReference(object): if line[0] == '^': continue - yield cast(Tuple[str, str], tuple(line.split(' ', 1))) + yield tuple(line.split(' ', 1)) # END for each line except OSError: return None @@ -138,26 +137,26 @@ class SymbolicReference(object): # alright. @classmethod - def dereference_recursive(cls, repo: 'Repo', ref_path: PathLike) -> str: + def dereference_recursive(cls, repo, ref_path): """ :return: hexsha stored in the reference at the given ref_path, recursively dereferencing all intermediate references as required :param repo: the repository containing the reference at ref_path""" while True: - hexsha, _ref_path_out = cls._get_ref_info(repo, ref_path) + hexsha, ref_path = cls._get_ref_info(repo, ref_path) if hexsha is not None: return hexsha # END recursive dereferencing @classmethod - def _get_ref_info_helper(cls, repo: 'Repo', ref_path: PathLike) -> Union[Tuple[str, None], Tuple[None, PathLike]]: + def _get_ref_info_helper(cls, repo, ref_path): """Return: (str(sha), str(target_ref_path)) if available, the sha the file at rela_path points to, or None. target_ref_path is the reference we point to, or None""" - tokens: Union[List[str], Tuple[str, str], None] = None + tokens = None repodir = _git_dir(repo, ref_path) try: - with open(os.path.join(repodir, ref_path), 'rt', encoding='UTF-8') as fp: + with open(osp.join(repodir, ref_path), 'rt', encoding='UTF-8') as fp: value = fp.read().rstrip() # Don't only split on spaces, but on whitespace, which allows to parse lines like # 60b64ef992065e2600bfef6187a97f92398a9144 branch 'master' of git-server:/path/to/repo @@ -189,14 +188,13 @@ class SymbolicReference(object): raise ValueError("Failed to parse reference information from %r" % ref_path) @classmethod - def _get_ref_info(cls, repo: 'Repo', ref_path: PathLike - ) -> Union[Tuple[str, None], Tuple[None, PathLike]]: + def _get_ref_info(cls, repo, ref_path): """Return: (str(sha), str(target_ref_path)) if available, the sha the file at rela_path points to, or None. target_ref_path is the reference we point to, or None""" return cls._get_ref_info_helper(repo, ref_path) - def _get_object(self) -> Commit_ish: + def _get_object(self): """ :return: The object our ref currently refers to. Refs can be cached, they will @@ -205,7 +203,7 @@ class SymbolicReference(object): # Our path will be resolved to the hexsha which will be used accordingly return Object.new_from_sha(self.repo, hex_to_bin(self.dereference_recursive(self.repo, self.path))) - def _get_commit(self) -> 'Commit': + def _get_commit(self): """ :return: Commit object we point to, works for detached and non-detached @@ -220,8 +218,7 @@ class SymbolicReference(object): # END handle type return obj - def set_commit(self, commit: Union[Commit, 'SymbolicReference', str], - logmsg: Union[str, None] = None) -> None: + def set_commit(self, commit: Union[Commit, 'SymbolicReference', str], logmsg=None): """As set_object, but restricts the type of object to be a Commit :raise ValueError: If commit is not a Commit object or doesn't point to @@ -231,13 +228,11 @@ class SymbolicReference(object): invalid_type = False if isinstance(commit, Object): invalid_type = commit.type != Commit.type - commit = cast('Commit', commit) elif isinstance(commit, SymbolicReference): invalid_type = commit.object.type != Commit.type else: try: - commit = self.repo.rev_parse(commit) - invalid_type = commit.type != Commit.type + invalid_type = self.repo.rev_parse(commit).type != Commit.type except (BadObject, BadName) as e: raise ValueError("Invalid object: %s" % commit) from e # END handle exception @@ -250,12 +245,9 @@ class SymbolicReference(object): # we leave strings to the rev-parse method below self.set_object(commit, logmsg) - # return self - return None + return self - def set_object(self, object: Union[Commit_ish, 'SymbolicReference'], - logmsg: Union[str, None] = None - ) -> 'SymbolicReference': # @ReservedAssignment + def set_object(self, object, logmsg=None): # @ReservedAssignment """Set the object we point to, possibly dereference our symbolic reference first. If the reference does not exist, it will be created @@ -282,11 +274,10 @@ class SymbolicReference(object): # set the commit on our reference return self._get_reference().set_object(object, logmsg) - commit = cast('Commit', property(_get_commit, set_commit, doc="Query or set commits directly")) - object = property(_get_object, set_object, doc="Return the object our ref currently refers to") # type: ignore + commit = property(_get_commit, set_commit, doc="Query or set commits directly") + object = property(_get_object, set_object, doc="Return the object our ref currently refers to") - def _get_reference(self - ) -> Union['Head', 'RemoteReference', 'TagReference', 'Reference']: + def _get_reference(self): """:return: Reference Object we point to :raise TypeError: If this symbolic reference is detached, hence it doesn't point to a reference, but to a commit""" @@ -295,8 +286,7 @@ class SymbolicReference(object): raise TypeError("%s is a detached symbolic reference as it points to %r" % (self, sha)) return self.from_path(self.repo, target_ref_path) - def set_reference(self, ref: Union[str, Commit_ish, 'SymbolicReference'], logmsg: Union[str, None] = None - ) -> 'SymbolicReference': + def set_reference(self, ref, logmsg=None): """Set ourselves to the given ref. It will stay a symbol if the ref is a Reference. Otherwise an Object, given as Object instance or refspec, is assumed and if valid, will be set which effectively detaches the refererence if it was a purely @@ -337,7 +327,7 @@ class SymbolicReference(object): raise TypeError("Require commit, got %r" % obj) # END verify type - oldbinsha: bytes = b'' + oldbinsha = None if logmsg is not None: try: oldbinsha = self.commit.binsha @@ -365,16 +355,11 @@ class SymbolicReference(object): return self - @ property - def reference(self) -> Union['Head', 'RemoteReference', 'TagReference', 'Reference']: - return self._get_reference() + # aliased reference + reference = property(_get_reference, set_reference, doc="Returns the Reference we point to") + ref: Union[Commit_ish] = reference # type: ignore # Union[str, Commit_ish, SymbolicReference] - @ reference.setter - def reference(self, ref: Union[str, Commit_ish, 'SymbolicReference'], logmsg: Union[str, None] = None - ) -> 'SymbolicReference': - return self.set_reference(ref=ref, logmsg=logmsg) - - def is_valid(self) -> bool: + def is_valid(self): """ :return: True if the reference is valid, hence it can be read and points to @@ -386,7 +371,7 @@ class SymbolicReference(object): else: return True - @ property + @property def is_detached(self): """ :return: @@ -398,7 +383,7 @@ class SymbolicReference(object): except TypeError: return True - def log(self) -> 'RefLog': + def log(self): """ :return: RefLog for this reference. Its last entry reflects the latest change applied to this reference @@ -407,8 +392,7 @@ class SymbolicReference(object): instead of calling this method repeatedly. It should be considered read-only.""" return RefLog.from_file(RefLog.path(self)) - def log_append(self, oldbinsha: bytes, message: Union[str, None], - newbinsha: Union[bytes, None] = None) -> 'RefLogEntry': + def log_append(self, oldbinsha, message, newbinsha=None): """Append a logentry to the logfile of this ref :param oldbinsha: binary sha this ref used to point to @@ -420,19 +404,15 @@ class SymbolicReference(object): # correct to allow overriding the committer on a per-commit level. # See https://github.com/gitpython-developers/GitPython/pull/146 try: - committer_or_reader: Union['Actor', 'GitConfigParser'] = self.commit.committer + committer_or_reader = self.commit.committer except ValueError: committer_or_reader = self.repo.config_reader() # end handle newly cloned repositories - if newbinsha is None: - newbinsha = self.commit.binsha - - if message is None: - message = '' + return RefLog.append_entry(committer_or_reader, RefLog.path(self), oldbinsha, + (newbinsha is None and self.commit.binsha) or newbinsha, + message) - return RefLog.append_entry(committer_or_reader, RefLog.path(self), oldbinsha, newbinsha, message) - - def log_entry(self, index: int) -> RefLogEntry: + def log_entry(self, index): """:return: RefLogEntry at the given index :param index: python list compatible positive or negative index @@ -441,23 +421,22 @@ class SymbolicReference(object): In that case, it will be faster than the ``log()`` method""" return RefLog.entry_at(RefLog.path(self), index) - @ classmethod - def to_full_path(cls, path: Union[PathLike, 'SymbolicReference']) -> str: + @classmethod + def to_full_path(cls, path) -> PathLike: """ :return: string with a full repository-relative path which can be used to initialize a Reference instance, for instance by using ``Reference.from_path``""" if isinstance(path, SymbolicReference): path = path.path - full_ref_path = str(path) + full_ref_path = path if not cls._common_path_default: return full_ref_path - - if not str(path).startswith(cls._common_path_default + "/"): + if not path.startswith(cls._common_path_default + "/"): full_ref_path = '%s/%s' % (cls._common_path_default, path) return full_ref_path - @ classmethod - def delete(cls, repo: 'Repo', path: PathLike) -> None: + @classmethod + def delete(cls, repo, path): """Delete the reference at the given path :param repo: @@ -468,8 +447,8 @@ class SymbolicReference(object): or just "myreference", hence 'refs/' is implied. Alternatively the symbolic reference to be deleted""" full_ref_path = cls.to_full_path(path) - abs_path = os.path.join(repo.common_dir, full_ref_path) - if os.path.exists(abs_path): + abs_path = osp.join(repo.common_dir, full_ref_path) + if osp.exists(abs_path): os.remove(abs_path) else: # check packed refs @@ -479,8 +458,8 @@ class SymbolicReference(object): new_lines = [] made_change = False dropped_last_line = False - for line_bytes in reader: - line = line_bytes.decode(defenc) + for line in reader: + line = line.decode(defenc) _, _, line_ref = line.partition(' ') line_ref = line_ref.strip() # keep line if it is a comment or if the ref to delete is not @@ -510,14 +489,12 @@ class SymbolicReference(object): # delete the reflog reflog_path = RefLog.path(cls(repo, full_ref_path)) - if os.path.isfile(reflog_path): + if osp.isfile(reflog_path): os.remove(reflog_path) # END remove reflog - @ classmethod - def _create(cls: Type[T_References], repo: 'Repo', path: PathLike, resolve: bool, - reference: Union[str, 'SymbolicReference'], - force: bool, logmsg: Union[str, None] = None) -> T_References: + @classmethod + def _create(cls, repo, path, resolve, reference, force, logmsg=None): """internal method used to create a new symbolic reference. If resolve is False, the reference will be taken as is, creating a proper symbolic reference. Otherwise it will be resolved to the @@ -525,14 +502,14 @@ class SymbolicReference(object): instead""" git_dir = _git_dir(repo, path) full_ref_path = cls.to_full_path(path) - abs_ref_path = os.path.join(git_dir, full_ref_path) + abs_ref_path = osp.join(git_dir, full_ref_path) # figure out target data target = reference if resolve: target = repo.rev_parse(str(reference)) - if not force and os.path.isfile(abs_ref_path): + if not force and osp.isfile(abs_ref_path): target_data = str(target) if isinstance(target, SymbolicReference): target_data = target.path @@ -550,9 +527,8 @@ class SymbolicReference(object): return ref @classmethod - def create(cls: Type[T_References], repo: 'Repo', path: PathLike, - reference: Union[str, 'SymbolicReference'] = 'SymbolicReference', - logmsg: Union[str, None] = None, force: bool = False, **kwargs: Any) -> T_References: + def create(cls, repo: 'Repo', path: PathLike, reference: Union[Commit_ish, str] = 'HEAD', + logmsg: Union[str, None] = None, force: bool = False, **kwargs: Any): """Create a new symbolic reference, hence a reference pointing , to another reference. :param repo: @@ -564,7 +540,7 @@ class SymbolicReference(object): :param reference: The reference to which the new symbolic reference should point to. - If it is a ref to a commit'ish, the symbolic ref will be detached. + If it is a commit'ish, the symbolic ref will be detached. :param force: if True, force creation even if a symbolic reference with that name already exists. @@ -583,7 +559,7 @@ class SymbolicReference(object): :note: This does not alter the current HEAD, index or Working Tree""" return cls._create(repo, path, cls._resolve_ref_on_create, reference, force, logmsg) - def rename(self, new_path: str, force: bool = False) -> 'SymbolicReference': + def rename(self, new_path, force=False): """Rename self to a new path :param new_path: @@ -601,9 +577,9 @@ class SymbolicReference(object): if self.path == new_path: return self - new_abs_path = os.path.join(_git_dir(self.repo, new_path), new_path) - cur_abs_path = os.path.join(_git_dir(self.repo, self.path), self.path) - if os.path.isfile(new_abs_path): + new_abs_path = osp.join(_git_dir(self.repo, new_path), new_path) + cur_abs_path = osp.join(_git_dir(self.repo, self.path), self.path) + if osp.isfile(new_abs_path): if not force: # if they point to the same file, its not an error with open(new_abs_path, 'rb') as fd1: @@ -618,8 +594,8 @@ class SymbolicReference(object): os.remove(new_abs_path) # END handle existing target file - dname = os.path.dirname(new_abs_path) - if not os.path.isdir(dname): + dname = osp.dirname(new_abs_path) + if not osp.isdir(dname): os.makedirs(dname) # END create directory @@ -654,7 +630,7 @@ class SymbolicReference(object): # read packed refs for _sha, rela_path in cls._iter_packed_refs(repo): - if rela_path.startswith(str(common_path)): + if rela_path.startswith(common_path): rela_paths.add(rela_path) # END relative path matches common path # END packed refs reading @@ -689,7 +665,7 @@ class SymbolicReference(object): return (r for r in cls._iter_items(repo, common_path) if r.__class__ == SymbolicReference or not r.is_detached) @classmethod - def from_path(cls, repo: 'Repo', path: PathLike) -> Union['Head', 'RemoteReference', 'TagReference', 'Reference']: + def from_path(cls, repo, path): """ :param path: full .git-directory-relative path name to the Reference to instantiate :note: use to_full_path() if you only have a partial path of a known Reference Type -- cgit v1.2.1 From 879324b36eef1690b382fb3bd118cf83276a30b7 Mon Sep 17 00:00:00 2001 From: Dominic Date: Sat, 31 Jul 2021 14:21:55 +0100 Subject: Update symbolic.py --- git/refs/symbolic.py | 184 +++++++++++++++++++++++++++------------------------ 1 file changed, 98 insertions(+), 86 deletions(-) (limited to 'git') diff --git a/git/refs/symbolic.py b/git/refs/symbolic.py index 0e9dad5c..4637133b 100644 --- a/git/refs/symbolic.py +++ b/git/refs/symbolic.py @@ -1,6 +1,4 @@ -from git.types import PathLike import os - from git.compat import defenc from git.objects import Object from git.objects.commit import Commit @@ -17,17 +15,18 @@ from gitdb.exc import ( BadName ) -import os.path as osp - -from .log import RefLog +from .log import RefLog, RefLogEntry # typing ------------------------------------------------------------------ -from typing import Any, Iterator, List, Match, Optional, Tuple, Type, TypeVar, Union, TYPE_CHECKING # NOQA +from typing import Any, Iterator, List, Match, Optional, Tuple, Type, TypeVar, Union, TYPE_CHECKING, cast # NOQA from git.types import Commit_ish, PathLike, TBD, Literal # NOQA if TYPE_CHECKING: from git.repo import Repo + from git.refs import Reference, Head, TagReference, RemoteReference + from git.config import GitConfigParser + from git.objects.commit import Actor T_References = TypeVar('T_References', bound='SymbolicReference') @@ -37,9 +36,9 @@ T_References = TypeVar('T_References', bound='SymbolicReference') __all__ = ["SymbolicReference"] -def _git_dir(repo, path): +def _git_dir(repo: 'Repo', path: PathLike) -> PathLike: """ Find the git dir that's appropriate for the path""" - name = "%s" % (path,) + name = f"{path}" if name in ['HEAD', 'ORIG_HEAD', 'FETCH_HEAD', 'index', 'logs']: return repo.git_dir return repo.common_dir @@ -59,46 +58,47 @@ class SymbolicReference(object): _remote_common_path_default = "refs/remotes" _id_attribute_ = "name" - def __init__(self, repo: 'Repo', path: PathLike, check_path: bool = False): + def __init__(self, repo: 'Repo', path: PathLike, check_path: bool = False) -> None: self.repo = repo self.path = str(path) + self.ref = self._get_reference def __str__(self) -> str: return self.path - def __repr__(self): + def __repr__(self) -> str: return '' % (self.__class__.__name__, self.path) - def __eq__(self, other): + def __eq__(self, other) -> bool: if hasattr(other, 'path'): return self.path == other.path return False - def __ne__(self, other): + def __ne__(self, other) -> bool: return not (self == other) def __hash__(self): return hash(self.path) @property - def name(self): + def name(self) -> str: """ :return: In case of symbolic references, the shortest assumable name is the path itself.""" - return self.path + return str(self.path) @property - def abspath(self): + def abspath(self) -> PathLike: return join_path_native(_git_dir(self.repo, self.path), self.path) @classmethod - def _get_packed_refs_path(cls, repo): - return osp.join(repo.common_dir, 'packed-refs') + def _get_packed_refs_path(cls, repo: 'Repo') -> str: + return os.path.join(repo.common_dir, 'packed-refs') @classmethod - def _iter_packed_refs(cls, repo): - """Returns an iterator yielding pairs of sha1/path pairs (as bytes) for the corresponding refs. + def _iter_packed_refs(cls, repo: 'Repo') -> Iterator[Tuple[str, str]]: + """Returns an iterator yielding pairs of sha1/path pairs for the corresponding refs. :note: The packed refs file will be kept open as long as we iterate""" try: with open(cls._get_packed_refs_path(repo), 'rt', encoding='UTF-8') as fp: @@ -126,7 +126,7 @@ class SymbolicReference(object): if line[0] == '^': continue - yield tuple(line.split(' ', 1)) + yield cast(Tuple[str, str], tuple(line.split(' ', 1))) # END for each line except OSError: return None @@ -137,26 +137,26 @@ class SymbolicReference(object): # alright. @classmethod - def dereference_recursive(cls, repo, ref_path): + def dereference_recursive(cls, repo: 'Repo', ref_path: PathLike) -> str: """ :return: hexsha stored in the reference at the given ref_path, recursively dereferencing all intermediate references as required :param repo: the repository containing the reference at ref_path""" while True: - hexsha, ref_path = cls._get_ref_info(repo, ref_path) + hexsha, _ref_path_out = cls._get_ref_info(repo, ref_path) if hexsha is not None: return hexsha # END recursive dereferencing @classmethod - def _get_ref_info_helper(cls, repo, ref_path): + def _get_ref_info_helper(cls, repo: 'Repo', ref_path: PathLike) -> Union[Tuple[str, None], Tuple[None, PathLike]]: """Return: (str(sha), str(target_ref_path)) if available, the sha the file at rela_path points to, or None. target_ref_path is the reference we point to, or None""" - tokens = None + tokens: Union[List[str], Tuple[str, str], None] = None repodir = _git_dir(repo, ref_path) try: - with open(osp.join(repodir, ref_path), 'rt', encoding='UTF-8') as fp: + with open(os.path.join(repodir, ref_path), 'rt', encoding='UTF-8') as fp: value = fp.read().rstrip() # Don't only split on spaces, but on whitespace, which allows to parse lines like # 60b64ef992065e2600bfef6187a97f92398a9144 branch 'master' of git-server:/path/to/repo @@ -188,13 +188,14 @@ class SymbolicReference(object): raise ValueError("Failed to parse reference information from %r" % ref_path) @classmethod - def _get_ref_info(cls, repo, ref_path): + def _get_ref_info(cls, repo: 'Repo', ref_path: PathLike + ) -> Union[Tuple[str, None], Tuple[None, PathLike]]: """Return: (str(sha), str(target_ref_path)) if available, the sha the file at rela_path points to, or None. target_ref_path is the reference we point to, or None""" return cls._get_ref_info_helper(repo, ref_path) - def _get_object(self): + def _get_object(self) -> Commit_ish: """ :return: The object our ref currently refers to. Refs can be cached, they will @@ -203,7 +204,7 @@ class SymbolicReference(object): # Our path will be resolved to the hexsha which will be used accordingly return Object.new_from_sha(self.repo, hex_to_bin(self.dereference_recursive(self.repo, self.path))) - def _get_commit(self): + def _get_commit(self) -> 'Commit': """ :return: Commit object we point to, works for detached and non-detached @@ -218,7 +219,8 @@ class SymbolicReference(object): # END handle type return obj - def set_commit(self, commit: Union[Commit, 'SymbolicReference', str], logmsg=None): + def set_commit(self, commit: Union[Commit, 'SymbolicReference', str], + logmsg: Union[str, None] = None) -> None: """As set_object, but restricts the type of object to be a Commit :raise ValueError: If commit is not a Commit object or doesn't point to @@ -228,11 +230,13 @@ class SymbolicReference(object): invalid_type = False if isinstance(commit, Object): invalid_type = commit.type != Commit.type + commit = cast('Commit', commit) elif isinstance(commit, SymbolicReference): invalid_type = commit.object.type != Commit.type else: try: - invalid_type = self.repo.rev_parse(commit).type != Commit.type + commit = self.repo.rev_parse(commit) + invalid_type = commit.type != Commit.type except (BadObject, BadName) as e: raise ValueError("Invalid object: %s" % commit) from e # END handle exception @@ -245,9 +249,12 @@ class SymbolicReference(object): # we leave strings to the rev-parse method below self.set_object(commit, logmsg) - return self + # return self + return None - def set_object(self, object, logmsg=None): # @ReservedAssignment + def set_object(self, object: Union[Commit_ish, 'SymbolicReference'], + logmsg: Union[str, None] = None + ) -> 'SymbolicReference': # @ReservedAssignment """Set the object we point to, possibly dereference our symbolic reference first. If the reference does not exist, it will be created @@ -274,10 +281,12 @@ class SymbolicReference(object): # set the commit on our reference return self._get_reference().set_object(object, logmsg) - commit = property(_get_commit, set_commit, doc="Query or set commits directly") - object = property(_get_object, set_object, doc="Return the object our ref currently refers to") + commit = cast('Commit', property(_get_commit, set_commit, doc="Query or set commits directly")) + object = property(_get_object, set_object, doc="Return the object our ref currently refers to") # type: ignore + # reference = property(_get_reference, set_reference, doc="Return the object our ref currently refers to") # type: ignore - def _get_reference(self): + def _get_reference(self + ) -> Union['Head', 'RemoteReference', 'TagReference', 'Reference']: """:return: Reference Object we point to :raise TypeError: If this symbolic reference is detached, hence it doesn't point to a reference, but to a commit""" @@ -286,7 +295,8 @@ class SymbolicReference(object): raise TypeError("%s is a detached symbolic reference as it points to %r" % (self, sha)) return self.from_path(self.repo, target_ref_path) - def set_reference(self, ref, logmsg=None): + def set_reference(self, ref: Union[str, Commit_ish, 'SymbolicReference'], logmsg: Union[str, None] = None + ) -> 'SymbolicReference': """Set ourselves to the given ref. It will stay a symbol if the ref is a Reference. Otherwise an Object, given as Object instance or refspec, is assumed and if valid, will be set which effectively detaches the refererence if it was a purely @@ -327,7 +337,7 @@ class SymbolicReference(object): raise TypeError("Require commit, got %r" % obj) # END verify type - oldbinsha = None + oldbinsha: bytes = b'' if logmsg is not None: try: oldbinsha = self.commit.binsha @@ -355,23 +365,16 @@ class SymbolicReference(object): return self - # aliased reference - reference = property(_get_reference, set_reference, doc="Returns the Reference we point to") - ref: Union[Commit_ish] = reference # type: ignore # Union[str, Commit_ish, SymbolicReference] - - def is_valid(self): - """ - :return: - True if the reference is valid, hence it can be read and points to - a valid object or reference.""" - try: - self.object - except (OSError, ValueError): - return False - else: - return True + @ property + def reference(self) -> Union['Head', 'RemoteReference', 'TagReference', 'Reference']: + return self._get_reference() - @property + @ reference.setter + def reference(self, ref: Union[str, Commit_ish, 'SymbolicReference'], logmsg: Union[str, None] = None + ) -> 'SymbolicReference': + return self.set_reference(ref=ref, logmsg=logmsg) + + @ property def is_detached(self): """ :return: @@ -383,7 +386,7 @@ class SymbolicReference(object): except TypeError: return True - def log(self): + def log(self) -> 'RefLog': """ :return: RefLog for this reference. Its last entry reflects the latest change applied to this reference @@ -392,7 +395,8 @@ class SymbolicReference(object): instead of calling this method repeatedly. It should be considered read-only.""" return RefLog.from_file(RefLog.path(self)) - def log_append(self, oldbinsha, message, newbinsha=None): + def log_append(self, oldbinsha: bytes, message: Union[str, None], + newbinsha: Union[bytes, None] = None) -> 'RefLogEntry': """Append a logentry to the logfile of this ref :param oldbinsha: binary sha this ref used to point to @@ -404,15 +408,19 @@ class SymbolicReference(object): # correct to allow overriding the committer on a per-commit level. # See https://github.com/gitpython-developers/GitPython/pull/146 try: - committer_or_reader = self.commit.committer + committer_or_reader: Union['Actor', 'GitConfigParser'] = self.commit.committer except ValueError: committer_or_reader = self.repo.config_reader() # end handle newly cloned repositories - return RefLog.append_entry(committer_or_reader, RefLog.path(self), oldbinsha, - (newbinsha is None and self.commit.binsha) or newbinsha, - message) + if newbinsha is None: + newbinsha = self.commit.binsha + + if message is None: + message = '' + + return RefLog.append_entry(committer_or_reader, RefLog.path(self), oldbinsha, newbinsha, message) - def log_entry(self, index): + def log_entry(self, index: int) -> RefLogEntry: """:return: RefLogEntry at the given index :param index: python list compatible positive or negative index @@ -421,22 +429,23 @@ class SymbolicReference(object): In that case, it will be faster than the ``log()`` method""" return RefLog.entry_at(RefLog.path(self), index) - @classmethod - def to_full_path(cls, path) -> PathLike: + @ classmethod + def to_full_path(cls, path: Union[PathLike, 'SymbolicReference']) -> str: """ :return: string with a full repository-relative path which can be used to initialize a Reference instance, for instance by using ``Reference.from_path``""" if isinstance(path, SymbolicReference): path = path.path - full_ref_path = path + full_ref_path = str(path) if not cls._common_path_default: return full_ref_path - if not path.startswith(cls._common_path_default + "/"): + + if not str(path).startswith(cls._common_path_default + "/"): full_ref_path = '%s/%s' % (cls._common_path_default, path) return full_ref_path - @classmethod - def delete(cls, repo, path): + @ classmethod + def delete(cls, repo: 'Repo', path: PathLike) -> None: """Delete the reference at the given path :param repo: @@ -447,8 +456,8 @@ class SymbolicReference(object): or just "myreference", hence 'refs/' is implied. Alternatively the symbolic reference to be deleted""" full_ref_path = cls.to_full_path(path) - abs_path = osp.join(repo.common_dir, full_ref_path) - if osp.exists(abs_path): + abs_path = os.path.join(repo.common_dir, full_ref_path) + if os.path.exists(abs_path): os.remove(abs_path) else: # check packed refs @@ -458,8 +467,8 @@ class SymbolicReference(object): new_lines = [] made_change = False dropped_last_line = False - for line in reader: - line = line.decode(defenc) + for line_bytes in reader: + line = line_bytes.decode(defenc) _, _, line_ref = line.partition(' ') line_ref = line_ref.strip() # keep line if it is a comment or if the ref to delete is not @@ -489,12 +498,14 @@ class SymbolicReference(object): # delete the reflog reflog_path = RefLog.path(cls(repo, full_ref_path)) - if osp.isfile(reflog_path): + if os.path.isfile(reflog_path): os.remove(reflog_path) # END remove reflog - @classmethod - def _create(cls, repo, path, resolve, reference, force, logmsg=None): + @ classmethod + def _create(cls: Type[T_References], repo: 'Repo', path: PathLike, resolve: bool, + reference: Union[str, 'SymbolicReference'], + force: bool, logmsg: Union[str, None] = None) -> T_References: """internal method used to create a new symbolic reference. If resolve is False, the reference will be taken as is, creating a proper symbolic reference. Otherwise it will be resolved to the @@ -502,14 +513,14 @@ class SymbolicReference(object): instead""" git_dir = _git_dir(repo, path) full_ref_path = cls.to_full_path(path) - abs_ref_path = osp.join(git_dir, full_ref_path) + abs_ref_path = os.path.join(git_dir, full_ref_path) # figure out target data target = reference if resolve: target = repo.rev_parse(str(reference)) - if not force and osp.isfile(abs_ref_path): + if not force and os.path.isfile(abs_ref_path): target_data = str(target) if isinstance(target, SymbolicReference): target_data = target.path @@ -527,8 +538,9 @@ class SymbolicReference(object): return ref @classmethod - def create(cls, repo: 'Repo', path: PathLike, reference: Union[Commit_ish, str] = 'HEAD', - logmsg: Union[str, None] = None, force: bool = False, **kwargs: Any): + def create(cls: Type[T_References], repo: 'Repo', path: PathLike, + reference: Union[str, 'SymbolicReference'] = 'SymbolicReference', + logmsg: Union[str, None] = None, force: bool = False, **kwargs: Any) -> T_References: """Create a new symbolic reference, hence a reference pointing , to another reference. :param repo: @@ -540,7 +552,7 @@ class SymbolicReference(object): :param reference: The reference to which the new symbolic reference should point to. - If it is a commit'ish, the symbolic ref will be detached. + If it is a ref to a commit'ish, the symbolic ref will be detached. :param force: if True, force creation even if a symbolic reference with that name already exists. @@ -559,7 +571,7 @@ class SymbolicReference(object): :note: This does not alter the current HEAD, index or Working Tree""" return cls._create(repo, path, cls._resolve_ref_on_create, reference, force, logmsg) - def rename(self, new_path, force=False): + def rename(self, new_path: str, force: bool = False) -> 'SymbolicReference': """Rename self to a new path :param new_path: @@ -577,9 +589,9 @@ class SymbolicReference(object): if self.path == new_path: return self - new_abs_path = osp.join(_git_dir(self.repo, new_path), new_path) - cur_abs_path = osp.join(_git_dir(self.repo, self.path), self.path) - if osp.isfile(new_abs_path): + new_abs_path = os.path.join(_git_dir(self.repo, new_path), new_path) + cur_abs_path = os.path.join(_git_dir(self.repo, self.path), self.path) + if os.path.isfile(new_abs_path): if not force: # if they point to the same file, its not an error with open(new_abs_path, 'rb') as fd1: @@ -594,8 +606,8 @@ class SymbolicReference(object): os.remove(new_abs_path) # END handle existing target file - dname = osp.dirname(new_abs_path) - if not osp.isdir(dname): + dname = os.path.dirname(new_abs_path) + if not os.path.isdir(dname): os.makedirs(dname) # END create directory @@ -630,7 +642,7 @@ class SymbolicReference(object): # read packed refs for _sha, rela_path in cls._iter_packed_refs(repo): - if rela_path.startswith(common_path): + if rela_path.startswith(str(common_path)): rela_paths.add(rela_path) # END relative path matches common path # END packed refs reading @@ -665,7 +677,7 @@ class SymbolicReference(object): return (r for r in cls._iter_items(repo, common_path) if r.__class__ == SymbolicReference or not r.is_detached) @classmethod - def from_path(cls, repo, path): + def from_path(cls, repo: 'Repo', path: PathLike) -> Union['Head', 'RemoteReference', 'TagReference', 'Reference']: """ :param path: full .git-directory-relative path name to the Reference to instantiate :note: use to_full_path() if you only have a partial path of a known Reference Type -- cgit v1.2.1 From 38ef31f5537de073a72dc4594b9b6374577b6842 Mon Sep 17 00:00:00 2001 From: Dominic Date: Sat, 31 Jul 2021 14:23:09 +0100 Subject: Update symbolic.py --- git/refs/symbolic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'git') diff --git a/git/refs/symbolic.py b/git/refs/symbolic.py index 4637133b..9829415c 100644 --- a/git/refs/symbolic.py +++ b/git/refs/symbolic.py @@ -283,7 +283,7 @@ class SymbolicReference(object): commit = cast('Commit', property(_get_commit, set_commit, doc="Query or set commits directly")) object = property(_get_object, set_object, doc="Return the object our ref currently refers to") # type: ignore - # reference = property(_get_reference, set_reference, doc="Return the object our ref currently refers to") # type: ignore + # reference = property(_get_reference, set_reference, doc="Return the object our ref currently refers to") def _get_reference(self ) -> Union['Head', 'RemoteReference', 'TagReference', 'Reference']: -- cgit v1.2.1 From d858916ce254287b70f2b3cc675ff7860171bfba Mon Sep 17 00:00:00 2001 From: Dominic Date: Sat, 31 Jul 2021 15:38:02 +0100 Subject: Rvrt types of symbolic.py that were breaking pytest --- git/refs/symbolic.py | 184 ++++++++++++++++++++++++--------------------------- 1 file changed, 86 insertions(+), 98 deletions(-) (limited to 'git') diff --git a/git/refs/symbolic.py b/git/refs/symbolic.py index 9829415c..0e9dad5c 100644 --- a/git/refs/symbolic.py +++ b/git/refs/symbolic.py @@ -1,4 +1,6 @@ +from git.types import PathLike import os + from git.compat import defenc from git.objects import Object from git.objects.commit import Commit @@ -15,18 +17,17 @@ from gitdb.exc import ( BadName ) -from .log import RefLog, RefLogEntry +import os.path as osp + +from .log import RefLog # typing ------------------------------------------------------------------ -from typing import Any, Iterator, List, Match, Optional, Tuple, Type, TypeVar, Union, TYPE_CHECKING, cast # NOQA +from typing import Any, Iterator, List, Match, Optional, Tuple, Type, TypeVar, Union, TYPE_CHECKING # NOQA from git.types import Commit_ish, PathLike, TBD, Literal # NOQA if TYPE_CHECKING: from git.repo import Repo - from git.refs import Reference, Head, TagReference, RemoteReference - from git.config import GitConfigParser - from git.objects.commit import Actor T_References = TypeVar('T_References', bound='SymbolicReference') @@ -36,9 +37,9 @@ T_References = TypeVar('T_References', bound='SymbolicReference') __all__ = ["SymbolicReference"] -def _git_dir(repo: 'Repo', path: PathLike) -> PathLike: +def _git_dir(repo, path): """ Find the git dir that's appropriate for the path""" - name = f"{path}" + name = "%s" % (path,) if name in ['HEAD', 'ORIG_HEAD', 'FETCH_HEAD', 'index', 'logs']: return repo.git_dir return repo.common_dir @@ -58,47 +59,46 @@ class SymbolicReference(object): _remote_common_path_default = "refs/remotes" _id_attribute_ = "name" - def __init__(self, repo: 'Repo', path: PathLike, check_path: bool = False) -> None: + def __init__(self, repo: 'Repo', path: PathLike, check_path: bool = False): self.repo = repo self.path = str(path) - self.ref = self._get_reference def __str__(self) -> str: return self.path - def __repr__(self) -> str: + def __repr__(self): return '' % (self.__class__.__name__, self.path) - def __eq__(self, other) -> bool: + def __eq__(self, other): if hasattr(other, 'path'): return self.path == other.path return False - def __ne__(self, other) -> bool: + def __ne__(self, other): return not (self == other) def __hash__(self): return hash(self.path) @property - def name(self) -> str: + def name(self): """ :return: In case of symbolic references, the shortest assumable name is the path itself.""" - return str(self.path) + return self.path @property - def abspath(self) -> PathLike: + def abspath(self): return join_path_native(_git_dir(self.repo, self.path), self.path) @classmethod - def _get_packed_refs_path(cls, repo: 'Repo') -> str: - return os.path.join(repo.common_dir, 'packed-refs') + def _get_packed_refs_path(cls, repo): + return osp.join(repo.common_dir, 'packed-refs') @classmethod - def _iter_packed_refs(cls, repo: 'Repo') -> Iterator[Tuple[str, str]]: - """Returns an iterator yielding pairs of sha1/path pairs for the corresponding refs. + def _iter_packed_refs(cls, repo): + """Returns an iterator yielding pairs of sha1/path pairs (as bytes) for the corresponding refs. :note: The packed refs file will be kept open as long as we iterate""" try: with open(cls._get_packed_refs_path(repo), 'rt', encoding='UTF-8') as fp: @@ -126,7 +126,7 @@ class SymbolicReference(object): if line[0] == '^': continue - yield cast(Tuple[str, str], tuple(line.split(' ', 1))) + yield tuple(line.split(' ', 1)) # END for each line except OSError: return None @@ -137,26 +137,26 @@ class SymbolicReference(object): # alright. @classmethod - def dereference_recursive(cls, repo: 'Repo', ref_path: PathLike) -> str: + def dereference_recursive(cls, repo, ref_path): """ :return: hexsha stored in the reference at the given ref_path, recursively dereferencing all intermediate references as required :param repo: the repository containing the reference at ref_path""" while True: - hexsha, _ref_path_out = cls._get_ref_info(repo, ref_path) + hexsha, ref_path = cls._get_ref_info(repo, ref_path) if hexsha is not None: return hexsha # END recursive dereferencing @classmethod - def _get_ref_info_helper(cls, repo: 'Repo', ref_path: PathLike) -> Union[Tuple[str, None], Tuple[None, PathLike]]: + def _get_ref_info_helper(cls, repo, ref_path): """Return: (str(sha), str(target_ref_path)) if available, the sha the file at rela_path points to, or None. target_ref_path is the reference we point to, or None""" - tokens: Union[List[str], Tuple[str, str], None] = None + tokens = None repodir = _git_dir(repo, ref_path) try: - with open(os.path.join(repodir, ref_path), 'rt', encoding='UTF-8') as fp: + with open(osp.join(repodir, ref_path), 'rt', encoding='UTF-8') as fp: value = fp.read().rstrip() # Don't only split on spaces, but on whitespace, which allows to parse lines like # 60b64ef992065e2600bfef6187a97f92398a9144 branch 'master' of git-server:/path/to/repo @@ -188,14 +188,13 @@ class SymbolicReference(object): raise ValueError("Failed to parse reference information from %r" % ref_path) @classmethod - def _get_ref_info(cls, repo: 'Repo', ref_path: PathLike - ) -> Union[Tuple[str, None], Tuple[None, PathLike]]: + def _get_ref_info(cls, repo, ref_path): """Return: (str(sha), str(target_ref_path)) if available, the sha the file at rela_path points to, or None. target_ref_path is the reference we point to, or None""" return cls._get_ref_info_helper(repo, ref_path) - def _get_object(self) -> Commit_ish: + def _get_object(self): """ :return: The object our ref currently refers to. Refs can be cached, they will @@ -204,7 +203,7 @@ class SymbolicReference(object): # Our path will be resolved to the hexsha which will be used accordingly return Object.new_from_sha(self.repo, hex_to_bin(self.dereference_recursive(self.repo, self.path))) - def _get_commit(self) -> 'Commit': + def _get_commit(self): """ :return: Commit object we point to, works for detached and non-detached @@ -219,8 +218,7 @@ class SymbolicReference(object): # END handle type return obj - def set_commit(self, commit: Union[Commit, 'SymbolicReference', str], - logmsg: Union[str, None] = None) -> None: + def set_commit(self, commit: Union[Commit, 'SymbolicReference', str], logmsg=None): """As set_object, but restricts the type of object to be a Commit :raise ValueError: If commit is not a Commit object or doesn't point to @@ -230,13 +228,11 @@ class SymbolicReference(object): invalid_type = False if isinstance(commit, Object): invalid_type = commit.type != Commit.type - commit = cast('Commit', commit) elif isinstance(commit, SymbolicReference): invalid_type = commit.object.type != Commit.type else: try: - commit = self.repo.rev_parse(commit) - invalid_type = commit.type != Commit.type + invalid_type = self.repo.rev_parse(commit).type != Commit.type except (BadObject, BadName) as e: raise ValueError("Invalid object: %s" % commit) from e # END handle exception @@ -249,12 +245,9 @@ class SymbolicReference(object): # we leave strings to the rev-parse method below self.set_object(commit, logmsg) - # return self - return None + return self - def set_object(self, object: Union[Commit_ish, 'SymbolicReference'], - logmsg: Union[str, None] = None - ) -> 'SymbolicReference': # @ReservedAssignment + def set_object(self, object, logmsg=None): # @ReservedAssignment """Set the object we point to, possibly dereference our symbolic reference first. If the reference does not exist, it will be created @@ -281,12 +274,10 @@ class SymbolicReference(object): # set the commit on our reference return self._get_reference().set_object(object, logmsg) - commit = cast('Commit', property(_get_commit, set_commit, doc="Query or set commits directly")) - object = property(_get_object, set_object, doc="Return the object our ref currently refers to") # type: ignore - # reference = property(_get_reference, set_reference, doc="Return the object our ref currently refers to") + commit = property(_get_commit, set_commit, doc="Query or set commits directly") + object = property(_get_object, set_object, doc="Return the object our ref currently refers to") - def _get_reference(self - ) -> Union['Head', 'RemoteReference', 'TagReference', 'Reference']: + def _get_reference(self): """:return: Reference Object we point to :raise TypeError: If this symbolic reference is detached, hence it doesn't point to a reference, but to a commit""" @@ -295,8 +286,7 @@ class SymbolicReference(object): raise TypeError("%s is a detached symbolic reference as it points to %r" % (self, sha)) return self.from_path(self.repo, target_ref_path) - def set_reference(self, ref: Union[str, Commit_ish, 'SymbolicReference'], logmsg: Union[str, None] = None - ) -> 'SymbolicReference': + def set_reference(self, ref, logmsg=None): """Set ourselves to the given ref. It will stay a symbol if the ref is a Reference. Otherwise an Object, given as Object instance or refspec, is assumed and if valid, will be set which effectively detaches the refererence if it was a purely @@ -337,7 +327,7 @@ class SymbolicReference(object): raise TypeError("Require commit, got %r" % obj) # END verify type - oldbinsha: bytes = b'' + oldbinsha = None if logmsg is not None: try: oldbinsha = self.commit.binsha @@ -365,16 +355,23 @@ class SymbolicReference(object): return self - @ property - def reference(self) -> Union['Head', 'RemoteReference', 'TagReference', 'Reference']: - return self._get_reference() + # aliased reference + reference = property(_get_reference, set_reference, doc="Returns the Reference we point to") + ref: Union[Commit_ish] = reference # type: ignore # Union[str, Commit_ish, SymbolicReference] + + def is_valid(self): + """ + :return: + True if the reference is valid, hence it can be read and points to + a valid object or reference.""" + try: + self.object + except (OSError, ValueError): + return False + else: + return True - @ reference.setter - def reference(self, ref: Union[str, Commit_ish, 'SymbolicReference'], logmsg: Union[str, None] = None - ) -> 'SymbolicReference': - return self.set_reference(ref=ref, logmsg=logmsg) - - @ property + @property def is_detached(self): """ :return: @@ -386,7 +383,7 @@ class SymbolicReference(object): except TypeError: return True - def log(self) -> 'RefLog': + def log(self): """ :return: RefLog for this reference. Its last entry reflects the latest change applied to this reference @@ -395,8 +392,7 @@ class SymbolicReference(object): instead of calling this method repeatedly. It should be considered read-only.""" return RefLog.from_file(RefLog.path(self)) - def log_append(self, oldbinsha: bytes, message: Union[str, None], - newbinsha: Union[bytes, None] = None) -> 'RefLogEntry': + def log_append(self, oldbinsha, message, newbinsha=None): """Append a logentry to the logfile of this ref :param oldbinsha: binary sha this ref used to point to @@ -408,19 +404,15 @@ class SymbolicReference(object): # correct to allow overriding the committer on a per-commit level. # See https://github.com/gitpython-developers/GitPython/pull/146 try: - committer_or_reader: Union['Actor', 'GitConfigParser'] = self.commit.committer + committer_or_reader = self.commit.committer except ValueError: committer_or_reader = self.repo.config_reader() # end handle newly cloned repositories - if newbinsha is None: - newbinsha = self.commit.binsha - - if message is None: - message = '' - - return RefLog.append_entry(committer_or_reader, RefLog.path(self), oldbinsha, newbinsha, message) + return RefLog.append_entry(committer_or_reader, RefLog.path(self), oldbinsha, + (newbinsha is None and self.commit.binsha) or newbinsha, + message) - def log_entry(self, index: int) -> RefLogEntry: + def log_entry(self, index): """:return: RefLogEntry at the given index :param index: python list compatible positive or negative index @@ -429,23 +421,22 @@ class SymbolicReference(object): In that case, it will be faster than the ``log()`` method""" return RefLog.entry_at(RefLog.path(self), index) - @ classmethod - def to_full_path(cls, path: Union[PathLike, 'SymbolicReference']) -> str: + @classmethod + def to_full_path(cls, path) -> PathLike: """ :return: string with a full repository-relative path which can be used to initialize a Reference instance, for instance by using ``Reference.from_path``""" if isinstance(path, SymbolicReference): path = path.path - full_ref_path = str(path) + full_ref_path = path if not cls._common_path_default: return full_ref_path - - if not str(path).startswith(cls._common_path_default + "/"): + if not path.startswith(cls._common_path_default + "/"): full_ref_path = '%s/%s' % (cls._common_path_default, path) return full_ref_path - @ classmethod - def delete(cls, repo: 'Repo', path: PathLike) -> None: + @classmethod + def delete(cls, repo, path): """Delete the reference at the given path :param repo: @@ -456,8 +447,8 @@ class SymbolicReference(object): or just "myreference", hence 'refs/' is implied. Alternatively the symbolic reference to be deleted""" full_ref_path = cls.to_full_path(path) - abs_path = os.path.join(repo.common_dir, full_ref_path) - if os.path.exists(abs_path): + abs_path = osp.join(repo.common_dir, full_ref_path) + if osp.exists(abs_path): os.remove(abs_path) else: # check packed refs @@ -467,8 +458,8 @@ class SymbolicReference(object): new_lines = [] made_change = False dropped_last_line = False - for line_bytes in reader: - line = line_bytes.decode(defenc) + for line in reader: + line = line.decode(defenc) _, _, line_ref = line.partition(' ') line_ref = line_ref.strip() # keep line if it is a comment or if the ref to delete is not @@ -498,14 +489,12 @@ class SymbolicReference(object): # delete the reflog reflog_path = RefLog.path(cls(repo, full_ref_path)) - if os.path.isfile(reflog_path): + if osp.isfile(reflog_path): os.remove(reflog_path) # END remove reflog - @ classmethod - def _create(cls: Type[T_References], repo: 'Repo', path: PathLike, resolve: bool, - reference: Union[str, 'SymbolicReference'], - force: bool, logmsg: Union[str, None] = None) -> T_References: + @classmethod + def _create(cls, repo, path, resolve, reference, force, logmsg=None): """internal method used to create a new symbolic reference. If resolve is False, the reference will be taken as is, creating a proper symbolic reference. Otherwise it will be resolved to the @@ -513,14 +502,14 @@ class SymbolicReference(object): instead""" git_dir = _git_dir(repo, path) full_ref_path = cls.to_full_path(path) - abs_ref_path = os.path.join(git_dir, full_ref_path) + abs_ref_path = osp.join(git_dir, full_ref_path) # figure out target data target = reference if resolve: target = repo.rev_parse(str(reference)) - if not force and os.path.isfile(abs_ref_path): + if not force and osp.isfile(abs_ref_path): target_data = str(target) if isinstance(target, SymbolicReference): target_data = target.path @@ -538,9 +527,8 @@ class SymbolicReference(object): return ref @classmethod - def create(cls: Type[T_References], repo: 'Repo', path: PathLike, - reference: Union[str, 'SymbolicReference'] = 'SymbolicReference', - logmsg: Union[str, None] = None, force: bool = False, **kwargs: Any) -> T_References: + def create(cls, repo: 'Repo', path: PathLike, reference: Union[Commit_ish, str] = 'HEAD', + logmsg: Union[str, None] = None, force: bool = False, **kwargs: Any): """Create a new symbolic reference, hence a reference pointing , to another reference. :param repo: @@ -552,7 +540,7 @@ class SymbolicReference(object): :param reference: The reference to which the new symbolic reference should point to. - If it is a ref to a commit'ish, the symbolic ref will be detached. + If it is a commit'ish, the symbolic ref will be detached. :param force: if True, force creation even if a symbolic reference with that name already exists. @@ -571,7 +559,7 @@ class SymbolicReference(object): :note: This does not alter the current HEAD, index or Working Tree""" return cls._create(repo, path, cls._resolve_ref_on_create, reference, force, logmsg) - def rename(self, new_path: str, force: bool = False) -> 'SymbolicReference': + def rename(self, new_path, force=False): """Rename self to a new path :param new_path: @@ -589,9 +577,9 @@ class SymbolicReference(object): if self.path == new_path: return self - new_abs_path = os.path.join(_git_dir(self.repo, new_path), new_path) - cur_abs_path = os.path.join(_git_dir(self.repo, self.path), self.path) - if os.path.isfile(new_abs_path): + new_abs_path = osp.join(_git_dir(self.repo, new_path), new_path) + cur_abs_path = osp.join(_git_dir(self.repo, self.path), self.path) + if osp.isfile(new_abs_path): if not force: # if they point to the same file, its not an error with open(new_abs_path, 'rb') as fd1: @@ -606,8 +594,8 @@ class SymbolicReference(object): os.remove(new_abs_path) # END handle existing target file - dname = os.path.dirname(new_abs_path) - if not os.path.isdir(dname): + dname = osp.dirname(new_abs_path) + if not osp.isdir(dname): os.makedirs(dname) # END create directory @@ -642,7 +630,7 @@ class SymbolicReference(object): # read packed refs for _sha, rela_path in cls._iter_packed_refs(repo): - if rela_path.startswith(str(common_path)): + if rela_path.startswith(common_path): rela_paths.add(rela_path) # END relative path matches common path # END packed refs reading @@ -677,7 +665,7 @@ class SymbolicReference(object): return (r for r in cls._iter_items(repo, common_path) if r.__class__ == SymbolicReference or not r.is_detached) @classmethod - def from_path(cls, repo: 'Repo', path: PathLike) -> Union['Head', 'RemoteReference', 'TagReference', 'Reference']: + def from_path(cls, repo, path): """ :param path: full .git-directory-relative path name to the Reference to instantiate :note: use to_full_path() if you only have a partial path of a known Reference Type -- cgit v1.2.1