summaryrefslogtreecommitdiff
path: root/git/cmd.py
diff options
context:
space:
mode:
authorSebastian Thiel <sebastian.thiel@icloud.com>2022-05-18 07:43:53 +0800
committerSebastian Thiel <sebastian.thiel@icloud.com>2022-05-18 07:43:53 +0800
commit21ec529987d10e0010badd37f8da3274167d436f (patch)
treea3394cfe902ce7edd07c89420c21c13274a2d295 /git/cmd.py
parentb30720ee4d9762a03eae4fa7cfa4b0190d81784d (diff)
downloadgitpython-21ec529987d10e0010badd37f8da3274167d436f.tar.gz
Run everything through 'black'
That way people who use it won't be deterred, while it unifies style everywhere.
Diffstat (limited to 'git/cmd.py')
-rw-r--r--git/cmd.py592
1 files changed, 368 insertions, 224 deletions
diff --git a/git/cmd.py b/git/cmd.py
index 1ddf9e03..12409b0c 100644
--- a/git/cmd.py
+++ b/git/cmd.py
@@ -9,12 +9,7 @@ import io
import logging
import os
import signal
-from subprocess import (
- call,
- Popen,
- PIPE,
- DEVNULL
-)
+from subprocess import call, Popen, PIPE, DEVNULL
import subprocess
import threading
from textwrap import dedent
@@ -29,10 +24,7 @@ from git.compat import (
from git.exc import CommandError
from git.util import is_cygwin_git, cygpath, expand_path, remove_password_if_present
-from .exc import (
- GitCommandError,
- GitCommandNotFound
-)
+from .exc import GitCommandError, GitCommandNotFound
from .util import (
LazyMixin,
stream_copy,
@@ -40,8 +32,24 @@ from .util import (
# typing ---------------------------------------------------------------------------
-from typing import (Any, AnyStr, BinaryIO, Callable, Dict, IO, Iterator, List, Mapping,
- Sequence, TYPE_CHECKING, TextIO, Tuple, Union, cast, overload)
+from typing import (
+ Any,
+ AnyStr,
+ BinaryIO,
+ Callable,
+ Dict,
+ IO,
+ Iterator,
+ List,
+ Mapping,
+ Sequence,
+ TYPE_CHECKING,
+ TextIO,
+ Tuple,
+ Union,
+ cast,
+ overload,
+)
from git.types import PathLike, Literal, TBD
@@ -52,15 +60,26 @@ if TYPE_CHECKING:
# ---------------------------------------------------------------------------------
-execute_kwargs = {'istream', 'with_extended_output',
- 'with_exceptions', 'as_process', 'stdout_as_string',
- 'output_stream', 'with_stdout', 'kill_after_timeout',
- 'universal_newlines', 'shell', 'env', 'max_chunk_size', 'strip_newline_in_stdout'}
+execute_kwargs = {
+ "istream",
+ "with_extended_output",
+ "with_exceptions",
+ "as_process",
+ "stdout_as_string",
+ "output_stream",
+ "with_stdout",
+ "kill_after_timeout",
+ "universal_newlines",
+ "shell",
+ "env",
+ "max_chunk_size",
+ "strip_newline_in_stdout",
+}
log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())
-__all__ = ('Git',)
+__all__ = ("Git",)
# ==============================================================================
@@ -69,18 +88,24 @@ __all__ = ('Git',)
# Documentation
## @{
-def handle_process_output(process: 'Git.AutoInterrupt' | Popen,
- stdout_handler: Union[None,
- Callable[[AnyStr], None],
- Callable[[List[AnyStr]], None],
- Callable[[bytes, 'Repo', 'DiffIndex'], None]],
- stderr_handler: Union[None,
- Callable[[AnyStr], None],
- Callable[[List[AnyStr]], None]],
- finalizer: Union[None,
- Callable[[Union[subprocess.Popen, 'Git.AutoInterrupt']], None]] = None,
- decode_streams: bool = True,
- kill_after_timeout: Union[None, float] = None) -> None:
+
+def handle_process_output(
+ process: "Git.AutoInterrupt" | Popen,
+ stdout_handler: Union[
+ None,
+ Callable[[AnyStr], None],
+ Callable[[List[AnyStr]], None],
+ Callable[[bytes, "Repo", "DiffIndex"], None],
+ ],
+ stderr_handler: Union[
+ None, Callable[[AnyStr], None], Callable[[List[AnyStr]], None]
+ ],
+ finalizer: Union[
+ None, Callable[[Union[subprocess.Popen, "Git.AutoInterrupt"]], None]
+ ] = None,
+ decode_streams: bool = True,
+ kill_after_timeout: Union[None, float] = None,
+) -> None:
"""Registers for notifications to learn that process output is ready to read, and dispatches lines to
the respective line handlers.
This function returns once the finalizer returns
@@ -101,8 +126,13 @@ def handle_process_output(process: 'Git.AutoInterrupt' | Popen,
should be killed.
"""
# Use 2 "pump" threads and wait for both to finish.
- def pump_stream(cmdline: List[str], name: str, stream: Union[BinaryIO, TextIO], is_decode: bool,
- handler: Union[None, Callable[[Union[bytes, str]], None]]) -> None:
+ def pump_stream(
+ cmdline: List[str],
+ name: str,
+ stream: Union[BinaryIO, TextIO],
+ is_decode: bool,
+ handler: Union[None, Callable[[Union[bytes, str]], None]],
+ ) -> None:
try:
for line in stream:
if handler:
@@ -114,21 +144,25 @@ def handle_process_output(process: 'Git.AutoInterrupt' | Popen,
handler(line)
except Exception as ex:
- log.error(f"Pumping {name!r} of cmd({remove_password_if_present(cmdline)}) failed due to: {ex!r}")
+ log.error(
+ f"Pumping {name!r} of cmd({remove_password_if_present(cmdline)}) failed due to: {ex!r}"
+ )
if "I/O operation on closed file" not in str(ex):
# Only reraise if the error was not due to the stream closing
- raise CommandError([f'<{name}-pump>'] + remove_password_if_present(cmdline), ex) from ex
+ raise CommandError(
+ [f"<{name}-pump>"] + remove_password_if_present(cmdline), ex
+ ) from ex
finally:
stream.close()
- if hasattr(process, 'proc'):
- process = cast('Git.AutoInterrupt', process)
- cmdline: str | Tuple[str, ...] | List[str] = getattr(process.proc, 'args', '')
+ if hasattr(process, "proc"):
+ process = cast("Git.AutoInterrupt", process)
+ cmdline: str | Tuple[str, ...] | List[str] = getattr(process.proc, "args", "")
p_stdout = process.proc.stdout if process.proc else None
p_stderr = process.proc.stderr if process.proc else None
else:
process = cast(Popen, process)
- cmdline = getattr(process, 'args', '')
+ cmdline = getattr(process, "args", "")
p_stdout = process.stdout
p_stderr = process.stderr
@@ -137,15 +171,16 @@ def handle_process_output(process: 'Git.AutoInterrupt' | Popen,
pumps: List[Tuple[str, IO, Callable[..., None] | None]] = []
if p_stdout:
- pumps.append(('stdout', p_stdout, stdout_handler))
+ pumps.append(("stdout", p_stdout, stdout_handler))
if p_stderr:
- pumps.append(('stderr', p_stderr, stderr_handler))
+ pumps.append(("stderr", p_stderr, stderr_handler))
threads: List[threading.Thread] = []
for name, stream, handler in pumps:
- t = threading.Thread(target=pump_stream,
- args=(cmdline, name, stream, decode_streams, handler))
+ t = threading.Thread(
+ target=pump_stream, args=(cmdline, name, stream, decode_streams, handler)
+ )
t.daemon = True
t.start()
threads.append(t)
@@ -158,12 +193,15 @@ def handle_process_output(process: 'Git.AutoInterrupt' | Popen,
if isinstance(process, Git.AutoInterrupt):
process._terminate()
else: # Don't want to deal with the other case
- raise RuntimeError("Thread join() timed out in cmd.handle_process_output()."
- f" kill_after_timeout={kill_after_timeout} seconds")
+ raise RuntimeError(
+ "Thread join() timed out in cmd.handle_process_output()."
+ f" kill_after_timeout={kill_after_timeout} seconds"
+ )
if stderr_handler:
error_str: Union[str, bytes] = (
"error: process killed because it timed out."
- f" kill_after_timeout={kill_after_timeout} seconds")
+ f" kill_after_timeout={kill_after_timeout} seconds"
+ )
if not decode_streams and isinstance(p_stderr, BinaryIO):
# Assume stderr_handler needs binary input
error_str = cast(str, error_str)
@@ -179,19 +217,22 @@ def handle_process_output(process: 'Git.AutoInterrupt' | Popen,
def dashify(string: str) -> str:
- return string.replace('_', '-')
+ return string.replace("_", "-")
def slots_to_dict(self: object, exclude: Sequence[str] = ()) -> Dict[str, Any]:
return {s: getattr(self, s) for s in self.__slots__ if s not in exclude}
-def dict_to_slots_and__excluded_are_none(self: object, d: Mapping[str, Any], excluded: Sequence[str] = ()) -> None:
+def dict_to_slots_and__excluded_are_none(
+ self: object, d: Mapping[str, Any], excluded: Sequence[str] = ()
+) -> None:
for k, v in d.items():
setattr(self, k, v)
for k in excluded:
setattr(self, k, None)
+
## -- End Utilities -- @}
@@ -200,8 +241,11 @@ CREATE_NO_WINDOW = 0x08000000
## CREATE_NEW_PROCESS_GROUP is needed to allow killing it afterwards,
# see https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal
-PROC_CREATIONFLAGS = (CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP # type: ignore[attr-defined]
- if is_win else 0) # mypy error if not windows
+PROC_CREATIONFLAGS = (
+ CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP # type: ignore[attr-defined]
+ if is_win
+ else 0
+) # mypy error if not windows
class Git(LazyMixin):
@@ -220,10 +264,18 @@ class Git(LazyMixin):
of the command to stdout.
Set its value to 'full' to see details about the returned values.
"""
- __slots__ = ("_working_dir", "cat_file_all", "cat_file_header", "_version_info",
- "_git_options", "_persistent_git_options", "_environment")
- _excluded_ = ('cat_file_all', 'cat_file_header', '_version_info')
+ __slots__ = (
+ "_working_dir",
+ "cat_file_all",
+ "cat_file_header",
+ "_version_info",
+ "_git_options",
+ "_persistent_git_options",
+ "_environment",
+ )
+
+ _excluded_ = ("cat_file_all", "cat_file_header", "_version_info")
def __getstate__(self) -> Dict[str, Any]:
return slots_to_dict(self, exclude=self._excluded_)
@@ -233,7 +285,7 @@ class Git(LazyMixin):
# CONFIGURATION
- git_exec_name = "git" # default that should work on linux and windows
+ git_exec_name = "git" # default that should work on linux and windows
# Enables debugging of GitPython's git commands
GIT_PYTHON_TRACE = os.environ.get("GIT_PYTHON_TRACE", False)
@@ -282,13 +334,18 @@ class Git(LazyMixin):
# warn or raise exception if test failed
if not has_git:
- err = dedent("""\
+ err = (
+ dedent(
+ """\
Bad git executable.
The git executable must be specified in one of the following ways:
- be included in your $PATH
- be set via $%s
- explicitly set via git.refresh()
- """) % cls._git_exec_env_var
+ """
+ )
+ % cls._git_exec_env_var
+ )
# revert to whatever the old_git was
cls.GIT_PYTHON_GIT_EXECUTABLE = old_git
@@ -314,7 +371,9 @@ class Git(LazyMixin):
if mode in quiet:
pass
elif mode in warn or mode in error:
- err = dedent("""\
+ err = (
+ dedent(
+ """\
%s
All git commands will error until this is rectified.
@@ -326,32 +385,42 @@ class Git(LazyMixin):
Example:
export %s=%s
- """) % (
- err,
- cls._refresh_env_var,
- "|".join(quiet),
- "|".join(warn),
- "|".join(error),
- cls._refresh_env_var,
- quiet[0])
+ """
+ )
+ % (
+ err,
+ cls._refresh_env_var,
+ "|".join(quiet),
+ "|".join(warn),
+ "|".join(error),
+ cls._refresh_env_var,
+ quiet[0],
+ )
+ )
if mode in warn:
print("WARNING: %s" % err)
else:
raise ImportError(err)
else:
- err = dedent("""\
+ err = (
+ dedent(
+ """\
%s environment variable has been set but it has been set with an invalid value.
Use only the following values:
- %s: for no warning or exception
- %s: for a printed warning
- %s: for a raised exception
- """) % (
- cls._refresh_env_var,
- "|".join(quiet),
- "|".join(warn),
- "|".join(error))
+ """
+ )
+ % (
+ cls._refresh_env_var,
+ "|".join(quiet),
+ "|".join(warn),
+ "|".join(error),
+ )
+ )
raise ImportError(err)
# we get here if this was the init refresh and the refresh mode
@@ -395,7 +464,7 @@ class Git(LazyMixin):
Hence we undo the escaping just to be sure.
"""
url = os.path.expandvars(url)
- if url.startswith('~'):
+ if url.startswith("~"):
url = os.path.expanduser(url)
url = url.replace("\\\\", "\\").replace("\\", "/")
return url
@@ -441,7 +510,7 @@ class Git(LazyMixin):
log.info("Ignored error after process had died: %r", ex)
# can be that nothing really exists anymore ...
- if os is None or getattr(os, 'kill', None) is None:
+ if os is None or getattr(os, "kill", None) is None:
return None
# try to kill it
@@ -458,7 +527,10 @@ class Git(LazyMixin):
# we simply use the shell and redirect to nul. Its slower than CreateProcess, question
# is whether we really want to see all these messages. Its annoying no matter what.
if is_win:
- call(("TASKKILL /F /T /PID %s 2>nul 1>nul" % str(proc.pid)), shell=True)
+ call(
+ ("TASKKILL /F /T /PID %s 2>nul 1>nul" % str(proc.pid)),
+ shell=True,
+ )
# END exception handling
def __del__(self) -> None:
@@ -468,15 +540,15 @@ class Git(LazyMixin):
return getattr(self.proc, attr)
# TODO: Bad choice to mimic `proc.wait()` but with different args.
- def wait(self, stderr: Union[None, str, bytes] = b'') -> int:
+ def wait(self, stderr: Union[None, str, bytes] = b"") -> int:
"""Wait for the process and return its status code.
:param stderr: Previously read value of stderr, in case stderr is already closed.
:warn: may deadlock if output or error pipes are used and not handled separately.
:raise GitCommandError: if the return status is not 0"""
if stderr is None:
- stderr_b = b''
- stderr_b = force_bytes(data=stderr, encoding='utf-8')
+ stderr_b = b""
+ stderr_b = force_bytes(data=stderr, encoding="utf-8")
status: Union[int, None]
if self.proc is not None:
status = self.proc.wait()
@@ -485,21 +557,25 @@ class Git(LazyMixin):
status = self.status
p_stderr = None
- def read_all_from_possibly_closed_stream(stream: Union[IO[bytes], None]) -> bytes:
+ def read_all_from_possibly_closed_stream(
+ stream: Union[IO[bytes], None]
+ ) -> bytes:
if stream:
try:
return stderr_b + force_bytes(stream.read())
except ValueError:
- return stderr_b or b''
+ return stderr_b or b""
else:
- return stderr_b or b''
+ return stderr_b or b""
# END status handling
if status != 0:
errstr = read_all_from_possibly_closed_stream(p_stderr)
- log.debug('AutoInterrupt wait stderr: %r' % (errstr,))
- raise GitCommandError(remove_password_if_present(self.args), status, errstr)
+ log.debug("AutoInterrupt wait stderr: %r" % (errstr,))
+ raise GitCommandError(
+ remove_password_if_present(self.args), status, errstr
+ )
return status
# END auto interrupt
@@ -513,12 +589,12 @@ class Git(LazyMixin):
If not all data is read to the end of the objects's lifetime, we read the
rest to assure the underlying stream continues to work"""
- __slots__: Tuple[str, ...] = ('_stream', '_nbr', '_size')
+ __slots__: Tuple[str, ...] = ("_stream", "_nbr", "_size")
def __init__(self, size: int, stream: IO[bytes]) -> None:
self._stream = stream
self._size = size
- self._nbr = 0 # num bytes read
+ self._nbr = 0 # num bytes read
# special case: if the object is empty, has null bytes, get the
# final newline right away.
@@ -529,7 +605,7 @@ class Git(LazyMixin):
def read(self, size: int = -1) -> bytes:
bytes_left = self._size - self._nbr
if bytes_left == 0:
- return b''
+ return b""
if size > -1:
# assure we don't try to read past our limit
size = min(bytes_left, size)
@@ -542,13 +618,13 @@ class Git(LazyMixin):
# check for depletion, read our final byte to make the stream usable by others
if self._size - self._nbr == 0:
- self._stream.read(1) # final newline
+ self._stream.read(1) # final newline
# END finish reading
return data
def readline(self, size: int = -1) -> bytes:
if self._nbr == self._size:
- return b''
+ return b""
# clamp size to lowest allowed value
bytes_left = self._size - self._nbr
@@ -589,7 +665,7 @@ class Git(LazyMixin):
return out
# skipcq: PYL-E0301
- def __iter__(self) -> 'Git.CatFileContentStream':
+ def __iter__(self) -> "Git.CatFileContentStream":
return self
def __next__(self) -> bytes:
@@ -634,7 +710,7 @@ class Git(LazyMixin):
"""A convenience method as it allows to call the command as if it was
an object.
:return: Callable object that will execute call _call_process with your arguments."""
- if name[0] == '_':
+ if name[0] == "_":
return LazyMixin.__getattr__(self, name)
return lambda *args, **kwargs: self._call_process(name, *args, **kwargs)
@@ -650,27 +726,31 @@ class Git(LazyMixin):
"""
self._persistent_git_options = self.transform_kwargs(
- split_single_char_options=True, **kwargs)
+ split_single_char_options=True, **kwargs
+ )
def _set_cache_(self, attr: str) -> None:
- if attr == '_version_info':
+ if attr == "_version_info":
# We only use the first 4 numbers, as everything else could be strings in fact (on windows)
- process_version = self._call_process('version') # should be as default *args and **kwargs used
- version_numbers = process_version.split(' ')[2]
-
- self._version_info = cast(Tuple[int, int, int, int],
- tuple(int(n) for n in version_numbers.split('.')[:4] if n.isdigit())
- )
+ process_version = self._call_process(
+ "version"
+ ) # should be as default *args and **kwargs used
+ version_numbers = process_version.split(" ")[2]
+
+ self._version_info = cast(
+ Tuple[int, int, int, int],
+ tuple(int(n) for n in version_numbers.split(".")[:4] if n.isdigit()),
+ )
else:
super(Git, self)._set_cache_(attr)
# END handle version info
- @ property
+ @property
def working_dir(self) -> Union[None, PathLike]:
""":return: Git directory we are working on"""
return self._working_dir
- @ property
+ @property
def version_info(self) -> Tuple[int, int, int, int]:
"""
:return: tuple(int, int, int, int) tuple with integers representing the major, minor
@@ -678,69 +758,72 @@ class Git(LazyMixin):
This value is generated on demand and is cached"""
return self._version_info
- @ overload
- def execute(self,
- command: Union[str, Sequence[Any]],
- *,
- as_process: Literal[True]
- ) -> 'AutoInterrupt':
+ @overload
+ def execute(
+ self, command: Union[str, Sequence[Any]], *, as_process: Literal[True]
+ ) -> "AutoInterrupt":
...
- @ overload
- def execute(self,
- command: Union[str, Sequence[Any]],
- *,
- as_process: Literal[False] = False,
- stdout_as_string: Literal[True]
- ) -> Union[str, Tuple[int, str, str]]:
+ @overload
+ def execute(
+ self,
+ command: Union[str, Sequence[Any]],
+ *,
+ as_process: Literal[False] = False,
+ stdout_as_string: Literal[True],
+ ) -> Union[str, Tuple[int, str, str]]:
...
- @ overload
- def execute(self,
- command: Union[str, Sequence[Any]],
- *,
- as_process: Literal[False] = False,
- stdout_as_string: Literal[False] = False
- ) -> Union[bytes, Tuple[int, bytes, str]]:
+ @overload
+ def execute(
+ self,
+ command: Union[str, Sequence[Any]],
+ *,
+ as_process: Literal[False] = False,
+ stdout_as_string: Literal[False] = False,
+ ) -> Union[bytes, Tuple[int, bytes, str]]:
...
- @ overload
- def execute(self,
- command: Union[str, Sequence[Any]],
- *,
- with_extended_output: Literal[False],
- as_process: Literal[False],
- stdout_as_string: Literal[True]
- ) -> str:
+ @overload
+ def execute(
+ self,
+ command: Union[str, Sequence[Any]],
+ *,
+ with_extended_output: Literal[False],
+ as_process: Literal[False],
+ stdout_as_string: Literal[True],
+ ) -> str:
...
- @ overload
- def execute(self,
- command: Union[str, Sequence[Any]],
- *,
- with_extended_output: Literal[False],
- as_process: Literal[False],
- stdout_as_string: Literal[False]
- ) -> bytes:
+ @overload
+ def execute(
+ self,
+ command: Union[str, Sequence[Any]],
+ *,
+ with_extended_output: Literal[False],
+ as_process: Literal[False],
+ stdout_as_string: Literal[False],
+ ) -> bytes:
...
- def execute(self,
- command: Union[str, Sequence[Any]],
- istream: Union[None, BinaryIO] = None,
- with_extended_output: bool = False,
- with_exceptions: bool = True,
- as_process: bool = False,
- output_stream: Union[None, BinaryIO] = None,
- stdout_as_string: bool = True,
- kill_after_timeout: Union[None, float] = None,
- with_stdout: bool = True,
- universal_newlines: bool = False,
- shell: Union[None, bool] = None,
- env: Union[None, Mapping[str, str]] = None,
- max_chunk_size: int = io.DEFAULT_BUFFER_SIZE,
- strip_newline_in_stdout: bool = True,
- **subprocess_kwargs: Any
- ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], AutoInterrupt]:
+ def execute(
+ self,
+ command: Union[str, Sequence[Any]],
+ istream: Union[None, BinaryIO] = None,
+ with_extended_output: bool = False,
+ with_exceptions: bool = True,
+ as_process: bool = False,
+ output_stream: Union[None, BinaryIO] = None,
+ stdout_as_string: bool = True,
+ kill_after_timeout: Union[None, float] = None,
+ with_stdout: bool = True,
+ universal_newlines: bool = False,
+ shell: Union[None, bool] = None,
+ env: Union[None, Mapping[str, str]] = None,
+ max_chunk_size: int = io.DEFAULT_BUFFER_SIZE,
+ strip_newline_in_stdout: bool = True,
+ **subprocess_kwargs: Any,
+ ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], AutoInterrupt]:
"""Handles executing the command on the shell and consumes and returns
the returned information (stdout)
@@ -831,8 +914,8 @@ class Git(LazyMixin):
you must update the execute_kwargs tuple housed in this module."""
# Remove password for the command if present
redacted_command = remove_password_if_present(command)
- if self.GIT_PYTHON_TRACE and (self.GIT_PYTHON_TRACE != 'full' or as_process):
- log.info(' '.join(redacted_command))
+ if self.GIT_PYTHON_TRACE and (self.GIT_PYTHON_TRACE != "full" or as_process):
+ log.info(" ".join(redacted_command))
# Allow the user to have the command executed in their working dir.
try:
@@ -858,33 +941,47 @@ class Git(LazyMixin):
if is_win:
cmd_not_found_exception = OSError
if kill_after_timeout is not None:
- raise GitCommandError(redacted_command, '"kill_after_timeout" feature is not supported on Windows.')
+ raise GitCommandError(
+ redacted_command,
+ '"kill_after_timeout" feature is not supported on Windows.',
+ )
else:
- cmd_not_found_exception = FileNotFoundError # NOQA # exists, flake8 unknown @UndefinedVariable
+ cmd_not_found_exception = (
+ FileNotFoundError # NOQA # exists, flake8 unknown @UndefinedVariable
+ )
# end handle
- stdout_sink = (PIPE
- if with_stdout
- else getattr(subprocess, 'DEVNULL', None) or open(os.devnull, 'wb'))
+ stdout_sink = (
+ PIPE
+ if with_stdout
+ else getattr(subprocess, "DEVNULL", None) or open(os.devnull, "wb")
+ )
istream_ok = "None"
if istream:
istream_ok = "<valid stream>"
- log.debug("Popen(%s, cwd=%s, universal_newlines=%s, shell=%s, istream=%s)",
- redacted_command, cwd, universal_newlines, shell, istream_ok)
+ log.debug(
+ "Popen(%s, cwd=%s, universal_newlines=%s, shell=%s, istream=%s)",
+ redacted_command,
+ cwd,
+ universal_newlines,
+ shell,
+ istream_ok,
+ )
try:
- proc = Popen(command,
- env=env,
- cwd=cwd,
- bufsize=-1,
- stdin=istream or DEVNULL,
- stderr=PIPE,
- stdout=stdout_sink,
- shell=shell is not None and shell or self.USE_SHELL,
- close_fds=is_posix, # unsupported on windows
- universal_newlines=universal_newlines,
- creationflags=PROC_CREATIONFLAGS,
- **subprocess_kwargs
- )
+ proc = Popen(
+ command,
+ env=env,
+ cwd=cwd,
+ bufsize=-1,
+ stdin=istream or DEVNULL,
+ stderr=PIPE,
+ stdout=stdout_sink,
+ shell=shell is not None and shell or self.USE_SHELL,
+ close_fds=is_posix, # unsupported on windows
+ universal_newlines=universal_newlines,
+ creationflags=PROC_CREATIONFLAGS,
+ **subprocess_kwargs,
+ )
except cmd_not_found_exception as err:
raise GitCommandNotFound(redacted_command, err) from err
@@ -897,9 +994,12 @@ class Git(LazyMixin):
return self.AutoInterrupt(proc, command)
def _kill_process(pid: int) -> None:
- """ Callback method to kill a process. """
- p = Popen(['ps', '--ppid', str(pid)], stdout=PIPE,
- creationflags=PROC_CREATIONFLAGS)
+ """Callback method to kill a process."""
+ p = Popen(
+ ["ps", "--ppid", str(pid)],
+ stdout=PIPE,
+ creationflags=PROC_CREATIONFLAGS,
+ )
child_pids = []
if p.stdout is not None:
for line in p.stdout:
@@ -909,29 +1009,32 @@ class Git(LazyMixin):
child_pids.append(int(local_pid))
try:
# Windows does not have SIGKILL, so use SIGTERM instead
- sig = getattr(signal, 'SIGKILL', signal.SIGTERM)
+ sig = getattr(signal, "SIGKILL", signal.SIGTERM)
os.kill(pid, sig)
for child_pid in child_pids:
try:
os.kill(child_pid, sig)
except OSError:
pass
- kill_check.set() # tell the main routine that the process was killed
+ kill_check.set() # tell the main routine that the process was killed
except OSError:
# It is possible that the process gets completed in the duration after timeout
# happens and before we try to kill the process.
pass
return
+
# end
if kill_after_timeout is not None:
kill_check = threading.Event()
- watchdog = threading.Timer(kill_after_timeout, _kill_process, args=(proc.pid,))
+ watchdog = threading.Timer(
+ kill_after_timeout, _kill_process, args=(proc.pid,)
+ )
# Wait for the process to return
status = 0
- stdout_value: Union[str, bytes] = b''
- stderr_value: Union[str, bytes] = b''
+ stdout_value: Union[str, bytes] = b""
+ stderr_value: Union[str, bytes] = b""
newline = "\n" if universal_newlines else b"\n"
try:
if output_stream is None:
@@ -941,8 +1044,10 @@ class Git(LazyMixin):
if kill_after_timeout is not None:
watchdog.cancel()
if kill_check.is_set():
- stderr_value = ('Timeout: the command "%s" did not complete in %d '
- 'secs.' % (" ".join(redacted_command), kill_after_timeout))
+ stderr_value = (
+ 'Timeout: the command "%s" did not complete in %d '
+ "secs." % (" ".join(redacted_command), kill_after_timeout)
+ )
if not universal_newlines:
stderr_value = stderr_value.encode(defenc)
# strip trailing "\n"
@@ -953,12 +1058,16 @@ class Git(LazyMixin):
status = proc.returncode
else:
- max_chunk_size = max_chunk_size if max_chunk_size and max_chunk_size > 0 else io.DEFAULT_BUFFER_SIZE
+ max_chunk_size = (
+ max_chunk_size
+ if max_chunk_size and max_chunk_size > 0
+ else io.DEFAULT_BUFFER_SIZE
+ )
stream_copy(proc.stdout, output_stream, max_chunk_size)
stdout_value = proc.stdout.read()
stderr_value = proc.stderr.read()
# strip trailing "\n"
- if stderr_value.endswith(newline): # type: ignore
+ if stderr_value.endswith(newline): # type: ignore
stderr_value = stderr_value[:-1]
status = proc.wait()
# END stdout handling
@@ -966,18 +1075,28 @@ class Git(LazyMixin):
proc.stdout.close()
proc.stderr.close()
- if self.GIT_PYTHON_TRACE == 'full':
+ if self.GIT_PYTHON_TRACE == "full":
cmdstr = " ".join(redacted_command)
def as_text(stdout_value: Union[bytes, str]) -> str:
- return not output_stream and safe_decode(stdout_value) or '<OUTPUT_STREAM>'
+ return (
+ not output_stream and safe_decode(stdout_value) or "<OUTPUT_STREAM>"
+ )
+
# end
if stderr_value:
- log.info("%s -> %d; stdout: '%s'; stderr: '%s'",
- cmdstr, status, as_text(stdout_value), safe_decode(stderr_value))
+ log.info(
+ "%s -> %d; stdout: '%s'; stderr: '%s'",
+ cmdstr,
+ status,
+ as_text(stdout_value),
+ safe_decode(stderr_value),
+ )
elif stdout_value:
- log.info("%s -> %d; stdout: '%s'", cmdstr, status, as_text(stdout_value))
+ log.info(
+ "%s -> %d; stdout: '%s'", cmdstr, status, as_text(stdout_value)
+ )
else:
log.info("%s -> %d", cmdstr, status)
# END handle debug printing
@@ -985,7 +1104,9 @@ class Git(LazyMixin):
if with_exceptions and status != 0:
raise GitCommandError(redacted_command, status, stderr_value, stdout_value)
- if isinstance(stdout_value, bytes) and stdout_as_string: # could also be output_stream
+ if (
+ isinstance(stdout_value, bytes) and stdout_as_string
+ ): # could also be output_stream
stdout_value = safe_decode(stdout_value)
# Allow access to the command's status code
@@ -1042,7 +1163,9 @@ class Git(LazyMixin):
finally:
self.update_environment(**old_env)
- def transform_kwarg(self, name: str, value: Any, split_single_char_options: bool) -> List[str]:
+ def transform_kwarg(
+ self, name: str, value: Any, split_single_char_options: bool
+ ) -> List[str]:
if len(name) == 1:
if value is True:
return ["-%s" % name]
@@ -1058,7 +1181,9 @@ class Git(LazyMixin):
return ["--%s=%s" % (dashify(name), value)]
return []
- def transform_kwargs(self, split_single_char_options: bool = True, **kwargs: Any) -> List[str]:
+ def transform_kwargs(
+ self, split_single_char_options: bool = True, **kwargs: Any
+ ) -> List[str]:
"""Transforms Python style kwargs into git command line options."""
args = []
for k, v in kwargs.items():
@@ -1081,7 +1206,7 @@ class Git(LazyMixin):
return outlist
- def __call__(self, **kwargs: Any) -> 'Git':
+ def __call__(self, **kwargs: Any) -> "Git":
"""Specify command line options to the git executable
for a subcommand call
@@ -1094,28 +1219,34 @@ class Git(LazyMixin):
``Examples``::
git(work_tree='/tmp').difftool()"""
self._git_options = self.transform_kwargs(
- split_single_char_options=True, **kwargs)
+ split_single_char_options=True, **kwargs
+ )
return self
@overload
- def _call_process(self, method: str, *args: None, **kwargs: None
- ) -> str:
+ def _call_process(self, method: str, *args: None, **kwargs: None) -> str:
... # if no args given, execute called with all defaults
@overload
- def _call_process(self, method: str,
- istream: int,
- as_process: Literal[True],
- *args: Any, **kwargs: Any
- ) -> 'Git.AutoInterrupt': ...
+ def _call_process(
+ self,
+ method: str,
+ istream: int,
+ as_process: Literal[True],
+ *args: Any,
+ **kwargs: Any,
+ ) -> "Git.AutoInterrupt":
+ ...
@overload
- def _call_process(self, method: str, *args: Any, **kwargs: Any
- ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], 'Git.AutoInterrupt']:
+ def _call_process(
+ self, method: str, *args: Any, **kwargs: Any
+ ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], "Git.AutoInterrupt"]:
...
- def _call_process(self, method: str, *args: Any, **kwargs: Any
- ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], 'Git.AutoInterrupt']:
+ def _call_process(
+ self, method: str, *args: Any, **kwargs: Any
+ ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], "Git.AutoInterrupt"]:
"""Run the given git command with the specified arguments and return
the result as a String
@@ -1145,13 +1276,13 @@ class Git(LazyMixin):
:return: Same as ``execute``
if no args given used execute default (esp. as_process = False, stdout_as_string = True)
- and return str """
+ and return str"""
# Handle optional arguments prior to calling transform_kwargs
# otherwise these'll end up in args, which is bad.
exec_kwargs = {k: v for k, v in kwargs.items() if k in execute_kwargs}
opts_kwargs = {k: v for k, v in kwargs.items() if k not in execute_kwargs}
- insert_after_this_arg = opts_kwargs.pop('insert_kwargs_after', None)
+ insert_after_this_arg = opts_kwargs.pop("insert_kwargs_after", None)
# Prepare the argument list
@@ -1164,10 +1295,12 @@ class Git(LazyMixin):
try:
index = ext_args.index(insert_after_this_arg)
except ValueError as err:
- raise ValueError("Couldn't find argument '%s' in args %s to insert cmd options after"
- % (insert_after_this_arg, str(ext_args))) from err
+ raise ValueError(
+ "Couldn't find argument '%s' in args %s to insert cmd options after"
+ % (insert_after_this_arg, str(ext_args))
+ ) from err
# end handle error
- args_list = ext_args[:index + 1] + opt_args + ext_args[index + 1:]
+ args_list = ext_args[: index + 1] + opt_args + ext_args[index + 1 :]
# end handle opts_kwargs
call = [self.GIT_PYTHON_GIT_EXECUTABLE]
@@ -1197,9 +1330,15 @@ class Git(LazyMixin):
tokens = header_line.split()
if len(tokens) != 3:
if not tokens:
- raise ValueError("SHA could not be resolved, git returned: %r" % (header_line.strip()))
+ raise ValueError(
+ "SHA could not be resolved, git returned: %r"
+ % (header_line.strip())
+ )
else:
- raise ValueError("SHA %s could not be resolved, git returned: %r" % (tokens[0], header_line.strip()))
+ raise ValueError(
+ "SHA %s could not be resolved, git returned: %r"
+ % (tokens[0], header_line.strip())
+ )
# END handle actual return value
# END error handling
@@ -1211,9 +1350,9 @@ class Git(LazyMixin):
# required for command to separate refs on stdin, as bytes
if isinstance(ref, bytes):
# Assume 40 bytes hexsha - bin-to-ascii for some reason returns bytes, not text
- refstr: str = ref.decode('ascii')
+ refstr: str = ref.decode("ascii")
elif not isinstance(ref, str):
- refstr = str(ref) # could be ref-object
+ refstr = str(ref) # could be ref-object
else:
refstr = ref
@@ -1221,8 +1360,9 @@ class Git(LazyMixin):
refstr += "\n"
return refstr.encode(defenc)
- def _get_persistent_cmd(self, attr_name: str, cmd_name: str, *args: Any, **kwargs: Any
- ) -> 'Git.AutoInterrupt':
+ def _get_persistent_cmd(
+ self, attr_name: str, cmd_name: str, *args: Any, **kwargs: Any
+ ) -> "Git.AutoInterrupt":
cur_val = getattr(self, attr_name)
if cur_val is not None:
return cur_val
@@ -1232,10 +1372,12 @@ class Git(LazyMixin):
cmd = self._call_process(cmd_name, *args, **options)
setattr(self, attr_name, cmd)
- cmd = cast('Git.AutoInterrupt', cmd)
+ cmd = cast("Git.AutoInterrupt", cmd)
return cmd
- def __get_object_header(self, cmd: 'Git.AutoInterrupt', ref: AnyStr) -> Tuple[str, str, int]:
+ def __get_object_header(
+ self, cmd: "Git.AutoInterrupt", ref: AnyStr
+ ) -> Tuple[str, str, int]:
if cmd.stdin and cmd.stdout:
cmd.stdin.write(self._prepare_ref(ref))
cmd.stdin.flush()
@@ -1244,7 +1386,7 @@ class Git(LazyMixin):
raise ValueError("cmd stdin was empty")
def get_object_header(self, ref: str) -> Tuple[str, str, int]:
- """ Use this method to quickly examine the type and size of the object behind
+ """Use this method to quickly examine the type and size of the object behind
the given ref.
:note: The method will only suffer from the costs of command invocation
@@ -1255,16 +1397,18 @@ class Git(LazyMixin):
return self.__get_object_header(cmd, ref)
def get_object_data(self, ref: str) -> Tuple[str, str, int, bytes]:
- """ As get_object_header, but returns object data as well
+ """As get_object_header, but returns object data as well
:return: (hexsha, type_string, size_as_int,data_string)
:note: not threadsafe"""
hexsha, typename, size, stream = self.stream_object_data(ref)
data = stream.read(size)
- del(stream)
+ del stream
return (hexsha, typename, size, data)
- def stream_object_data(self, ref: str) -> Tuple[str, str, int, 'Git.CatFileContentStream']:
- """ As get_object_header, but returns the data as a stream
+ def stream_object_data(
+ self, ref: str
+ ) -> Tuple[str, str, int, "Git.CatFileContentStream"]:
+ """As get_object_header, but returns the data as a stream
:return: (hexsha, type_string, size_as_int, stream)
:note: This method is not threadsafe, you need one independent Command instance per thread to be safe !"""
@@ -1273,7 +1417,7 @@ class Git(LazyMixin):
cmd_stdout = cmd.stdout if cmd.stdout is not None else io.BytesIO()
return (hexsha, typename, size, self.CatFileContentStream(size, cmd_stdout))
- def clear_cache(self) -> 'Git':
+ def clear_cache(self) -> "Git":
"""Clear all kinds of internal caches to release resources.
Currently persistent commands will be interrupted.