summaryrefslogtreecommitdiff
path: root/git/util.py
diff options
context:
space:
mode:
authoryobmod <yobmod@gmail.com>2021-03-01 20:18:01 +0000
committeryobmod <yobmod@gmail.com>2021-03-01 20:18:01 +0000
commita094ac1808f7c5fa0653ac075074bb2232223ac1 (patch)
tree53ad607894773eae18e9e110a2f72594f79c0311 /git/util.py
parent5b0028e1e75e1ee0eea63ba78cb3160d49c1f3a3 (diff)
downloadgitpython-a094ac1808f7c5fa0653ac075074bb2232223ac1.tar.gz
add types to git.util and git.__init__
Diffstat (limited to 'git/util.py')
-rw-r--r--git/util.py111
1 files changed, 58 insertions, 53 deletions
diff --git a/git/util.py b/git/util.py
index 16c3e62a..b5cce59d 100644
--- a/git/util.py
+++ b/git/util.py
@@ -3,6 +3,8 @@
#
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
+from git.remote import Remote
+from _typeshed import ReadableBuffer
import contextlib
from functools import wraps
import getpass
@@ -15,17 +17,16 @@ import shutil
import stat
from sys import maxsize
import time
-from typing import Any, AnyStr, Callable, Dict, Generator, List, NoReturn, Optional, Pattern, Sequence, Tuple, Union, cast
from unittest import SkipTest
-import typing_extensions
-from .types import PathLike, TBD
-from pathlib import Path
-
+from typing import (Any, AnyStr, BinaryIO, Callable, Dict, Generator, IO, List, NoReturn, Optional, Pattern,
+ Sequence, TextIO, Tuple, Union, cast)
from typing_extensions import Literal
+from git.repo.base import Repo
+from .types import PathLike, TBD
-from gitdb.util import (# NOQA @IgnorePep8
+from gitdb.util import ( # NOQA @IgnorePep8
make_sha,
LockedFD, # @UnusedImport
file_contents_ro, # @UnusedImport
@@ -72,7 +73,7 @@ def unbare_repo(func: Callable) -> Callable:
encounter a bare repository"""
@wraps(func)
- def wrapper(self, *args: Any, **kwargs: Any) -> Callable:
+ def wrapper(self: Remote, *args: Any, **kwargs: Any) -> TBD:
if self.repo.bare:
raise InvalidGitRepositoryError("Method '%s' cannot operate on bare repositories" % func.__name__)
# END bare method
@@ -98,7 +99,7 @@ def rmtree(path: PathLike) -> None:
:note: we use shutil rmtree but adjust its behaviour to see whether files that
couldn't be deleted are read-only. Windows will not remove them in that case"""
- def onerror(func, path, exc_info):
+ def onerror(func: Callable, path: PathLike, exc_info: TBD) -> None:
# Is the error an access error ?
os.chmod(path, stat.S_IWUSR)
@@ -120,7 +121,7 @@ def rmfile(path: PathLike) -> None:
os.remove(path)
-def stream_copy(source, destination, chunk_size: int = 512 * 1024) -> int:
+def stream_copy(source: BinaryIO, destination: BinaryIO, chunk_size: int = 512 * 1024) -> int:
"""Copy all data from the source stream into the destination stream in chunks
of size chunk_size
@@ -136,11 +137,12 @@ def stream_copy(source, destination, chunk_size: int = 512 * 1024) -> int:
return br
-def join_path(a, *p):
+def join_path(a: PathLike, *p: PathLike) -> PathLike:
"""Join path tokens together similar to osp.join, but always use
'/' instead of possibly '\' on windows."""
- path = a
+ path = str(a)
for b in p:
+ b = str(b)
if not b:
continue
if b.startswith('/'):
@@ -154,22 +156,24 @@ def join_path(a, *p):
if is_win:
- def to_native_path_windows(path):
+ def to_native_path_windows(path: PathLike) -> PathLike:
+ path = str(path)
return path.replace('/', '\\')
- def to_native_path_linux(path):
+ def to_native_path_linux(path: PathLike) -> PathLike:
+ path = str(path)
return path.replace('\\', '/')
__all__.append("to_native_path_windows")
to_native_path = to_native_path_windows
else:
# no need for any work on linux
- def to_native_path_linux(path):
+ def to_native_path_linux(path: PathLike) -> PathLike:
return path
to_native_path = to_native_path_linux
-def join_path_native(a, *p):
+def join_path_native(a: PathLike, *p: PathLike) -> PathLike:
"""
As join path, but makes sure an OS native path is returned. This is only
needed to play it safe on my dear windows and to assure nice paths that only
@@ -199,7 +203,7 @@ def _get_exe_extensions() -> Sequence[str]:
else ('')
-def py_where(program, path: Optional[PathLike]=None) -> List[str]:
+def py_where(program: str, path: Optional[PathLike] = None) -> List[str]:
# From: http://stackoverflow.com/a/377028/548792
winprog_exts = _get_exe_extensions()
@@ -249,7 +253,7 @@ _cygpath_parsers = (
),
(re.compile(r"\\\\\?\\(\w):[/\\](.*)"),
- (_cygexpath),
+ (_cygexpath),
False
),
@@ -270,7 +274,6 @@ _cygpath_parsers = (
) # type: Tuple[Tuple[Pattern[str], Callable, bool], ...]
-
def cygpath(path: PathLike) -> PathLike:
"""Use :meth:`git.cmd.Git.polish_url()` instead, that works on any environment."""
path = str(path) # ensure is str and not AnyPath
@@ -292,7 +295,7 @@ _decygpath_regex = re.compile(r"/cygdrive/(\w)(/.*)?")
def decygpath(path: PathLike) -> str:
- path = str(Path)
+ path = str(path)
m = _decygpath_regex.match(path)
if m:
drive, rest_path = m.groups()
@@ -306,12 +309,12 @@ def decygpath(path: PathLike) -> str:
_is_cygwin_cache = {} # type: Dict[str, Optional[bool]]
-def is_cygwin_git(git_executable) -> bool:
+def is_cygwin_git(git_executable: PathLike) -> bool:
if not is_win:
return False
#from subprocess import check_output
-
+ git_executable = str(git_executable)
is_cygwin = _is_cygwin_cache.get(git_executable) # type: Optional[bool]
if is_cygwin is None:
is_cygwin = False
@@ -319,7 +322,7 @@ def is_cygwin_git(git_executable) -> bool:
git_dir = osp.dirname(git_executable)
if not git_dir:
res = py_where(git_executable)
- git_dir = osp.dirname(res[0]) if res else None
+ git_dir = osp.dirname(res[0]) if res else ""
## Just a name given, not a real path.
uname_cmd = osp.join(git_dir, 'uname')
@@ -346,7 +349,7 @@ def finalize_process(proc: TBD, **kwargs: Any) -> None:
proc.wait(**kwargs)
-def expand_path(p: PathLike, expand_vars: bool=True) -> Optional[PathLike]:
+def expand_path(p: PathLike, expand_vars: bool = True) -> Optional[PathLike]:
try:
p = osp.expanduser(p)
if expand_vars:
@@ -382,10 +385,10 @@ class RemoteProgress(object):
re_op_relative = re.compile(r"(remote: )?([\w\s]+):\s+(\d+)% \((\d+)/(\d+)\)(.*)")
def __init__(self) -> None:
- self._seen_ops = []
+ self._seen_ops = [] # type: List[TBD]
self._cur_line = None # type: Optional[str]
- self.error_lines = []
- self.other_lines = []
+ self.error_lines = [] # type: List[str]
+ self.other_lines = [] # type: List[str]
def _parse_progress_line(self, line: AnyStr) -> None:
"""Parse progress information from the given line as retrieved by git-push
@@ -404,7 +407,7 @@ class RemoteProgress(object):
else:
line_str = line
self._cur_line = line_str
-
+
if self.error_lines or self._cur_line.startswith(('error:', 'fatal:')):
self.error_lines.append(self._cur_line)
return
@@ -462,7 +465,7 @@ class RemoteProgress(object):
self.line_dropped(line_str)
# Note: Don't add this line to the other lines, as we have to silently
# drop it
- return
+ return None
# END handle op code
# figure out stage
@@ -492,16 +495,17 @@ class RemoteProgress(object):
:return:
a progress handler suitable for handle_process_output(), passing lines on to this Progress
handler in a suitable format"""
- def handler(line):
+ def handler(line: AnyStr) -> None:
return self._parse_progress_line(line.rstrip())
# end
return handler
- def line_dropped(self, line):
+ def line_dropped(self, line: str) -> None:
"""Called whenever a line could not be understood and was therefore dropped."""
pass
- def update(self, op_code, cur_count, max_count=None, message=''):
+ def update(self, op_code: int, cur_count: Union[str, float], max_count: Union[str, float, None] = None,
+ message: str = '',) -> None:
"""Called whenever the progress changes
:param op_code:
@@ -536,7 +540,7 @@ class CallableRemoteProgress(RemoteProgress):
self._callable = fn
super(CallableRemoteProgress, self).__init__()
- def update(self, *args, **kwargs):
+ def update(self, *args: Any, **kwargs: Any) -> None:
self._callable(*args, **kwargs)
@@ -575,7 +579,7 @@ class Actor(object):
return hash((self.name, self.email))
def __str__(self) -> str:
- return self.name
+ return self.name if self.name else ""
def __repr__(self) -> str:
return '<git.Actor "%s <%s>">' % (self.name, self.email)
@@ -602,7 +606,7 @@ class Actor(object):
# END handle name/email matching
@classmethod
- def _main_actor(cls, env_name: str, env_email: str, config_reader: Optional[TBD]=None) -> 'Actor':
+ def _main_actor(cls, env_name: str, env_email: str, config_reader: Optional[TBD] = None) -> 'Actor':
actor = Actor('', '')
user_id = None # We use this to avoid multiple calls to getpass.getuser()
@@ -642,7 +646,7 @@ class Actor(object):
return cls._main_actor(cls.env_committer_name, cls.env_committer_email, config_reader)
@classmethod
- def author(cls, config_reader: Optional[TBD] = None):
+ def author(cls, config_reader: Optional[TBD] = None) -> 'Actor':
"""Same as committer(), but defines the main author. It may be specified in the environment,
but defaults to the committer"""
return cls._main_actor(cls.env_author_name, cls.env_author_email, config_reader)
@@ -681,11 +685,11 @@ class Stats(object):
self.files = files
@classmethod
- def _list_from_string(cls, repo, text: str) -> 'Stats':
+ def _list_from_string(cls, repo: Repo, text: str) -> 'Stats':
"""Create a Stat object from output retrieved by git-diff.
:return: git.Stat"""
- hsh = {'total': {'insertions': 0, 'deletions': 0, 'lines': 0, 'files': 0},
+ hsh = {'total': {'insertions': 0, 'deletions': 0, 'lines': 0, 'files': 0},
'files': {}
} # type: Dict[str, Dict[str, TBD]] ## need typeddict or refactor for mypy
for line in text.splitlines():
@@ -713,11 +717,11 @@ class IndexFileSHA1Writer(object):
:note: Based on the dulwich project"""
__slots__ = ("f", "sha1")
- def __init__(self, f) -> None:
+ def __init__(self, f: IO) -> None:
self.f = f
self.sha1 = make_sha(b"")
- def write(self, data):
+ def write(self, data: AnyStr) -> int:
self.sha1.update(data)
return self.f.write(data)
@@ -731,7 +735,7 @@ class IndexFileSHA1Writer(object):
self.f.close()
return sha
- def tell(self):
+ def tell(self) -> int:
return self.f.tell()
@@ -813,7 +817,7 @@ class BlockingLockFile(LockFile):
can never be obtained."""
__slots__ = ("_check_interval", "_max_block_time")
- def __init__(self, file_path: PathLike, check_interval_s: float=0.3, max_block_time_s: int=maxsize) -> None:
+ def __init__(self, file_path: PathLike, check_interval_s: float = 0.3, max_block_time_s: int = maxsize) -> None:
"""Configure the instance
:param check_interval_s:
@@ -872,10 +876,10 @@ class IterableList(list):
can be left out."""
__slots__ = ('_id_attr', '_prefix')
- def __new__(cls, id_attr, prefix=''):
+ def __new__(cls, id_attr: str, prefix: str = '') -> 'IterableList':
return super(IterableList, cls).__new__(cls)
- def __init__(self, id_attr: str, prefix: str='') -> None:
+ def __init__(self, id_attr: str, prefix: str = '') -> None:
self._id_attr = id_attr
self._prefix = prefix
@@ -897,7 +901,7 @@ class IterableList(list):
return False
# END handle membership
- def __getattr__(self, attr: str) -> object:
+ def __getattr__(self, attr: str) -> Any:
attr = self._prefix + attr
for item in self:
if getattr(item, self._id_attr) == attr:
@@ -908,12 +912,13 @@ class IterableList(list):
def __getitem__(self, index: Union[int, slice, str]) -> Any:
if isinstance(index, int):
return list.__getitem__(self, index)
-
- assert not isinstance(index, slice)
- try:
- return getattr(self, index)
- except AttributeError as e:
- raise IndexError("No item found with id %r" % (self._prefix + index)) from e
+ elif isinstance(index, slice):
+ raise ValueError("Index should be an int or str")
+ else:
+ try:
+ return getattr(self, index)
+ except AttributeError as e:
+ raise IndexError("No item found with id %r" % (self._prefix + index)) from e
# END handle getattr
def __delitem__(self, index: Union[int, str, slice]) -> None:
@@ -943,7 +948,7 @@ class Iterable(object):
_id_attribute_ = "attribute that most suitably identifies your instance"
@classmethod
- def list_items(cls, repo, *args, **kwargs) -> 'IterableList':
+ def list_items(cls, repo: Repo, *args: Any, **kwargs: Any) -> 'IterableList':
"""
Find all items of this type - subclasses can specify args and kwargs differently.
If no args are given, subclasses are obliged to return all items if no additional
@@ -957,7 +962,7 @@ class Iterable(object):
return out_list
@classmethod
- def iter_items(cls, repo, *args, **kwargs) -> NoReturn:
+ def iter_items(cls, repo: Repo, *args: Any, **kwargs: Any) -> NoReturn:
"""For more information about the arguments, see list_items
:return: iterator yielding Items"""
raise NotImplementedError("To be implemented by Subclass")
@@ -966,5 +971,5 @@ class Iterable(object):
class NullHandler(logging.Handler):
- def emit(self, record) -> None:
+ def emit(self, record: object) -> None:
pass