diff options
author | Sebastian Thiel <sebastian.thiel@icloud.com> | 2022-05-18 07:43:53 +0800 |
---|---|---|
committer | Sebastian Thiel <sebastian.thiel@icloud.com> | 2022-05-18 07:43:53 +0800 |
commit | 21ec529987d10e0010badd37f8da3274167d436f (patch) | |
tree | a3394cfe902ce7edd07c89420c21c13274a2d295 /git/index/fun.py | |
parent | b30720ee4d9762a03eae4fa7cfa4b0190d81784d (diff) | |
download | gitpython-21ec529987d10e0010badd37f8da3274167d436f.tar.gz |
Run everything through 'black'
That way people who use it won't be deterred, while it unifies style
everywhere.
Diffstat (limited to 'git/index/fun.py')
-rw-r--r-- | git/index/fun.py | 185 |
1 files changed, 114 insertions, 71 deletions
diff --git a/git/index/fun.py b/git/index/fun.py index acab7423..e8dead86 100644 --- a/git/index/fun.py +++ b/git/index/fun.py @@ -25,14 +25,11 @@ from git.compat import ( is_win, safe_decode, ) -from git.exc import ( - UnmergedEntriesError, - HookExecutionError -) +from git.exc import UnmergedEntriesError, HookExecutionError from git.objects.fun import ( tree_to_stream, traverse_tree_recursive, - traverse_trees_recursive + traverse_trees_recursive, ) from git.util import IndexFileSHA1Writer, finalize_process from gitdb.base import IStream @@ -40,20 +37,12 @@ from gitdb.typ import str_tree_type import os.path as osp -from .typ import ( - BaseIndexEntry, - IndexEntry, - CE_NAMEMASK, - CE_STAGESHIFT -) -from .util import ( - pack, - unpack -) +from .typ import BaseIndexEntry, IndexEntry, CE_NAMEMASK, CE_STAGESHIFT +from .util import pack, unpack # typing ----------------------------------------------------------------------------- -from typing import (Dict, IO, List, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast) +from typing import Dict, IO, List, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast from git.types import PathLike @@ -61,40 +50,49 @@ if TYPE_CHECKING: from .base import IndexFile from git.db import GitCmdObjectDB from git.objects.tree import TreeCacheTup + # from git.objects.fun import EntryTupOrNone # ------------------------------------------------------------------------------------ -S_IFGITLINK = S_IFLNK | S_IFDIR # a submodule +S_IFGITLINK = S_IFLNK | S_IFDIR # a submodule CE_NAMEMASK_INV = ~CE_NAMEMASK -__all__ = ('write_cache', 'read_cache', 'write_tree_from_cache', 'entry_key', - 'stat_mode_to_index_mode', 'S_IFGITLINK', 'run_commit_hook', 'hook_path') +__all__ = ( + "write_cache", + "read_cache", + "write_tree_from_cache", + "entry_key", + "stat_mode_to_index_mode", + "S_IFGITLINK", + "run_commit_hook", + "hook_path", +) def hook_path(name: str, git_dir: PathLike) -> str: """:return: path to the given named hook in the given git repository directory""" - return osp.join(git_dir, 'hooks', name) + return osp.join(git_dir, "hooks", name) def _has_file_extension(path): return osp.splitext(path)[1] -def run_commit_hook(name: str, index: 'IndexFile', *args: str) -> None: +def run_commit_hook(name: str, index: "IndexFile", *args: str) -> None: """Run the commit hook of the given name. Silently ignores hooks that do not exist. :param name: name of hook, like 'pre-commit' :param index: IndexFile instance :param args: arguments passed to hook file - :raises HookExecutionError: """ + :raises HookExecutionError:""" hp = hook_path(name, index.repo.git_dir) if not os.access(hp, os.X_OK): return None env = os.environ.copy() - env['GIT_INDEX_FILE'] = safe_decode(str(index.path)) - env['GIT_EDITOR'] = ':' + env["GIT_INDEX_FILE"] = safe_decode(str(index.path)) + env["GIT_EDITOR"] = ":" cmd = [hp] try: if is_win and not _has_file_extension(hp): @@ -102,22 +100,26 @@ def run_commit_hook(name: str, index: 'IndexFile', *args: str) -> None: # (doesn't understand shebangs). Try using bash to run the hook. relative_hp = Path(hp).relative_to(index.repo.working_dir).as_posix() cmd = ["bash.exe", relative_hp] - - cmd = subprocess.Popen(cmd + list(args), - env=env, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - cwd=index.repo.working_dir, - close_fds=is_posix, - creationflags=PROC_CREATIONFLAGS,) + + cmd = subprocess.Popen( + cmd + list(args), + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + cwd=index.repo.working_dir, + close_fds=is_posix, + creationflags=PROC_CREATIONFLAGS, + ) except Exception as ex: raise HookExecutionError(hp, ex) from ex else: stdout_list: List[str] = [] stderr_list: List[str] = [] - handle_process_output(cmd, stdout_list.append, stderr_list.append, finalize_process) - stdout = ''.join(stdout_list) - stderr = ''.join(stderr_list) + handle_process_output( + cmd, stdout_list.append, stderr_list.append, finalize_process + ) + stdout = "".join(stdout_list) + stderr = "".join(stderr_list) if cmd.returncode != 0: stdout = force_text(stdout, defenc) stderr = force_text(stderr, defenc) @@ -128,16 +130,21 @@ def run_commit_hook(name: str, index: 'IndexFile', *args: str) -> None: def stat_mode_to_index_mode(mode: int) -> int: """Convert the given mode from a stat call to the corresponding index mode and return it""" - if S_ISLNK(mode): # symlinks + if S_ISLNK(mode): # symlinks return S_IFLNK - if S_ISDIR(mode) or S_IFMT(mode) == S_IFGITLINK: # submodules + if S_ISDIR(mode) or S_IFMT(mode) == S_IFGITLINK: # submodules return S_IFGITLINK - return S_IFREG | (mode & S_IXUSR and 0o755 or 0o644) # blobs with or without executable bit + return S_IFREG | ( + mode & S_IXUSR and 0o755 or 0o644 + ) # blobs with or without executable bit -def write_cache(entries: Sequence[Union[BaseIndexEntry, 'IndexEntry']], stream: IO[bytes], - extension_data: Union[None, bytes] = None, - ShaStreamCls: Type[IndexFileSHA1Writer] = IndexFileSHA1Writer) -> None: +def write_cache( + entries: Sequence[Union[BaseIndexEntry, "IndexEntry"]], + stream: IO[bytes], + extension_data: Union[None, bytes] = None, + ShaStreamCls: Type[IndexFileSHA1Writer] = IndexFileSHA1Writer, +) -> None: """Write the cache represented by entries to a stream :param entries: **sorted** list of entries @@ -163,17 +170,28 @@ def write_cache(entries: Sequence[Union[BaseIndexEntry, 'IndexEntry']], stream: # body for entry in entries: beginoffset = tell() - write(entry.ctime_bytes) # ctime - write(entry.mtime_bytes) # mtime + write(entry.ctime_bytes) # ctime + write(entry.mtime_bytes) # mtime path_str = str(entry.path) path: bytes = force_bytes(path_str, encoding=defenc) - plen = len(path) & CE_NAMEMASK # path length + plen = len(path) & CE_NAMEMASK # path length assert plen == len(path), "Path %s too long to fit into index" % entry.path - flags = plen | (entry.flags & CE_NAMEMASK_INV) # clear possible previous values - write(pack(">LLLLLL20sH", entry.dev, entry.inode, entry.mode, - entry.uid, entry.gid, entry.size, entry.binsha, flags)) + flags = plen | (entry.flags & CE_NAMEMASK_INV) # clear possible previous values + write( + pack( + ">LLLLLL20sH", + entry.dev, + entry.inode, + entry.mode, + entry.uid, + entry.gid, + entry.size, + entry.binsha, + flags, + ) + ) write(path) - real_size = ((tell() - beginoffset + 8) & ~7) + real_size = (tell() - beginoffset + 8) & ~7 write(b"\0" * ((beginoffset + real_size) - tell())) # END for each entry @@ -216,7 +234,9 @@ def entry_key(*entry: Union[BaseIndexEntry, PathLike, int]) -> Tuple[PathLike, i # END handle entry -def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'IndexEntry'], bytes, bytes]: +def read_cache( + stream: IO[bytes], +) -> Tuple[int, Dict[Tuple[PathLike, int], "IndexEntry"], bytes, bytes]: """Read a cache file from the given stream :return: tuple(version, entries_dict, extension_data, content_sha) * version is the integer version number @@ -225,7 +245,7 @@ def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'Inde * content_sha is a 20 byte sha on all cache file contents""" version, num_entries = read_header(stream) count = 0 - entries: Dict[Tuple[PathLike, int], 'IndexEntry'] = {} + entries: Dict[Tuple[PathLike, int], "IndexEntry"] = {} read = stream.read tell = stream.tell @@ -233,14 +253,17 @@ def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'Inde beginoffset = tell() ctime = unpack(">8s", read(8))[0] mtime = unpack(">8s", read(8))[0] - (dev, ino, mode, uid, gid, size, sha, flags) = \ - unpack(">LLLLLL20sH", read(20 + 4 * 6 + 2)) + (dev, ino, mode, uid, gid, size, sha, flags) = unpack( + ">LLLLLL20sH", read(20 + 4 * 6 + 2) + ) path_size = flags & CE_NAMEMASK path = read(path_size).decode(defenc) - real_size = ((tell() - beginoffset + 8) & ~7) + real_size = (tell() - beginoffset + 8) & ~7 read((beginoffset + real_size) - tell()) - entry = IndexEntry((mode, sha, flags, path, ctime, mtime, dev, ino, uid, gid, size)) + entry = IndexEntry( + (mode, sha, flags, path, ctime, mtime, dev, ino, uid, gid, size) + ) # entry_key would be the method to use, but we safe the effort entries[(path, entry.stage)] = entry count += 1 @@ -253,19 +276,22 @@ def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'Inde # 4 bytes length of chunk # repeated 0 - N times extension_data = stream.read(~0) - assert len(extension_data) > 19, "Index Footer was not at least a sha on content as it was only %i bytes in size"\ - % len(extension_data) + assert len(extension_data) > 19, ( + "Index Footer was not at least a sha on content as it was only %i bytes in size" + % len(extension_data) + ) content_sha = extension_data[-20:] # truncate the sha in the end as we will dynamically create it anyway - extension_data = extension_data[: -20] + extension_data = extension_data[:-20] return (version, entries, extension_data, content_sha) -def write_tree_from_cache(entries: List[IndexEntry], odb: 'GitCmdObjectDB', sl: slice, si: int = 0 - ) -> Tuple[bytes, List['TreeCacheTup']]: +def write_tree_from_cache( + entries: List[IndexEntry], odb: "GitCmdObjectDB", sl: slice, si: int = 0 +) -> Tuple[bytes, List["TreeCacheTup"]]: """Create a tree from the given sorted list of entries and put the respective trees into the given object database @@ -275,7 +301,7 @@ def write_tree_from_cache(entries: List[IndexEntry], odb: 'GitCmdObjectDB', sl: :param sl: slice indicating the range we should process on the entries list :return: tuple(binsha, list(tree_entry, ...)) a tuple of a sha and a list of tree entries being a tuple of hexsha, mode, name""" - tree_items: List['TreeCacheTup'] = [] + tree_items: List["TreeCacheTup"] = [] ci = sl.start end = sl.stop @@ -285,7 +311,7 @@ def write_tree_from_cache(entries: List[IndexEntry], odb: 'GitCmdObjectDB', sl: raise UnmergedEntriesError(entry) # END abort on unmerged ci += 1 - rbound = entry.path.find('/', si) + rbound = entry.path.find("/", si) if rbound == -1: # its not a tree tree_items.append((entry.binsha, entry.mode, entry.path[si:])) @@ -295,7 +321,7 @@ def write_tree_from_cache(entries: List[IndexEntry], odb: 'GitCmdObjectDB', sl: xi = ci while xi < end: oentry = entries[xi] - orbound = oentry.path.find('/', si) + orbound = oentry.path.find("/", si) if orbound == -1 or oentry.path[si:orbound] != base: break # END abort on base mismatch @@ -304,7 +330,9 @@ def write_tree_from_cache(entries: List[IndexEntry], odb: 'GitCmdObjectDB', sl: # enter recursion # ci - 1 as we want to count our current item as well - sha, _tree_entry_list = write_tree_from_cache(entries, odb, slice(ci - 1, xi), rbound + 1) + sha, _tree_entry_list = write_tree_from_cache( + entries, odb, slice(ci - 1, xi), rbound + 1 + ) tree_items.append((sha, S_IFDIR, base)) # skip ahead @@ -314,18 +342,26 @@ def write_tree_from_cache(entries: List[IndexEntry], odb: 'GitCmdObjectDB', sl: # finally create the tree sio = BytesIO() - tree_to_stream(tree_items, sio.write) # writes to stream as bytes, but doesn't change tree_items + tree_to_stream( + tree_items, sio.write + ) # writes to stream as bytes, but doesn't change tree_items sio.seek(0) istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio)) return (istream.binsha, tree_items) -def _tree_entry_to_baseindexentry(tree_entry: 'TreeCacheTup', stage: int) -> BaseIndexEntry: - return BaseIndexEntry((tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2])) +def _tree_entry_to_baseindexentry( + tree_entry: "TreeCacheTup", stage: int +) -> BaseIndexEntry: + return BaseIndexEntry( + (tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2]) + ) -def aggressive_tree_merge(odb: 'GitCmdObjectDB', tree_shas: Sequence[bytes]) -> List[BaseIndexEntry]: +def aggressive_tree_merge( + odb: "GitCmdObjectDB", tree_shas: Sequence[bytes] +) -> List[BaseIndexEntry]: """ :return: list of BaseIndexEntries representing the aggressive merge of the given trees. All valid entries are on stage 0, whereas the conflicting ones are left @@ -339,7 +375,7 @@ def aggressive_tree_merge(odb: 'GitCmdObjectDB', tree_shas: Sequence[bytes]) -> # one and two way is the same for us, as we don't have to handle an existing # index, instrea if len(tree_shas) in (1, 2): - for entry in traverse_tree_recursive(odb, tree_shas[-1], ''): + for entry in traverse_tree_recursive(odb, tree_shas[-1], ""): out.append(_tree_entry_to_baseindexentry(entry, 0)) # END for each entry return out @@ -349,7 +385,7 @@ def aggressive_tree_merge(odb: 'GitCmdObjectDB', tree_shas: Sequence[bytes]) -> raise ValueError("Cannot handle %i trees at once" % len(tree_shas)) # three trees - for base, ours, theirs in traverse_trees_recursive(odb, tree_shas, ''): + for base, ours, theirs in traverse_trees_recursive(odb, tree_shas, ""): if base is not None: # base version exists if ours is not None: @@ -358,8 +394,15 @@ def aggressive_tree_merge(odb: 'GitCmdObjectDB', tree_shas: Sequence[bytes]) -> # it exists in all branches, if it was changed in both # its a conflict, otherwise we take the changed version # This should be the most common branch, so it comes first - if(base[0] != ours[0] and base[0] != theirs[0] and ours[0] != theirs[0]) or \ - (base[1] != ours[1] and base[1] != theirs[1] and ours[1] != theirs[1]): + if ( + base[0] != ours[0] + and base[0] != theirs[0] + and ours[0] != theirs[0] + ) or ( + base[1] != ours[1] + and base[1] != theirs[1] + and ours[1] != theirs[1] + ): # changed by both out.append(_tree_entry_to_baseindexentry(base, 1)) out.append(_tree_entry_to_baseindexentry(ours, 2)) |