diff options
-rw-r--r-- | git/exc.py | 11 | ||||
-rw-r--r-- | git/index/base.py | 5 | ||||
-rw-r--r-- | git/index/fun.py | 77 | ||||
-rw-r--r-- | git/repo/base.py | 4 | ||||
-rw-r--r-- | git/util.py | 1 |
5 files changed, 59 insertions, 39 deletions
@@ -11,7 +11,7 @@ from git.compat import safe_decode # typing ---------------------------------------------------- -from typing import List, Optional, Sequence, Tuple, Union, TYPE_CHECKING +from typing import List, Sequence, Tuple, Union, TYPE_CHECKING from git.types import PathLike if TYPE_CHECKING: @@ -113,7 +113,7 @@ class CheckoutError(GitError): were checked out successfully and hence match the version stored in the index""" - def __init__(self, message: str, failed_files: Sequence[PathLike], valid_files: List[PathLike], + def __init__(self, message: str, failed_files: Sequence[PathLike], valid_files: Sequence[PathLike], failed_reasons: List[str]) -> None: Exception.__init__(self, message) @@ -139,8 +139,11 @@ class HookExecutionError(CommandError): """Thrown if a hook exits with a non-zero exit code. It provides access to the exit code and the string returned via standard output""" - def __init__(self, command: Union[List[str], Tuple[str, ...], str], status: Optional[str], - stderr: Optional[str] = None, stdout: Optional[str] = None) -> None: + def __init__(self, command: Union[List[str], Tuple[str, ...], str], + status: Union[str, int, None, Exception], + stderr: Union[bytes, str, None] = None, + stdout: Union[bytes, str, None] = None) -> None: + super(HookExecutionError, self).__init__(command, status, stderr, stdout) self._msg = "Hook('%s') failed%s" diff --git a/git/index/base.py b/git/index/base.py index 2bb62f32..5c4947ca 100644 --- a/git/index/base.py +++ b/git/index/base.py @@ -285,7 +285,8 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): New IndexFile instance. Its path will be undefined. If you intend to write such a merged Index, supply an alternate file_path to its 'write' method.""" - base_entries = aggressive_tree_merge(repo.odb, [to_bin_sha(str(t)) for t in tree_sha]) + tree_sha_bytes = [to_bin_sha(str(t)) for t in tree_sha] # List[bytes] + base_entries = aggressive_tree_merge(repo.odb, tree_sha_bytes) inst = cls(repo) # convert to entries dict @@ -1023,7 +1024,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): @default_index def checkout(self, paths: Union[None, Iterable[PathLike]] = None, force: bool = False, fprogress: Callable = lambda *args: None, **kwargs: Any - ) -> Union[None, Iterator[PathLike], List[PathLike]]: + ) -> Union[None, Iterator[PathLike], Sequence[PathLike]]: """Checkout the given paths or all files from the version known to the index into the working tree. diff --git a/git/index/fun.py b/git/index/fun.py index cc43f0a4..95dc3d56 100644 --- a/git/index/fun.py +++ b/git/index/fun.py @@ -1,7 +1,7 @@ # Contains standalone functions to accompany the index implementation and make it # more versatile # NOTE: Autodoc hates it if this is a docstring -from git.types import PathLike + from io import BytesIO import os from stat import ( @@ -13,7 +13,6 @@ from stat import ( S_IFREG, ) import subprocess -from typing import List, Tuple, Union, cast from git.cmd import PROC_CREATIONFLAGS, handle_process_output from git.compat import ( @@ -49,6 +48,17 @@ from .util import ( unpack ) +# typing ----------------------------------------------------------------------------- + +from typing import (Dict, IO, List, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast) + +from git.types import PathLike + +if TYPE_CHECKING: + from .base import IndexFile + +# ------------------------------------------------------------------------------------ + S_IFGITLINK = S_IFLNK | S_IFDIR # a submodule CE_NAMEMASK_INV = ~CE_NAMEMASK @@ -57,12 +67,12 @@ __all__ = ('write_cache', 'read_cache', 'write_tree_from_cache', 'entry_key', 'stat_mode_to_index_mode', 'S_IFGITLINK', 'run_commit_hook', 'hook_path') -def hook_path(name, git_dir): +def hook_path(name: str, git_dir: PathLike) -> str: """:return: path to the given named hook in the given git repository directory""" return osp.join(git_dir, 'hooks', name) -def run_commit_hook(name, index, *args): +def run_commit_hook(name: str, index: IndexFile, *args: str) -> None: """Run the commit hook of the given name. Silently ignores hooks that do not exist. :param name: name of hook, like 'pre-commit' :param index: IndexFile instance @@ -70,10 +80,10 @@ def run_commit_hook(name, index, *args): :raises HookExecutionError: """ hp = hook_path(name, index.repo.git_dir) if not os.access(hp, os.X_OK): - return + return None env = os.environ.copy() - env['GIT_INDEX_FILE'] = safe_decode(index.path) + env['GIT_INDEX_FILE'] = safe_decode(str(index.path)) env['GIT_EDITOR'] = ':' try: cmd = subprocess.Popen([hp] + list(args), @@ -86,14 +96,14 @@ def run_commit_hook(name, index, *args): except Exception as ex: raise HookExecutionError(hp, ex) from ex else: - stdout = [] - stderr = [] - handle_process_output(cmd, stdout.append, stderr.append, finalize_process) - stdout = ''.join(stdout) - stderr = ''.join(stderr) + stdout_list = [] # type: List[str] + stderr_list = [] # type: List[str] + handle_process_output(cmd, stdout_list.append, stderr_list.append, finalize_process) + stdout_str = ''.join(stderr_list) + stderr_str = ''.join(stderr_list) if cmd.returncode != 0: - stdout = force_text(stdout, defenc) - stderr = force_text(stderr, defenc) + stdout = force_text(stdout_str, defenc) + stderr = force_text(stderr_str, defenc) raise HookExecutionError(hp, cmd.returncode, stderr, stdout) # end handle return code @@ -108,7 +118,9 @@ def stat_mode_to_index_mode(mode): return S_IFREG | 0o644 | (mode & 0o111) # blobs with or without executable bit -def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1Writer): +def write_cache(entries: Sequence[Union[BaseIndexEntry, 'IndexEntry']], stream: IO[bytes], + extension_data: Union[None, bytes] = None, + ShaStreamCls: Type[IndexFileSHA1Writer] = IndexFileSHA1Writer) -> None: """Write the cache represented by entries to a stream :param entries: **sorted** list of entries @@ -121,10 +133,10 @@ def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1 :param extension_data: any kind of data to write as a trailer, it must begin a 4 byte identifier, followed by its size ( 4 bytes )""" # wrap the stream into a compatible writer - stream = ShaStreamCls(stream) + stream_sha = ShaStreamCls(stream) - tell = stream.tell - write = stream.write + tell = stream_sha.tell + write = stream_sha.write # header version = 2 @@ -136,8 +148,8 @@ def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1 beginoffset = tell() write(entry[4]) # ctime write(entry[5]) # mtime - path = entry[3] - path = force_bytes(path, encoding=defenc) + path_str = entry[3] # type: str + path = force_bytes(path_str, encoding=defenc) plen = len(path) & CE_NAMEMASK # path length assert plen == len(path), "Path %s too long to fit into index" % entry[3] flags = plen | (entry[2] & CE_NAMEMASK_INV) # clear possible previous values @@ -150,18 +162,19 @@ def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1 # write previously cached extensions data if extension_data is not None: - stream.write(extension_data) + stream_sha.write(extension_data) # write the sha over the content - stream.write_sha() + stream_sha.write_sha() -def read_header(stream): +def read_header(stream: IO[bytes]) -> Tuple[int, int]: """Return tuple(version_long, num_entries) from the given stream""" type_id = stream.read(4) if type_id != b"DIRC": raise AssertionError("Invalid index file header: %r" % type_id) - version, num_entries = unpack(">LL", stream.read(4 * 2)) + unpacked = cast(Tuple[int, int], unpack(">LL", stream.read(4 * 2))) + version, num_entries = unpacked # TODO: handle version 3: extended data, see read-cache.c assert version in (1, 2) @@ -180,7 +193,7 @@ def entry_key(*entry: Union[BaseIndexEntry, PathLike, int]) -> Tuple[PathLike, i # END handle entry -def read_cache(stream): +def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'IndexEntry'], bytes, bytes]: """Read a cache file from the given stream :return: tuple(version, entries_dict, extension_data, content_sha) * version is the integer version number @@ -189,7 +202,7 @@ def read_cache(stream): * content_sha is a 20 byte sha on all cache file contents""" version, num_entries = read_header(stream) count = 0 - entries = {} + entries = {} # type: Dict[Tuple[PathLike, int], 'IndexEntry'] read = stream.read tell = stream.tell @@ -228,7 +241,8 @@ def read_cache(stream): return (version, entries, extension_data, content_sha) -def write_tree_from_cache(entries, odb, sl, si=0): +def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0 + ) -> Tuple[bytes, List[Tuple[str, int, str]]]: """Create a tree from the given sorted list of entries and put the respective trees into the given object database @@ -238,7 +252,7 @@ def write_tree_from_cache(entries, odb, sl, si=0): :param sl: slice indicating the range we should process on the entries list :return: tuple(binsha, list(tree_entry, ...)) a tuple of a sha and a list of tree entries being a tuple of hexsha, mode, name""" - tree_items = [] + tree_items = [] # type: List[Tuple[Union[bytes, str], int, str]] tree_items_append = tree_items.append ci = sl.start end = sl.stop @@ -277,18 +291,19 @@ def write_tree_from_cache(entries, odb, sl, si=0): # finally create the tree sio = BytesIO() - tree_to_stream(tree_items, sio.write) + tree_to_stream(tree_items, sio.write) # converts bytes of each item[0] to str + tree_items_stringified = cast(List[Tuple[str, int, str]], tree_items) # type: List[Tuple[str, int, str]] sio.seek(0) istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio)) - return (istream.binsha, tree_items) + return (istream.binsha, tree_items_stringified) -def _tree_entry_to_baseindexentry(tree_entry, stage): +def _tree_entry_to_baseindexentry(tree_entry: Tuple[str, int, str], stage: int) -> BaseIndexEntry: return BaseIndexEntry((tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2])) -def aggressive_tree_merge(odb, tree_shas) -> List[BaseIndexEntry]: +def aggressive_tree_merge(odb, tree_shas: Sequence[bytes]) -> List[BaseIndexEntry]: """ :return: list of BaseIndexEntries representing the aggressive merge of the given trees. All valid entries are on stage 0, whereas the conflicting ones are left diff --git a/git/repo/base.py b/git/repo/base.py index 607eb868..e23ebb1a 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -36,7 +36,7 @@ import gitdb from git.types import TBD, PathLike, Lit_config_levels from typing import (Any, BinaryIO, Callable, Dict, - Iterator, List, Mapping, Optional, + Iterator, List, Mapping, Optional, Sequence, TextIO, Tuple, Type, Union, NamedTuple, cast, TYPE_CHECKING) @@ -536,7 +536,7 @@ class Repo(object): return self.head.commit.tree return self.rev_parse(str(rev) + "^{tree}") - def iter_commits(self, rev: Optional[TBD] = None, paths: Union[PathLike, List[PathLike]] = '', + def iter_commits(self, rev: Optional[TBD] = None, paths: Union[PathLike, Sequence[PathLike]] = '', **kwargs: Any) -> Iterator[Commit]: """A list of Commit objects representing the history of a given ref/commit diff --git a/git/util.py b/git/util.py index 581bf877..76aaee49 100644 --- a/git/util.py +++ b/git/util.py @@ -377,6 +377,7 @@ def expand_path(p: None, expand_vars: bool = ...) -> None: @overload def expand_path(p: PathLike, expand_vars: bool = ...) -> str: + # improve these overloads when 3.5 dropped ... |