diff options
| author | Sebastian Thiel <sebastian.thiel@icloud.com> | 2022-12-29 08:09:28 +0100 | 
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-12-29 08:09:28 +0100 | 
| commit | 678a8fe08dd466fcfe8676294b52887955138960 (patch) | |
| tree | e0cb96633dc1c1cf8c362e2097140d75f60447f6 | |
| parent | ae6a6e4b088a35c0fc7b17940722c8a515f7bee7 (diff) | |
| parent | f4f2658d5d308b3fb9162e50cd4c7b346e7a0a47 (diff) | |
| download | gitpython-678a8fe08dd466fcfe8676294b52887955138960.tar.gz | |
Merge pull request #1521 from stsewd/block-insecure-options
Block insecure options and protocols by default
| -rw-r--r-- | AUTHORS | 1 | ||||
| -rw-r--r-- | git/cmd.py | 53 | ||||
| -rw-r--r-- | git/exc.py | 8 | ||||
| -rw-r--r-- | git/objects/submodule/base.py | 36 | ||||
| -rw-r--r-- | git/remote.py | 70 | ||||
| -rw-r--r-- | git/repo/base.py | 48 | ||||
| -rw-r--r-- | test/test_git.py | 4 | ||||
| -rw-r--r-- | test/test_remote.py | 258 | ||||
| -rw-r--r-- | test/test_repo.py | 143 | ||||
| -rw-r--r-- | test/test_submodule.py | 152 | 
10 files changed, 752 insertions, 21 deletions
| @@ -50,4 +50,5 @@ Contributors are:  -Patrick Gerard  -Luke Twist <itsluketwist@gmail.com>  -Joseph Hale <me _at_ jhale.dev> +-Santos Gallegos <stsewd _at_ proton.me>  Portions derived from other open source works and are clearly marked. @@ -4,6 +4,7 @@  # This module is part of GitPython and is released under  # the BSD License: http://www.opensource.org/licenses/bsd-license.php  from __future__ import annotations +import re  from contextlib import contextmanager  import io  import logging @@ -24,7 +25,7 @@ from git.compat import (  from git.exc import CommandError  from git.util import is_cygwin_git, cygpath, expand_path, remove_password_if_present -from .exc import GitCommandError, GitCommandNotFound +from .exc import GitCommandError, GitCommandNotFound, UnsafeOptionError, UnsafeProtocolError  from .util import (      LazyMixin,      stream_copy, @@ -262,6 +263,8 @@ class Git(LazyMixin):      _excluded_ = ("cat_file_all", "cat_file_header", "_version_info") +    re_unsafe_protocol = re.compile("(.+)::.+") +      def __getstate__(self) -> Dict[str, Any]:          return slots_to_dict(self, exclude=self._excluded_) @@ -454,6 +457,48 @@ class Git(LazyMixin):              url = url.replace("\\\\", "\\").replace("\\", "/")          return url +    @classmethod +    def check_unsafe_protocols(cls, url: str) -> None: +        """ +        Check for unsafe protocols. + +        Apart from the usual protocols (http, git, ssh), +        Git allows "remote helpers" that have the form `<transport>::<address>`, +        one of these helpers (`ext::`) can be used to invoke any arbitrary command. + +        See: + +        - https://git-scm.com/docs/gitremote-helpers +        - https://git-scm.com/docs/git-remote-ext +        """ +        match = cls.re_unsafe_protocol.match(url) +        if match: +            protocol = match.group(1) +            raise UnsafeProtocolError( +                f"The `{protocol}::` protocol looks suspicious, use `allow_unsafe_protocols=True` to allow it." +            ) + +    @classmethod +    def check_unsafe_options(cls, options: List[str], unsafe_options: List[str]) -> None: +        """ +        Check for unsafe options. + +        Some options that are passed to `git <command>` can be used to execute +        arbitrary commands, this are blocked by default. +        """ +        # Options can be of the form `foo` or `--foo bar` `--foo=bar`, +        # so we need to check if they start with "--foo" or if they are equal to "foo". +        bare_unsafe_options = [ +            option.lstrip("-") +            for option in unsafe_options +        ] +        for option in options: +            for unsafe_option, bare_option in zip(unsafe_options, bare_unsafe_options): +                if option.startswith(unsafe_option) or option == bare_option: +                    raise UnsafeOptionError( +                        f"{unsafe_option} is not allowed, use `allow_unsafe_options=True` to allow it." +                    ) +      class AutoInterrupt(object):          """Kill/Interrupt the stored process instance once this instance goes out of scope. It is          used to prevent processes piling up in case iterators stop reading. @@ -1148,12 +1193,12 @@ class Git(LazyMixin):          return args      @classmethod -    def __unpack_args(cls, arg_list: Sequence[str]) -> List[str]: +    def _unpack_args(cls, arg_list: Sequence[str]) -> List[str]:          outlist = []          if isinstance(arg_list, (list, tuple)):              for arg in arg_list: -                outlist.extend(cls.__unpack_args(arg)) +                outlist.extend(cls._unpack_args(arg))          else:              outlist.append(str(arg_list)) @@ -1238,7 +1283,7 @@ class Git(LazyMixin):          # Prepare the argument list          opt_args = self.transform_kwargs(**opts_kwargs) -        ext_args = self.__unpack_args([a for a in args if a is not None]) +        ext_args = self._unpack_args([a for a in args if a is not None])          if insert_after_this_arg is None:              args_list = opt_args + ext_args @@ -37,6 +37,14 @@ class NoSuchPathError(GitError, OSError):      """Thrown if a path could not be access by the system.""" +class UnsafeProtocolError(GitError): +    """Thrown if unsafe protocols are passed without being explicitly allowed.""" + + +class UnsafeOptionError(GitError): +    """Thrown if unsafe options are passed without being explicitly allowed.""" + +  class CommandError(GitError):      """Base class for exceptions thrown at every stage of `Popen()` execution. diff --git a/git/objects/submodule/base.py b/git/objects/submodule/base.py index bdcdf1ec..9aa9deb2 100644 --- a/git/objects/submodule/base.py +++ b/git/objects/submodule/base.py @@ -272,7 +272,16 @@ class Submodule(IndexObject, TraversableIterableObj):          # end      @classmethod -    def _clone_repo(cls, repo: "Repo", url: str, path: PathLike, name: str, **kwargs: Any) -> "Repo": +    def _clone_repo( +        cls, +        repo: "Repo", +        url: str, +        path: PathLike, +        name: str, +        allow_unsafe_options: bool = False, +        allow_unsafe_protocols: bool = False, +        **kwargs: Any, +    ) -> "Repo":          """:return: Repo instance of newly cloned repository          :param repo: our parent repository          :param url: url to clone from @@ -289,7 +298,13 @@ class Submodule(IndexObject, TraversableIterableObj):              module_checkout_path = osp.join(str(repo.working_tree_dir), path)          # end -        clone = git.Repo.clone_from(url, module_checkout_path, **kwargs) +        clone = git.Repo.clone_from( +            url, +            module_checkout_path, +            allow_unsafe_options=allow_unsafe_options, +            allow_unsafe_protocols=allow_unsafe_protocols, +            **kwargs, +        )          if cls._need_gitfile_submodules(repo.git):              cls._write_git_file_and_module_config(module_checkout_path, module_abspath)          # end @@ -359,6 +374,8 @@ class Submodule(IndexObject, TraversableIterableObj):          depth: Union[int, None] = None,          env: Union[Mapping[str, str], None] = None,          clone_multi_options: Union[Sequence[TBD], None] = None, +        allow_unsafe_options: bool = False, +        allow_unsafe_protocols: bool = False,      ) -> "Submodule":          """Add a new submodule to the given repository. This will alter the index          as well as the .gitmodules file, but will not create a new commit. @@ -475,7 +492,16 @@ class Submodule(IndexObject, TraversableIterableObj):                  kwargs["multi_options"] = clone_multi_options              # _clone_repo(cls, repo, url, path, name, **kwargs): -            mrepo = cls._clone_repo(repo, url, path, name, env=env, **kwargs) +            mrepo = cls._clone_repo( +                repo, +                url, +                path, +                name, +                env=env, +                allow_unsafe_options=allow_unsafe_options, +                allow_unsafe_protocols=allow_unsafe_protocols, +                **kwargs, +            )          # END verify url          ## See #525 for ensuring git urls in config-files valid under Windows. @@ -520,6 +546,8 @@ class Submodule(IndexObject, TraversableIterableObj):          keep_going: bool = False,          env: Union[Mapping[str, str], None] = None,          clone_multi_options: Union[Sequence[TBD], None] = None, +        allow_unsafe_options: bool = False, +        allow_unsafe_protocols: bool = False,      ) -> "Submodule":          """Update the repository of this submodule to point to the checkout          we point at with the binsha of this instance. @@ -643,6 +671,8 @@ class Submodule(IndexObject, TraversableIterableObj):                          n=True,                          env=env,                          multi_options=clone_multi_options, +                        allow_unsafe_options=allow_unsafe_options, +                        allow_unsafe_protocols=allow_unsafe_protocols,                      )                  # END handle dry-run                  progress.update( diff --git a/git/remote.py b/git/remote.py index cd1a2c53..4240223e 100644 --- a/git/remote.py +++ b/git/remote.py @@ -539,6 +539,23 @@ class Remote(LazyMixin, IterableObj):      __slots__ = ("repo", "name", "_config_reader")      _id_attribute_ = "name" +    unsafe_git_fetch_options = [ +        # This option allows users to execute arbitrary commands. +        # https://git-scm.com/docs/git-fetch#Documentation/git-fetch.txt---upload-packltupload-packgt +        "--upload-pack", +    ] +    unsafe_git_pull_options = [ +        # This option allows users to execute arbitrary commands. +        # https://git-scm.com/docs/git-pull#Documentation/git-pull.txt---upload-packltupload-packgt +        "--upload-pack" +    ] +    unsafe_git_push_options = [ +        # This option allows users to execute arbitrary commands. +        # https://git-scm.com/docs/git-push#Documentation/git-push.txt---execltgit-receive-packgt +        "--receive-pack", +        "--exec", +    ] +      def __init__(self, repo: "Repo", name: str) -> None:          """Initialize a remote instance @@ -615,7 +632,9 @@ class Remote(LazyMixin, IterableObj):              yield Remote(repo, section[lbound + 1 : rbound])          # END for each configuration section -    def set_url(self, new_url: str, old_url: Optional[str] = None, **kwargs: Any) -> "Remote": +    def set_url( +        self, new_url: str, old_url: Optional[str] = None, allow_unsafe_protocols: bool = False, **kwargs: Any +    ) -> "Remote":          """Configure URLs on current remote (cf command git remote set_url)          This command manages URLs on the remote. @@ -624,15 +643,17 @@ class Remote(LazyMixin, IterableObj):          :param old_url: when set, replaces this URL with new_url for the remote          :return: self          """ +        if not allow_unsafe_protocols: +            Git.check_unsafe_protocols(new_url)          scmd = "set-url"          kwargs["insert_kwargs_after"] = scmd          if old_url: -            self.repo.git.remote(scmd, self.name, new_url, old_url, **kwargs) +            self.repo.git.remote(scmd, "--", self.name, new_url, old_url, **kwargs)          else: -            self.repo.git.remote(scmd, self.name, new_url, **kwargs) +            self.repo.git.remote(scmd, "--", self.name, new_url, **kwargs)          return self -    def add_url(self, url: str, **kwargs: Any) -> "Remote": +    def add_url(self, url: str, allow_unsafe_protocols: bool = False, **kwargs: Any) -> "Remote":          """Adds a new url on current remote (special case of git remote set_url)          This command adds new URLs to a given remote, making it possible to have @@ -641,7 +662,7 @@ class Remote(LazyMixin, IterableObj):          :param url: string being the URL to add as an extra remote URL          :return: self          """ -        return self.set_url(url, add=True) +        return self.set_url(url, add=True, allow_unsafe_protocols=allow_unsafe_protocols)      def delete_url(self, url: str, **kwargs: Any) -> "Remote":          """Deletes a new url on current remote (special case of git remote set_url) @@ -733,7 +754,7 @@ class Remote(LazyMixin, IterableObj):          return out_refs      @classmethod -    def create(cls, repo: "Repo", name: str, url: str, **kwargs: Any) -> "Remote": +    def create(cls, repo: "Repo", name: str, url: str, allow_unsafe_protocols: bool = False, **kwargs: Any) -> "Remote":          """Create a new remote to the given repository          :param repo: Repository instance that is to receive the new remote          :param name: Desired name of the remote @@ -743,7 +764,10 @@ class Remote(LazyMixin, IterableObj):          :raise GitCommandError: in case an origin with that name already exists"""          scmd = "add"          kwargs["insert_kwargs_after"] = scmd -        repo.git.remote(scmd, name, Git.polish_url(url), **kwargs) +        url = Git.polish_url(url) +        if not allow_unsafe_protocols: +            Git.check_unsafe_protocols(url) +        repo.git.remote(scmd, "--", name, url, **kwargs)          return cls(repo, name)      # add is an alias @@ -925,6 +949,8 @@ class Remote(LazyMixin, IterableObj):          progress: Union[RemoteProgress, None, "UpdateProgress"] = None,          verbose: bool = True,          kill_after_timeout: Union[None, float] = None, +        allow_unsafe_protocols: bool = False, +        allow_unsafe_options: bool = False,          **kwargs: Any,      ) -> IterableList[FetchInfo]:          """Fetch the latest changes for this remote @@ -967,6 +993,14 @@ class Remote(LazyMixin, IterableObj):          else:              args = [refspec] +        if not allow_unsafe_protocols: +            for ref in args: +                if ref: +                    Git.check_unsafe_protocols(ref) + +        if not allow_unsafe_options: +            Git.check_unsafe_options(options=list(kwargs.keys()), unsafe_options=self.unsafe_git_fetch_options) +          proc = self.repo.git.fetch(              "--", self, *args, as_process=True, with_stdout=False, universal_newlines=True, v=verbose, **kwargs          ) @@ -980,6 +1014,8 @@ class Remote(LazyMixin, IterableObj):          refspec: Union[str, List[str], None] = None,          progress: Union[RemoteProgress, "UpdateProgress", None] = None,          kill_after_timeout: Union[None, float] = None, +        allow_unsafe_protocols: bool = False, +        allow_unsafe_options: bool = False,          **kwargs: Any,      ) -> IterableList[FetchInfo]:          """Pull changes from the given branch, being the same as a fetch followed @@ -994,6 +1030,15 @@ class Remote(LazyMixin, IterableObj):              # No argument refspec, then ensure the repo's config has a fetch refspec.              self._assert_refspec()          kwargs = add_progress(kwargs, self.repo.git, progress) + +        refspec = Git._unpack_args(refspec or []) +        if not allow_unsafe_protocols: +            for ref in refspec: +                Git.check_unsafe_protocols(ref) + +        if not allow_unsafe_options: +            Git.check_unsafe_options(options=list(kwargs.keys()), unsafe_options=self.unsafe_git_pull_options) +          proc = self.repo.git.pull(              "--", self, refspec, with_stdout=False, as_process=True, universal_newlines=True, v=True, **kwargs          ) @@ -1007,6 +1052,8 @@ class Remote(LazyMixin, IterableObj):          refspec: Union[str, List[str], None] = None,          progress: Union[RemoteProgress, "UpdateProgress", Callable[..., RemoteProgress], None] = None,          kill_after_timeout: Union[None, float] = None, +        allow_unsafe_protocols: bool = False, +        allow_unsafe_options: bool = False,          **kwargs: Any,      ) -> PushInfoList:          """Push changes from source branch in refspec to target branch in refspec. @@ -1037,6 +1084,15 @@ class Remote(LazyMixin, IterableObj):              be 0.              Call ``.raise_if_error()`` on the returned object to raise on any failure."""          kwargs = add_progress(kwargs, self.repo.git, progress) + +        refspec = Git._unpack_args(refspec or []) +        if not allow_unsafe_protocols: +            for ref in refspec: +                Git.check_unsafe_protocols(ref) + +        if not allow_unsafe_options: +            Git.check_unsafe_options(options=list(kwargs.keys()), unsafe_options=self.unsafe_git_push_options) +          proc = self.repo.git.push(              "--",              self, diff --git a/git/repo/base.py b/git/repo/base.py index 165df5fe..d4463f1e 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -21,7 +21,11 @@ from git.compat import (  )  from git.config import GitConfigParser  from git.db import GitCmdObjectDB -from git.exc import InvalidGitRepositoryError, NoSuchPathError, GitCommandError +from git.exc import ( +    GitCommandError, +    InvalidGitRepositoryError, +    NoSuchPathError, +)  from git.index import IndexFile  from git.objects import Submodule, RootModule, Commit  from git.refs import HEAD, Head, Reference, TagReference @@ -129,6 +133,18 @@ class Repo(object):      re_author_committer_start = re.compile(r"^(author|committer)")      re_tab_full_line = re.compile(r"^\t(.*)$") +    unsafe_git_clone_options = [ +        # This option allows users to execute arbitrary commands. +        # https://git-scm.com/docs/git-clone#Documentation/git-clone.txt---upload-packltupload-packgt +        "--upload-pack", +        "-u", +        # Users can override configuration variables +        # like `protocol.allow` or `core.gitProxy` to execute arbitrary commands. +        # https://git-scm.com/docs/git-clone#Documentation/git-clone.txt---configltkeygtltvaluegt +        "--config", +        "-c", +    ] +      # invariants      # represents the configuration level of a configuration file      config_level: ConfigLevels_Tup = ("system", "user", "global", "repository") @@ -955,7 +971,7 @@ class Repo(object):          file: str,          incremental: bool = False,          rev_opts: Optional[List[str]] = None, -        **kwargs: Any +        **kwargs: Any,      ) -> List[List[Commit | List[str | bytes] | None]] | Iterator[BlameEntry] | None:          """The blame information for the given file at the given revision. @@ -1146,6 +1162,8 @@ class Repo(object):          odb_default_type: Type[GitCmdObjectDB],          progress: Union["RemoteProgress", "UpdateProgress", Callable[..., "RemoteProgress"], None] = None,          multi_options: Optional[List[str]] = None, +        allow_unsafe_protocols: bool = False, +        allow_unsafe_options: bool = False,          **kwargs: Any,      ) -> "Repo":          odbt = kwargs.pop("odbt", odb_default_type) @@ -1167,6 +1185,12 @@ class Repo(object):          multi = None          if multi_options:              multi = shlex.split(" ".join(multi_options)) + +        if not allow_unsafe_protocols: +            Git.check_unsafe_protocols(str(url)) +        if not allow_unsafe_options and multi_options: +            Git.check_unsafe_options(options=multi_options, unsafe_options=cls.unsafe_git_clone_options) +          proc = git.clone(              multi,              "--", @@ -1220,6 +1244,8 @@ class Repo(object):          path: PathLike,          progress: Optional[Callable] = None,          multi_options: Optional[List[str]] = None, +        allow_unsafe_protocols: bool = False, +        allow_unsafe_options: bool = False,          **kwargs: Any,      ) -> "Repo":          """Create a clone from this repository. @@ -1230,6 +1256,7 @@ class Repo(object):              option per list item which is passed exactly as specified to clone.              For example ['--config core.filemode=false', '--config core.ignorecase',              '--recurse-submodule=repo1_path', '--recurse-submodule=repo2_path'] +        :param unsafe_protocols: Allow unsafe protocols to be used, like ext          :param kwargs:              * odbt = ObjectDatabase Type, allowing to determine the object database                implementation used by the returned Repo instance @@ -1243,6 +1270,8 @@ class Repo(object):              type(self.odb),              progress,              multi_options, +            allow_unsafe_protocols=allow_unsafe_protocols, +            allow_unsafe_options=allow_unsafe_options,              **kwargs,          ) @@ -1254,6 +1283,8 @@ class Repo(object):          progress: Optional[Callable] = None,          env: Optional[Mapping[str, str]] = None,          multi_options: Optional[List[str]] = None, +        allow_unsafe_protocols: bool = False, +        allow_unsafe_options: bool = False,          **kwargs: Any,      ) -> "Repo":          """Create a clone from the given URL @@ -1268,12 +1299,23 @@ class Repo(object):              If you want to unset some variable, consider providing empty string              as its value.          :param multi_options: See ``clone`` method +        :param unsafe_protocols: Allow unsafe protocols to be used, like ext          :param kwargs: see the ``clone`` method          :return: Repo instance pointing to the cloned directory"""          git = cls.GitCommandWrapperType(os.getcwd())          if env is not None:              git.update_environment(**env) -        return cls._clone(git, url, to_path, GitCmdObjectDB, progress, multi_options, **kwargs) +        return cls._clone( +            git, +            url, +            to_path, +            GitCmdObjectDB, +            progress, +            multi_options, +            allow_unsafe_protocols=allow_unsafe_protocols, +            allow_unsafe_options=allow_unsafe_options, +            **kwargs, +        )      def archive(          self, diff --git a/test/test_git.py b/test/test_git.py index 6ba833b4..e7d236de 100644 --- a/test/test_git.py +++ b/test/test_git.py @@ -39,12 +39,12 @@ class TestGit(TestBase):          self.assertEqual(git.call_args, ((["git", "version"],), {}))      def test_call_unpack_args_unicode(self): -        args = Git._Git__unpack_args("Unicode€™") +        args = Git._unpack_args("Unicode€™")          mangled_value = "Unicode\u20ac\u2122"          self.assertEqual(args, [mangled_value])      def test_call_unpack_args(self): -        args = Git._Git__unpack_args(["git", "log", "--", "Unicode€™"]) +        args = Git._unpack_args(["git", "log", "--", "Unicode€™"])          mangled_value = "Unicode\u20ac\u2122"          self.assertEqual(args, ["git", "log", "--", mangled_value]) diff --git a/test/test_remote.py b/test/test_remote.py index 7df64c20..3a47afab 100644 --- a/test/test_remote.py +++ b/test/test_remote.py @@ -23,6 +23,8 @@ from git import (      GitCommandError,  )  from git.cmd import Git +from pathlib import Path +from git.exc import UnsafeOptionError, UnsafeProtocolError  from test.lib import (      TestBase,      with_rw_repo, @@ -690,6 +692,262 @@ class TestRemote(TestBase):          with self.assertRaisesRegex(GitCommandError, "src refspec __BAD_REF__ does not match any"):              rem.push("__BAD_REF__") +    @with_rw_repo("HEAD") +    def test_set_unsafe_url(self, rw_repo): +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        remote = rw_repo.remote("origin") +        urls = [ +            f"ext::sh -c touch% {tmp_file}", +            "fd::17/foo", +        ] +        for url in urls: +            with self.assertRaises(UnsafeProtocolError): +                remote.set_url(url) +            assert not tmp_file.exists() + +    @with_rw_repo("HEAD") +    def test_set_unsafe_url_allowed(self, rw_repo): +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        remote = rw_repo.remote("origin") +        urls = [ +            f"ext::sh -c touch% {tmp_file}", +            "fd::17/foo", +        ] +        for url in urls: +            remote.set_url(url, allow_unsafe_protocols=True) +            assert list(remote.urls)[-1] == url +            assert not tmp_file.exists() + +    @with_rw_repo("HEAD") +    def test_add_unsafe_url(self, rw_repo): +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        remote = rw_repo.remote("origin") +        urls = [ +            f"ext::sh -c touch% {tmp_file}", +            "fd::17/foo", +        ] +        for url in urls: +            with self.assertRaises(UnsafeProtocolError): +                remote.add_url(url) +            assert not tmp_file.exists() + +    @with_rw_repo("HEAD") +    def test_add_unsafe_url_allowed(self, rw_repo): +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        remote = rw_repo.remote("origin") +        urls = [ +            f"ext::sh -c touch% {tmp_file}", +            "fd::17/foo", +        ] +        for url in urls: +            remote.add_url(url, allow_unsafe_protocols=True) +            assert list(remote.urls)[-1] == url +            assert not tmp_file.exists() + +    @with_rw_repo("HEAD") +    def test_create_remote_unsafe_url(self, rw_repo): +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        urls = [ +            f"ext::sh -c touch% {tmp_file}", +            "fd::17/foo", +        ] +        for url in urls: +            with self.assertRaises(UnsafeProtocolError): +                Remote.create(rw_repo, "origin", url) +            assert not tmp_file.exists() + +    @with_rw_repo("HEAD") +    def test_create_remote_unsafe_url_allowed(self, rw_repo): +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        urls = [ +            f"ext::sh -c touch% {tmp_file}", +            "fd::17/foo", +        ] +        for i, url in enumerate(urls): +            remote = Remote.create(rw_repo, f"origin{i}", url, allow_unsafe_protocols=True) +            assert remote.url == url +            assert not tmp_file.exists() + +    @with_rw_repo("HEAD") +    def test_fetch_unsafe_url(self, rw_repo): +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        remote = rw_repo.remote("origin") +        urls = [ +            f"ext::sh -c touch% {tmp_file}", +            "fd::17/foo", +        ] +        for url in urls: +            with self.assertRaises(UnsafeProtocolError): +                remote.fetch(url) +            assert not tmp_file.exists() + +    @with_rw_repo("HEAD") +    def test_fetch_unsafe_url_allowed(self, rw_repo): +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        remote = rw_repo.remote("origin") +        urls = [ +            f"ext::sh -c touch% {tmp_file}", +            "fd::17/foo", +        ] +        for url in urls: +            # The URL will be allowed into the command, but the command will +            # fail since we don't have that protocol enabled in the Git config file. +            with self.assertRaises(GitCommandError): +                remote.fetch(url, allow_unsafe_protocols=True) +            assert not tmp_file.exists() + +    @with_rw_repo("HEAD") +    def test_fetch_unsafe_options(self, rw_repo): +        remote = rw_repo.remote("origin") +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] +        for unsafe_option in unsafe_options: +            with self.assertRaises(UnsafeOptionError): +                remote.fetch(**unsafe_option) +            assert not tmp_file.exists() + +    @with_rw_repo("HEAD") +    def test_fetch_unsafe_options_allowed(self, rw_repo): +        remote = rw_repo.remote("origin") +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] +        for unsafe_option in unsafe_options: +            # The options will be allowed, but the command will fail. +            assert not tmp_file.exists() +            with self.assertRaises(GitCommandError): +                remote.fetch(**unsafe_option, allow_unsafe_options=True) +            assert tmp_file.exists() + +    @with_rw_repo("HEAD") +    def test_pull_unsafe_url(self, rw_repo): +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        remote = rw_repo.remote("origin") +        urls = [ +            f"ext::sh -c touch% {tmp_file}", +            "fd::17/foo", +        ] +        for url in urls: +            with self.assertRaises(UnsafeProtocolError): +                remote.pull(url) +            assert not tmp_file.exists() + +    @with_rw_repo("HEAD") +    def test_pull_unsafe_url_allowed(self, rw_repo): +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        remote = rw_repo.remote("origin") +        urls = [ +            f"ext::sh -c touch% {tmp_file}", +            "fd::17/foo", +        ] +        for url in urls: +            # The URL will be allowed into the command, but the command will +            # fail since we don't have that protocol enabled in the Git config file. +            with self.assertRaises(GitCommandError): +                remote.pull(url, allow_unsafe_protocols=True) +            assert not tmp_file.exists() + +    @with_rw_repo("HEAD") +    def test_pull_unsafe_options(self, rw_repo): +        remote = rw_repo.remote("origin") +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] +        for unsafe_option in unsafe_options: +            with self.assertRaises(UnsafeOptionError): +                remote.pull(**unsafe_option) +            assert not tmp_file.exists() + +    @with_rw_repo("HEAD") +    def test_pull_unsafe_options_allowed(self, rw_repo): +        remote = rw_repo.remote("origin") +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] +        for unsafe_option in unsafe_options: +            # The options will be allowed, but the command will fail. +            assert not tmp_file.exists() +            with self.assertRaises(GitCommandError): +                remote.pull(**unsafe_option, allow_unsafe_options=True) +            assert tmp_file.exists() + +    @with_rw_repo("HEAD") +    def test_push_unsafe_url(self, rw_repo): +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        remote = rw_repo.remote("origin") +        urls = [ +            f"ext::sh -c touch% {tmp_file}", +            "fd::17/foo", +        ] +        for url in urls: +            with self.assertRaises(UnsafeProtocolError): +                remote.push(url) +            assert not tmp_file.exists() + +    @with_rw_repo("HEAD") +    def test_push_unsafe_url_allowed(self, rw_repo): +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        remote = rw_repo.remote("origin") +        urls = [ +            f"ext::sh -c touch% {tmp_file}", +            "fd::17/foo", +        ] +        for url in urls: +            # The URL will be allowed into the command, but the command will +            # fail since we don't have that protocol enabled in the Git config file. +            with self.assertRaises(GitCommandError): +                remote.push(url, allow_unsafe_protocols=True) +            assert not tmp_file.exists() + +    @with_rw_repo("HEAD") +    def test_push_unsafe_options(self, rw_repo): +        remote = rw_repo.remote("origin") +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        unsafe_options = [ +            { +                "receive-pack": f"touch {tmp_file}", +                "exec": f"touch {tmp_file}", +            } +        ] +        for unsafe_option in unsafe_options: +            assert not tmp_file.exists() +            with self.assertRaises(UnsafeOptionError): +                remote.push(**unsafe_option) +            assert not tmp_file.exists() + +    @with_rw_repo("HEAD") +    def test_push_unsafe_options_allowed(self, rw_repo): +        remote = rw_repo.remote("origin") +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        unsafe_options = [ +            { +                "receive-pack": f"touch {tmp_file}", +                "exec": f"touch {tmp_file}", +            } +        ] +        for unsafe_option in unsafe_options: +            # The options will be allowed, but the command will fail. +            assert not tmp_file.exists() +            with self.assertRaises(GitCommandError): +                remote.push(**unsafe_option, allow_unsafe_options=True) +            assert tmp_file.exists() +            tmp_file.unlink() +  class TestTimeouts(TestBase):      @with_rw_repo("HEAD", bare=False) diff --git a/test/test_repo.py b/test/test_repo.py index 6382db7e..5874dbe6 100644 --- a/test/test_repo.py +++ b/test/test_repo.py @@ -37,6 +37,8 @@ from git import (  )  from git.exc import (      BadObject, +    UnsafeOptionError, +    UnsafeProtocolError,  )  from git.repo.fun import touch  from test.lib import TestBase, with_rw_repo, fixture @@ -223,6 +225,7 @@ class TestRepo(TestBase):                  "--config submodule.repo.update=checkout",                  "--config filter.lfs.clean='git-lfs clean -- %f'",              ], +            allow_unsafe_options=True,          )          self.assertEqual(cloned.config_reader().get_value("submodule", "active"), "repo") @@ -264,6 +267,146 @@ class TestRepo(TestBase):          )      @with_rw_repo("HEAD") +    def test_clone_unsafe_options(self, rw_repo): +        tmp_dir = pathlib.Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        unsafe_options = [ +            f"--upload-pack='touch {tmp_file}'", +            f"-u 'touch {tmp_file}'", +            "--config=protocol.ext.allow=always", +            "-c protocol.ext.allow=always", +        ] +        for unsafe_option in unsafe_options: +            with self.assertRaises(UnsafeOptionError): +                rw_repo.clone(tmp_dir, multi_options=[unsafe_option]) +            assert not tmp_file.exists() + +    @with_rw_repo("HEAD") +    def test_clone_unsafe_options_allowed(self, rw_repo): +        tmp_dir = pathlib.Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        unsafe_options = [ +            f"--upload-pack='touch {tmp_file}'", +            f"-u 'touch {tmp_file}'", +        ] +        for i, unsafe_option in enumerate(unsafe_options): +            destination = tmp_dir / str(i) +            assert not tmp_file.exists() +            # The options will be allowed, but the command will fail. +            with self.assertRaises(GitCommandError): +                rw_repo.clone(destination, multi_options=[unsafe_option], allow_unsafe_options=True) +            assert tmp_file.exists() +            tmp_file.unlink() + +        unsafe_options = [ +            "--config=protocol.ext.allow=always", +            "-c protocol.ext.allow=always", +        ] +        for i, unsafe_option in enumerate(unsafe_options): +            destination = tmp_dir / str(i) +            assert not destination.exists() +            rw_repo.clone(destination, multi_options=[unsafe_option], allow_unsafe_options=True) +            assert destination.exists() + +    @with_rw_repo("HEAD") +    def test_clone_safe_options(self, rw_repo): +        tmp_dir = pathlib.Path(tempfile.mkdtemp()) +        options = [ +            "--depth=1", +            "--single-branch", +            "-q", +        ] +        for option in options: +            destination = tmp_dir / option +            assert not destination.exists() +            rw_repo.clone(destination, multi_options=[option]) +            assert destination.exists() + +    @with_rw_repo("HEAD") +    def test_clone_from_unsafe_options(self, rw_repo): +        tmp_dir = pathlib.Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        unsafe_options = [ +            f"--upload-pack='touch {tmp_file}'", +            f"-u 'touch {tmp_file}'", +            "--config=protocol.ext.allow=always", +            "-c protocol.ext.allow=always", +        ] +        for unsafe_option in unsafe_options: +            with self.assertRaises(UnsafeOptionError): +                Repo.clone_from(rw_repo.working_dir, tmp_dir, multi_options=[unsafe_option]) +            assert not tmp_file.exists() + +    @with_rw_repo("HEAD") +    def test_clone_from_unsafe_options_allowed(self, rw_repo): +        tmp_dir = pathlib.Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        unsafe_options = [ +            f"--upload-pack='touch {tmp_file}'", +            f"-u 'touch {tmp_file}'", +        ] +        for i, unsafe_option in enumerate(unsafe_options): +            destination = tmp_dir / str(i) +            assert not tmp_file.exists() +            # The options will be allowed, but the command will fail. +            with self.assertRaises(GitCommandError): +                Repo.clone_from( +                    rw_repo.working_dir, destination, multi_options=[unsafe_option], allow_unsafe_options=True +                ) +            assert tmp_file.exists() +            tmp_file.unlink() + +        unsafe_options = [ +            "--config=protocol.ext.allow=always", +            "-c protocol.ext.allow=always", +        ] +        for i, unsafe_option in enumerate(unsafe_options): +            destination = tmp_dir / str(i) +            assert not destination.exists() +            Repo.clone_from(rw_repo.working_dir, destination, multi_options=[unsafe_option], allow_unsafe_options=True) +            assert destination.exists() + +    @with_rw_repo("HEAD") +    def test_clone_from_safe_options(self, rw_repo): +        tmp_dir = pathlib.Path(tempfile.mkdtemp()) +        options = [ +            "--depth=1", +            "--single-branch", +            "-q", +        ] +        for option in options: +            destination = tmp_dir / option +            assert not destination.exists() +            Repo.clone_from(rw_repo.common_dir, destination, multi_options=[option]) +            assert destination.exists() + +    def test_clone_from_unsafe_procol(self): +        tmp_dir = pathlib.Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        urls = [ +            f"ext::sh -c touch% {tmp_file}", +            "fd::17/foo", +        ] +        for url in urls: +            with self.assertRaises(UnsafeProtocolError): +                Repo.clone_from(url, tmp_dir) +            assert not tmp_file.exists() + +    def test_clone_from_unsafe_procol_allowed(self): +        tmp_dir = pathlib.Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        urls = [ +            "ext::sh -c touch% /tmp/pwn", +            "fd::/foo", +        ] +        for url in urls: +            # The URL will be allowed into the command, but the command will +            # fail since we don't have that protocol enabled in the Git config file. +            with self.assertRaises(GitCommandError): +                Repo.clone_from(url, tmp_dir, allow_unsafe_protocols=True) +            assert not tmp_file.exists() + +    @with_rw_repo("HEAD")      def test_max_chunk_size(self, repo):          class TestOutputStream(TestBase):              def __init__(self, max_chunk_size): diff --git a/test/test_submodule.py b/test/test_submodule.py index fef6bda3..13878df2 100644 --- a/test/test_submodule.py +++ b/test/test_submodule.py @@ -3,6 +3,8 @@  # the BSD License: http://www.opensource.org/licenses/bsd-license.php  import os  import shutil +import tempfile +from pathlib import Path  import sys  from unittest import skipIf @@ -12,7 +14,13 @@ import git  from git.cmd import Git  from git.compat import is_win  from git.config import GitConfigParser, cp -from git.exc import InvalidGitRepositoryError, RepositoryDirtyError +from git.exc import ( +    GitCommandError, +    InvalidGitRepositoryError, +    RepositoryDirtyError, +    UnsafeOptionError, +    UnsafeProtocolError, +)  from git.objects.submodule.base import Submodule  from git.objects.submodule.root import RootModule, RootUpdateProgress  from git.repo.fun import find_submodule_git_dir, touch @@ -1026,7 +1034,7 @@ class TestSubmodule(TestBase):          )          # Act -        sm.update(init=True, clone_multi_options=["--config core.eol=true"]) +        sm.update(init=True, clone_multi_options=["--config core.eol=true"], allow_unsafe_options=True)          # Assert          sm_config = GitConfigParser(file_or_files=osp.join(parent.git_dir, "modules", sm_name, "config")) @@ -1070,6 +1078,7 @@ class TestSubmodule(TestBase):              sm_name,              url=self._small_repo_url(),              clone_multi_options=["--config core.eol=true"], +            allow_unsafe_options=True,          )          # Assert @@ -1089,3 +1098,142 @@ class TestSubmodule(TestBase):          sm_config = GitConfigParser(file_or_files=osp.join(parent.git_dir, "modules", sm_name, "config"))          with self.assertRaises(cp.NoOptionError):              sm_config.get_value("core", "eol") + +    @with_rw_repo("HEAD") +    def test_submodule_add_unsafe_url(self, rw_repo): +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        urls = [ +            f"ext::sh -c touch% {tmp_file}", +            "fd::/foo", +        ] +        for url in urls: +            with self.assertRaises(UnsafeProtocolError): +                Submodule.add(rw_repo, "new", "new", url) +            assert not tmp_file.exists() + +    @with_rw_repo("HEAD") +    def test_submodule_add_unsafe_url_allowed(self, rw_repo): +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        urls = [ +            f"ext::sh -c touch% {tmp_file}", +            "fd::/foo", +        ] +        for url in urls: +            # The URL will be allowed into the command, but the command will +            # fail since we don't have that protocol enabled in the Git config file. +            with self.assertRaises(GitCommandError): +                Submodule.add(rw_repo, "new", "new", url, allow_unsafe_protocols=True) +            assert not tmp_file.exists() + +    @with_rw_repo("HEAD") +    def test_submodule_add_unsafe_options(self, rw_repo): +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        unsafe_options = [ +            f"--upload-pack='touch {tmp_file}'", +            f"-u 'touch {tmp_file}'", +            "--config=protocol.ext.allow=always", +            "-c protocol.ext.allow=always", +        ] +        for unsafe_option in unsafe_options: +            with self.assertRaises(UnsafeOptionError): +                Submodule.add(rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option]) +            assert not tmp_file.exists() + +    @with_rw_repo("HEAD") +    def test_submodule_add_unsafe_options_allowed(self, rw_repo): +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        unsafe_options = [ +            f"--upload-pack='touch {tmp_file}'", +            f"-u 'touch {tmp_file}'", +        ] +        for unsafe_option in unsafe_options: +            # The options will be allowed, but the command will fail. +            with self.assertRaises(GitCommandError): +                Submodule.add( +                    rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option], allow_unsafe_options=True +                ) +            assert not tmp_file.exists() + +        unsafe_options = [ +            "--config=protocol.ext.allow=always", +            "-c protocol.ext.allow=always", +        ] +        for unsafe_option in unsafe_options: +            with self.assertRaises(GitCommandError): +                Submodule.add( +                    rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option], allow_unsafe_options=True +                ) + +    @with_rw_repo("HEAD") +    def test_submodule_update_unsafe_url(self, rw_repo): +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        urls = [ +            f"ext::sh -c touch% {tmp_file}", +            "fd::/foo", +        ] +        for url in urls: +            submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=url) +            with self.assertRaises(UnsafeProtocolError): +                submodule.update() +            assert not tmp_file.exists() + +    @with_rw_repo("HEAD") +    def test_submodule_update_unsafe_url_allowed(self, rw_repo): +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        urls = [ +            f"ext::sh -c touch% {tmp_file}", +            "fd::/foo", +        ] +        for url in urls: +            submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=url) +            # The URL will be allowed into the command, but the command will +            # fail since we don't have that protocol enabled in the Git config file. +            with self.assertRaises(GitCommandError): +                submodule.update(allow_unsafe_protocols=True) +            assert not tmp_file.exists() + +    @with_rw_repo("HEAD") +    def test_submodule_update_unsafe_options(self, rw_repo): +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        unsafe_options = [ +            f"--upload-pack='touch {tmp_file}'", +            f"-u 'touch {tmp_file}'", +            "--config=protocol.ext.allow=always", +            "-c protocol.ext.allow=always", +        ] +        submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) +        for unsafe_option in unsafe_options: +            with self.assertRaises(UnsafeOptionError): +                submodule.update(clone_multi_options=[unsafe_option]) +            assert not tmp_file.exists() + +    @with_rw_repo("HEAD") +    def test_submodule_update_unsafe_options_allowed(self, rw_repo): +        tmp_dir = Path(tempfile.mkdtemp()) +        tmp_file = tmp_dir / "pwn" +        unsafe_options = [ +            f"--upload-pack='touch {tmp_file}'", +            f"-u 'touch {tmp_file}'", +        ] +        submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) +        for unsafe_option in unsafe_options: +            # The options will be allowed, but the command will fail. +            with self.assertRaises(GitCommandError): +                submodule.update(clone_multi_options=[unsafe_option], allow_unsafe_options=True) +            assert not tmp_file.exists() + +        unsafe_options = [ +            "--config=protocol.ext.allow=always", +            "-c protocol.ext.allow=always", +        ] +        submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) +        for unsafe_option in unsafe_options: +            with self.assertRaises(GitCommandError): +                submodule.update(clone_multi_options=[unsafe_option], allow_unsafe_options=True) | 
