diff options
Diffstat (limited to 'git/util.py')
-rw-r--r-- | git/util.py | 111 |
1 files changed, 58 insertions, 53 deletions
diff --git a/git/util.py b/git/util.py index 16c3e62a..b5cce59d 100644 --- a/git/util.py +++ b/git/util.py @@ -3,6 +3,8 @@ # # This module is part of GitPython and is released under # the BSD License: http://www.opensource.org/licenses/bsd-license.php +from git.remote import Remote +from _typeshed import ReadableBuffer import contextlib from functools import wraps import getpass @@ -15,17 +17,16 @@ import shutil import stat from sys import maxsize import time -from typing import Any, AnyStr, Callable, Dict, Generator, List, NoReturn, Optional, Pattern, Sequence, Tuple, Union, cast from unittest import SkipTest -import typing_extensions -from .types import PathLike, TBD -from pathlib import Path - +from typing import (Any, AnyStr, BinaryIO, Callable, Dict, Generator, IO, List, NoReturn, Optional, Pattern, + Sequence, TextIO, Tuple, Union, cast) from typing_extensions import Literal +from git.repo.base import Repo +from .types import PathLike, TBD -from gitdb.util import (# NOQA @IgnorePep8 +from gitdb.util import ( # NOQA @IgnorePep8 make_sha, LockedFD, # @UnusedImport file_contents_ro, # @UnusedImport @@ -72,7 +73,7 @@ def unbare_repo(func: Callable) -> Callable: encounter a bare repository""" @wraps(func) - def wrapper(self, *args: Any, **kwargs: Any) -> Callable: + def wrapper(self: Remote, *args: Any, **kwargs: Any) -> TBD: if self.repo.bare: raise InvalidGitRepositoryError("Method '%s' cannot operate on bare repositories" % func.__name__) # END bare method @@ -98,7 +99,7 @@ def rmtree(path: PathLike) -> None: :note: we use shutil rmtree but adjust its behaviour to see whether files that couldn't be deleted are read-only. Windows will not remove them in that case""" - def onerror(func, path, exc_info): + def onerror(func: Callable, path: PathLike, exc_info: TBD) -> None: # Is the error an access error ? os.chmod(path, stat.S_IWUSR) @@ -120,7 +121,7 @@ def rmfile(path: PathLike) -> None: os.remove(path) -def stream_copy(source, destination, chunk_size: int = 512 * 1024) -> int: +def stream_copy(source: BinaryIO, destination: BinaryIO, chunk_size: int = 512 * 1024) -> int: """Copy all data from the source stream into the destination stream in chunks of size chunk_size @@ -136,11 +137,12 @@ def stream_copy(source, destination, chunk_size: int = 512 * 1024) -> int: return br -def join_path(a, *p): +def join_path(a: PathLike, *p: PathLike) -> PathLike: """Join path tokens together similar to osp.join, but always use '/' instead of possibly '\' on windows.""" - path = a + path = str(a) for b in p: + b = str(b) if not b: continue if b.startswith('/'): @@ -154,22 +156,24 @@ def join_path(a, *p): if is_win: - def to_native_path_windows(path): + def to_native_path_windows(path: PathLike) -> PathLike: + path = str(path) return path.replace('/', '\\') - def to_native_path_linux(path): + def to_native_path_linux(path: PathLike) -> PathLike: + path = str(path) return path.replace('\\', '/') __all__.append("to_native_path_windows") to_native_path = to_native_path_windows else: # no need for any work on linux - def to_native_path_linux(path): + def to_native_path_linux(path: PathLike) -> PathLike: return path to_native_path = to_native_path_linux -def join_path_native(a, *p): +def join_path_native(a: PathLike, *p: PathLike) -> PathLike: """ As join path, but makes sure an OS native path is returned. This is only needed to play it safe on my dear windows and to assure nice paths that only @@ -199,7 +203,7 @@ def _get_exe_extensions() -> Sequence[str]: else ('') -def py_where(program, path: Optional[PathLike]=None) -> List[str]: +def py_where(program: str, path: Optional[PathLike] = None) -> List[str]: # From: http://stackoverflow.com/a/377028/548792 winprog_exts = _get_exe_extensions() @@ -249,7 +253,7 @@ _cygpath_parsers = ( ), (re.compile(r"\\\\\?\\(\w):[/\\](.*)"), - (_cygexpath), + (_cygexpath), False ), @@ -270,7 +274,6 @@ _cygpath_parsers = ( ) # type: Tuple[Tuple[Pattern[str], Callable, bool], ...] - def cygpath(path: PathLike) -> PathLike: """Use :meth:`git.cmd.Git.polish_url()` instead, that works on any environment.""" path = str(path) # ensure is str and not AnyPath @@ -292,7 +295,7 @@ _decygpath_regex = re.compile(r"/cygdrive/(\w)(/.*)?") def decygpath(path: PathLike) -> str: - path = str(Path) + path = str(path) m = _decygpath_regex.match(path) if m: drive, rest_path = m.groups() @@ -306,12 +309,12 @@ def decygpath(path: PathLike) -> str: _is_cygwin_cache = {} # type: Dict[str, Optional[bool]] -def is_cygwin_git(git_executable) -> bool: +def is_cygwin_git(git_executable: PathLike) -> bool: if not is_win: return False #from subprocess import check_output - + git_executable = str(git_executable) is_cygwin = _is_cygwin_cache.get(git_executable) # type: Optional[bool] if is_cygwin is None: is_cygwin = False @@ -319,7 +322,7 @@ def is_cygwin_git(git_executable) -> bool: git_dir = osp.dirname(git_executable) if not git_dir: res = py_where(git_executable) - git_dir = osp.dirname(res[0]) if res else None + git_dir = osp.dirname(res[0]) if res else "" ## Just a name given, not a real path. uname_cmd = osp.join(git_dir, 'uname') @@ -346,7 +349,7 @@ def finalize_process(proc: TBD, **kwargs: Any) -> None: proc.wait(**kwargs) -def expand_path(p: PathLike, expand_vars: bool=True) -> Optional[PathLike]: +def expand_path(p: PathLike, expand_vars: bool = True) -> Optional[PathLike]: try: p = osp.expanduser(p) if expand_vars: @@ -382,10 +385,10 @@ class RemoteProgress(object): re_op_relative = re.compile(r"(remote: )?([\w\s]+):\s+(\d+)% \((\d+)/(\d+)\)(.*)") def __init__(self) -> None: - self._seen_ops = [] + self._seen_ops = [] # type: List[TBD] self._cur_line = None # type: Optional[str] - self.error_lines = [] - self.other_lines = [] + self.error_lines = [] # type: List[str] + self.other_lines = [] # type: List[str] def _parse_progress_line(self, line: AnyStr) -> None: """Parse progress information from the given line as retrieved by git-push @@ -404,7 +407,7 @@ class RemoteProgress(object): else: line_str = line self._cur_line = line_str - + if self.error_lines or self._cur_line.startswith(('error:', 'fatal:')): self.error_lines.append(self._cur_line) return @@ -462,7 +465,7 @@ class RemoteProgress(object): self.line_dropped(line_str) # Note: Don't add this line to the other lines, as we have to silently # drop it - return + return None # END handle op code # figure out stage @@ -492,16 +495,17 @@ class RemoteProgress(object): :return: a progress handler suitable for handle_process_output(), passing lines on to this Progress handler in a suitable format""" - def handler(line): + def handler(line: AnyStr) -> None: return self._parse_progress_line(line.rstrip()) # end return handler - def line_dropped(self, line): + def line_dropped(self, line: str) -> None: """Called whenever a line could not be understood and was therefore dropped.""" pass - def update(self, op_code, cur_count, max_count=None, message=''): + def update(self, op_code: int, cur_count: Union[str, float], max_count: Union[str, float, None] = None, + message: str = '',) -> None: """Called whenever the progress changes :param op_code: @@ -536,7 +540,7 @@ class CallableRemoteProgress(RemoteProgress): self._callable = fn super(CallableRemoteProgress, self).__init__() - def update(self, *args, **kwargs): + def update(self, *args: Any, **kwargs: Any) -> None: self._callable(*args, **kwargs) @@ -575,7 +579,7 @@ class Actor(object): return hash((self.name, self.email)) def __str__(self) -> str: - return self.name + return self.name if self.name else "" def __repr__(self) -> str: return '<git.Actor "%s <%s>">' % (self.name, self.email) @@ -602,7 +606,7 @@ class Actor(object): # END handle name/email matching @classmethod - def _main_actor(cls, env_name: str, env_email: str, config_reader: Optional[TBD]=None) -> 'Actor': + def _main_actor(cls, env_name: str, env_email: str, config_reader: Optional[TBD] = None) -> 'Actor': actor = Actor('', '') user_id = None # We use this to avoid multiple calls to getpass.getuser() @@ -642,7 +646,7 @@ class Actor(object): return cls._main_actor(cls.env_committer_name, cls.env_committer_email, config_reader) @classmethod - def author(cls, config_reader: Optional[TBD] = None): + def author(cls, config_reader: Optional[TBD] = None) -> 'Actor': """Same as committer(), but defines the main author. It may be specified in the environment, but defaults to the committer""" return cls._main_actor(cls.env_author_name, cls.env_author_email, config_reader) @@ -681,11 +685,11 @@ class Stats(object): self.files = files @classmethod - def _list_from_string(cls, repo, text: str) -> 'Stats': + def _list_from_string(cls, repo: Repo, text: str) -> 'Stats': """Create a Stat object from output retrieved by git-diff. :return: git.Stat""" - hsh = {'total': {'insertions': 0, 'deletions': 0, 'lines': 0, 'files': 0}, + hsh = {'total': {'insertions': 0, 'deletions': 0, 'lines': 0, 'files': 0}, 'files': {} } # type: Dict[str, Dict[str, TBD]] ## need typeddict or refactor for mypy for line in text.splitlines(): @@ -713,11 +717,11 @@ class IndexFileSHA1Writer(object): :note: Based on the dulwich project""" __slots__ = ("f", "sha1") - def __init__(self, f) -> None: + def __init__(self, f: IO) -> None: self.f = f self.sha1 = make_sha(b"") - def write(self, data): + def write(self, data: AnyStr) -> int: self.sha1.update(data) return self.f.write(data) @@ -731,7 +735,7 @@ class IndexFileSHA1Writer(object): self.f.close() return sha - def tell(self): + def tell(self) -> int: return self.f.tell() @@ -813,7 +817,7 @@ class BlockingLockFile(LockFile): can never be obtained.""" __slots__ = ("_check_interval", "_max_block_time") - def __init__(self, file_path: PathLike, check_interval_s: float=0.3, max_block_time_s: int=maxsize) -> None: + def __init__(self, file_path: PathLike, check_interval_s: float = 0.3, max_block_time_s: int = maxsize) -> None: """Configure the instance :param check_interval_s: @@ -872,10 +876,10 @@ class IterableList(list): can be left out.""" __slots__ = ('_id_attr', '_prefix') - def __new__(cls, id_attr, prefix=''): + def __new__(cls, id_attr: str, prefix: str = '') -> 'IterableList': return super(IterableList, cls).__new__(cls) - def __init__(self, id_attr: str, prefix: str='') -> None: + def __init__(self, id_attr: str, prefix: str = '') -> None: self._id_attr = id_attr self._prefix = prefix @@ -897,7 +901,7 @@ class IterableList(list): return False # END handle membership - def __getattr__(self, attr: str) -> object: + def __getattr__(self, attr: str) -> Any: attr = self._prefix + attr for item in self: if getattr(item, self._id_attr) == attr: @@ -908,12 +912,13 @@ class IterableList(list): def __getitem__(self, index: Union[int, slice, str]) -> Any: if isinstance(index, int): return list.__getitem__(self, index) - - assert not isinstance(index, slice) - try: - return getattr(self, index) - except AttributeError as e: - raise IndexError("No item found with id %r" % (self._prefix + index)) from e + elif isinstance(index, slice): + raise ValueError("Index should be an int or str") + else: + try: + return getattr(self, index) + except AttributeError as e: + raise IndexError("No item found with id %r" % (self._prefix + index)) from e # END handle getattr def __delitem__(self, index: Union[int, str, slice]) -> None: @@ -943,7 +948,7 @@ class Iterable(object): _id_attribute_ = "attribute that most suitably identifies your instance" @classmethod - def list_items(cls, repo, *args, **kwargs) -> 'IterableList': + def list_items(cls, repo: Repo, *args: Any, **kwargs: Any) -> 'IterableList': """ Find all items of this type - subclasses can specify args and kwargs differently. If no args are given, subclasses are obliged to return all items if no additional @@ -957,7 +962,7 @@ class Iterable(object): return out_list @classmethod - def iter_items(cls, repo, *args, **kwargs) -> NoReturn: + def iter_items(cls, repo: Repo, *args: Any, **kwargs: Any) -> NoReturn: """For more information about the arguments, see list_items :return: iterator yielding Items""" raise NotImplementedError("To be implemented by Subclass") @@ -966,5 +971,5 @@ class Iterable(object): class NullHandler(logging.Handler): - def emit(self, record) -> None: + def emit(self, record: object) -> None: pass |