diff options
| -rw-r--r-- | VERSION | 2 | ||||
| -rw-r--r-- | git/refs/symbolic.py | 4 | ||||
| -rw-r--r-- | git/repo/base.py | 281 | ||||
| -rw-r--r-- | git/repo/fun.py | 38 | ||||
| -rw-r--r-- | requirements.txt | 1 | ||||
| -rw-r--r-- | test-requirements.txt | 1 | ||||
| -rw-r--r-- | tox.ini | 3 | 
7 files changed, 203 insertions, 127 deletions
@@ -1 +1 @@ -3.1.13 +3.1.14 diff --git a/git/refs/symbolic.py b/git/refs/symbolic.py index 60cfe554..22d9c1d5 100644 --- a/git/refs/symbolic.py +++ b/git/refs/symbolic.py @@ -87,7 +87,7 @@ class SymbolicReference(object):          """Returns an iterator yielding pairs of sha1/path pairs (as bytes) for the corresponding refs.          :note: The packed refs file will be kept open as long as we iterate"""          try: -            with open(cls._get_packed_refs_path(repo), 'rt') as fp: +            with open(cls._get_packed_refs_path(repo), 'rt', encoding='UTF-8') as fp:                  for line in fp:                      line = line.strip()                      if not line: @@ -513,7 +513,7 @@ class SymbolicReference(object):          return ref      @classmethod -    def create(cls, repo, path, reference='HEAD', force=False, logmsg=None): +    def create(cls, repo, path, reference='HEAD', force=False, logmsg=None, **kwargs):          """Create a new symbolic reference, hence a reference pointing to another reference.          :param repo: diff --git a/git/repo/base.py b/git/repo/base.py index 8f1ef0a6..99e87643 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -4,7 +4,6 @@  # This module is part of GitPython and is released under  # the BSD License: http://www.opensource.org/licenses/bsd-license.php -from collections import namedtuple  import logging  import os  import re @@ -33,18 +32,44 @@ from .fun import rev_parse, is_git_dir, find_submodule_git_dir, touch, find_work  import gc  import gitdb -try: -    import pathlib -except ImportError: -    pathlib = None - +# typing ------------------------------------------------------ + +from git.types import TBD, PathLike +from typing_extensions import Literal +from typing import (Any, +                    BinaryIO, +                    Callable, +                    Dict, +                    Iterator, +                    List, +                    Mapping, +                    Optional, +                    TextIO, +                    Tuple, +                    Type, +                    Union, +                    NamedTuple, cast, TYPE_CHECKING) + +if TYPE_CHECKING:  # only needed for types +    from git.util import IterableList +    from git.refs.symbolic import SymbolicReference +    from git.objects import TagObject, Blob, Tree  # NOQA: F401 + +Lit_config_levels = Literal['system', 'global', 'user', 'repository'] + +# -----------------------------------------------------------  log = logging.getLogger(__name__) -BlameEntry = namedtuple('BlameEntry', ['commit', 'linenos', 'orig_path', 'orig_linenos']) +__all__ = ('Repo',) -__all__ = ('Repo',) +BlameEntry = NamedTuple('BlameEntry', [ +    ('commit', Dict[str, TBD]), +    ('linenos', range), +    ('orig_path', Optional[str]), +    ('orig_linenos', range)] +)  class Repo(object): @@ -63,11 +88,11 @@ class Repo(object):      'git_dir' is the .git repository directory, which is always set."""      DAEMON_EXPORT_FILE = 'git-daemon-export-ok' -    git = None  # Must exist, or  __del__  will fail in case we raise on `__init__()` -    working_dir = None -    _working_tree_dir = None -    git_dir = None -    _common_dir = None +    git = cast('Git', None)  # Must exist, or  __del__  will fail in case we raise on `__init__()` +    working_dir = None    # type: Optional[PathLike] +    _working_tree_dir = None  # type: Optional[PathLike] +    git_dir = None  # type: Optional[PathLike] +    _common_dir = None  # type: Optional[PathLike]      # precompiled regex      re_whitespace = re.compile(r'\s+') @@ -79,13 +104,14 @@ class Repo(object):      # invariants      # represents the configuration level of a configuration file -    config_level = ("system", "user", "global", "repository") +    config_level = ("system", "user", "global", "repository")  # type: Tuple[Lit_config_levels, ...]      # Subclass configuration      # Subclasses may easily bring in their own custom types by placing a constructor or type here      GitCommandWrapperType = Git -    def __init__(self, path=None, odbt=GitCmdObjectDB, search_parent_directories=False, expand_vars=True): +    def __init__(self, path: Optional[PathLike] = None, odbt: Type[GitCmdObjectDB] = GitCmdObjectDB, +                 search_parent_directories: bool = False, expand_vars: bool = True) -> None:          """Create a new Repo instance          :param path: @@ -126,8 +152,9 @@ class Repo(object):              warnings.warn("The use of environment variables in paths is deprecated" +                            "\nfor security reasons and may be removed in the future!!")          epath = expand_path(epath, expand_vars) -        if not os.path.exists(epath): -            raise NoSuchPathError(epath) +        if epath is not None: +            if not os.path.exists(epath): +                raise NoSuchPathError(epath)          ## Walk up the path to find the `.git` dir.          # @@ -178,6 +205,7 @@ class Repo(object):          # END while curpath          if self.git_dir is None: +            self.git_dir = cast(PathLike, self.git_dir)              raise InvalidGitRepositoryError(epath)          self._bare = False @@ -190,7 +218,7 @@ class Repo(object):          try:              common_dir = open(osp.join(self.git_dir, 'commondir'), 'rt').readlines()[0].strip()              self._common_dir = osp.join(self.git_dir, common_dir) -        except (OSError, IOError): +        except OSError:              self._common_dir = None          # adjust the wd in case we are actually bare - we didn't know that @@ -199,7 +227,7 @@ class Repo(object):              self._working_tree_dir = None          # END working dir handling -        self.working_dir = self._working_tree_dir or self.common_dir +        self.working_dir = self._working_tree_dir or self.common_dir  # type: Optional[PathLike]          self.git = self.GitCommandWrapperType(self.working_dir)          # special handling, in special times @@ -208,19 +236,19 @@ class Repo(object):              args.append(self.git)          self.odb = odbt(*args) -    def __enter__(self): +    def __enter__(self) -> 'Repo':          return self -    def __exit__(self, exc_type, exc_value, traceback): +    def __exit__(self, exc_type: TBD, exc_value: TBD, traceback: TBD) -> None:          self.close() -    def __del__(self): +    def __del__(self) -> None:          try:              self.close()          except Exception:              pass -    def close(self): +    def close(self) -> None:          if self.git:              self.git.clear_cache()              # Tempfiles objects on Windows are holding references to @@ -235,25 +263,26 @@ class Repo(object):              if is_win:                  gc.collect() -    def __eq__(self, rhs): -        if isinstance(rhs, Repo): +    def __eq__(self, rhs: object) -> bool: +        if isinstance(rhs, Repo) and self.git_dir:              return self.git_dir == rhs.git_dir          return False -    def __ne__(self, rhs): +    def __ne__(self, rhs: object) -> bool:          return not self.__eq__(rhs) -    def __hash__(self): +    def __hash__(self) -> int:          return hash(self.git_dir)      # Description property -    def _get_description(self): -        filename = osp.join(self.git_dir, 'description') +    def _get_description(self) -> str: +        filename = osp.join(self.git_dir, 'description') if self.git_dir else ""          with open(filename, 'rb') as fp:              return fp.read().rstrip().decode(defenc) -    def _set_description(self, descr): -        filename = osp.join(self.git_dir, 'description') +    def _set_description(self, descr: str) -> None: + +        filename = osp.join(self.git_dir, 'description') if self.git_dir else ""          with open(filename, 'wb') as fp:              fp.write((descr + '\n').encode(defenc)) @@ -263,25 +292,31 @@ class Repo(object):      del _set_description      @property -    def working_tree_dir(self): +    def working_tree_dir(self) -> Optional[PathLike]:          """:return: The working tree directory of our git repository. If this is a bare repository, None is returned.          """          return self._working_tree_dir      @property -    def common_dir(self): +    def common_dir(self) -> PathLike:          """          :return: The git dir that holds everything except possibly HEAD,              FETCH_HEAD, ORIG_HEAD, COMMIT_EDITMSG, index, and logs/.""" -        return self._common_dir or self.git_dir +        if self._common_dir: +            return self._common_dir +        elif self.git_dir: +            return self.git_dir +        else: +            # or could return "" +            raise InvalidGitRepositoryError()      @property -    def bare(self): +    def bare(self) -> bool:          """:return: True if the repository is bare"""          return self._bare      @property -    def heads(self): +    def heads(self) -> 'IterableList':          """A list of ``Head`` objects representing the branch heads in          this repo @@ -289,7 +324,7 @@ class Repo(object):          return Head.list_items(self)      @property -    def references(self): +    def references(self) -> 'IterableList':          """A list of Reference objects representing tags, heads and remote references.          :return: IterableList(Reference, ...)""" @@ -302,24 +337,24 @@ class Repo(object):      branches = heads      @property -    def index(self): +    def index(self) -> 'IndexFile':          """:return: IndexFile representing this repository's index.          :note: This property can be expensive, as the returned ``IndexFile`` will be           reinitialized. It's recommended to re-use the object."""          return IndexFile(self)      @property -    def head(self): +    def head(self) -> 'HEAD':          """:return: HEAD Object pointing to the current head reference"""          return HEAD(self, 'HEAD')      @property -    def remotes(self): +    def remotes(self) -> 'IterableList':          """A list of Remote objects allowing to access and manipulate remotes          :return: ``git.IterableList(Remote, ...)``"""          return Remote.list_items(self) -    def remote(self, name='origin'): +    def remote(self, name: str = 'origin') -> 'Remote':          """:return: Remote with the specified name          :raise ValueError:  if no remote with such a name exists"""          r = Remote(self, name) @@ -330,13 +365,13 @@ class Repo(object):      #{ Submodules      @property -    def submodules(self): +    def submodules(self) -> 'IterableList':          """          :return: git.IterableList(Submodule, ...) of direct submodules              available from the current head"""          return Submodule.list_items(self) -    def submodule(self, name): +    def submodule(self, name: str) -> 'IterableList':          """ :return: Submodule with the given name          :raise ValueError: If no such submodule exists"""          try: @@ -345,7 +380,7 @@ class Repo(object):              raise ValueError("Didn't find submodule named %r" % name) from e          # END exception handling -    def create_submodule(self, *args, **kwargs): +    def create_submodule(self, *args: Any, **kwargs: Any) -> Submodule:          """Create a new submodule          :note: See the documentation of Submodule.add for a description of the @@ -353,13 +388,13 @@ class Repo(object):          :return: created submodules"""          return Submodule.add(self, *args, **kwargs) -    def iter_submodules(self, *args, **kwargs): +    def iter_submodules(self, *args: Any, **kwargs: Any) -> Iterator:          """An iterator yielding Submodule instances, see Traversable interface          for a description of args and kwargs          :return: Iterator"""          return RootModule(self).traverse(*args, **kwargs) -    def submodule_update(self, *args, **kwargs): +    def submodule_update(self, *args: Any, **kwargs: Any) -> Iterator:          """Update the submodules, keeping the repository consistent as it will          take the previous state into consideration. For more information, please          see the documentation of RootModule.update""" @@ -368,41 +403,45 @@ class Repo(object):      #}END submodules      @property -    def tags(self): +    def tags(self) -> 'IterableList':          """A list of ``Tag`` objects that are available in this repo          :return: ``git.IterableList(TagReference, ...)`` """          return TagReference.list_items(self) -    def tag(self, path): +    def tag(self, path: PathLike) -> TagReference:          """:return: TagReference Object, reference pointing to a Commit or Tag          :param path: path to the tag reference, i.e. 0.1.5 or tags/0.1.5 """          return TagReference(self, path) -    def create_head(self, path, commit='HEAD', force=False, logmsg=None): +    def create_head(self, path: PathLike, commit: str = 'HEAD', +                    force: bool = False, logmsg: Optional[str] = None +                    ) -> 'SymbolicReference':          """Create a new head within the repository.          For more documentation, please see the Head.create method.          :return: newly created Head Reference"""          return Head.create(self, path, commit, force, logmsg) -    def delete_head(self, *heads, **kwargs): +    def delete_head(self, *heads: HEAD, **kwargs: Any) -> None:          """Delete the given heads          :param kwargs: Additional keyword arguments to be passed to git-branch"""          return Head.delete(self, *heads, **kwargs) -    def create_tag(self, path, ref='HEAD', message=None, force=False, **kwargs): +    def create_tag(self, path: PathLike, ref: str = 'HEAD', +                   message: Optional[str] = None, force: bool = False, **kwargs: Any +                   ) -> TagReference:          """Create a new tag reference.          For more documentation, please see the TagReference.create method.          :return: TagReference object """          return TagReference.create(self, path, ref, message, force, **kwargs) -    def delete_tag(self, *tags): +    def delete_tag(self, *tags: TBD) -> None:          """Delete the given tag references"""          return TagReference.delete(self, *tags) -    def create_remote(self, name, url, **kwargs): +    def create_remote(self, name: str, url: PathLike, **kwargs: Any) -> Remote:          """Create a new remote.          For more information, please see the documentation of the Remote.create @@ -411,11 +450,11 @@ class Repo(object):          :return: Remote reference"""          return Remote.create(self, name, url, **kwargs) -    def delete_remote(self, remote): +    def delete_remote(self, remote: 'Remote') -> Type['Remote']:          """Delete the given remote."""          return Remote.remove(self, remote) -    def _get_config_path(self, config_level): +    def _get_config_path(self, config_level: Lit_config_levels) -> str:          # we do not support an absolute path of the gitconfig on windows ,          # use the global config instead          if is_win and config_level == "system": @@ -429,11 +468,16 @@ class Repo(object):          elif config_level == "global":              return osp.normpath(osp.expanduser("~/.gitconfig"))          elif config_level == "repository": -            return osp.normpath(osp.join(self._common_dir or self.git_dir, "config")) +            if self._common_dir: +                return osp.normpath(osp.join(self._common_dir, "config")) +            elif self.git_dir: +                return osp.normpath(osp.join(self.git_dir, "config")) +            else: +                raise NotADirectoryError          raise ValueError("Invalid configuration level: %r" % config_level) -    def config_reader(self, config_level=None): +    def config_reader(self, config_level: Optional[Lit_config_levels] = None) -> GitConfigParser:          """          :return:              GitConfigParser allowing to read the full git configuration, but not to write it @@ -454,7 +498,7 @@ class Repo(object):              files = [self._get_config_path(config_level)]          return GitConfigParser(files, read_only=True, repo=self) -    def config_writer(self, config_level="repository"): +    def config_writer(self, config_level: Lit_config_levels = "repository") -> GitConfigParser:          """          :return:              GitConfigParser allowing to write values of the specified configuration file level. @@ -469,7 +513,8 @@ class Repo(object):              repository = configuration file for this repository only"""          return GitConfigParser(self._get_config_path(config_level), read_only=False, repo=self) -    def commit(self, rev=None): +    def commit(self, rev: Optional[TBD] = None +               ) -> Union['SymbolicReference', Commit, 'TagObject', 'Blob', 'Tree', None]:          """The Commit object for the specified revision          :param rev: revision specifier, see git-rev-parse for viable options. @@ -479,12 +524,12 @@ class Repo(object):              return self.head.commit          return self.rev_parse(str(rev) + "^0") -    def iter_trees(self, *args, **kwargs): +    def iter_trees(self, *args: Any, **kwargs: Any) -> Iterator['Tree']:          """:return: Iterator yielding Tree objects          :note: Takes all arguments known to iter_commits method"""          return (c.tree for c in self.iter_commits(*args, **kwargs)) -    def tree(self, rev=None): +    def tree(self, rev: Union['Commit', 'Tree', None] = None) -> 'Tree':          """The Tree object for the given treeish revision          Examples:: @@ -501,7 +546,8 @@ class Repo(object):              return self.head.commit.tree          return self.rev_parse(str(rev) + "^{tree}") -    def iter_commits(self, rev=None, paths='', **kwargs): +    def iter_commits(self, rev: Optional[TBD] = None, paths: Union[PathLike, List[PathLike]] = '', +                     **kwargs: Any) -> Iterator[Commit]:          """A list of Commit objects representing the history of a given ref/commit          :param rev: @@ -525,7 +571,8 @@ class Repo(object):          return Commit.iter_items(self, rev, paths, **kwargs) -    def merge_base(self, *rev, **kwargs): +    def merge_base(self, *rev: TBD, **kwargs: Any +                   ) -> List[Union['SymbolicReference', Commit, 'TagObject', 'Blob', 'Tree', None]]:          """Find the closest common ancestor for the given revision (e.g. Commits, Tags, References, etc)          :param rev: At least two revs to find the common ancestor for. @@ -538,9 +585,9 @@ class Repo(object):              raise ValueError("Please specify at least two revs, got only %i" % len(rev))          # end handle input -        res = [] +        res = []  # type: List[Union['SymbolicReference', Commit, 'TagObject', 'Blob', 'Tree', None]]          try: -            lines = self.git.merge_base(*rev, **kwargs).splitlines() +            lines = self.git.merge_base(*rev, **kwargs).splitlines()  # List[str]          except GitCommandError as err:              if err.status == 128:                  raise @@ -556,7 +603,7 @@ class Repo(object):          return res -    def is_ancestor(self, ancestor_rev, rev): +    def is_ancestor(self, ancestor_rev: 'Commit', rev: 'Commit') -> bool:          """Check if a commit is an ancestor of another          :param ancestor_rev: Rev which should be an ancestor @@ -571,12 +618,12 @@ class Repo(object):              raise          return True -    def _get_daemon_export(self): -        filename = osp.join(self.git_dir, self.DAEMON_EXPORT_FILE) +    def _get_daemon_export(self) -> bool: +        filename = osp.join(self.git_dir, self.DAEMON_EXPORT_FILE) if self.git_dir else ""          return osp.exists(filename) -    def _set_daemon_export(self, value): -        filename = osp.join(self.git_dir, self.DAEMON_EXPORT_FILE) +    def _set_daemon_export(self, value: object) -> None: +        filename = osp.join(self.git_dir, self.DAEMON_EXPORT_FILE) if self.git_dir else ""          fileexists = osp.exists(filename)          if value and not fileexists:              touch(filename) @@ -588,11 +635,11 @@ class Repo(object):      del _get_daemon_export      del _set_daemon_export -    def _get_alternates(self): +    def _get_alternates(self) -> List[str]:          """The list of alternates for this repo from which objects can be retrieved          :return: list of strings being pathnames of alternates""" -        alternates_path = osp.join(self.git_dir, 'objects', 'info', 'alternates') +        alternates_path = osp.join(self.git_dir, 'objects', 'info', 'alternates') if self.git_dir else ""          if osp.exists(alternates_path):              with open(alternates_path, 'rb') as f: @@ -600,7 +647,7 @@ class Repo(object):              return alts.strip().splitlines()          return [] -    def _set_alternates(self, alts): +    def _set_alternates(self, alts: List[str]) -> None:          """Sets the alternates          :param alts: @@ -622,8 +669,8 @@ class Repo(object):      alternates = property(_get_alternates, _set_alternates,                            doc="Retrieve a list of alternates paths or set a list paths to be used as alternates") -    def is_dirty(self, index=True, working_tree=True, untracked_files=False, -                 submodules=True, path=None): +    def is_dirty(self, index: bool = True, working_tree: bool = True, untracked_files: bool = False, +                 submodules: bool = True, path: Optional[PathLike] = None) -> bool:          """          :return:              ``True``, the repository is considered dirty. By default it will react @@ -639,7 +686,7 @@ class Repo(object):          if not submodules:              default_args.append('--ignore-submodules')          if path: -            default_args.extend(["--", path]) +            default_args.extend(["--", str(path)])          if index:              # diff index against HEAD              if osp.isfile(self.index.path) and \ @@ -658,7 +705,7 @@ class Repo(object):          return False      @property -    def untracked_files(self): +    def untracked_files(self) -> List[str]:          """          :return:              list(str,...) @@ -673,7 +720,7 @@ class Repo(object):              consider caching it yourself."""          return self._get_untracked_files() -    def _get_untracked_files(self, *args, **kwargs): +    def _get_untracked_files(self, *args: Any, **kwargs: Any) -> List[str]:          # make sure we get all files, not only untracked directories          proc = self.git.status(*args,                                 porcelain=True, @@ -697,7 +744,7 @@ class Repo(object):          finalize_process(proc)          return untracked_files -    def ignored(self, *paths): +    def ignored(self, *paths: PathLike) -> List[PathLike]:          """Checks if paths are ignored via .gitignore          Doing so using the "git check-ignore" method. @@ -711,13 +758,13 @@ class Repo(object):          return proc.replace("\\\\", "\\").replace('"', "").split("\n")      @property -    def active_branch(self): +    def active_branch(self) -> 'SymbolicReference':          """The name of the currently active branch.          :return: Head to the active branch"""          return self.head.reference -    def blame_incremental(self, rev, file, **kwargs): +    def blame_incremental(self, rev: TBD, file: TBD, **kwargs: Any) -> Optional[Iterator['BlameEntry']]:          """Iterator for blame information for the given file at the given revision.          Unlike .blame(), this does not return the actual file's contents, only @@ -732,7 +779,7 @@ class Repo(object):          should get a continuous range spanning all line numbers in the file.          """          data = self.git.blame(rev, '--', file, p=True, incremental=True, stdout_as_string=False, **kwargs) -        commits = {} +        commits = {}  # type: Dict[str, TBD]          stream = (line for line in data.split(b'\n') if line)          while True: @@ -740,10 +787,11 @@ class Repo(object):                  line = next(stream)  # when exhausted, causes a StopIteration, terminating this function              except StopIteration:                  return -            hexsha, orig_lineno, lineno, num_lines = line.split() -            lineno = int(lineno) -            num_lines = int(num_lines) -            orig_lineno = int(orig_lineno) +            split_line = line.split()  # type: Tuple[str, str, str, str] +            hexsha, orig_lineno_str, lineno_str, num_lines_str = split_line +            lineno = int(lineno_str) +            num_lines = int(num_lines_str) +            orig_lineno = int(orig_lineno_str)              if hexsha not in commits:                  # Now read the next few lines and build up a dict of properties                  # for this commit @@ -791,22 +839,24 @@ class Repo(object):                               safe_decode(orig_filename),                               range(orig_lineno, orig_lineno + num_lines)) -    def blame(self, rev, file, incremental=False, **kwargs): +    def blame(self, rev: TBD, file: TBD, incremental: bool = False, **kwargs: Any +              ) -> Union[List[List[Union[Optional['Commit'], List[str]]]], Optional[Iterator[BlameEntry]]]:          """The blame information for the given file at the given revision.          :param rev: revision specifier, see git-rev-parse for viable options.          :return:              list: [git.Commit, list: [<line>]] -            A list of tuples associating a Commit object with a list of lines that +            A list of lists associating a Commit object with a list of lines that              changed within the given commit. The Commit objects will be given in order              of appearance."""          if incremental:              return self.blame_incremental(rev, file, **kwargs)          data = self.git.blame(rev, '--', file, p=True, stdout_as_string=False, **kwargs) -        commits = {} -        blames = [] -        info = None +        commits = {}  # type: Dict[str, Any] +        blames = []  # type: List[List[Union[Optional['Commit'], List[str]]]] + +        info = {}  # type: Dict[str, Any]  # use Any until TypedDict available          keepends = True          for line in data.splitlines(keepends): @@ -891,7 +941,8 @@ class Repo(object):                                  pass                              # end handle line contents                              blames[-1][0] = c -                            blames[-1][1].append(line) +                            if blames[-1][1] is not None: +                                blames[-1][1].append(line)                              info = {'id': sha}                          # END if we collected commit info                      # END distinguish filename,summary,rest @@ -900,7 +951,8 @@ class Repo(object):          return blames      @classmethod -    def init(cls, path=None, mkdir=True, odbt=GitCmdObjectDB, expand_vars=True, **kwargs): +    def init(cls, path: PathLike = None, mkdir: bool = True, odbt: Type[GitCmdObjectDB] = GitCmdObjectDB, +             expand_vars: bool = True, **kwargs: Any) -> 'Repo':          """Initialize a git repository at the given path if specified          :param path: @@ -938,9 +990,10 @@ class Repo(object):          return cls(path, odbt=odbt)      @classmethod -    def _clone(cls, git, url, path, odb_default_type, progress, multi_options=None, **kwargs): -        if progress is not None: -            progress = to_progress_instance(progress) +    def _clone(cls, git: 'Git', url: PathLike, path: PathLike, odb_default_type: Type[GitCmdObjectDB], +               progress: Optional[Callable], multi_options: Optional[List[str]] = None, **kwargs: Any +               ) -> 'Repo': +        progress_checked = to_progress_instance(progress)          odbt = kwargs.pop('odbt', odb_default_type) @@ -964,9 +1017,10 @@ class Repo(object):          if multi_options:              multi = ' '.join(multi_options).split(' ')          proc = git.clone(multi, Git.polish_url(url), clone_path, with_extended_output=True, as_process=True, -                         v=True, universal_newlines=True, **add_progress(kwargs, git, progress)) -        if progress: -            handle_process_output(proc, None, progress.new_message_handler(), finalize_process, decode_streams=False) +                         v=True, universal_newlines=True, **add_progress(kwargs, git, progress_checked)) +        if progress_checked: +            handle_process_output(proc, None, progress_checked.new_message_handler(), +                                  finalize_process, decode_streams=False)          else:              (stdout, stderr) = proc.communicate()              log.debug("Cmd(%s)'s unused stdout: %s", getattr(proc, 'args', ''), stdout) @@ -974,8 +1028,8 @@ class Repo(object):          # our git command could have a different working dir than our actual          # environment, hence we prepend its working dir if required -        if not osp.isabs(path) and git.working_dir: -            path = osp.join(git._working_dir, path) +        if not osp.isabs(path): +            path = osp.join(git._working_dir, path) if git._working_dir is not None else path          repo = cls(path, odbt=odbt) @@ -993,7 +1047,8 @@ class Repo(object):          # END handle remote repo          return repo -    def clone(self, path, progress=None, multi_options=None, **kwargs): +    def clone(self, path: PathLike, progress: Optional[Callable] = None, +              multi_options: Optional[List[str]] = None, **kwargs: Any) -> 'Repo':          """Create a clone from this repository.          :param path: is the full path of the new repo (traditionally ends with ./<name>.git). @@ -1011,7 +1066,9 @@ class Repo(object):          return self._clone(self.git, self.common_dir, path, type(self.odb), progress, multi_options, **kwargs)      @classmethod -    def clone_from(cls, url, to_path, progress=None, env=None, multi_options=None, **kwargs): +    def clone_from(cls, url: PathLike, to_path: PathLike, progress: Optional[Callable] = None, +                   env: Optional[Mapping[str, Any]] = None, +                   multi_options: Optional[List[str]] = None, **kwargs: Any) -> 'Repo':          """Create a clone from the given URL          :param url: valid git url, see http://www.kernel.org/pub/software/scm/git/docs/git-clone.html#URLS @@ -1031,7 +1088,8 @@ class Repo(object):              git.update_environment(**env)          return cls._clone(git, url, to_path, GitCmdObjectDB, progress, multi_options, **kwargs) -    def archive(self, ostream, treeish=None, prefix=None, **kwargs): +    def archive(self, ostream: Union[TextIO, BinaryIO], treeish: Optional[str] = None, +                prefix: Optional[str] = None, **kwargs: Any) -> 'Repo':          """Archive the tree at the given revision.          :param ostream: file compatible stream object to which the archive will be written as bytes @@ -1052,14 +1110,14 @@ class Repo(object):              kwargs['prefix'] = prefix          kwargs['output_stream'] = ostream          path = kwargs.pop('path', []) +        path = cast(Union[PathLike, List[PathLike], Tuple[PathLike, ...]], path)          if not isinstance(path, (tuple, list)):              path = [path]          # end assure paths is list -          self.git.archive(treeish, *path, **kwargs)          return self -    def has_separate_working_tree(self): +    def has_separate_working_tree(self) -> bool:          """          :return: True if our git_dir is not at the root of our working_tree_dir, but a .git file with a              platform agnositic symbolic link. Our git_dir will be wherever the .git file points to @@ -1067,21 +1125,24 @@ class Repo(object):          """          if self.bare:              return False -        return osp.isfile(osp.join(self.working_tree_dir, '.git')) +        if self.working_tree_dir: +            return osp.isfile(osp.join(self.working_tree_dir, '.git')) +        else: +            return False  # or raise Error?      rev_parse = rev_parse -    def __repr__(self): +    def __repr__(self) -> str:          clazz = self.__class__          return '<%s.%s %r>' % (clazz.__module__, clazz.__name__, self.git_dir) -    def currently_rebasing_on(self): +    def currently_rebasing_on(self) -> Union['SymbolicReference', Commit, 'TagObject', 'Blob', 'Tree', None]:          """          :return: The commit which is currently being replayed while rebasing.          None if we are not currently rebasing.          """ -        rebase_head_file = osp.join(self.git_dir, "REBASE_HEAD") +        rebase_head_file = osp.join(self.git_dir, "REBASE_HEAD") if self.git_dir else ""          if not osp.isfile(rebase_head_file):              return None          return self.commit(open(rebase_head_file, "rt").readline().strip()) diff --git a/git/repo/fun.py b/git/repo/fun.py index 714d4122..70394081 100644 --- a/git/repo/fun.py +++ b/git/repo/fun.py @@ -1,4 +1,5 @@  """Package with general repository related functions""" +from git.refs.tag import Tag  import os  import stat  from string import digits @@ -15,18 +16,28 @@ from gitdb.exc import (  import os.path as osp  from git.cmd import Git +# Typing ---------------------------------------------------------------------- + +from typing import AnyStr, Union, Optional, cast, TYPE_CHECKING +from git.types import PathLike +if TYPE_CHECKING: +    from .base import Repo +    from git.db import GitCmdObjectDB +    from git.objects import Commit, TagObject, Blob, Tree + +# ----------------------------------------------------------------------------  __all__ = ('rev_parse', 'is_git_dir', 'touch', 'find_submodule_git_dir', 'name_to_object', 'short_to_long', 'deref_tag',             'to_commit', 'find_worktree_git_dir') -def touch(filename): +def touch(filename: str) -> str:      with open(filename, "ab"):          pass      return filename -def is_git_dir(d): +def is_git_dir(d: PathLike) -> bool:      """ This is taken from the git setup.c:is_git_directory      function. @@ -48,7 +59,7 @@ def is_git_dir(d):      return False -def find_worktree_git_dir(dotgit): +def find_worktree_git_dir(dotgit: PathLike) -> Optional[str]:      """Search for a gitdir for this worktree."""      try:          statbuf = os.stat(dotgit) @@ -67,7 +78,7 @@ def find_worktree_git_dir(dotgit):      return None -def find_submodule_git_dir(d): +def find_submodule_git_dir(d: PathLike) -> Optional[PathLike]:      """Search for a submodule repo."""      if is_git_dir(d):          return d @@ -75,7 +86,7 @@ def find_submodule_git_dir(d):      try:          with open(d) as fp:              content = fp.read().rstrip() -    except (IOError, OSError): +    except IOError:          # it's probably not a file          pass      else: @@ -92,7 +103,7 @@ def find_submodule_git_dir(d):      return None -def short_to_long(odb, hexsha): +def short_to_long(odb: 'GitCmdObjectDB', hexsha: AnyStr) -> Optional[bytes]:      """:return: long hexadecimal sha1 from the given less-than-40 byte hexsha          or None if no candidate could be found.      :param hexsha: hexsha with less than 40 byte""" @@ -103,14 +114,15 @@ def short_to_long(odb, hexsha):      # END exception handling -def name_to_object(repo, name, return_ref=False): +def name_to_object(repo: 'Repo', name: str, return_ref: bool = False +                   ) -> Union[SymbolicReference, 'Commit', 'TagObject', 'Blob', 'Tree']:      """      :return: object specified by the given name, hexshas ( short and long )          as well as references are supported      :param return_ref: if name specifies a reference, we will return the reference          instead of the object. Otherwise it will raise BadObject or BadName      """ -    hexsha = None +    hexsha = None  # type: Union[None, str, bytes]      # is it a hexsha ? Try the most common ones, which is 7 to 40      if repo.re_hexsha_shortened.match(name): @@ -150,7 +162,7 @@ def name_to_object(repo, name, return_ref=False):      return Object.new_from_sha(repo, hex_to_bin(hexsha)) -def deref_tag(tag): +def deref_tag(tag: Tag) -> 'TagObject':      """Recursively dereference a tag and return the resulting object"""      while True:          try: @@ -161,7 +173,7 @@ def deref_tag(tag):      return tag -def to_commit(obj): +def to_commit(obj: Object) -> Union['Commit', 'TagObject']:      """Convert the given object to a commit if possible and return it"""      if obj.type == 'tag':          obj = deref_tag(obj) @@ -172,7 +184,7 @@ def to_commit(obj):      return obj -def rev_parse(repo, rev): +def rev_parse(repo: 'Repo', rev: str) -> Union['Commit', 'Tag', 'Tree', 'Blob']:      """      :return: Object at the given revision, either Commit, Tag, Tree or Blob      :param rev: git-rev-parse compatible revision specification as string, please see @@ -188,7 +200,7 @@ def rev_parse(repo, rev):          raise NotImplementedError("commit by message search ( regex )")      # END handle search -    obj = None +    obj = cast(Object, None)   # not ideal. Should use guards      ref = None      output_type = "commit"      start = 0 @@ -238,7 +250,7 @@ def rev_parse(repo, rev):                      pass    # error raised later                  # END exception handling              elif output_type in ('', 'blob'): -                if obj.type == 'tag': +                if obj and obj.type == 'tag':                      obj = deref_tag(obj)                  else:                      # cannot do anything for non-tags diff --git a/requirements.txt b/requirements.txt index c4e8340d..626a916a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@  gitdb>=4.0.1,<5 +typing-extensions>=3.7.4.0 diff --git a/test-requirements.txt b/test-requirements.txt index abda95cf..55206899 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -4,3 +4,4 @@ flake8  tox  virtualenv  nose +typing-extensions>=3.7.4.0 @@ -23,6 +23,7 @@ commands = {posargs}  # E266 = too many leading '#' for block comment  # E731 = do not assign a lambda expression, use a def  # W293 = Blank line contains whitespace -ignore = E265,W293,E266,E731 +# W504 = Line break after operator +ignore = E265,W293,E266,E731, W504  max-line-length = 120  exclude = .tox,.venv,build,dist,doc,git/ext/  | 
