1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
from __future__ import annotations
from typing import Any, Callable, Dict, Iterator, Optional, Tuple, TYPE_CHECKING, Union
import requests
from requests.structures import CaseInsensitiveDict
from requests_toolbelt.multipart.encoder import MultipartEncoder # type: ignore
from . import protocol
class _StdoutStream:
    """Callable sink that echoes every chunk it receives to standard output."""

    def __call__(self, chunk: Any) -> None:
        """Print ``chunk`` using the built-in ``print``."""
        print(chunk)
class RequestsResponse(protocol.BackendResponse):
    """``protocol.BackendResponse`` implementation backed by a ``requests.Response``.

    Every accessor simply delegates to the wrapped response object.
    """

    def __init__(self, response: requests.Response) -> None:
        """Keep a reference to the ``requests.Response`` being wrapped."""
        self._response: requests.Response = response

    @property
    def response(self) -> requests.Response:
        """The wrapped ``requests.Response`` itself."""
        return self._response

    @property
    def status_code(self) -> int:
        """``status_code`` of the wrapped response."""
        return self._response.status_code

    @property
    def headers(self) -> CaseInsensitiveDict[str]:
        """``headers`` of the wrapped response."""
        return self._response.headers

    @property
    def content(self) -> bytes:
        """``content`` of the wrapped response."""
        return self._response.content

    @property
    def reason(self) -> str:
        """``reason`` of the wrapped response."""
        return self._response.reason

    def json(self) -> Any:
        """Return whatever the wrapped response's ``json()`` method returns."""
        return self._response.json()
class RequestsBackend(protocol.Backend):
    """``protocol.Backend`` implementation that sends requests via ``requests``."""

    def __init__(self, session: Optional[requests.Session] = None) -> None:
        """Use ``session`` if given (and truthy), otherwise create a new Session."""
        self._client: requests.Session = session or requests.Session()

    @property
    def client(self) -> requests.Session:
        """The ``requests.Session`` used to perform requests."""
        return self._client

    @staticmethod
    def prepare_send_data(
        files: Optional[Dict[str, Any]] = None,
        post_data: Optional[Union[Dict[str, Any], bytes]] = None,
        raw: bool = False,
    ) -> Tuple[
        Optional[Union[Dict[str, Any], bytes]],
        Optional[Union[Dict[str, Any], MultipartEncoder]],
        str,
    ]:
        """Choose how a request body should be sent.

        Args:
            files: Optional mapping; only its ``"file"`` and ``"avatar"``
                entries are read. When truthy, a multipart body is built.
            post_data: The payload. Expected to be a dict (or None) when
                ``files`` is given. It is never modified by this function.
            raw: When true and ``post_data`` is truthy, send ``post_data``
                as-is with an octet-stream content type.

        Returns:
            A ``(json, data, content_type)`` tuple in which exactly one of
            ``json`` / ``data`` carries the payload (both may be None when
            there is no payload).
        """
        if files:
            # Build a fresh dict rather than editing ``post_data`` in place so
            # the caller's object is left untouched.
            fields: Dict[str, Any] = {}
            if post_data is not None:
                if TYPE_CHECKING:
                    assert isinstance(post_data, dict)
                for key, value in post_data.items():
                    # MultipartEncoder needs values that can be encoded;
                    # booleans and numbers have no ``encode`` method and would
                    # fail with "object has no attribute 'encode'". Booleans
                    # are sent as "1"/"0", other numbers as their str() form.
                    if isinstance(value, bool):
                        value = int(value)
                    if isinstance(value, (int, float)):
                        value = str(value)
                    fields[key] = value
            fields["file"] = files.get("file")
            fields["avatar"] = files.get("avatar")
            encoder = MultipartEncoder(fields)
            return (None, encoder, encoder.content_type)

        if raw and post_data:
            return (None, post_data, "application/octet-stream")

        return (post_data, None, "application/json")

    @staticmethod
    def response_content(
        response: requests.Response,
        streamed: bool,
        action: Optional[Callable[[bytes], None]],
        chunk_size: int,
        *,
        iterator: bool,
    ) -> Optional[Union[bytes, Iterator[Any]]]:
        """Return or stream the body of ``response``.

        Args:
            response: The response whose body is consumed.
            streamed: When exactly ``False`` (and ``iterator`` is false), the
                full ``response.content`` is returned.
            action: Callable invoked with each non-empty chunk when
                streaming; when None, chunks are printed to stdout.
            chunk_size: Passed to ``response.iter_content``.
            iterator: When true, return ``response.iter_content(...)``
                directly; this takes precedence over ``streamed``.

        Returns:
            The chunk iterator, the full content bytes, or None after all
            chunks have been handed to ``action``.
        """
        if iterator:
            return response.iter_content(chunk_size=chunk_size)

        if streamed is False:
            return response.content

        if action is None:
            action = _StdoutStream()

        for chunk in response.iter_content(chunk_size=chunk_size):
            # Skip empty chunks; only real data is handed to the action.
            if chunk:
                action(chunk)
        return None

    def http_request(
        self,
        method: str,
        url: str,
        json: Optional[Union[Dict[str, Any], bytes]] = None,
        data: Optional[Union[Dict[str, Any], MultipartEncoder]] = None,
        params: Optional[Any] = None,
        timeout: Optional[float] = None,
        verify: Optional[Union[bool, str]] = True,
        stream: Optional[bool] = False,
        **kwargs: Any,
    ) -> RequestsResponse:
        """Make an HTTP request through the underlying session.

        Args:
            method: The HTTP method to call ('get', 'post', 'put', 'delete', etc.)
            url: The full URL
            json: Data to send in the body in json by default
            data: The data to send to the server in the body of the request
            params: Forwarded as ``params`` to ``Session.request``
            timeout: The timeout, in seconds, for the request
            verify: Whether SSL certificates should be validated. If
                the value is a string, it is the path to a CA file used for
                certificate validation.
            stream: Whether the data should be streamed
            **kwargs: Extra keyword arguments forwarded unchanged to
                ``Session.request``

        Returns:
            A RequestsResponse wrapping the requests Response object.
        """
        response: requests.Response = self._client.request(
            method=method,
            url=url,
            params=params,
            data=data,
            timeout=timeout,
            stream=stream,
            verify=verify,
            json=json,
            **kwargs,
        )
        return RequestsResponse(response=response)
|