summaryrefslogtreecommitdiff
path: root/rdflib/_networking.py
blob: 311096a891912795454e421bae0dd63ae8cb1990 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from __future__ import annotations

import string
import sys
from typing import Dict
from urllib.error import HTTPError
from urllib.parse import quote as urlquote
from urllib.parse import urljoin, urlsplit
from urllib.request import HTTPRedirectHandler, Request, urlopen
from urllib.response import addinfourl


def _make_redirect_request(request: Request, http_error: HTTPError) -> Request:
    """
    Create a new request object for a redirected request.

    The logic is based on `urllib.request.HTTPRedirectHandler` from `this commit <https://github.com/python/cpython/blob/b58bc8c2a9a316891a5ea1a0487aebfc86c2793a/Lib/urllib/request.py#L641-L751>_`.

    :param request: The original request that resulted in the redirect.
    :param http_error: The response to the original request that indicates a
        redirect should occur and contains the new location.
    :return: A new request object to the location indicated by the response.
    :raises HTTPError: the supplied ``http_error`` if the redirect request
        cannot be created.
    :raises ValueError: If the response code is `None`.
    :raises ValueError: If the response does not contain a ``Location`` header
        or the ``Location`` header is not a string.
    :raises HTTPError: If the scheme of the new location is not ``http``,
        ``https``, or ``ftp``.
    :raises HTTPError: If there are too many redirects or a redirect loop.
    """
    new_url = http_error.headers.get("Location")
    if new_url is None:
        raise http_error
    if not isinstance(new_url, str):
        raise ValueError(f"Location header {new_url!r} is not a string")

    new_url_parts = urlsplit(new_url)

    # For security reasons don't allow redirection to anything other than http,
    # https or ftp.
    if new_url_parts.scheme not in ("http", "https", "ftp", ""):
        raise HTTPError(
            new_url,
            http_error.code,
            f"{http_error.reason} - Redirection to url {new_url!r} is not allowed",
            http_error.headers,
            http_error.fp,
        )

    # http.client.parse_headers() decodes as ISO-8859-1.  Recover the original
    # bytes and percent-encode non-ASCII bytes, and any special characters such
    # as the space.
    new_url = urlquote(new_url, encoding="iso-8859-1", safe=string.punctuation)
    new_url = urljoin(request.full_url, new_url)

    # XXX Probably want to forget about the state of the current
    # request, although that might interact poorly with other
    # handlers that also use handler-specific request attributes
    content_headers = ("content-length", "content-type")
    newheaders = {
        k: v for k, v in request.headers.items() if k.lower() not in content_headers
    }
    new_request = Request(
        new_url,
        headers=newheaders,
        origin_req_host=request.origin_req_host,
        unverifiable=True,
    )

    visited: Dict[str, int]
    if hasattr(request, "redirect_dict"):
        visited = request.redirect_dict
        if (
            visited.get(new_url, 0) >= HTTPRedirectHandler.max_repeats
            or len(visited) >= HTTPRedirectHandler.max_redirections
        ):
            raise HTTPError(
                request.full_url,
                http_error.code,
                HTTPRedirectHandler.inf_msg + http_error.reason,
                http_error.headers,
                http_error.fp,
            )
    else:
        visited = {}
        setattr(request, "redirect_dict", visited)

    setattr(new_request, "redirect_dict", visited)
    visited[new_url] = visited.get(new_url, 0) + 1
    return new_request


def _urlopen(request: Request) -> addinfourl:
    """
    This is a shim for `urlopen` that handles HTTP redirects with status code
    308 (Permanent Redirect).

    This function should be removed once all supported versions of Python
    handles the 308 HTTP status code.

    :param request: The request to open.
    :return: The response to the request.
    """
    try:
        return urlopen(request)
    except HTTPError as error:
        if error.code == 308 and sys.version_info < (3, 11):
            # HTTP response code 308 (Permanent Redirect) is not supported by python
            # versions older than 3.11. See <https://bugs.python.org/issue40321> and
            # <https://github.com/python/cpython/issues/84501> for more details.
            # This custom error handling should be removed once all supported
            # versions of Python handles 308.
            new_request = _make_redirect_request(request, error)
            return _urlopen(new_request)
        else:
            raise