summaryrefslogtreecommitdiff
path: root/git/repo/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'git/repo/base.py')
-rw-r--r--git/repo/base.py525
1 files changed, 323 insertions, 202 deletions
diff --git a/git/repo/base.py b/git/repo/base.py
index bea0dcb5..111a350e 100644
--- a/git/repo/base.py
+++ b/git/repo/base.py
@@ -13,10 +13,7 @@ from gitdb.db.loose import LooseObjectDB
from gitdb.exc import BadObject
-from git.cmd import (
- Git,
- handle_process_output
-)
+from git.cmd import Git, handle_process_output
from git.compat import (
defenc,
safe_decode,
@@ -29,20 +26,54 @@ from git.index import IndexFile
from git.objects import Submodule, RootModule, Commit
from git.refs import HEAD, Head, Reference, TagReference
from git.remote import Remote, add_progress, to_progress_instance
-from git.util import Actor, finalize_process, decygpath, hex_to_bin, expand_path, remove_password_if_present
+from git.util import (
+ Actor,
+ finalize_process,
+ decygpath,
+ hex_to_bin,
+ expand_path,
+ remove_password_if_present,
+)
import os.path as osp
-from .fun import rev_parse, is_git_dir, find_submodule_git_dir, touch, find_worktree_git_dir
+from .fun import (
+ rev_parse,
+ is_git_dir,
+ find_submodule_git_dir,
+ touch,
+ find_worktree_git_dir,
+)
import gc
import gitdb
# typing ------------------------------------------------------
-from git.types import TBD, PathLike, Lit_config_levels, Commit_ish, Tree_ish, assert_never
-from typing import (Any, BinaryIO, Callable, Dict,
- Iterator, List, Mapping, Optional, Sequence,
- TextIO, Tuple, Type, Union,
- NamedTuple, cast, TYPE_CHECKING)
+from git.types import (
+ TBD,
+ PathLike,
+ Lit_config_levels,
+ Commit_ish,
+ Tree_ish,
+ assert_never,
+)
+from typing import (
+ Any,
+ BinaryIO,
+ Callable,
+ Dict,
+ Iterator,
+ List,
+ Mapping,
+ Optional,
+ Sequence,
+ TextIO,
+ Tuple,
+ Type,
+ Union,
+ NamedTuple,
+ cast,
+ TYPE_CHECKING,
+)
from git.types import ConfigLevels_Tup, TypedDict
@@ -57,11 +88,11 @@ if TYPE_CHECKING:
log = logging.getLogger(__name__)
-__all__ = ('Repo',)
+__all__ = ("Repo",)
class BlameEntry(NamedTuple):
- commit: Dict[str, 'Commit']
+ commit: Dict[str, "Commit"]
linenos: range
orig_path: Optional[str]
orig_linenos: range
@@ -81,21 +112,22 @@ class Repo(object):
if we are a bare repository.
'git_dir' is the .git repository directory, which is always set."""
- DAEMON_EXPORT_FILE = 'git-daemon-export-ok'
- git = cast('Git', None) # Must exist, or __del__ will fail in case we raise on `__init__()`
+ DAEMON_EXPORT_FILE = "git-daemon-export-ok"
+
+ git = cast("Git", None) # Must exist, or __del__ will fail in case we raise on `__init__()`
working_dir: Optional[PathLike] = None
_working_tree_dir: Optional[PathLike] = None
git_dir: PathLike = ""
_common_dir: PathLike = ""
# precompiled regex
- re_whitespace = re.compile(r'\s+')
- re_hexsha_only = re.compile('^[0-9A-Fa-f]{40}$')
- re_hexsha_shortened = re.compile('^[0-9A-Fa-f]{4,40}$')
- re_envvars = re.compile(r'(\$(\{\s?)?[a-zA-Z_]\w*(\}\s?)?|%\s?[a-zA-Z_]\w*\s?%)')
- re_author_committer_start = re.compile(r'^(author|committer)')
- re_tab_full_line = re.compile(r'^\t(.*)$')
+ re_whitespace = re.compile(r"\s+")
+ re_hexsha_only = re.compile("^[0-9A-Fa-f]{40}$")
+ re_hexsha_shortened = re.compile("^[0-9A-Fa-f]{4,40}$")
+ re_envvars = re.compile(r"(\$(\{\s?)?[a-zA-Z_]\w*(\}\s?)?|%\s?[a-zA-Z_]\w*\s?%)")
+ re_author_committer_start = re.compile(r"^(author|committer)")
+ re_tab_full_line = re.compile(r"^\t(.*)$")
# invariants
# represents the configuration level of a configuration file
@@ -105,8 +137,13 @@ class Repo(object):
# Subclasses may easily bring in their own custom types by placing a constructor or type here
GitCommandWrapperType = Git
- def __init__(self, path: Optional[PathLike] = None, odbt: Type[LooseObjectDB] = GitCmdObjectDB,
- search_parent_directories: bool = False, expand_vars: bool = True) -> None:
+ def __init__(
+ self,
+ path: Optional[PathLike] = None,
+ odbt: Type[LooseObjectDB] = GitCmdObjectDB,
+ search_parent_directories: bool = False,
+ expand_vars: bool = True,
+ ) -> None:
"""Create a new Repo instance
:param path:
@@ -132,9 +169,9 @@ class Repo(object):
which is considered a bug though.
:raise InvalidGitRepositoryError:
:raise NoSuchPathError:
- :return: git.Repo """
+ :return: git.Repo"""
- epath = path or os.getenv('GIT_DIR')
+ epath = path or os.getenv("GIT_DIR")
if not epath:
epath = os.getcwd()
if Git.is_cygwin():
@@ -144,8 +181,10 @@ class Repo(object):
if not isinstance(epath, str):
epath = str(epath)
if expand_vars and re.search(self.re_envvars, epath):
- warnings.warn("The use of environment variables in paths is deprecated" +
- "\nfor security reasons and may be removed in the future!!")
+ warnings.warn(
+ "The use of environment variables in paths is deprecated"
+ + "\nfor security reasons and may be removed in the future!!"
+ )
epath = expand_path(epath, expand_vars)
if epath is not None:
if not os.path.exists(epath):
@@ -170,15 +209,15 @@ class Repo(object):
# If GIT_DIR is specified but none of GIT_WORK_TREE and core.worktree is specified,
# the current working directory is regarded as the top level of your working tree.
self._working_tree_dir = os.path.dirname(self.git_dir)
- if os.environ.get('GIT_COMMON_DIR') is None:
+ if os.environ.get("GIT_COMMON_DIR") is None:
gitconf = self.config_reader("repository")
- if gitconf.has_option('core', 'worktree'):
- self._working_tree_dir = gitconf.get('core', 'worktree')
- if 'GIT_WORK_TREE' in os.environ:
- self._working_tree_dir = os.getenv('GIT_WORK_TREE')
+ if gitconf.has_option("core", "worktree"):
+ self._working_tree_dir = gitconf.get("core", "worktree")
+ if "GIT_WORK_TREE" in os.environ:
+ self._working_tree_dir = os.getenv("GIT_WORK_TREE")
break
- dotgit = osp.join(curpath, '.git')
+ dotgit = osp.join(curpath, ".git")
sm_gitpath = find_submodule_git_dir(dotgit)
if sm_gitpath is not None:
self.git_dir = osp.normpath(sm_gitpath)
@@ -204,13 +243,13 @@ class Repo(object):
self._bare = False
try:
- self._bare = self.config_reader("repository").getboolean('core', 'bare')
+ self._bare = self.config_reader("repository").getboolean("core", "bare")
except Exception:
# lets not assume the option exists, although it should
pass
try:
- common_dir = open(osp.join(self.git_dir, 'commondir'), 'rt').readlines()[0].strip()
+ common_dir = open(osp.join(self.git_dir, "commondir"), "rt").readlines()[0].strip()
self._common_dir = osp.join(self.git_dir, common_dir)
except OSError:
self._common_dir = ""
@@ -225,13 +264,13 @@ class Repo(object):
self.git = self.GitCommandWrapperType(self.working_dir)
# special handling, in special times
- rootpath = osp.join(self.common_dir, 'objects')
+ rootpath = osp.join(self.common_dir, "objects")
if issubclass(odbt, GitCmdObjectDB):
self.odb = odbt(rootpath, self.git)
else:
self.odb = odbt(rootpath)
- def __enter__(self) -> 'Repo':
+ def __enter__(self) -> "Repo":
return self
def __exit__(self, *args: Any) -> None:
@@ -272,25 +311,23 @@ class Repo(object):
# Description property
def _get_description(self) -> str:
if self.git_dir:
- filename = osp.join(self.git_dir, 'description')
- with open(filename, 'rb') as fp:
+ filename = osp.join(self.git_dir, "description")
+ with open(filename, "rb") as fp:
return fp.read().rstrip().decode(defenc)
def _set_description(self, descr: str) -> None:
if self.git_dir:
- filename = osp.join(self.git_dir, 'description')
- with open(filename, 'wb') as fp:
- fp.write((descr + '\n').encode(defenc))
+ filename = osp.join(self.git_dir, "description")
+ with open(filename, "wb") as fp:
+ fp.write((descr + "\n").encode(defenc))
- description = property(_get_description, _set_description,
- doc="the project's description")
+ description = property(_get_description, _set_description, doc="the project's description")
del _get_description
del _set_description
@property
def working_tree_dir(self) -> Optional[PathLike]:
- """:return: The working tree directory of our git repository. If this is a bare repository, None is returned.
- """
+ """:return: The working tree directory of our git repository. If this is a bare repository, None is returned."""
return self._working_tree_dir
@property
@@ -312,7 +349,7 @@ class Repo(object):
return self._bare
@property
- def heads(self) -> 'IterableList[Head]':
+ def heads(self) -> "IterableList[Head]":
"""A list of ``Head`` objects representing the branch heads in
this repo
@@ -320,7 +357,7 @@ class Repo(object):
return Head.list_items(self)
@property
- def references(self) -> 'IterableList[Reference]':
+ def references(self) -> "IterableList[Reference]":
"""A list of Reference objects representing tags, heads and remote references.
:return: IterableList(Reference, ...)"""
@@ -333,24 +370,24 @@ class Repo(object):
branches = heads
@property
- def index(self) -> 'IndexFile':
+ def index(self) -> "IndexFile":
""":return: IndexFile representing this repository's index.
:note: This property can be expensive, as the returned ``IndexFile`` will be
reinitialized. It's recommended to re-use the object."""
return IndexFile(self)
@property
- def head(self) -> 'HEAD':
+ def head(self) -> "HEAD":
""":return: HEAD Object pointing to the current head reference"""
- return HEAD(self, 'HEAD')
+ return HEAD(self, "HEAD")
@property
- def remotes(self) -> 'IterableList[Remote]':
+ def remotes(self) -> "IterableList[Remote]":
"""A list of Remote objects allowing to access and manipulate remotes
:return: ``git.IterableList(Remote, ...)``"""
return Remote.list_items(self)
- def remote(self, name: str = 'origin') -> 'Remote':
+ def remote(self, name: str = "origin") -> "Remote":
""":return: Remote with the specified name
:raise ValueError: if no remote with such a name exists"""
r = Remote(self, name)
@@ -358,17 +395,17 @@ class Repo(object):
raise ValueError("Remote named '%s' didn't exist" % name)
return r
- #{ Submodules
+ # { Submodules
@property
- def submodules(self) -> 'IterableList[Submodule]':
+ def submodules(self) -> "IterableList[Submodule]":
"""
:return: git.IterableList(Submodule, ...) of direct submodules
available from the current head"""
return Submodule.list_items(self)
- def submodule(self, name: str) -> 'Submodule':
- """ :return: Submodule with the given name
+ def submodule(self, name: str) -> "Submodule":
+ """:return: Submodule with the given name
:raise ValueError: If no such submodule exists"""
try:
return self.submodules[name]
@@ -396,53 +433,61 @@ class Repo(object):
see the documentation of RootModule.update"""
return RootModule(self).update(*args, **kwargs)
- #}END submodules
+ # }END submodules
@property
- def tags(self) -> 'IterableList[TagReference]':
+ def tags(self) -> "IterableList[TagReference]":
"""A list of ``Tag`` objects that are available in this repo
- :return: ``git.IterableList(TagReference, ...)`` """
+ :return: ``git.IterableList(TagReference, ...)``"""
return TagReference.list_items(self)
def tag(self, path: PathLike) -> TagReference:
""":return: TagReference Object, reference pointing to a Commit or Tag
- :param path: path to the tag reference, i.e. 0.1.5 or tags/0.1.5 """
+ :param path: path to the tag reference, i.e. 0.1.5 or tags/0.1.5"""
full_path = self._to_full_tag_path(path)
return TagReference(self, full_path)
@staticmethod
def _to_full_tag_path(path: PathLike) -> str:
path_str = str(path)
- if path_str.startswith(TagReference._common_path_default + '/'):
+ if path_str.startswith(TagReference._common_path_default + "/"):
return path_str
- if path_str.startswith(TagReference._common_default + '/'):
- return Reference._common_path_default + '/' + path_str
+ if path_str.startswith(TagReference._common_default + "/"):
+ return Reference._common_path_default + "/" + path_str
else:
- return TagReference._common_path_default + '/' + path_str
-
- def create_head(self, path: PathLike,
- commit: Union['SymbolicReference', 'str'] = 'HEAD',
- force: bool = False, logmsg: Optional[str] = None
- ) -> 'Head':
+ return TagReference._common_path_default + "/" + path_str
+
+ def create_head(
+ self,
+ path: PathLike,
+ commit: Union["SymbolicReference", "str"] = "HEAD",
+ force: bool = False,
+ logmsg: Optional[str] = None,
+ ) -> "Head":
"""Create a new head within the repository.
For more documentation, please see the Head.create method.
:return: newly created Head Reference"""
return Head.create(self, path, commit, logmsg, force)
- def delete_head(self, *heads: 'Union[str, Head]', **kwargs: Any) -> None:
+ def delete_head(self, *heads: "Union[str, Head]", **kwargs: Any) -> None:
"""Delete the given heads
:param kwargs: Additional keyword arguments to be passed to git-branch"""
return Head.delete(self, *heads, **kwargs)
- def create_tag(self, path: PathLike, ref: str = 'HEAD',
- message: Optional[str] = None, force: bool = False, **kwargs: Any
- ) -> TagReference:
+ def create_tag(
+ self,
+ path: PathLike,
+ ref: str = "HEAD",
+ message: Optional[str] = None,
+ force: bool = False,
+ **kwargs: Any,
+ ) -> TagReference:
"""Create a new tag reference.
For more documentation, please see the TagReference.create method.
- :return: TagReference object """
+ :return: TagReference object"""
return TagReference.create(self, path, ref, message, force, **kwargs)
def delete_tag(self, *tags: TagReference) -> None:
@@ -458,7 +503,7 @@ class Repo(object):
:return: Remote reference"""
return Remote.create(self, name, url, **kwargs)
- def delete_remote(self, remote: 'Remote') -> str:
+ def delete_remote(self, remote: "Remote") -> str:
"""Delete the given remote."""
return Remote.remove(self, remote)
@@ -471,7 +516,7 @@ class Repo(object):
if config_level == "system":
return "/etc/gitconfig"
elif config_level == "user":
- config_home = os.environ.get("XDG_CONFIG_HOME") or osp.join(os.environ.get("HOME", '~'), ".config")
+ config_home = os.environ.get("XDG_CONFIG_HOME") or osp.join(os.environ.get("HOME", "~"), ".config")
return osp.normpath(osp.expanduser(osp.join(config_home, "git", "config")))
elif config_level == "global":
return osp.normpath(osp.expanduser("~/.gitconfig"))
@@ -483,11 +528,15 @@ class Repo(object):
return osp.normpath(osp.join(repo_dir, "config"))
else:
- assert_never(config_level, # type:ignore[unreachable]
- ValueError(f"Invalid configuration level: {config_level!r}"))
+ assert_never(
+ config_level, # type:ignore[unreachable]
+ ValueError(f"Invalid configuration level: {config_level!r}"),
+ )
- def config_reader(self, config_level: Optional[Lit_config_levels] = None,
- ) -> GitConfigParser:
+ def config_reader(
+ self,
+ config_level: Optional[Lit_config_levels] = None,
+ ) -> GitConfigParser:
"""
:return:
GitConfigParser allowing to read the full git configuration, but not to write it
@@ -503,14 +552,16 @@ class Repo(object):
unknown, instead the global path will be used."""
files = None
if config_level is None:
- files = [self._get_config_path(cast(Lit_config_levels, f))
- for f in self.config_level if cast(Lit_config_levels, f)]
+ files = [
+ self._get_config_path(cast(Lit_config_levels, f))
+ for f in self.config_level
+ if cast(Lit_config_levels, f)
+ ]
else:
files = [self._get_config_path(config_level)]
return GitConfigParser(files, read_only=True, repo=self)
- def config_writer(self, config_level: Lit_config_levels = "repository"
- ) -> GitConfigParser:
+ def config_writer(self, config_level: Lit_config_levels = "repository") -> GitConfigParser:
"""
:return:
GitConfigParser allowing to write values of the specified configuration file level.
@@ -525,8 +576,7 @@ class Repo(object):
repository = configuration file for this repository only"""
return GitConfigParser(self._get_config_path(config_level), read_only=False, repo=self)
- def commit(self, rev: Union[str, Commit_ish, None] = None
- ) -> Commit:
+ def commit(self, rev: Union[str, Commit_ish, None] = None) -> Commit:
"""The Commit object for the specified revision
:param rev: revision specifier, see git-rev-parse for viable options.
@@ -536,12 +586,12 @@ class Repo(object):
return self.head.commit
return self.rev_parse(str(rev) + "^0")
- def iter_trees(self, *args: Any, **kwargs: Any) -> Iterator['Tree']:
+ def iter_trees(self, *args: Any, **kwargs: Any) -> Iterator["Tree"]:
""":return: Iterator yielding Tree objects
:note: Takes all arguments known to iter_commits method"""
return (c.tree for c in self.iter_commits(*args, **kwargs))
- def tree(self, rev: Union[Tree_ish, str, None] = None) -> 'Tree':
+ def tree(self, rev: Union[Tree_ish, str, None] = None) -> "Tree":
"""The Tree object for the given treeish revision
Examples::
@@ -558,9 +608,12 @@ class Repo(object):
return self.head.commit.tree
return self.rev_parse(str(rev) + "^{tree}")
- def iter_commits(self, rev: Union[str, Commit, 'SymbolicReference', None] = None,
- paths: Union[PathLike, Sequence[PathLike]] = '',
- **kwargs: Any) -> Iterator[Commit]:
+ def iter_commits(
+ self,
+ rev: Union[str, Commit, "SymbolicReference", None] = None,
+ paths: Union[PathLike, Sequence[PathLike]] = "",
+ **kwargs: Any,
+ ) -> Iterator[Commit]:
"""A list of Commit objects representing the history of a given ref/commit
:param rev:
@@ -584,8 +637,7 @@ class Repo(object):
return Commit.iter_items(self, rev, paths, **kwargs)
- def merge_base(self, *rev: TBD, **kwargs: Any
- ) -> List[Union[Commit_ish, None]]:
+ def merge_base(self, *rev: TBD, **kwargs: Any) -> List[Union[Commit_ish, None]]:
"""Find the closest common ancestor for the given revision (e.g. Commits, Tags, References, etc)
:param rev: At least two revs to find the common ancestor for.
@@ -616,7 +668,7 @@ class Repo(object):
return res
- def is_ancestor(self, ancestor_rev: 'Commit', rev: 'Commit') -> bool:
+ def is_ancestor(self, ancestor_rev: "Commit", rev: "Commit") -> bool:
"""Check if a commit is an ancestor of another
:param ancestor_rev: Rev which should be an ancestor
@@ -639,8 +691,11 @@ class Repo(object):
if object_info.type == object_type.encode():
return True
else:
- log.debug("Commit hash points to an object of type '%s'. Requested were objects of type '%s'",
- object_info.type.decode(), object_type)
+ log.debug(
+ "Commit hash points to an object of type '%s'. Requested were objects of type '%s'",
+ object_info.type.decode(),
+ object_type,
+ )
return False
else:
return True
@@ -662,8 +717,11 @@ class Repo(object):
elif not value and fileexists:
os.unlink(filename)
- daemon_export = property(_get_daemon_export, _set_daemon_export,
- doc="If True, git-daemon may export this repository")
+ daemon_export = property(
+ _get_daemon_export,
+ _set_daemon_export,
+ doc="If True, git-daemon may export this repository",
+ )
del _get_daemon_export
del _set_daemon_export
@@ -672,10 +730,10 @@ class Repo(object):
:return: list of strings being pathnames of alternates"""
if self.git_dir:
- alternates_path = osp.join(self.git_dir, 'objects', 'info', 'alternates')
+ alternates_path = osp.join(self.git_dir, "objects", "info", "alternates")
if osp.exists(alternates_path):
- with open(alternates_path, 'rb') as f:
+ with open(alternates_path, "rb") as f:
alts = f.read().decode(defenc)
return alts.strip().splitlines()
return []
@@ -691,19 +749,28 @@ class Repo(object):
:note:
The method does not check for the existence of the paths in alts
as the caller is responsible."""
- alternates_path = osp.join(self.common_dir, 'objects', 'info', 'alternates')
+ alternates_path = osp.join(self.common_dir, "objects", "info", "alternates")
if not alts:
if osp.isfile(alternates_path):
os.remove(alternates_path)
else:
- with open(alternates_path, 'wb') as f:
+ with open(alternates_path, "wb") as f:
f.write("\n".join(alts).encode(defenc))
- alternates = property(_get_alternates, _set_alternates,
- doc="Retrieve a list of alternates paths or set a list paths to be used as alternates")
-
- def is_dirty(self, index: bool = True, working_tree: bool = True, untracked_files: bool = False,
- submodules: bool = True, path: Optional[PathLike] = None) -> bool:
+ alternates = property(
+ _get_alternates,
+ _set_alternates,
+ doc="Retrieve a list of alternates paths or set a list paths to be used as alternates",
+ )
+
+ def is_dirty(
+ self,
+ index: bool = True,
+ working_tree: bool = True,
+ untracked_files: bool = False,
+ submodules: bool = True,
+ path: Optional[PathLike] = None,
+ ) -> bool:
"""
:return:
``True``, the repository is considered dirty. By default it will react
@@ -715,15 +782,14 @@ class Repo(object):
return False
# start from the one which is fastest to evaluate
- default_args = ['--abbrev=40', '--full-index', '--raw']
+ default_args = ["--abbrev=40", "--full-index", "--raw"]
if not submodules:
- default_args.append('--ignore-submodules')
+ default_args.append("--ignore-submodules")
if path:
default_args.extend(["--", str(path)])
if index:
# diff index against HEAD
- if osp.isfile(self.index.path) and \
- len(self.git.diff('--cached', *default_args)):
+ if osp.isfile(self.index.path) and len(self.git.diff("--cached", *default_args)):
return True
# END index handling
if working_tree:
@@ -755,11 +821,7 @@ class Repo(object):
def _get_untracked_files(self, *args: Any, **kwargs: Any) -> List[str]:
# make sure we get all files, not only untracked directories
- proc = self.git.status(*args,
- porcelain=True,
- untracked_files=True,
- as_process=True,
- **kwargs)
+ proc = self.git.status(*args, porcelain=True, untracked_files=True, as_process=True, **kwargs)
# Untracked files prefix in porcelain mode
prefix = "?? "
untracked_files = []
@@ -767,12 +829,12 @@ class Repo(object):
line = line.decode(defenc)
if not line.startswith(prefix):
continue
- filename = line[len(prefix):].rstrip('\n')
+ filename = line[len(prefix) :].rstrip("\n")
# Special characters are escaped
if filename[0] == filename[-1] == '"':
filename = filename[1:-1]
# WHATEVER ... it's a mess, but works for me
- filename = filename.encode('ascii').decode('unicode_escape').encode('latin1').decode(defenc)
+ filename = filename.encode("ascii").decode("unicode_escape").encode("latin1").decode(defenc)
untracked_files.append(filename)
finalize_process(proc)
return untracked_files
@@ -797,7 +859,7 @@ class Repo(object):
# reveal_type(self.head.reference) # => Reference
return self.head.reference
- def blame_incremental(self, rev: str | HEAD, file: str, **kwargs: Any) -> Iterator['BlameEntry']:
+ def blame_incremental(self, rev: str | HEAD, file: str, **kwargs: Any) -> Iterator["BlameEntry"]:
"""Iterator for blame information for the given file at the given revision.
Unlike .blame(), this does not return the actual file's contents, only
@@ -812,10 +874,10 @@ class Repo(object):
should get a continuous range spanning all line numbers in the file.
"""
- data: bytes = self.git.blame(rev, '--', file, p=True, incremental=True, stdout_as_string=False, **kwargs)
+ data: bytes = self.git.blame(rev, "--", file, p=True, incremental=True, stdout_as_string=False, **kwargs)
commits: Dict[bytes, Commit] = {}
- stream = (line for line in data.split(b'\n') if line)
+ stream = (line for line in data.split(b"\n") if line)
while True:
try:
line = next(stream) # when exhausted, causes a StopIteration, terminating this function
@@ -835,25 +897,32 @@ class Repo(object):
line = next(stream)
except StopIteration:
return
- if line == b'boundary':
+ if line == b"boundary":
# "boundary" indicates a root commit and occurs
# instead of the "previous" tag
continue
- tag, value = line.split(b' ', 1)
+ tag, value = line.split(b" ", 1)
props[tag] = value
- if tag == b'filename':
+ if tag == b"filename":
# "filename" formally terminates the entry for --incremental
orig_filename = value
break
- c = Commit(self, hex_to_bin(hexsha),
- author=Actor(safe_decode(props[b'author']),
- safe_decode(props[b'author-mail'].lstrip(b'<').rstrip(b'>'))),
- authored_date=int(props[b'author-time']),
- committer=Actor(safe_decode(props[b'committer']),
- safe_decode(props[b'committer-mail'].lstrip(b'<').rstrip(b'>'))),
- committed_date=int(props[b'committer-time']))
+ c = Commit(
+ self,
+ hex_to_bin(hexsha),
+ author=Actor(
+ safe_decode(props[b"author"]),
+ safe_decode(props[b"author-mail"].lstrip(b"<").rstrip(b">")),
+ ),
+ authored_date=int(props[b"author-time"]),
+ committer=Actor(
+ safe_decode(props[b"committer"]),
+ safe_decode(props[b"committer-mail"].lstrip(b"<").rstrip(b">")),
+ ),
+ committed_date=int(props[b"committer-time"]),
+ )
commits[hexsha] = c
else:
# Discard all lines until we find "filename" which is
@@ -863,18 +932,21 @@ class Repo(object):
line = next(stream) # will fail if we reach the EOF unexpectedly
except StopIteration:
return
- tag, value = line.split(b' ', 1)
- if tag == b'filename':
+ tag, value = line.split(b" ", 1)
+ if tag == b"filename":
orig_filename = value
break
- yield BlameEntry(commits[hexsha],
- range(lineno, lineno + num_lines),
- safe_decode(orig_filename),
- range(orig_lineno, orig_lineno + num_lines))
+ yield BlameEntry(
+ commits[hexsha],
+ range(lineno, lineno + num_lines),
+ safe_decode(orig_filename),
+ range(orig_lineno, orig_lineno + num_lines),
+ )
- def blame(self, rev: Union[str, HEAD], file: str, incremental: bool = False, **kwargs: Any
- ) -> List[List[Commit | List[str | bytes] | None]] | Iterator[BlameEntry] | None:
+ def blame(
+ self, rev: Union[str, HEAD], file: str, incremental: bool = False, **kwargs: Any
+ ) -> List[List[Commit | List[str | bytes] | None]] | Iterator[BlameEntry] | None:
"""The blame information for the given file at the given revision.
:param rev: revision specifier, see git-rev-parse for viable options.
@@ -886,7 +958,7 @@ class Repo(object):
if incremental:
return self.blame_incremental(rev, file, **kwargs)
- data: bytes = self.git.blame(rev, '--', file, p=True, stdout_as_string=False, **kwargs)
+ data: bytes = self.git.blame(rev, "--", file, p=True, stdout_as_string=False, **kwargs)
commits: Dict[str, Commit] = {}
blames: List[List[Commit | List[str | bytes] | None]] = []
@@ -909,7 +981,7 @@ class Repo(object):
try:
line_str = line_bytes.rstrip().decode(defenc)
except UnicodeDecodeError:
- firstpart = ''
+ firstpart = ""
parts = []
is_binary = True
else:
@@ -929,10 +1001,10 @@ class Repo(object):
# another line of blame with the same data
digits = parts[-1].split(" ")
if len(digits) == 3:
- info = {'id': firstpart}
+ info = {"id": firstpart}
blames.append([None, []])
- elif info['id'] != firstpart:
- info = {'id': firstpart}
+ elif info["id"] != firstpart:
+ info = {"id": firstpart}
blames.append([commits.get(firstpart), []])
# END blame data initialization
else:
@@ -948,17 +1020,17 @@ class Repo(object):
# committer-time 1192271832
# committer-tz -0700 - IGNORED BY US
role = m.group(0)
- if role == 'author':
- if firstpart.endswith('-mail'):
+ if role == "author":
+ if firstpart.endswith("-mail"):
info["author_email"] = parts[-1]
- elif firstpart.endswith('-time'):
+ elif firstpart.endswith("-time"):
info["author_date"] = int(parts[-1])
elif role == firstpart:
info["author"] = parts[-1]
- elif role == 'committer':
- if firstpart.endswith('-mail'):
+ elif role == "committer":
+ if firstpart.endswith("-mail"):
info["committer_email"] = parts[-1]
- elif firstpart.endswith('-time'):
+ elif firstpart.endswith("-time"):
info["committer_date"] = int(parts[-1])
elif role == firstpart:
info["committer"] = parts[-1]
@@ -968,21 +1040,23 @@ class Repo(object):
# filename lib/grit.rb
# summary add Blob
# <and rest>
- if firstpart.startswith('filename'):
- info['filename'] = parts[-1]
- elif firstpart.startswith('summary'):
- info['summary'] = parts[-1]
- elif firstpart == '':
+ if firstpart.startswith("filename"):
+ info["filename"] = parts[-1]
+ elif firstpart.startswith("summary"):
+ info["summary"] = parts[-1]
+ elif firstpart == "":
if info:
- sha = info['id']
+ sha = info["id"]
c = commits.get(sha)
if c is None:
- c = Commit(self, hex_to_bin(sha),
- author=Actor._from_string(f"{info['author']} {info['author_email']}"),
- authored_date=info['author_date'],
- committer=Actor._from_string(
- f"{info['committer']} {info['committer_email']}"),
- committed_date=info['committer_date'])
+ c = Commit(
+ self,
+ hex_to_bin(sha),
+ author=Actor._from_string(f"{info['author']} {info['author_email']}"),
+ authored_date=info["author_date"],
+ committer=Actor._from_string(f"{info['committer']} {info['committer_email']}"),
+ committed_date=info["committer_date"],
+ )
commits[sha] = c
blames[-1][0] = c
# END if commit objects needs initial creation
@@ -990,7 +1064,7 @@ class Repo(object):
if blames[-1][1] is not None:
line: str | bytes
if not is_binary:
- if line_str and line_str[0] == '\t':
+ if line_str and line_str[0] == "\t":
line_str = line_str[1:]
line = line_str
else:
@@ -1001,16 +1075,22 @@ class Repo(object):
# the last line we have seen.
blames[-1][1].append(line)
- info = {'id': sha}
+ info = {"id": sha}
# END if we collected commit info
# END distinguish filename,summary,rest
# END distinguish author|committer vs filename,summary,rest
# END distinguish hexsha vs other information
return blames
- @ classmethod
- def init(cls, path: Union[PathLike, None] = None, mkdir: bool = True, odbt: Type[GitCmdObjectDB] = GitCmdObjectDB,
- expand_vars: bool = True, **kwargs: Any) -> 'Repo':
+ @classmethod
+ def init(
+ cls,
+ path: Union[PathLike, None] = None,
+ mkdir: bool = True,
+ odbt: Type[GitCmdObjectDB] = GitCmdObjectDB,
+ expand_vars: bool = True,
+ **kwargs: Any,
+ ) -> "Repo":
"""Initialize a git repository at the given path if specified
:param path:
@@ -1047,12 +1127,18 @@ class Repo(object):
git.init(**kwargs)
return cls(path, odbt=odbt)
- @ classmethod
- def _clone(cls, git: 'Git', url: PathLike, path: PathLike, odb_default_type: Type[GitCmdObjectDB],
- progress: Union['RemoteProgress', 'UpdateProgress', Callable[..., 'RemoteProgress'], None] = None,
- multi_options: Optional[List[str]] = None, **kwargs: Any
- ) -> 'Repo':
- odbt = kwargs.pop('odbt', odb_default_type)
+ @classmethod
+ def _clone(
+ cls,
+ git: "Git",
+ url: PathLike,
+ path: PathLike,
+ odb_default_type: Type[GitCmdObjectDB],
+ progress: Union["RemoteProgress", "UpdateProgress", Callable[..., "RemoteProgress"], None] = None,
+ multi_options: Optional[List[str]] = None,
+ **kwargs: Any,
+ ) -> "Repo":
+ odbt = kwargs.pop("odbt", odb_default_type)
# when pathlib.Path or other classbased path is passed
if not isinstance(path, str):
@@ -1064,23 +1150,34 @@ class Repo(object):
# becomes::
# git clone --bare /cygwin/d/foo.git /cygwin/d/C:\\Work
#
- clone_path = (Git.polish_url(path)
- if Git.is_cygwin() and 'bare' in kwargs
- else path)
- sep_dir = kwargs.get('separate_git_dir')
+ clone_path = Git.polish_url(path) if Git.is_cygwin() and "bare" in kwargs else path
+ sep_dir = kwargs.get("separate_git_dir")
if sep_dir:
- kwargs['separate_git_dir'] = Git.polish_url(sep_dir)
+ kwargs["separate_git_dir"] = Git.polish_url(sep_dir)
multi = None
if multi_options:
- multi = shlex.split(' '.join(multi_options))
- proc = git.clone(multi, Git.polish_url(str(url)), clone_path, with_extended_output=True, as_process=True,
- v=True, universal_newlines=True, **add_progress(kwargs, git, progress))
+ multi = shlex.split(" ".join(multi_options))
+ proc = git.clone(
+ multi,
+ Git.polish_url(str(url)),
+ clone_path,
+ with_extended_output=True,
+ as_process=True,
+ v=True,
+ universal_newlines=True,
+ **add_progress(kwargs, git, progress),
+ )
if progress:
- handle_process_output(proc, None, to_progress_instance(progress).new_message_handler(),
- finalize_process, decode_streams=False)
+ handle_process_output(
+ proc,
+ None,
+ to_progress_instance(progress).new_message_handler(),
+ finalize_process,
+ decode_streams=False,
+ )
else:
(stdout, stderr) = proc.communicate()
- cmdline = getattr(proc, 'args', '')
+ cmdline = getattr(proc, "args", "")
cmdline = remove_password_if_present(cmdline)
log.debug("Cmd(%s)'s unused stdout: %s", cmdline, stdout)
@@ -1103,12 +1200,17 @@ class Repo(object):
# sure
if repo.remotes:
with repo.remotes[0].config_writer as writer:
- writer.set_value('url', Git.polish_url(repo.remotes[0].url))
+ writer.set_value("url", Git.polish_url(repo.remotes[0].url))
# END handle remote repo
return repo
- def clone(self, path: PathLike, progress: Optional[Callable] = None,
- multi_options: Optional[List[str]] = None, **kwargs: Any) -> 'Repo':
+ def clone(
+ self,
+ path: PathLike,
+ progress: Optional[Callable] = None,
+ multi_options: Optional[List[str]] = None,
+ **kwargs: Any,
+ ) -> "Repo":
"""Create a clone from this repository.
:param path: is the full path of the new repo (traditionally ends with ./<name>.git).
@@ -1123,12 +1225,26 @@ class Repo(object):
* All remaining keyword arguments are given to the git-clone command
:return: ``git.Repo`` (the newly cloned repo)"""
- return self._clone(self.git, self.common_dir, path, type(self.odb), progress, multi_options, **kwargs)
-
- @ classmethod
- def clone_from(cls, url: PathLike, to_path: PathLike, progress: Optional[Callable] = None,
- env: Optional[Mapping[str, str]] = None,
- multi_options: Optional[List[str]] = None, **kwargs: Any) -> 'Repo':
+ return self._clone(
+ self.git,
+ self.common_dir,
+ path,
+ type(self.odb),
+ progress,
+ multi_options,
+ **kwargs,
+ )
+
+ @classmethod
+ def clone_from(
+ cls,
+ url: PathLike,
+ to_path: PathLike,
+ progress: Optional[Callable] = None,
+ env: Optional[Mapping[str, str]] = None,
+ multi_options: Optional[List[str]] = None,
+ **kwargs: Any,
+ ) -> "Repo":
"""Create a clone from the given URL
:param url: valid git url, see http://www.kernel.org/pub/software/scm/git/docs/git-clone.html#URLS
@@ -1148,8 +1264,13 @@ class Repo(object):
git.update_environment(**env)
return cls._clone(git, url, to_path, GitCmdObjectDB, progress, multi_options, **kwargs)
- def archive(self, ostream: Union[TextIO, BinaryIO], treeish: Optional[str] = None,
- prefix: Optional[str] = None, **kwargs: Any) -> Repo:
+ def archive(
+ self,
+ ostream: Union[TextIO, BinaryIO],
+ treeish: Optional[str] = None,
+ prefix: Optional[str] = None,
+ **kwargs: Any,
+ ) -> Repo:
"""Archive the tree at the given revision.
:param ostream: file compatible stream object to which the archive will be written as bytes
@@ -1166,10 +1287,10 @@ class Repo(object):
:return: self"""
if treeish is None:
treeish = self.head.commit
- if prefix and 'prefix' not in kwargs:
- kwargs['prefix'] = prefix
- kwargs['output_stream'] = ostream
- path = kwargs.pop('path', [])
+ if prefix and "prefix" not in kwargs:
+ kwargs["prefix"] = prefix
+ kwargs["output_stream"] = ostream
+ path = kwargs.pop("path", [])
path = cast(Union[PathLike, List[PathLike], Tuple[PathLike, ...]], path)
if not isinstance(path, (tuple, list)):
path = [path]
@@ -1186,7 +1307,7 @@ class Repo(object):
if self.bare:
return False
if self.working_tree_dir:
- return osp.isfile(osp.join(self.working_tree_dir, '.git'))
+ return osp.isfile(osp.join(self.working_tree_dir, ".git"))
else:
return False # or raise Error?
@@ -1194,7 +1315,7 @@ class Repo(object):
def __repr__(self) -> str:
clazz = self.__class__
- return '<%s.%s %r>' % (clazz.__module__, clazz.__name__, self.git_dir)
+ return "<%s.%s %r>" % (clazz.__module__, clazz.__name__, self.git_dir)
def currently_rebasing_on(self) -> Commit | None:
"""