summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--AUTHORS1
-rw-r--r--git/cmd.py53
-rw-r--r--git/exc.py8
-rw-r--r--git/objects/submodule/base.py36
-rw-r--r--git/remote.py70
-rw-r--r--git/repo/base.py48
-rw-r--r--test/test_git.py4
-rw-r--r--test/test_remote.py258
-rw-r--r--test/test_repo.py143
-rw-r--r--test/test_submodule.py152
10 files changed, 752 insertions, 21 deletions
diff --git a/AUTHORS b/AUTHORS
index 8f3f2ccf..8ccc09fc 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -50,4 +50,5 @@ Contributors are:
-Patrick Gerard
-Luke Twist <itsluketwist@gmail.com>
-Joseph Hale <me _at_ jhale.dev>
+-Santos Gallegos <stsewd _at_ proton.me>
Portions derived from other open source works and are clearly marked.
diff --git a/git/cmd.py b/git/cmd.py
index 3dd5aad3..9ef1e3a6 100644
--- a/git/cmd.py
+++ b/git/cmd.py
@@ -4,6 +4,7 @@
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
from __future__ import annotations
+import re
from contextlib import contextmanager
import io
import logging
@@ -24,7 +25,7 @@ from git.compat import (
from git.exc import CommandError
from git.util import is_cygwin_git, cygpath, expand_path, remove_password_if_present
-from .exc import GitCommandError, GitCommandNotFound
+from .exc import GitCommandError, GitCommandNotFound, UnsafeOptionError, UnsafeProtocolError
from .util import (
LazyMixin,
stream_copy,
@@ -262,6 +263,8 @@ class Git(LazyMixin):
_excluded_ = ("cat_file_all", "cat_file_header", "_version_info")
+ re_unsafe_protocol = re.compile("(.+)::.+")
+
def __getstate__(self) -> Dict[str, Any]:
return slots_to_dict(self, exclude=self._excluded_)
@@ -454,6 +457,48 @@ class Git(LazyMixin):
url = url.replace("\\\\", "\\").replace("\\", "/")
return url
+ @classmethod
+ def check_unsafe_protocols(cls, url: str) -> None:
+ """
+ Check for unsafe protocols.
+
+ Apart from the usual protocols (http, git, ssh),
+ Git allows "remote helpers" that have the form `<transport>::<address>`,
+ one of these helpers (`ext::`) can be used to invoke any arbitrary command.
+
+ See:
+
+ - https://git-scm.com/docs/gitremote-helpers
+ - https://git-scm.com/docs/git-remote-ext
+ """
+ match = cls.re_unsafe_protocol.match(url)
+ if match:
+ protocol = match.group(1)
+ raise UnsafeProtocolError(
+ f"The `{protocol}::` protocol looks suspicious, use `allow_unsafe_protocols=True` to allow it."
+ )
+
+ @classmethod
+ def check_unsafe_options(cls, options: List[str], unsafe_options: List[str]) -> None:
+ """
+ Check for unsafe options.
+
+ Some options that are passed to `git <command>` can be used to execute
+ arbitrary commands, this are blocked by default.
+ """
+ # Options can be of the form `foo` or `--foo bar` `--foo=bar`,
+ # so we need to check if they start with "--foo" or if they are equal to "foo".
+ bare_unsafe_options = [
+ option.lstrip("-")
+ for option in unsafe_options
+ ]
+ for option in options:
+ for unsafe_option, bare_option in zip(unsafe_options, bare_unsafe_options):
+ if option.startswith(unsafe_option) or option == bare_option:
+ raise UnsafeOptionError(
+ f"{unsafe_option} is not allowed, use `allow_unsafe_options=True` to allow it."
+ )
+
class AutoInterrupt(object):
"""Kill/Interrupt the stored process instance once this instance goes out of scope. It is
used to prevent processes piling up in case iterators stop reading.
@@ -1148,12 +1193,12 @@ class Git(LazyMixin):
return args
@classmethod
- def __unpack_args(cls, arg_list: Sequence[str]) -> List[str]:
+ def _unpack_args(cls, arg_list: Sequence[str]) -> List[str]:
outlist = []
if isinstance(arg_list, (list, tuple)):
for arg in arg_list:
- outlist.extend(cls.__unpack_args(arg))
+ outlist.extend(cls._unpack_args(arg))
else:
outlist.append(str(arg_list))
@@ -1238,7 +1283,7 @@ class Git(LazyMixin):
# Prepare the argument list
opt_args = self.transform_kwargs(**opts_kwargs)
- ext_args = self.__unpack_args([a for a in args if a is not None])
+ ext_args = self._unpack_args([a for a in args if a is not None])
if insert_after_this_arg is None:
args_list = opt_args + ext_args
diff --git a/git/exc.py b/git/exc.py
index 22fcde0d..9b69a588 100644
--- a/git/exc.py
+++ b/git/exc.py
@@ -37,6 +37,14 @@ class NoSuchPathError(GitError, OSError):
"""Thrown if a path could not be access by the system."""
+class UnsafeProtocolError(GitError):
+ """Thrown if unsafe protocols are passed without being explicitly allowed."""
+
+
+class UnsafeOptionError(GitError):
+ """Thrown if unsafe options are passed without being explicitly allowed."""
+
+
class CommandError(GitError):
"""Base class for exceptions thrown at every stage of `Popen()` execution.
diff --git a/git/objects/submodule/base.py b/git/objects/submodule/base.py
index bdcdf1ec..9aa9deb2 100644
--- a/git/objects/submodule/base.py
+++ b/git/objects/submodule/base.py
@@ -272,7 +272,16 @@ class Submodule(IndexObject, TraversableIterableObj):
# end
@classmethod
- def _clone_repo(cls, repo: "Repo", url: str, path: PathLike, name: str, **kwargs: Any) -> "Repo":
+ def _clone_repo(
+ cls,
+ repo: "Repo",
+ url: str,
+ path: PathLike,
+ name: str,
+ allow_unsafe_options: bool = False,
+ allow_unsafe_protocols: bool = False,
+ **kwargs: Any,
+ ) -> "Repo":
""":return: Repo instance of newly cloned repository
:param repo: our parent repository
:param url: url to clone from
@@ -289,7 +298,13 @@ class Submodule(IndexObject, TraversableIterableObj):
module_checkout_path = osp.join(str(repo.working_tree_dir), path)
# end
- clone = git.Repo.clone_from(url, module_checkout_path, **kwargs)
+ clone = git.Repo.clone_from(
+ url,
+ module_checkout_path,
+ allow_unsafe_options=allow_unsafe_options,
+ allow_unsafe_protocols=allow_unsafe_protocols,
+ **kwargs,
+ )
if cls._need_gitfile_submodules(repo.git):
cls._write_git_file_and_module_config(module_checkout_path, module_abspath)
# end
@@ -359,6 +374,8 @@ class Submodule(IndexObject, TraversableIterableObj):
depth: Union[int, None] = None,
env: Union[Mapping[str, str], None] = None,
clone_multi_options: Union[Sequence[TBD], None] = None,
+ allow_unsafe_options: bool = False,
+ allow_unsafe_protocols: bool = False,
) -> "Submodule":
"""Add a new submodule to the given repository. This will alter the index
as well as the .gitmodules file, but will not create a new commit.
@@ -475,7 +492,16 @@ class Submodule(IndexObject, TraversableIterableObj):
kwargs["multi_options"] = clone_multi_options
# _clone_repo(cls, repo, url, path, name, **kwargs):
- mrepo = cls._clone_repo(repo, url, path, name, env=env, **kwargs)
+ mrepo = cls._clone_repo(
+ repo,
+ url,
+ path,
+ name,
+ env=env,
+ allow_unsafe_options=allow_unsafe_options,
+ allow_unsafe_protocols=allow_unsafe_protocols,
+ **kwargs,
+ )
# END verify url
## See #525 for ensuring git urls in config-files valid under Windows.
@@ -520,6 +546,8 @@ class Submodule(IndexObject, TraversableIterableObj):
keep_going: bool = False,
env: Union[Mapping[str, str], None] = None,
clone_multi_options: Union[Sequence[TBD], None] = None,
+ allow_unsafe_options: bool = False,
+ allow_unsafe_protocols: bool = False,
) -> "Submodule":
"""Update the repository of this submodule to point to the checkout
we point at with the binsha of this instance.
@@ -643,6 +671,8 @@ class Submodule(IndexObject, TraversableIterableObj):
n=True,
env=env,
multi_options=clone_multi_options,
+ allow_unsafe_options=allow_unsafe_options,
+ allow_unsafe_protocols=allow_unsafe_protocols,
)
# END handle dry-run
progress.update(
diff --git a/git/remote.py b/git/remote.py
index cd1a2c53..4240223e 100644
--- a/git/remote.py
+++ b/git/remote.py
@@ -539,6 +539,23 @@ class Remote(LazyMixin, IterableObj):
__slots__ = ("repo", "name", "_config_reader")
_id_attribute_ = "name"
+ unsafe_git_fetch_options = [
+ # This option allows users to execute arbitrary commands.
+ # https://git-scm.com/docs/git-fetch#Documentation/git-fetch.txt---upload-packltupload-packgt
+ "--upload-pack",
+ ]
+ unsafe_git_pull_options = [
+ # This option allows users to execute arbitrary commands.
+ # https://git-scm.com/docs/git-pull#Documentation/git-pull.txt---upload-packltupload-packgt
+ "--upload-pack"
+ ]
+ unsafe_git_push_options = [
+ # This option allows users to execute arbitrary commands.
+ # https://git-scm.com/docs/git-push#Documentation/git-push.txt---execltgit-receive-packgt
+ "--receive-pack",
+ "--exec",
+ ]
+
def __init__(self, repo: "Repo", name: str) -> None:
"""Initialize a remote instance
@@ -615,7 +632,9 @@ class Remote(LazyMixin, IterableObj):
yield Remote(repo, section[lbound + 1 : rbound])
# END for each configuration section
- def set_url(self, new_url: str, old_url: Optional[str] = None, **kwargs: Any) -> "Remote":
+ def set_url(
+ self, new_url: str, old_url: Optional[str] = None, allow_unsafe_protocols: bool = False, **kwargs: Any
+ ) -> "Remote":
"""Configure URLs on current remote (cf command git remote set_url)
This command manages URLs on the remote.
@@ -624,15 +643,17 @@ class Remote(LazyMixin, IterableObj):
:param old_url: when set, replaces this URL with new_url for the remote
:return: self
"""
+ if not allow_unsafe_protocols:
+ Git.check_unsafe_protocols(new_url)
scmd = "set-url"
kwargs["insert_kwargs_after"] = scmd
if old_url:
- self.repo.git.remote(scmd, self.name, new_url, old_url, **kwargs)
+ self.repo.git.remote(scmd, "--", self.name, new_url, old_url, **kwargs)
else:
- self.repo.git.remote(scmd, self.name, new_url, **kwargs)
+ self.repo.git.remote(scmd, "--", self.name, new_url, **kwargs)
return self
- def add_url(self, url: str, **kwargs: Any) -> "Remote":
+ def add_url(self, url: str, allow_unsafe_protocols: bool = False, **kwargs: Any) -> "Remote":
"""Adds a new url on current remote (special case of git remote set_url)
This command adds new URLs to a given remote, making it possible to have
@@ -641,7 +662,7 @@ class Remote(LazyMixin, IterableObj):
:param url: string being the URL to add as an extra remote URL
:return: self
"""
- return self.set_url(url, add=True)
+ return self.set_url(url, add=True, allow_unsafe_protocols=allow_unsafe_protocols)
def delete_url(self, url: str, **kwargs: Any) -> "Remote":
"""Deletes a new url on current remote (special case of git remote set_url)
@@ -733,7 +754,7 @@ class Remote(LazyMixin, IterableObj):
return out_refs
@classmethod
- def create(cls, repo: "Repo", name: str, url: str, **kwargs: Any) -> "Remote":
+ def create(cls, repo: "Repo", name: str, url: str, allow_unsafe_protocols: bool = False, **kwargs: Any) -> "Remote":
"""Create a new remote to the given repository
:param repo: Repository instance that is to receive the new remote
:param name: Desired name of the remote
@@ -743,7 +764,10 @@ class Remote(LazyMixin, IterableObj):
:raise GitCommandError: in case an origin with that name already exists"""
scmd = "add"
kwargs["insert_kwargs_after"] = scmd
- repo.git.remote(scmd, name, Git.polish_url(url), **kwargs)
+ url = Git.polish_url(url)
+ if not allow_unsafe_protocols:
+ Git.check_unsafe_protocols(url)
+ repo.git.remote(scmd, "--", name, url, **kwargs)
return cls(repo, name)
# add is an alias
@@ -925,6 +949,8 @@ class Remote(LazyMixin, IterableObj):
progress: Union[RemoteProgress, None, "UpdateProgress"] = None,
verbose: bool = True,
kill_after_timeout: Union[None, float] = None,
+ allow_unsafe_protocols: bool = False,
+ allow_unsafe_options: bool = False,
**kwargs: Any,
) -> IterableList[FetchInfo]:
"""Fetch the latest changes for this remote
@@ -967,6 +993,14 @@ class Remote(LazyMixin, IterableObj):
else:
args = [refspec]
+ if not allow_unsafe_protocols:
+ for ref in args:
+ if ref:
+ Git.check_unsafe_protocols(ref)
+
+ if not allow_unsafe_options:
+ Git.check_unsafe_options(options=list(kwargs.keys()), unsafe_options=self.unsafe_git_fetch_options)
+
proc = self.repo.git.fetch(
"--", self, *args, as_process=True, with_stdout=False, universal_newlines=True, v=verbose, **kwargs
)
@@ -980,6 +1014,8 @@ class Remote(LazyMixin, IterableObj):
refspec: Union[str, List[str], None] = None,
progress: Union[RemoteProgress, "UpdateProgress", None] = None,
kill_after_timeout: Union[None, float] = None,
+ allow_unsafe_protocols: bool = False,
+ allow_unsafe_options: bool = False,
**kwargs: Any,
) -> IterableList[FetchInfo]:
"""Pull changes from the given branch, being the same as a fetch followed
@@ -994,6 +1030,15 @@ class Remote(LazyMixin, IterableObj):
# No argument refspec, then ensure the repo's config has a fetch refspec.
self._assert_refspec()
kwargs = add_progress(kwargs, self.repo.git, progress)
+
+ refspec = Git._unpack_args(refspec or [])
+ if not allow_unsafe_protocols:
+ for ref in refspec:
+ Git.check_unsafe_protocols(ref)
+
+ if not allow_unsafe_options:
+ Git.check_unsafe_options(options=list(kwargs.keys()), unsafe_options=self.unsafe_git_pull_options)
+
proc = self.repo.git.pull(
"--", self, refspec, with_stdout=False, as_process=True, universal_newlines=True, v=True, **kwargs
)
@@ -1007,6 +1052,8 @@ class Remote(LazyMixin, IterableObj):
refspec: Union[str, List[str], None] = None,
progress: Union[RemoteProgress, "UpdateProgress", Callable[..., RemoteProgress], None] = None,
kill_after_timeout: Union[None, float] = None,
+ allow_unsafe_protocols: bool = False,
+ allow_unsafe_options: bool = False,
**kwargs: Any,
) -> PushInfoList:
"""Push changes from source branch in refspec to target branch in refspec.
@@ -1037,6 +1084,15 @@ class Remote(LazyMixin, IterableObj):
be 0.
Call ``.raise_if_error()`` on the returned object to raise on any failure."""
kwargs = add_progress(kwargs, self.repo.git, progress)
+
+ refspec = Git._unpack_args(refspec or [])
+ if not allow_unsafe_protocols:
+ for ref in refspec:
+ Git.check_unsafe_protocols(ref)
+
+ if not allow_unsafe_options:
+ Git.check_unsafe_options(options=list(kwargs.keys()), unsafe_options=self.unsafe_git_push_options)
+
proc = self.repo.git.push(
"--",
self,
diff --git a/git/repo/base.py b/git/repo/base.py
index 165df5fe..d4463f1e 100644
--- a/git/repo/base.py
+++ b/git/repo/base.py
@@ -21,7 +21,11 @@ from git.compat import (
)
from git.config import GitConfigParser
from git.db import GitCmdObjectDB
-from git.exc import InvalidGitRepositoryError, NoSuchPathError, GitCommandError
+from git.exc import (
+ GitCommandError,
+ InvalidGitRepositoryError,
+ NoSuchPathError,
+)
from git.index import IndexFile
from git.objects import Submodule, RootModule, Commit
from git.refs import HEAD, Head, Reference, TagReference
@@ -129,6 +133,18 @@ class Repo(object):
re_author_committer_start = re.compile(r"^(author|committer)")
re_tab_full_line = re.compile(r"^\t(.*)$")
+ unsafe_git_clone_options = [
+ # This option allows users to execute arbitrary commands.
+ # https://git-scm.com/docs/git-clone#Documentation/git-clone.txt---upload-packltupload-packgt
+ "--upload-pack",
+ "-u",
+ # Users can override configuration variables
+ # like `protocol.allow` or `core.gitProxy` to execute arbitrary commands.
+ # https://git-scm.com/docs/git-clone#Documentation/git-clone.txt---configltkeygtltvaluegt
+ "--config",
+ "-c",
+ ]
+
# invariants
# represents the configuration level of a configuration file
config_level: ConfigLevels_Tup = ("system", "user", "global", "repository")
@@ -955,7 +971,7 @@ class Repo(object):
file: str,
incremental: bool = False,
rev_opts: Optional[List[str]] = None,
- **kwargs: Any
+ **kwargs: Any,
) -> List[List[Commit | List[str | bytes] | None]] | Iterator[BlameEntry] | None:
"""The blame information for the given file at the given revision.
@@ -1146,6 +1162,8 @@ class Repo(object):
odb_default_type: Type[GitCmdObjectDB],
progress: Union["RemoteProgress", "UpdateProgress", Callable[..., "RemoteProgress"], None] = None,
multi_options: Optional[List[str]] = None,
+ allow_unsafe_protocols: bool = False,
+ allow_unsafe_options: bool = False,
**kwargs: Any,
) -> "Repo":
odbt = kwargs.pop("odbt", odb_default_type)
@@ -1167,6 +1185,12 @@ class Repo(object):
multi = None
if multi_options:
multi = shlex.split(" ".join(multi_options))
+
+ if not allow_unsafe_protocols:
+ Git.check_unsafe_protocols(str(url))
+ if not allow_unsafe_options and multi_options:
+ Git.check_unsafe_options(options=multi_options, unsafe_options=cls.unsafe_git_clone_options)
+
proc = git.clone(
multi,
"--",
@@ -1220,6 +1244,8 @@ class Repo(object):
path: PathLike,
progress: Optional[Callable] = None,
multi_options: Optional[List[str]] = None,
+ allow_unsafe_protocols: bool = False,
+ allow_unsafe_options: bool = False,
**kwargs: Any,
) -> "Repo":
"""Create a clone from this repository.
@@ -1230,6 +1256,7 @@ class Repo(object):
option per list item which is passed exactly as specified to clone.
For example ['--config core.filemode=false', '--config core.ignorecase',
'--recurse-submodule=repo1_path', '--recurse-submodule=repo2_path']
+ :param unsafe_protocols: Allow unsafe protocols to be used, like ext
:param kwargs:
* odbt = ObjectDatabase Type, allowing to determine the object database
implementation used by the returned Repo instance
@@ -1243,6 +1270,8 @@ class Repo(object):
type(self.odb),
progress,
multi_options,
+ allow_unsafe_protocols=allow_unsafe_protocols,
+ allow_unsafe_options=allow_unsafe_options,
**kwargs,
)
@@ -1254,6 +1283,8 @@ class Repo(object):
progress: Optional[Callable] = None,
env: Optional[Mapping[str, str]] = None,
multi_options: Optional[List[str]] = None,
+ allow_unsafe_protocols: bool = False,
+ allow_unsafe_options: bool = False,
**kwargs: Any,
) -> "Repo":
"""Create a clone from the given URL
@@ -1268,12 +1299,23 @@ class Repo(object):
If you want to unset some variable, consider providing empty string
as its value.
:param multi_options: See ``clone`` method
+ :param unsafe_protocols: Allow unsafe protocols to be used, like ext
:param kwargs: see the ``clone`` method
:return: Repo instance pointing to the cloned directory"""
git = cls.GitCommandWrapperType(os.getcwd())
if env is not None:
git.update_environment(**env)
- return cls._clone(git, url, to_path, GitCmdObjectDB, progress, multi_options, **kwargs)
+ return cls._clone(
+ git,
+ url,
+ to_path,
+ GitCmdObjectDB,
+ progress,
+ multi_options,
+ allow_unsafe_protocols=allow_unsafe_protocols,
+ allow_unsafe_options=allow_unsafe_options,
+ **kwargs,
+ )
def archive(
self,
diff --git a/test/test_git.py b/test/test_git.py
index 6ba833b4..e7d236de 100644
--- a/test/test_git.py
+++ b/test/test_git.py
@@ -39,12 +39,12 @@ class TestGit(TestBase):
self.assertEqual(git.call_args, ((["git", "version"],), {}))
def test_call_unpack_args_unicode(self):
- args = Git._Git__unpack_args("Unicode€™")
+ args = Git._unpack_args("Unicode€™")
mangled_value = "Unicode\u20ac\u2122"
self.assertEqual(args, [mangled_value])
def test_call_unpack_args(self):
- args = Git._Git__unpack_args(["git", "log", "--", "Unicode€™"])
+ args = Git._unpack_args(["git", "log", "--", "Unicode€™"])
mangled_value = "Unicode\u20ac\u2122"
self.assertEqual(args, ["git", "log", "--", mangled_value])
diff --git a/test/test_remote.py b/test/test_remote.py
index 7df64c20..3a47afab 100644
--- a/test/test_remote.py
+++ b/test/test_remote.py
@@ -23,6 +23,8 @@ from git import (
GitCommandError,
)
from git.cmd import Git
+from pathlib import Path
+from git.exc import UnsafeOptionError, UnsafeProtocolError
from test.lib import (
TestBase,
with_rw_repo,
@@ -690,6 +692,262 @@ class TestRemote(TestBase):
with self.assertRaisesRegex(GitCommandError, "src refspec __BAD_REF__ does not match any"):
rem.push("__BAD_REF__")
+ @with_rw_repo("HEAD")
+ def test_set_unsafe_url(self, rw_repo):
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ remote = rw_repo.remote("origin")
+ urls = [
+ f"ext::sh -c touch% {tmp_file}",
+ "fd::17/foo",
+ ]
+ for url in urls:
+ with self.assertRaises(UnsafeProtocolError):
+ remote.set_url(url)
+ assert not tmp_file.exists()
+
+ @with_rw_repo("HEAD")
+ def test_set_unsafe_url_allowed(self, rw_repo):
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ remote = rw_repo.remote("origin")
+ urls = [
+ f"ext::sh -c touch% {tmp_file}",
+ "fd::17/foo",
+ ]
+ for url in urls:
+ remote.set_url(url, allow_unsafe_protocols=True)
+ assert list(remote.urls)[-1] == url
+ assert not tmp_file.exists()
+
+ @with_rw_repo("HEAD")
+ def test_add_unsafe_url(self, rw_repo):
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ remote = rw_repo.remote("origin")
+ urls = [
+ f"ext::sh -c touch% {tmp_file}",
+ "fd::17/foo",
+ ]
+ for url in urls:
+ with self.assertRaises(UnsafeProtocolError):
+ remote.add_url(url)
+ assert not tmp_file.exists()
+
+ @with_rw_repo("HEAD")
+ def test_add_unsafe_url_allowed(self, rw_repo):
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ remote = rw_repo.remote("origin")
+ urls = [
+ f"ext::sh -c touch% {tmp_file}",
+ "fd::17/foo",
+ ]
+ for url in urls:
+ remote.add_url(url, allow_unsafe_protocols=True)
+ assert list(remote.urls)[-1] == url
+ assert not tmp_file.exists()
+
+ @with_rw_repo("HEAD")
+ def test_create_remote_unsafe_url(self, rw_repo):
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ urls = [
+ f"ext::sh -c touch% {tmp_file}",
+ "fd::17/foo",
+ ]
+ for url in urls:
+ with self.assertRaises(UnsafeProtocolError):
+ Remote.create(rw_repo, "origin", url)
+ assert not tmp_file.exists()
+
+ @with_rw_repo("HEAD")
+ def test_create_remote_unsafe_url_allowed(self, rw_repo):
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ urls = [
+ f"ext::sh -c touch% {tmp_file}",
+ "fd::17/foo",
+ ]
+ for i, url in enumerate(urls):
+ remote = Remote.create(rw_repo, f"origin{i}", url, allow_unsafe_protocols=True)
+ assert remote.url == url
+ assert not tmp_file.exists()
+
+ @with_rw_repo("HEAD")
+ def test_fetch_unsafe_url(self, rw_repo):
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ remote = rw_repo.remote("origin")
+ urls = [
+ f"ext::sh -c touch% {tmp_file}",
+ "fd::17/foo",
+ ]
+ for url in urls:
+ with self.assertRaises(UnsafeProtocolError):
+ remote.fetch(url)
+ assert not tmp_file.exists()
+
+ @with_rw_repo("HEAD")
+ def test_fetch_unsafe_url_allowed(self, rw_repo):
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ remote = rw_repo.remote("origin")
+ urls = [
+ f"ext::sh -c touch% {tmp_file}",
+ "fd::17/foo",
+ ]
+ for url in urls:
+ # The URL will be allowed into the command, but the command will
+ # fail since we don't have that protocol enabled in the Git config file.
+ with self.assertRaises(GitCommandError):
+ remote.fetch(url, allow_unsafe_protocols=True)
+ assert not tmp_file.exists()
+
+ @with_rw_repo("HEAD")
+ def test_fetch_unsafe_options(self, rw_repo):
+ remote = rw_repo.remote("origin")
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ unsafe_options = [{"upload-pack": f"touch {tmp_file}"}]
+ for unsafe_option in unsafe_options:
+ with self.assertRaises(UnsafeOptionError):
+ remote.fetch(**unsafe_option)
+ assert not tmp_file.exists()
+
+ @with_rw_repo("HEAD")
+ def test_fetch_unsafe_options_allowed(self, rw_repo):
+ remote = rw_repo.remote("origin")
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ unsafe_options = [{"upload-pack": f"touch {tmp_file}"}]
+ for unsafe_option in unsafe_options:
+ # The options will be allowed, but the command will fail.
+ assert not tmp_file.exists()
+ with self.assertRaises(GitCommandError):
+ remote.fetch(**unsafe_option, allow_unsafe_options=True)
+ assert tmp_file.exists()
+
+ @with_rw_repo("HEAD")
+ def test_pull_unsafe_url(self, rw_repo):
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ remote = rw_repo.remote("origin")
+ urls = [
+ f"ext::sh -c touch% {tmp_file}",
+ "fd::17/foo",
+ ]
+ for url in urls:
+ with self.assertRaises(UnsafeProtocolError):
+ remote.pull(url)
+ assert not tmp_file.exists()
+
+ @with_rw_repo("HEAD")
+ def test_pull_unsafe_url_allowed(self, rw_repo):
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ remote = rw_repo.remote("origin")
+ urls = [
+ f"ext::sh -c touch% {tmp_file}",
+ "fd::17/foo",
+ ]
+ for url in urls:
+ # The URL will be allowed into the command, but the command will
+ # fail since we don't have that protocol enabled in the Git config file.
+ with self.assertRaises(GitCommandError):
+ remote.pull(url, allow_unsafe_protocols=True)
+ assert not tmp_file.exists()
+
+ @with_rw_repo("HEAD")
+ def test_pull_unsafe_options(self, rw_repo):
+ remote = rw_repo.remote("origin")
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ unsafe_options = [{"upload-pack": f"touch {tmp_file}"}]
+ for unsafe_option in unsafe_options:
+ with self.assertRaises(UnsafeOptionError):
+ remote.pull(**unsafe_option)
+ assert not tmp_file.exists()
+
+ @with_rw_repo("HEAD")
+ def test_pull_unsafe_options_allowed(self, rw_repo):
+ remote = rw_repo.remote("origin")
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ unsafe_options = [{"upload-pack": f"touch {tmp_file}"}]
+ for unsafe_option in unsafe_options:
+ # The options will be allowed, but the command will fail.
+ assert not tmp_file.exists()
+ with self.assertRaises(GitCommandError):
+ remote.pull(**unsafe_option, allow_unsafe_options=True)
+ assert tmp_file.exists()
+
+ @with_rw_repo("HEAD")
+ def test_push_unsafe_url(self, rw_repo):
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ remote = rw_repo.remote("origin")
+ urls = [
+ f"ext::sh -c touch% {tmp_file}",
+ "fd::17/foo",
+ ]
+ for url in urls:
+ with self.assertRaises(UnsafeProtocolError):
+ remote.push(url)
+ assert not tmp_file.exists()
+
+ @with_rw_repo("HEAD")
+ def test_push_unsafe_url_allowed(self, rw_repo):
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ remote = rw_repo.remote("origin")
+ urls = [
+ f"ext::sh -c touch% {tmp_file}",
+ "fd::17/foo",
+ ]
+ for url in urls:
+ # The URL will be allowed into the command, but the command will
+ # fail since we don't have that protocol enabled in the Git config file.
+ with self.assertRaises(GitCommandError):
+ remote.push(url, allow_unsafe_protocols=True)
+ assert not tmp_file.exists()
+
+ @with_rw_repo("HEAD")
+ def test_push_unsafe_options(self, rw_repo):
+ remote = rw_repo.remote("origin")
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ unsafe_options = [
+ {
+ "receive-pack": f"touch {tmp_file}",
+ "exec": f"touch {tmp_file}",
+ }
+ ]
+ for unsafe_option in unsafe_options:
+ assert not tmp_file.exists()
+ with self.assertRaises(UnsafeOptionError):
+ remote.push(**unsafe_option)
+ assert not tmp_file.exists()
+
+ @with_rw_repo("HEAD")
+ def test_push_unsafe_options_allowed(self, rw_repo):
+ remote = rw_repo.remote("origin")
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ unsafe_options = [
+ {
+ "receive-pack": f"touch {tmp_file}",
+ "exec": f"touch {tmp_file}",
+ }
+ ]
+ for unsafe_option in unsafe_options:
+ # The options will be allowed, but the command will fail.
+ assert not tmp_file.exists()
+ with self.assertRaises(GitCommandError):
+ remote.push(**unsafe_option, allow_unsafe_options=True)
+ assert tmp_file.exists()
+ tmp_file.unlink()
+
class TestTimeouts(TestBase):
@with_rw_repo("HEAD", bare=False)
diff --git a/test/test_repo.py b/test/test_repo.py
index 6382db7e..5874dbe6 100644
--- a/test/test_repo.py
+++ b/test/test_repo.py
@@ -37,6 +37,8 @@ from git import (
)
from git.exc import (
BadObject,
+ UnsafeOptionError,
+ UnsafeProtocolError,
)
from git.repo.fun import touch
from test.lib import TestBase, with_rw_repo, fixture
@@ -223,6 +225,7 @@ class TestRepo(TestBase):
"--config submodule.repo.update=checkout",
"--config filter.lfs.clean='git-lfs clean -- %f'",
],
+ allow_unsafe_options=True,
)
self.assertEqual(cloned.config_reader().get_value("submodule", "active"), "repo")
@@ -264,6 +267,146 @@ class TestRepo(TestBase):
)
@with_rw_repo("HEAD")
+ def test_clone_unsafe_options(self, rw_repo):
+ tmp_dir = pathlib.Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ unsafe_options = [
+ f"--upload-pack='touch {tmp_file}'",
+ f"-u 'touch {tmp_file}'",
+ "--config=protocol.ext.allow=always",
+ "-c protocol.ext.allow=always",
+ ]
+ for unsafe_option in unsafe_options:
+ with self.assertRaises(UnsafeOptionError):
+ rw_repo.clone(tmp_dir, multi_options=[unsafe_option])
+ assert not tmp_file.exists()
+
+ @with_rw_repo("HEAD")
+ def test_clone_unsafe_options_allowed(self, rw_repo):
+ tmp_dir = pathlib.Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ unsafe_options = [
+ f"--upload-pack='touch {tmp_file}'",
+ f"-u 'touch {tmp_file}'",
+ ]
+ for i, unsafe_option in enumerate(unsafe_options):
+ destination = tmp_dir / str(i)
+ assert not tmp_file.exists()
+ # The options will be allowed, but the command will fail.
+ with self.assertRaises(GitCommandError):
+ rw_repo.clone(destination, multi_options=[unsafe_option], allow_unsafe_options=True)
+ assert tmp_file.exists()
+ tmp_file.unlink()
+
+ unsafe_options = [
+ "--config=protocol.ext.allow=always",
+ "-c protocol.ext.allow=always",
+ ]
+ for i, unsafe_option in enumerate(unsafe_options):
+ destination = tmp_dir / str(i)
+ assert not destination.exists()
+ rw_repo.clone(destination, multi_options=[unsafe_option], allow_unsafe_options=True)
+ assert destination.exists()
+
+ @with_rw_repo("HEAD")
+ def test_clone_safe_options(self, rw_repo):
+ tmp_dir = pathlib.Path(tempfile.mkdtemp())
+ options = [
+ "--depth=1",
+ "--single-branch",
+ "-q",
+ ]
+ for option in options:
+ destination = tmp_dir / option
+ assert not destination.exists()
+ rw_repo.clone(destination, multi_options=[option])
+ assert destination.exists()
+
+ @with_rw_repo("HEAD")
+ def test_clone_from_unsafe_options(self, rw_repo):
+ tmp_dir = pathlib.Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ unsafe_options = [
+ f"--upload-pack='touch {tmp_file}'",
+ f"-u 'touch {tmp_file}'",
+ "--config=protocol.ext.allow=always",
+ "-c protocol.ext.allow=always",
+ ]
+ for unsafe_option in unsafe_options:
+ with self.assertRaises(UnsafeOptionError):
+ Repo.clone_from(rw_repo.working_dir, tmp_dir, multi_options=[unsafe_option])
+ assert not tmp_file.exists()
+
+ @with_rw_repo("HEAD")
+ def test_clone_from_unsafe_options_allowed(self, rw_repo):
+ tmp_dir = pathlib.Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ unsafe_options = [
+ f"--upload-pack='touch {tmp_file}'",
+ f"-u 'touch {tmp_file}'",
+ ]
+ for i, unsafe_option in enumerate(unsafe_options):
+ destination = tmp_dir / str(i)
+ assert not tmp_file.exists()
+ # The options will be allowed, but the command will fail.
+ with self.assertRaises(GitCommandError):
+ Repo.clone_from(
+ rw_repo.working_dir, destination, multi_options=[unsafe_option], allow_unsafe_options=True
+ )
+ assert tmp_file.exists()
+ tmp_file.unlink()
+
+ unsafe_options = [
+ "--config=protocol.ext.allow=always",
+ "-c protocol.ext.allow=always",
+ ]
+ for i, unsafe_option in enumerate(unsafe_options):
+ destination = tmp_dir / str(i)
+ assert not destination.exists()
+ Repo.clone_from(rw_repo.working_dir, destination, multi_options=[unsafe_option], allow_unsafe_options=True)
+ assert destination.exists()
+
+ @with_rw_repo("HEAD")
+ def test_clone_from_safe_options(self, rw_repo):
+ tmp_dir = pathlib.Path(tempfile.mkdtemp())
+ options = [
+ "--depth=1",
+ "--single-branch",
+ "-q",
+ ]
+ for option in options:
+ destination = tmp_dir / option
+ assert not destination.exists()
+ Repo.clone_from(rw_repo.common_dir, destination, multi_options=[option])
+ assert destination.exists()
+
+ def test_clone_from_unsafe_procol(self):
+ tmp_dir = pathlib.Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ urls = [
+ f"ext::sh -c touch% {tmp_file}",
+ "fd::17/foo",
+ ]
+ for url in urls:
+ with self.assertRaises(UnsafeProtocolError):
+ Repo.clone_from(url, tmp_dir)
+ assert not tmp_file.exists()
+
+ def test_clone_from_unsafe_procol_allowed(self):
+ tmp_dir = pathlib.Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ urls = [
+ "ext::sh -c touch% /tmp/pwn",
+ "fd::/foo",
+ ]
+ for url in urls:
+ # The URL will be allowed into the command, but the command will
+ # fail since we don't have that protocol enabled in the Git config file.
+ with self.assertRaises(GitCommandError):
+ Repo.clone_from(url, tmp_dir, allow_unsafe_protocols=True)
+ assert not tmp_file.exists()
+
+ @with_rw_repo("HEAD")
def test_max_chunk_size(self, repo):
class TestOutputStream(TestBase):
def __init__(self, max_chunk_size):
diff --git a/test/test_submodule.py b/test/test_submodule.py
index fef6bda3..13878df2 100644
--- a/test/test_submodule.py
+++ b/test/test_submodule.py
@@ -3,6 +3,8 @@
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
import os
import shutil
+import tempfile
+from pathlib import Path
import sys
from unittest import skipIf
@@ -12,7 +14,13 @@ import git
from git.cmd import Git
from git.compat import is_win
from git.config import GitConfigParser, cp
-from git.exc import InvalidGitRepositoryError, RepositoryDirtyError
+from git.exc import (
+ GitCommandError,
+ InvalidGitRepositoryError,
+ RepositoryDirtyError,
+ UnsafeOptionError,
+ UnsafeProtocolError,
+)
from git.objects.submodule.base import Submodule
from git.objects.submodule.root import RootModule, RootUpdateProgress
from git.repo.fun import find_submodule_git_dir, touch
@@ -1026,7 +1034,7 @@ class TestSubmodule(TestBase):
)
# Act
- sm.update(init=True, clone_multi_options=["--config core.eol=true"])
+ sm.update(init=True, clone_multi_options=["--config core.eol=true"], allow_unsafe_options=True)
# Assert
sm_config = GitConfigParser(file_or_files=osp.join(parent.git_dir, "modules", sm_name, "config"))
@@ -1070,6 +1078,7 @@ class TestSubmodule(TestBase):
sm_name,
url=self._small_repo_url(),
clone_multi_options=["--config core.eol=true"],
+ allow_unsafe_options=True,
)
# Assert
@@ -1089,3 +1098,142 @@ class TestSubmodule(TestBase):
sm_config = GitConfigParser(file_or_files=osp.join(parent.git_dir, "modules", sm_name, "config"))
with self.assertRaises(cp.NoOptionError):
sm_config.get_value("core", "eol")
+
+ @with_rw_repo("HEAD")
+ def test_submodule_add_unsafe_url(self, rw_repo):
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ urls = [
+ f"ext::sh -c touch% {tmp_file}",
+ "fd::/foo",
+ ]
+ for url in urls:
+ with self.assertRaises(UnsafeProtocolError):
+ Submodule.add(rw_repo, "new", "new", url)
+ assert not tmp_file.exists()
+
+ @with_rw_repo("HEAD")
+ def test_submodule_add_unsafe_url_allowed(self, rw_repo):
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ urls = [
+ f"ext::sh -c touch% {tmp_file}",
+ "fd::/foo",
+ ]
+ for url in urls:
+ # The URL will be allowed into the command, but the command will
+ # fail since we don't have that protocol enabled in the Git config file.
+ with self.assertRaises(GitCommandError):
+ Submodule.add(rw_repo, "new", "new", url, allow_unsafe_protocols=True)
+ assert not tmp_file.exists()
+
+ @with_rw_repo("HEAD")
+ def test_submodule_add_unsafe_options(self, rw_repo):
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ unsafe_options = [
+ f"--upload-pack='touch {tmp_file}'",
+ f"-u 'touch {tmp_file}'",
+ "--config=protocol.ext.allow=always",
+ "-c protocol.ext.allow=always",
+ ]
+ for unsafe_option in unsafe_options:
+ with self.assertRaises(UnsafeOptionError):
+ Submodule.add(rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option])
+ assert not tmp_file.exists()
+
+ @with_rw_repo("HEAD")
+ def test_submodule_add_unsafe_options_allowed(self, rw_repo):
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ unsafe_options = [
+ f"--upload-pack='touch {tmp_file}'",
+ f"-u 'touch {tmp_file}'",
+ ]
+ for unsafe_option in unsafe_options:
+ # The options will be allowed, but the command will fail.
+ with self.assertRaises(GitCommandError):
+ Submodule.add(
+ rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option], allow_unsafe_options=True
+ )
+ assert not tmp_file.exists()
+
+ unsafe_options = [
+ "--config=protocol.ext.allow=always",
+ "-c protocol.ext.allow=always",
+ ]
+ for unsafe_option in unsafe_options:
+ with self.assertRaises(GitCommandError):
+ Submodule.add(
+ rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option], allow_unsafe_options=True
+ )
+
+ @with_rw_repo("HEAD")
+ def test_submodule_update_unsafe_url(self, rw_repo):
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ urls = [
+ f"ext::sh -c touch% {tmp_file}",
+ "fd::/foo",
+ ]
+ for url in urls:
+ submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=url)
+ with self.assertRaises(UnsafeProtocolError):
+ submodule.update()
+ assert not tmp_file.exists()
+
+ @with_rw_repo("HEAD")
+ def test_submodule_update_unsafe_url_allowed(self, rw_repo):
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ urls = [
+ f"ext::sh -c touch% {tmp_file}",
+ "fd::/foo",
+ ]
+ for url in urls:
+ submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=url)
+ # The URL will be allowed into the command, but the command will
+ # fail since we don't have that protocol enabled in the Git config file.
+ with self.assertRaises(GitCommandError):
+ submodule.update(allow_unsafe_protocols=True)
+ assert not tmp_file.exists()
+
+ @with_rw_repo("HEAD")
+ def test_submodule_update_unsafe_options(self, rw_repo):
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ unsafe_options = [
+ f"--upload-pack='touch {tmp_file}'",
+ f"-u 'touch {tmp_file}'",
+ "--config=protocol.ext.allow=always",
+ "-c protocol.ext.allow=always",
+ ]
+ submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir))
+ for unsafe_option in unsafe_options:
+ with self.assertRaises(UnsafeOptionError):
+ submodule.update(clone_multi_options=[unsafe_option])
+ assert not tmp_file.exists()
+
+ @with_rw_repo("HEAD")
+ def test_submodule_update_unsafe_options_allowed(self, rw_repo):
+ tmp_dir = Path(tempfile.mkdtemp())
+ tmp_file = tmp_dir / "pwn"
+ unsafe_options = [
+ f"--upload-pack='touch {tmp_file}'",
+ f"-u 'touch {tmp_file}'",
+ ]
+ submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir))
+ for unsafe_option in unsafe_options:
+ # The options will be allowed, but the command will fail.
+ with self.assertRaises(GitCommandError):
+ submodule.update(clone_multi_options=[unsafe_option], allow_unsafe_options=True)
+ assert not tmp_file.exists()
+
+ unsafe_options = [
+ "--config=protocol.ext.allow=always",
+ "-c protocol.ext.allow=always",
+ ]
+ submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir))
+ for unsafe_option in unsafe_options:
+ with self.assertRaises(GitCommandError):
+ submodule.update(clone_multi_options=[unsafe_option], allow_unsafe_options=True)