Diffstat (limited to 'git')
-rw-r--r--  git/__init__.py                  6
-rw-r--r--  git/cmd.py                     123
-rw-r--r--  git/config.py                   97
-rw-r--r--  git/db.py                        4
-rw-r--r--  git/diff.py                     65
-rw-r--r--  git/exc.py                       4
-rw-r--r--  git/index/base.py              108
-rw-r--r--  git/index/fun.py                53
-rw-r--r--  git/index/typ.py                 4
-rw-r--r--  git/index/util.py                7
-rw-r--r--  git/objects/__init__.py          6
-rw-r--r--  git/objects/commit.py           46
-rw-r--r--  git/objects/fun.py              12
-rw-r--r--  git/objects/submodule/base.py  190
-rw-r--r--  git/objects/submodule/root.py   44
-rw-r--r--  git/objects/submodule/util.py    8
-rw-r--r--  git/objects/tree.py             38
-rw-r--r--  git/objects/util.py             76
-rw-r--r--  git/refs/head.py                38
-rw-r--r--  git/refs/log.py                 17
-rw-r--r--  git/refs/reference.py            8
-rw-r--r--  git/refs/remote.py               6
-rw-r--r--  git/refs/symbolic.py            71
-rw-r--r--  git/refs/tag.py                  2
-rw-r--r--  git/remote.py                  112
-rw-r--r--  git/repo/base.py                85
-rw-r--r--  git/repo/fun.py                 35
-rw-r--r--  git/types.py                    12
-rw-r--r--  git/util.py                     87
29 files changed, 333 insertions(+), 1031 deletions(-)
diff --git a/git/__init__.py b/git/__init__.py
index 3f26886f..f746e1fc 100644
--- a/git/__init__.py
+++ b/git/__init__.py
@@ -61,11 +61,7 @@ except GitError as exc:
# } END imports
-__all__ = [
- name
- for name, obj in locals().items()
- if not (name.startswith("_") or inspect.ismodule(obj))
-]
+__all__ = [name for name, obj in locals().items() if not (name.startswith("_") or inspect.ismodule(obj))]
# { Initialize git executable path
diff --git a/git/cmd.py b/git/cmd.py
index 12409b0c..0d291367 100644
--- a/git/cmd.py
+++ b/git/cmd.py
@@ -97,12 +97,8 @@ def handle_process_output(
Callable[[List[AnyStr]], None],
Callable[[bytes, "Repo", "DiffIndex"], None],
],
- stderr_handler: Union[
- None, Callable[[AnyStr], None], Callable[[List[AnyStr]], None]
- ],
- finalizer: Union[
- None, Callable[[Union[subprocess.Popen, "Git.AutoInterrupt"]], None]
- ] = None,
+ stderr_handler: Union[None, Callable[[AnyStr], None], Callable[[List[AnyStr]], None]],
+ finalizer: Union[None, Callable[[Union[subprocess.Popen, "Git.AutoInterrupt"]], None]] = None,
decode_streams: bool = True,
kill_after_timeout: Union[None, float] = None,
) -> None:
@@ -144,14 +140,10 @@ def handle_process_output(
handler(line)
except Exception as ex:
- log.error(
- f"Pumping {name!r} of cmd({remove_password_if_present(cmdline)}) failed due to: {ex!r}"
- )
+ log.error(f"Pumping {name!r} of cmd({remove_password_if_present(cmdline)}) failed due to: {ex!r}")
if "I/O operation on closed file" not in str(ex):
# Only reraise if the error was not due to the stream closing
- raise CommandError(
- [f"<{name}-pump>"] + remove_password_if_present(cmdline), ex
- ) from ex
+ raise CommandError([f"<{name}-pump>"] + remove_password_if_present(cmdline), ex) from ex
finally:
stream.close()
@@ -178,9 +170,7 @@ def handle_process_output(
threads: List[threading.Thread] = []
for name, stream, handler in pumps:
- t = threading.Thread(
- target=pump_stream, args=(cmdline, name, stream, decode_streams, handler)
- )
+ t = threading.Thread(target=pump_stream, args=(cmdline, name, stream, decode_streams, handler))
t.daemon = True
t.start()
threads.append(t)
@@ -199,8 +189,7 @@ def handle_process_output(
)
if stderr_handler:
error_str: Union[str, bytes] = (
- "error: process killed because it timed out."
- f" kill_after_timeout={kill_after_timeout} seconds"
+ "error: process killed because it timed out." f" kill_after_timeout={kill_after_timeout} seconds"
)
if not decode_streams and isinstance(p_stderr, BinaryIO):
# Assume stderr_handler needs binary input
@@ -224,9 +213,7 @@ def slots_to_dict(self: object, exclude: Sequence[str] = ()) -> Dict[str, Any]:
return {s: getattr(self, s) for s in self.__slots__ if s not in exclude}
-def dict_to_slots_and__excluded_are_none(
- self: object, d: Mapping[str, Any], excluded: Sequence[str] = ()
-) -> None:
+def dict_to_slots_and__excluded_are_none(self: object, d: Mapping[str, Any], excluded: Sequence[str] = ()) -> None:
for k, v in d.items():
setattr(self, k, v)
for k in excluded:
@@ -242,9 +229,7 @@ CREATE_NO_WINDOW = 0x08000000
## CREATE_NEW_PROCESS_GROUP is needed to allow killing it afterwards,
# see https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal
PROC_CREATIONFLAGS = (
- CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP # type: ignore[attr-defined]
- if is_win
- else 0
+ CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP if is_win else 0 # type: ignore[attr-defined]
) # mypy error if not windows
@@ -557,9 +542,7 @@ class Git(LazyMixin):
status = self.status
p_stderr = None
- def read_all_from_possibly_closed_stream(
- stream: Union[IO[bytes], None]
- ) -> bytes:
+ def read_all_from_possibly_closed_stream(stream: Union[IO[bytes], None]) -> bytes:
if stream:
try:
return stderr_b + force_bytes(stream.read())
@@ -573,9 +556,7 @@ class Git(LazyMixin):
if status != 0:
errstr = read_all_from_possibly_closed_stream(p_stderr)
log.debug("AutoInterrupt wait stderr: %r" % (errstr,))
- raise GitCommandError(
- remove_password_if_present(self.args), status, errstr
- )
+ raise GitCommandError(remove_password_if_present(self.args), status, errstr)
return status
# END auto interrupt
@@ -725,16 +706,12 @@ class Git(LazyMixin):
the subcommand.
"""
- self._persistent_git_options = self.transform_kwargs(
- split_single_char_options=True, **kwargs
- )
+ self._persistent_git_options = self.transform_kwargs(split_single_char_options=True, **kwargs)
def _set_cache_(self, attr: str) -> None:
if attr == "_version_info":
# We only use the first 4 numbers, as everything else could be strings in fact (on windows)
- process_version = self._call_process(
- "version"
- ) # should be as default *args and **kwargs used
+ process_version = self._call_process("version") # should be as default *args and **kwargs used
version_numbers = process_version.split(" ")[2]
self._version_info = cast(
@@ -759,9 +736,7 @@ class Git(LazyMixin):
return self._version_info
@overload
- def execute(
- self, command: Union[str, Sequence[Any]], *, as_process: Literal[True]
- ) -> "AutoInterrupt":
+ def execute(self, command: Union[str, Sequence[Any]], *, as_process: Literal[True]) -> "AutoInterrupt":
...
@overload
@@ -946,16 +921,10 @@ class Git(LazyMixin):
'"kill_after_timeout" feature is not supported on Windows.',
)
else:
- cmd_not_found_exception = (
- FileNotFoundError # NOQA # exists, flake8 unknown @UndefinedVariable
- )
+ cmd_not_found_exception = FileNotFoundError # NOQA # exists, flake8 unknown @UndefinedVariable
# end handle
- stdout_sink = (
- PIPE
- if with_stdout
- else getattr(subprocess, "DEVNULL", None) or open(os.devnull, "wb")
- )
+ stdout_sink = PIPE if with_stdout else getattr(subprocess, "DEVNULL", None) or open(os.devnull, "wb")
istream_ok = "None"
if istream:
istream_ok = "<valid stream>"
@@ -1027,9 +996,7 @@ class Git(LazyMixin):
if kill_after_timeout is not None:
kill_check = threading.Event()
- watchdog = threading.Timer(
- kill_after_timeout, _kill_process, args=(proc.pid,)
- )
+ watchdog = threading.Timer(kill_after_timeout, _kill_process, args=(proc.pid,))
# Wait for the process to return
status = 0
@@ -1044,9 +1011,9 @@ class Git(LazyMixin):
if kill_after_timeout is not None:
watchdog.cancel()
if kill_check.is_set():
- stderr_value = (
- 'Timeout: the command "%s" did not complete in %d '
- "secs." % (" ".join(redacted_command), kill_after_timeout)
+ stderr_value = 'Timeout: the command "%s" did not complete in %d ' "secs." % (
+ " ".join(redacted_command),
+ kill_after_timeout,
)
if not universal_newlines:
stderr_value = stderr_value.encode(defenc)
@@ -1058,11 +1025,7 @@ class Git(LazyMixin):
status = proc.returncode
else:
- max_chunk_size = (
- max_chunk_size
- if max_chunk_size and max_chunk_size > 0
- else io.DEFAULT_BUFFER_SIZE
- )
+ max_chunk_size = max_chunk_size if max_chunk_size and max_chunk_size > 0 else io.DEFAULT_BUFFER_SIZE
stream_copy(proc.stdout, output_stream, max_chunk_size)
stdout_value = proc.stdout.read()
stderr_value = proc.stderr.read()
@@ -1079,9 +1042,7 @@ class Git(LazyMixin):
cmdstr = " ".join(redacted_command)
def as_text(stdout_value: Union[bytes, str]) -> str:
- return (
- not output_stream and safe_decode(stdout_value) or "<OUTPUT_STREAM>"
- )
+ return not output_stream and safe_decode(stdout_value) or "<OUTPUT_STREAM>"
# end
@@ -1094,9 +1055,7 @@ class Git(LazyMixin):
safe_decode(stderr_value),
)
elif stdout_value:
- log.info(
- "%s -> %d; stdout: '%s'", cmdstr, status, as_text(stdout_value)
- )
+ log.info("%s -> %d; stdout: '%s'", cmdstr, status, as_text(stdout_value))
else:
log.info("%s -> %d", cmdstr, status)
# END handle debug printing
@@ -1104,9 +1063,7 @@ class Git(LazyMixin):
if with_exceptions and status != 0:
raise GitCommandError(redacted_command, status, stderr_value, stdout_value)
- if (
- isinstance(stdout_value, bytes) and stdout_as_string
- ): # could also be output_stream
+ if isinstance(stdout_value, bytes) and stdout_as_string: # could also be output_stream
stdout_value = safe_decode(stdout_value)
# Allow access to the command's status code
@@ -1163,9 +1120,7 @@ class Git(LazyMixin):
finally:
self.update_environment(**old_env)
- def transform_kwarg(
- self, name: str, value: Any, split_single_char_options: bool
- ) -> List[str]:
+ def transform_kwarg(self, name: str, value: Any, split_single_char_options: bool) -> List[str]:
if len(name) == 1:
if value is True:
return ["-%s" % name]
@@ -1181,9 +1136,7 @@ class Git(LazyMixin):
return ["--%s=%s" % (dashify(name), value)]
return []
- def transform_kwargs(
- self, split_single_char_options: bool = True, **kwargs: Any
- ) -> List[str]:
+ def transform_kwargs(self, split_single_char_options: bool = True, **kwargs: Any) -> List[str]:
"""Transforms Python style kwargs into git command line options."""
args = []
for k, v in kwargs.items():
@@ -1218,9 +1171,7 @@ class Git(LazyMixin):
``Examples``::
git(work_tree='/tmp').difftool()"""
- self._git_options = self.transform_kwargs(
- split_single_char_options=True, **kwargs
- )
+ self._git_options = self.transform_kwargs(split_single_char_options=True, **kwargs)
return self
@overload
@@ -1330,15 +1281,9 @@ class Git(LazyMixin):
tokens = header_line.split()
if len(tokens) != 3:
if not tokens:
- raise ValueError(
- "SHA could not be resolved, git returned: %r"
- % (header_line.strip())
- )
+ raise ValueError("SHA could not be resolved, git returned: %r" % (header_line.strip()))
else:
- raise ValueError(
- "SHA %s could not be resolved, git returned: %r"
- % (tokens[0], header_line.strip())
- )
+ raise ValueError("SHA %s could not be resolved, git returned: %r" % (tokens[0], header_line.strip()))
# END handle actual return value
# END error handling
@@ -1360,9 +1305,7 @@ class Git(LazyMixin):
refstr += "\n"
return refstr.encode(defenc)
- def _get_persistent_cmd(
- self, attr_name: str, cmd_name: str, *args: Any, **kwargs: Any
- ) -> "Git.AutoInterrupt":
+ def _get_persistent_cmd(self, attr_name: str, cmd_name: str, *args: Any, **kwargs: Any) -> "Git.AutoInterrupt":
cur_val = getattr(self, attr_name)
if cur_val is not None:
return cur_val
@@ -1375,9 +1318,7 @@ class Git(LazyMixin):
cmd = cast("Git.AutoInterrupt", cmd)
return cmd
- def __get_object_header(
- self, cmd: "Git.AutoInterrupt", ref: AnyStr
- ) -> Tuple[str, str, int]:
+ def __get_object_header(self, cmd: "Git.AutoInterrupt", ref: AnyStr) -> Tuple[str, str, int]:
if cmd.stdin and cmd.stdout:
cmd.stdin.write(self._prepare_ref(ref))
cmd.stdin.flush()
@@ -1405,9 +1346,7 @@ class Git(LazyMixin):
del stream
return (hexsha, typename, size, data)
- def stream_object_data(
- self, ref: str
- ) -> Tuple[str, str, int, "Git.CatFileContentStream"]:
+ def stream_object_data(self, ref: str) -> Tuple[str, str, int, "Git.CatFileContentStream"]:
"""As get_object_header, but returns the data as a stream
:return: (hexsha, type_string, size_as_int, stream)
diff --git a/git/config.py b/git/config.py
index 24c2b201..5f07cb00 100644
--- a/git/config.py
+++ b/git/config.py
@@ -81,17 +81,13 @@ CONFIG_LEVELS: ConfigLevels_Tup = ("system", "user", "global", "repository")
# Section pattern to detect conditional includes.
# https://git-scm.com/docs/git-config#_conditional_includes
-CONDITIONAL_INCLUDE_REGEXP = re.compile(
- r"(?<=includeIf )\"(gitdir|gitdir/i|onbranch):(.+)\""
-)
+CONDITIONAL_INCLUDE_REGEXP = re.compile(r"(?<=includeIf )\"(gitdir|gitdir/i|onbranch):(.+)\"")
class MetaParserBuilder(abc.ABCMeta):
"""Utility class wrapping base-class methods into decorators that assure read-only properties"""
- def __new__(
- cls, name: str, bases: Tuple, clsdict: Dict[str, Any]
- ) -> "MetaParserBuilder":
+ def __new__(cls, name: str, bases: Tuple, clsdict: Dict[str, Any]) -> "MetaParserBuilder":
"""
Equip all base-class methods with a needs_values decorator, and all non-const methods
with a set_dirty_and_flush_changes decorator in addition to that."""
@@ -99,19 +95,13 @@ class MetaParserBuilder(abc.ABCMeta):
if kmm in clsdict:
mutating_methods = clsdict[kmm]
for base in bases:
- methods = (
- t
- for t in inspect.getmembers(base, inspect.isroutine)
- if not t[0].startswith("_")
- )
+ methods = (t for t in inspect.getmembers(base, inspect.isroutine) if not t[0].startswith("_"))
for name, method in methods:
if name in clsdict:
continue
method_with_values = needs_values(method)
if name in mutating_methods:
- method_with_values = set_dirty_and_flush_changes(
- method_with_values
- )
+ method_with_values = set_dirty_and_flush_changes(method_with_values)
# END mutating methods handling
clsdict[name] = method_with_values
@@ -209,9 +199,7 @@ class SectionConstraint(Generic[T_ConfigParser]):
self._config.__enter__()
return self
- def __exit__(
- self, exception_type: str, exception_value: str, traceback: str
- ) -> None:
+ def __exit__(self, exception_type: str, exception_value: str, traceback: str) -> None:
self._config.__exit__(exception_type, exception_value, traceback)
@@ -269,16 +257,12 @@ def get_config_path(config_level: Lit_config_levels) -> str:
if config_level == "system":
return "/etc/gitconfig"
elif config_level == "user":
- config_home = os.environ.get("XDG_CONFIG_HOME") or osp.join(
- os.environ.get("HOME", "~"), ".config"
- )
+ config_home = os.environ.get("XDG_CONFIG_HOME") or osp.join(os.environ.get("HOME", "~"), ".config")
return osp.normpath(osp.expanduser(osp.join(config_home, "git", "config")))
elif config_level == "global":
return osp.normpath(osp.expanduser("~/.gitconfig"))
elif config_level == "repository":
- raise ValueError(
- "No repo to get repository configuration from. Use Repo._get_config_path"
- )
+ raise ValueError("No repo to get repository configuration from. Use Repo._get_config_path")
else:
# Should not reach here. Will raise ValueError if does. Static typing will warn missing elifs
assert_never(
@@ -327,9 +311,7 @@ class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder):
def __init__(
self,
- file_or_files: Union[
- None, PathLike, "BytesIO", Sequence[Union[PathLike, "BytesIO"]]
- ] = None,
+ file_or_files: Union[None, PathLike, "BytesIO", Sequence[Union[PathLike, "BytesIO"]]] = None,
read_only: bool = True,
merge_includes: bool = True,
config_level: Union[Lit_config_levels, None] = None,
@@ -363,21 +345,15 @@ class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder):
self._proxies = self._dict()
if file_or_files is not None:
- self._file_or_files: Union[
- PathLike, "BytesIO", Sequence[Union[PathLike, "BytesIO"]]
- ] = file_or_files
+ self._file_or_files: Union[PathLike, "BytesIO", Sequence[Union[PathLike, "BytesIO"]]] = file_or_files
else:
if config_level is None:
if read_only:
self._file_or_files = [
- get_config_path(cast(Lit_config_levels, f))
- for f in CONFIG_LEVELS
- if f != "repository"
+ get_config_path(cast(Lit_config_levels, f)) for f in CONFIG_LEVELS if f != "repository"
]
else:
- raise ValueError(
- "No configuration level or configuration files specified"
- )
+ raise ValueError("No configuration level or configuration files specified")
else:
self._file_or_files = [get_config_path(config_level)]
@@ -434,9 +410,7 @@ class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder):
try:
self.write()
except IOError:
- log.error(
- "Exception during destruction of GitConfigParser", exc_info=True
- )
+ log.error("Exception during destruction of GitConfigParser", exc_info=True)
except ReferenceError:
# This happens in PY3 ... and usually means that some state cannot be written
# as the sections dict cannot be iterated
@@ -512,11 +486,7 @@ class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder):
if mo:
# We might just have handled the last line, which could contain a quotation we want to remove
optname, vi, optval = mo.group("option", "vi", "value")
- if (
- vi in ("=", ":")
- and ";" in optval
- and not optval.strip().startswith('"')
- ):
+ if vi in ("=", ":") and ";" in optval and not optval.strip().startswith('"'):
pos = optval.find(";")
if pos != -1 and optval[pos - 1].isspace():
optval = optval[:pos]
@@ -633,9 +603,7 @@ class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder):
if hasattr(file_path, "seek"):
# must be a file objectfile-object
- file_path = cast(
- IO[bytes], file_path
- ) # replace with assert to narrow type, once sure
+ file_path = cast(IO[bytes], file_path) # replace with assert to narrow type, once sure
self._read(file_path, file_path.name)
else:
# assume a path if it is not a file-object
@@ -658,9 +626,7 @@ class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder):
continue
# end ignore relative paths if we don't know the configuration file path
file_path = cast(PathLike, file_path)
- assert osp.isabs(
- file_path
- ), "Need absolute paths to be sure our cycle checks will work"
+ assert osp.isabs(file_path), "Need absolute paths to be sure our cycle checks will work"
include_path = osp.join(osp.dirname(file_path), include_path)
# end make include path absolute
include_path = osp.normpath(include_path)
@@ -687,21 +653,14 @@ class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder):
def write_section(name: str, section_dict: _OMD) -> None:
fp.write(("[%s]\n" % name).encode(defenc))
- values: Sequence[
- str
- ] # runtime only gets str in tests, but should be whatever _OMD stores
+ values: Sequence[str] # runtime only gets str in tests, but should be whatever _OMD stores
v: str
for (key, values) in section_dict.items_all():
if key == "__name__":
continue
for v in values:
- fp.write(
- (
- "\t%s = %s\n"
- % (key, self._value_to_string(v).replace("\n", "\n\t"))
- ).encode(defenc)
- )
+ fp.write(("\t%s = %s\n" % (key, self._value_to_string(v).replace("\n", "\n\t"))).encode(defenc))
# END if key is not __name__
# END section writing
@@ -715,11 +674,7 @@ class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder):
def items(self, section_name: str) -> List[Tuple[str, str]]: # type: ignore[override]
""":return: list((option, value), ...) pairs of all items in the given section"""
- return [
- (k, v)
- for k, v in super(GitConfigParser, self).items(section_name)
- if k != "__name__"
- ]
+ return [(k, v) for k, v in super(GitConfigParser, self).items(section_name) if k != "__name__"]
def items_all(self, section_name: str) -> List[Tuple[str, List[str]]]:
""":return: list((option, [values...]), ...) pairs of all items in the given section"""
@@ -765,9 +720,7 @@ class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder):
fp = self._file_or_files
# we have a physical file on disk, so get a lock
- is_file_lock = isinstance(
- fp, (str, os.PathLike, IOBase)
- ) # can't use Pathlike until 3.5 dropped
+ is_file_lock = isinstance(fp, (str, os.PathLike, IOBase)) # can't use Pathlike until 3.5 dropped
if is_file_lock and self._lock is not None: # else raise Error?
self._lock._obtain_lock()
@@ -785,9 +738,7 @@ class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder):
def _assure_writable(self, method_name: str) -> None:
if self.read_only:
- raise IOError(
- "Cannot execute non-constant method %s.%s" % (self, method_name)
- )
+ raise IOError("Cannot execute non-constant method %s.%s" % (self, method_name))
def add_section(self, section: str) -> None:
"""Assures added options will stay in order"""
@@ -888,9 +839,7 @@ class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder):
@needs_values
@set_dirty_and_flush_changes
- def set_value(
- self, section: str, option: str, value: Union[str, bytes, int, float, bool]
- ) -> "GitConfigParser":
+ def set_value(self, section: str, option: str, value: Union[str, bytes, int, float, bool]) -> "GitConfigParser":
"""Sets the given option in section to the given value.
It will create the section if required, and will not throw as opposed to the default
ConfigParser 'set' method.
@@ -908,9 +857,7 @@ class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder):
@needs_values
@set_dirty_and_flush_changes
- def add_value(
- self, section: str, option: str, value: Union[str, bytes, int, float, bool]
- ) -> "GitConfigParser":
+ def add_value(self, section: str, option: str, value: Union[str, bytes, int, float, bool]) -> "GitConfigParser":
"""Adds a value for the given option in section.
It will create the section if required, and will not throw as opposed to the default
ConfigParser 'set' method. The value becomes the new value of the option as returned
diff --git a/git/db.py b/git/db.py
index a119f4ea..bff43347 100644
--- a/git/db.py
+++ b/git/db.py
@@ -42,9 +42,7 @@ class GitCmdObjectDB(LooseObjectDB):
def stream(self, binsha: bytes) -> OStream:
"""For now, all lookup is done by git itself"""
- hexsha, typename, size, stream = self._git.stream_object_data(
- bin_to_hex(binsha)
- )
+ hexsha, typename, size, stream = self._git.stream_object_data(bin_to_hex(binsha))
return OStream(hex_to_bin(hexsha), typename, size, stream)
# { Interface
diff --git a/git/diff.py b/git/diff.py
index 6526ed68..c315a9a9 100644
--- a/git/diff.py
+++ b/git/diff.py
@@ -68,13 +68,7 @@ def decode_path(path: bytes, has_ab_prefix: bool = True) -> Optional[bytes]:
return None
if path.startswith(b'"') and path.endswith(b'"'):
- path = (
- path[1:-1]
- .replace(b"\\n", b"\n")
- .replace(b"\\t", b"\t")
- .replace(b'\\"', b'"')
- .replace(b"\\\\", b"\\")
- )
+ path = path[1:-1].replace(b"\\n", b"\n").replace(b"\\t", b"\t").replace(b'\\"', b'"').replace(b"\\\\", b"\\")
path = _octal_byte_re.sub(_octal_repl, path)
@@ -114,7 +108,7 @@ class Diffable(object):
other: Union[Type["Index"], "Tree", "Commit", None, str, object] = Index,
paths: Union[PathLike, List[PathLike], Tuple[PathLike, ...], None] = None,
create_patch: bool = False,
- **kwargs: Any
+ **kwargs: Any,
) -> "DiffIndex":
"""Creates diffs between two items being trees, trees and index or an
index and the working tree. It will detect renames automatically.
@@ -190,11 +184,7 @@ class Diffable(object):
kwargs["as_process"] = True
proc = diff_cmd(*self._process_diff_args(args), **kwargs)
- diff_method = (
- Diff._index_from_patch_format
- if create_patch
- else Diff._index_from_raw_format
- )
+ diff_method = Diff._index_from_patch_format if create_patch else Diff._index_from_raw_format
index = diff_method(self.repo, proc)
proc.wait()
@@ -247,12 +237,7 @@ class DiffIndex(List[T_Diff]):
yield diffidx
elif change_type == "R" and diffidx.renamed:
yield diffidx
- elif (
- change_type == "M"
- and diffidx.a_blob
- and diffidx.b_blob
- and diffidx.a_blob != diffidx.b_blob
- ):
+ elif change_type == "M" and diffidx.a_blob and diffidx.b_blob and diffidx.a_blob != diffidx.b_blob:
yield diffidx
# END for each diff
@@ -372,17 +357,13 @@ class Diff(object):
if a_blob_id is None or a_blob_id == self.NULL_HEX_SHA:
self.a_blob = None
else:
- self.a_blob = Blob(
- repo, hex_to_bin(a_blob_id), mode=self.a_mode, path=self.a_path
- )
+ self.a_blob = Blob(repo, hex_to_bin(a_blob_id), mode=self.a_mode, path=self.a_path)
self.b_blob: Union["IndexObject", None]
if b_blob_id is None or b_blob_id == self.NULL_HEX_SHA:
self.b_blob = None
else:
- self.b_blob = Blob(
- repo, hex_to_bin(b_blob_id), mode=self.b_mode, path=self.b_path
- )
+ self.b_blob = Blob(repo, hex_to_bin(b_blob_id), mode=self.b_mode, path=self.b_path)
self.new_file: bool = new_file
self.deleted_file: bool = deleted_file
@@ -447,11 +428,7 @@ class Diff(object):
if self.diff:
msg += "\n---"
try:
- msg += (
- self.diff.decode(defenc)
- if isinstance(self.diff, bytes)
- else self.diff
- )
+ msg += self.diff.decode(defenc) if isinstance(self.diff, bytes) else self.diff
except UnicodeDecodeError:
msg += "OMITTED BINARY DATA"
# end handle encoding
@@ -474,17 +451,11 @@ class Diff(object):
@property
def rename_from(self) -> Optional[str]:
- return (
- self.raw_rename_from.decode(defenc, "replace")
- if self.raw_rename_from
- else None
- )
+ return self.raw_rename_from.decode(defenc, "replace") if self.raw_rename_from else None
@property
def rename_to(self) -> Optional[str]:
- return (
- self.raw_rename_to.decode(defenc, "replace") if self.raw_rename_to else None
- )
+ return self.raw_rename_to.decode(defenc, "replace") if self.raw_rename_to else None
@property
def renamed(self) -> bool:
@@ -499,9 +470,7 @@ class Diff(object):
return self.rename_from != self.rename_to
@classmethod
- def _pick_best_path(
- cls, path_match: bytes, rename_match: bytes, path_fallback_match: bytes
- ) -> Optional[bytes]:
+ def _pick_best_path(cls, path_match: bytes, rename_match: bytes, path_fallback_match: bytes) -> Optional[bytes]:
if path_match:
return decode_path(path_match)
@@ -514,9 +483,7 @@ class Diff(object):
return None
@classmethod
- def _index_from_patch_format(
- cls, repo: "Repo", proc: Union["Popen", "Git.AutoInterrupt"]
- ) -> DiffIndex:
+ def _index_from_patch_format(cls, repo: "Repo", proc: Union["Popen", "Git.AutoInterrupt"]) -> DiffIndex:
"""Create a new DiffIndex from the given text which must be in patch format
:param repo: is the repository we are operating on - it is required
:param stream: result of 'git diff' as a stream (supporting file protocol)
@@ -524,9 +491,7 @@ class Diff(object):
## FIXME: Here SLURPING raw, need to re-phrase header-regexes linewise.
text_list: List[bytes] = []
- handle_process_output(
- proc, text_list.append, None, finalize_process, decode_streams=False
- )
+ handle_process_output(proc, text_list.append, None, finalize_process, decode_streams=False)
# for now, we have to bake the stream
text = b"".join(text_list)
@@ -570,11 +535,7 @@ class Diff(object):
# Make sure the mode is set if the path is set. Otherwise the resulting blob is invalid
# We just use the one mode we should have parsed
- a_mode = (
- old_mode
- or deleted_file_mode
- or (a_path and (b_mode or new_mode or new_file_mode))
- )
+ a_mode = old_mode or deleted_file_mode or (a_path and (b_mode or new_mode or new_file_mode))
b_mode = b_mode or new_mode or new_file_mode or (b_path and a_mode)
index.append(
Diff(
diff --git a/git/exc.py b/git/exc.py
index 487ce179..22fcde0d 100644
--- a/git/exc.py
+++ b/git/exc.py
@@ -92,9 +92,7 @@ class GitCommandNotFound(CommandError):
"""Thrown if we cannot find the `git` executable in the PATH or at the path given by
the GIT_PYTHON_GIT_EXECUTABLE environment variable"""
- def __init__(
- self, command: Union[List[str], Tuple[str], str], cause: Union[str, Exception]
- ) -> None:
+ def __init__(self, command: Union[List[str], Tuple[str], str], cause: Union[str, Exception]) -> None:
super(GitCommandNotFound, self).__init__(command, cause)
self._msg = "Cmd('%s') not found%s"
diff --git a/git/index/base.py b/git/index/base.py
index 48894833..edc64875 100644
--- a/git/index/base.py
+++ b/git/index/base.py
@@ -173,18 +173,14 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
def _deserialize(self, stream: IO) -> "IndexFile":
"""Initialize this instance with index values read from the given stream"""
- self.version, self.entries, self._extension_data, _conten_sha = read_cache(
- stream
- )
+ self.version, self.entries, self._extension_data, _conten_sha = read_cache(stream)
return self
def _entries_sorted(self) -> List[IndexEntry]:
""":return: list of entries, in a sorted fashion, first by path, then by stage"""
return sorted(self.entries.values(), key=lambda e: (e.path, e.stage))
- def _serialize(
- self, stream: IO, ignore_extension_data: bool = False
- ) -> "IndexFile":
+ def _serialize(self, stream: IO, ignore_extension_data: bool = False) -> "IndexFile":
entries = self._entries_sorted()
extension_data = self._extension_data # type: Union[None, bytes]
if ignore_extension_data:
@@ -242,9 +238,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
@post_clear_cache
@default_index
- def merge_tree(
- self, rhs: Treeish, base: Union[None, Treeish] = None
- ) -> "IndexFile":
+ def merge_tree(self, rhs: Treeish, base: Union[None, Treeish] = None) -> "IndexFile":
"""Merge the given rhs treeish into the current index, possibly taking
a common base treeish into account.
@@ -344,9 +338,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
it will be temporarily moved out of the way to assure there are no unsuspected
interferences."""
if len(treeish) == 0 or len(treeish) > 3:
- raise ValueError(
- "Please specify between 1 and 3 treeish, got %i" % len(treeish)
- )
+ raise ValueError("Please specify between 1 and 3 treeish, got %i" % len(treeish))
arg_list: List[Union[Treeish, str]] = []
# ignore that working tree and index possibly are out of date
@@ -383,9 +375,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# UTILITIES
@unbare_repo
- def _iter_expand_paths(
- self: "IndexFile", paths: Sequence[PathLike]
- ) -> Iterator[PathLike]:
+ def _iter_expand_paths(self: "IndexFile", paths: Sequence[PathLike]) -> Iterator[PathLike]:
"""Expand the directories in list of paths to the corresponding paths accordingly,
Note: git will add items multiple times even if a glob overlapped
@@ -415,9 +405,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# end check symlink
# if the path is not already pointing to an existing file, resolve globs if possible
- if not os.path.exists(abs_path) and (
- "?" in abs_path or "*" in abs_path or "[" in abs_path
- ):
+ if not os.path.exists(abs_path) and ("?" in abs_path or "*" in abs_path or "[" in abs_path):
resolved_paths = glob.glob(abs_path)
# not abs_path in resolved_paths:
# a glob() resolving to the same path we are feeding it with
@@ -525,9 +513,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
return path_map
@classmethod
- def entry_key(
- cls, *entry: Union[BaseIndexEntry, PathLike, StageType]
- ) -> Tuple[PathLike, StageType]:
+ def entry_key(cls, *entry: Union[BaseIndexEntry, PathLike, StageType]) -> Tuple[PathLike, StageType]:
return entry_key(*entry)
def resolve_blobs(self, iter_blobs: Iterator[Blob]) -> "IndexFile":
@@ -621,10 +607,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
if self.repo.bare:
raise InvalidGitRepositoryError("require non-bare repository")
if not str(path).startswith(str(self.repo.working_tree_dir)):
- raise ValueError(
- "Absolute path %r is not in git repository at %r"
- % (path, self.repo.working_tree_dir)
- )
+ raise ValueError("Absolute path %r is not in git repository at %r" % (path, self.repo.working_tree_dir))
return os.path.relpath(path, self.repo.working_tree_dir)
def _preprocess_add_items(
@@ -655,9 +638,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
st = os.lstat(filepath) # handles non-symlinks as well
if S_ISLNK(st.st_mode):
# in PY3, readlink is string, but we need bytes. In PY2, it's just OS encoded bytes, we assume UTF-8
- open_stream: Callable[[], BinaryIO] = lambda: BytesIO(
- force_bytes(os.readlink(filepath), encoding=defenc)
- )
+ open_stream: Callable[[], BinaryIO] = lambda: BytesIO(force_bytes(os.readlink(filepath), encoding=defenc))
else:
open_stream = lambda: open(filepath, "rb")
with open_stream() as stream:
@@ -830,9 +811,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# That way, we are OK on a bare repository as well.
# If there are no paths, the rewriter has nothing to do either
if paths:
- entries_added.extend(
- self._entries_for_paths(paths, path_rewriter, fprogress, entries)
- )
+ entries_added.extend(self._entries_for_paths(paths, path_rewriter, fprogress, entries))
# HANDLE ENTRIES
if entries:
@@ -845,9 +824,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# HANDLE ENTRY OBJECT CREATION
# create objects if required, otherwise go with the existing shas
- null_entries_indices = [
- i for i, e in enumerate(entries) if e.binsha == Object.NULL_BIN_SHA
- ]
+ null_entries_indices = [i for i, e in enumerate(entries) if e.binsha == Object.NULL_BIN_SHA]
if null_entries_indices:
@git_working_dir
@@ -876,9 +853,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# all object sha's
if path_rewriter:
for i, e in enumerate(entries):
- entries[i] = BaseIndexEntry(
- (e.mode, e.binsha, e.stage, path_rewriter(e))
- )
+ entries[i] = BaseIndexEntry((e.mode, e.binsha, e.stage, path_rewriter(e)))
# END for each entry
# END handle path rewriting
@@ -906,9 +881,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
def _items_to_rela_paths(
self,
- items: Union[
- PathLike, Sequence[Union[PathLike, BaseIndexEntry, Blob, Submodule]]
- ],
+ items: Union[PathLike, Sequence[Union[PathLike, BaseIndexEntry, Blob, Submodule]]],
) -> List[PathLike]:
"""Returns a list of repo-relative paths from the given items which
may be absolute or relative paths, entries or blobs"""
@@ -933,7 +906,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
self,
items: Sequence[Union[PathLike, Blob, BaseIndexEntry, "Submodule"]],
working_tree: bool = False,
- **kwargs: Any
+ **kwargs: Any,
) -> List[str]:
"""Remove the given items from the index and optionally from
the working tree as well.
@@ -989,7 +962,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
self,
items: Sequence[Union[PathLike, Blob, BaseIndexEntry, "Submodule"]],
skip_errors: bool = False,
- **kwargs: Any
+ **kwargs: Any,
) -> List[Tuple[str, str]]:
"""Rename/move the items, whereas the last item is considered the destination of
the move operation. If the destination is a file, the first item ( of two )
@@ -1020,9 +993,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
paths = self._items_to_rela_paths(items)
if len(paths) < 2:
- raise ValueError(
- "Please provide at least one source and one destination of the move operation"
- )
+ raise ValueError("Please provide at least one source and one destination of the move operation")
was_dry_run = kwargs.pop("dry_run", kwargs.pop("n", None))
kwargs["dry_run"] = True
@@ -1110,9 +1081,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
def _commit_editmsg_filepath(self) -> str:
return osp.join(self.repo.common_dir, "COMMIT_EDITMSG")
- def _flush_stdin_and_wait(
- cls, proc: "Popen[bytes]", ignore_stdout: bool = False
- ) -> bytes:
+ def _flush_stdin_and_wait(cls, proc: "Popen[bytes]", ignore_stdout: bool = False) -> bytes:
stdin_IO = proc.stdin
if stdin_IO:
stdin_IO.flush()
@@ -1133,7 +1102,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
paths: Union[None, Iterable[PathLike]] = None,
force: bool = False,
fprogress: Callable = lambda *args: None,
- **kwargs: Any
+ **kwargs: Any,
) -> Union[None, Iterator[PathLike], Sequence[PathLike]]:
"""Checkout the given paths or all files from the version known to the index into
the working tree.
@@ -1185,9 +1154,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
failed_reasons = []
unknown_lines = []
- def handle_stderr(
- proc: "Popen[bytes]", iter_checked_out_files: Iterable[PathLike]
- ) -> None:
+ def handle_stderr(proc: "Popen[bytes]", iter_checked_out_files: Iterable[PathLike]) -> None:
stderr_IO = proc.stderr
if not stderr_IO:
@@ -1204,9 +1171,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
" is unmerged",
)
for line in stderr.splitlines():
- if not line.startswith("git checkout-index: ") and not line.startswith(
- "git-checkout-index: "
- ):
+ if not line.startswith("git checkout-index: ") and not line.startswith("git-checkout-index: "):
is_a_dir = " is a directory"
unlink_issue = "unable to unlink old '"
already_exists_issue = " already exists, no checkout" # created by entry.c:checkout_entry(...)
@@ -1269,9 +1234,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
kwargs["istream"] = subprocess.PIPE
proc = self.repo.git.checkout_index(args, **kwargs)
# FIXME: Reading from GIL!
- make_exc = lambda: GitCommandError(
- ("git-checkout-index",) + tuple(args), 128, proc.stderr.read()
- )
+ make_exc = lambda: GitCommandError(("git-checkout-index",) + tuple(args), 128, proc.stderr.read())
checked_out_files: List[PathLike] = []
for path in paths:
@@ -1288,9 +1251,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
for entry in self.entries.values():
if str(entry.path).startswith(folder):
p = entry.path
- self._write_path_to_stdin(
- proc, p, p, make_exc, fprogress, read_from_stdout=False
- )
+ self._write_path_to_stdin(proc, p, p, make_exc, fprogress, read_from_stdout=False)
checked_out_files.append(p)
path_is_directory = True
# END if entry is in directory
@@ -1298,9 +1259,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# END path exception handlnig
if not path_is_directory:
- self._write_path_to_stdin(
- proc, co_path, path, make_exc, fprogress, read_from_stdout=False
- )
+ self._write_path_to_stdin(proc, co_path, path, make_exc, fprogress, read_from_stdout=False)
checked_out_files.append(co_path)
# END path is a file
# END for each path
@@ -1326,7 +1285,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
working_tree: bool = False,
paths: Union[None, Iterable[PathLike]] = None,
head: bool = False,
- **kwargs: Any
+ **kwargs: Any,
) -> "IndexFile":
"""Reset the index to reflect the tree at the given commit. This will not
adjust our HEAD reference as opposed to HEAD.reset by default.
@@ -1389,9 +1348,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# END handle working tree
if head:
- self.repo.head.set_commit(
- self.repo.commit(commit), logmsg="%s: Updating HEAD" % commit
- )
+ self.repo.head.set_commit(self.repo.commit(commit), logmsg="%s: Updating HEAD" % commit)
# END handle head change
return self
@@ -1399,12 +1356,10 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# @ default_index, breaks typing for some reason, copied into function
def diff(
self, # type: ignore[override]
- other: Union[
- Type["git_diff.Diffable.Index"], "Tree", "Commit", str, None
- ] = git_diff.Diffable.Index,
+ other: Union[Type["git_diff.Diffable.Index"], "Tree", "Commit", str, None] = git_diff.Diffable.Index,
paths: Union[PathLike, List[PathLike], Tuple[PathLike, ...], None] = None,
create_patch: bool = False,
- **kwargs: Any
+ **kwargs: Any,
) -> git_diff.DiffIndex:
"""Diff this index against the working copy or a Tree or Commit object
@@ -1418,10 +1373,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# only run if we are the default repository index
if self._file_path != self._index_path():
- raise AssertionError(
- "Cannot call %r on indices that do not represent the default git index"
- % self.diff()
- )
+ raise AssertionError("Cannot call %r on indices that do not represent the default git index" % self.diff())
# index against index is always empty
if other is self.Index:
return git_diff.DiffIndex()
@@ -1442,9 +1394,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# if other is not None here, something is wrong
if other is not None:
- raise ValueError(
- "other must be None, Diffable.Index, a Tree or Commit, was %r" % other
- )
+ raise ValueError("other must be None, Diffable.Index, a Tree or Commit, was %r" % other)
# diff against working copy - can be handled by superclass natively
return super(IndexFile, self).diff(other, paths, create_patch, **kwargs)
diff --git a/git/index/fun.py b/git/index/fun.py
index e8dead86..4659ac89 100644
--- a/git/index/fun.py
+++ b/git/index/fun.py
@@ -115,9 +115,7 @@ def run_commit_hook(name: str, index: "IndexFile", *args: str) -> None:
else:
stdout_list: List[str] = []
stderr_list: List[str] = []
- handle_process_output(
- cmd, stdout_list.append, stderr_list.append, finalize_process
- )
+ handle_process_output(cmd, stdout_list.append, stderr_list.append, finalize_process)
stdout = "".join(stdout_list)
stderr = "".join(stderr_list)
if cmd.returncode != 0:
@@ -134,9 +132,7 @@ def stat_mode_to_index_mode(mode: int) -> int:
return S_IFLNK
if S_ISDIR(mode) or S_IFMT(mode) == S_IFGITLINK: # submodules
return S_IFGITLINK
- return S_IFREG | (
- mode & S_IXUSR and 0o755 or 0o644
- ) # blobs with or without executable bit
+ return S_IFREG | (mode & S_IXUSR and 0o755 or 0o644) # blobs with or without executable bit
def write_cache(
@@ -253,17 +249,13 @@ def read_cache(
beginoffset = tell()
ctime = unpack(">8s", read(8))[0]
mtime = unpack(">8s", read(8))[0]
- (dev, ino, mode, uid, gid, size, sha, flags) = unpack(
- ">LLLLLL20sH", read(20 + 4 * 6 + 2)
- )
+ (dev, ino, mode, uid, gid, size, sha, flags) = unpack(">LLLLLL20sH", read(20 + 4 * 6 + 2))
path_size = flags & CE_NAMEMASK
path = read(path_size).decode(defenc)
real_size = (tell() - beginoffset + 8) & ~7
read((beginoffset + real_size) - tell())
- entry = IndexEntry(
- (mode, sha, flags, path, ctime, mtime, dev, ino, uid, gid, size)
- )
+ entry = IndexEntry((mode, sha, flags, path, ctime, mtime, dev, ino, uid, gid, size))
# entry_key would be the method to use, but we safe the effort
entries[(path, entry.stage)] = entry
count += 1
@@ -276,10 +268,9 @@ def read_cache(
# 4 bytes length of chunk
# repeated 0 - N times
extension_data = stream.read(~0)
- assert len(extension_data) > 19, (
- "Index Footer was not at least a sha on content as it was only %i bytes in size"
- % len(extension_data)
- )
+ assert (
+ len(extension_data) > 19
+ ), "Index Footer was not at least a sha on content as it was only %i bytes in size" % len(extension_data)
content_sha = extension_data[-20:]
@@ -330,9 +321,7 @@ def write_tree_from_cache(
# enter recursion
# ci - 1 as we want to count our current item as well
- sha, _tree_entry_list = write_tree_from_cache(
- entries, odb, slice(ci - 1, xi), rbound + 1
- )
+ sha, _tree_entry_list = write_tree_from_cache(entries, odb, slice(ci - 1, xi), rbound + 1)
tree_items.append((sha, S_IFDIR, base))
# skip ahead
@@ -342,26 +331,18 @@ def write_tree_from_cache(
# finally create the tree
sio = BytesIO()
- tree_to_stream(
- tree_items, sio.write
- ) # writes to stream as bytes, but doesn't change tree_items
+ tree_to_stream(tree_items, sio.write) # writes to stream as bytes, but doesn't change tree_items
sio.seek(0)
istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio))
return (istream.binsha, tree_items)
-def _tree_entry_to_baseindexentry(
- tree_entry: "TreeCacheTup", stage: int
-) -> BaseIndexEntry:
- return BaseIndexEntry(
- (tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2])
- )
+def _tree_entry_to_baseindexentry(tree_entry: "TreeCacheTup", stage: int) -> BaseIndexEntry:
+ return BaseIndexEntry((tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2]))
-def aggressive_tree_merge(
- odb: "GitCmdObjectDB", tree_shas: Sequence[bytes]
-) -> List[BaseIndexEntry]:
+def aggressive_tree_merge(odb: "GitCmdObjectDB", tree_shas: Sequence[bytes]) -> List[BaseIndexEntry]:
"""
:return: list of BaseIndexEntries representing the aggressive merge of the given
trees. All valid entries are on stage 0, whereas the conflicting ones are left
@@ -394,14 +375,8 @@ def aggressive_tree_merge(
# it exists in all branches, if it was changed in both
# its a conflict, otherwise we take the changed version
# This should be the most common branch, so it comes first
- if (
- base[0] != ours[0]
- and base[0] != theirs[0]
- and ours[0] != theirs[0]
- ) or (
- base[1] != ours[1]
- and base[1] != theirs[1]
- and ours[1] != theirs[1]
+ if (base[0] != ours[0] and base[0] != theirs[0] and ours[0] != theirs[0]) or (
+ base[1] != ours[1] and base[1] != theirs[1] and ours[1] != theirs[1]
):
# changed by both
out.append(_tree_entry_to_baseindexentry(base, 1))
diff --git a/git/index/typ.py b/git/index/typ.py
index cbe26f27..6371953b 100644
--- a/git/index/typ.py
+++ b/git/index/typ.py
@@ -159,9 +159,7 @@ class IndexEntry(BaseIndexEntry):
:param base: Instance of type BaseIndexEntry"""
time = pack(">LL", 0, 0)
- return IndexEntry(
- (base.mode, base.binsha, base.flags, base.path, time, time, 0, 0, 0, 0, 0)
- )
+ return IndexEntry((base.mode, base.binsha, base.flags, base.path, time, time, 0, 0, 0, 0, 0))
@classmethod
def from_blob(cls, blob: Blob, stage: int = 0) -> "IndexEntry":
diff --git a/git/index/util.py b/git/index/util.py
index 7339b147..bfc7fadd 100644
--- a/git/index/util.py
+++ b/git/index/util.py
@@ -69,9 +69,7 @@ def post_clear_cache(func: Callable[..., _T]) -> Callable[..., _T]:
"""
@wraps(func)
- def post_clear_cache_if_not_raised(
- self: "IndexFile", *args: Any, **kwargs: Any
- ) -> _T:
+ def post_clear_cache_if_not_raised(self: "IndexFile", *args: Any, **kwargs: Any) -> _T:
rval = func(self, *args, **kwargs)
self._delete_entries_cache()
return rval
@@ -90,8 +88,7 @@ def default_index(func: Callable[..., _T]) -> Callable[..., _T]:
def check_default_index(self: "IndexFile", *args: Any, **kwargs: Any) -> _T:
if self._file_path != self._index_path():
raise AssertionError(
- "Cannot call %r on indices that do not represent the default git index"
- % func.__name__
+ "Cannot call %r on indices that do not represent the default git index" % func.__name__
)
return func(self, *args, **kwargs)
diff --git a/git/objects/__init__.py b/git/objects/__init__.py
index d2e1e53a..5910ac58 100644
--- a/git/objects/__init__.py
+++ b/git/objects/__init__.py
@@ -21,8 +21,4 @@ del smutil
# must come after submodule was made available
-__all__ = [
- name
- for name, obj in locals().items()
- if not (name.startswith("_") or inspect.ismodule(obj))
-]
+__all__ = [name for name, obj in locals().items() if not (name.startswith("_") or inspect.ismodule(obj))]
diff --git a/git/objects/commit.py b/git/objects/commit.py
index 137cc620..66cb9191 100644
--- a/git/objects/commit.py
+++ b/git/objects/commit.py
@@ -144,9 +144,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
super(Commit, self).__init__(repo, binsha)
self.binsha = binsha
if tree is not None:
- assert isinstance(
- tree, Tree
- ), "Tree needs to be a Tree instance, was %s" % type(tree)
+ assert isinstance(tree, Tree), "Tree needs to be a Tree instance, was %s" % type(tree)
if tree is not None:
self.tree = tree
if author is not None:
@@ -234,9 +232,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
else:
return self.message.split(b"\n", 1)[0]
- def count(
- self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any
- ) -> int:
+ def count(self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any) -> int:
"""Count the number of commits reachable from this commit
:param paths:
@@ -250,9 +246,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
# yes, it makes a difference whether empty paths are given or not in our case
# as the empty paths version will ignore merge commits for some reason.
if paths:
- return len(
- self.repo.git.rev_list(self.hexsha, "--", paths, **kwargs).splitlines()
- )
+ return len(self.repo.git.rev_list(self.hexsha, "--", paths, **kwargs).splitlines())
return len(self.repo.git.rev_list(self.hexsha, **kwargs).splitlines())
@property
@@ -285,9 +279,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
``since`` all commits since i.e. '1970-01-01'
:return: iterator yielding Commit items"""
if "pretty" in kwargs:
- raise ValueError(
- "--pretty cannot be used as parsing expects single sha's only"
- )
+ raise ValueError("--pretty cannot be used as parsing expects single sha's only")
# END handle pretty
# use -- in any case, to prevent possibility of ambiguous arguments
@@ -308,9 +300,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
proc = repo.git.rev_list(rev, args_list, as_process=True, **kwargs)
return cls._iter_from_process_or_stream(repo, proc)
- def iter_parents(
- self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any
- ) -> Iterator["Commit"]:
+ def iter_parents(self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any) -> Iterator["Commit"]:
"""Iterate _all_ parents of this commit.
:param paths:
@@ -340,9 +330,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
text2 += "%s\t%s\t%s\n" % (insertions, deletions, filename)
text = text2
else:
- text = self.repo.git.diff(
- self.parents[0].hexsha, self.hexsha, "--", numstat=True
- )
+ text = self.repo.git.diff(self.parents[0].hexsha, self.hexsha, "--", numstat=True)
return Stats._list_from_string(self.repo, text)
@property
@@ -394,9 +382,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
return d
@classmethod
- def _iter_from_process_or_stream(
- cls, repo: "Repo", proc_or_stream: Union[Popen, IO]
- ) -> Iterator["Commit"]:
+ def _iter_from_process_or_stream(cls, repo: "Repo", proc_or_stream: Union[Popen, IO]) -> Iterator["Commit"]:
"""Parse out commit information into a list of Commit objects
We expect one-line per commit, and parse the actual commit information directly
from our lighting fast object database
@@ -577,9 +563,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
new_commit,
logmsg="commit (initial): %s" % message,
)
- repo.head.set_reference(
- master, logmsg="commit: Switching to %s" % master
- )
+ repo.head.set_reference(master, logmsg="commit: Switching to %s" % master)
# END handle empty repositories
# END advance head handling
@@ -652,9 +636,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
Otherwise it is assumed to be a plain data stream from our object
"""
readline = stream.readline
- self.tree = Tree(
- self.repo, hex_to_bin(readline().split()[1]), Tree.tree_id << 12, ""
- )
+ self.tree = Tree(self.repo, hex_to_bin(readline().split()[1]), Tree.tree_id << 12, "")
self.parents = []
next_line = None
@@ -664,11 +646,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
next_line = parent_line
break
# END abort reading parents
- self.parents.append(
- type(self)(
- self.repo, hex_to_bin(parent_line.split()[-1].decode("ascii"))
- )
- )
+ self.parents.append(type(self)(self.repo, hex_to_bin(parent_line.split()[-1].decode("ascii"))))
# END for each parent line
self.parents = tuple(self.parents)
@@ -694,9 +672,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
buf = enc.strip()
while buf:
if buf[0:10] == b"encoding ":
- self.encoding = buf[buf.find(b" ") + 1 :].decode(
- self.encoding, "ignore"
- )
+ self.encoding = buf[buf.find(b" ") + 1 :].decode(self.encoding, "ignore")
elif buf[0:7] == b"gpgsig ":
sig = buf[buf.find(b" ") + 1 :] + b"\n"
is_next_header = False
diff --git a/git/objects/fun.py b/git/objects/fun.py
index de065599..001e10e4 100644
--- a/git/objects/fun.py
+++ b/git/objects/fun.py
@@ -35,9 +35,7 @@ __all__ = (
)
-def tree_to_stream(
- entries: Sequence[EntryTup], write: Callable[["ReadableBuffer"], Union[int, None]]
-) -> None:
+def tree_to_stream(entries: Sequence[EntryTup], write: Callable[["ReadableBuffer"], Union[int, None]]) -> None:
"""Write the give list of entries into a stream using its write method
:param entries: **sorted** list of tuples with (binsha, mode, name)
:param write: write method which takes a data string"""
@@ -114,9 +112,7 @@ def tree_entries_from_data(data: bytes) -> List[EntryTup]:
return out
-def _find_by_name(
- tree_data: MutableSequence[EntryTupOrNone], name: str, is_dir: bool, start_at: int
-) -> EntryTupOrNone:
+def _find_by_name(tree_data: MutableSequence[EntryTupOrNone], name: str, is_dir: bool, start_at: int) -> EntryTupOrNone:
"""return data entry matching the given name and tree mode
or None.
Before the item is returned, the respective data item is set
@@ -234,9 +230,7 @@ def traverse_trees_recursive(
return out
-def traverse_tree_recursive(
- odb: "GitCmdObjectDB", tree_sha: bytes, path_prefix: str
-) -> List[EntryTup]:
+def traverse_tree_recursive(odb: "GitCmdObjectDB", tree_sha: bytes, path_prefix: str) -> List[EntryTup]:
"""
:return: list of entries of the tree pointed to by the binary tree_sha. An entry
has the following format:
diff --git a/git/objects/submodule/base.py b/git/objects/submodule/base.py
index 84a34206..bdcdf1ec 100644
--- a/git/objects/submodule/base.py
+++ b/git/objects/submodule/base.py
@@ -68,10 +68,7 @@ class UpdateProgress(RemoteProgress):
"""Class providing detailed progress information to the caller who should
derive from it and implement the ``update(...)`` message"""
- CLONE, FETCH, UPDWKTREE = [
- 1 << x
- for x in range(RemoteProgress._num_op_codes, RemoteProgress._num_op_codes + 3)
- ]
+ CLONE, FETCH, UPDWKTREE = [1 << x for x in range(RemoteProgress._num_op_codes, RemoteProgress._num_op_codes + 3)]
_num_op_codes: int = RemoteProgress._num_op_codes + 3
__slots__ = ()
@@ -101,9 +98,7 @@ class Submodule(IndexObject, TraversableIterableObj):
k_modules_file = ".gitmodules"
k_head_option = "branch"
k_head_default = "master"
- k_default_mode = (
- stat.S_IFDIR | stat.S_IFLNK
- ) # submodules are directories with link-status
+ k_default_mode = stat.S_IFDIR | stat.S_IFLNK # submodules are directories with link-status
# this is a bogus type for base class compatibility
type: Literal["submodule"] = "submodule" # type: ignore
@@ -156,13 +151,9 @@ class Submodule(IndexObject, TraversableIterableObj):
# end
self._url = reader.get("url")
# git-python extension values - optional
- self._branch_path = reader.get_value(
- self.k_head_option, git.Head.to_full_path(self.k_head_default)
- )
+ self._branch_path = reader.get_value(self.k_head_option, git.Head.to_full_path(self.k_head_default))
elif attr == "_name":
- raise AttributeError(
- "Cannot retrieve the name of a submodule if it was not set initially"
- )
+ raise AttributeError("Cannot retrieve the name of a submodule if it was not set initially")
else:
super(Submodule, self)._set_cache_(attr)
# END handle attribute name
@@ -227,23 +218,18 @@ class Submodule(IndexObject, TraversableIterableObj):
if not repo.bare and parent_matches_head and repo.working_tree_dir:
fp_module = osp.join(repo.working_tree_dir, cls.k_modules_file)
else:
- assert (
- parent_commit is not None
- ), "need valid parent_commit in bare repositories"
+ assert parent_commit is not None, "need valid parent_commit in bare repositories"
try:
fp_module = cls._sio_modules(parent_commit)
except KeyError as e:
raise IOError(
- "Could not find %s file in the tree of parent commit %s"
- % (cls.k_modules_file, parent_commit)
+ "Could not find %s file in the tree of parent commit %s" % (cls.k_modules_file, parent_commit)
) from e
# END handle exceptions
# END handle non-bare working tree
if not read_only and (repo.bare or not parent_matches_head):
- raise ValueError(
- "Cannot write blobs of 'historical' submodule configurations"
- )
+ raise ValueError("Cannot write blobs of 'historical' submodule configurations")
# END handle writes of historical submodules
return SubmoduleConfigParser(fp_module, read_only=read_only)
@@ -277,9 +263,7 @@ class Submodule(IndexObject, TraversableIterableObj):
return SectionConstraint(parser, sm_section(self.name))
@classmethod
- def _module_abspath(
- cls, parent_repo: "Repo", path: PathLike, name: str
- ) -> PathLike:
+ def _module_abspath(cls, parent_repo: "Repo", path: PathLike, name: str) -> PathLike:
if cls._need_gitfile_submodules(parent_repo.git):
return osp.join(parent_repo.git_dir, "modules", name)
if parent_repo.working_tree_dir:
@@ -288,9 +272,7 @@ class Submodule(IndexObject, TraversableIterableObj):
# end
@classmethod
- def _clone_repo(
- cls, repo: "Repo", url: str, path: PathLike, name: str, **kwargs: Any
- ) -> "Repo":
+ def _clone_repo(cls, repo: "Repo", url: str, path: PathLike, name: str, **kwargs: Any) -> "Repo":
""":return: Repo instance of newly cloned repository
:param repo: our parent repository
:param url: url to clone from
@@ -331,19 +313,14 @@ class Submodule(IndexObject, TraversableIterableObj):
)
path = path[len(working_tree_linux.rstrip("/")) + 1 :]
if not path:
- raise ValueError(
- "Absolute submodule path '%s' didn't yield a valid relative path"
- % path
- )
+ raise ValueError("Absolute submodule path '%s' didn't yield a valid relative path" % path)
# end verify converted relative path makes sense
# end convert to a relative path
return path
@classmethod
- def _write_git_file_and_module_config(
- cls, working_tree_dir: PathLike, module_abspath: PathLike
- ) -> None:
+ def _write_git_file_and_module_config(cls, working_tree_dir: PathLike, module_abspath: PathLike) -> None:
"""Writes a .git file containing a(preferably) relative path to the actual git module repository.
It is an error if the module_abspath cannot be made into a relative path, relative to the working_tree_dir
:note: will overwrite existing files !
@@ -361,15 +338,11 @@ class Submodule(IndexObject, TraversableIterableObj):
with open(git_file, "wb") as fp:
fp.write(("gitdir: %s" % rela_path).encode(defenc))
- with GitConfigParser(
- osp.join(module_abspath, "config"), read_only=False, merge_includes=False
- ) as writer:
+ with GitConfigParser(osp.join(module_abspath, "config"), read_only=False, merge_includes=False) as writer:
writer.set_value(
"core",
"worktree",
- to_native_path_linux(
- osp.relpath(working_tree_dir, start=module_abspath)
- ),
+ to_native_path_linux(osp.relpath(working_tree_dir, start=module_abspath)),
)
# { Edit Interface
@@ -426,9 +399,7 @@ class Submodule(IndexObject, TraversableIterableObj):
update fails for instance"""
if repo.bare:
- raise InvalidGitRepositoryError(
- "Cannot add submodules to bare repositories"
- )
+ raise InvalidGitRepositoryError("Cannot add submodules to bare repositories")
# END handle bare repos
path = cls._to_relative_path(repo, path)
@@ -470,8 +441,7 @@ class Submodule(IndexObject, TraversableIterableObj):
if has_module and url is not None:
if url not in [r.url for r in sm.module().remotes]:
raise ValueError(
- "Specified URL '%s' does not match any remote url of the repository at '%s'"
- % (url, sm.abspath)
+ "Specified URL '%s' does not match any remote url of the repository at '%s'" % (url, sm.abspath)
)
# END check url
# END verify urls match
@@ -480,17 +450,13 @@ class Submodule(IndexObject, TraversableIterableObj):
if url is None:
if not has_module:
- raise ValueError(
- "A URL was not given and a repository did not exist at %s" % path
- )
+ raise ValueError("A URL was not given and a repository did not exist at %s" % path)
# END check url
mrepo = sm.module()
# assert isinstance(mrepo, git.Repo)
urls = [r.url for r in mrepo.remotes]
if not urls:
- raise ValueError(
- "Didn't find any remote url in repository at %s" % sm.abspath
- )
+ raise ValueError("Didn't find any remote url in repository at %s" % sm.abspath)
# END verify we have url
url = urls[0]
else:
@@ -624,8 +590,7 @@ class Submodule(IndexObject, TraversableIterableObj):
op,
i,
len_rmts,
- prefix
- + "Fetching remote %s of submodule %r" % (remote, self.name),
+ prefix + "Fetching remote %s of submodule %r" % (remote, self.name),
)
# ===============================
if not dry_run:
@@ -655,8 +620,7 @@ class Submodule(IndexObject, TraversableIterableObj):
os.rmdir(checkout_module_abspath)
except OSError as e:
raise OSError(
- "Module directory at %r does already exist and is non-empty"
- % checkout_module_abspath
+ "Module directory at %r does already exist and is non-empty" % checkout_module_abspath
) from e
# END handle OSError
# END handle directory removal
@@ -668,8 +632,7 @@ class Submodule(IndexObject, TraversableIterableObj):
0,
1,
prefix
- + "Cloning url '%s' to '%s' in submodule %r"
- % (self.url, checkout_module_abspath, self.name),
+ + "Cloning url '%s' to '%s' in submodule %r" % (self.url, checkout_module_abspath, self.name),
)
if not dry_run:
mrepo = self._clone_repo(
@@ -694,9 +657,7 @@ class Submodule(IndexObject, TraversableIterableObj):
try:
mrepo = cast("Repo", mrepo)
# find a remote which has our branch - we try to be flexible
- remote_branch = find_first_remote_branch(
- mrepo.remotes, self.branch_name
- )
+ remote_branch = find_first_remote_branch(mrepo.remotes, self.branch_name)
local_branch = mkhead(mrepo, self.branch_path)
# have a valid branch, but no checkout - make sure we can figure
@@ -711,9 +672,7 @@ class Submodule(IndexObject, TraversableIterableObj):
)
mrepo.head.reference.set_tracking_branch(remote_branch)
except (IndexError, InvalidGitRepositoryError):
- log.warning(
- "Failed to checkout tracking branch %s", self.branch_path
- )
+ log.warning("Failed to checkout tracking branch %s", self.branch_path)
# END handle tracking branch
# NOTE: Have to write the repo config file as well, otherwise
@@ -735,10 +694,7 @@ class Submodule(IndexObject, TraversableIterableObj):
# END handle dry_run
if mrepo is not None and to_latest_revision:
- msg_base = (
- "Cannot update to latest revision in repository at %r as "
- % mrepo.working_dir
- )
+ msg_base = "Cannot update to latest revision in repository at %r as " % mrepo.working_dir
if not is_detached:
rref = mrepo.head.reference.tracking_branch()
if rref is not None:
@@ -767,9 +723,7 @@ class Submodule(IndexObject, TraversableIterableObj):
may_reset = True
if mrepo.head.commit.binsha != self.NULL_BIN_SHA:
base_commit = mrepo.merge_base(mrepo.head.commit, hexsha)
- if len(base_commit) == 0 or (
- base_commit[0] is not None and base_commit[0].hexsha == hexsha
- ):
+ if len(base_commit) == 0 or (base_commit[0] is not None and base_commit[0].hexsha == hexsha):
if force:
msg = "Will force checkout or reset on local branch that is possibly in the future of"
msg += "the commit it will be checked out to, effectively 'forgetting' new commits"
@@ -786,16 +740,8 @@ class Submodule(IndexObject, TraversableIterableObj):
# end handle force
# end handle if we are in the future
- if (
- may_reset
- and not force
- and mrepo.is_dirty(
- index=True, working_tree=True, untracked_files=True
- )
- ):
- raise RepositoryDirtyError(
- mrepo, "Cannot reset a dirty repository"
- )
+ if may_reset and not force and mrepo.is_dirty(index=True, working_tree=True, untracked_files=True):
+ raise RepositoryDirtyError(mrepo, "Cannot reset a dirty repository")
# end handle force and dirty state
# end handle empty repo
@@ -805,8 +751,7 @@ class Submodule(IndexObject, TraversableIterableObj):
0,
1,
prefix
- + "Updating working tree at %s for submodule %r to revision %s"
- % (self.path, self.name, hexsha),
+ + "Updating working tree at %s for submodule %r to revision %s" % (self.path, self.name, hexsha),
)
if not dry_run and may_reset:
@@ -855,9 +800,7 @@ class Submodule(IndexObject, TraversableIterableObj):
return self
@unbare_repo
- def move(
- self, module_path: PathLike, configuration: bool = True, module: bool = True
- ) -> "Submodule":
+ def move(self, module_path: PathLike, configuration: bool = True, module: bool = True) -> "Submodule":
"""Move the submodule to a another module path. This involves physically moving
the repository at our current path, changing the configuration, as well as
adjusting our index entry accordingly.
@@ -877,9 +820,7 @@ class Submodule(IndexObject, TraversableIterableObj):
            in an inconsistent state if a sub-step fails for some reason
"""
if module + configuration < 1:
- raise ValueError(
- "You must specify to move at least the module or the configuration of the submodule"
- )
+ raise ValueError("You must specify to move at least the module or the configuration of the submodule")
# END handle input
module_checkout_path = self._to_relative_path(self.repo, module_path)
@@ -889,13 +830,9 @@ class Submodule(IndexObject, TraversableIterableObj):
return self
# END handle no change
- module_checkout_abspath = join_path_native(
- str(self.repo.working_tree_dir), module_checkout_path
- )
+ module_checkout_abspath = join_path_native(str(self.repo.working_tree_dir), module_checkout_path)
if osp.isfile(module_checkout_abspath):
- raise ValueError(
- "Cannot move repository onto a file: %s" % module_checkout_abspath
- )
+ raise ValueError("Cannot move repository onto a file: %s" % module_checkout_abspath)
# END handle target files
index = self.repo.index
@@ -933,9 +870,7 @@ class Submodule(IndexObject, TraversableIterableObj):
if osp.isfile(osp.join(module_checkout_abspath, ".git")):
module_abspath = self._module_abspath(self.repo, self.path, self.name)
- self._write_git_file_and_module_config(
- module_checkout_abspath, module_abspath
- )
+ self._write_git_file_and_module_config(module_checkout_abspath, module_abspath)
# end handle git file rewrite
# END move physical module
@@ -948,14 +883,10 @@ class Submodule(IndexObject, TraversableIterableObj):
ekey = index.entry_key(self.path, 0)
entry = index.entries[ekey]
del index.entries[ekey]
- nentry = git.IndexEntry(
- entry[:3] + (module_checkout_path,) + entry[4:]
- )
+ nentry = git.IndexEntry(entry[:3] + (module_checkout_path,) + entry[4:])
index.entries[tekey] = nentry
except KeyError as e:
- raise InvalidGitRepositoryError(
- "Submodule's entry at %r did not exist" % (self.path)
- ) from e
+ raise InvalidGitRepositoryError("Submodule's entry at %r did not exist" % (self.path)) from e
# END handle submodule doesn't exist
# update configuration
@@ -1012,9 +943,7 @@ class Submodule(IndexObject, TraversableIterableObj):
:raise InvalidGitRepositoryError: thrown if the repository cannot be deleted
:raise OSError: if directories or files could not be removed"""
if not (module or configuration):
- raise ValueError(
- "Need to specify to delete at least the module, or the configuration"
- )
+ raise ValueError("Need to specify to delete at least the module, or the configuration")
# END handle parameters
# Recursively remove children of this submodule
@@ -1027,9 +956,7 @@ class Submodule(IndexObject, TraversableIterableObj):
if configuration and not dry_run and nc > 0:
# Assure we don't leave the parent repository in a dirty state, and commit our changes
# It's important for recursive, unforced, deletions to work as expected
- self.module().index.commit(
- "Removed at least one of child-modules of '%s'" % self.name
- )
+ self.module().index.commit("Removed at least one of child-modules of '%s'" % self.name)
# end handle recursion
# DELETE REPOSITORY WORKING TREE
@@ -1048,9 +975,7 @@ class Submodule(IndexObject, TraversableIterableObj):
elif osp.isdir(mp):
method = rmtree
elif osp.exists(mp):
- raise AssertionError(
- "Cannot forcibly delete repository as it was neither a link, nor a directory"
- )
+ raise AssertionError("Cannot forcibly delete repository as it was neither a link, nor a directory")
# END handle brutal deletion
if not dry_run:
assert method
@@ -1079,8 +1004,7 @@ class Submodule(IndexObject, TraversableIterableObj):
# not a single remote branch contained all our commits
if len(rrefs) and num_branches_with_new_commits == len(rrefs):
raise InvalidGitRepositoryError(
- "Cannot delete module at %s as there are new commits"
- % mod.working_tree_dir
+ "Cannot delete module at %s as there are new commits" % mod.working_tree_dir
)
# END handle new commits
# have to manually delete references as python's scoping is
@@ -1106,9 +1030,7 @@ class Submodule(IndexObject, TraversableIterableObj):
if HIDE_WINDOWS_KNOWN_ERRORS:
from unittest import SkipTest
- raise SkipTest(
- "FIXME: fails with: PermissionError\n {}".format(ex)
- ) from ex
+ raise SkipTest("FIXME: fails with: PermissionError\n {}".format(ex)) from ex
raise
# END delete tree if possible
# END handle force
@@ -1121,9 +1043,7 @@ class Submodule(IndexObject, TraversableIterableObj):
if HIDE_WINDOWS_KNOWN_ERRORS:
from unittest import SkipTest
- raise SkipTest(
- f"FIXME: fails with: PermissionError\n {ex}"
- ) from ex
+ raise SkipTest(f"FIXME: fails with: PermissionError\n {ex}") from ex
else:
raise
# end handle separate bare repository
@@ -1157,9 +1077,7 @@ class Submodule(IndexObject, TraversableIterableObj):
return self
- def set_parent_commit(
- self, commit: Union[Commit_ish, None], check: bool = True
- ) -> "Submodule":
+ def set_parent_commit(self, commit: Union[Commit_ish, None], check: bool = True) -> "Submodule":
"""Set this instance to use the given commit whose tree is supposed to
contain the .gitmodules blob.
@@ -1180,10 +1098,7 @@ class Submodule(IndexObject, TraversableIterableObj):
pcommit = self.repo.commit(commit)
pctree = pcommit.tree
if self.k_modules_file not in pctree:
- raise ValueError(
- "Tree of commit %s did not contain the %s file"
- % (commit, self.k_modules_file)
- )
+ raise ValueError("Tree of commit %s did not contain the %s file" % (commit, self.k_modules_file))
# END handle exceptions
prev_pc = self._parent_commit
@@ -1193,10 +1108,7 @@ class Submodule(IndexObject, TraversableIterableObj):
parser = self._config_parser(self.repo, self._parent_commit, read_only=True)
if not parser.has_section(sm_section(self.name)):
self._parent_commit = prev_pc
- raise ValueError(
- "Submodule at path %r did not exist in parent commit %s"
- % (self.path, commit)
- )
+ raise ValueError("Submodule at path %r did not exist in parent commit %s" % (self.path, commit))
# END handle submodule did not exist
# END handle checking mode
@@ -1268,9 +1180,7 @@ class Submodule(IndexObject, TraversableIterableObj):
# .git/modules
mod = self.module()
if mod.has_separate_working_tree():
- destination_module_abspath = self._module_abspath(
- self.repo, self.path, new_name
- )
+ destination_module_abspath = self._module_abspath(self.repo, self.path, new_name)
source_dir = mod.git_dir
# Let's be sure the submodule name is not so obviously tied to a directory
if str(destination_module_abspath).startswith(str(mod.git_dir)):
@@ -1280,9 +1190,7 @@ class Submodule(IndexObject, TraversableIterableObj):
# end handle self-containment
os.renames(source_dir, destination_module_abspath)
if mod.working_tree_dir:
- self._write_git_file_and_module_config(
- mod.working_tree_dir, destination_module_abspath
- )
+ self._write_git_file_and_module_config(mod.working_tree_dir, destination_module_abspath)
# end move separate git repository
return self
@@ -1304,13 +1212,9 @@ class Submodule(IndexObject, TraversableIterableObj):
return repo
# END handle repo uninitialized
except (InvalidGitRepositoryError, NoSuchPathError) as e:
- raise InvalidGitRepositoryError(
- "No valid repository at %s" % module_checkout_abspath
- ) from e
+ raise InvalidGitRepositoryError("No valid repository at %s" % module_checkout_abspath) from e
else:
- raise InvalidGitRepositoryError(
- "Repository at %r was not yet checked out" % module_checkout_abspath
- )
+ raise InvalidGitRepositoryError("Repository at %r was not yet checked out" % module_checkout_abspath)
# END handle exceptions
def module_exists(self) -> bool:
diff --git a/git/objects/submodule/root.py b/git/objects/submodule/root.py
index 16f0f91f..0cbc262c 100644
--- a/git/objects/submodule/root.py
+++ b/git/objects/submodule/root.py
@@ -27,8 +27,7 @@ class RootUpdateProgress(UpdateProgress):
"""Utility class which adds more opcodes to the UpdateProgress"""
REMOVE, PATHCHANGE, BRANCHCHANGE, URLCHANGE = [
- 1 << x
- for x in range(UpdateProgress._num_op_codes, UpdateProgress._num_op_codes + 4)
+ 1 << x for x in range(UpdateProgress._num_op_codes, UpdateProgress._num_op_codes + 4)
]
_num_op_codes = UpdateProgress._num_op_codes + 4
@@ -116,9 +115,7 @@ class RootModule(Submodule):
In conjunction with dry_run, it can be useful to anticipate all errors when updating submodules
:return: self"""
if self.repo.bare:
- raise InvalidGitRepositoryError(
- "Cannot update submodules in bare repositories"
- )
+ raise InvalidGitRepositoryError("Cannot update submodules in bare repositories")
# END handle bare
if progress is None:
@@ -149,9 +146,7 @@ class RootModule(Submodule):
previous_commit = repo.commit(previous_commit) # obtain commit object
# END handle previous commit
- psms: "IterableList[Submodule]" = self.list_items(
- repo, parent_commit=previous_commit
- )
+ psms: "IterableList[Submodule]" = self.list_items(repo, parent_commit=previous_commit)
sms: "IterableList[Submodule]" = self.list_items(repo)
spsms = set(psms)
ssms = set(sms)
@@ -186,9 +181,7 @@ class RootModule(Submodule):
if i == len_rrsm - 1:
op |= END
# END handle end
- progress.update(
- op, i, len_rrsm, prefix + "Done removing submodule %r" % rsm.name
- )
+ progress.update(op, i, len_rrsm, prefix + "Done removing submodule %r" % rsm.name)
# END for each removed submodule
# HANDLE PATH RENAMES
@@ -207,9 +200,7 @@ class RootModule(Submodule):
BEGIN | PATHCHANGE,
i,
len_csms,
- prefix
- + "Moving repository of submodule %r from %s to %s"
- % (sm.name, psm.abspath, sm.abspath),
+ prefix + "Moving repository of submodule %r from %s to %s" % (sm.name, psm.abspath, sm.abspath),
)
# move the module to the new path
if not dry_run:
@@ -240,9 +231,7 @@ class RootModule(Submodule):
BEGIN | URLCHANGE,
i,
len_csms,
- prefix
- + "Changing url of submodule %r from %s to %s"
- % (sm.name, psm.url, sm.url),
+ prefix + "Changing url of submodule %r from %s to %s" % (sm.name, psm.url, sm.url),
)
if not dry_run:
@@ -252,16 +241,7 @@ class RootModule(Submodule):
# If we have a tracking branch, it should be available
# in the new remote as well.
- if (
- len(
- [
- r
- for r in smr.refs
- if r.remote_head == sm.branch_name
- ]
- )
- == 0
- ):
+ if len([r for r in smr.refs if r.remote_head == sm.branch_name]) == 0:
raise ValueError(
"Submodule branch named %r was not available in new submodule remote at %r"
% (sm.branch_name, sm.url)
@@ -289,8 +269,7 @@ class RootModule(Submodule):
# Alternatively we could just generate a unique name and leave all
# existing ones in place
raise InvalidGitRepositoryError(
- "Couldn't find original remote-repo at url %r"
- % psm.url
+ "Couldn't find original remote-repo at url %r" % psm.url
)
# END handle one single remote
# END handle check we found a remote
@@ -340,8 +319,7 @@ class RootModule(Submodule):
END | URLCHANGE,
i,
len_csms,
- prefix
- + "Done adjusting url of submodule %r" % (sm.name),
+ prefix + "Done adjusting url of submodule %r" % (sm.name),
)
# END skip remote handling if new url already exists in module
# END handle url
@@ -378,9 +356,7 @@ class RootModule(Submodule):
tbr = git.Head(smm, sm.branch_path)
# END assure tracking branch exists
- tbr.set_tracking_branch(
- find_first_remote_branch(smmr, sm.branch_name)
- )
+ tbr.set_tracking_branch(find_first_remote_branch(smmr, sm.branch_name))
# NOTE: All head-resetting is done in the base implementation of update
# but we will have to checkout the new branch here. As it still points to the currently
            # checked-out commit, we don't do any harm.
diff --git a/git/objects/submodule/util.py b/git/objects/submodule/util.py
index 456ae34b..56ce1489 100644
--- a/git/objects/submodule/util.py
+++ b/git/objects/submodule/util.py
@@ -47,9 +47,7 @@ def mkhead(repo: "Repo", path: PathLike) -> "Head":
return git.Head(repo, git.Head.to_full_path(path))
-def find_first_remote_branch(
- remotes: Sequence["Remote"], branch_name: str
-) -> "RemoteReference":
+def find_first_remote_branch(remotes: Sequence["Remote"], branch_name: str) -> "RemoteReference":
"""Find the remote branch matching the name of the given branch or raise InvalidGitRepositoryError"""
for remote in remotes:
try:
@@ -58,9 +56,7 @@ def find_first_remote_branch(
continue
# END exception handling
# END for remote
- raise InvalidGitRepositoryError(
- "Didn't find remote branch '%r' in any of the given remotes" % branch_name
- )
+ raise InvalidGitRepositoryError("Didn't find remote branch '%r' in any of the given remotes" % branch_name)
# } END utilities
diff --git a/git/objects/tree.py b/git/objects/tree.py
index e1fcced7..b72e88c4 100644
--- a/git/objects/tree.py
+++ b/git/objects/tree.py
@@ -40,9 +40,7 @@ if TYPE_CHECKING:
TreeCacheTup = Tuple[bytes, int, str]
-TraversedTreeTup = Union[
- Tuple[Union["Tree", None], IndexObjUnion, Tuple["Submodule", "Submodule"]]
-]
+TraversedTreeTup = Union[Tuple[Union["Tree", None], IndexObjUnion, Tuple["Submodule", "Submodule"]]]
# def is_tree_cache(inp: Tuple[bytes, int, str]) -> TypeGuard[TreeCacheTup]:
@@ -69,9 +67,7 @@ def git_cmp(t1: TreeCacheTup, t2: TreeCacheTup) -> int:
return len_a - len_b
-def merge_sort(
- a: List[TreeCacheTup], cmp: Callable[[TreeCacheTup, TreeCacheTup], int]
-) -> None:
+def merge_sort(a: List[TreeCacheTup], cmp: Callable[[TreeCacheTup, TreeCacheTup], int]) -> None:
if len(a) < 2:
return None
@@ -139,9 +135,7 @@ class TreeModifier(object):
# } END interface
# { Mutators
- def add(
- self, sha: bytes, mode: int, name: str, force: bool = False
- ) -> "TreeModifier":
+ def add(self, sha: bytes, mode: int, name: str, force: bool = False) -> "TreeModifier":
"""Add the given item to the tree. If an item with the given name already
exists, nothing will be done, but a ValueError will be raised if the
sha and mode of the existing item do not match the one you add, unless
@@ -182,11 +176,7 @@ class TreeModifier(object):
puts the caller into responsibility to assure the input is correct.
For more information on the parameters, see ``add``
:param binsha: 20 byte binary sha"""
- assert (
- isinstance(binsha, bytes)
- and isinstance(mode, int)
- and isinstance(name, str)
- )
+ assert isinstance(binsha, bytes) and isinstance(mode, int) and isinstance(name, str)
tree_cache = (binsha, mode, name)
self._cache.append(tree_cache)
@@ -256,9 +246,7 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
super(Tree, self)._set_cache_(attr)
# END handle attribute
- def _iter_convert_to_object(
- self, iterable: Iterable[TreeCacheTup]
- ) -> Iterator[IndexObjUnion]:
+ def _iter_convert_to_object(self, iterable: Iterable[TreeCacheTup]) -> Iterator[IndexObjUnion]:
"""Iterable yields tuples of (binsha, mode, name), which will be converted
to the respective object representation"""
for binsha, mode, name in iterable:
@@ -266,9 +254,7 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
try:
yield self._map_id_to_type[mode >> 12](self.repo, binsha, mode, path)
except KeyError as e:
- raise TypeError(
- "Unknown mode %o found in tree data for path '%s'" % (mode, path)
- ) from e
+ raise TypeError("Unknown mode %o found in tree data for path '%s'" % (mode, path)) from e
# END for each item
def join(self, file: str) -> IndexObjUnion:
@@ -330,12 +316,8 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
def traverse(
self, # type: ignore[override]
- predicate: Callable[
- [Union[IndexObjUnion, TraversedTreeTup], int], bool
- ] = lambda i, d: True,
- prune: Callable[
- [Union[IndexObjUnion, TraversedTreeTup], int], bool
- ] = lambda i, d: False,
+ predicate: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: True,
+ prune: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: False,
depth: int = -1,
branch_first: bool = True,
visit_once: bool = False,
@@ -389,9 +371,7 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
def __getitem__(self, item: Union[str, int, slice]) -> IndexObjUnion:
if isinstance(item, int):
info = self._cache[item]
- return self._map_id_to_type[info[1] >> 12](
- self.repo, info[0], info[1], join_path(self.path, info[2])
- )
+ return self._map_id_to_type[info[1] >> 12](self.repo, info[0], info[1], join_path(self.path, info[2]))
if isinstance(item, str):
# compatibility
diff --git a/git/objects/util.py b/git/objects/util.py
index 4ba59c8a..fad00001 100644
--- a/git/objects/util.py
+++ b/git/objects/util.py
@@ -60,9 +60,7 @@ class TraverseNT(NamedTuple):
src: Union["Traversable", None]
-T_TIobj = TypeVar(
- "T_TIobj", bound="TraversableIterableObj"
-) # for TraversableIterableObj.traverse()
+T_TIobj = TypeVar("T_TIobj", bound="TraversableIterableObj") # for TraversableIterableObj.traverse()
TraversedTup = Union[
Tuple[Union["Traversable", None], "Traversable"], # for commit, submodule
@@ -133,9 +131,7 @@ def get_object_type_by_name(
return tree.Tree
else:
- raise ValueError(
- "Cannot handle unknown object type: %s" % object_type_name.decode()
- )
+ raise ValueError("Cannot handle unknown object type: %s" % object_type_name.decode())
def utctz_to_altz(utctz: str) -> int:
@@ -164,12 +160,7 @@ def verify_utctz(offset: str) -> str:
raise fmt_exc
if offset[0] not in "+-":
raise fmt_exc
- if (
- offset[1] not in digits
- or offset[2] not in digits
- or offset[3] not in digits
- or offset[4] not in digits
- ):
+ if offset[1] not in digits or offset[2] not in digits or offset[3] not in digits or offset[4] not in digits:
raise fmt_exc
# END for each char
return offset
@@ -222,15 +213,11 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]:
"""
if isinstance(string_date, datetime):
if string_date.tzinfo:
- utcoffset = cast(
- timedelta, string_date.utcoffset()
- ) # typeguard, if tzinfoand is not None
+ utcoffset = cast(timedelta, string_date.utcoffset()) # typeguard, if tzinfoand is not None
offset = -int(utcoffset.total_seconds())
return int(string_date.astimezone(utc).timestamp()), offset
else:
- raise ValueError(
- f"string_date datetime object without tzinfo, {string_date}"
- )
+ raise ValueError(f"string_date datetime object without tzinfo, {string_date}")
# git time
try:
@@ -302,9 +289,7 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]:
raise ValueError("no format matched")
# END handle format
except Exception as e:
- raise ValueError(
- f"Unsupported date format or type: {string_date}, type={type(string_date)}"
- ) from e
+ raise ValueError(f"Unsupported date format or type: {string_date}, type={type(string_date)}") from e
# END handle exceptions
@@ -411,9 +396,7 @@ class Traversable(Protocol):
# could add _id_attribute_ to Traversable, or make all Traversable also Iterable?
if not as_edge:
- out: IterableList[
- Union["Commit", "Submodule", "Tree", "Blob"]
- ] = IterableList(id)
+ out: IterableList[Union["Commit", "Submodule", "Tree", "Blob"]] = IterableList(id)
out.extend(self.traverse(as_edge=as_edge, *args, **kwargs))
return out
# overloads in subclasses (mypy doesn't allow typing self: subclass)
@@ -437,12 +420,8 @@ class Traversable(Protocol):
def _traverse(
self,
- predicate: Callable[
- [Union["Traversable", "Blob", TraversedTup], int], bool
- ] = lambda i, d: True,
- prune: Callable[
- [Union["Traversable", "Blob", TraversedTup], int], bool
- ] = lambda i, d: False,
+ predicate: Callable[[Union["Traversable", "Blob", TraversedTup], int], bool] = lambda i, d: True,
+ prune: Callable[[Union["Traversable", "Blob", TraversedTup], int], bool] = lambda i, d: False,
depth: int = -1,
branch_first: bool = True,
visit_once: bool = True,
@@ -506,10 +485,7 @@ class Traversable(Protocol):
if branch_first:
stack.extendleft(TraverseNT(depth, i, src_item) for i in lst)
else:
- reviter = (
- TraverseNT(depth, lst[i], src_item)
- for i in range(len(lst) - 1, -1, -1)
- )
+ reviter = (TraverseNT(depth, lst[i], src_item) for i in range(len(lst) - 1, -1, -1))
stack.extend(reviter)
# END addToStack local method
@@ -524,9 +500,7 @@ class Traversable(Protocol):
visited.add(item)
rval: Union[TraversedTup, "Traversable", "Blob"]
- if (
- as_edge
- ): # if as_edge return (src, item) unless rrc is None (e.g. for first item)
+ if as_edge: # if as_edge return (src, item) unless rrc is None (e.g. for first item)
rval = (src, item)
else:
rval = item
@@ -575,9 +549,7 @@ class TraversableIterableObj(IterableObj, Traversable):
TIobj_tuple = Tuple[Union[T_TIobj, None], T_TIobj]
- def list_traverse(
- self: T_TIobj, *args: Any, **kwargs: Any
- ) -> IterableList[T_TIobj]:
+ def list_traverse(self: T_TIobj, *args: Any, **kwargs: Any) -> IterableList[T_TIobj]:
return super(TraversableIterableObj, self)._list_traverse(*args, **kwargs)
@overload # type: ignore
@@ -587,12 +559,8 @@ class TraversableIterableObj(IterableObj, Traversable):
@overload
def traverse(
self: T_TIobj,
- predicate: Callable[
- [Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool
- ],
- prune: Callable[
- [Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool
- ],
+ predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
+ prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
depth: int,
branch_first: bool,
visit_once: bool,
@@ -604,12 +572,8 @@ class TraversableIterableObj(IterableObj, Traversable):
@overload
def traverse(
self: T_TIobj,
- predicate: Callable[
- [Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool
- ],
- prune: Callable[
- [Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool
- ],
+ predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
+ prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
depth: int,
branch_first: bool,
visit_once: bool,
@@ -633,18 +597,14 @@ class TraversableIterableObj(IterableObj, Traversable):
def traverse(
self: T_TIobj,
- predicate: Callable[
- [Union[T_TIobj, TIobj_tuple], int], bool
- ] = lambda i, d: True,
+ predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool] = lambda i, d: True,
prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool] = lambda i, d: False,
depth: int = -1,
branch_first: bool = True,
visit_once: bool = True,
ignore_self: int = 1,
as_edge: bool = False,
- ) -> Union[
- Iterator[T_TIobj], Iterator[Tuple[T_TIobj, T_TIobj]], Iterator[TIobj_tuple]
- ]:
+ ) -> Union[Iterator[T_TIobj], Iterator[Tuple[T_TIobj, T_TIobj]], Iterator[TIobj_tuple]]:
"""For documentation, see util.Traversable._traverse()"""
"""
diff --git a/git/refs/head.py b/git/refs/head.py
index befdc135..26efc6cb 100644
--- a/git/refs/head.py
+++ b/git/refs/head.py
@@ -38,9 +38,7 @@ class HEAD(SymbolicReference):
def __init__(self, repo: "Repo", path: PathLike = _HEAD_NAME):
if path != self._HEAD_NAME:
- raise ValueError(
- "HEAD instance must point to %r, got %r" % (self._HEAD_NAME, path)
- )
+ raise ValueError("HEAD instance must point to %r, got %r" % (self._HEAD_NAME, path))
super(HEAD, self).__init__(repo, path)
self.commit: "Commit"
@@ -56,7 +54,7 @@ class HEAD(SymbolicReference):
index: bool = True,
working_tree: bool = False,
paths: Union[PathLike, Sequence[PathLike], None] = None,
- **kwargs: Any
+ **kwargs: Any,
) -> "HEAD":
"""Reset our HEAD to the given commit optionally synchronizing
the index and working tree. The reference we refer to will be set to
@@ -98,9 +96,7 @@ class HEAD(SymbolicReference):
if working_tree:
mode = "--hard"
if not index:
- raise ValueError(
- "Cannot reset the working tree if the index is not reset as well"
- )
+ raise ValueError("Cannot reset the working tree if the index is not reset as well")
# END working tree handling
@@ -140,13 +136,7 @@ class Head(Reference):
k_config_remote_ref = "merge" # branch to merge from remote
@classmethod
- def delete(
- cls,
- repo: "Repo",
- *heads: "Union[Head, str]",
- force: bool = False,
- **kwargs: Any
- ) -> None:
+ def delete(cls, repo: "Repo", *heads: "Union[Head, str]", force: bool = False, **kwargs: Any) -> None:
"""Delete the given heads
:param force:
@@ -158,9 +148,7 @@ class Head(Reference):
flag = "-D"
repo.git.branch(flag, *heads)
- def set_tracking_branch(
- self, remote_reference: Union["RemoteReference", None]
- ) -> "Head":
+ def set_tracking_branch(self, remote_reference: Union["RemoteReference", None]) -> "Head":
"""
Configure this branch to track the given remote reference. This will alter
this branch's configuration accordingly.
@@ -170,9 +158,7 @@ class Head(Reference):
:return: self"""
from .remote import RemoteReference
- if remote_reference is not None and not isinstance(
- remote_reference, RemoteReference
- ):
+ if remote_reference is not None and not isinstance(remote_reference, RemoteReference):
raise ValueError("Incorrect parameter type: %r" % remote_reference)
# END handle type
@@ -198,18 +184,12 @@ class Head(Reference):
from .remote import RemoteReference
reader = self.config_reader()
- if reader.has_option(self.k_config_remote) and reader.has_option(
- self.k_config_remote_ref
- ):
+ if reader.has_option(self.k_config_remote) and reader.has_option(self.k_config_remote_ref):
ref = Head(
self.repo,
- Head.to_full_path(
- strip_quotes(reader.get_value(self.k_config_remote_ref))
- ),
- )
- remote_refpath = RemoteReference.to_full_path(
- join_path(reader.get_value(self.k_config_remote), ref.name)
+ Head.to_full_path(strip_quotes(reader.get_value(self.k_config_remote_ref))),
)
+ remote_refpath = RemoteReference.to_full_path(join_path(reader.get_value(self.k_config_remote), ref.name))
return RemoteReference(self.repo, remote_refpath)
# END handle have tracking branch
diff --git a/git/refs/log.py b/git/refs/log.py
index 908f93d1..a5f4de58 100644
--- a/git/refs/log.py
+++ b/git/refs/log.py
@@ -118,10 +118,7 @@ class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]):
elif len(fields) == 2:
info, msg = fields
else:
- raise ValueError(
- "Line must have up to two TAB-separated fields."
- " Got %s" % repr(line_str)
- )
+ raise ValueError("Line must have up to two TAB-separated fields." " Got %s" % repr(line_str))
# END handle first split
oldhexsha = info[:40]
@@ -247,9 +244,7 @@ class RefLog(List[RefLogEntry], Serializable):
for i in range(index + 1):
line = fp.readline()
if not line:
- raise IndexError(
- f"Index file ended at line {i+1}, before given index was reached"
- )
+ raise IndexError(f"Index file ended at line {i+1}, before given index was reached")
# END abort on eof
# END handle runup
@@ -304,9 +299,7 @@ class RefLog(List[RefLogEntry], Serializable):
assure_directory_exists(filepath, is_file=True)
first_line = message.split("\n")[0]
if isinstance(config_reader, Actor):
- committer = (
- config_reader # mypy thinks this is Actor | Gitconfigparser, but why?
- )
+ committer = config_reader # mypy thinks this is Actor | Gitconfigparser, but why?
else:
committer = Actor.committer(config_reader)
entry = RefLogEntry(
@@ -335,9 +328,7 @@ class RefLog(List[RefLogEntry], Serializable):
"""Write this instance's data to the file we are originating from
:return: self"""
if self._path is None:
- raise ValueError(
- "Instance was not initialized with a path, use to_file(...) instead"
- )
+ raise ValueError("Instance was not initialized with a path, use to_file(...) instead")
# END assert path
self.to_file(self._path)
return self
diff --git a/git/refs/reference.py b/git/refs/reference.py
index 9b946ec4..ca43cc43 100644
--- a/git/refs/reference.py
+++ b/git/refs/reference.py
@@ -26,9 +26,7 @@ def require_remote_ref_path(func: Callable[..., _T]) -> Callable[..., _T]:
def wrapper(self: T_References, *args: Any) -> _T:
if not self.is_remote():
- raise ValueError(
- "ref path does not point to a remote reference: %s" % self.path
- )
+ raise ValueError("ref path does not point to a remote reference: %s" % self.path)
return func(self, *args)
# END wrapper
@@ -59,9 +57,7 @@ class Reference(SymbolicReference, LazyMixin, IterableObj):
:param check_path: if False, you can provide any path. Otherwise the path must start with the
default path prefix of this type."""
if check_path and not str(path).startswith(self._common_path_default + "/"):
- raise ValueError(
- f"Cannot instantiate {self.__class__.__name__!r} from path {path}"
- )
+ raise ValueError(f"Cannot instantiate {self.__class__.__name__!r} from path {path}")
self.path: str # SymbolicReference converts to string atm
super(Reference, self).__init__(repo, path)
diff --git a/git/refs/remote.py b/git/refs/remote.py
index 8ac6bcd2..ec10c5a1 100644
--- a/git/refs/remote.py
+++ b/git/refs/remote.py
@@ -33,7 +33,7 @@ class RemoteReference(Head):
common_path: Union[PathLike, None] = None,
remote: Union["Remote", None] = None,
*args: Any,
- **kwargs: Any
+ **kwargs: Any,
) -> Iterator["RemoteReference"]:
"""Iterate remote references, and if given, constrain them to the given remote"""
common_path = common_path or cls._common_path_default
@@ -48,9 +48,7 @@ class RemoteReference(Head):
# tightening the types of arguments in subclasses and recommends Any or
# "type: ignore". (See https://github.com/python/typing/issues/241)
@classmethod
- def delete(
- cls, repo: "Repo", *refs: "RemoteReference", **kwargs: Any # type: ignore
- ) -> None:
+ def delete(cls, repo: "Repo", *refs: "RemoteReference", **kwargs: Any) -> None: # type: ignore
"""Delete the given remote references
:note:
diff --git a/git/refs/symbolic.py b/git/refs/symbolic.py
index 6d9ebb96..33c3bf15 100644
--- a/git/refs/symbolic.py
+++ b/git/refs/symbolic.py
@@ -127,13 +127,8 @@ class SymbolicReference(object):
# I looked at master on 2017-10-11,
# commit 111ef79afe, after tag v2.15.0-rc1
# from repo https://github.com/git/git.git
- if (
- line.startswith("# pack-refs with:")
- and "peeled" not in line
- ):
- raise TypeError(
- "PackingType of packed-Refs not understood: %r" % line
- )
+ if line.startswith("# pack-refs with:") and "peeled" not in line:
+ raise TypeError("PackingType of packed-Refs not understood: %r" % line)
# END abort if we do not understand the packing scheme
continue
# END parse comment
@@ -154,9 +149,7 @@ class SymbolicReference(object):
# alright.
@classmethod
- def dereference_recursive(
- cls, repo: "Repo", ref_path: Union[PathLike, None]
- ) -> str:
+ def dereference_recursive(cls, repo: "Repo", ref_path: Union[PathLike, None]) -> str:
"""
:return: hexsha stored in the reference at the given ref_path, recursively dereferencing all
intermediate references as required
@@ -178,9 +171,7 @@ class SymbolicReference(object):
tokens: Union[None, List[str], Tuple[str, str]] = None
repodir = _git_dir(repo, ref_path)
try:
- with open(
- os.path.join(repodir, str(ref_path)), "rt", encoding="UTF-8"
- ) as fp:
+ with open(os.path.join(repodir, str(ref_path)), "rt", encoding="UTF-8") as fp:
value = fp.read().rstrip()
# Don't only split on spaces, but on whitespace, which allows to parse lines like
# 60b64ef992065e2600bfef6187a97f92398a9144 branch 'master' of git-server:/path/to/repo
@@ -212,9 +203,7 @@ class SymbolicReference(object):
raise ValueError("Failed to parse reference information from %r" % ref_path)
@classmethod
- def _get_ref_info(
- cls, repo: "Repo", ref_path: Union[PathLike, None]
- ) -> Union[Tuple[str, None], Tuple[None, str]]:
+ def _get_ref_info(cls, repo: "Repo", ref_path: Union[PathLike, None]) -> Union[Tuple[str, None], Tuple[None, str]]:
"""Return: (str(sha), str(target_ref_path)) if available, the sha the file at
rela_path points to, or None. target_ref_path is the reference we
point to, or None"""
@@ -227,9 +216,7 @@ class SymbolicReference(object):
always point to the actual object as it gets re-created on each query"""
# have to be dynamic here as we may be a tag which can point to anything
# Our path will be resolved to the hexsha which will be used accordingly
- return Object.new_from_sha(
- self.repo, hex_to_bin(self.dereference_recursive(self.repo, self.path))
- )
+ return Object.new_from_sha(self.repo, hex_to_bin(self.dereference_recursive(self.repo, self.path)))
def _get_commit(self) -> "Commit":
"""
@@ -242,9 +229,7 @@ class SymbolicReference(object):
# END dereference tag
if obj.type != Commit.type:
- raise TypeError(
- "Symbolic Reference pointed to object %r, commit was required" % obj
- )
+ raise TypeError("Symbolic Reference pointed to object %r, commit was required" % obj)
# END handle type
return obj
@@ -321,9 +306,7 @@ class SymbolicReference(object):
to a reference, but to a commit"""
sha, target_ref_path = self._get_ref_info(self.repo, self.path)
if target_ref_path is None:
- raise TypeError(
- "%s is a detached symbolic reference as it points to %r" % (self, sha)
- )
+ raise TypeError("%s is a detached symbolic reference as it points to %r" % (self, sha))
return self.from_path(self.repo, target_ref_path)
def set_reference(
@@ -454,9 +437,7 @@ class SymbolicReference(object):
# correct to allow overriding the committer on a per-commit level.
# See https://github.com/gitpython-developers/GitPython/pull/146
try:
- committer_or_reader: Union[
- "Actor", "GitConfigParser"
- ] = self.commit.committer
+ committer_or_reader: Union["Actor", "GitConfigParser"] = self.commit.committer
except ValueError:
committer_or_reader = self.repo.config_reader()
# end handle newly cloned repositories
@@ -466,9 +447,7 @@ class SymbolicReference(object):
if message is None:
message = ""
- return RefLog.append_entry(
- committer_or_reader, RefLog.path(self), oldbinsha, newbinsha, message
- )
+ return RefLog.append_entry(committer_or_reader, RefLog.path(self), oldbinsha, newbinsha, message)
def log_entry(self, index: int) -> "RefLogEntry":
""":return: RefLogEntry at the given index
@@ -525,9 +504,7 @@ class SymbolicReference(object):
# If we deleted the last line and this one is a tag-reference object,
# we drop it as well
if (line.startswith("#") or full_ref_path != line_ref) and (
- not dropped_last_line
- or dropped_last_line
- and not line.startswith("^")
+ not dropped_last_line or dropped_last_line and not line.startswith("^")
):
new_lines.append(line)
dropped_last_line = False
@@ -635,9 +612,7 @@ class SymbolicReference(object):
already exists.
:note: This does not alter the current HEAD, index or Working Tree"""
- return cls._create(
- repo, path, cls._resolve_ref_on_create, reference, force, logmsg
- )
+ return cls._create(repo, path, cls._resolve_ref_on_create, reference, force, logmsg)
def rename(self, new_path: PathLike, force: bool = False) -> "SymbolicReference":
"""Rename self to a new path
@@ -694,9 +669,7 @@ class SymbolicReference(object):
# walk loose refs
# Currently we do not follow links
- for root, dirs, files in os.walk(
- join_path_native(repo.common_dir, common_path)
- ):
+ for root, dirs, files in os.walk(join_path_native(repo.common_dir, common_path)):
if "refs" not in root.split(os.sep): # skip non-refs subfolders
refs_id = [d for d in dirs if d == "refs"]
if refs_id:
@@ -707,9 +680,7 @@ class SymbolicReference(object):
if f == "packed-refs":
continue
abs_path = to_native_path_linux(join_path(root, f))
- rela_paths.add(
- abs_path.replace(to_native_path_linux(repo.common_dir) + "/", "")
- )
+ rela_paths.add(abs_path.replace(to_native_path_linux(repo.common_dir) + "/", ""))
# END for each file in root directory
# END for each directory to walk
@@ -752,16 +723,10 @@ class SymbolicReference(object):
List is lexicographically sorted
The returned objects represent actual subclasses, such as Head or TagReference"""
- return (
- r
- for r in cls._iter_items(repo, common_path)
- if r.__class__ == SymbolicReference or not r.is_detached
- )
+ return (r for r in cls._iter_items(repo, common_path) if r.__class__ == SymbolicReference or not r.is_detached)
@classmethod
- def from_path(
- cls: Type[T_References], repo: "Repo", path: PathLike
- ) -> T_References:
+ def from_path(cls: Type[T_References], repo: "Repo", path: PathLike) -> T_References:
"""
:param path: full .git-directory-relative path name to the Reference to instantiate
:note: use to_full_path() if you only have a partial path of a known Reference Type
@@ -795,9 +760,7 @@ class SymbolicReference(object):
pass
# END exception handling
# END for each type to try
- raise ValueError(
- "Could not find reference type suitable to handle path %r" % path
- )
+ raise ValueError("Could not find reference type suitable to handle path %r" % path)
def is_remote(self) -> bool:
""":return: True if this symbolic reference points to a remote branch"""
diff --git a/git/refs/tag.py b/git/refs/tag.py
index 96494148..0295b54d 100644
--- a/git/refs/tag.py
+++ b/git/refs/tag.py
@@ -81,7 +81,7 @@ class TagReference(Reference):
reference: Union[str, "SymbolicReference"] = "HEAD",
logmsg: Union[str, None] = None,
force: bool = False,
- **kwargs: Any
+ **kwargs: Any,
) -> "TagReference":
"""Create a new tag reference.
diff --git a/git/remote.py b/git/remote.py
index 8cd79057..7b44020c 100644
--- a/git/remote.py
+++ b/git/remote.py
@@ -77,9 +77,7 @@ __all__ = ("RemoteProgress", "PushInfo", "FetchInfo", "Remote")
def add_progress(
kwargs: Any,
git: Git,
- progress: Union[
- RemoteProgress, "UpdateProgress", Callable[..., RemoteProgress], None
- ],
+ progress: Union[RemoteProgress, "UpdateProgress", Callable[..., RemoteProgress], None],
) -> Any:
"""Add the --progress flag to the given kwargs dict if supported by the
git command. If the actual progress in the given progress instance is not
@@ -200,11 +198,7 @@ class PushInfo(IterableObj, object):
@property
def old_commit(self) -> Union[str, SymbolicReference, Commit_ish, None]:
- return (
- self._old_commit_sha
- and self._remote.repo.commit(self._old_commit_sha)
- or None
- )
+ return self._old_commit_sha and self._remote.repo.commit(self._old_commit_sha) or None
@property
def remote_ref(self) -> Union[RemoteReference, TagReference]:
@@ -236,10 +230,7 @@ class PushInfo(IterableObj, object):
try:
flags |= cls._flag_map[control_character]
except KeyError as e:
- raise ValueError(
- "Control character %r unknown as parsed from line %r"
- % (control_character, line)
- ) from e
+ raise ValueError("Control character %r unknown as parsed from line %r" % (control_character, line)) from e
# END handle control character
# from_to handling
@@ -282,9 +273,7 @@ class PushInfo(IterableObj, object):
return PushInfo(flags, from_ref, to_ref_string, remote, old_commit, summary)
@classmethod
- def iter_items(
- cls, repo: "Repo", *args: Any, **kwargs: Any
- ) -> NoReturn: # -> Iterator['PushInfo']:
+ def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> NoReturn: # -> Iterator['PushInfo']:
raise NotImplementedError
@@ -335,9 +324,7 @@ class FetchInfo(IterableObj, object):
ERROR,
) = [1 << x for x in range(8)]
- _re_fetch_result = re.compile(
- r"^\s*(.) (\[[\w\s\.$@]+\]|[\w\.$@]+)\s+(.+) -> ([^\s]+)( \(.*\)?$)?"
- )
+ _re_fetch_result = re.compile(r"^\s*(.) (\[[\w\s\.$@]+\]|[\w\.$@]+)\s+(.+) -> ([^\s]+)( \(.*\)?$)?")
_flag_map: Dict[flagKeyLiteral, int] = {
"!": ERROR,
@@ -446,10 +433,7 @@ class FetchInfo(IterableObj, object):
try:
flags |= cls._flag_map[control_character]
except KeyError as e:
- raise ValueError(
- "Control character %r unknown as parsed from line %r"
- % (control_character, line)
- ) from e
+ raise ValueError("Control character %r unknown as parsed from line %r" % (control_character, line)) from e
# END control char exception handling
# parse operation string for more info - makes no sense for symbolic refs, but we parse it anyway
@@ -512,23 +496,16 @@ class FetchInfo(IterableObj, object):
# always use actual type if we get absolute paths
                    # Will always be the case if something is fetched outside of refs/remotes (if it's not a tag)
ref_path = remote_local_ref_str
- if (
- ref_type is not TagReference
- and not remote_local_ref_str.startswith(
- RemoteReference._common_path_default + "/"
- )
+ if ref_type is not TagReference and not remote_local_ref_str.startswith(
+ RemoteReference._common_path_default + "/"
):
ref_type = Reference
# END downgrade remote reference
elif ref_type is TagReference and "tags/" in remote_local_ref_str:
                        # even though it's a tag, it is located in refs/remotes
- ref_path = join_path(
- RemoteReference._common_path_default, remote_local_ref_str
- )
+ ref_path = join_path(RemoteReference._common_path_default, remote_local_ref_str)
else:
- ref_path = join_path(
- ref_type._common_path_default, remote_local_ref_str
- )
+ ref_path = join_path(ref_type._common_path_default, remote_local_ref_str)
# END obtain refpath
# even though the path could be within the git conventions, we make
@@ -541,9 +518,7 @@ class FetchInfo(IterableObj, object):
return cls(remote_local_ref, flags, note, old_commit, local_remote_ref)
@classmethod
- def iter_items(
- cls, repo: "Repo", *args: Any, **kwargs: Any
- ) -> NoReturn: # -> Iterator['FetchInfo']:
+ def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> NoReturn: # -> Iterator['FetchInfo']:
raise NotImplementedError
@@ -590,9 +565,7 @@ class Remote(LazyMixin, IterableObj):
if attr == "_config_reader":
# NOTE: This is cached as __getattr__ is overridden to return remote config values implicitly, such as
# in print(r.pushurl)
- self._config_reader = SectionConstraint(
- self.repo.config_reader("repository"), self._config_section_name()
- )
+ self._config_reader = SectionConstraint(self.repo.config_reader("repository"), self._config_section_name())
else:
super(Remote, self)._set_cache_(attr)
@@ -638,9 +611,7 @@ class Remote(LazyMixin, IterableObj):
yield Remote(repo, section[lbound + 1 : rbound])
# END for each configuration section
- def set_url(
- self, new_url: str, old_url: Optional[str] = None, **kwargs: Any
- ) -> "Remote":
+ def set_url(self, new_url: str, old_url: Optional[str] = None, **kwargs: Any) -> "Remote":
"""Configure URLs on current remote (cf command git remote set_url)
This command manages URLs on the remote.
@@ -701,14 +672,9 @@ class Remote(LazyMixin, IterableObj):
if " Push URL:" in line:
yield line.split(": ")[-1]
except GitCommandError as _ex:
- if any(
- msg in str(_ex)
- for msg in ["correct access rights", "cannot run ssh"]
- ):
+ if any(msg in str(_ex) for msg in ["correct access rights", "cannot run ssh"]):
# If ssh is not setup to access this repository, see issue 694
- remote_details = self.repo.git.config(
- "--get-all", "remote.%s.url" % self.name
- )
+ remote_details = self.repo.git.config("--get-all", "remote.%s.url" % self.name)
assert isinstance(remote_details, str)
for line in remote_details.split("\n"):
yield line
@@ -724,9 +690,7 @@ class Remote(LazyMixin, IterableObj):
IterableList of RemoteReference objects. It is prefixed, allowing
you to omit the remote path portion, i.e.::
remote.refs.master # yields RemoteReference('/refs/remotes/origin/master')"""
- out_refs: IterableList[RemoteReference] = IterableList(
- RemoteReference._id_attribute_, "%s/" % self.name
- )
+ out_refs: IterableList[RemoteReference] = IterableList(RemoteReference._id_attribute_, "%s/" % self.name)
out_refs.extend(RemoteReference.list_items(self.repo, remote=self.name))
return out_refs
@@ -746,9 +710,7 @@ class Remote(LazyMixin, IterableObj):
as well. This is a fix for the issue described here:
https://github.com/gitpython-developers/GitPython/issues/260
"""
- out_refs: IterableList[Reference] = IterableList(
- RemoteReference._id_attribute_, "%s/" % self.name
- )
+ out_refs: IterableList[Reference] = IterableList(RemoteReference._id_attribute_, "%s/" % self.name)
for line in self.repo.git.remote("prune", "--dry-run", self).splitlines()[2:]:
# expecting
# * [would prune] origin/new_branch
@@ -959,7 +921,7 @@ class Remote(LazyMixin, IterableObj):
progress: Union[RemoteProgress, None, "UpdateProgress"] = None,
verbose: bool = True,
kill_after_timeout: Union[None, float] = None,
- **kwargs: Any
+ **kwargs: Any,
) -> IterableList[FetchInfo]:
"""Fetch the latest changes for this remote
@@ -1002,17 +964,9 @@ class Remote(LazyMixin, IterableObj):
args = [refspec]
proc = self.repo.git.fetch(
- self,
- *args,
- as_process=True,
- with_stdout=False,
- universal_newlines=True,
- v=verbose,
- **kwargs
- )
- res = self._get_fetch_info_from_stderr(
- proc, progress, kill_after_timeout=kill_after_timeout
+ self, *args, as_process=True, with_stdout=False, universal_newlines=True, v=verbose, **kwargs
)
+ res = self._get_fetch_info_from_stderr(proc, progress, kill_after_timeout=kill_after_timeout)
if hasattr(self.repo.odb, "update_cache"):
self.repo.odb.update_cache()
return res
@@ -1022,7 +976,7 @@ class Remote(LazyMixin, IterableObj):
refspec: Union[str, List[str], None] = None,
progress: Union[RemoteProgress, "UpdateProgress", None] = None,
kill_after_timeout: Union[None, float] = None,
- **kwargs: Any
+ **kwargs: Any,
) -> IterableList[FetchInfo]:
"""Pull changes from the given branch, being the same as a fetch followed
by a merge of branch with your local branch.
@@ -1037,17 +991,9 @@ class Remote(LazyMixin, IterableObj):
self._assert_refspec()
kwargs = add_progress(kwargs, self.repo.git, progress)
proc = self.repo.git.pull(
- self,
- refspec,
- with_stdout=False,
- as_process=True,
- universal_newlines=True,
- v=True,
- **kwargs
- )
- res = self._get_fetch_info_from_stderr(
- proc, progress, kill_after_timeout=kill_after_timeout
+ self, refspec, with_stdout=False, as_process=True, universal_newlines=True, v=True, **kwargs
)
+ res = self._get_fetch_info_from_stderr(proc, progress, kill_after_timeout=kill_after_timeout)
if hasattr(self.repo.odb, "update_cache"):
self.repo.odb.update_cache()
return res
@@ -1055,11 +1001,9 @@ class Remote(LazyMixin, IterableObj):
def push(
self,
refspec: Union[str, List[str], None] = None,
- progress: Union[
- RemoteProgress, "UpdateProgress", Callable[..., RemoteProgress], None
- ] = None,
+ progress: Union[RemoteProgress, "UpdateProgress", Callable[..., RemoteProgress], None] = None,
kill_after_timeout: Union[None, float] = None,
- **kwargs: Any
+ **kwargs: Any,
) -> IterableList[PushInfo]:
"""Push changes from source branch in refspec to target branch in refspec.
@@ -1096,11 +1040,9 @@ class Remote(LazyMixin, IterableObj):
as_process=True,
universal_newlines=True,
kill_after_timeout=kill_after_timeout,
- **kwargs
- )
- return self._get_push_info(
- proc, progress, kill_after_timeout=kill_after_timeout
+ **kwargs,
)
+ return self._get_push_info(proc, progress, kill_after_timeout=kill_after_timeout)
@property
def config_reader(self) -> SectionConstraint[GitConfigParser]:
diff --git a/git/repo/base.py b/git/repo/base.py
index 356a8f2f..111a350e 100644
--- a/git/repo/base.py
+++ b/git/repo/base.py
@@ -115,9 +115,7 @@ class Repo(object):
DAEMON_EXPORT_FILE = "git-daemon-export-ok"
- git = cast(
- "Git", None
- ) # Must exist, or __del__ will fail in case we raise on `__init__()`
+ git = cast("Git", None) # Must exist, or __del__ will fail in case we raise on `__init__()`
working_dir: Optional[PathLike] = None
_working_tree_dir: Optional[PathLike] = None
git_dir: PathLike = ""
@@ -251,9 +249,7 @@ class Repo(object):
pass
try:
- common_dir = (
- open(osp.join(self.git_dir, "commondir"), "rt").readlines()[0].strip()
- )
+ common_dir = open(osp.join(self.git_dir, "commondir"), "rt").readlines()[0].strip()
self._common_dir = osp.join(self.git_dir, common_dir)
except OSError:
self._common_dir = ""
@@ -325,9 +321,7 @@ class Repo(object):
with open(filename, "wb") as fp:
fp.write((descr + "\n").encode(defenc))
- description = property(
- _get_description, _set_description, doc="the project's description"
- )
+ description = property(_get_description, _set_description, doc="the project's description")
del _get_description
del _set_description
@@ -522,9 +516,7 @@ class Repo(object):
if config_level == "system":
return "/etc/gitconfig"
elif config_level == "user":
- config_home = os.environ.get("XDG_CONFIG_HOME") or osp.join(
- os.environ.get("HOME", "~"), ".config"
- )
+ config_home = os.environ.get("XDG_CONFIG_HOME") or osp.join(os.environ.get("HOME", "~"), ".config")
return osp.normpath(osp.expanduser(osp.join(config_home, "git", "config")))
elif config_level == "global":
return osp.normpath(osp.expanduser("~/.gitconfig"))
@@ -569,9 +561,7 @@ class Repo(object):
files = [self._get_config_path(config_level)]
return GitConfigParser(files, read_only=True, repo=self)
- def config_writer(
- self, config_level: Lit_config_levels = "repository"
- ) -> GitConfigParser:
+ def config_writer(self, config_level: Lit_config_levels = "repository") -> GitConfigParser:
"""
:return:
GitConfigParser allowing to write values of the specified configuration file level.
@@ -584,9 +574,7 @@ class Repo(object):
system = system wide configuration file
global = user level configuration file
repository = configuration file for this repository only"""
- return GitConfigParser(
- self._get_config_path(config_level), read_only=False, repo=self
- )
+ return GitConfigParser(self._get_config_path(config_level), read_only=False, repo=self)
def commit(self, rev: Union[str, Commit_ish, None] = None) -> Commit:
"""The Commit object for the specified revision
@@ -801,9 +789,7 @@ class Repo(object):
default_args.extend(["--", str(path)])
if index:
# diff index against HEAD
- if osp.isfile(self.index.path) and len(
- self.git.diff("--cached", *default_args)
- ):
+ if osp.isfile(self.index.path) and len(self.git.diff("--cached", *default_args)):
return True
# END index handling
if working_tree:
@@ -835,9 +821,7 @@ class Repo(object):
def _get_untracked_files(self, *args: Any, **kwargs: Any) -> List[str]:
# make sure we get all files, not only untracked directories
- proc = self.git.status(
- *args, porcelain=True, untracked_files=True, as_process=True, **kwargs
- )
+ proc = self.git.status(*args, porcelain=True, untracked_files=True, as_process=True, **kwargs)
# Untracked files prefix in porcelain mode
prefix = "?? "
untracked_files = []
@@ -850,12 +834,7 @@ class Repo(object):
if filename[0] == filename[-1] == '"':
filename = filename[1:-1]
# WHATEVER ... it's a mess, but works for me
- filename = (
- filename.encode("ascii")
- .decode("unicode_escape")
- .encode("latin1")
- .decode(defenc)
- )
+ filename = filename.encode("ascii").decode("unicode_escape").encode("latin1").decode(defenc)
untracked_files.append(filename)
finalize_process(proc)
return untracked_files
@@ -880,9 +859,7 @@ class Repo(object):
# reveal_type(self.head.reference) # => Reference
return self.head.reference
- def blame_incremental(
- self, rev: str | HEAD, file: str, **kwargs: Any
- ) -> Iterator["BlameEntry"]:
+ def blame_incremental(self, rev: str | HEAD, file: str, **kwargs: Any) -> Iterator["BlameEntry"]:
"""Iterator for blame information for the given file at the given revision.
Unlike .blame(), this does not return the actual file's contents, only
@@ -897,17 +874,13 @@ class Repo(object):
should get a continuous range spanning all line numbers in the file.
"""
- data: bytes = self.git.blame(
- rev, "--", file, p=True, incremental=True, stdout_as_string=False, **kwargs
- )
+ data: bytes = self.git.blame(rev, "--", file, p=True, incremental=True, stdout_as_string=False, **kwargs)
commits: Dict[bytes, Commit] = {}
stream = (line for line in data.split(b"\n") if line)
while True:
try:
- line = next(
- stream
- ) # when exhausted, causes a StopIteration, terminating this function
+ line = next(stream) # when exhausted, causes a StopIteration, terminating this function
except StopIteration:
return
split_line = line.split()
@@ -956,9 +929,7 @@ class Repo(object):
# guaranteed to be the last line
while True:
try:
- line = next(
- stream
- ) # will fail if we reach the EOF unexpectedly
+ line = next(stream) # will fail if we reach the EOF unexpectedly
except StopIteration:
return
tag, value = line.split(b" ", 1)
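The two hunks above parse the porcelain stream of git blame --incremental, where each group starts with "<sha> <orig_lineno> <final_lineno> <num_lines>" followed by "tag value" header lines. A hedged usage sketch through the public wrapper, assuming the file exists at the given revision:

from git import Repo

repo = Repo(".")
for entry in repo.blame_incremental("HEAD", "README.md"):   # yields BlameEntry tuples
    # entry.linenos is a range in the current file, entry.commit the responsible commit
    print(entry.commit.hexsha[:8], entry.linenos)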
@@ -987,9 +958,7 @@ class Repo(object):
if incremental:
return self.blame_incremental(rev, file, **kwargs)
- data: bytes = self.git.blame(
- rev, "--", file, p=True, stdout_as_string=False, **kwargs
- )
+ data: bytes = self.git.blame(rev, "--", file, p=True, stdout_as_string=False, **kwargs)
commits: Dict[str, Commit] = {}
blames: List[List[Commit | List[str | bytes] | None]] = []
@@ -1083,13 +1052,9 @@ class Repo(object):
c = Commit(
self,
hex_to_bin(sha),
- author=Actor._from_string(
- f"{info['author']} {info['author_email']}"
- ),
+ author=Actor._from_string(f"{info['author']} {info['author_email']}"),
authored_date=info["author_date"],
- committer=Actor._from_string(
- f"{info['committer']} {info['committer_email']}"
- ),
+ committer=Actor._from_string(f"{info['committer']} {info['committer_email']}"),
committed_date=info["committer_date"],
)
commits[sha] = c
@@ -1169,9 +1134,7 @@ class Repo(object):
url: PathLike,
path: PathLike,
odb_default_type: Type[GitCmdObjectDB],
- progress: Union[
- "RemoteProgress", "UpdateProgress", Callable[..., "RemoteProgress"], None
- ] = None,
+ progress: Union["RemoteProgress", "UpdateProgress", Callable[..., "RemoteProgress"], None] = None,
multi_options: Optional[List[str]] = None,
**kwargs: Any,
) -> "Repo":
@@ -1187,9 +1150,7 @@ class Repo(object):
# becomes::
# git clone --bare /cygwin/d/foo.git /cygwin/d/C:\\Work
#
- clone_path = (
- Git.polish_url(path) if Git.is_cygwin() and "bare" in kwargs else path
- )
+ clone_path = Git.polish_url(path) if Git.is_cygwin() and "bare" in kwargs else path
sep_dir = kwargs.get("separate_git_dir")
if sep_dir:
kwargs["separate_git_dir"] = Git.polish_url(sep_dir)
@@ -1225,11 +1186,7 @@ class Repo(object):
# our git command could have a different working dir than our actual
# environment, hence we prepend its working dir if required
if not osp.isabs(path):
- path = (
- osp.join(git._working_dir, path)
- if git._working_dir is not None
- else path
- )
+ path = osp.join(git._working_dir, path) if git._working_dir is not None else path
repo = cls(path, odbt=odbt)
@@ -1305,9 +1262,7 @@ class Repo(object):
git = cls.GitCommandWrapperType(os.getcwd())
if env is not None:
git.update_environment(**env)
- return cls._clone(
- git, url, to_path, GitCmdObjectDB, progress, multi_options, **kwargs
- )
+ return cls._clone(git, url, to_path, GitCmdObjectDB, progress, multi_options, **kwargs)
def archive(
self,
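The clone_from hunk above builds a temporary Git wrapper, optionally with an overridden environment, before delegating to _clone. Typical usage; the URL, target path and option values are placeholders:

from git import Repo

repo = Repo.clone_from(
    "https://example.com/some/repo.git",    # placeholder URL
    "/tmp/repo-clone",                      # placeholder target directory
    env={"GIT_TERMINAL_PROMPT": "0"},       # forwarded to the git process environment
    multi_options=["--depth=1"],            # each entry is passed through as its own option
)
print(repo.head.commit.hexsha)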
diff --git a/git/repo/fun.py b/git/repo/fun.py
index 03f9cabb..8a07c2ab 100644
--- a/git/repo/fun.py
+++ b/git/repo/fun.py
@@ -59,13 +59,11 @@ def is_git_dir(d: "PathLike") -> bool:
There is the unlikely danger to throw if we see directories which just look like a worktree dir,
but are none."""
if osp.isdir(d):
- if (
- osp.isdir(osp.join(d, "objects")) or "GIT_OBJECT_DIRECTORY" in os.environ
- ) and osp.isdir(osp.join(d, "refs")):
+ if (osp.isdir(osp.join(d, "objects")) or "GIT_OBJECT_DIRECTORY" in os.environ) and osp.isdir(
+ osp.join(d, "refs")
+ ):
headref = osp.join(d, "HEAD")
- return osp.isfile(headref) or (
- osp.islink(headref) and os.readlink(headref).startswith("refs")
- )
+ return osp.isfile(headref) or (osp.islink(headref) and os.readlink(headref).startswith("refs"))
elif (
osp.isfile(osp.join(d, "gitdir"))
and osp.isfile(osp.join(d, "commondir"))
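is_git_dir accepts either a real git directory (objects/ and refs/ plus a HEAD file) or a worktree pointer directory (gitdir/commondir files). It can be called directly; a small sketch, assuming it runs from inside a normal checkout and that /tmp is not a git directory:

from git.repo.fun import is_git_dir

print(is_git_dir(".git"))    # True inside a normal checkout
print(is_git_dir("/tmp"))    # False: no objects/, refs/ or HEAD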
@@ -244,9 +242,7 @@ def rev_parse(repo: "Repo", rev: str) -> Union["Commit", "Tag", "Tree", "Blob"]:
ref = repo.head.ref
else:
if token == "@":
- ref = cast(
- "Reference", name_to_object(repo, rev[:start], return_ref=True)
- )
+ ref = cast("Reference", name_to_object(repo, rev[:start], return_ref=True))
else:
obj = cast(Commit_ish, name_to_object(repo, rev[:start]))
# END handle token
@@ -296,9 +292,7 @@ def rev_parse(repo: "Repo", rev: str) -> Union["Commit", "Tag", "Tree", "Blob"]:
except ValueError as e:
# TODO: Try to parse the other date options, using parse_date
# maybe
- raise NotImplementedError(
- "Support for additional @{...} modes not implemented"
- ) from e
+ raise NotImplementedError("Support for additional @{...} modes not implemented") from e
# END handle revlog index
try:
@@ -312,17 +306,12 @@ def rev_parse(repo: "Repo", rev: str) -> Union["Commit", "Tag", "Tree", "Blob"]:
# make it pass the following checks
output_type = ""
else:
- raise ValueError(
- "Invalid output type: %s ( in %s )" % (output_type, rev)
- )
+ raise ValueError("Invalid output type: %s ( in %s )" % (output_type, rev))
# END handle output type
# empty output types don't require any specific type, its just about dereferencing tags
if output_type and obj and obj.type != output_type:
- raise ValueError(
- "Could not accommodate requested object type %r, got %s"
- % (output_type, obj.type)
- )
+ raise ValueError("Could not accommodate requested object type %r, got %s" % (output_type, obj.type))
# END verify output type
start = end + 1 # skip brace
@@ -376,8 +365,7 @@ def rev_parse(repo: "Repo", rev: str) -> Union["Commit", "Tag", "Tree", "Blob"]:
# END end handle tag
except (IndexError, AttributeError) as e:
raise BadName(
- f"Invalid revision spec '{rev}' - not enough "
- f"parent commits to reach '{token}{int(num)}'"
+ f"Invalid revision spec '{rev}' - not enough " f"parent commits to reach '{token}{int(num)}'"
) from e
# END exception handling
# END parse loop
@@ -392,9 +380,6 @@ def rev_parse(repo: "Repo", rev: str) -> Union["Commit", "Tag", "Tree", "Blob"]:
raise ValueError("Revision specifier could not be parsed: %s" % rev)
if parsed_to != lr:
- raise ValueError(
- "Didn't consume complete rev spec %s, consumed part: %s"
- % (rev, rev[:parsed_to])
- )
+ raise ValueError("Didn't consume complete rev spec %s, consumed part: %s" % (rev, rev[:parsed_to]))
return obj
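The rev_parse hunks above implement git's revision syntax: ~N/^N parent traversal, @{N} reflog entries and ^{type} output-type requests, raising BadName or ValueError on anything they cannot consume. Example invocations through the public wrapper, assuming the repository has at least two commits:

from git import Repo

repo = Repo(".")
print(repo.rev_parse("HEAD").hexsha)        # a Commit
print(repo.rev_parse("HEAD~1").hexsha)      # first parent, assuming it exists
print(repo.rev_parse("HEAD^{tree}").type)   # 'tree': ^{...} dereferences to the requested type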
diff --git a/git/types.py b/git/types.py
index 24df887a..7a25688b 100644
--- a/git/types.py
+++ b/git/types.py
@@ -72,16 +72,12 @@ Lit_config_levels = Literal["system", "global", "user", "repository"]
# return inp in ("system", "user", "global", "repository")
-ConfigLevels_Tup = Tuple[
- Literal["system"], Literal["user"], Literal["global"], Literal["repository"]
-]
+ConfigLevels_Tup = Tuple[Literal["system"], Literal["user"], Literal["global"], Literal["repository"]]
# -----------------------------------------------------------------------------------
-def assert_never(
- inp: NoReturn, raise_error: bool = True, exc: Union[Exception, None] = None
-) -> None:
+def assert_never(inp: NoReturn, raise_error: bool = True, exc: Union[Exception, None] = None) -> None:
"""For use in exhaustive checking of literal or Enum in if/else chain.
Should only be reached if all members not handled OR attempt to pass non-members through chain.
@@ -92,9 +88,7 @@ def assert_never(
"""
if raise_error:
if exc is None:
- raise ValueError(
- f"An unhandled Literal ({inp}) in an if/else chain was found"
- )
+ raise ValueError(f"An unhandled Literal ({inp}) in an if/else chain was found")
else:
raise exc
else:
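assert_never gives static checkers an exhaustiveness guarantee for Literal/Enum chains: the NoReturn-annotated parameter only type-checks once every member has been handled. A minimal sketch of the intended pattern; the handler bodies are illustrative:

from git.types import Lit_config_levels, assert_never

def describe(level: Lit_config_levels) -> str:
    if level == "system":
        return "system-wide configuration"
    elif level == "user" or level == "global":
        return "per-user configuration"
    elif level == "repository":
        return "this repository only"
    else:
        assert_never(level)   # raises ValueError at runtime if a new literal slips through
        return ""             # unreachable, kept only to satisfy the declared return type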
diff --git a/git/util.py b/git/util.py
index edc8750d..11139156 100644
--- a/git/util.py
+++ b/git/util.py
@@ -70,9 +70,7 @@ from .types import (
Has_id_attribute,
)
-T_IterableObj = TypeVar(
- "T_IterableObj", bound=Union["IterableObj", "Has_id_attribute"], covariant=True
-)
+T_IterableObj = TypeVar("T_IterableObj", bound=Union["IterableObj", "Has_id_attribute"], covariant=True)
# So IterableList[Head] is subtype of IterableList[IterableObj]
# ---------------------------------------------------------------------
@@ -125,9 +123,7 @@ log = logging.getLogger(__name__)
#: so the errors marked with this var are considered "acknowledged" ones, awaiting remedy,
#: till then, we wish to hide them.
HIDE_WINDOWS_KNOWN_ERRORS = is_win and os.environ.get("HIDE_WINDOWS_KNOWN_ERRORS", True)
-HIDE_WINDOWS_FREEZE_ERRORS = is_win and os.environ.get(
- "HIDE_WINDOWS_FREEZE_ERRORS", True
-)
+HIDE_WINDOWS_FREEZE_ERRORS = is_win and os.environ.get("HIDE_WINDOWS_FREEZE_ERRORS", True)
# { Utility Methods
@@ -143,9 +139,7 @@ def unbare_repo(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
def wrapper(self: "Remote", *args: Any, **kwargs: Any) -> T:
if self.repo.bare:
- raise InvalidGitRepositoryError(
- "Method '%s' cannot operate on bare repositories" % func.__name__
- )
+ raise InvalidGitRepositoryError("Method '%s' cannot operate on bare repositories" % func.__name__)
# END bare method
return func(self, *args, **kwargs)
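unbare_repo wraps methods of objects that carry a repo attribute and refuses to run them against bare repositories. A sketch of that behaviour; the decorated class is purely illustrative and the path is a placeholder assumed to point at an existing bare repository:

from git import Repo
from git.exc import InvalidGitRepositoryError
from git.util import unbare_repo

class WorktreeTool:                 # hypothetical helper exposing the repo attribute the wrapper expects
    def __init__(self, repo: Repo) -> None:
        self.repo = repo

    @unbare_repo
    def touch_worktree(self):
        return self.repo.working_tree_dir

try:
    WorktreeTool(Repo("some-bare-repo.git")).touch_worktree()
except InvalidGitRepositoryError as err:
    print(err)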
@@ -180,9 +174,7 @@ def rmtree(path: PathLike) -> None:
if HIDE_WINDOWS_KNOWN_ERRORS:
from unittest import SkipTest
- raise SkipTest(
- "FIXME: fails with: PermissionError\n {}".format(ex)
- ) from ex
+ raise SkipTest("FIXME: fails with: PermissionError\n {}".format(ex)) from ex
raise
return shutil.rmtree(path, False, onerror)
@@ -196,9 +188,7 @@ def rmfile(path: PathLike) -> None:
os.remove(path)
-def stream_copy(
- source: BinaryIO, destination: BinaryIO, chunk_size: int = 512 * 1024
-) -> int:
+def stream_copy(source: BinaryIO, destination: BinaryIO, chunk_size: int = 512 * 1024) -> int:
"""Copy all data from the source stream into the destination stream in chunks
of size chunk_size
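stream_copy moves data between two binary file-like objects in fixed-size chunks and returns the number of bytes copied. A self-contained sketch using in-memory streams only:

import io
from git.util import stream_copy

src = io.BytesIO(b"x" * (3 * 512 * 1024 + 10))
dst = io.BytesIO()
copied = stream_copy(src, dst, chunk_size=512 * 1024)   # four reads: three full chunks plus the tail
print(copied, copied == len(dst.getvalue()))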
@@ -278,11 +268,7 @@ def assure_directory_exists(path: PathLike, is_file: bool = False) -> bool:
def _get_exe_extensions() -> Sequence[str]:
PATHEXT = os.environ.get("PATHEXT", None)
return (
- tuple(p.upper() for p in PATHEXT.split(os.pathsep))
- if PATHEXT
- else (".BAT", "COM", ".EXE")
- if is_win
- else ("")
+ tuple(p.upper() for p in PATHEXT.split(os.pathsep)) if PATHEXT else (".BAT", "COM", ".EXE") if is_win else ("")
)
@@ -294,11 +280,7 @@ def py_where(program: str, path: Optional[PathLike] = None) -> List[str]:
return (
osp.isfile(fpath)
and os.access(fpath, os.X_OK)
- and (
- os.name != "nt"
- or not winprog_exts
- or any(fpath.upper().endswith(ext) for ext in winprog_exts)
- )
+ and (os.name != "nt" or not winprog_exts or any(fpath.upper().endswith(ext) for ext in winprog_exts))
)
progs = []
@@ -338,10 +320,7 @@ _cygpath_parsers: Tuple[Tuple[Pattern[str], Callable, bool], ...] = (
# and: https://www.cygwin.com/cygwin-ug-net/using.html#unc-paths
(
re.compile(r"\\\\\?\\UNC\\([^\\]+)\\([^\\]+)(?:\\(.*))?"),
- (
- lambda server, share, rest_path: "//%s/%s/%s"
- % (server, share, rest_path.replace("\\", "/"))
- ),
+ (lambda server, share, rest_path: "//%s/%s/%s" % (server, share, rest_path.replace("\\", "/"))),
False,
),
(re.compile(r"\\\\\?\\(\w):[/\\](.*)"), (_cygexpath), False),
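The first parser above rewrites extended UNC paths (\\?\UNC\server\share\...) into Cygwin-style //server/share/... paths. A standalone sketch of the same regex/lambda pair; the input path and helper name are examples only:

import re

unc_rx = re.compile(r"\\\\\?\\UNC\\([^\\]+)\\([^\\]+)(?:\\(.*))?")

def to_cygwin_unc(path: str) -> str:
    m = unc_rx.match(path)
    assert m is not None, "sample path must be an extended UNC path"
    server, share, rest = m.groups()
    return "//%s/%s/%s" % (server, share, (rest or "").replace("\\", "/"))

print(to_cygwin_unc(r"\\?\UNC\fileserver\projects\dir\notes.txt"))   # //fileserver/projects/dir/notes.txt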
@@ -416,9 +395,7 @@ def is_cygwin_git(git_executable: Union[None, PathLike]) -> bool:
# Just a name given, not a real path.
uname_cmd = osp.join(git_dir, "uname")
- process = subprocess.Popen(
- [uname_cmd], stdout=subprocess.PIPE, universal_newlines=True
- )
+ process = subprocess.Popen([uname_cmd], stdout=subprocess.PIPE, universal_newlines=True)
uname_out, _ = process.communicate()
# retcode = process.poll()
is_cygwin = "CYGWIN" in uname_out
@@ -434,9 +411,7 @@ def get_user_id() -> str:
return "%s@%s" % (getpass.getuser(), platform.node())
-def finalize_process(
- proc: Union[subprocess.Popen, "Git.AutoInterrupt"], **kwargs: Any
-) -> None:
+def finalize_process(proc: Union[subprocess.Popen, "Git.AutoInterrupt"], **kwargs: Any) -> None:
"""Wait for the process (clone, fetch, pull or push) and handle its errors accordingly"""
# TODO: No close proc-streams??
proc.wait(**kwargs)
@@ -453,9 +428,7 @@ def expand_path(p: PathLike, expand_vars: bool = ...) -> str:
...
-def expand_path(
- p: Union[None, PathLike], expand_vars: bool = True
-) -> Optional[PathLike]:
+def expand_path(p: Union[None, PathLike], expand_vars: bool = True) -> Optional[PathLike]:
if isinstance(p, pathlib.Path):
return p.resolve()
try:
@@ -808,9 +781,7 @@ class Actor(object):
return actor
@classmethod
- def committer(
- cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None
- ) -> "Actor":
+ def committer(cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None) -> "Actor":
"""
:return: Actor instance corresponding to the configured committer. It behaves
similar to the git implementation, such that the environment will override
@@ -818,14 +789,10 @@ class Actor(object):
generated
:param config_reader: ConfigReader to use to retrieve the values from in case
they are not set in the environment"""
- return cls._main_actor(
- cls.env_committer_name, cls.env_committer_email, config_reader
- )
+ return cls._main_actor(cls.env_committer_name, cls.env_committer_email, config_reader)
@classmethod
- def author(
- cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None
- ) -> "Actor":
+ def author(cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None) -> "Actor":
"""Same as committer(), but defines the main author. It may be specified in the environment,
but defaults to the committer"""
return cls._main_actor(cls.env_author_name, cls.env_author_email, config_reader)
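Both classmethods resolve the identity from the GIT_COMMITTER_*/GIT_AUTHOR_* environment variables first and fall back to the supplied configuration reader. A short usage sketch; the printed values depend on the local configuration:

from git import Repo
from git.util import Actor

repo = Repo(".")
reader = repo.config_reader()           # merged system/global/user/repository view
committer = Actor.committer(reader)
author = Actor.author(reader)           # defaults to the committer unless overridden
print(committer.name, committer.email)
print(author.name, author.email)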
@@ -1038,9 +1005,9 @@ class BlockingLockFile(LockFile):
# readable anymore, raise an exception
curtime = time.time()
if not osp.isdir(osp.dirname(self._lock_file_path())):
- msg = (
- "Directory containing the lockfile %r was not readable anymore after waiting %g seconds"
- % (self._lock_file_path(), curtime - starttime)
+ msg = "Directory containing the lockfile %r was not readable anymore after waiting %g seconds" % (
+ self._lock_file_path(),
+ curtime - starttime,
)
raise IOError(msg) from e
# END handle missing directory
@@ -1115,9 +1082,7 @@ class IterableList(List[T_IterableObj]):
def __getitem__(self, index: Union[SupportsIndex, int, slice, str]) -> T_IterableObj: # type: ignore
- assert isinstance(
- index, (int, str, slice)
- ), "Index of IterableList should be an int or str"
+ assert isinstance(index, (int, str, slice)), "Index of IterableList should be an int or str"
if isinstance(index, int):
return list.__getitem__(self, index)
@@ -1127,16 +1092,12 @@ class IterableList(List[T_IterableObj]):
try:
return getattr(self, index)
except AttributeError as e:
- raise IndexError(
- "No item found with id %r" % (self._prefix + index)
- ) from e
+ raise IndexError("No item found with id %r" % (self._prefix + index)) from e
# END handle getattr
def __delitem__(self, index: Union[SupportsIndex, int, slice, str]) -> None:
- assert isinstance(
- index, (int, str)
- ), "Index of IterableList should be an int or str"
+ assert isinstance(index, (int, str)), "Index of IterableList should be an int or str"
delindex = cast(int, index)
if not isinstance(index, int):
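IterableList is the list subclass returned by e.g. repo.heads and repo.remotes; besides integer and slice access it resolves string indices against each item's id attribute, optionally behind a prefix. Example, assuming the repository has at least one branch and a remote named "origin":

from git import Repo

repo = Repo(".")
print(repo.heads[0])             # positional access
print(repo.remotes["origin"])    # name-based access via _id_attribute_; raises IndexError if missing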
@@ -1213,9 +1174,7 @@ class IterableObj(Protocol):
_id_attribute_: str
@classmethod
- def list_items(
- cls, repo: "Repo", *args: Any, **kwargs: Any
- ) -> IterableList[T_IterableObj]:
+ def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> IterableList[T_IterableObj]:
"""
Find all items of this type - subclasses can specify args and kwargs differently.
If no args are given, subclasses are obliged to return all items if no additional
@@ -1230,9 +1189,7 @@ class IterableObj(Protocol):
@classmethod
@abstractmethod
- def iter_items(
- cls, repo: "Repo", *args: Any, **kwargs: Any
- ) -> Iterator[T_IterableObj]: # Iterator[T_IterableObj]:
+ def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Iterator[T_IterableObj]: # Iterator[T_IterableObj]:
# return typed to be compatible with subtypes e.g. Remote
"""For more information about the arguments, see list_items
:return: iterator yielding Items"""
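Concrete types such as Remote implement this protocol, so iteration can go through the classmethods instead of the Repo convenience properties. A brief sketch, assuming a repository with at least one configured remote:

from git import Remote, Repo

repo = Repo(".")
for remote in Remote.iter_items(repo):   # same objects as repo.remotes, yielded lazily
    print(remote.name, remote.url)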