summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--git/exc.py11
-rw-r--r--git/index/base.py5
-rw-r--r--git/index/fun.py77
-rw-r--r--git/repo/base.py4
-rw-r--r--git/util.py1
5 files changed, 59 insertions, 39 deletions
diff --git a/git/exc.py b/git/exc.py
index 54a1d51b..e8ff784c 100644
--- a/git/exc.py
+++ b/git/exc.py
@@ -11,7 +11,7 @@ from git.compat import safe_decode
# typing ----------------------------------------------------
-from typing import List, Optional, Sequence, Tuple, Union, TYPE_CHECKING
+from typing import List, Sequence, Tuple, Union, TYPE_CHECKING
from git.types import PathLike
if TYPE_CHECKING:
@@ -113,7 +113,7 @@ class CheckoutError(GitError):
were checked out successfully and hence match the version stored in the
index"""
- def __init__(self, message: str, failed_files: Sequence[PathLike], valid_files: List[PathLike],
+ def __init__(self, message: str, failed_files: Sequence[PathLike], valid_files: Sequence[PathLike],
failed_reasons: List[str]) -> None:
Exception.__init__(self, message)
@@ -139,8 +139,11 @@ class HookExecutionError(CommandError):
"""Thrown if a hook exits with a non-zero exit code. It provides access to the exit code and the string returned
via standard output"""
- def __init__(self, command: Union[List[str], Tuple[str, ...], str], status: Optional[str],
- stderr: Optional[str] = None, stdout: Optional[str] = None) -> None:
+ def __init__(self, command: Union[List[str], Tuple[str, ...], str],
+ status: Union[str, int, None, Exception],
+ stderr: Union[bytes, str, None] = None,
+ stdout: Union[bytes, str, None] = None) -> None:
+
super(HookExecutionError, self).__init__(command, status, stderr, stdout)
self._msg = "Hook('%s') failed%s"
diff --git a/git/index/base.py b/git/index/base.py
index 2bb62f32..5c4947ca 100644
--- a/git/index/base.py
+++ b/git/index/base.py
@@ -285,7 +285,8 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
New IndexFile instance. Its path will be undefined.
If you intend to write such a merged Index, supply an alternate file_path
to its 'write' method."""
- base_entries = aggressive_tree_merge(repo.odb, [to_bin_sha(str(t)) for t in tree_sha])
+ tree_sha_bytes = [to_bin_sha(str(t)) for t in tree_sha] # List[bytes]
+ base_entries = aggressive_tree_merge(repo.odb, tree_sha_bytes)
inst = cls(repo)
# convert to entries dict
@@ -1023,7 +1024,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
@default_index
def checkout(self, paths: Union[None, Iterable[PathLike]] = None, force: bool = False,
fprogress: Callable = lambda *args: None, **kwargs: Any
- ) -> Union[None, Iterator[PathLike], List[PathLike]]:
+ ) -> Union[None, Iterator[PathLike], Sequence[PathLike]]:
"""Checkout the given paths or all files from the version known to the index into
the working tree.
diff --git a/git/index/fun.py b/git/index/fun.py
index cc43f0a4..95dc3d56 100644
--- a/git/index/fun.py
+++ b/git/index/fun.py
@@ -1,7 +1,7 @@
# Contains standalone functions to accompany the index implementation and make it
# more versatile
# NOTE: Autodoc hates it if this is a docstring
-from git.types import PathLike
+
from io import BytesIO
import os
from stat import (
@@ -13,7 +13,6 @@ from stat import (
S_IFREG,
)
import subprocess
-from typing import List, Tuple, Union, cast
from git.cmd import PROC_CREATIONFLAGS, handle_process_output
from git.compat import (
@@ -49,6 +48,17 @@ from .util import (
unpack
)
+# typing -----------------------------------------------------------------------------
+
+from typing import (Dict, IO, List, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast)
+
+from git.types import PathLike
+
+if TYPE_CHECKING:
+ from .base import IndexFile
+
+# ------------------------------------------------------------------------------------
+
S_IFGITLINK = S_IFLNK | S_IFDIR # a submodule
CE_NAMEMASK_INV = ~CE_NAMEMASK
@@ -57,12 +67,12 @@ __all__ = ('write_cache', 'read_cache', 'write_tree_from_cache', 'entry_key',
'stat_mode_to_index_mode', 'S_IFGITLINK', 'run_commit_hook', 'hook_path')
-def hook_path(name, git_dir):
+def hook_path(name: str, git_dir: PathLike) -> str:
""":return: path to the given named hook in the given git repository directory"""
return osp.join(git_dir, 'hooks', name)
-def run_commit_hook(name, index, *args):
+def run_commit_hook(name: str, index: IndexFile, *args: str) -> None:
"""Run the commit hook of the given name. Silently ignores hooks that do not exist.
:param name: name of hook, like 'pre-commit'
:param index: IndexFile instance
@@ -70,10 +80,10 @@ def run_commit_hook(name, index, *args):
:raises HookExecutionError: """
hp = hook_path(name, index.repo.git_dir)
if not os.access(hp, os.X_OK):
- return
+ return None
env = os.environ.copy()
- env['GIT_INDEX_FILE'] = safe_decode(index.path)
+ env['GIT_INDEX_FILE'] = safe_decode(str(index.path))
env['GIT_EDITOR'] = ':'
try:
cmd = subprocess.Popen([hp] + list(args),
@@ -86,14 +96,14 @@ def run_commit_hook(name, index, *args):
except Exception as ex:
raise HookExecutionError(hp, ex) from ex
else:
- stdout = []
- stderr = []
- handle_process_output(cmd, stdout.append, stderr.append, finalize_process)
- stdout = ''.join(stdout)
- stderr = ''.join(stderr)
+ stdout_list = [] # type: List[str]
+ stderr_list = [] # type: List[str]
+ handle_process_output(cmd, stdout_list.append, stderr_list.append, finalize_process)
+ stdout_str = ''.join(stderr_list)
+ stderr_str = ''.join(stderr_list)
if cmd.returncode != 0:
- stdout = force_text(stdout, defenc)
- stderr = force_text(stderr, defenc)
+ stdout = force_text(stdout_str, defenc)
+ stderr = force_text(stderr_str, defenc)
raise HookExecutionError(hp, cmd.returncode, stderr, stdout)
# end handle return code
@@ -108,7 +118,9 @@ def stat_mode_to_index_mode(mode):
return S_IFREG | 0o644 | (mode & 0o111) # blobs with or without executable bit
-def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1Writer):
+def write_cache(entries: Sequence[Union[BaseIndexEntry, 'IndexEntry']], stream: IO[bytes],
+ extension_data: Union[None, bytes] = None,
+ ShaStreamCls: Type[IndexFileSHA1Writer] = IndexFileSHA1Writer) -> None:
"""Write the cache represented by entries to a stream
:param entries: **sorted** list of entries
@@ -121,10 +133,10 @@ def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1
:param extension_data: any kind of data to write as a trailer, it must begin
a 4 byte identifier, followed by its size ( 4 bytes )"""
# wrap the stream into a compatible writer
- stream = ShaStreamCls(stream)
+ stream_sha = ShaStreamCls(stream)
- tell = stream.tell
- write = stream.write
+ tell = stream_sha.tell
+ write = stream_sha.write
# header
version = 2
@@ -136,8 +148,8 @@ def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1
beginoffset = tell()
write(entry[4]) # ctime
write(entry[5]) # mtime
- path = entry[3]
- path = force_bytes(path, encoding=defenc)
+ path_str = entry[3] # type: str
+ path = force_bytes(path_str, encoding=defenc)
plen = len(path) & CE_NAMEMASK # path length
assert plen == len(path), "Path %s too long to fit into index" % entry[3]
flags = plen | (entry[2] & CE_NAMEMASK_INV) # clear possible previous values
@@ -150,18 +162,19 @@ def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1
# write previously cached extensions data
if extension_data is not None:
- stream.write(extension_data)
+ stream_sha.write(extension_data)
# write the sha over the content
- stream.write_sha()
+ stream_sha.write_sha()
-def read_header(stream):
+def read_header(stream: IO[bytes]) -> Tuple[int, int]:
"""Return tuple(version_long, num_entries) from the given stream"""
type_id = stream.read(4)
if type_id != b"DIRC":
raise AssertionError("Invalid index file header: %r" % type_id)
- version, num_entries = unpack(">LL", stream.read(4 * 2))
+ unpacked = cast(Tuple[int, int], unpack(">LL", stream.read(4 * 2)))
+ version, num_entries = unpacked
# TODO: handle version 3: extended data, see read-cache.c
assert version in (1, 2)
@@ -180,7 +193,7 @@ def entry_key(*entry: Union[BaseIndexEntry, PathLike, int]) -> Tuple[PathLike, i
# END handle entry
-def read_cache(stream):
+def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'IndexEntry'], bytes, bytes]:
"""Read a cache file from the given stream
:return: tuple(version, entries_dict, extension_data, content_sha)
* version is the integer version number
@@ -189,7 +202,7 @@ def read_cache(stream):
* content_sha is a 20 byte sha on all cache file contents"""
version, num_entries = read_header(stream)
count = 0
- entries = {}
+ entries = {} # type: Dict[Tuple[PathLike, int], 'IndexEntry']
read = stream.read
tell = stream.tell
@@ -228,7 +241,8 @@ def read_cache(stream):
return (version, entries, extension_data, content_sha)
-def write_tree_from_cache(entries, odb, sl, si=0):
+def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0
+ ) -> Tuple[bytes, List[Tuple[str, int, str]]]:
"""Create a tree from the given sorted list of entries and put the respective
trees into the given object database
@@ -238,7 +252,7 @@ def write_tree_from_cache(entries, odb, sl, si=0):
:param sl: slice indicating the range we should process on the entries list
:return: tuple(binsha, list(tree_entry, ...)) a tuple of a sha and a list of
tree entries being a tuple of hexsha, mode, name"""
- tree_items = []
+ tree_items = [] # type: List[Tuple[Union[bytes, str], int, str]]
tree_items_append = tree_items.append
ci = sl.start
end = sl.stop
@@ -277,18 +291,19 @@ def write_tree_from_cache(entries, odb, sl, si=0):
# finally create the tree
sio = BytesIO()
- tree_to_stream(tree_items, sio.write)
+ tree_to_stream(tree_items, sio.write) # converts bytes of each item[0] to str
+ tree_items_stringified = cast(List[Tuple[str, int, str]], tree_items) # type: List[Tuple[str, int, str]]
sio.seek(0)
istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio))
- return (istream.binsha, tree_items)
+ return (istream.binsha, tree_items_stringified)
-def _tree_entry_to_baseindexentry(tree_entry, stage):
+def _tree_entry_to_baseindexentry(tree_entry: Tuple[str, int, str], stage: int) -> BaseIndexEntry:
return BaseIndexEntry((tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2]))
-def aggressive_tree_merge(odb, tree_shas) -> List[BaseIndexEntry]:
+def aggressive_tree_merge(odb, tree_shas: Sequence[bytes]) -> List[BaseIndexEntry]:
"""
:return: list of BaseIndexEntries representing the aggressive merge of the given
trees. All valid entries are on stage 0, whereas the conflicting ones are left
diff --git a/git/repo/base.py b/git/repo/base.py
index 607eb868..e23ebb1a 100644
--- a/git/repo/base.py
+++ b/git/repo/base.py
@@ -36,7 +36,7 @@ import gitdb
from git.types import TBD, PathLike, Lit_config_levels
from typing import (Any, BinaryIO, Callable, Dict,
- Iterator, List, Mapping, Optional,
+ Iterator, List, Mapping, Optional, Sequence,
TextIO, Tuple, Type, Union,
NamedTuple, cast, TYPE_CHECKING)
@@ -536,7 +536,7 @@ class Repo(object):
return self.head.commit.tree
return self.rev_parse(str(rev) + "^{tree}")
- def iter_commits(self, rev: Optional[TBD] = None, paths: Union[PathLike, List[PathLike]] = '',
+ def iter_commits(self, rev: Optional[TBD] = None, paths: Union[PathLike, Sequence[PathLike]] = '',
**kwargs: Any) -> Iterator[Commit]:
"""A list of Commit objects representing the history of a given ref/commit
diff --git a/git/util.py b/git/util.py
index 581bf877..76aaee49 100644
--- a/git/util.py
+++ b/git/util.py
@@ -377,6 +377,7 @@ def expand_path(p: None, expand_vars: bool = ...) -> None:
@overload
def expand_path(p: PathLike, expand_vars: bool = ...) -> str:
+ # improve these overloads when 3.5 dropped
...