diff options
author | Sebastian Thiel <sebastian.thiel@icloud.com> | 2022-05-18 08:17:32 +0800 |
---|---|---|
committer | Sebastian Thiel <sebastian.thiel@icloud.com> | 2022-05-18 08:17:32 +0800 |
commit | 38e9a18b976b2b7e3b3cd0fcd5b037573823d7c2 (patch) | |
tree | 8ac0bc1540650cf3befff11735e1421de372fa92 /git/repo/base.py | |
parent | b30720ee4d9762a03eae4fa7cfa4b0190d81784d (diff) | |
parent | 43c00af7e542911cce638cfab49c6a6dc9349e55 (diff) | |
download | gitpython-38e9a18b976b2b7e3b3cd0fcd5b037573823d7c2.tar.gz |
Merge branch 'black-fmt'
Diffstat (limited to 'git/repo/base.py')
-rw-r--r-- | git/repo/base.py | 525 |
1 files changed, 323 insertions, 202 deletions
diff --git a/git/repo/base.py b/git/repo/base.py index bea0dcb5..111a350e 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -13,10 +13,7 @@ from gitdb.db.loose import LooseObjectDB from gitdb.exc import BadObject -from git.cmd import ( - Git, - handle_process_output -) +from git.cmd import Git, handle_process_output from git.compat import ( defenc, safe_decode, @@ -29,20 +26,54 @@ from git.index import IndexFile from git.objects import Submodule, RootModule, Commit from git.refs import HEAD, Head, Reference, TagReference from git.remote import Remote, add_progress, to_progress_instance -from git.util import Actor, finalize_process, decygpath, hex_to_bin, expand_path, remove_password_if_present +from git.util import ( + Actor, + finalize_process, + decygpath, + hex_to_bin, + expand_path, + remove_password_if_present, +) import os.path as osp -from .fun import rev_parse, is_git_dir, find_submodule_git_dir, touch, find_worktree_git_dir +from .fun import ( + rev_parse, + is_git_dir, + find_submodule_git_dir, + touch, + find_worktree_git_dir, +) import gc import gitdb # typing ------------------------------------------------------ -from git.types import TBD, PathLike, Lit_config_levels, Commit_ish, Tree_ish, assert_never -from typing import (Any, BinaryIO, Callable, Dict, - Iterator, List, Mapping, Optional, Sequence, - TextIO, Tuple, Type, Union, - NamedTuple, cast, TYPE_CHECKING) +from git.types import ( + TBD, + PathLike, + Lit_config_levels, + Commit_ish, + Tree_ish, + assert_never, +) +from typing import ( + Any, + BinaryIO, + Callable, + Dict, + Iterator, + List, + Mapping, + Optional, + Sequence, + TextIO, + Tuple, + Type, + Union, + NamedTuple, + cast, + TYPE_CHECKING, +) from git.types import ConfigLevels_Tup, TypedDict @@ -57,11 +88,11 @@ if TYPE_CHECKING: log = logging.getLogger(__name__) -__all__ = ('Repo',) +__all__ = ("Repo",) class BlameEntry(NamedTuple): - commit: Dict[str, 'Commit'] + commit: Dict[str, "Commit"] linenos: range orig_path: Optional[str] orig_linenos: range @@ -81,21 +112,22 @@ class Repo(object): if we are a bare repository. 'git_dir' is the .git repository directory, which is always set.""" - DAEMON_EXPORT_FILE = 'git-daemon-export-ok' - git = cast('Git', None) # Must exist, or __del__ will fail in case we raise on `__init__()` + DAEMON_EXPORT_FILE = "git-daemon-export-ok" + + git = cast("Git", None) # Must exist, or __del__ will fail in case we raise on `__init__()` working_dir: Optional[PathLike] = None _working_tree_dir: Optional[PathLike] = None git_dir: PathLike = "" _common_dir: PathLike = "" # precompiled regex - re_whitespace = re.compile(r'\s+') - re_hexsha_only = re.compile('^[0-9A-Fa-f]{40}$') - re_hexsha_shortened = re.compile('^[0-9A-Fa-f]{4,40}$') - re_envvars = re.compile(r'(\$(\{\s?)?[a-zA-Z_]\w*(\}\s?)?|%\s?[a-zA-Z_]\w*\s?%)') - re_author_committer_start = re.compile(r'^(author|committer)') - re_tab_full_line = re.compile(r'^\t(.*)$') + re_whitespace = re.compile(r"\s+") + re_hexsha_only = re.compile("^[0-9A-Fa-f]{40}$") + re_hexsha_shortened = re.compile("^[0-9A-Fa-f]{4,40}$") + re_envvars = re.compile(r"(\$(\{\s?)?[a-zA-Z_]\w*(\}\s?)?|%\s?[a-zA-Z_]\w*\s?%)") + re_author_committer_start = re.compile(r"^(author|committer)") + re_tab_full_line = re.compile(r"^\t(.*)$") # invariants # represents the configuration level of a configuration file @@ -105,8 +137,13 @@ class Repo(object): # Subclasses may easily bring in their own custom types by placing a constructor or type here GitCommandWrapperType = Git - def __init__(self, path: Optional[PathLike] = None, odbt: Type[LooseObjectDB] = GitCmdObjectDB, - search_parent_directories: bool = False, expand_vars: bool = True) -> None: + def __init__( + self, + path: Optional[PathLike] = None, + odbt: Type[LooseObjectDB] = GitCmdObjectDB, + search_parent_directories: bool = False, + expand_vars: bool = True, + ) -> None: """Create a new Repo instance :param path: @@ -132,9 +169,9 @@ class Repo(object): which is considered a bug though. :raise InvalidGitRepositoryError: :raise NoSuchPathError: - :return: git.Repo """ + :return: git.Repo""" - epath = path or os.getenv('GIT_DIR') + epath = path or os.getenv("GIT_DIR") if not epath: epath = os.getcwd() if Git.is_cygwin(): @@ -144,8 +181,10 @@ class Repo(object): if not isinstance(epath, str): epath = str(epath) if expand_vars and re.search(self.re_envvars, epath): - warnings.warn("The use of environment variables in paths is deprecated" + - "\nfor security reasons and may be removed in the future!!") + warnings.warn( + "The use of environment variables in paths is deprecated" + + "\nfor security reasons and may be removed in the future!!" + ) epath = expand_path(epath, expand_vars) if epath is not None: if not os.path.exists(epath): @@ -170,15 +209,15 @@ class Repo(object): # If GIT_DIR is specified but none of GIT_WORK_TREE and core.worktree is specified, # the current working directory is regarded as the top level of your working tree. self._working_tree_dir = os.path.dirname(self.git_dir) - if os.environ.get('GIT_COMMON_DIR') is None: + if os.environ.get("GIT_COMMON_DIR") is None: gitconf = self.config_reader("repository") - if gitconf.has_option('core', 'worktree'): - self._working_tree_dir = gitconf.get('core', 'worktree') - if 'GIT_WORK_TREE' in os.environ: - self._working_tree_dir = os.getenv('GIT_WORK_TREE') + if gitconf.has_option("core", "worktree"): + self._working_tree_dir = gitconf.get("core", "worktree") + if "GIT_WORK_TREE" in os.environ: + self._working_tree_dir = os.getenv("GIT_WORK_TREE") break - dotgit = osp.join(curpath, '.git') + dotgit = osp.join(curpath, ".git") sm_gitpath = find_submodule_git_dir(dotgit) if sm_gitpath is not None: self.git_dir = osp.normpath(sm_gitpath) @@ -204,13 +243,13 @@ class Repo(object): self._bare = False try: - self._bare = self.config_reader("repository").getboolean('core', 'bare') + self._bare = self.config_reader("repository").getboolean("core", "bare") except Exception: # lets not assume the option exists, although it should pass try: - common_dir = open(osp.join(self.git_dir, 'commondir'), 'rt').readlines()[0].strip() + common_dir = open(osp.join(self.git_dir, "commondir"), "rt").readlines()[0].strip() self._common_dir = osp.join(self.git_dir, common_dir) except OSError: self._common_dir = "" @@ -225,13 +264,13 @@ class Repo(object): self.git = self.GitCommandWrapperType(self.working_dir) # special handling, in special times - rootpath = osp.join(self.common_dir, 'objects') + rootpath = osp.join(self.common_dir, "objects") if issubclass(odbt, GitCmdObjectDB): self.odb = odbt(rootpath, self.git) else: self.odb = odbt(rootpath) - def __enter__(self) -> 'Repo': + def __enter__(self) -> "Repo": return self def __exit__(self, *args: Any) -> None: @@ -272,25 +311,23 @@ class Repo(object): # Description property def _get_description(self) -> str: if self.git_dir: - filename = osp.join(self.git_dir, 'description') - with open(filename, 'rb') as fp: + filename = osp.join(self.git_dir, "description") + with open(filename, "rb") as fp: return fp.read().rstrip().decode(defenc) def _set_description(self, descr: str) -> None: if self.git_dir: - filename = osp.join(self.git_dir, 'description') - with open(filename, 'wb') as fp: - fp.write((descr + '\n').encode(defenc)) + filename = osp.join(self.git_dir, "description") + with open(filename, "wb") as fp: + fp.write((descr + "\n").encode(defenc)) - description = property(_get_description, _set_description, - doc="the project's description") + description = property(_get_description, _set_description, doc="the project's description") del _get_description del _set_description @property def working_tree_dir(self) -> Optional[PathLike]: - """:return: The working tree directory of our git repository. If this is a bare repository, None is returned. - """ + """:return: The working tree directory of our git repository. If this is a bare repository, None is returned.""" return self._working_tree_dir @property @@ -312,7 +349,7 @@ class Repo(object): return self._bare @property - def heads(self) -> 'IterableList[Head]': + def heads(self) -> "IterableList[Head]": """A list of ``Head`` objects representing the branch heads in this repo @@ -320,7 +357,7 @@ class Repo(object): return Head.list_items(self) @property - def references(self) -> 'IterableList[Reference]': + def references(self) -> "IterableList[Reference]": """A list of Reference objects representing tags, heads and remote references. :return: IterableList(Reference, ...)""" @@ -333,24 +370,24 @@ class Repo(object): branches = heads @property - def index(self) -> 'IndexFile': + def index(self) -> "IndexFile": """:return: IndexFile representing this repository's index. :note: This property can be expensive, as the returned ``IndexFile`` will be reinitialized. It's recommended to re-use the object.""" return IndexFile(self) @property - def head(self) -> 'HEAD': + def head(self) -> "HEAD": """:return: HEAD Object pointing to the current head reference""" - return HEAD(self, 'HEAD') + return HEAD(self, "HEAD") @property - def remotes(self) -> 'IterableList[Remote]': + def remotes(self) -> "IterableList[Remote]": """A list of Remote objects allowing to access and manipulate remotes :return: ``git.IterableList(Remote, ...)``""" return Remote.list_items(self) - def remote(self, name: str = 'origin') -> 'Remote': + def remote(self, name: str = "origin") -> "Remote": """:return: Remote with the specified name :raise ValueError: if no remote with such a name exists""" r = Remote(self, name) @@ -358,17 +395,17 @@ class Repo(object): raise ValueError("Remote named '%s' didn't exist" % name) return r - #{ Submodules + # { Submodules @property - def submodules(self) -> 'IterableList[Submodule]': + def submodules(self) -> "IterableList[Submodule]": """ :return: git.IterableList(Submodule, ...) of direct submodules available from the current head""" return Submodule.list_items(self) - def submodule(self, name: str) -> 'Submodule': - """ :return: Submodule with the given name + def submodule(self, name: str) -> "Submodule": + """:return: Submodule with the given name :raise ValueError: If no such submodule exists""" try: return self.submodules[name] @@ -396,53 +433,61 @@ class Repo(object): see the documentation of RootModule.update""" return RootModule(self).update(*args, **kwargs) - #}END submodules + # }END submodules @property - def tags(self) -> 'IterableList[TagReference]': + def tags(self) -> "IterableList[TagReference]": """A list of ``Tag`` objects that are available in this repo - :return: ``git.IterableList(TagReference, ...)`` """ + :return: ``git.IterableList(TagReference, ...)``""" return TagReference.list_items(self) def tag(self, path: PathLike) -> TagReference: """:return: TagReference Object, reference pointing to a Commit or Tag - :param path: path to the tag reference, i.e. 0.1.5 or tags/0.1.5 """ + :param path: path to the tag reference, i.e. 0.1.5 or tags/0.1.5""" full_path = self._to_full_tag_path(path) return TagReference(self, full_path) @staticmethod def _to_full_tag_path(path: PathLike) -> str: path_str = str(path) - if path_str.startswith(TagReference._common_path_default + '/'): + if path_str.startswith(TagReference._common_path_default + "/"): return path_str - if path_str.startswith(TagReference._common_default + '/'): - return Reference._common_path_default + '/' + path_str + if path_str.startswith(TagReference._common_default + "/"): + return Reference._common_path_default + "/" + path_str else: - return TagReference._common_path_default + '/' + path_str - - def create_head(self, path: PathLike, - commit: Union['SymbolicReference', 'str'] = 'HEAD', - force: bool = False, logmsg: Optional[str] = None - ) -> 'Head': + return TagReference._common_path_default + "/" + path_str + + def create_head( + self, + path: PathLike, + commit: Union["SymbolicReference", "str"] = "HEAD", + force: bool = False, + logmsg: Optional[str] = None, + ) -> "Head": """Create a new head within the repository. For more documentation, please see the Head.create method. :return: newly created Head Reference""" return Head.create(self, path, commit, logmsg, force) - def delete_head(self, *heads: 'Union[str, Head]', **kwargs: Any) -> None: + def delete_head(self, *heads: "Union[str, Head]", **kwargs: Any) -> None: """Delete the given heads :param kwargs: Additional keyword arguments to be passed to git-branch""" return Head.delete(self, *heads, **kwargs) - def create_tag(self, path: PathLike, ref: str = 'HEAD', - message: Optional[str] = None, force: bool = False, **kwargs: Any - ) -> TagReference: + def create_tag( + self, + path: PathLike, + ref: str = "HEAD", + message: Optional[str] = None, + force: bool = False, + **kwargs: Any, + ) -> TagReference: """Create a new tag reference. For more documentation, please see the TagReference.create method. - :return: TagReference object """ + :return: TagReference object""" return TagReference.create(self, path, ref, message, force, **kwargs) def delete_tag(self, *tags: TagReference) -> None: @@ -458,7 +503,7 @@ class Repo(object): :return: Remote reference""" return Remote.create(self, name, url, **kwargs) - def delete_remote(self, remote: 'Remote') -> str: + def delete_remote(self, remote: "Remote") -> str: """Delete the given remote.""" return Remote.remove(self, remote) @@ -471,7 +516,7 @@ class Repo(object): if config_level == "system": return "/etc/gitconfig" elif config_level == "user": - config_home = os.environ.get("XDG_CONFIG_HOME") or osp.join(os.environ.get("HOME", '~'), ".config") + config_home = os.environ.get("XDG_CONFIG_HOME") or osp.join(os.environ.get("HOME", "~"), ".config") return osp.normpath(osp.expanduser(osp.join(config_home, "git", "config"))) elif config_level == "global": return osp.normpath(osp.expanduser("~/.gitconfig")) @@ -483,11 +528,15 @@ class Repo(object): return osp.normpath(osp.join(repo_dir, "config")) else: - assert_never(config_level, # type:ignore[unreachable] - ValueError(f"Invalid configuration level: {config_level!r}")) + assert_never( + config_level, # type:ignore[unreachable] + ValueError(f"Invalid configuration level: {config_level!r}"), + ) - def config_reader(self, config_level: Optional[Lit_config_levels] = None, - ) -> GitConfigParser: + def config_reader( + self, + config_level: Optional[Lit_config_levels] = None, + ) -> GitConfigParser: """ :return: GitConfigParser allowing to read the full git configuration, but not to write it @@ -503,14 +552,16 @@ class Repo(object): unknown, instead the global path will be used.""" files = None if config_level is None: - files = [self._get_config_path(cast(Lit_config_levels, f)) - for f in self.config_level if cast(Lit_config_levels, f)] + files = [ + self._get_config_path(cast(Lit_config_levels, f)) + for f in self.config_level + if cast(Lit_config_levels, f) + ] else: files = [self._get_config_path(config_level)] return GitConfigParser(files, read_only=True, repo=self) - def config_writer(self, config_level: Lit_config_levels = "repository" - ) -> GitConfigParser: + def config_writer(self, config_level: Lit_config_levels = "repository") -> GitConfigParser: """ :return: GitConfigParser allowing to write values of the specified configuration file level. @@ -525,8 +576,7 @@ class Repo(object): repository = configuration file for this repository only""" return GitConfigParser(self._get_config_path(config_level), read_only=False, repo=self) - def commit(self, rev: Union[str, Commit_ish, None] = None - ) -> Commit: + def commit(self, rev: Union[str, Commit_ish, None] = None) -> Commit: """The Commit object for the specified revision :param rev: revision specifier, see git-rev-parse for viable options. @@ -536,12 +586,12 @@ class Repo(object): return self.head.commit return self.rev_parse(str(rev) + "^0") - def iter_trees(self, *args: Any, **kwargs: Any) -> Iterator['Tree']: + def iter_trees(self, *args: Any, **kwargs: Any) -> Iterator["Tree"]: """:return: Iterator yielding Tree objects :note: Takes all arguments known to iter_commits method""" return (c.tree for c in self.iter_commits(*args, **kwargs)) - def tree(self, rev: Union[Tree_ish, str, None] = None) -> 'Tree': + def tree(self, rev: Union[Tree_ish, str, None] = None) -> "Tree": """The Tree object for the given treeish revision Examples:: @@ -558,9 +608,12 @@ class Repo(object): return self.head.commit.tree return self.rev_parse(str(rev) + "^{tree}") - def iter_commits(self, rev: Union[str, Commit, 'SymbolicReference', None] = None, - paths: Union[PathLike, Sequence[PathLike]] = '', - **kwargs: Any) -> Iterator[Commit]: + def iter_commits( + self, + rev: Union[str, Commit, "SymbolicReference", None] = None, + paths: Union[PathLike, Sequence[PathLike]] = "", + **kwargs: Any, + ) -> Iterator[Commit]: """A list of Commit objects representing the history of a given ref/commit :param rev: @@ -584,8 +637,7 @@ class Repo(object): return Commit.iter_items(self, rev, paths, **kwargs) - def merge_base(self, *rev: TBD, **kwargs: Any - ) -> List[Union[Commit_ish, None]]: + def merge_base(self, *rev: TBD, **kwargs: Any) -> List[Union[Commit_ish, None]]: """Find the closest common ancestor for the given revision (e.g. Commits, Tags, References, etc) :param rev: At least two revs to find the common ancestor for. @@ -616,7 +668,7 @@ class Repo(object): return res - def is_ancestor(self, ancestor_rev: 'Commit', rev: 'Commit') -> bool: + def is_ancestor(self, ancestor_rev: "Commit", rev: "Commit") -> bool: """Check if a commit is an ancestor of another :param ancestor_rev: Rev which should be an ancestor @@ -639,8 +691,11 @@ class Repo(object): if object_info.type == object_type.encode(): return True else: - log.debug("Commit hash points to an object of type '%s'. Requested were objects of type '%s'", - object_info.type.decode(), object_type) + log.debug( + "Commit hash points to an object of type '%s'. Requested were objects of type '%s'", + object_info.type.decode(), + object_type, + ) return False else: return True @@ -662,8 +717,11 @@ class Repo(object): elif not value and fileexists: os.unlink(filename) - daemon_export = property(_get_daemon_export, _set_daemon_export, - doc="If True, git-daemon may export this repository") + daemon_export = property( + _get_daemon_export, + _set_daemon_export, + doc="If True, git-daemon may export this repository", + ) del _get_daemon_export del _set_daemon_export @@ -672,10 +730,10 @@ class Repo(object): :return: list of strings being pathnames of alternates""" if self.git_dir: - alternates_path = osp.join(self.git_dir, 'objects', 'info', 'alternates') + alternates_path = osp.join(self.git_dir, "objects", "info", "alternates") if osp.exists(alternates_path): - with open(alternates_path, 'rb') as f: + with open(alternates_path, "rb") as f: alts = f.read().decode(defenc) return alts.strip().splitlines() return [] @@ -691,19 +749,28 @@ class Repo(object): :note: The method does not check for the existence of the paths in alts as the caller is responsible.""" - alternates_path = osp.join(self.common_dir, 'objects', 'info', 'alternates') + alternates_path = osp.join(self.common_dir, "objects", "info", "alternates") if not alts: if osp.isfile(alternates_path): os.remove(alternates_path) else: - with open(alternates_path, 'wb') as f: + with open(alternates_path, "wb") as f: f.write("\n".join(alts).encode(defenc)) - alternates = property(_get_alternates, _set_alternates, - doc="Retrieve a list of alternates paths or set a list paths to be used as alternates") - - def is_dirty(self, index: bool = True, working_tree: bool = True, untracked_files: bool = False, - submodules: bool = True, path: Optional[PathLike] = None) -> bool: + alternates = property( + _get_alternates, + _set_alternates, + doc="Retrieve a list of alternates paths or set a list paths to be used as alternates", + ) + + def is_dirty( + self, + index: bool = True, + working_tree: bool = True, + untracked_files: bool = False, + submodules: bool = True, + path: Optional[PathLike] = None, + ) -> bool: """ :return: ``True``, the repository is considered dirty. By default it will react @@ -715,15 +782,14 @@ class Repo(object): return False # start from the one which is fastest to evaluate - default_args = ['--abbrev=40', '--full-index', '--raw'] + default_args = ["--abbrev=40", "--full-index", "--raw"] if not submodules: - default_args.append('--ignore-submodules') + default_args.append("--ignore-submodules") if path: default_args.extend(["--", str(path)]) if index: # diff index against HEAD - if osp.isfile(self.index.path) and \ - len(self.git.diff('--cached', *default_args)): + if osp.isfile(self.index.path) and len(self.git.diff("--cached", *default_args)): return True # END index handling if working_tree: @@ -755,11 +821,7 @@ class Repo(object): def _get_untracked_files(self, *args: Any, **kwargs: Any) -> List[str]: # make sure we get all files, not only untracked directories - proc = self.git.status(*args, - porcelain=True, - untracked_files=True, - as_process=True, - **kwargs) + proc = self.git.status(*args, porcelain=True, untracked_files=True, as_process=True, **kwargs) # Untracked files prefix in porcelain mode prefix = "?? " untracked_files = [] @@ -767,12 +829,12 @@ class Repo(object): line = line.decode(defenc) if not line.startswith(prefix): continue - filename = line[len(prefix):].rstrip('\n') + filename = line[len(prefix) :].rstrip("\n") # Special characters are escaped if filename[0] == filename[-1] == '"': filename = filename[1:-1] # WHATEVER ... it's a mess, but works for me - filename = filename.encode('ascii').decode('unicode_escape').encode('latin1').decode(defenc) + filename = filename.encode("ascii").decode("unicode_escape").encode("latin1").decode(defenc) untracked_files.append(filename) finalize_process(proc) return untracked_files @@ -797,7 +859,7 @@ class Repo(object): # reveal_type(self.head.reference) # => Reference return self.head.reference - def blame_incremental(self, rev: str | HEAD, file: str, **kwargs: Any) -> Iterator['BlameEntry']: + def blame_incremental(self, rev: str | HEAD, file: str, **kwargs: Any) -> Iterator["BlameEntry"]: """Iterator for blame information for the given file at the given revision. Unlike .blame(), this does not return the actual file's contents, only @@ -812,10 +874,10 @@ class Repo(object): should get a continuous range spanning all line numbers in the file. """ - data: bytes = self.git.blame(rev, '--', file, p=True, incremental=True, stdout_as_string=False, **kwargs) + data: bytes = self.git.blame(rev, "--", file, p=True, incremental=True, stdout_as_string=False, **kwargs) commits: Dict[bytes, Commit] = {} - stream = (line for line in data.split(b'\n') if line) + stream = (line for line in data.split(b"\n") if line) while True: try: line = next(stream) # when exhausted, causes a StopIteration, terminating this function @@ -835,25 +897,32 @@ class Repo(object): line = next(stream) except StopIteration: return - if line == b'boundary': + if line == b"boundary": # "boundary" indicates a root commit and occurs # instead of the "previous" tag continue - tag, value = line.split(b' ', 1) + tag, value = line.split(b" ", 1) props[tag] = value - if tag == b'filename': + if tag == b"filename": # "filename" formally terminates the entry for --incremental orig_filename = value break - c = Commit(self, hex_to_bin(hexsha), - author=Actor(safe_decode(props[b'author']), - safe_decode(props[b'author-mail'].lstrip(b'<').rstrip(b'>'))), - authored_date=int(props[b'author-time']), - committer=Actor(safe_decode(props[b'committer']), - safe_decode(props[b'committer-mail'].lstrip(b'<').rstrip(b'>'))), - committed_date=int(props[b'committer-time'])) + c = Commit( + self, + hex_to_bin(hexsha), + author=Actor( + safe_decode(props[b"author"]), + safe_decode(props[b"author-mail"].lstrip(b"<").rstrip(b">")), + ), + authored_date=int(props[b"author-time"]), + committer=Actor( + safe_decode(props[b"committer"]), + safe_decode(props[b"committer-mail"].lstrip(b"<").rstrip(b">")), + ), + committed_date=int(props[b"committer-time"]), + ) commits[hexsha] = c else: # Discard all lines until we find "filename" which is @@ -863,18 +932,21 @@ class Repo(object): line = next(stream) # will fail if we reach the EOF unexpectedly except StopIteration: return - tag, value = line.split(b' ', 1) - if tag == b'filename': + tag, value = line.split(b" ", 1) + if tag == b"filename": orig_filename = value break - yield BlameEntry(commits[hexsha], - range(lineno, lineno + num_lines), - safe_decode(orig_filename), - range(orig_lineno, orig_lineno + num_lines)) + yield BlameEntry( + commits[hexsha], + range(lineno, lineno + num_lines), + safe_decode(orig_filename), + range(orig_lineno, orig_lineno + num_lines), + ) - def blame(self, rev: Union[str, HEAD], file: str, incremental: bool = False, **kwargs: Any - ) -> List[List[Commit | List[str | bytes] | None]] | Iterator[BlameEntry] | None: + def blame( + self, rev: Union[str, HEAD], file: str, incremental: bool = False, **kwargs: Any + ) -> List[List[Commit | List[str | bytes] | None]] | Iterator[BlameEntry] | None: """The blame information for the given file at the given revision. :param rev: revision specifier, see git-rev-parse for viable options. @@ -886,7 +958,7 @@ class Repo(object): if incremental: return self.blame_incremental(rev, file, **kwargs) - data: bytes = self.git.blame(rev, '--', file, p=True, stdout_as_string=False, **kwargs) + data: bytes = self.git.blame(rev, "--", file, p=True, stdout_as_string=False, **kwargs) commits: Dict[str, Commit] = {} blames: List[List[Commit | List[str | bytes] | None]] = [] @@ -909,7 +981,7 @@ class Repo(object): try: line_str = line_bytes.rstrip().decode(defenc) except UnicodeDecodeError: - firstpart = '' + firstpart = "" parts = [] is_binary = True else: @@ -929,10 +1001,10 @@ class Repo(object): # another line of blame with the same data digits = parts[-1].split(" ") if len(digits) == 3: - info = {'id': firstpart} + info = {"id": firstpart} blames.append([None, []]) - elif info['id'] != firstpart: - info = {'id': firstpart} + elif info["id"] != firstpart: + info = {"id": firstpart} blames.append([commits.get(firstpart), []]) # END blame data initialization else: @@ -948,17 +1020,17 @@ class Repo(object): # committer-time 1192271832 # committer-tz -0700 - IGNORED BY US role = m.group(0) - if role == 'author': - if firstpart.endswith('-mail'): + if role == "author": + if firstpart.endswith("-mail"): info["author_email"] = parts[-1] - elif firstpart.endswith('-time'): + elif firstpart.endswith("-time"): info["author_date"] = int(parts[-1]) elif role == firstpart: info["author"] = parts[-1] - elif role == 'committer': - if firstpart.endswith('-mail'): + elif role == "committer": + if firstpart.endswith("-mail"): info["committer_email"] = parts[-1] - elif firstpart.endswith('-time'): + elif firstpart.endswith("-time"): info["committer_date"] = int(parts[-1]) elif role == firstpart: info["committer"] = parts[-1] @@ -968,21 +1040,23 @@ class Repo(object): # filename lib/grit.rb # summary add Blob # <and rest> - if firstpart.startswith('filename'): - info['filename'] = parts[-1] - elif firstpart.startswith('summary'): - info['summary'] = parts[-1] - elif firstpart == '': + if firstpart.startswith("filename"): + info["filename"] = parts[-1] + elif firstpart.startswith("summary"): + info["summary"] = parts[-1] + elif firstpart == "": if info: - sha = info['id'] + sha = info["id"] c = commits.get(sha) if c is None: - c = Commit(self, hex_to_bin(sha), - author=Actor._from_string(f"{info['author']} {info['author_email']}"), - authored_date=info['author_date'], - committer=Actor._from_string( - f"{info['committer']} {info['committer_email']}"), - committed_date=info['committer_date']) + c = Commit( + self, + hex_to_bin(sha), + author=Actor._from_string(f"{info['author']} {info['author_email']}"), + authored_date=info["author_date"], + committer=Actor._from_string(f"{info['committer']} {info['committer_email']}"), + committed_date=info["committer_date"], + ) commits[sha] = c blames[-1][0] = c # END if commit objects needs initial creation @@ -990,7 +1064,7 @@ class Repo(object): if blames[-1][1] is not None: line: str | bytes if not is_binary: - if line_str and line_str[0] == '\t': + if line_str and line_str[0] == "\t": line_str = line_str[1:] line = line_str else: @@ -1001,16 +1075,22 @@ class Repo(object): # the last line we have seen. blames[-1][1].append(line) - info = {'id': sha} + info = {"id": sha} # END if we collected commit info # END distinguish filename,summary,rest # END distinguish author|committer vs filename,summary,rest # END distinguish hexsha vs other information return blames - @ classmethod - def init(cls, path: Union[PathLike, None] = None, mkdir: bool = True, odbt: Type[GitCmdObjectDB] = GitCmdObjectDB, - expand_vars: bool = True, **kwargs: Any) -> 'Repo': + @classmethod + def init( + cls, + path: Union[PathLike, None] = None, + mkdir: bool = True, + odbt: Type[GitCmdObjectDB] = GitCmdObjectDB, + expand_vars: bool = True, + **kwargs: Any, + ) -> "Repo": """Initialize a git repository at the given path if specified :param path: @@ -1047,12 +1127,18 @@ class Repo(object): git.init(**kwargs) return cls(path, odbt=odbt) - @ classmethod - def _clone(cls, git: 'Git', url: PathLike, path: PathLike, odb_default_type: Type[GitCmdObjectDB], - progress: Union['RemoteProgress', 'UpdateProgress', Callable[..., 'RemoteProgress'], None] = None, - multi_options: Optional[List[str]] = None, **kwargs: Any - ) -> 'Repo': - odbt = kwargs.pop('odbt', odb_default_type) + @classmethod + def _clone( + cls, + git: "Git", + url: PathLike, + path: PathLike, + odb_default_type: Type[GitCmdObjectDB], + progress: Union["RemoteProgress", "UpdateProgress", Callable[..., "RemoteProgress"], None] = None, + multi_options: Optional[List[str]] = None, + **kwargs: Any, + ) -> "Repo": + odbt = kwargs.pop("odbt", odb_default_type) # when pathlib.Path or other classbased path is passed if not isinstance(path, str): @@ -1064,23 +1150,34 @@ class Repo(object): # becomes:: # git clone --bare /cygwin/d/foo.git /cygwin/d/C:\\Work # - clone_path = (Git.polish_url(path) - if Git.is_cygwin() and 'bare' in kwargs - else path) - sep_dir = kwargs.get('separate_git_dir') + clone_path = Git.polish_url(path) if Git.is_cygwin() and "bare" in kwargs else path + sep_dir = kwargs.get("separate_git_dir") if sep_dir: - kwargs['separate_git_dir'] = Git.polish_url(sep_dir) + kwargs["separate_git_dir"] = Git.polish_url(sep_dir) multi = None if multi_options: - multi = shlex.split(' '.join(multi_options)) - proc = git.clone(multi, Git.polish_url(str(url)), clone_path, with_extended_output=True, as_process=True, - v=True, universal_newlines=True, **add_progress(kwargs, git, progress)) + multi = shlex.split(" ".join(multi_options)) + proc = git.clone( + multi, + Git.polish_url(str(url)), + clone_path, + with_extended_output=True, + as_process=True, + v=True, + universal_newlines=True, + **add_progress(kwargs, git, progress), + ) if progress: - handle_process_output(proc, None, to_progress_instance(progress).new_message_handler(), - finalize_process, decode_streams=False) + handle_process_output( + proc, + None, + to_progress_instance(progress).new_message_handler(), + finalize_process, + decode_streams=False, + ) else: (stdout, stderr) = proc.communicate() - cmdline = getattr(proc, 'args', '') + cmdline = getattr(proc, "args", "") cmdline = remove_password_if_present(cmdline) log.debug("Cmd(%s)'s unused stdout: %s", cmdline, stdout) @@ -1103,12 +1200,17 @@ class Repo(object): # sure if repo.remotes: with repo.remotes[0].config_writer as writer: - writer.set_value('url', Git.polish_url(repo.remotes[0].url)) + writer.set_value("url", Git.polish_url(repo.remotes[0].url)) # END handle remote repo return repo - def clone(self, path: PathLike, progress: Optional[Callable] = None, - multi_options: Optional[List[str]] = None, **kwargs: Any) -> 'Repo': + def clone( + self, + path: PathLike, + progress: Optional[Callable] = None, + multi_options: Optional[List[str]] = None, + **kwargs: Any, + ) -> "Repo": """Create a clone from this repository. :param path: is the full path of the new repo (traditionally ends with ./<name>.git). @@ -1123,12 +1225,26 @@ class Repo(object): * All remaining keyword arguments are given to the git-clone command :return: ``git.Repo`` (the newly cloned repo)""" - return self._clone(self.git, self.common_dir, path, type(self.odb), progress, multi_options, **kwargs) - - @ classmethod - def clone_from(cls, url: PathLike, to_path: PathLike, progress: Optional[Callable] = None, - env: Optional[Mapping[str, str]] = None, - multi_options: Optional[List[str]] = None, **kwargs: Any) -> 'Repo': + return self._clone( + self.git, + self.common_dir, + path, + type(self.odb), + progress, + multi_options, + **kwargs, + ) + + @classmethod + def clone_from( + cls, + url: PathLike, + to_path: PathLike, + progress: Optional[Callable] = None, + env: Optional[Mapping[str, str]] = None, + multi_options: Optional[List[str]] = None, + **kwargs: Any, + ) -> "Repo": """Create a clone from the given URL :param url: valid git url, see http://www.kernel.org/pub/software/scm/git/docs/git-clone.html#URLS @@ -1148,8 +1264,13 @@ class Repo(object): git.update_environment(**env) return cls._clone(git, url, to_path, GitCmdObjectDB, progress, multi_options, **kwargs) - def archive(self, ostream: Union[TextIO, BinaryIO], treeish: Optional[str] = None, - prefix: Optional[str] = None, **kwargs: Any) -> Repo: + def archive( + self, + ostream: Union[TextIO, BinaryIO], + treeish: Optional[str] = None, + prefix: Optional[str] = None, + **kwargs: Any, + ) -> Repo: """Archive the tree at the given revision. :param ostream: file compatible stream object to which the archive will be written as bytes @@ -1166,10 +1287,10 @@ class Repo(object): :return: self""" if treeish is None: treeish = self.head.commit - if prefix and 'prefix' not in kwargs: - kwargs['prefix'] = prefix - kwargs['output_stream'] = ostream - path = kwargs.pop('path', []) + if prefix and "prefix" not in kwargs: + kwargs["prefix"] = prefix + kwargs["output_stream"] = ostream + path = kwargs.pop("path", []) path = cast(Union[PathLike, List[PathLike], Tuple[PathLike, ...]], path) if not isinstance(path, (tuple, list)): path = [path] @@ -1186,7 +1307,7 @@ class Repo(object): if self.bare: return False if self.working_tree_dir: - return osp.isfile(osp.join(self.working_tree_dir, '.git')) + return osp.isfile(osp.join(self.working_tree_dir, ".git")) else: return False # or raise Error? @@ -1194,7 +1315,7 @@ class Repo(object): def __repr__(self) -> str: clazz = self.__class__ - return '<%s.%s %r>' % (clazz.__module__, clazz.__name__, self.git_dir) + return "<%s.%s %r>" % (clazz.__module__, clazz.__name__, self.git_dir) def currently_rebasing_on(self) -> Commit | None: """ |