diff options
author | Sebastian Thiel <sebastian.thiel@icloud.com> | 2022-05-18 07:43:53 +0800 |
---|---|---|
committer | Sebastian Thiel <sebastian.thiel@icloud.com> | 2022-05-18 07:43:53 +0800 |
commit | 21ec529987d10e0010badd37f8da3274167d436f (patch) | |
tree | a3394cfe902ce7edd07c89420c21c13274a2d295 /git/util.py | |
parent | b30720ee4d9762a03eae4fa7cfa4b0190d81784d (diff) | |
download | gitpython-21ec529987d10e0010badd37f8da3274167d436f.tar.gz |
Run everything through 'black'
That way people who use it won't be deterred, while it unifies style
everywhere.
Diffstat (limited to 'git/util.py')
-rw-r--r-- | git/util.py | 453 |
1 files changed, 287 insertions, 166 deletions
diff --git a/git/util.py b/git/util.py index 0711265a..edc8750d 100644 --- a/git/util.py +++ b/git/util.py @@ -26,9 +26,26 @@ import warnings # typing --------------------------------------------------------- -from typing import (Any, AnyStr, BinaryIO, Callable, Dict, Generator, IO, Iterator, List, - Optional, Pattern, Sequence, Tuple, TypeVar, Union, cast, - TYPE_CHECKING, overload, ) +from typing import ( + Any, + AnyStr, + BinaryIO, + Callable, + Dict, + Generator, + IO, + Iterator, + List, + Optional, + Pattern, + Sequence, + Tuple, + TypeVar, + Union, + cast, + TYPE_CHECKING, + overload, +) import pathlib @@ -37,14 +54,25 @@ if TYPE_CHECKING: from git.repo.base import Repo from git.config import GitConfigParser, SectionConstraint from git import Git + # from git.objects.base import IndexObject -from .types import (Literal, SupportsIndex, Protocol, runtime_checkable, # because behind py version guards - PathLike, HSH_TD, Total_TD, Files_TD, # aliases - Has_id_attribute) +from .types import ( + Literal, + SupportsIndex, + Protocol, + runtime_checkable, # because behind py version guards + PathLike, + HSH_TD, + Total_TD, + Files_TD, # aliases + Has_id_attribute, +) -T_IterableObj = TypeVar('T_IterableObj', bound=Union['IterableObj', 'Has_id_attribute'], covariant=True) +T_IterableObj = TypeVar( + "T_IterableObj", bound=Union["IterableObj", "Has_id_attribute"], covariant=True +) # So IterableList[Head] is subtype of IterableList[IterableObj] # --------------------------------------------------------------------- @@ -52,14 +80,14 @@ T_IterableObj = TypeVar('T_IterableObj', bound=Union['IterableObj', 'Has_id_attr from gitdb.util import ( # NOQA @IgnorePep8 make_sha, - LockedFD, # @UnusedImport - file_contents_ro, # @UnusedImport - file_contents_ro_filepath, # @UnusedImport - LazyMixin, # @UnusedImport - to_hex_sha, # @UnusedImport - to_bin_sha, # @UnusedImport - bin_to_hex, # @UnusedImport - hex_to_bin, # @UnusedImport + LockedFD, # @UnusedImport + file_contents_ro, # @UnusedImport + file_contents_ro_filepath, # @UnusedImport + LazyMixin, # @UnusedImport + to_hex_sha, # @UnusedImport + to_bin_sha, # @UnusedImport + bin_to_hex, # @UnusedImport + hex_to_bin, # @UnusedImport ) @@ -67,11 +95,26 @@ from gitdb.util import ( # NOQA @IgnorePep8 # Handle once test-cases are back up and running. # Most of these are unused here, but are for use by git-python modules so these # don't see gitdb all the time. Flake of course doesn't like it. -__all__ = ["stream_copy", "join_path", "to_native_path_linux", - "join_path_native", "Stats", "IndexFileSHA1Writer", "IterableObj", "IterableList", - "BlockingLockFile", "LockFile", 'Actor', 'get_user_id', 'assure_directory_exists', - 'RemoteProgress', 'CallableRemoteProgress', 'rmtree', 'unbare_repo', - 'HIDE_WINDOWS_KNOWN_ERRORS'] +__all__ = [ + "stream_copy", + "join_path", + "to_native_path_linux", + "join_path_native", + "Stats", + "IndexFileSHA1Writer", + "IterableObj", + "IterableList", + "BlockingLockFile", + "LockFile", + "Actor", + "get_user_id", + "assure_directory_exists", + "RemoteProgress", + "CallableRemoteProgress", + "rmtree", + "unbare_repo", + "HIDE_WINDOWS_KNOWN_ERRORS", +] log = logging.getLogger(__name__) @@ -81,12 +124,14 @@ log = logging.getLogger(__name__) #: We need an easy way to see if Appveyor TCs start failing, #: so the errors marked with this var are considered "acknowledged" ones, awaiting remedy, #: till then, we wish to hide them. -HIDE_WINDOWS_KNOWN_ERRORS = is_win and os.environ.get('HIDE_WINDOWS_KNOWN_ERRORS', True) -HIDE_WINDOWS_FREEZE_ERRORS = is_win and os.environ.get('HIDE_WINDOWS_FREEZE_ERRORS', True) +HIDE_WINDOWS_KNOWN_ERRORS = is_win and os.environ.get("HIDE_WINDOWS_KNOWN_ERRORS", True) +HIDE_WINDOWS_FREEZE_ERRORS = is_win and os.environ.get( + "HIDE_WINDOWS_FREEZE_ERRORS", True +) # { Utility Methods -T = TypeVar('T') +T = TypeVar("T") def unbare_repo(func: Callable[..., T]) -> Callable[..., T]: @@ -96,11 +141,14 @@ def unbare_repo(func: Callable[..., T]) -> Callable[..., T]: from .exc import InvalidGitRepositoryError @wraps(func) - def wrapper(self: 'Remote', *args: Any, **kwargs: Any) -> T: + def wrapper(self: "Remote", *args: Any, **kwargs: Any) -> T: if self.repo.bare: - raise InvalidGitRepositoryError("Method '%s' cannot operate on bare repositories" % func.__name__) + raise InvalidGitRepositoryError( + "Method '%s' cannot operate on bare repositories" % func.__name__ + ) # END bare method return func(self, *args, **kwargs) + # END wrapper return wrapper @@ -131,7 +179,10 @@ def rmtree(path: PathLike) -> None: except Exception as ex: if HIDE_WINDOWS_KNOWN_ERRORS: from unittest import SkipTest - raise SkipTest("FIXME: fails with: PermissionError\n {}".format(ex)) from ex + + raise SkipTest( + "FIXME: fails with: PermissionError\n {}".format(ex) + ) from ex raise return shutil.rmtree(path, False, onerror) @@ -145,7 +196,9 @@ def rmfile(path: PathLike) -> None: os.remove(path) -def stream_copy(source: BinaryIO, destination: BinaryIO, chunk_size: int = 512 * 1024) -> int: +def stream_copy( + source: BinaryIO, destination: BinaryIO, chunk_size: int = 512 * 1024 +) -> int: """Copy all data from the source stream into the destination stream in chunks of size chunk_size @@ -169,24 +222,25 @@ def join_path(a: PathLike, *p: PathLike) -> PathLike: b = str(b) if not b: continue - if b.startswith('/'): + if b.startswith("/"): path += b[1:] - elif path == '' or path.endswith('/'): + elif path == "" or path.endswith("/"): path += b else: - path += '/' + b + path += "/" + b # END for each path token to add return path if is_win: + def to_native_path_windows(path: PathLike) -> PathLike: path = str(path) - return path.replace('/', '\\') + return path.replace("/", "\\") def to_native_path_linux(path: PathLike) -> str: path = str(path) - return path.replace('\\', '/') + return path.replace("\\", "/") __all__.append("to_native_path_windows") to_native_path = to_native_path_windows @@ -222,10 +276,14 @@ def assure_directory_exists(path: PathLike, is_file: bool = False) -> bool: def _get_exe_extensions() -> Sequence[str]: - PATHEXT = os.environ.get('PATHEXT', None) - return tuple(p.upper() for p in PATHEXT.split(os.pathsep)) if PATHEXT \ - else ('.BAT', 'COM', '.EXE') if is_win \ - else ('') + PATHEXT = os.environ.get("PATHEXT", None) + return ( + tuple(p.upper() for p in PATHEXT.split(os.pathsep)) + if PATHEXT + else (".BAT", "COM", ".EXE") + if is_win + else ("") + ) def py_where(program: str, path: Optional[PathLike] = None) -> List[str]: @@ -233,9 +291,15 @@ def py_where(program: str, path: Optional[PathLike] = None) -> List[str]: winprog_exts = _get_exe_extensions() def is_exec(fpath: str) -> bool: - return osp.isfile(fpath) and os.access(fpath, os.X_OK) and ( - os.name != 'nt' or not winprog_exts or any(fpath.upper().endswith(ext) - for ext in winprog_exts)) + return ( + osp.isfile(fpath) + and os.access(fpath, os.X_OK) + and ( + os.name != "nt" + or not winprog_exts + or any(fpath.upper().endswith(ext) for ext in winprog_exts) + ) + ) progs = [] if not path: @@ -244,7 +308,7 @@ def py_where(program: str, path: Optional[PathLike] = None) -> List[str]: folder = folder.strip('"') if folder: exe_path = osp.join(folder, program) - for f in [exe_path] + ['%s%s' % (exe_path, e) for e in winprog_exts]: + for f in [exe_path] + ["%s%s" % (exe_path, e) for e in winprog_exts]: if is_exec(f): progs.append(f) return progs @@ -264,38 +328,26 @@ def _cygexpath(drive: Optional[str], path: str) -> str: else: p = cygpath(p) elif drive: - p = '/cygdrive/%s/%s' % (drive.lower(), p) + p = "/cygdrive/%s/%s" % (drive.lower(), p) p_str = str(p) # ensure it is a str and not AnyPath - return p_str.replace('\\', '/') + return p_str.replace("\\", "/") _cygpath_parsers: Tuple[Tuple[Pattern[str], Callable, bool], ...] = ( # See: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx # and: https://www.cygwin.com/cygwin-ug-net/using.html#unc-paths - (re.compile(r"\\\\\?\\UNC\\([^\\]+)\\([^\\]+)(?:\\(.*))?"), - (lambda server, share, rest_path: '//%s/%s/%s' % (server, share, rest_path.replace('\\', '/'))), - False - ), - - (re.compile(r"\\\\\?\\(\w):[/\\](.*)"), - (_cygexpath), - False - ), - - (re.compile(r"(\w):[/\\](.*)"), - (_cygexpath), - False - ), - - (re.compile(r"file:(.*)", re.I), - (lambda rest_path: rest_path), - True - ), - - (re.compile(r"(\w{2,}:.*)"), # remote URL, do nothing - (lambda url: url), - False - ), + ( + re.compile(r"\\\\\?\\UNC\\([^\\]+)\\([^\\]+)(?:\\(.*))?"), + ( + lambda server, share, rest_path: "//%s/%s/%s" + % (server, share, rest_path.replace("\\", "/")) + ), + False, + ), + (re.compile(r"\\\\\?\\(\w):[/\\](.*)"), (_cygexpath), False), + (re.compile(r"(\w):[/\\](.*)"), (_cygexpath), False), + (re.compile(r"file:(.*)", re.I), (lambda rest_path: rest_path), True), + (re.compile(r"(\w{2,}:.*)"), (lambda url: url), False), # remote URL, do nothing ) @@ -303,7 +355,7 @@ def cygpath(path: str) -> str: """Use :meth:`git.cmd.Git.polish_url()` instead, that works on any environment.""" path = str(path) # ensure is str and not AnyPath. # Fix to use Paths when 3.5 dropped. or to be just str if only for urls? - if not path.startswith(('/cygdrive', '//')): + if not path.startswith(("/cygdrive", "//")): for regex, parser, recurse in _cygpath_parsers: match = regex.match(path) if match: @@ -325,9 +377,9 @@ def decygpath(path: PathLike) -> str: m = _decygpath_regex.match(path) if m: drive, rest_path = m.groups() - path = '%s:%s' % (drive.upper(), rest_path or '') + path = "%s:%s" % (drive.upper(), rest_path or "") - return path.replace('/', '\\') + return path.replace("/", "\\") #: Store boolean flags denoting if a specific Git executable @@ -363,14 +415,15 @@ def is_cygwin_git(git_executable: Union[None, PathLike]) -> bool: git_dir = osp.dirname(res[0]) if res else "" # Just a name given, not a real path. - uname_cmd = osp.join(git_dir, 'uname') - process = subprocess.Popen([uname_cmd], stdout=subprocess.PIPE, - universal_newlines=True) + uname_cmd = osp.join(git_dir, "uname") + process = subprocess.Popen( + [uname_cmd], stdout=subprocess.PIPE, universal_newlines=True + ) uname_out, _ = process.communicate() - #retcode = process.poll() - is_cygwin = 'CYGWIN' in uname_out + # retcode = process.poll() + is_cygwin = "CYGWIN" in uname_out except Exception as ex: - log.debug('Failed checking if running in CYGWIN due to: %r', ex) + log.debug("Failed checking if running in CYGWIN due to: %r", ex) _is_cygwin_cache[git_executable] = is_cygwin return is_cygwin @@ -381,7 +434,9 @@ def get_user_id() -> str: return "%s@%s" % (getpass.getuser(), platform.node()) -def finalize_process(proc: Union[subprocess.Popen, 'Git.AutoInterrupt'], **kwargs: Any) -> None: +def finalize_process( + proc: Union[subprocess.Popen, "Git.AutoInterrupt"], **kwargs: Any +) -> None: """Wait for the process (clone, fetch, pull or push) and handle its errors accordingly""" # TODO: No close proc-streams?? proc.wait(**kwargs) @@ -398,13 +453,15 @@ def expand_path(p: PathLike, expand_vars: bool = ...) -> str: ... -def expand_path(p: Union[None, PathLike], expand_vars: bool = True) -> Optional[PathLike]: +def expand_path( + p: Union[None, PathLike], expand_vars: bool = True +) -> Optional[PathLike]: if isinstance(p, pathlib.Path): return p.resolve() try: p = osp.expanduser(p) # type: ignore if expand_vars: - p = osp.expandvars(p) # type: ignore + p = osp.expandvars(p) # type: ignore return osp.normpath(osp.abspath(p)) # type: ignore except Exception: return None @@ -430,11 +487,9 @@ def remove_password_if_present(cmdline: Sequence[str]) -> List[str]: continue if url.password is not None: - url = url._replace( - netloc=url.netloc.replace(url.password, "*****")) + url = url._replace(netloc=url.netloc.replace(url.password, "*****")) if url.username is not None: - url = url._replace( - netloc=url.netloc.replace(url.username, "*****")) + url = url._replace(netloc=url.netloc.replace(url.username, "*****")) new_cmdline[index] = urlunsplit(url) except ValueError: # This is not a valid URL @@ -452,19 +507,31 @@ class RemoteProgress(object): Handler providing an interface to parse progress information emitted by git-push and git-fetch and to dispatch callbacks allowing subclasses to react to the progress. """ + _num_op_codes: int = 9 - BEGIN, END, COUNTING, COMPRESSING, WRITING, RECEIVING, RESOLVING, FINDING_SOURCES, CHECKING_OUT = \ - [1 << x for x in range(_num_op_codes)] + ( + BEGIN, + END, + COUNTING, + COMPRESSING, + WRITING, + RECEIVING, + RESOLVING, + FINDING_SOURCES, + CHECKING_OUT, + ) = [1 << x for x in range(_num_op_codes)] STAGE_MASK = BEGIN | END OP_MASK = ~STAGE_MASK - DONE_TOKEN = 'done.' - TOKEN_SEPARATOR = ', ' + DONE_TOKEN = "done." + TOKEN_SEPARATOR = ", " - __slots__ = ('_cur_line', - '_seen_ops', - 'error_lines', # Lines that started with 'error:' or 'fatal:'. - 'other_lines') # Lines not denoting progress (i.e.g. push-infos). + __slots__ = ( + "_cur_line", + "_seen_ops", + "error_lines", # Lines that started with 'error:' or 'fatal:'. + "other_lines", + ) # Lines not denoting progress (i.e.g. push-infos). re_op_absolute = re.compile(r"(remote: )?([\w\s]+):\s+()(\d+)()(.*)") re_op_relative = re.compile(r"(remote: )?([\w\s]+):\s+(\d+)% \((\d+)/(\d+)\)(.*)") @@ -486,13 +553,13 @@ class RemoteProgress(object): # Compressing objects: 50% (1/2) # Compressing objects: 100% (2/2) # Compressing objects: 100% (2/2), done. - if isinstance(line, bytes): # mypy argues about ternary assignment - line_str = line.decode('utf-8') + if isinstance(line, bytes): # mypy argues about ternary assignment + line_str = line.decode("utf-8") else: line_str = line self._cur_line = line_str - if self._cur_line.startswith(('error:', 'fatal:')): + if self._cur_line.startswith(("error:", "fatal:")): self.error_lines.append(self._cur_line) return @@ -531,13 +598,13 @@ class RemoteProgress(object): op_code |= self.COMPRESSING elif op_name == "Writing objects": op_code |= self.WRITING - elif op_name == 'Receiving objects': + elif op_name == "Receiving objects": op_code |= self.RECEIVING - elif op_name == 'Resolving deltas': + elif op_name == "Resolving deltas": op_code |= self.RESOLVING - elif op_name == 'Finding sources': + elif op_name == "Finding sources": op_code |= self.FINDING_SOURCES - elif op_name == 'Checking out files': + elif op_name == "Checking out files": op_code |= self.CHECKING_OUT else: # Note: On windows it can happen that partial lines are sent @@ -559,28 +626,32 @@ class RemoteProgress(object): # END begin opcode if message is None: - message = '' + message = "" # END message handling message = message.strip() if message.endswith(self.DONE_TOKEN): op_code |= self.END - message = message[:-len(self.DONE_TOKEN)] + message = message[: -len(self.DONE_TOKEN)] # END end message handling message = message.strip(self.TOKEN_SEPARATOR) - self.update(op_code, - cur_count and float(cur_count), - max_count and float(max_count), - message) + self.update( + op_code, + cur_count and float(cur_count), + max_count and float(max_count), + message, + ) def new_message_handler(self) -> Callable[[str], None]: """ :return: a progress handler suitable for handle_process_output(), passing lines on to this Progress handler in a suitable format""" + def handler(line: AnyStr) -> None: return self._parse_progress_line(line.rstrip()) + # end return handler @@ -588,8 +659,13 @@ class RemoteProgress(object): """Called whenever a line could not be understood and was therefore dropped.""" pass - def update(self, op_code: int, cur_count: Union[str, float], max_count: Union[str, float, None] = None, - message: str = '',) -> None: + def update( + self, + op_code: int, + cur_count: Union[str, float], + max_count: Union[str, float, None] = None, + message: str = "", + ) -> None: """Called whenever the progress changes :param op_code: @@ -618,7 +694,8 @@ class RemoteProgress(object): class CallableRemoteProgress(RemoteProgress): """An implementation forwarding updates to any callable""" - __slots__ = ('_callable') + + __slots__ = "_callable" def __init__(self, fn: Callable) -> None: self._callable = fn @@ -632,9 +709,10 @@ class Actor(object): """Actors hold information about a person acting on the repository. They can be committers and authors or anything with a name and an email as mentioned in the git log entries.""" + # PRECOMPILED REGEX - name_only_regex = re.compile(r'<(.*)>') - name_email_regex = re.compile(r'(.*) <(.*?)>') + name_only_regex = re.compile(r"<(.*)>") + name_email_regex = re.compile(r"(.*) <(.*?)>") # ENVIRONMENT VARIABLES # read when creating new commits @@ -644,10 +722,10 @@ class Actor(object): env_committer_email = "GIT_COMMITTER_EMAIL" # CONFIGURATION KEYS - conf_name = 'name' - conf_email = 'email' + conf_name = "name" + conf_email = "email" - __slots__ = ('name', 'email') + __slots__ = ("name", "email") def __init__(self, name: Optional[str], email: Optional[str]) -> None: self.name = name @@ -669,13 +747,13 @@ class Actor(object): return '<git.Actor "%s <%s>">' % (self.name, self.email) @classmethod - def _from_string(cls, string: str) -> 'Actor': + def _from_string(cls, string: str) -> "Actor": """Create an Actor from a string. :param string: is the string, which is expected to be in regular git format John Doe <jdoe@example.com> - :return: Actor """ + :return: Actor""" m = cls.name_email_regex.search(string) if m: name, email = m.groups() @@ -690,9 +768,13 @@ class Actor(object): # END handle name/email matching @classmethod - def _main_actor(cls, env_name: str, env_email: str, - config_reader: Union[None, 'GitConfigParser', 'SectionConstraint'] = None) -> 'Actor': - actor = Actor('', '') + def _main_actor( + cls, + env_name: str, + env_email: str, + config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None, + ) -> "Actor": + actor = Actor("", "") user_id = None # We use this to avoid multiple calls to getpass.getuser() def default_email() -> str: @@ -702,17 +784,19 @@ class Actor(object): return user_id def default_name() -> str: - return default_email().split('@')[0] + return default_email().split("@")[0] - for attr, evar, cvar, default in (('name', env_name, cls.conf_name, default_name), - ('email', env_email, cls.conf_email, default_email)): + for attr, evar, cvar, default in ( + ("name", env_name, cls.conf_name, default_name), + ("email", env_email, cls.conf_email, default_email), + ): try: val = os.environ[evar] setattr(actor, attr, val) except KeyError: if config_reader is not None: try: - val = config_reader.get('user', cvar) + val = config_reader.get("user", cvar) except Exception: val = default() setattr(actor, attr, val) @@ -724,7 +808,9 @@ class Actor(object): return actor @classmethod - def committer(cls, config_reader: Union[None, 'GitConfigParser', 'SectionConstraint'] = None) -> 'Actor': + def committer( + cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None + ) -> "Actor": """ :return: Actor instance corresponding to the configured committer. It behaves similar to the git implementation, such that the environment will override @@ -732,10 +818,14 @@ class Actor(object): generated :param config_reader: ConfigReader to use to retrieve the values from in case they are not set in the environment""" - return cls._main_actor(cls.env_committer_name, cls.env_committer_email, config_reader) + return cls._main_actor( + cls.env_committer_name, cls.env_committer_email, config_reader + ) @classmethod - def author(cls, config_reader: Union[None, 'GitConfigParser', 'SectionConstraint'] = None) -> 'Actor': + def author( + cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None + ) -> "Actor": """Same as committer(), but defines the main author. It may be specified in the environment, but defaults to the committer""" return cls._main_actor(cls.env_author_name, cls.env_author_email, config_reader) @@ -767,6 +857,7 @@ class Stats(object): In addition to the items in the stat-dict, it features additional information:: files = number of changed files as int""" + __slots__ = ("total", "files") def __init__(self, total: Total_TD, files: Dict[PathLike, Files_TD]): @@ -774,30 +865,30 @@ class Stats(object): self.files = files @classmethod - def _list_from_string(cls, repo: 'Repo', text: str) -> 'Stats': + def _list_from_string(cls, repo: "Repo", text: str) -> "Stats": """Create a Stat object from output retrieved by git-diff. :return: git.Stat""" - hsh: HSH_TD = {'total': {'insertions': 0, - 'deletions': 0, - 'lines': 0, - 'files': 0}, - 'files': {} - } + hsh: HSH_TD = { + "total": {"insertions": 0, "deletions": 0, "lines": 0, "files": 0}, + "files": {}, + } for line in text.splitlines(): (raw_insertions, raw_deletions, filename) = line.split("\t") - insertions = raw_insertions != '-' and int(raw_insertions) or 0 - deletions = raw_deletions != '-' and int(raw_deletions) or 0 - hsh['total']['insertions'] += insertions - hsh['total']['deletions'] += deletions - hsh['total']['lines'] += insertions + deletions - hsh['total']['files'] += 1 - files_dict: Files_TD = {'insertions': insertions, - 'deletions': deletions, - 'lines': insertions + deletions} - hsh['files'][filename.strip()] = files_dict - return Stats(hsh['total'], hsh['files']) + insertions = raw_insertions != "-" and int(raw_insertions) or 0 + deletions = raw_deletions != "-" and int(raw_deletions) or 0 + hsh["total"]["insertions"] += insertions + hsh["total"]["deletions"] += deletions + hsh["total"]["lines"] += insertions + deletions + hsh["total"]["files"] += 1 + files_dict: Files_TD = { + "insertions": insertions, + "deletions": deletions, + "lines": insertions + deletions, + } + hsh["files"][filename.strip()] = files_dict + return Stats(hsh["total"], hsh["files"]) class IndexFileSHA1Writer(object): @@ -809,6 +900,7 @@ class IndexFileSHA1Writer(object): Only useful to the indexfile :note: Based on the dulwich project""" + __slots__ = ("f", "sha1") def __init__(self, f: IO) -> None: @@ -841,6 +933,7 @@ class LockFile(object): As we are a utility class to be derived from, we only use protected methods. Locks will automatically be released on destruction""" + __slots__ = ("_file_path", "_owns_lock") def __init__(self, file_path: PathLike) -> None: @@ -867,8 +960,10 @@ class LockFile(object): return lock_file = self._lock_file_path() if osp.isfile(lock_file): - raise IOError("Lock for file %r did already exist, delete %r in case the lock is illegal" % - (self._file_path, lock_file)) + raise IOError( + "Lock for file %r did already exist, delete %r in case the lock is illegal" + % (self._file_path, lock_file) + ) try: flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL @@ -909,9 +1004,15 @@ class BlockingLockFile(LockFile): :note: If the directory containing the lock was removed, an exception will be raised during the blocking period, preventing hangs as the lock can never be obtained.""" + __slots__ = ("_check_interval", "_max_block_time") - def __init__(self, file_path: PathLike, check_interval_s: float = 0.3, max_block_time_s: int = maxsize) -> None: + def __init__( + self, + file_path: PathLike, + check_interval_s: float = 0.3, + max_block_time_s: int = maxsize, + ) -> None: """Configure the instance :param check_interval_s: @@ -937,13 +1038,18 @@ class BlockingLockFile(LockFile): # readable anymore, raise an exception curtime = time.time() if not osp.isdir(osp.dirname(self._lock_file_path())): - msg = "Directory containing the lockfile %r was not readable anymore after waiting %g seconds" % ( - self._lock_file_path(), curtime - starttime) + msg = ( + "Directory containing the lockfile %r was not readable anymore after waiting %g seconds" + % (self._lock_file_path(), curtime - starttime) + ) raise IOError(msg) from e # END handle missing directory if curtime >= maxtime: - msg = "Waited %g seconds for lock at %r" % (maxtime - starttime, self._lock_file_path()) + msg = "Waited %g seconds for lock at %r" % ( + maxtime - starttime, + self._lock_file_path(), + ) raise IOError(msg) from e # END abort if we wait too long time.sleep(self._check_interval) @@ -971,12 +1077,13 @@ class IterableList(List[T_IterableObj]): A prefix can be specified which is to be used in case the id returned by the items always contains a prefix that does not matter to the user, so it can be left out.""" - __slots__ = ('_id_attr', '_prefix') - def __new__(cls, id_attr: str, prefix: str = '') -> 'IterableList[IterableObj]': + __slots__ = ("_id_attr", "_prefix") + + def __new__(cls, id_attr: str, prefix: str = "") -> "IterableList[IterableObj]": return super(IterableList, cls).__new__(cls) - def __init__(self, id_attr: str, prefix: str = '') -> None: + def __init__(self, id_attr: str, prefix: str = "") -> None: self._id_attr = id_attr self._prefix = prefix @@ -1008,7 +1115,9 @@ class IterableList(List[T_IterableObj]): def __getitem__(self, index: Union[SupportsIndex, int, slice, str]) -> T_IterableObj: # type: ignore - assert isinstance(index, (int, str, slice)), "Index of IterableList should be an int or str" + assert isinstance( + index, (int, str, slice) + ), "Index of IterableList should be an int or str" if isinstance(index, int): return list.__getitem__(self, index) @@ -1018,12 +1127,16 @@ class IterableList(List[T_IterableObj]): try: return getattr(self, index) except AttributeError as e: - raise IndexError("No item found with id %r" % (self._prefix + index)) from e + raise IndexError( + "No item found with id %r" % (self._prefix + index) + ) from e # END handle getattr def __delitem__(self, index: Union[SupportsIndex, int, slice, str]) -> None: - assert isinstance(index, (int, str)), "Index of IterableList should be an int or str" + assert isinstance( + index, (int, str) + ), "Index of IterableList should be an int or str" delindex = cast(int, index) if not isinstance(index, int): @@ -1043,27 +1156,31 @@ class IterableList(List[T_IterableObj]): class IterableClassWatcher(type): - """ Metaclass that watches """ + """Metaclass that watches""" + def __init__(cls, name: str, bases: Tuple, clsdict: Dict) -> None: for base in bases: if type(base) == IterableClassWatcher: - warnings.warn(f"GitPython Iterable subclassed by {name}. " - "Iterable is deprecated due to naming clash since v3.1.18" - " and will be removed in 3.1.20, " - "Use IterableObj instead \n", - DeprecationWarning, - stacklevel=2) + warnings.warn( + f"GitPython Iterable subclassed by {name}. " + "Iterable is deprecated due to naming clash since v3.1.18" + " and will be removed in 3.1.20, " + "Use IterableObj instead \n", + DeprecationWarning, + stacklevel=2, + ) class Iterable(metaclass=IterableClassWatcher): """Defines an interface for iterable items which is to assure a uniform way to retrieve and iterate items within the git repository""" + __slots__ = () _id_attribute_ = "attribute that most suitably identifies your instance" @classmethod - def list_items(cls, repo: 'Repo', *args: Any, **kwargs: Any) -> Any: + def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Any: """ Deprecated, use IterableObj instead. Find all items of this type - subclasses can specify args and kwargs differently. @@ -1078,7 +1195,7 @@ class Iterable(metaclass=IterableClassWatcher): return out_list @classmethod - def iter_items(cls, repo: 'Repo', *args: Any, **kwargs: Any) -> Any: + def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Any: # return typed to be compatible with subtypes e.g. Remote """For more information about the arguments, see list_items :return: iterator yielding Items""" @@ -1096,7 +1213,9 @@ class IterableObj(Protocol): _id_attribute_: str @classmethod - def list_items(cls, repo: 'Repo', *args: Any, **kwargs: Any) -> IterableList[T_IterableObj]: + def list_items( + cls, repo: "Repo", *args: Any, **kwargs: Any + ) -> IterableList[T_IterableObj]: """ Find all items of this type - subclasses can specify args and kwargs differently. If no args are given, subclasses are obliged to return all items if no additional @@ -1111,13 +1230,15 @@ class IterableObj(Protocol): @classmethod @abstractmethod - def iter_items(cls, repo: 'Repo', *args: Any, **kwargs: Any - ) -> Iterator[T_IterableObj]: # Iterator[T_IterableObj]: + def iter_items( + cls, repo: "Repo", *args: Any, **kwargs: Any + ) -> Iterator[T_IterableObj]: # Iterator[T_IterableObj]: # return typed to be compatible with subtypes e.g. Remote """For more information about the arguments, see list_items - :return: iterator yielding Items""" + :return: iterator yielding Items""" raise NotImplementedError("To be implemented by Subclass") + # } END classes |