diff options
| -rw-r--r-- | .github/workflows/cygwin-test.yml | 15 | ||||
| -rw-r--r-- | .github/workflows/lint.yml | 14 | ||||
| -rw-r--r-- | .github/workflows/pythonpackage.yml | 12 | ||||
| -rw-r--r-- | .pre-commit-config.yaml | 19 | ||||
| -rw-r--r-- | README.md | 2 | ||||
| -rw-r--r-- | git/cmd.py | 5 | ||||
| -rw-r--r-- | git/diff.py | 5 | ||||
| -rw-r--r-- | git/objects/commit.py | 4 | ||||
| -rw-r--r-- | git/objects/submodule/base.py | 8 | ||||
| -rw-r--r-- | git/objects/util.py | 35 | ||||
| -rw-r--r-- | git/remote.py | 9 | ||||
| -rw-r--r-- | git/repo/base.py | 82 | ||||
| -rw-r--r-- | git/repo/fun.py | 3 | ||||
| -rwxr-xr-x | setup.py | 7 | ||||
| -rw-r--r-- | test-requirements.txt | 5 | ||||
| -rw-r--r-- | test/test_base.py | 9 | ||||
| -rw-r--r-- | test/test_commit.py | 31 | ||||
| -rw-r--r-- | test/test_diff.py | 70 | ||||
| -rw-r--r-- | test/test_remote.py | 424 | ||||
| -rw-r--r-- | test/test_repo.py | 336 | ||||
| -rw-r--r-- | test/test_submodule.py | 238 | ||||
| -rw-r--r-- | test/test_util.py | 21 |
22 files changed, 811 insertions, 543 deletions
diff --git a/.github/workflows/cygwin-test.yml b/.github/workflows/cygwin-test.yml index 16b42f89..0018e7df 100644 --- a/.github/workflows/cygwin-test.yml +++ b/.github/workflows/cygwin-test.yml @@ -1,16 +1,12 @@ name: test-cygwin -on: - push: - branches: - main - pull_request: - branches: - main +on: [push, pull_request, workflow_dispatch] jobs: build: runs-on: windows-latest + strategy: + fail-fast: false env: CHERE_INVOKING: 1 SHELLOPTS: igncr @@ -47,11 +43,6 @@ jobs: # If we rewrite the user's config by accident, we will mess it up # and cause subsequent tests to fail cat test/fixtures/.gitconfig >> ~/.gitconfig - - name: Lint with flake8 - shell: bash.exe -eo pipefail -o igncr "{0}" - run: | - set -x - /usr/bin/python -m flake8 - name: Test with pytest shell: bash.exe -eo pipefail -o igncr "{0}" run: | diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 00000000..c78a4053 --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,14 @@ +name: Lint + +on: [push, pull_request, workflow_dispatch] + +jobs: + lint: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + with: + python-version: "3.x" + - uses: pre-commit/action@v3.0.0 diff --git a/.github/workflows/pythonpackage.yml b/.github/workflows/pythonpackage.yml index 5373dace..6d6c6795 100644 --- a/.github/workflows/pythonpackage.yml +++ b/.github/workflows/pythonpackage.yml @@ -3,11 +3,7 @@ name: Python package -on: - push: - branches: [ main ] - pull_request: - branches: [ main ] +on: [push, pull_request, workflow_dispatch] permissions: contents: read @@ -17,6 +13,7 @@ jobs: runs-on: ubuntu-latest strategy: + fail-fast: false matrix: python-version: [3.7, 3.8, 3.9, "3.10", "3.11"] @@ -47,11 +44,6 @@ jobs: # and cause subsequent tests to fail cat test/fixtures/.gitconfig >> ~/.gitconfig - - name: Lint with flake8 - run: | - set -x - flake8 - - name: Check types with mypy # With new versions of pypi new issues might arise. This is a problem if there is nobody able to fix them, # so we have to ignore errors until that changes. diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 00000000..581cb69b --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,19 @@ +repos: + - repo: https://github.com/PyCQA/flake8 + rev: 6.0.0 + hooks: + - id: flake8 + additional_dependencies: + [ + flake8-bugbear==22.12.6, + flake8-comprehensions==3.10.1, + flake8-typing-imports==1.14.0, + ] + exclude: ^doc|^git/ext/|^test/ + + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.4.0 + hooks: + - id: check-merge-conflict + - id: check-toml + - id: check-yaml @@ -107,7 +107,7 @@ with MINGW's. Ensure testing libraries are installed. In the root directory, run: `pip install -r test-requirements.txt` -To lint, run: `flake8` +To lint, run: `pre-commit run --all-files` To typecheck, run: `mypy -p git` @@ -695,15 +695,14 @@ class Git(LazyMixin): return self def __next__(self) -> bytes: - return next(self) - - def next(self) -> bytes: line = self.readline() if not line: raise StopIteration return line + next = __next__ + def __del__(self) -> None: bytes_left = self._size - self._nbr if bytes_left: diff --git a/git/diff.py b/git/diff.py index c4424592..c1a5bd26 100644 --- a/git/diff.py +++ b/git/diff.py @@ -144,7 +144,10 @@ class Diffable(object): args.append("--abbrev=40") # we need full shas args.append("--full-index") # get full index paths, not only filenames - args.append("-M") # check for renames, in both formats + # remove default '-M' arg (check for renames) if user is overriding it + if not any(x in kwargs for x in ('find_renames', 'no_renames', 'M')): + args.append("-M") + if create_patch: args.append("-p") else: diff --git a/git/objects/commit.py b/git/objects/commit.py index 82d2387b..547e8fe8 100644 --- a/git/objects/commit.py +++ b/git/objects/commit.py @@ -324,14 +324,14 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): :return: git.Stats""" if not self.parents: - text = self.repo.git.diff_tree(self.hexsha, "--", numstat=True, root=True) + text = self.repo.git.diff_tree(self.hexsha, "--", numstat=True, no_renames=True, root=True) text2 = "" for line in text.splitlines()[1:]: (insertions, deletions, filename) = line.split("\t") text2 += "%s\t%s\t%s\n" % (insertions, deletions, filename) text = text2 else: - text = self.repo.git.diff(self.parents[0].hexsha, self.hexsha, "--", numstat=True) + text = self.repo.git.diff(self.parents[0].hexsha, self.hexsha, "--", numstat=True, no_renames=True) return Stats._list_from_string(self.repo, text) @property diff --git a/git/objects/submodule/base.py b/git/objects/submodule/base.py index 9aa9deb2..7db64d70 100644 --- a/git/objects/submodule/base.py +++ b/git/objects/submodule/base.py @@ -287,7 +287,9 @@ class Submodule(IndexObject, TraversableIterableObj): :param url: url to clone from :param path: repository - relative path to the submodule checkout location :param name: canonical of the submodule - :param kwrags: additinoal arguments given to git.clone""" + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_options: Allow unsafe options to be used, like --upload-pack + :param kwargs: additional arguments given to git.clone""" module_abspath = cls._module_abspath(repo, path, name) module_checkout_path = module_abspath if cls._need_gitfile_submodules(repo.git): @@ -411,6 +413,8 @@ class Submodule(IndexObject, TraversableIterableObj): as its value. :param clone_multi_options: A list of Clone options. Please see ``git.repo.base.Repo.clone`` for details. + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_options: Allow unsafe options to be used, like --upload-pack :return: The newly created submodule instance :note: works atomically, such that no change will be done if the repository update fails for instance""" @@ -581,6 +585,8 @@ class Submodule(IndexObject, TraversableIterableObj): as its value. :param clone_multi_options: list of Clone options. Please see ``git.repo.base.Repo.clone`` for details. Only take effect with `init` option. + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_options: Allow unsafe options to be used, like --upload-pack :note: does nothing in bare repositories :note: method is definitely not atomic if recurisve is True :return: self""" diff --git a/git/objects/util.py b/git/objects/util.py index f405d628..af279154 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -137,22 +137,25 @@ def get_object_type_by_name( def utctz_to_altz(utctz: str) -> int: - """we convert utctz to the timezone in seconds, it is the format time.altzone - returns. Git stores it as UTC timezone which has the opposite sign as well, - which explains the -1 * ( that was made explicit here ) - - :param utctz: git utc timezone string, i.e. +0200""" - return -1 * int(float(utctz) / 100 * 3600) - - -def altz_to_utctz_str(altz: float) -> str: - """As above, but inverses the operation, returning a string that can be used - in commit objects""" - utci = -1 * int((float(altz) / 3600) * 100) - utcs = str(abs(utci)) - utcs = "0" * (4 - len(utcs)) + utcs - prefix = (utci < 0 and "-") or "+" - return prefix + utcs + """Convert a git timezone offset into a timezone offset west of + UTC in seconds (compatible with time.altzone). + + :param utctz: git utc timezone string, i.e. +0200 + """ + int_utctz = int(utctz) + seconds = ((abs(int_utctz) // 100) * 3600 + (abs(int_utctz) % 100) * 60) + return seconds if int_utctz < 0 else -seconds + + +def altz_to_utctz_str(altz: int) -> str: + """Convert a timezone offset west of UTC in seconds into a git timezone offset string + + :param altz: timezone offset in seconds west of UTC + """ + hours = abs(altz) // 3600 + minutes = (abs(altz) % 3600) // 60 + sign = "-" if altz >= 60 else "+" + return "{}{:02}{:02}".format(sign, hours, minutes) def verify_utctz(offset: str) -> str: diff --git a/git/remote.py b/git/remote.py index 3f86a297..5886a69f 100644 --- a/git/remote.py +++ b/git/remote.py @@ -641,6 +641,7 @@ class Remote(LazyMixin, IterableObj): :param new_url: string being the URL to add as an extra remote URL :param old_url: when set, replaces this URL with new_url for the remote + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext :return: self """ if not allow_unsafe_protocols: @@ -660,6 +661,7 @@ class Remote(LazyMixin, IterableObj): multiple URLs for a single remote. :param url: string being the URL to add as an extra remote URL + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext :return: self """ return self.set_url(url, add=True, allow_unsafe_protocols=allow_unsafe_protocols) @@ -760,6 +762,7 @@ class Remote(LazyMixin, IterableObj): :param repo: Repository instance that is to receive the new remote :param name: Desired name of the remote :param url: URL which corresponds to the remote's name + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext :param kwargs: Additional arguments to be passed to the git-remote add command :return: New Remote instance :raise GitCommandError: in case an origin with that name already exists""" @@ -978,6 +981,8 @@ class Remote(LazyMixin, IterableObj): :param kill_after_timeout: To specify a timeout in seconds for the git command, after which the process should be killed. It is set to None by default. + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_options: Allow unsafe options to be used, like --upload-pack :param kwargs: Additional arguments to be passed to git-fetch :return: IterableList(FetchInfo, ...) list of FetchInfo instances providing detailed @@ -1027,6 +1032,8 @@ class Remote(LazyMixin, IterableObj): :param refspec: see :meth:`fetch` method :param progress: see :meth:`push` method :param kill_after_timeout: see :meth:`fetch` method + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_options: Allow unsafe options to be used, like --upload-pack :param kwargs: Additional arguments to be passed to git-pull :return: Please see :meth:`fetch` method""" if refspec is None: @@ -1077,6 +1084,8 @@ class Remote(LazyMixin, IterableObj): :param kill_after_timeout: To specify a timeout in seconds for the git command, after which the process should be killed. It is set to None by default. + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_options: Allow unsafe options to be used, like --receive-pack :param kwargs: Additional arguments to be passed to git-push :return: A ``PushInfoList`` object, where each list member diff --git a/git/repo/base.py b/git/repo/base.py index 93ed0c71..2fc9cf1f 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -9,6 +9,9 @@ import os import re import shlex import warnings + +from pathlib import Path + from gitdb.db.loose import LooseObjectDB from gitdb.exc import BadObject @@ -112,7 +115,7 @@ class Repo(object): 'working_dir' is the working directory of the git command, which is the working tree directory if available or the .git directory in case of bare repositories - 'working_tree_dir' is the working tree directory, but will raise AssertionError + 'working_tree_dir' is the working tree directory, but will return None if we are a bare repository. 'git_dir' is the .git repository directory, which is always set.""" @@ -120,9 +123,9 @@ class Repo(object): DAEMON_EXPORT_FILE = "git-daemon-export-ok" git = cast("Git", None) # Must exist, or __del__ will fail in case we raise on `__init__()` - working_dir: Optional[PathLike] = None + working_dir: PathLike _working_tree_dir: Optional[PathLike] = None - git_dir: PathLike = "" + git_dir: PathLike _common_dir: PathLike = "" # precompiled regex @@ -212,13 +215,14 @@ class Repo(object): ## Walk up the path to find the `.git` dir. # curpath = epath + git_dir = None while curpath: # ABOUT osp.NORMPATH # It's important to normalize the paths, as submodules will otherwise initialize their # repo instances with paths that depend on path-portions that will not exist after being # removed. It's just cleaner. if is_git_dir(curpath): - self.git_dir = curpath + git_dir = curpath # from man git-config : core.worktree # Set the path to the root of the working tree. If GIT_COMMON_DIR environment # variable is set, core.worktree is ignored and not used for determining the @@ -227,9 +231,9 @@ class Repo(object): # directory, which is either specified by GIT_DIR, or automatically discovered. # If GIT_DIR is specified but none of GIT_WORK_TREE and core.worktree is specified, # the current working directory is regarded as the top level of your working tree. - self._working_tree_dir = os.path.dirname(self.git_dir) + self._working_tree_dir = os.path.dirname(git_dir) if os.environ.get("GIT_COMMON_DIR") is None: - gitconf = self.config_reader("repository") + gitconf = self._config_reader("repository", git_dir) if gitconf.has_option("core", "worktree"): self._working_tree_dir = gitconf.get("core", "worktree") if "GIT_WORK_TREE" in os.environ: @@ -239,14 +243,14 @@ class Repo(object): dotgit = osp.join(curpath, ".git") sm_gitpath = find_submodule_git_dir(dotgit) if sm_gitpath is not None: - self.git_dir = osp.normpath(sm_gitpath) + git_dir = osp.normpath(sm_gitpath) sm_gitpath = find_submodule_git_dir(dotgit) if sm_gitpath is None: sm_gitpath = find_worktree_git_dir(dotgit) if sm_gitpath is not None: - self.git_dir = expand_path(sm_gitpath, expand_vars) + git_dir = expand_path(sm_gitpath, expand_vars) self._working_tree_dir = curpath break @@ -257,8 +261,9 @@ class Repo(object): break # END while curpath - if self.git_dir is None: + if git_dir is None: raise InvalidGitRepositoryError(epath) + self.git_dir = git_dir self._bare = False try: @@ -268,7 +273,7 @@ class Repo(object): pass try: - common_dir = open(osp.join(self.git_dir, "commondir"), "rt").readlines()[0].strip() + common_dir = (Path(self.git_dir) / "commondir").read_text().splitlines()[0].strip() self._common_dir = osp.join(self.git_dir, common_dir) except OSError: self._common_dir = "" @@ -279,7 +284,7 @@ class Repo(object): self._working_tree_dir = None # END working dir handling - self.working_dir: Optional[PathLike] = self._working_tree_dir or self.common_dir + self.working_dir: PathLike = self._working_tree_dir or self.common_dir self.git = self.GitCommandWrapperType(self.working_dir) # special handling, in special times @@ -317,7 +322,7 @@ class Repo(object): gc.collect() def __eq__(self, rhs: object) -> bool: - if isinstance(rhs, Repo) and self.git_dir: + if isinstance(rhs, Repo): return self.git_dir == rhs.git_dir return False @@ -329,14 +334,12 @@ class Repo(object): # Description property def _get_description(self) -> str: - if self.git_dir: - filename = osp.join(self.git_dir, "description") + filename = osp.join(self.git_dir, "description") with open(filename, "rb") as fp: return fp.read().rstrip().decode(defenc) def _set_description(self, descr: str) -> None: - if self.git_dir: - filename = osp.join(self.git_dir, "description") + filename = osp.join(self.git_dir, "description") with open(filename, "wb") as fp: fp.write((descr + "\n").encode(defenc)) @@ -354,13 +357,7 @@ class Repo(object): """ :return: The git dir that holds everything except possibly HEAD, FETCH_HEAD, ORIG_HEAD, COMMIT_EDITMSG, index, and logs/.""" - if self._common_dir: - return self._common_dir - elif self.git_dir: - return self.git_dir - else: - # or could return "" - raise InvalidGitRepositoryError() + return self._common_dir or self.git_dir @property def bare(self) -> bool: @@ -529,7 +526,9 @@ class Repo(object): """Delete the given remote.""" return Remote.remove(self, remote) - def _get_config_path(self, config_level: Lit_config_levels) -> str: + def _get_config_path(self, config_level: Lit_config_levels, git_dir: Optional[PathLike] = None) -> str: + if git_dir is None: + git_dir = self.git_dir # we do not support an absolute path of the gitconfig on windows , # use the global config instead if is_win and config_level == "system": @@ -543,7 +542,7 @@ class Repo(object): elif config_level == "global": return osp.normpath(osp.expanduser("~/.gitconfig")) elif config_level == "repository": - repo_dir = self._common_dir or self.git_dir + repo_dir = self._common_dir or git_dir if not repo_dir: raise NotADirectoryError else: @@ -572,15 +571,21 @@ class Repo(object): you know which file you wish to read to prevent reading multiple files. :note: On windows, system configuration cannot currently be read as the path is unknown, instead the global path will be used.""" - files = None + return self._config_reader(config_level=config_level) + + def _config_reader( + self, + config_level: Optional[Lit_config_levels] = None, + git_dir: Optional[PathLike] = None, + ) -> GitConfigParser: if config_level is None: files = [ - self._get_config_path(cast(Lit_config_levels, f)) + self._get_config_path(cast(Lit_config_levels, f), git_dir) for f in self.config_level if cast(Lit_config_levels, f) ] else: - files = [self._get_config_path(config_level)] + files = [self._get_config_path(config_level, git_dir)] return GitConfigParser(files, read_only=True, repo=self) def config_writer(self, config_level: Lit_config_levels = "repository") -> GitConfigParser: @@ -870,8 +875,15 @@ class Repo(object): """ try: proc: str = self.git.check_ignore(*paths) - except GitCommandError: - return [] + except GitCommandError as err: + # If return code is 1, this means none of the items in *paths + # are ignored by Git, so return an empty list. Raise the + # exception on all other return codes. + if err.status == 1: + return [] + else: + raise + return proc.replace("\\\\", "\\").replace('"', "").split("\n") @property @@ -1259,7 +1271,8 @@ class Repo(object): option per list item which is passed exactly as specified to clone. For example ['--config core.filemode=false', '--config core.ignorecase', '--recurse-submodule=repo1_path', '--recurse-submodule=repo2_path'] - :param unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_options: Allow unsafe options to be used, like --upload-pack :param kwargs: * odbt = ObjectDatabase Type, allowing to determine the object database implementation used by the returned Repo instance @@ -1302,7 +1315,8 @@ class Repo(object): If you want to unset some variable, consider providing empty string as its value. :param multi_options: See ``clone`` method - :param unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_options: Allow unsafe options to be used, like --upload-pack :param kwargs: see the ``clone`` method :return: Repo instance pointing to the cloned directory""" git = cls.GitCommandWrapperType(os.getcwd()) @@ -1383,4 +1397,6 @@ class Repo(object): rebase_head_file = osp.join(self.git_dir, "REBASE_HEAD") if not osp.isfile(rebase_head_file): return None - return self.commit(open(rebase_head_file, "rt").readline().strip()) + with open(rebase_head_file, "rt") as f: + content = f.readline().strip() + return self.commit(content) diff --git a/git/repo/fun.py b/git/repo/fun.py index 2ca2e3d6..ae35aa81 100644 --- a/git/repo/fun.py +++ b/git/repo/fun.py @@ -2,6 +2,7 @@ from __future__ import annotations import os import stat +from pathlib import Path from string import digits from git.exc import WorkTreeRepositoryUnsupported @@ -83,7 +84,7 @@ def find_worktree_git_dir(dotgit: "PathLike") -> Optional[str]: return None try: - lines = open(dotgit, "r").readlines() + lines = Path(dotgit).read_text().splitlines() for key, value in [line.strip().split(": ") for line in lines]: if key == "gitdir": return value @@ -44,7 +44,7 @@ class sdist(_sdist): def _stamp_version(filename: str) -> None: found, out = False, [] try: - with open(filename, "r") as f: + with open(filename) as f: for line in f: if "__version__ =" in line: line = line.replace("\"git\"", "'%s'" % VERSION) @@ -82,7 +82,7 @@ setup( name="GitPython", cmdclass={"build_py": build_py, "sdist": sdist}, version=VERSION, - description="""GitPython is a python library used to interact with Git repositories""", + description="GitPython is a Python library used to interact with Git repositories", author="Sebastian Thiel, Michael Trier", author_email="byronimo@gmail.com, mtrier@gmail.com", license="BSD", @@ -95,7 +95,7 @@ setup( install_requires=requirements, tests_require=requirements + test_requirements, zip_safe=False, - long_description="""GitPython is a python library used to interact with Git repositories""", + long_description="""GitPython is a Python library used to interact with Git repositories""", long_description_content_type="text/markdown", classifiers=[ # Picked from @@ -121,5 +121,6 @@ setup( "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", ], ) diff --git a/test-requirements.txt b/test-requirements.txt index 6549f0fa..6c6d5706 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -3,10 +3,7 @@ mypy black -flake8 -flake8-bugbear -flake8-comprehensions -flake8-typing-imports +pre-commit virtualenv diff --git a/test/test_base.py b/test/test_base.py index ccfdc8ed..30029367 100644 --- a/test/test_base.py +++ b/test/test_base.py @@ -9,6 +9,7 @@ import sys import tempfile from unittest import SkipTest, skipIf +from git import Repo from git.objects import Blob, Tree, Commit, TagObject from git.compat import is_win from git.objects.util import get_object_type_by_name @@ -95,14 +96,18 @@ class TestBase(_TestBase): self.assertEqual(self.rorepo.head.reference.object, self.rorepo.active_branch.object) @with_rw_repo("HEAD", bare=True) - def test_with_bare_rw_repo(self, bare_rw_repo): + def test_with_bare_rw_repo(self, bare_rw_repo: Repo): assert bare_rw_repo.config_reader("repository").getboolean("core", "bare") assert osp.isfile(osp.join(bare_rw_repo.git_dir, "HEAD")) + assert osp.isdir(bare_rw_repo.working_dir) + assert bare_rw_repo.working_tree_dir is None @with_rw_repo("0.1.6") - def test_with_rw_repo(self, rw_repo): + def test_with_rw_repo(self, rw_repo: Repo): assert not rw_repo.config_reader("repository").getboolean("core", "bare") + assert osp.isdir(rw_repo.working_tree_dir) assert osp.isdir(osp.join(rw_repo.working_tree_dir, "lib")) + assert osp.isdir(rw_repo.working_dir) @skipIf(HIDE_WINDOWS_FREEZE_ERRORS, "FIXME: Freezes! sometimes...") @with_rw_and_rw_remote_repo("0.1.6") diff --git a/test/test_commit.py b/test/test_commit.py index c5a43c94..1efc6889 100644 --- a/test/test_commit.py +++ b/test/test_commit.py @@ -159,6 +159,37 @@ class TestCommit(TestCommitSerialization): self.assertEqual(commit.committer_tz_offset, 14400, commit.committer_tz_offset) self.assertEqual(commit.message, "initial project\n") + def test_renames(self): + commit = self.rorepo.commit("185d847ec7647fd2642a82d9205fb3d07ea71715") + files = commit.stats.files + + # when a file is renamed, the output of git diff is like "dir/{old => new}" + # unless we disable rename with --no-renames, which produces two lines + # one with the old path deletes and another with the new added + self.assertEqual(len(files), 2) + + def check_entries(path, changes): + expected = { + ".github/workflows/Future.yml" : { + 'insertions': 57, + 'deletions': 0, + 'lines': 57 + }, + ".github/workflows/test_pytest.yml" : { + 'insertions': 0, + 'deletions': 55, + 'lines': 55 + }, + } + assert path in expected + assert isinstance(changes, dict) + for key in ("insertions", "deletions", "lines"): + assert changes[key] == expected[path][key] + + for path, changes in files.items(): + check_entries(path, changes) + # END for each stated file + def test_unicode_actor(self): # assure we can parse unicode actors correctly name = "Üäöß ÄußÉ" diff --git a/test/test_diff.py b/test/test_diff.py index 7065f063..50433774 100644 --- a/test/test_diff.py +++ b/test/test_diff.py @@ -411,3 +411,73 @@ class TestDiff(TestBase): cp = c.parents[0] diff_index = c.diff(cp, ["does/not/exist"]) self.assertEqual(len(diff_index), 0) + + @with_rw_directory + def test_rename_override(self, rw_dir): + """Test disabling of diff rename detection""" + + # create and commit file_a.txt + repo = Repo.init(rw_dir) + file_a = osp.join(rw_dir, "file_a.txt") + with open(file_a, "w", encoding='utf-8') as outfile: + outfile.write("hello world\n") + repo.git.add(Git.polish_url(file_a)) + repo.git.commit(message="Added file_a.txt") + + # remove file_a.txt + repo.git.rm(Git.polish_url(file_a)) + + # create and commit file_b.txt with similarity index of 52 + file_b = osp.join(rw_dir, "file_b.txt") + with open(file_b, "w", encoding='utf-8') as outfile: + outfile.write("hello world\nhello world") + repo.git.add(Git.polish_url(file_b)) + repo.git.commit(message="Removed file_a.txt. Added file_b.txt") + + commit_a = repo.commit('HEAD') + commit_b = repo.commit('HEAD~1') + + # check default diff command with renamed files enabled + diffs = commit_b.diff(commit_a) + self.assertEqual(1, len(diffs)) + diff = diffs[0] + self.assertEqual(True, diff.renamed_file) + self.assertEqual('file_a.txt', diff.rename_from) + self.assertEqual('file_b.txt', diff.rename_to) + + # check diff with rename files disabled + diffs = commit_b.diff(commit_a, no_renames=True) + self.assertEqual(2, len(diffs)) + + # check fileA.txt deleted + diff = diffs[0] + self.assertEqual(True, diff.deleted_file) + self.assertEqual('file_a.txt', diff.a_path) + + # check fileB.txt added + diff = diffs[1] + self.assertEqual(True, diff.new_file) + self.assertEqual('file_b.txt', diff.a_path) + + # check diff with high similarity index + diffs = commit_b.diff(commit_a, split_single_char_options=False, M='75%') + self.assertEqual(2, len(diffs)) + + # check fileA.txt deleted + diff = diffs[0] + self.assertEqual(True, diff.deleted_file) + self.assertEqual('file_a.txt', diff.a_path) + + # check fileB.txt added + diff = diffs[1] + self.assertEqual(True, diff.new_file) + self.assertEqual('file_b.txt', diff.a_path) + + # check diff with low similarity index + diffs = commit_b.diff(commit_a, split_single_char_options=False, M='40%') + self.assertEqual(1, len(diffs)) + diff = diffs[0] + self.assertEqual(True, diff.renamed_file) + self.assertEqual('file_a.txt', diff.rename_from) + self.assertEqual('file_b.txt', diff.rename_to) + diff --git a/test/test_remote.py b/test/test_remote.py index 3a47afab..9636ca48 100644 --- a/test/test_remote.py +++ b/test/test_remote.py @@ -694,259 +694,279 @@ class TestRemote(TestBase): @with_rw_repo("HEAD") def test_set_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - remote.set_url(url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + remote.set_url(url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_set_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - remote.set_url(url, allow_unsafe_protocols=True) - assert list(remote.urls)[-1] == url - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + remote.set_url(url, allow_unsafe_protocols=True) + assert list(remote.urls)[-1] == url + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_add_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - remote.add_url(url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + remote.add_url(url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_add_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - remote.add_url(url, allow_unsafe_protocols=True) - assert list(remote.urls)[-1] == url - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + remote.add_url(url, allow_unsafe_protocols=True) + assert list(remote.urls)[-1] == url + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_create_remote_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - Remote.create(rw_repo, "origin", url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + Remote.create(rw_repo, "origin", url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_create_remote_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for i, url in enumerate(urls): - remote = Remote.create(rw_repo, f"origin{i}", url, allow_unsafe_protocols=True) - assert remote.url == url - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for i, url in enumerate(urls): + remote = Remote.create(rw_repo, f"origin{i}", url, allow_unsafe_protocols=True) + assert remote.url == url + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_fetch_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - remote.fetch(url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + remote.fetch(url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_fetch_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - # The URL will be allowed into the command, but the command will - # fail since we don't have that protocol enabled in the Git config file. - with self.assertRaises(GitCommandError): - remote.fetch(url, allow_unsafe_protocols=True) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + remote.fetch(url, allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_fetch_unsafe_options(self, rw_repo): - remote = rw_repo.remote("origin") - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] - for unsafe_option in unsafe_options: - with self.assertRaises(UnsafeOptionError): - remote.fetch(**unsafe_option) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + remote = rw_repo.remote("origin") + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + remote.fetch(**unsafe_option) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_fetch_unsafe_options_allowed(self, rw_repo): - remote = rw_repo.remote("origin") - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] - for unsafe_option in unsafe_options: - # The options will be allowed, but the command will fail. - assert not tmp_file.exists() - with self.assertRaises(GitCommandError): - remote.fetch(**unsafe_option, allow_unsafe_options=True) - assert tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + remote = rw_repo.remote("origin") + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] + for unsafe_option in unsafe_options: + # The options will be allowed, but the command will fail. + assert not tmp_file.exists() + with self.assertRaises(GitCommandError): + remote.fetch(**unsafe_option, allow_unsafe_options=True) + assert tmp_file.exists() + tmp_file.unlink() @with_rw_repo("HEAD") def test_pull_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - remote.pull(url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + remote.pull(url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_pull_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - # The URL will be allowed into the command, but the command will - # fail since we don't have that protocol enabled in the Git config file. - with self.assertRaises(GitCommandError): - remote.pull(url, allow_unsafe_protocols=True) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + remote.pull(url, allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_pull_unsafe_options(self, rw_repo): - remote = rw_repo.remote("origin") - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] - for unsafe_option in unsafe_options: - with self.assertRaises(UnsafeOptionError): - remote.pull(**unsafe_option) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + remote = rw_repo.remote("origin") + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + remote.pull(**unsafe_option) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_pull_unsafe_options_allowed(self, rw_repo): - remote = rw_repo.remote("origin") - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] - for unsafe_option in unsafe_options: - # The options will be allowed, but the command will fail. - assert not tmp_file.exists() - with self.assertRaises(GitCommandError): - remote.pull(**unsafe_option, allow_unsafe_options=True) - assert tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + remote = rw_repo.remote("origin") + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] + for unsafe_option in unsafe_options: + # The options will be allowed, but the command will fail. + assert not tmp_file.exists() + with self.assertRaises(GitCommandError): + remote.pull(**unsafe_option, allow_unsafe_options=True) + assert tmp_file.exists() + tmp_file.unlink() @with_rw_repo("HEAD") def test_push_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - remote.push(url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + remote.push(url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_push_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - # The URL will be allowed into the command, but the command will - # fail since we don't have that protocol enabled in the Git config file. - with self.assertRaises(GitCommandError): - remote.push(url, allow_unsafe_protocols=True) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + remote.push(url, allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_push_unsafe_options(self, rw_repo): - remote = rw_repo.remote("origin") - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - { - "receive-pack": f"touch {tmp_file}", - "exec": f"touch {tmp_file}", - } - ] - for unsafe_option in unsafe_options: - assert not tmp_file.exists() - with self.assertRaises(UnsafeOptionError): - remote.push(**unsafe_option) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + remote = rw_repo.remote("origin") + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + { + "receive-pack": f"touch {tmp_file}", + "exec": f"touch {tmp_file}", + } + ] + for unsafe_option in unsafe_options: + assert not tmp_file.exists() + with self.assertRaises(UnsafeOptionError): + remote.push(**unsafe_option) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_push_unsafe_options_allowed(self, rw_repo): - remote = rw_repo.remote("origin") - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - { - "receive-pack": f"touch {tmp_file}", - "exec": f"touch {tmp_file}", - } - ] - for unsafe_option in unsafe_options: - # The options will be allowed, but the command will fail. - assert not tmp_file.exists() - with self.assertRaises(GitCommandError): - remote.push(**unsafe_option, allow_unsafe_options=True) - assert tmp_file.exists() - tmp_file.unlink() + with tempfile.TemporaryDirectory() as tdir: + remote = rw_repo.remote("origin") + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + { + "receive-pack": f"touch {tmp_file}", + "exec": f"touch {tmp_file}", + } + ] + for unsafe_option in unsafe_options: + # The options will be allowed, but the command will fail. + assert not tmp_file.exists() + with self.assertRaises(GitCommandError): + remote.push(**unsafe_option, allow_unsafe_options=True) + assert tmp_file.exists() + tmp_file.unlink() class TestTimeouts(TestBase): diff --git a/test/test_repo.py b/test/test_repo.py index 5874dbe6..07c1e9ad 100644 --- a/test/test_repo.py +++ b/test/test_repo.py @@ -268,143 +268,176 @@ class TestRepo(TestBase): @with_rw_repo("HEAD") def test_clone_unsafe_options(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - for unsafe_option in unsafe_options: - with self.assertRaises(UnsafeOptionError): - rw_repo.clone(tmp_dir, multi_options=[unsafe_option]) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + rw_repo.clone(tmp_dir, multi_options=[unsafe_option]) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_clone_unsafe_options_allowed(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - ] - for i, unsafe_option in enumerate(unsafe_options): - destination = tmp_dir / str(i) - assert not tmp_file.exists() - # The options will be allowed, but the command will fail. - with self.assertRaises(GitCommandError): + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + ] + for i, unsafe_option in enumerate(unsafe_options): + destination = tmp_dir / str(i) + assert not tmp_file.exists() + # The options will be allowed, but the command will fail. + with self.assertRaises(GitCommandError): + rw_repo.clone(destination, multi_options=[unsafe_option], allow_unsafe_options=True) + assert tmp_file.exists() + tmp_file.unlink() + + unsafe_options = [ + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for i, unsafe_option in enumerate(unsafe_options): + destination = tmp_dir / str(i) + assert not destination.exists() rw_repo.clone(destination, multi_options=[unsafe_option], allow_unsafe_options=True) - assert tmp_file.exists() - tmp_file.unlink() - - unsafe_options = [ - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - for i, unsafe_option in enumerate(unsafe_options): - destination = tmp_dir / str(i) - assert not destination.exists() - rw_repo.clone(destination, multi_options=[unsafe_option], allow_unsafe_options=True) - assert destination.exists() + assert destination.exists() @with_rw_repo("HEAD") def test_clone_safe_options(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - options = [ - "--depth=1", - "--single-branch", - "-q", - ] - for option in options: - destination = tmp_dir / option - assert not destination.exists() - rw_repo.clone(destination, multi_options=[option]) - assert destination.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + options = [ + "--depth=1", + "--single-branch", + "-q", + ] + for option in options: + destination = tmp_dir / option + assert not destination.exists() + rw_repo.clone(destination, multi_options=[option]) + assert destination.exists() @with_rw_repo("HEAD") def test_clone_from_unsafe_options(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - for unsafe_option in unsafe_options: - with self.assertRaises(UnsafeOptionError): - Repo.clone_from(rw_repo.working_dir, tmp_dir, multi_options=[unsafe_option]) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + Repo.clone_from(rw_repo.working_dir, tmp_dir, multi_options=[unsafe_option]) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_clone_from_unsafe_options_allowed(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - ] - for i, unsafe_option in enumerate(unsafe_options): - destination = tmp_dir / str(i) - assert not tmp_file.exists() - # The options will be allowed, but the command will fail. - with self.assertRaises(GitCommandError): - Repo.clone_from( - rw_repo.working_dir, destination, multi_options=[unsafe_option], allow_unsafe_options=True - ) - assert tmp_file.exists() - tmp_file.unlink() - - unsafe_options = [ - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - for i, unsafe_option in enumerate(unsafe_options): - destination = tmp_dir / str(i) - assert not destination.exists() - Repo.clone_from(rw_repo.working_dir, destination, multi_options=[unsafe_option], allow_unsafe_options=True) - assert destination.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + ] + for i, unsafe_option in enumerate(unsafe_options): + destination = tmp_dir / str(i) + assert not tmp_file.exists() + # The options will be allowed, but the command will fail. + with self.assertRaises(GitCommandError): + Repo.clone_from( + rw_repo.working_dir, destination, multi_options=[unsafe_option], allow_unsafe_options=True + ) + assert tmp_file.exists() + tmp_file.unlink() + + unsafe_options = [ + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for i, unsafe_option in enumerate(unsafe_options): + destination = tmp_dir / str(i) + assert not destination.exists() + Repo.clone_from(rw_repo.working_dir, destination, multi_options=[unsafe_option], allow_unsafe_options=True) + assert destination.exists() @with_rw_repo("HEAD") def test_clone_from_safe_options(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - options = [ - "--depth=1", - "--single-branch", - "-q", - ] - for option in options: - destination = tmp_dir / option - assert not destination.exists() - Repo.clone_from(rw_repo.common_dir, destination, multi_options=[option]) - assert destination.exists() - - def test_clone_from_unsafe_procol(self): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - Repo.clone_from(url, tmp_dir) - assert not tmp_file.exists() - - def test_clone_from_unsafe_procol_allowed(self): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - "ext::sh -c touch% /tmp/pwn", - "fd::/foo", - ] - for url in urls: - # The URL will be allowed into the command, but the command will - # fail since we don't have that protocol enabled in the Git config file. - with self.assertRaises(GitCommandError): - Repo.clone_from(url, tmp_dir, allow_unsafe_protocols=True) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + options = [ + "--depth=1", + "--single-branch", + "-q", + ] + for option in options: + destination = tmp_dir / option + assert not destination.exists() + Repo.clone_from(rw_repo.common_dir, destination, multi_options=[option]) + assert destination.exists() + + def test_clone_from_unsafe_protocol(self): + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + Repo.clone_from(url, tmp_dir / "repo") + assert not tmp_file.exists() + + def test_clone_from_unsafe_protocol_allowed(self): + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::/foo", + ] + for url in urls: + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + Repo.clone_from(url, tmp_dir / "repo", allow_unsafe_protocols=True) + assert not tmp_file.exists() + + def test_clone_from_unsafe_protocol_allowed_and_enabled(self): + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + ] + allow_ext = [ + "--config=protocol.ext.allow=always", + ] + for url in urls: + # The URL will be allowed into the command, and the protocol is enabled, + # but the command will fail since it can't read from the remote repo. + assert not tmp_file.exists() + with self.assertRaises(GitCommandError): + Repo.clone_from( + url, + tmp_dir / "repo", + multi_options=allow_ext, + allow_unsafe_protocols=True, + allow_unsafe_options=True, + ) + assert tmp_file.exists() + tmp_file.unlink() @with_rw_repo("HEAD") def test_max_chunk_size(self, repo): @@ -1326,26 +1359,55 @@ class TestRepo(TestBase): @with_rw_repo("HEAD") def test_clone_command_injection(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - unexpected_file = tmp_dir / "pwn" - assert not unexpected_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + unexpected_file = tmp_dir / "pwn" + assert not unexpected_file.exists() - payload = f"--upload-pack=touch {unexpected_file}" - rw_repo.clone(payload) + payload = f"--upload-pack=touch {unexpected_file}" + rw_repo.clone(payload) - assert not unexpected_file.exists() - # A repo was cloned with the payload as name - assert pathlib.Path(payload).exists() + assert not unexpected_file.exists() + # A repo was cloned with the payload as name + assert pathlib.Path(payload).exists() @with_rw_repo("HEAD") def test_clone_from_command_injection(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - temp_repo = Repo.init(tmp_dir / "repo") - unexpected_file = tmp_dir / "pwn" + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + temp_repo = Repo.init(tmp_dir / "repo") + unexpected_file = tmp_dir / "pwn" + + assert not unexpected_file.exists() + payload = f"--upload-pack=touch {unexpected_file}" + with self.assertRaises(GitCommandError): + rw_repo.clone_from(payload, temp_repo.common_dir) + + assert not unexpected_file.exists() + + def test_ignored_items_reported(self): + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + temp_repo = Repo.init(tmp_dir / "repo") + + gi = tmp_dir / "repo" / ".gitignore" + + with open(gi, 'w') as file: + file.write('ignored_file.txt\n') + file.write('ignored_dir/\n') + + assert temp_repo.ignored(['included_file.txt', 'included_dir/file.txt']) == [] + assert temp_repo.ignored(['ignored_file.txt']) == ['ignored_file.txt'] + assert temp_repo.ignored(['included_file.txt', 'ignored_file.txt']) == ['ignored_file.txt'] + assert temp_repo.ignored(['included_file.txt', 'ignored_file.txt', 'included_dir/file.txt', 'ignored_dir/file.txt']) == ['ignored_file.txt', 'ignored_dir/file.txt'] + + def test_ignored_raises_error_w_symlink(self): + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + temp_repo = Repo.init(tmp_dir / "repo") - assert not unexpected_file.exists() - payload = f"--upload-pack=touch {unexpected_file}" - with self.assertRaises(GitCommandError): - rw_repo.clone_from(payload, temp_repo.common_dir) + os.mkdir(tmp_dir / "target") + os.symlink(tmp_dir / "target", tmp_dir / "symlink") - assert not unexpected_file.exists() + with pytest.raises(GitCommandError): + temp_repo.ignored(tmp_dir / "symlink/file.txt")
\ No newline at end of file diff --git a/test/test_submodule.py b/test/test_submodule.py index 13878df2..98222641 100644 --- a/test/test_submodule.py +++ b/test/test_submodule.py @@ -1101,139 +1101,147 @@ class TestSubmodule(TestBase): @with_rw_repo("HEAD") def test_submodule_add_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - Submodule.add(rw_repo, "new", "new", url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + Submodule.add(rw_repo, "new", "new", url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_add_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::/foo", - ] - for url in urls: - # The URL will be allowed into the command, but the command will - # fail since we don't have that protocol enabled in the Git config file. - with self.assertRaises(GitCommandError): - Submodule.add(rw_repo, "new", "new", url, allow_unsafe_protocols=True) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::/foo", + ] + for url in urls: + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + Submodule.add(rw_repo, "new", "new", url, allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_add_unsafe_options(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - for unsafe_option in unsafe_options: - with self.assertRaises(UnsafeOptionError): - Submodule.add(rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option]) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + Submodule.add(rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option]) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_add_unsafe_options_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - ] - for unsafe_option in unsafe_options: - # The options will be allowed, but the command will fail. - with self.assertRaises(GitCommandError): - Submodule.add( - rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option], allow_unsafe_options=True - ) - assert not tmp_file.exists() - - unsafe_options = [ - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - for unsafe_option in unsafe_options: - with self.assertRaises(GitCommandError): - Submodule.add( - rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option], allow_unsafe_options=True - ) + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + ] + for unsafe_option in unsafe_options: + # The options will be allowed, but the command will fail. + with self.assertRaises(GitCommandError): + Submodule.add( + rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option], allow_unsafe_options=True + ) + assert not tmp_file.exists() + + unsafe_options = [ + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for unsafe_option in unsafe_options: + with self.assertRaises(GitCommandError): + Submodule.add( + rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option], allow_unsafe_options=True + ) @with_rw_repo("HEAD") def test_submodule_update_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::/foo", - ] - for url in urls: - submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=url) - with self.assertRaises(UnsafeProtocolError): - submodule.update() - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::/foo", + ] + for url in urls: + submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=url) + with self.assertRaises(UnsafeProtocolError): + submodule.update() + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_update_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::/foo", - ] - for url in urls: - submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=url) - # The URL will be allowed into the command, but the command will - # fail since we don't have that protocol enabled in the Git config file. - with self.assertRaises(GitCommandError): - submodule.update(allow_unsafe_protocols=True) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::/foo", + ] + for url in urls: + submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=url) + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + submodule.update(allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_update_unsafe_options(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) - for unsafe_option in unsafe_options: - with self.assertRaises(UnsafeOptionError): - submodule.update(clone_multi_options=[unsafe_option]) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + submodule.update(clone_multi_options=[unsafe_option]) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_update_unsafe_options_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - ] - submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) - for unsafe_option in unsafe_options: - # The options will be allowed, but the command will fail. - with self.assertRaises(GitCommandError): - submodule.update(clone_multi_options=[unsafe_option], allow_unsafe_options=True) - assert not tmp_file.exists() - - unsafe_options = [ - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) - for unsafe_option in unsafe_options: - with self.assertRaises(GitCommandError): - submodule.update(clone_multi_options=[unsafe_option], allow_unsafe_options=True) + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + ] + submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) + for unsafe_option in unsafe_options: + # The options will be allowed, but the command will fail. + with self.assertRaises(GitCommandError): + submodule.update(clone_multi_options=[unsafe_option], allow_unsafe_options=True) + assert not tmp_file.exists() + + unsafe_options = [ + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) + for unsafe_option in unsafe_options: + with self.assertRaises(GitCommandError): + submodule.update(clone_multi_options=[unsafe_option], allow_unsafe_options=True) diff --git a/test/test_util.py b/test/test_util.py index 90dd89a9..c17efce3 100644 --- a/test/test_util.py +++ b/test/test_util.py @@ -333,6 +333,27 @@ class TestUtils(TestBase): self.assertRaises(IndexError, ilist.__delitem__, 0) self.assertRaises(IndexError, ilist.__delitem__, "something") + def test_utctz_to_altz(self): + self.assertEqual(utctz_to_altz("+0000"), 0) + self.assertEqual(utctz_to_altz("+1400"), -(14 * 3600)) + self.assertEqual(utctz_to_altz("-1200"), 12 * 3600) + self.assertEqual(utctz_to_altz("+0001"), -60) + self.assertEqual(utctz_to_altz("+0530"), -(5 * 3600 + 1800)) + self.assertEqual(utctz_to_altz("-0930"), 9 * 3600 + 1800) + + def test_altz_to_utctz_str(self): + self.assertEqual(altz_to_utctz_str(0), "+0000") + self.assertEqual(altz_to_utctz_str(-(14 * 3600)), "+1400") + self.assertEqual(altz_to_utctz_str(12 * 3600), "-1200") + self.assertEqual(altz_to_utctz_str(-60), "+0001") + self.assertEqual(altz_to_utctz_str(-(5 * 3600 + 1800)), "+0530") + self.assertEqual(altz_to_utctz_str(9 * 3600 + 1800), "-0930") + + self.assertEqual(altz_to_utctz_str(1), "+0000") + self.assertEqual(altz_to_utctz_str(59), "+0000") + self.assertEqual(altz_to_utctz_str(-1), "+0000") + self.assertEqual(altz_to_utctz_str(-59), "+0000") + def test_from_timestamp(self): # Correct offset: UTC+2, should return datetime + tzoffset(+2) altz = utctz_to_altz("+0200") |
