diff options
Diffstat (limited to 'git/cmd.py')
-rw-r--r-- | git/cmd.py | 592 |
1 files changed, 368 insertions, 224 deletions
@@ -9,12 +9,7 @@ import io import logging import os import signal -from subprocess import ( - call, - Popen, - PIPE, - DEVNULL -) +from subprocess import call, Popen, PIPE, DEVNULL import subprocess import threading from textwrap import dedent @@ -29,10 +24,7 @@ from git.compat import ( from git.exc import CommandError from git.util import is_cygwin_git, cygpath, expand_path, remove_password_if_present -from .exc import ( - GitCommandError, - GitCommandNotFound -) +from .exc import GitCommandError, GitCommandNotFound from .util import ( LazyMixin, stream_copy, @@ -40,8 +32,24 @@ from .util import ( # typing --------------------------------------------------------------------------- -from typing import (Any, AnyStr, BinaryIO, Callable, Dict, IO, Iterator, List, Mapping, - Sequence, TYPE_CHECKING, TextIO, Tuple, Union, cast, overload) +from typing import ( + Any, + AnyStr, + BinaryIO, + Callable, + Dict, + IO, + Iterator, + List, + Mapping, + Sequence, + TYPE_CHECKING, + TextIO, + Tuple, + Union, + cast, + overload, +) from git.types import PathLike, Literal, TBD @@ -52,15 +60,26 @@ if TYPE_CHECKING: # --------------------------------------------------------------------------------- -execute_kwargs = {'istream', 'with_extended_output', - 'with_exceptions', 'as_process', 'stdout_as_string', - 'output_stream', 'with_stdout', 'kill_after_timeout', - 'universal_newlines', 'shell', 'env', 'max_chunk_size', 'strip_newline_in_stdout'} +execute_kwargs = { + "istream", + "with_extended_output", + "with_exceptions", + "as_process", + "stdout_as_string", + "output_stream", + "with_stdout", + "kill_after_timeout", + "universal_newlines", + "shell", + "env", + "max_chunk_size", + "strip_newline_in_stdout", +} log = logging.getLogger(__name__) log.addHandler(logging.NullHandler()) -__all__ = ('Git',) +__all__ = ("Git",) # ============================================================================== @@ -69,18 +88,24 @@ __all__ = ('Git',) # Documentation ## @{ -def handle_process_output(process: 'Git.AutoInterrupt' | Popen, - stdout_handler: Union[None, - Callable[[AnyStr], None], - Callable[[List[AnyStr]], None], - Callable[[bytes, 'Repo', 'DiffIndex'], None]], - stderr_handler: Union[None, - Callable[[AnyStr], None], - Callable[[List[AnyStr]], None]], - finalizer: Union[None, - Callable[[Union[subprocess.Popen, 'Git.AutoInterrupt']], None]] = None, - decode_streams: bool = True, - kill_after_timeout: Union[None, float] = None) -> None: + +def handle_process_output( + process: "Git.AutoInterrupt" | Popen, + stdout_handler: Union[ + None, + Callable[[AnyStr], None], + Callable[[List[AnyStr]], None], + Callable[[bytes, "Repo", "DiffIndex"], None], + ], + stderr_handler: Union[ + None, Callable[[AnyStr], None], Callable[[List[AnyStr]], None] + ], + finalizer: Union[ + None, Callable[[Union[subprocess.Popen, "Git.AutoInterrupt"]], None] + ] = None, + decode_streams: bool = True, + kill_after_timeout: Union[None, float] = None, +) -> None: """Registers for notifications to learn that process output is ready to read, and dispatches lines to the respective line handlers. This function returns once the finalizer returns @@ -101,8 +126,13 @@ def handle_process_output(process: 'Git.AutoInterrupt' | Popen, should be killed. """ # Use 2 "pump" threads and wait for both to finish. - def pump_stream(cmdline: List[str], name: str, stream: Union[BinaryIO, TextIO], is_decode: bool, - handler: Union[None, Callable[[Union[bytes, str]], None]]) -> None: + def pump_stream( + cmdline: List[str], + name: str, + stream: Union[BinaryIO, TextIO], + is_decode: bool, + handler: Union[None, Callable[[Union[bytes, str]], None]], + ) -> None: try: for line in stream: if handler: @@ -114,21 +144,25 @@ def handle_process_output(process: 'Git.AutoInterrupt' | Popen, handler(line) except Exception as ex: - log.error(f"Pumping {name!r} of cmd({remove_password_if_present(cmdline)}) failed due to: {ex!r}") + log.error( + f"Pumping {name!r} of cmd({remove_password_if_present(cmdline)}) failed due to: {ex!r}" + ) if "I/O operation on closed file" not in str(ex): # Only reraise if the error was not due to the stream closing - raise CommandError([f'<{name}-pump>'] + remove_password_if_present(cmdline), ex) from ex + raise CommandError( + [f"<{name}-pump>"] + remove_password_if_present(cmdline), ex + ) from ex finally: stream.close() - if hasattr(process, 'proc'): - process = cast('Git.AutoInterrupt', process) - cmdline: str | Tuple[str, ...] | List[str] = getattr(process.proc, 'args', '') + if hasattr(process, "proc"): + process = cast("Git.AutoInterrupt", process) + cmdline: str | Tuple[str, ...] | List[str] = getattr(process.proc, "args", "") p_stdout = process.proc.stdout if process.proc else None p_stderr = process.proc.stderr if process.proc else None else: process = cast(Popen, process) - cmdline = getattr(process, 'args', '') + cmdline = getattr(process, "args", "") p_stdout = process.stdout p_stderr = process.stderr @@ -137,15 +171,16 @@ def handle_process_output(process: 'Git.AutoInterrupt' | Popen, pumps: List[Tuple[str, IO, Callable[..., None] | None]] = [] if p_stdout: - pumps.append(('stdout', p_stdout, stdout_handler)) + pumps.append(("stdout", p_stdout, stdout_handler)) if p_stderr: - pumps.append(('stderr', p_stderr, stderr_handler)) + pumps.append(("stderr", p_stderr, stderr_handler)) threads: List[threading.Thread] = [] for name, stream, handler in pumps: - t = threading.Thread(target=pump_stream, - args=(cmdline, name, stream, decode_streams, handler)) + t = threading.Thread( + target=pump_stream, args=(cmdline, name, stream, decode_streams, handler) + ) t.daemon = True t.start() threads.append(t) @@ -158,12 +193,15 @@ def handle_process_output(process: 'Git.AutoInterrupt' | Popen, if isinstance(process, Git.AutoInterrupt): process._terminate() else: # Don't want to deal with the other case - raise RuntimeError("Thread join() timed out in cmd.handle_process_output()." - f" kill_after_timeout={kill_after_timeout} seconds") + raise RuntimeError( + "Thread join() timed out in cmd.handle_process_output()." + f" kill_after_timeout={kill_after_timeout} seconds" + ) if stderr_handler: error_str: Union[str, bytes] = ( "error: process killed because it timed out." - f" kill_after_timeout={kill_after_timeout} seconds") + f" kill_after_timeout={kill_after_timeout} seconds" + ) if not decode_streams and isinstance(p_stderr, BinaryIO): # Assume stderr_handler needs binary input error_str = cast(str, error_str) @@ -179,19 +217,22 @@ def handle_process_output(process: 'Git.AutoInterrupt' | Popen, def dashify(string: str) -> str: - return string.replace('_', '-') + return string.replace("_", "-") def slots_to_dict(self: object, exclude: Sequence[str] = ()) -> Dict[str, Any]: return {s: getattr(self, s) for s in self.__slots__ if s not in exclude} -def dict_to_slots_and__excluded_are_none(self: object, d: Mapping[str, Any], excluded: Sequence[str] = ()) -> None: +def dict_to_slots_and__excluded_are_none( + self: object, d: Mapping[str, Any], excluded: Sequence[str] = () +) -> None: for k, v in d.items(): setattr(self, k, v) for k in excluded: setattr(self, k, None) + ## -- End Utilities -- @} @@ -200,8 +241,11 @@ CREATE_NO_WINDOW = 0x08000000 ## CREATE_NEW_PROCESS_GROUP is needed to allow killing it afterwards, # see https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal -PROC_CREATIONFLAGS = (CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP # type: ignore[attr-defined] - if is_win else 0) # mypy error if not windows +PROC_CREATIONFLAGS = ( + CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP # type: ignore[attr-defined] + if is_win + else 0 +) # mypy error if not windows class Git(LazyMixin): @@ -220,10 +264,18 @@ class Git(LazyMixin): of the command to stdout. Set its value to 'full' to see details about the returned values. """ - __slots__ = ("_working_dir", "cat_file_all", "cat_file_header", "_version_info", - "_git_options", "_persistent_git_options", "_environment") - _excluded_ = ('cat_file_all', 'cat_file_header', '_version_info') + __slots__ = ( + "_working_dir", + "cat_file_all", + "cat_file_header", + "_version_info", + "_git_options", + "_persistent_git_options", + "_environment", + ) + + _excluded_ = ("cat_file_all", "cat_file_header", "_version_info") def __getstate__(self) -> Dict[str, Any]: return slots_to_dict(self, exclude=self._excluded_) @@ -233,7 +285,7 @@ class Git(LazyMixin): # CONFIGURATION - git_exec_name = "git" # default that should work on linux and windows + git_exec_name = "git" # default that should work on linux and windows # Enables debugging of GitPython's git commands GIT_PYTHON_TRACE = os.environ.get("GIT_PYTHON_TRACE", False) @@ -282,13 +334,18 @@ class Git(LazyMixin): # warn or raise exception if test failed if not has_git: - err = dedent("""\ + err = ( + dedent( + """\ Bad git executable. The git executable must be specified in one of the following ways: - be included in your $PATH - be set via $%s - explicitly set via git.refresh() - """) % cls._git_exec_env_var + """ + ) + % cls._git_exec_env_var + ) # revert to whatever the old_git was cls.GIT_PYTHON_GIT_EXECUTABLE = old_git @@ -314,7 +371,9 @@ class Git(LazyMixin): if mode in quiet: pass elif mode in warn or mode in error: - err = dedent("""\ + err = ( + dedent( + """\ %s All git commands will error until this is rectified. @@ -326,32 +385,42 @@ class Git(LazyMixin): Example: export %s=%s - """) % ( - err, - cls._refresh_env_var, - "|".join(quiet), - "|".join(warn), - "|".join(error), - cls._refresh_env_var, - quiet[0]) + """ + ) + % ( + err, + cls._refresh_env_var, + "|".join(quiet), + "|".join(warn), + "|".join(error), + cls._refresh_env_var, + quiet[0], + ) + ) if mode in warn: print("WARNING: %s" % err) else: raise ImportError(err) else: - err = dedent("""\ + err = ( + dedent( + """\ %s environment variable has been set but it has been set with an invalid value. Use only the following values: - %s: for no warning or exception - %s: for a printed warning - %s: for a raised exception - """) % ( - cls._refresh_env_var, - "|".join(quiet), - "|".join(warn), - "|".join(error)) + """ + ) + % ( + cls._refresh_env_var, + "|".join(quiet), + "|".join(warn), + "|".join(error), + ) + ) raise ImportError(err) # we get here if this was the init refresh and the refresh mode @@ -395,7 +464,7 @@ class Git(LazyMixin): Hence we undo the escaping just to be sure. """ url = os.path.expandvars(url) - if url.startswith('~'): + if url.startswith("~"): url = os.path.expanduser(url) url = url.replace("\\\\", "\\").replace("\\", "/") return url @@ -441,7 +510,7 @@ class Git(LazyMixin): log.info("Ignored error after process had died: %r", ex) # can be that nothing really exists anymore ... - if os is None or getattr(os, 'kill', None) is None: + if os is None or getattr(os, "kill", None) is None: return None # try to kill it @@ -458,7 +527,10 @@ class Git(LazyMixin): # we simply use the shell and redirect to nul. Its slower than CreateProcess, question # is whether we really want to see all these messages. Its annoying no matter what. if is_win: - call(("TASKKILL /F /T /PID %s 2>nul 1>nul" % str(proc.pid)), shell=True) + call( + ("TASKKILL /F /T /PID %s 2>nul 1>nul" % str(proc.pid)), + shell=True, + ) # END exception handling def __del__(self) -> None: @@ -468,15 +540,15 @@ class Git(LazyMixin): return getattr(self.proc, attr) # TODO: Bad choice to mimic `proc.wait()` but with different args. - def wait(self, stderr: Union[None, str, bytes] = b'') -> int: + def wait(self, stderr: Union[None, str, bytes] = b"") -> int: """Wait for the process and return its status code. :param stderr: Previously read value of stderr, in case stderr is already closed. :warn: may deadlock if output or error pipes are used and not handled separately. :raise GitCommandError: if the return status is not 0""" if stderr is None: - stderr_b = b'' - stderr_b = force_bytes(data=stderr, encoding='utf-8') + stderr_b = b"" + stderr_b = force_bytes(data=stderr, encoding="utf-8") status: Union[int, None] if self.proc is not None: status = self.proc.wait() @@ -485,21 +557,25 @@ class Git(LazyMixin): status = self.status p_stderr = None - def read_all_from_possibly_closed_stream(stream: Union[IO[bytes], None]) -> bytes: + def read_all_from_possibly_closed_stream( + stream: Union[IO[bytes], None] + ) -> bytes: if stream: try: return stderr_b + force_bytes(stream.read()) except ValueError: - return stderr_b or b'' + return stderr_b or b"" else: - return stderr_b or b'' + return stderr_b or b"" # END status handling if status != 0: errstr = read_all_from_possibly_closed_stream(p_stderr) - log.debug('AutoInterrupt wait stderr: %r' % (errstr,)) - raise GitCommandError(remove_password_if_present(self.args), status, errstr) + log.debug("AutoInterrupt wait stderr: %r" % (errstr,)) + raise GitCommandError( + remove_password_if_present(self.args), status, errstr + ) return status # END auto interrupt @@ -513,12 +589,12 @@ class Git(LazyMixin): If not all data is read to the end of the objects's lifetime, we read the rest to assure the underlying stream continues to work""" - __slots__: Tuple[str, ...] = ('_stream', '_nbr', '_size') + __slots__: Tuple[str, ...] = ("_stream", "_nbr", "_size") def __init__(self, size: int, stream: IO[bytes]) -> None: self._stream = stream self._size = size - self._nbr = 0 # num bytes read + self._nbr = 0 # num bytes read # special case: if the object is empty, has null bytes, get the # final newline right away. @@ -529,7 +605,7 @@ class Git(LazyMixin): def read(self, size: int = -1) -> bytes: bytes_left = self._size - self._nbr if bytes_left == 0: - return b'' + return b"" if size > -1: # assure we don't try to read past our limit size = min(bytes_left, size) @@ -542,13 +618,13 @@ class Git(LazyMixin): # check for depletion, read our final byte to make the stream usable by others if self._size - self._nbr == 0: - self._stream.read(1) # final newline + self._stream.read(1) # final newline # END finish reading return data def readline(self, size: int = -1) -> bytes: if self._nbr == self._size: - return b'' + return b"" # clamp size to lowest allowed value bytes_left = self._size - self._nbr @@ -589,7 +665,7 @@ class Git(LazyMixin): return out # skipcq: PYL-E0301 - def __iter__(self) -> 'Git.CatFileContentStream': + def __iter__(self) -> "Git.CatFileContentStream": return self def __next__(self) -> bytes: @@ -634,7 +710,7 @@ class Git(LazyMixin): """A convenience method as it allows to call the command as if it was an object. :return: Callable object that will execute call _call_process with your arguments.""" - if name[0] == '_': + if name[0] == "_": return LazyMixin.__getattr__(self, name) return lambda *args, **kwargs: self._call_process(name, *args, **kwargs) @@ -650,27 +726,31 @@ class Git(LazyMixin): """ self._persistent_git_options = self.transform_kwargs( - split_single_char_options=True, **kwargs) + split_single_char_options=True, **kwargs + ) def _set_cache_(self, attr: str) -> None: - if attr == '_version_info': + if attr == "_version_info": # We only use the first 4 numbers, as everything else could be strings in fact (on windows) - process_version = self._call_process('version') # should be as default *args and **kwargs used - version_numbers = process_version.split(' ')[2] - - self._version_info = cast(Tuple[int, int, int, int], - tuple(int(n) for n in version_numbers.split('.')[:4] if n.isdigit()) - ) + process_version = self._call_process( + "version" + ) # should be as default *args and **kwargs used + version_numbers = process_version.split(" ")[2] + + self._version_info = cast( + Tuple[int, int, int, int], + tuple(int(n) for n in version_numbers.split(".")[:4] if n.isdigit()), + ) else: super(Git, self)._set_cache_(attr) # END handle version info - @ property + @property def working_dir(self) -> Union[None, PathLike]: """:return: Git directory we are working on""" return self._working_dir - @ property + @property def version_info(self) -> Tuple[int, int, int, int]: """ :return: tuple(int, int, int, int) tuple with integers representing the major, minor @@ -678,69 +758,72 @@ class Git(LazyMixin): This value is generated on demand and is cached""" return self._version_info - @ overload - def execute(self, - command: Union[str, Sequence[Any]], - *, - as_process: Literal[True] - ) -> 'AutoInterrupt': + @overload + def execute( + self, command: Union[str, Sequence[Any]], *, as_process: Literal[True] + ) -> "AutoInterrupt": ... - @ overload - def execute(self, - command: Union[str, Sequence[Any]], - *, - as_process: Literal[False] = False, - stdout_as_string: Literal[True] - ) -> Union[str, Tuple[int, str, str]]: + @overload + def execute( + self, + command: Union[str, Sequence[Any]], + *, + as_process: Literal[False] = False, + stdout_as_string: Literal[True], + ) -> Union[str, Tuple[int, str, str]]: ... - @ overload - def execute(self, - command: Union[str, Sequence[Any]], - *, - as_process: Literal[False] = False, - stdout_as_string: Literal[False] = False - ) -> Union[bytes, Tuple[int, bytes, str]]: + @overload + def execute( + self, + command: Union[str, Sequence[Any]], + *, + as_process: Literal[False] = False, + stdout_as_string: Literal[False] = False, + ) -> Union[bytes, Tuple[int, bytes, str]]: ... - @ overload - def execute(self, - command: Union[str, Sequence[Any]], - *, - with_extended_output: Literal[False], - as_process: Literal[False], - stdout_as_string: Literal[True] - ) -> str: + @overload + def execute( + self, + command: Union[str, Sequence[Any]], + *, + with_extended_output: Literal[False], + as_process: Literal[False], + stdout_as_string: Literal[True], + ) -> str: ... - @ overload - def execute(self, - command: Union[str, Sequence[Any]], - *, - with_extended_output: Literal[False], - as_process: Literal[False], - stdout_as_string: Literal[False] - ) -> bytes: + @overload + def execute( + self, + command: Union[str, Sequence[Any]], + *, + with_extended_output: Literal[False], + as_process: Literal[False], + stdout_as_string: Literal[False], + ) -> bytes: ... - def execute(self, - command: Union[str, Sequence[Any]], - istream: Union[None, BinaryIO] = None, - with_extended_output: bool = False, - with_exceptions: bool = True, - as_process: bool = False, - output_stream: Union[None, BinaryIO] = None, - stdout_as_string: bool = True, - kill_after_timeout: Union[None, float] = None, - with_stdout: bool = True, - universal_newlines: bool = False, - shell: Union[None, bool] = None, - env: Union[None, Mapping[str, str]] = None, - max_chunk_size: int = io.DEFAULT_BUFFER_SIZE, - strip_newline_in_stdout: bool = True, - **subprocess_kwargs: Any - ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], AutoInterrupt]: + def execute( + self, + command: Union[str, Sequence[Any]], + istream: Union[None, BinaryIO] = None, + with_extended_output: bool = False, + with_exceptions: bool = True, + as_process: bool = False, + output_stream: Union[None, BinaryIO] = None, + stdout_as_string: bool = True, + kill_after_timeout: Union[None, float] = None, + with_stdout: bool = True, + universal_newlines: bool = False, + shell: Union[None, bool] = None, + env: Union[None, Mapping[str, str]] = None, + max_chunk_size: int = io.DEFAULT_BUFFER_SIZE, + strip_newline_in_stdout: bool = True, + **subprocess_kwargs: Any, + ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], AutoInterrupt]: """Handles executing the command on the shell and consumes and returns the returned information (stdout) @@ -831,8 +914,8 @@ class Git(LazyMixin): you must update the execute_kwargs tuple housed in this module.""" # Remove password for the command if present redacted_command = remove_password_if_present(command) - if self.GIT_PYTHON_TRACE and (self.GIT_PYTHON_TRACE != 'full' or as_process): - log.info(' '.join(redacted_command)) + if self.GIT_PYTHON_TRACE and (self.GIT_PYTHON_TRACE != "full" or as_process): + log.info(" ".join(redacted_command)) # Allow the user to have the command executed in their working dir. try: @@ -858,33 +941,47 @@ class Git(LazyMixin): if is_win: cmd_not_found_exception = OSError if kill_after_timeout is not None: - raise GitCommandError(redacted_command, '"kill_after_timeout" feature is not supported on Windows.') + raise GitCommandError( + redacted_command, + '"kill_after_timeout" feature is not supported on Windows.', + ) else: - cmd_not_found_exception = FileNotFoundError # NOQA # exists, flake8 unknown @UndefinedVariable + cmd_not_found_exception = ( + FileNotFoundError # NOQA # exists, flake8 unknown @UndefinedVariable + ) # end handle - stdout_sink = (PIPE - if with_stdout - else getattr(subprocess, 'DEVNULL', None) or open(os.devnull, 'wb')) + stdout_sink = ( + PIPE + if with_stdout + else getattr(subprocess, "DEVNULL", None) or open(os.devnull, "wb") + ) istream_ok = "None" if istream: istream_ok = "<valid stream>" - log.debug("Popen(%s, cwd=%s, universal_newlines=%s, shell=%s, istream=%s)", - redacted_command, cwd, universal_newlines, shell, istream_ok) + log.debug( + "Popen(%s, cwd=%s, universal_newlines=%s, shell=%s, istream=%s)", + redacted_command, + cwd, + universal_newlines, + shell, + istream_ok, + ) try: - proc = Popen(command, - env=env, - cwd=cwd, - bufsize=-1, - stdin=istream or DEVNULL, - stderr=PIPE, - stdout=stdout_sink, - shell=shell is not None and shell or self.USE_SHELL, - close_fds=is_posix, # unsupported on windows - universal_newlines=universal_newlines, - creationflags=PROC_CREATIONFLAGS, - **subprocess_kwargs - ) + proc = Popen( + command, + env=env, + cwd=cwd, + bufsize=-1, + stdin=istream or DEVNULL, + stderr=PIPE, + stdout=stdout_sink, + shell=shell is not None and shell or self.USE_SHELL, + close_fds=is_posix, # unsupported on windows + universal_newlines=universal_newlines, + creationflags=PROC_CREATIONFLAGS, + **subprocess_kwargs, + ) except cmd_not_found_exception as err: raise GitCommandNotFound(redacted_command, err) from err @@ -897,9 +994,12 @@ class Git(LazyMixin): return self.AutoInterrupt(proc, command) def _kill_process(pid: int) -> None: - """ Callback method to kill a process. """ - p = Popen(['ps', '--ppid', str(pid)], stdout=PIPE, - creationflags=PROC_CREATIONFLAGS) + """Callback method to kill a process.""" + p = Popen( + ["ps", "--ppid", str(pid)], + stdout=PIPE, + creationflags=PROC_CREATIONFLAGS, + ) child_pids = [] if p.stdout is not None: for line in p.stdout: @@ -909,29 +1009,32 @@ class Git(LazyMixin): child_pids.append(int(local_pid)) try: # Windows does not have SIGKILL, so use SIGTERM instead - sig = getattr(signal, 'SIGKILL', signal.SIGTERM) + sig = getattr(signal, "SIGKILL", signal.SIGTERM) os.kill(pid, sig) for child_pid in child_pids: try: os.kill(child_pid, sig) except OSError: pass - kill_check.set() # tell the main routine that the process was killed + kill_check.set() # tell the main routine that the process was killed except OSError: # It is possible that the process gets completed in the duration after timeout # happens and before we try to kill the process. pass return + # end if kill_after_timeout is not None: kill_check = threading.Event() - watchdog = threading.Timer(kill_after_timeout, _kill_process, args=(proc.pid,)) + watchdog = threading.Timer( + kill_after_timeout, _kill_process, args=(proc.pid,) + ) # Wait for the process to return status = 0 - stdout_value: Union[str, bytes] = b'' - stderr_value: Union[str, bytes] = b'' + stdout_value: Union[str, bytes] = b"" + stderr_value: Union[str, bytes] = b"" newline = "\n" if universal_newlines else b"\n" try: if output_stream is None: @@ -941,8 +1044,10 @@ class Git(LazyMixin): if kill_after_timeout is not None: watchdog.cancel() if kill_check.is_set(): - stderr_value = ('Timeout: the command "%s" did not complete in %d ' - 'secs.' % (" ".join(redacted_command), kill_after_timeout)) + stderr_value = ( + 'Timeout: the command "%s" did not complete in %d ' + "secs." % (" ".join(redacted_command), kill_after_timeout) + ) if not universal_newlines: stderr_value = stderr_value.encode(defenc) # strip trailing "\n" @@ -953,12 +1058,16 @@ class Git(LazyMixin): status = proc.returncode else: - max_chunk_size = max_chunk_size if max_chunk_size and max_chunk_size > 0 else io.DEFAULT_BUFFER_SIZE + max_chunk_size = ( + max_chunk_size + if max_chunk_size and max_chunk_size > 0 + else io.DEFAULT_BUFFER_SIZE + ) stream_copy(proc.stdout, output_stream, max_chunk_size) stdout_value = proc.stdout.read() stderr_value = proc.stderr.read() # strip trailing "\n" - if stderr_value.endswith(newline): # type: ignore + if stderr_value.endswith(newline): # type: ignore stderr_value = stderr_value[:-1] status = proc.wait() # END stdout handling @@ -966,18 +1075,28 @@ class Git(LazyMixin): proc.stdout.close() proc.stderr.close() - if self.GIT_PYTHON_TRACE == 'full': + if self.GIT_PYTHON_TRACE == "full": cmdstr = " ".join(redacted_command) def as_text(stdout_value: Union[bytes, str]) -> str: - return not output_stream and safe_decode(stdout_value) or '<OUTPUT_STREAM>' + return ( + not output_stream and safe_decode(stdout_value) or "<OUTPUT_STREAM>" + ) + # end if stderr_value: - log.info("%s -> %d; stdout: '%s'; stderr: '%s'", - cmdstr, status, as_text(stdout_value), safe_decode(stderr_value)) + log.info( + "%s -> %d; stdout: '%s'; stderr: '%s'", + cmdstr, + status, + as_text(stdout_value), + safe_decode(stderr_value), + ) elif stdout_value: - log.info("%s -> %d; stdout: '%s'", cmdstr, status, as_text(stdout_value)) + log.info( + "%s -> %d; stdout: '%s'", cmdstr, status, as_text(stdout_value) + ) else: log.info("%s -> %d", cmdstr, status) # END handle debug printing @@ -985,7 +1104,9 @@ class Git(LazyMixin): if with_exceptions and status != 0: raise GitCommandError(redacted_command, status, stderr_value, stdout_value) - if isinstance(stdout_value, bytes) and stdout_as_string: # could also be output_stream + if ( + isinstance(stdout_value, bytes) and stdout_as_string + ): # could also be output_stream stdout_value = safe_decode(stdout_value) # Allow access to the command's status code @@ -1042,7 +1163,9 @@ class Git(LazyMixin): finally: self.update_environment(**old_env) - def transform_kwarg(self, name: str, value: Any, split_single_char_options: bool) -> List[str]: + def transform_kwarg( + self, name: str, value: Any, split_single_char_options: bool + ) -> List[str]: if len(name) == 1: if value is True: return ["-%s" % name] @@ -1058,7 +1181,9 @@ class Git(LazyMixin): return ["--%s=%s" % (dashify(name), value)] return [] - def transform_kwargs(self, split_single_char_options: bool = True, **kwargs: Any) -> List[str]: + def transform_kwargs( + self, split_single_char_options: bool = True, **kwargs: Any + ) -> List[str]: """Transforms Python style kwargs into git command line options.""" args = [] for k, v in kwargs.items(): @@ -1081,7 +1206,7 @@ class Git(LazyMixin): return outlist - def __call__(self, **kwargs: Any) -> 'Git': + def __call__(self, **kwargs: Any) -> "Git": """Specify command line options to the git executable for a subcommand call @@ -1094,28 +1219,34 @@ class Git(LazyMixin): ``Examples``:: git(work_tree='/tmp').difftool()""" self._git_options = self.transform_kwargs( - split_single_char_options=True, **kwargs) + split_single_char_options=True, **kwargs + ) return self @overload - def _call_process(self, method: str, *args: None, **kwargs: None - ) -> str: + def _call_process(self, method: str, *args: None, **kwargs: None) -> str: ... # if no args given, execute called with all defaults @overload - def _call_process(self, method: str, - istream: int, - as_process: Literal[True], - *args: Any, **kwargs: Any - ) -> 'Git.AutoInterrupt': ... + def _call_process( + self, + method: str, + istream: int, + as_process: Literal[True], + *args: Any, + **kwargs: Any, + ) -> "Git.AutoInterrupt": + ... @overload - def _call_process(self, method: str, *args: Any, **kwargs: Any - ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], 'Git.AutoInterrupt']: + def _call_process( + self, method: str, *args: Any, **kwargs: Any + ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], "Git.AutoInterrupt"]: ... - def _call_process(self, method: str, *args: Any, **kwargs: Any - ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], 'Git.AutoInterrupt']: + def _call_process( + self, method: str, *args: Any, **kwargs: Any + ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], "Git.AutoInterrupt"]: """Run the given git command with the specified arguments and return the result as a String @@ -1145,13 +1276,13 @@ class Git(LazyMixin): :return: Same as ``execute`` if no args given used execute default (esp. as_process = False, stdout_as_string = True) - and return str """ + and return str""" # Handle optional arguments prior to calling transform_kwargs # otherwise these'll end up in args, which is bad. exec_kwargs = {k: v for k, v in kwargs.items() if k in execute_kwargs} opts_kwargs = {k: v for k, v in kwargs.items() if k not in execute_kwargs} - insert_after_this_arg = opts_kwargs.pop('insert_kwargs_after', None) + insert_after_this_arg = opts_kwargs.pop("insert_kwargs_after", None) # Prepare the argument list @@ -1164,10 +1295,12 @@ class Git(LazyMixin): try: index = ext_args.index(insert_after_this_arg) except ValueError as err: - raise ValueError("Couldn't find argument '%s' in args %s to insert cmd options after" - % (insert_after_this_arg, str(ext_args))) from err + raise ValueError( + "Couldn't find argument '%s' in args %s to insert cmd options after" + % (insert_after_this_arg, str(ext_args)) + ) from err # end handle error - args_list = ext_args[:index + 1] + opt_args + ext_args[index + 1:] + args_list = ext_args[: index + 1] + opt_args + ext_args[index + 1 :] # end handle opts_kwargs call = [self.GIT_PYTHON_GIT_EXECUTABLE] @@ -1197,9 +1330,15 @@ class Git(LazyMixin): tokens = header_line.split() if len(tokens) != 3: if not tokens: - raise ValueError("SHA could not be resolved, git returned: %r" % (header_line.strip())) + raise ValueError( + "SHA could not be resolved, git returned: %r" + % (header_line.strip()) + ) else: - raise ValueError("SHA %s could not be resolved, git returned: %r" % (tokens[0], header_line.strip())) + raise ValueError( + "SHA %s could not be resolved, git returned: %r" + % (tokens[0], header_line.strip()) + ) # END handle actual return value # END error handling @@ -1211,9 +1350,9 @@ class Git(LazyMixin): # required for command to separate refs on stdin, as bytes if isinstance(ref, bytes): # Assume 40 bytes hexsha - bin-to-ascii for some reason returns bytes, not text - refstr: str = ref.decode('ascii') + refstr: str = ref.decode("ascii") elif not isinstance(ref, str): - refstr = str(ref) # could be ref-object + refstr = str(ref) # could be ref-object else: refstr = ref @@ -1221,8 +1360,9 @@ class Git(LazyMixin): refstr += "\n" return refstr.encode(defenc) - def _get_persistent_cmd(self, attr_name: str, cmd_name: str, *args: Any, **kwargs: Any - ) -> 'Git.AutoInterrupt': + def _get_persistent_cmd( + self, attr_name: str, cmd_name: str, *args: Any, **kwargs: Any + ) -> "Git.AutoInterrupt": cur_val = getattr(self, attr_name) if cur_val is not None: return cur_val @@ -1232,10 +1372,12 @@ class Git(LazyMixin): cmd = self._call_process(cmd_name, *args, **options) setattr(self, attr_name, cmd) - cmd = cast('Git.AutoInterrupt', cmd) + cmd = cast("Git.AutoInterrupt", cmd) return cmd - def __get_object_header(self, cmd: 'Git.AutoInterrupt', ref: AnyStr) -> Tuple[str, str, int]: + def __get_object_header( + self, cmd: "Git.AutoInterrupt", ref: AnyStr + ) -> Tuple[str, str, int]: if cmd.stdin and cmd.stdout: cmd.stdin.write(self._prepare_ref(ref)) cmd.stdin.flush() @@ -1244,7 +1386,7 @@ class Git(LazyMixin): raise ValueError("cmd stdin was empty") def get_object_header(self, ref: str) -> Tuple[str, str, int]: - """ Use this method to quickly examine the type and size of the object behind + """Use this method to quickly examine the type and size of the object behind the given ref. :note: The method will only suffer from the costs of command invocation @@ -1255,16 +1397,18 @@ class Git(LazyMixin): return self.__get_object_header(cmd, ref) def get_object_data(self, ref: str) -> Tuple[str, str, int, bytes]: - """ As get_object_header, but returns object data as well + """As get_object_header, but returns object data as well :return: (hexsha, type_string, size_as_int,data_string) :note: not threadsafe""" hexsha, typename, size, stream = self.stream_object_data(ref) data = stream.read(size) - del(stream) + del stream return (hexsha, typename, size, data) - def stream_object_data(self, ref: str) -> Tuple[str, str, int, 'Git.CatFileContentStream']: - """ As get_object_header, but returns the data as a stream + def stream_object_data( + self, ref: str + ) -> Tuple[str, str, int, "Git.CatFileContentStream"]: + """As get_object_header, but returns the data as a stream :return: (hexsha, type_string, size_as_int, stream) :note: This method is not threadsafe, you need one independent Command instance per thread to be safe !""" @@ -1273,7 +1417,7 @@ class Git(LazyMixin): cmd_stdout = cmd.stdout if cmd.stdout is not None else io.BytesIO() return (hexsha, typename, size, self.CatFileContentStream(size, cmd_stdout)) - def clear_cache(self) -> 'Git': + def clear_cache(self) -> "Git": """Clear all kinds of internal caches to release resources. Currently persistent commands will be interrupted. |