summaryrefslogtreecommitdiff
path: root/git/index/fun.py
diff options
context:
space:
mode:
Diffstat (limited to 'git/index/fun.py')
-rw-r--r--git/index/fun.py185
1 files changed, 114 insertions, 71 deletions
diff --git a/git/index/fun.py b/git/index/fun.py
index acab7423..e8dead86 100644
--- a/git/index/fun.py
+++ b/git/index/fun.py
@@ -25,14 +25,11 @@ from git.compat import (
is_win,
safe_decode,
)
-from git.exc import (
- UnmergedEntriesError,
- HookExecutionError
-)
+from git.exc import UnmergedEntriesError, HookExecutionError
from git.objects.fun import (
tree_to_stream,
traverse_tree_recursive,
- traverse_trees_recursive
+ traverse_trees_recursive,
)
from git.util import IndexFileSHA1Writer, finalize_process
from gitdb.base import IStream
@@ -40,20 +37,12 @@ from gitdb.typ import str_tree_type
import os.path as osp
-from .typ import (
- BaseIndexEntry,
- IndexEntry,
- CE_NAMEMASK,
- CE_STAGESHIFT
-)
-from .util import (
- pack,
- unpack
-)
+from .typ import BaseIndexEntry, IndexEntry, CE_NAMEMASK, CE_STAGESHIFT
+from .util import pack, unpack
# typing -----------------------------------------------------------------------------
-from typing import (Dict, IO, List, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast)
+from typing import Dict, IO, List, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast
from git.types import PathLike
@@ -61,40 +50,49 @@ if TYPE_CHECKING:
from .base import IndexFile
from git.db import GitCmdObjectDB
from git.objects.tree import TreeCacheTup
+
# from git.objects.fun import EntryTupOrNone
# ------------------------------------------------------------------------------------
-S_IFGITLINK = S_IFLNK | S_IFDIR # a submodule
+S_IFGITLINK = S_IFLNK | S_IFDIR # a submodule
CE_NAMEMASK_INV = ~CE_NAMEMASK
-__all__ = ('write_cache', 'read_cache', 'write_tree_from_cache', 'entry_key',
- 'stat_mode_to_index_mode', 'S_IFGITLINK', 'run_commit_hook', 'hook_path')
+__all__ = (
+ "write_cache",
+ "read_cache",
+ "write_tree_from_cache",
+ "entry_key",
+ "stat_mode_to_index_mode",
+ "S_IFGITLINK",
+ "run_commit_hook",
+ "hook_path",
+)
def hook_path(name: str, git_dir: PathLike) -> str:
""":return: path to the given named hook in the given git repository directory"""
- return osp.join(git_dir, 'hooks', name)
+ return osp.join(git_dir, "hooks", name)
def _has_file_extension(path):
return osp.splitext(path)[1]
-def run_commit_hook(name: str, index: 'IndexFile', *args: str) -> None:
+def run_commit_hook(name: str, index: "IndexFile", *args: str) -> None:
"""Run the commit hook of the given name. Silently ignores hooks that do not exist.
:param name: name of hook, like 'pre-commit'
:param index: IndexFile instance
:param args: arguments passed to hook file
- :raises HookExecutionError: """
+ :raises HookExecutionError:"""
hp = hook_path(name, index.repo.git_dir)
if not os.access(hp, os.X_OK):
return None
env = os.environ.copy()
- env['GIT_INDEX_FILE'] = safe_decode(str(index.path))
- env['GIT_EDITOR'] = ':'
+ env["GIT_INDEX_FILE"] = safe_decode(str(index.path))
+ env["GIT_EDITOR"] = ":"
cmd = [hp]
try:
if is_win and not _has_file_extension(hp):
@@ -102,22 +100,26 @@ def run_commit_hook(name: str, index: 'IndexFile', *args: str) -> None:
# (doesn't understand shebangs). Try using bash to run the hook.
relative_hp = Path(hp).relative_to(index.repo.working_dir).as_posix()
cmd = ["bash.exe", relative_hp]
-
- cmd = subprocess.Popen(cmd + list(args),
- env=env,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- cwd=index.repo.working_dir,
- close_fds=is_posix,
- creationflags=PROC_CREATIONFLAGS,)
+
+ cmd = subprocess.Popen(
+ cmd + list(args),
+ env=env,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ cwd=index.repo.working_dir,
+ close_fds=is_posix,
+ creationflags=PROC_CREATIONFLAGS,
+ )
except Exception as ex:
raise HookExecutionError(hp, ex) from ex
else:
stdout_list: List[str] = []
stderr_list: List[str] = []
- handle_process_output(cmd, stdout_list.append, stderr_list.append, finalize_process)
- stdout = ''.join(stdout_list)
- stderr = ''.join(stderr_list)
+ handle_process_output(
+ cmd, stdout_list.append, stderr_list.append, finalize_process
+ )
+ stdout = "".join(stdout_list)
+ stderr = "".join(stderr_list)
if cmd.returncode != 0:
stdout = force_text(stdout, defenc)
stderr = force_text(stderr, defenc)
@@ -128,16 +130,21 @@ def run_commit_hook(name: str, index: 'IndexFile', *args: str) -> None:
def stat_mode_to_index_mode(mode: int) -> int:
"""Convert the given mode from a stat call to the corresponding index mode
and return it"""
- if S_ISLNK(mode): # symlinks
+ if S_ISLNK(mode): # symlinks
return S_IFLNK
- if S_ISDIR(mode) or S_IFMT(mode) == S_IFGITLINK: # submodules
+ if S_ISDIR(mode) or S_IFMT(mode) == S_IFGITLINK: # submodules
return S_IFGITLINK
- return S_IFREG | (mode & S_IXUSR and 0o755 or 0o644) # blobs with or without executable bit
+ return S_IFREG | (
+ mode & S_IXUSR and 0o755 or 0o644
+ ) # blobs with or without executable bit
-def write_cache(entries: Sequence[Union[BaseIndexEntry, 'IndexEntry']], stream: IO[bytes],
- extension_data: Union[None, bytes] = None,
- ShaStreamCls: Type[IndexFileSHA1Writer] = IndexFileSHA1Writer) -> None:
+def write_cache(
+ entries: Sequence[Union[BaseIndexEntry, "IndexEntry"]],
+ stream: IO[bytes],
+ extension_data: Union[None, bytes] = None,
+ ShaStreamCls: Type[IndexFileSHA1Writer] = IndexFileSHA1Writer,
+) -> None:
"""Write the cache represented by entries to a stream
:param entries: **sorted** list of entries
@@ -163,17 +170,28 @@ def write_cache(entries: Sequence[Union[BaseIndexEntry, 'IndexEntry']], stream:
# body
for entry in entries:
beginoffset = tell()
- write(entry.ctime_bytes) # ctime
- write(entry.mtime_bytes) # mtime
+ write(entry.ctime_bytes) # ctime
+ write(entry.mtime_bytes) # mtime
path_str = str(entry.path)
path: bytes = force_bytes(path_str, encoding=defenc)
- plen = len(path) & CE_NAMEMASK # path length
+ plen = len(path) & CE_NAMEMASK # path length
assert plen == len(path), "Path %s too long to fit into index" % entry.path
- flags = plen | (entry.flags & CE_NAMEMASK_INV) # clear possible previous values
- write(pack(">LLLLLL20sH", entry.dev, entry.inode, entry.mode,
- entry.uid, entry.gid, entry.size, entry.binsha, flags))
+ flags = plen | (entry.flags & CE_NAMEMASK_INV) # clear possible previous values
+ write(
+ pack(
+ ">LLLLLL20sH",
+ entry.dev,
+ entry.inode,
+ entry.mode,
+ entry.uid,
+ entry.gid,
+ entry.size,
+ entry.binsha,
+ flags,
+ )
+ )
write(path)
- real_size = ((tell() - beginoffset + 8) & ~7)
+ real_size = (tell() - beginoffset + 8) & ~7
write(b"\0" * ((beginoffset + real_size) - tell()))
# END for each entry
@@ -216,7 +234,9 @@ def entry_key(*entry: Union[BaseIndexEntry, PathLike, int]) -> Tuple[PathLike, i
# END handle entry
-def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'IndexEntry'], bytes, bytes]:
+def read_cache(
+ stream: IO[bytes],
+) -> Tuple[int, Dict[Tuple[PathLike, int], "IndexEntry"], bytes, bytes]:
"""Read a cache file from the given stream
:return: tuple(version, entries_dict, extension_data, content_sha)
* version is the integer version number
@@ -225,7 +245,7 @@ def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'Inde
* content_sha is a 20 byte sha on all cache file contents"""
version, num_entries = read_header(stream)
count = 0
- entries: Dict[Tuple[PathLike, int], 'IndexEntry'] = {}
+ entries: Dict[Tuple[PathLike, int], "IndexEntry"] = {}
read = stream.read
tell = stream.tell
@@ -233,14 +253,17 @@ def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'Inde
beginoffset = tell()
ctime = unpack(">8s", read(8))[0]
mtime = unpack(">8s", read(8))[0]
- (dev, ino, mode, uid, gid, size, sha, flags) = \
- unpack(">LLLLLL20sH", read(20 + 4 * 6 + 2))
+ (dev, ino, mode, uid, gid, size, sha, flags) = unpack(
+ ">LLLLLL20sH", read(20 + 4 * 6 + 2)
+ )
path_size = flags & CE_NAMEMASK
path = read(path_size).decode(defenc)
- real_size = ((tell() - beginoffset + 8) & ~7)
+ real_size = (tell() - beginoffset + 8) & ~7
read((beginoffset + real_size) - tell())
- entry = IndexEntry((mode, sha, flags, path, ctime, mtime, dev, ino, uid, gid, size))
+ entry = IndexEntry(
+ (mode, sha, flags, path, ctime, mtime, dev, ino, uid, gid, size)
+ )
# entry_key would be the method to use, but we safe the effort
entries[(path, entry.stage)] = entry
count += 1
@@ -253,19 +276,22 @@ def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'Inde
# 4 bytes length of chunk
# repeated 0 - N times
extension_data = stream.read(~0)
- assert len(extension_data) > 19, "Index Footer was not at least a sha on content as it was only %i bytes in size"\
- % len(extension_data)
+ assert len(extension_data) > 19, (
+ "Index Footer was not at least a sha on content as it was only %i bytes in size"
+ % len(extension_data)
+ )
content_sha = extension_data[-20:]
# truncate the sha in the end as we will dynamically create it anyway
- extension_data = extension_data[: -20]
+ extension_data = extension_data[:-20]
return (version, entries, extension_data, content_sha)
-def write_tree_from_cache(entries: List[IndexEntry], odb: 'GitCmdObjectDB', sl: slice, si: int = 0
- ) -> Tuple[bytes, List['TreeCacheTup']]:
+def write_tree_from_cache(
+ entries: List[IndexEntry], odb: "GitCmdObjectDB", sl: slice, si: int = 0
+) -> Tuple[bytes, List["TreeCacheTup"]]:
"""Create a tree from the given sorted list of entries and put the respective
trees into the given object database
@@ -275,7 +301,7 @@ def write_tree_from_cache(entries: List[IndexEntry], odb: 'GitCmdObjectDB', sl:
:param sl: slice indicating the range we should process on the entries list
:return: tuple(binsha, list(tree_entry, ...)) a tuple of a sha and a list of
tree entries being a tuple of hexsha, mode, name"""
- tree_items: List['TreeCacheTup'] = []
+ tree_items: List["TreeCacheTup"] = []
ci = sl.start
end = sl.stop
@@ -285,7 +311,7 @@ def write_tree_from_cache(entries: List[IndexEntry], odb: 'GitCmdObjectDB', sl:
raise UnmergedEntriesError(entry)
# END abort on unmerged
ci += 1
- rbound = entry.path.find('/', si)
+ rbound = entry.path.find("/", si)
if rbound == -1:
# its not a tree
tree_items.append((entry.binsha, entry.mode, entry.path[si:]))
@@ -295,7 +321,7 @@ def write_tree_from_cache(entries: List[IndexEntry], odb: 'GitCmdObjectDB', sl:
xi = ci
while xi < end:
oentry = entries[xi]
- orbound = oentry.path.find('/', si)
+ orbound = oentry.path.find("/", si)
if orbound == -1 or oentry.path[si:orbound] != base:
break
# END abort on base mismatch
@@ -304,7 +330,9 @@ def write_tree_from_cache(entries: List[IndexEntry], odb: 'GitCmdObjectDB', sl:
# enter recursion
# ci - 1 as we want to count our current item as well
- sha, _tree_entry_list = write_tree_from_cache(entries, odb, slice(ci - 1, xi), rbound + 1)
+ sha, _tree_entry_list = write_tree_from_cache(
+ entries, odb, slice(ci - 1, xi), rbound + 1
+ )
tree_items.append((sha, S_IFDIR, base))
# skip ahead
@@ -314,18 +342,26 @@ def write_tree_from_cache(entries: List[IndexEntry], odb: 'GitCmdObjectDB', sl:
# finally create the tree
sio = BytesIO()
- tree_to_stream(tree_items, sio.write) # writes to stream as bytes, but doesn't change tree_items
+ tree_to_stream(
+ tree_items, sio.write
+ ) # writes to stream as bytes, but doesn't change tree_items
sio.seek(0)
istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio))
return (istream.binsha, tree_items)
-def _tree_entry_to_baseindexentry(tree_entry: 'TreeCacheTup', stage: int) -> BaseIndexEntry:
- return BaseIndexEntry((tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2]))
+def _tree_entry_to_baseindexentry(
+ tree_entry: "TreeCacheTup", stage: int
+) -> BaseIndexEntry:
+ return BaseIndexEntry(
+ (tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2])
+ )
-def aggressive_tree_merge(odb: 'GitCmdObjectDB', tree_shas: Sequence[bytes]) -> List[BaseIndexEntry]:
+def aggressive_tree_merge(
+ odb: "GitCmdObjectDB", tree_shas: Sequence[bytes]
+) -> List[BaseIndexEntry]:
"""
:return: list of BaseIndexEntries representing the aggressive merge of the given
trees. All valid entries are on stage 0, whereas the conflicting ones are left
@@ -339,7 +375,7 @@ def aggressive_tree_merge(odb: 'GitCmdObjectDB', tree_shas: Sequence[bytes]) ->
# one and two way is the same for us, as we don't have to handle an existing
# index, instrea
if len(tree_shas) in (1, 2):
- for entry in traverse_tree_recursive(odb, tree_shas[-1], ''):
+ for entry in traverse_tree_recursive(odb, tree_shas[-1], ""):
out.append(_tree_entry_to_baseindexentry(entry, 0))
# END for each entry
return out
@@ -349,7 +385,7 @@ def aggressive_tree_merge(odb: 'GitCmdObjectDB', tree_shas: Sequence[bytes]) ->
raise ValueError("Cannot handle %i trees at once" % len(tree_shas))
# three trees
- for base, ours, theirs in traverse_trees_recursive(odb, tree_shas, ''):
+ for base, ours, theirs in traverse_trees_recursive(odb, tree_shas, ""):
if base is not None:
# base version exists
if ours is not None:
@@ -358,8 +394,15 @@ def aggressive_tree_merge(odb: 'GitCmdObjectDB', tree_shas: Sequence[bytes]) ->
# it exists in all branches, if it was changed in both
# its a conflict, otherwise we take the changed version
# This should be the most common branch, so it comes first
- if(base[0] != ours[0] and base[0] != theirs[0] and ours[0] != theirs[0]) or \
- (base[1] != ours[1] and base[1] != theirs[1] and ours[1] != theirs[1]):
+ if (
+ base[0] != ours[0]
+ and base[0] != theirs[0]
+ and ours[0] != theirs[0]
+ ) or (
+ base[1] != ours[1]
+ and base[1] != theirs[1]
+ and ours[1] != theirs[1]
+ ):
# changed by both
out.append(_tree_entry_to_baseindexentry(base, 1))
out.append(_tree_entry_to_baseindexentry(ours, 2))