author     Sebastian Thiel <sebastian.thiel@icloud.com>   2022-05-18 07:43:53 +0800
committer  Sebastian Thiel <sebastian.thiel@icloud.com>   2022-05-18 07:43:53 +0800
commit     21ec529987d10e0010badd37f8da3274167d436f (patch)
tree       a3394cfe902ce7edd07c89420c21c13274a2d295 /git/util.py
parent     b30720ee4d9762a03eae4fa7cfa4b0190d81784d (diff)
download   gitpython-21ec529987d10e0010badd37f8da3274167d436f.tar.gz
Run everything through 'black'
That way, people who use 'black' won't be deterred, while it unifies style everywhere.
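
The changes below are what black produces with its default settings (88-character line length, double-quoted strings, exploded argument lists on trailing commas). As a minimal sketch of that behaviour, assuming the black package is installed, a long single-line import like the one at the top of git/util.py can be fed through black's format_str API; the sample source string here is illustrative, not the exact file contents:

    # Minimal sketch (assumption: the 'black' package is installed locally).
    # Feeds an over-long typing import through black's defaults to show the
    # exploded form that appears throughout this diff.
    import black

    src = (
        "from typing import (Any, AnyStr, BinaryIO, Callable, Dict, Generator, IO, Iterator, List,\n"
        "                    Optional, Pattern, Sequence, Tuple, TypeVar, Union, cast,\n"
        "                    TYPE_CHECKING, overload, )\n"
    )

    # black.Mode() carries the defaults: 88-character lines and a preference for
    # double quotes, which is the style visible in the '+' lines of this commit.
    formatted = black.format_str(src, mode=black.Mode())
    print(formatted)

In practice the whole repository was presumably reformatted by running the black command-line tool over the source tree rather than through this API.
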
Diffstat (limited to 'git/util.py')
-rw-r--r--   git/util.py   453
1 file changed, 287 insertions(+), 166 deletions(-)
diff --git a/git/util.py b/git/util.py
index 0711265a..edc8750d 100644
--- a/git/util.py
+++ b/git/util.py
@@ -26,9 +26,26 @@ import warnings
# typing ---------------------------------------------------------
-from typing import (Any, AnyStr, BinaryIO, Callable, Dict, Generator, IO, Iterator, List,
- Optional, Pattern, Sequence, Tuple, TypeVar, Union, cast,
- TYPE_CHECKING, overload, )
+from typing import (
+ Any,
+ AnyStr,
+ BinaryIO,
+ Callable,
+ Dict,
+ Generator,
+ IO,
+ Iterator,
+ List,
+ Optional,
+ Pattern,
+ Sequence,
+ Tuple,
+ TypeVar,
+ Union,
+ cast,
+ TYPE_CHECKING,
+ overload,
+)
import pathlib
@@ -37,14 +54,25 @@ if TYPE_CHECKING:
from git.repo.base import Repo
from git.config import GitConfigParser, SectionConstraint
from git import Git
+
# from git.objects.base import IndexObject
-from .types import (Literal, SupportsIndex, Protocol, runtime_checkable, # because behind py version guards
- PathLike, HSH_TD, Total_TD, Files_TD, # aliases
- Has_id_attribute)
+from .types import (
+ Literal,
+ SupportsIndex,
+ Protocol,
+ runtime_checkable, # because behind py version guards
+ PathLike,
+ HSH_TD,
+ Total_TD,
+ Files_TD, # aliases
+ Has_id_attribute,
+)
-T_IterableObj = TypeVar('T_IterableObj', bound=Union['IterableObj', 'Has_id_attribute'], covariant=True)
+T_IterableObj = TypeVar(
+ "T_IterableObj", bound=Union["IterableObj", "Has_id_attribute"], covariant=True
+)
# So IterableList[Head] is subtype of IterableList[IterableObj]
# ---------------------------------------------------------------------
@@ -52,14 +80,14 @@ T_IterableObj = TypeVar('T_IterableObj', bound=Union['IterableObj', 'Has_id_attr
from gitdb.util import ( # NOQA @IgnorePep8
make_sha,
- LockedFD, # @UnusedImport
- file_contents_ro, # @UnusedImport
- file_contents_ro_filepath, # @UnusedImport
- LazyMixin, # @UnusedImport
- to_hex_sha, # @UnusedImport
- to_bin_sha, # @UnusedImport
- bin_to_hex, # @UnusedImport
- hex_to_bin, # @UnusedImport
+ LockedFD, # @UnusedImport
+ file_contents_ro, # @UnusedImport
+ file_contents_ro_filepath, # @UnusedImport
+ LazyMixin, # @UnusedImport
+ to_hex_sha, # @UnusedImport
+ to_bin_sha, # @UnusedImport
+ bin_to_hex, # @UnusedImport
+ hex_to_bin, # @UnusedImport
)
@@ -67,11 +95,26 @@ from gitdb.util import ( # NOQA @IgnorePep8
# Handle once test-cases are back up and running.
# Most of these are unused here, but are for use by git-python modules so these
# don't see gitdb all the time. Flake of course doesn't like it.
-__all__ = ["stream_copy", "join_path", "to_native_path_linux",
- "join_path_native", "Stats", "IndexFileSHA1Writer", "IterableObj", "IterableList",
- "BlockingLockFile", "LockFile", 'Actor', 'get_user_id', 'assure_directory_exists',
- 'RemoteProgress', 'CallableRemoteProgress', 'rmtree', 'unbare_repo',
- 'HIDE_WINDOWS_KNOWN_ERRORS']
+__all__ = [
+ "stream_copy",
+ "join_path",
+ "to_native_path_linux",
+ "join_path_native",
+ "Stats",
+ "IndexFileSHA1Writer",
+ "IterableObj",
+ "IterableList",
+ "BlockingLockFile",
+ "LockFile",
+ "Actor",
+ "get_user_id",
+ "assure_directory_exists",
+ "RemoteProgress",
+ "CallableRemoteProgress",
+ "rmtree",
+ "unbare_repo",
+ "HIDE_WINDOWS_KNOWN_ERRORS",
+]
log = logging.getLogger(__name__)
@@ -81,12 +124,14 @@ log = logging.getLogger(__name__)
#: We need an easy way to see if Appveyor TCs start failing,
#: so the errors marked with this var are considered "acknowledged" ones, awaiting remedy,
#: till then, we wish to hide them.
-HIDE_WINDOWS_KNOWN_ERRORS = is_win and os.environ.get('HIDE_WINDOWS_KNOWN_ERRORS', True)
-HIDE_WINDOWS_FREEZE_ERRORS = is_win and os.environ.get('HIDE_WINDOWS_FREEZE_ERRORS', True)
+HIDE_WINDOWS_KNOWN_ERRORS = is_win and os.environ.get("HIDE_WINDOWS_KNOWN_ERRORS", True)
+HIDE_WINDOWS_FREEZE_ERRORS = is_win and os.environ.get(
+ "HIDE_WINDOWS_FREEZE_ERRORS", True
+)
# { Utility Methods
-T = TypeVar('T')
+T = TypeVar("T")
def unbare_repo(func: Callable[..., T]) -> Callable[..., T]:
@@ -96,11 +141,14 @@ def unbare_repo(func: Callable[..., T]) -> Callable[..., T]:
from .exc import InvalidGitRepositoryError
@wraps(func)
- def wrapper(self: 'Remote', *args: Any, **kwargs: Any) -> T:
+ def wrapper(self: "Remote", *args: Any, **kwargs: Any) -> T:
if self.repo.bare:
- raise InvalidGitRepositoryError("Method '%s' cannot operate on bare repositories" % func.__name__)
+ raise InvalidGitRepositoryError(
+ "Method '%s' cannot operate on bare repositories" % func.__name__
+ )
# END bare method
return func(self, *args, **kwargs)
+
# END wrapper
return wrapper
@@ -131,7 +179,10 @@ def rmtree(path: PathLike) -> None:
except Exception as ex:
if HIDE_WINDOWS_KNOWN_ERRORS:
from unittest import SkipTest
- raise SkipTest("FIXME: fails with: PermissionError\n {}".format(ex)) from ex
+
+ raise SkipTest(
+ "FIXME: fails with: PermissionError\n {}".format(ex)
+ ) from ex
raise
return shutil.rmtree(path, False, onerror)
@@ -145,7 +196,9 @@ def rmfile(path: PathLike) -> None:
os.remove(path)
-def stream_copy(source: BinaryIO, destination: BinaryIO, chunk_size: int = 512 * 1024) -> int:
+def stream_copy(
+ source: BinaryIO, destination: BinaryIO, chunk_size: int = 512 * 1024
+) -> int:
"""Copy all data from the source stream into the destination stream in chunks
of size chunk_size
@@ -169,24 +222,25 @@ def join_path(a: PathLike, *p: PathLike) -> PathLike:
b = str(b)
if not b:
continue
- if b.startswith('/'):
+ if b.startswith("/"):
path += b[1:]
- elif path == '' or path.endswith('/'):
+ elif path == "" or path.endswith("/"):
path += b
else:
- path += '/' + b
+ path += "/" + b
# END for each path token to add
return path
if is_win:
+
def to_native_path_windows(path: PathLike) -> PathLike:
path = str(path)
- return path.replace('/', '\\')
+ return path.replace("/", "\\")
def to_native_path_linux(path: PathLike) -> str:
path = str(path)
- return path.replace('\\', '/')
+ return path.replace("\\", "/")
__all__.append("to_native_path_windows")
to_native_path = to_native_path_windows
@@ -222,10 +276,14 @@ def assure_directory_exists(path: PathLike, is_file: bool = False) -> bool:
def _get_exe_extensions() -> Sequence[str]:
- PATHEXT = os.environ.get('PATHEXT', None)
- return tuple(p.upper() for p in PATHEXT.split(os.pathsep)) if PATHEXT \
- else ('.BAT', 'COM', '.EXE') if is_win \
- else ('')
+ PATHEXT = os.environ.get("PATHEXT", None)
+ return (
+ tuple(p.upper() for p in PATHEXT.split(os.pathsep))
+ if PATHEXT
+ else (".BAT", "COM", ".EXE")
+ if is_win
+ else ("")
+ )
def py_where(program: str, path: Optional[PathLike] = None) -> List[str]:
@@ -233,9 +291,15 @@ def py_where(program: str, path: Optional[PathLike] = None) -> List[str]:
winprog_exts = _get_exe_extensions()
def is_exec(fpath: str) -> bool:
- return osp.isfile(fpath) and os.access(fpath, os.X_OK) and (
- os.name != 'nt' or not winprog_exts or any(fpath.upper().endswith(ext)
- for ext in winprog_exts))
+ return (
+ osp.isfile(fpath)
+ and os.access(fpath, os.X_OK)
+ and (
+ os.name != "nt"
+ or not winprog_exts
+ or any(fpath.upper().endswith(ext) for ext in winprog_exts)
+ )
+ )
progs = []
if not path:
@@ -244,7 +308,7 @@ def py_where(program: str, path: Optional[PathLike] = None) -> List[str]:
folder = folder.strip('"')
if folder:
exe_path = osp.join(folder, program)
- for f in [exe_path] + ['%s%s' % (exe_path, e) for e in winprog_exts]:
+ for f in [exe_path] + ["%s%s" % (exe_path, e) for e in winprog_exts]:
if is_exec(f):
progs.append(f)
return progs
@@ -264,38 +328,26 @@ def _cygexpath(drive: Optional[str], path: str) -> str:
else:
p = cygpath(p)
elif drive:
- p = '/cygdrive/%s/%s' % (drive.lower(), p)
+ p = "/cygdrive/%s/%s" % (drive.lower(), p)
p_str = str(p) # ensure it is a str and not AnyPath
- return p_str.replace('\\', '/')
+ return p_str.replace("\\", "/")
_cygpath_parsers: Tuple[Tuple[Pattern[str], Callable, bool], ...] = (
# See: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx
# and: https://www.cygwin.com/cygwin-ug-net/using.html#unc-paths
- (re.compile(r"\\\\\?\\UNC\\([^\\]+)\\([^\\]+)(?:\\(.*))?"),
- (lambda server, share, rest_path: '//%s/%s/%s' % (server, share, rest_path.replace('\\', '/'))),
- False
- ),
-
- (re.compile(r"\\\\\?\\(\w):[/\\](.*)"),
- (_cygexpath),
- False
- ),
-
- (re.compile(r"(\w):[/\\](.*)"),
- (_cygexpath),
- False
- ),
-
- (re.compile(r"file:(.*)", re.I),
- (lambda rest_path: rest_path),
- True
- ),
-
- (re.compile(r"(\w{2,}:.*)"), # remote URL, do nothing
- (lambda url: url),
- False
- ),
+ (
+ re.compile(r"\\\\\?\\UNC\\([^\\]+)\\([^\\]+)(?:\\(.*))?"),
+ (
+ lambda server, share, rest_path: "//%s/%s/%s"
+ % (server, share, rest_path.replace("\\", "/"))
+ ),
+ False,
+ ),
+ (re.compile(r"\\\\\?\\(\w):[/\\](.*)"), (_cygexpath), False),
+ (re.compile(r"(\w):[/\\](.*)"), (_cygexpath), False),
+ (re.compile(r"file:(.*)", re.I), (lambda rest_path: rest_path), True),
+ (re.compile(r"(\w{2,}:.*)"), (lambda url: url), False), # remote URL, do nothing
)
@@ -303,7 +355,7 @@ def cygpath(path: str) -> str:
"""Use :meth:`git.cmd.Git.polish_url()` instead, that works on any environment."""
path = str(path) # ensure is str and not AnyPath.
# Fix to use Paths when 3.5 dropped. or to be just str if only for urls?
- if not path.startswith(('/cygdrive', '//')):
+ if not path.startswith(("/cygdrive", "//")):
for regex, parser, recurse in _cygpath_parsers:
match = regex.match(path)
if match:
@@ -325,9 +377,9 @@ def decygpath(path: PathLike) -> str:
m = _decygpath_regex.match(path)
if m:
drive, rest_path = m.groups()
- path = '%s:%s' % (drive.upper(), rest_path or '')
+ path = "%s:%s" % (drive.upper(), rest_path or "")
- return path.replace('/', '\\')
+ return path.replace("/", "\\")
#: Store boolean flags denoting if a specific Git executable
@@ -363,14 +415,15 @@ def is_cygwin_git(git_executable: Union[None, PathLike]) -> bool:
git_dir = osp.dirname(res[0]) if res else ""
# Just a name given, not a real path.
- uname_cmd = osp.join(git_dir, 'uname')
- process = subprocess.Popen([uname_cmd], stdout=subprocess.PIPE,
- universal_newlines=True)
+ uname_cmd = osp.join(git_dir, "uname")
+ process = subprocess.Popen(
+ [uname_cmd], stdout=subprocess.PIPE, universal_newlines=True
+ )
uname_out, _ = process.communicate()
- #retcode = process.poll()
- is_cygwin = 'CYGWIN' in uname_out
+ # retcode = process.poll()
+ is_cygwin = "CYGWIN" in uname_out
except Exception as ex:
- log.debug('Failed checking if running in CYGWIN due to: %r', ex)
+ log.debug("Failed checking if running in CYGWIN due to: %r", ex)
_is_cygwin_cache[git_executable] = is_cygwin
return is_cygwin
@@ -381,7 +434,9 @@ def get_user_id() -> str:
return "%s@%s" % (getpass.getuser(), platform.node())
-def finalize_process(proc: Union[subprocess.Popen, 'Git.AutoInterrupt'], **kwargs: Any) -> None:
+def finalize_process(
+ proc: Union[subprocess.Popen, "Git.AutoInterrupt"], **kwargs: Any
+) -> None:
"""Wait for the process (clone, fetch, pull or push) and handle its errors accordingly"""
# TODO: No close proc-streams??
proc.wait(**kwargs)
@@ -398,13 +453,15 @@ def expand_path(p: PathLike, expand_vars: bool = ...) -> str:
...
-def expand_path(p: Union[None, PathLike], expand_vars: bool = True) -> Optional[PathLike]:
+def expand_path(
+ p: Union[None, PathLike], expand_vars: bool = True
+) -> Optional[PathLike]:
if isinstance(p, pathlib.Path):
return p.resolve()
try:
p = osp.expanduser(p) # type: ignore
if expand_vars:
- p = osp.expandvars(p) # type: ignore
+ p = osp.expandvars(p) # type: ignore
return osp.normpath(osp.abspath(p)) # type: ignore
except Exception:
return None
@@ -430,11 +487,9 @@ def remove_password_if_present(cmdline: Sequence[str]) -> List[str]:
continue
if url.password is not None:
- url = url._replace(
- netloc=url.netloc.replace(url.password, "*****"))
+ url = url._replace(netloc=url.netloc.replace(url.password, "*****"))
if url.username is not None:
- url = url._replace(
- netloc=url.netloc.replace(url.username, "*****"))
+ url = url._replace(netloc=url.netloc.replace(url.username, "*****"))
new_cmdline[index] = urlunsplit(url)
except ValueError:
# This is not a valid URL
@@ -452,19 +507,31 @@ class RemoteProgress(object):
Handler providing an interface to parse progress information emitted by git-push
and git-fetch and to dispatch callbacks allowing subclasses to react to the progress.
"""
+
_num_op_codes: int = 9
- BEGIN, END, COUNTING, COMPRESSING, WRITING, RECEIVING, RESOLVING, FINDING_SOURCES, CHECKING_OUT = \
- [1 << x for x in range(_num_op_codes)]
+ (
+ BEGIN,
+ END,
+ COUNTING,
+ COMPRESSING,
+ WRITING,
+ RECEIVING,
+ RESOLVING,
+ FINDING_SOURCES,
+ CHECKING_OUT,
+ ) = [1 << x for x in range(_num_op_codes)]
STAGE_MASK = BEGIN | END
OP_MASK = ~STAGE_MASK
- DONE_TOKEN = 'done.'
- TOKEN_SEPARATOR = ', '
+ DONE_TOKEN = "done."
+ TOKEN_SEPARATOR = ", "
- __slots__ = ('_cur_line',
- '_seen_ops',
- 'error_lines', # Lines that started with 'error:' or 'fatal:'.
- 'other_lines') # Lines not denoting progress (i.e.g. push-infos).
+ __slots__ = (
+ "_cur_line",
+ "_seen_ops",
+ "error_lines", # Lines that started with 'error:' or 'fatal:'.
+ "other_lines",
+ ) # Lines not denoting progress (i.e.g. push-infos).
re_op_absolute = re.compile(r"(remote: )?([\w\s]+):\s+()(\d+)()(.*)")
re_op_relative = re.compile(r"(remote: )?([\w\s]+):\s+(\d+)% \((\d+)/(\d+)\)(.*)")
@@ -486,13 +553,13 @@ class RemoteProgress(object):
# Compressing objects: 50% (1/2)
# Compressing objects: 100% (2/2)
# Compressing objects: 100% (2/2), done.
- if isinstance(line, bytes): # mypy argues about ternary assignment
- line_str = line.decode('utf-8')
+ if isinstance(line, bytes): # mypy argues about ternary assignment
+ line_str = line.decode("utf-8")
else:
line_str = line
self._cur_line = line_str
- if self._cur_line.startswith(('error:', 'fatal:')):
+ if self._cur_line.startswith(("error:", "fatal:")):
self.error_lines.append(self._cur_line)
return
@@ -531,13 +598,13 @@ class RemoteProgress(object):
op_code |= self.COMPRESSING
elif op_name == "Writing objects":
op_code |= self.WRITING
- elif op_name == 'Receiving objects':
+ elif op_name == "Receiving objects":
op_code |= self.RECEIVING
- elif op_name == 'Resolving deltas':
+ elif op_name == "Resolving deltas":
op_code |= self.RESOLVING
- elif op_name == 'Finding sources':
+ elif op_name == "Finding sources":
op_code |= self.FINDING_SOURCES
- elif op_name == 'Checking out files':
+ elif op_name == "Checking out files":
op_code |= self.CHECKING_OUT
else:
# Note: On windows it can happen that partial lines are sent
@@ -559,28 +626,32 @@ class RemoteProgress(object):
# END begin opcode
if message is None:
- message = ''
+ message = ""
# END message handling
message = message.strip()
if message.endswith(self.DONE_TOKEN):
op_code |= self.END
- message = message[:-len(self.DONE_TOKEN)]
+ message = message[: -len(self.DONE_TOKEN)]
# END end message handling
message = message.strip(self.TOKEN_SEPARATOR)
- self.update(op_code,
- cur_count and float(cur_count),
- max_count and float(max_count),
- message)
+ self.update(
+ op_code,
+ cur_count and float(cur_count),
+ max_count and float(max_count),
+ message,
+ )
def new_message_handler(self) -> Callable[[str], None]:
"""
:return:
a progress handler suitable for handle_process_output(), passing lines on to this Progress
handler in a suitable format"""
+
def handler(line: AnyStr) -> None:
return self._parse_progress_line(line.rstrip())
+
# end
return handler
@@ -588,8 +659,13 @@ class RemoteProgress(object):
"""Called whenever a line could not be understood and was therefore dropped."""
pass
- def update(self, op_code: int, cur_count: Union[str, float], max_count: Union[str, float, None] = None,
- message: str = '',) -> None:
+ def update(
+ self,
+ op_code: int,
+ cur_count: Union[str, float],
+ max_count: Union[str, float, None] = None,
+ message: str = "",
+ ) -> None:
"""Called whenever the progress changes
:param op_code:
@@ -618,7 +694,8 @@ class RemoteProgress(object):
class CallableRemoteProgress(RemoteProgress):
"""An implementation forwarding updates to any callable"""
- __slots__ = ('_callable')
+
+ __slots__ = "_callable"
def __init__(self, fn: Callable) -> None:
self._callable = fn
@@ -632,9 +709,10 @@ class Actor(object):
"""Actors hold information about a person acting on the repository. They
can be committers and authors or anything with a name and an email as
mentioned in the git log entries."""
+
# PRECOMPILED REGEX
- name_only_regex = re.compile(r'<(.*)>')
- name_email_regex = re.compile(r'(.*) <(.*?)>')
+ name_only_regex = re.compile(r"<(.*)>")
+ name_email_regex = re.compile(r"(.*) <(.*?)>")
# ENVIRONMENT VARIABLES
# read when creating new commits
@@ -644,10 +722,10 @@ class Actor(object):
env_committer_email = "GIT_COMMITTER_EMAIL"
# CONFIGURATION KEYS
- conf_name = 'name'
- conf_email = 'email'
+ conf_name = "name"
+ conf_email = "email"
- __slots__ = ('name', 'email')
+ __slots__ = ("name", "email")
def __init__(self, name: Optional[str], email: Optional[str]) -> None:
self.name = name
@@ -669,13 +747,13 @@ class Actor(object):
return '<git.Actor "%s <%s>">' % (self.name, self.email)
@classmethod
- def _from_string(cls, string: str) -> 'Actor':
+ def _from_string(cls, string: str) -> "Actor":
"""Create an Actor from a string.
:param string: is the string, which is expected to be in regular git format
John Doe <jdoe@example.com>
- :return: Actor """
+ :return: Actor"""
m = cls.name_email_regex.search(string)
if m:
name, email = m.groups()
@@ -690,9 +768,13 @@ class Actor(object):
# END handle name/email matching
@classmethod
- def _main_actor(cls, env_name: str, env_email: str,
- config_reader: Union[None, 'GitConfigParser', 'SectionConstraint'] = None) -> 'Actor':
- actor = Actor('', '')
+ def _main_actor(
+ cls,
+ env_name: str,
+ env_email: str,
+ config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None,
+ ) -> "Actor":
+ actor = Actor("", "")
user_id = None # We use this to avoid multiple calls to getpass.getuser()
def default_email() -> str:
@@ -702,17 +784,19 @@ class Actor(object):
return user_id
def default_name() -> str:
- return default_email().split('@')[0]
+ return default_email().split("@")[0]
- for attr, evar, cvar, default in (('name', env_name, cls.conf_name, default_name),
- ('email', env_email, cls.conf_email, default_email)):
+ for attr, evar, cvar, default in (
+ ("name", env_name, cls.conf_name, default_name),
+ ("email", env_email, cls.conf_email, default_email),
+ ):
try:
val = os.environ[evar]
setattr(actor, attr, val)
except KeyError:
if config_reader is not None:
try:
- val = config_reader.get('user', cvar)
+ val = config_reader.get("user", cvar)
except Exception:
val = default()
setattr(actor, attr, val)
@@ -724,7 +808,9 @@ class Actor(object):
return actor
@classmethod
- def committer(cls, config_reader: Union[None, 'GitConfigParser', 'SectionConstraint'] = None) -> 'Actor':
+ def committer(
+ cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None
+ ) -> "Actor":
"""
:return: Actor instance corresponding to the configured committer. It behaves
similar to the git implementation, such that the environment will override
@@ -732,10 +818,14 @@ class Actor(object):
generated
:param config_reader: ConfigReader to use to retrieve the values from in case
they are not set in the environment"""
- return cls._main_actor(cls.env_committer_name, cls.env_committer_email, config_reader)
+ return cls._main_actor(
+ cls.env_committer_name, cls.env_committer_email, config_reader
+ )
@classmethod
- def author(cls, config_reader: Union[None, 'GitConfigParser', 'SectionConstraint'] = None) -> 'Actor':
+ def author(
+ cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None
+ ) -> "Actor":
"""Same as committer(), but defines the main author. It may be specified in the environment,
but defaults to the committer"""
return cls._main_actor(cls.env_author_name, cls.env_author_email, config_reader)
@@ -767,6 +857,7 @@ class Stats(object):
In addition to the items in the stat-dict, it features additional information::
files = number of changed files as int"""
+
__slots__ = ("total", "files")
def __init__(self, total: Total_TD, files: Dict[PathLike, Files_TD]):
@@ -774,30 +865,30 @@ class Stats(object):
self.files = files
@classmethod
- def _list_from_string(cls, repo: 'Repo', text: str) -> 'Stats':
+ def _list_from_string(cls, repo: "Repo", text: str) -> "Stats":
"""Create a Stat object from output retrieved by git-diff.
:return: git.Stat"""
- hsh: HSH_TD = {'total': {'insertions': 0,
- 'deletions': 0,
- 'lines': 0,
- 'files': 0},
- 'files': {}
- }
+ hsh: HSH_TD = {
+ "total": {"insertions": 0, "deletions": 0, "lines": 0, "files": 0},
+ "files": {},
+ }
for line in text.splitlines():
(raw_insertions, raw_deletions, filename) = line.split("\t")
- insertions = raw_insertions != '-' and int(raw_insertions) or 0
- deletions = raw_deletions != '-' and int(raw_deletions) or 0
- hsh['total']['insertions'] += insertions
- hsh['total']['deletions'] += deletions
- hsh['total']['lines'] += insertions + deletions
- hsh['total']['files'] += 1
- files_dict: Files_TD = {'insertions': insertions,
- 'deletions': deletions,
- 'lines': insertions + deletions}
- hsh['files'][filename.strip()] = files_dict
- return Stats(hsh['total'], hsh['files'])
+ insertions = raw_insertions != "-" and int(raw_insertions) or 0
+ deletions = raw_deletions != "-" and int(raw_deletions) or 0
+ hsh["total"]["insertions"] += insertions
+ hsh["total"]["deletions"] += deletions
+ hsh["total"]["lines"] += insertions + deletions
+ hsh["total"]["files"] += 1
+ files_dict: Files_TD = {
+ "insertions": insertions,
+ "deletions": deletions,
+ "lines": insertions + deletions,
+ }
+ hsh["files"][filename.strip()] = files_dict
+ return Stats(hsh["total"], hsh["files"])
class IndexFileSHA1Writer(object):
@@ -809,6 +900,7 @@ class IndexFileSHA1Writer(object):
Only useful to the indexfile
:note: Based on the dulwich project"""
+
__slots__ = ("f", "sha1")
def __init__(self, f: IO) -> None:
@@ -841,6 +933,7 @@ class LockFile(object):
As we are a utility class to be derived from, we only use protected methods.
Locks will automatically be released on destruction"""
+
__slots__ = ("_file_path", "_owns_lock")
def __init__(self, file_path: PathLike) -> None:
@@ -867,8 +960,10 @@ class LockFile(object):
return
lock_file = self._lock_file_path()
if osp.isfile(lock_file):
- raise IOError("Lock for file %r did already exist, delete %r in case the lock is illegal" %
- (self._file_path, lock_file))
+ raise IOError(
+ "Lock for file %r did already exist, delete %r in case the lock is illegal"
+ % (self._file_path, lock_file)
+ )
try:
flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
@@ -909,9 +1004,15 @@ class BlockingLockFile(LockFile):
:note: If the directory containing the lock was removed, an exception will
be raised during the blocking period, preventing hangs as the lock
can never be obtained."""
+
__slots__ = ("_check_interval", "_max_block_time")
- def __init__(self, file_path: PathLike, check_interval_s: float = 0.3, max_block_time_s: int = maxsize) -> None:
+ def __init__(
+ self,
+ file_path: PathLike,
+ check_interval_s: float = 0.3,
+ max_block_time_s: int = maxsize,
+ ) -> None:
"""Configure the instance
:param check_interval_s:
@@ -937,13 +1038,18 @@ class BlockingLockFile(LockFile):
# readable anymore, raise an exception
curtime = time.time()
if not osp.isdir(osp.dirname(self._lock_file_path())):
- msg = "Directory containing the lockfile %r was not readable anymore after waiting %g seconds" % (
- self._lock_file_path(), curtime - starttime)
+ msg = (
+ "Directory containing the lockfile %r was not readable anymore after waiting %g seconds"
+ % (self._lock_file_path(), curtime - starttime)
+ )
raise IOError(msg) from e
# END handle missing directory
if curtime >= maxtime:
- msg = "Waited %g seconds for lock at %r" % (maxtime - starttime, self._lock_file_path())
+ msg = "Waited %g seconds for lock at %r" % (
+ maxtime - starttime,
+ self._lock_file_path(),
+ )
raise IOError(msg) from e
# END abort if we wait too long
time.sleep(self._check_interval)
@@ -971,12 +1077,13 @@ class IterableList(List[T_IterableObj]):
A prefix can be specified which is to be used in case the id returned by the
items always contains a prefix that does not matter to the user, so it
can be left out."""
- __slots__ = ('_id_attr', '_prefix')
- def __new__(cls, id_attr: str, prefix: str = '') -> 'IterableList[IterableObj]':
+ __slots__ = ("_id_attr", "_prefix")
+
+ def __new__(cls, id_attr: str, prefix: str = "") -> "IterableList[IterableObj]":
return super(IterableList, cls).__new__(cls)
- def __init__(self, id_attr: str, prefix: str = '') -> None:
+ def __init__(self, id_attr: str, prefix: str = "") -> None:
self._id_attr = id_attr
self._prefix = prefix
@@ -1008,7 +1115,9 @@ class IterableList(List[T_IterableObj]):
def __getitem__(self, index: Union[SupportsIndex, int, slice, str]) -> T_IterableObj: # type: ignore
- assert isinstance(index, (int, str, slice)), "Index of IterableList should be an int or str"
+ assert isinstance(
+ index, (int, str, slice)
+ ), "Index of IterableList should be an int or str"
if isinstance(index, int):
return list.__getitem__(self, index)
@@ -1018,12 +1127,16 @@ class IterableList(List[T_IterableObj]):
try:
return getattr(self, index)
except AttributeError as e:
- raise IndexError("No item found with id %r" % (self._prefix + index)) from e
+ raise IndexError(
+ "No item found with id %r" % (self._prefix + index)
+ ) from e
# END handle getattr
def __delitem__(self, index: Union[SupportsIndex, int, slice, str]) -> None:
- assert isinstance(index, (int, str)), "Index of IterableList should be an int or str"
+ assert isinstance(
+ index, (int, str)
+ ), "Index of IterableList should be an int or str"
delindex = cast(int, index)
if not isinstance(index, int):
@@ -1043,27 +1156,31 @@ class IterableList(List[T_IterableObj]):
class IterableClassWatcher(type):
- """ Metaclass that watches """
+ """Metaclass that watches"""
+
def __init__(cls, name: str, bases: Tuple, clsdict: Dict) -> None:
for base in bases:
if type(base) == IterableClassWatcher:
- warnings.warn(f"GitPython Iterable subclassed by {name}. "
- "Iterable is deprecated due to naming clash since v3.1.18"
- " and will be removed in 3.1.20, "
- "Use IterableObj instead \n",
- DeprecationWarning,
- stacklevel=2)
+ warnings.warn(
+ f"GitPython Iterable subclassed by {name}. "
+ "Iterable is deprecated due to naming clash since v3.1.18"
+ " and will be removed in 3.1.20, "
+ "Use IterableObj instead \n",
+ DeprecationWarning,
+ stacklevel=2,
+ )
class Iterable(metaclass=IterableClassWatcher):
"""Defines an interface for iterable items which is to assure a uniform
way to retrieve and iterate items within the git repository"""
+
__slots__ = ()
_id_attribute_ = "attribute that most suitably identifies your instance"
@classmethod
- def list_items(cls, repo: 'Repo', *args: Any, **kwargs: Any) -> Any:
+ def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Any:
"""
Deprecated, use IterableObj instead.
Find all items of this type - subclasses can specify args and kwargs differently.
@@ -1078,7 +1195,7 @@ class Iterable(metaclass=IterableClassWatcher):
return out_list
@classmethod
- def iter_items(cls, repo: 'Repo', *args: Any, **kwargs: Any) -> Any:
+ def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Any:
# return typed to be compatible with subtypes e.g. Remote
"""For more information about the arguments, see list_items
:return: iterator yielding Items"""
@@ -1096,7 +1213,9 @@ class IterableObj(Protocol):
_id_attribute_: str
@classmethod
- def list_items(cls, repo: 'Repo', *args: Any, **kwargs: Any) -> IterableList[T_IterableObj]:
+ def list_items(
+ cls, repo: "Repo", *args: Any, **kwargs: Any
+ ) -> IterableList[T_IterableObj]:
"""
Find all items of this type - subclasses can specify args and kwargs differently.
If no args are given, subclasses are obliged to return all items if no additional
@@ -1111,13 +1230,15 @@ class IterableObj(Protocol):
@classmethod
@abstractmethod
- def iter_items(cls, repo: 'Repo', *args: Any, **kwargs: Any
- ) -> Iterator[T_IterableObj]: # Iterator[T_IterableObj]:
+ def iter_items(
+ cls, repo: "Repo", *args: Any, **kwargs: Any
+ ) -> Iterator[T_IterableObj]: # Iterator[T_IterableObj]:
# return typed to be compatible with subtypes e.g. Remote
"""For more information about the arguments, see list_items
- :return: iterator yielding Items"""
+ :return: iterator yielding Items"""
raise NotImplementedError("To be implemented by Subclass")
+
# } END classes