1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
|
from __future__ import annotations
import signal
import threading
import time
import typing
from socket import socket, socketpair
from types import FrameType
import pytest
from urllib3.util.wait import (
_have_working_poll,
poll_wait_for_socket,
select_wait_for_socket,
wait_for_read,
wait_for_socket,
wait_for_write,
)
# A pair of socket objects, as yielded by the ``spair`` fixture.
TYPE_SOCKET_PAIR = typing.Tuple[socket, socket]
# Any wait-for-socket callable under test: arbitrary arguments, bool result.
TYPE_WAIT_FOR = typing.Callable[..., bool]
@pytest.fixture
def spair() -> typing.Generator[TYPE_SOCKET_PAIR, None, None]:
    """Yield a pair of sockets from ``socketpair()`` and close both afterwards."""
    pair = socketpair()
    yield pair
    # Teardown: runs once the test using the fixture has finished.
    for sock in pair:
        sock.close()
# Wait implementations that the parametrized tests below are run against.
# The poll()-based one is only included when _have_working_poll() says so.
variants: list[TYPE_WAIT_FOR] = [wait_for_socket, select_wait_for_socket]
if _have_working_poll():
    variants += [poll_wait_for_socket]
@pytest.mark.parametrize("wfs", variants)
def test_wait_for_socket(wfs: TYPE_WAIT_FOR, spair: TYPE_SOCKET_PAIR) -> None:
    """Walk one wait implementation through readable, writable and closed states."""
    local, peer = spair

    # Asking to wait for neither reading nor writing must raise.
    with pytest.raises(RuntimeError):
        wfs(local, read=False, write=False)

    # Nothing has been sent yet: not readable, but writable.
    assert not wfs(local, read=True, timeout=0)
    assert wfs(local, write=True, timeout=0)

    # Once the peer sends a byte, the socket is readable whichever timeout
    # form is used (zero, finite, or none).
    peer.send(b"x")
    for timeout in (0, 10, None):
        assert wfs(local, read=True, timeout=timeout)

    # Fill up the socket with data: keep sending in non-blocking mode until
    # send() fails with OSError.
    local.setblocking(False)
    chunk = b"x" * 999999
    while True:
        try:
            local.send(chunk)
        except OSError:
            break

    # Now it's not writable anymore...
    assert not wfs(local, write=True, timeout=0)

    # ...but read-or-write still succeeds, because the byte sent by the peer
    # is still waiting to be read.
    assert wfs(local, read=True, write=True, timeout=0)

    # After consuming that byte, neither condition holds.
    assert local.recv(1) == b"x"
    assert not wfs(local, read=True, write=True, timeout=0)

    # If the remote peer closes its end, the socket becomes readable again.
    peer.close()
    assert wfs(local, read=True, timeout=0)

    # Waiting for a socket that's actually been closed is just a bug, and
    # raises some kind of helpful exception (exact details depend on the
    # platform).
    with pytest.raises(Exception):
        wfs(peer, read=True)
def test_wait_for_read_write(spair: TYPE_SOCKET_PAIR) -> None:
    """Exercise wait_for_read() and wait_for_write() with a zero timeout."""
    local, peer = spair

    # Nothing has been sent yet: not readable, but writable.
    assert not wait_for_read(local, 0)
    assert wait_for_write(local, 0)

    # A byte from the peer makes it readable; it stays writable.
    peer.send(b"x")
    assert wait_for_read(local, 0)
    assert wait_for_write(local, 0)

    # Fill up the socket with data: keep sending in non-blocking mode until
    # send() fails with OSError.
    local.setblocking(False)
    chunk = b"x" * 999999
    while True:
        try:
            local.send(chunk)
        except OSError:
            break

    # Now it's not writable anymore
    assert not wait_for_write(local, 0)
@pytest.mark.skipif(not hasattr(signal, "setitimer"), reason="need setitimer() support")
@pytest.mark.parametrize("wfs", variants)
def test_eintr(wfs: TYPE_WAIT_FOR, spair: TYPE_SOCKET_PAIR) -> None:
    """Check that a one-second wait lasts about a second while SIGALRM keeps firing."""
    sock, _ = spair
    signals_seen = 0

    def on_alarm(sig: int, frame: FrameType | None) -> typing.Any:
        # Count every delivered alarm so the test can prove signals arrived.
        nonlocal signals_seen
        assert sig == signal.SIGALRM
        signals_seen += 1

    previous = signal.signal(signal.SIGALRM, on_alarm)
    try:
        assert not wfs(sock, read=True, timeout=0)
        began = time.monotonic()
        try:
            # Start delivering SIGALRM 10 times per second
            signal.setitimer(signal.ITIMER_REAL, 0.1, 0.1)
            # Sleep for 1 second (we hope!)
            wfs(sock, read=True, timeout=1)
        finally:
            # Stop delivering SIGALRM
            signal.setitimer(signal.ITIMER_REAL, 0)
        elapsed = time.monotonic() - began
        # The wait must not have returned early, nor run far past its timeout.
        assert 0.9 < elapsed < 3
    finally:
        # Put back whatever SIGALRM handler was installed before the test.
        signal.signal(signal.SIGALRM, previous)

    assert signals_seen > 0
@pytest.mark.skipif(not hasattr(signal, "setitimer"), reason="need setitimer() support")
@pytest.mark.parametrize("wfs", variants)
def test_eintr_zero_timeout(wfs: TYPE_WAIT_FOR, spair: TYPE_SOCKET_PAIR) -> None:
    """Hammer zero-timeout waits while SIGALRM fires at a very high rate."""
    sock, _ = spair
    signals_seen = 0

    def on_alarm(sig: int, frame: FrameType | None) -> typing.Any:
        # Count every delivered alarm so the test can prove signals arrived.
        nonlocal signals_seen
        assert sig == signal.SIGALRM
        signals_seen += 1

    previous = signal.signal(signal.SIGALRM, on_alarm)
    try:
        assert not wfs(sock, read=True, timeout=0)
        try:
            # Start delivering SIGALRM 1000 times per second,
            # to trigger race conditions such as
            # https://github.com/urllib3/urllib3/issues/1396.
            signal.setitimer(signal.ITIMER_REAL, 0.001, 0.001)
            # Hammer the system call for a while to trigger the race:
            # at most 100000 calls or 5 seconds, whichever comes first.
            deadline = time.monotonic() + 5
            for _ in range(100000):
                wfs(sock, read=True, timeout=0)
                if time.monotonic() >= deadline:
                    break
        finally:
            # Stop delivering SIGALRM
            signal.setitimer(signal.ITIMER_REAL, 0)
    finally:
        # Put back whatever SIGALRM handler was installed before the test.
        signal.signal(signal.SIGALRM, previous)

    assert signals_seen > 0
@pytest.mark.skipif(not hasattr(signal, "setitimer"), reason="need setitimer() support")
@pytest.mark.parametrize("wfs", variants)
def test_eintr_infinite_timeout(wfs: TYPE_WAIT_FOR, spair: TYPE_SOCKET_PAIR) -> None:
    """Check that a wait with no timeout survives repeated SIGALRM delivery.

    A helper thread makes the socket readable after one second, so the
    wait is expected to return after roughly that long even though it is
    interrupted by signals about ten times in the meantime.
    """
    a, b = spair
    interrupt_count = [0]

    def handler(sig: int, frame: FrameType | None) -> typing.Any:
        assert sig == signal.SIGALRM
        interrupt_count[0] += 1

    def make_a_readable_after_one_second() -> None:
        time.sleep(1)
        b.send(b"x")

    # Build the thread up front so the cleanup code below can always refer
    # to it, even if arming the timer fails before the thread is started.
    thread = threading.Thread(target=make_a_readable_after_one_second)

    old_handler = signal.signal(signal.SIGALRM, handler)
    try:
        assert not wfs(a, read=True, timeout=0)
        start = time.monotonic()
        try:
            # Start delivering SIGALRM 10 times per second
            signal.setitimer(signal.ITIMER_REAL, 0.1, 0.1)
            # Block with no timeout; the helper thread ends the wait by
            # sending a byte after about one second.
            thread.start()
            wfs(a, read=True)
        finally:
            # Stop delivering SIGALRM
            signal.setitimer(signal.ITIMER_REAL, 0)
            # Joining a thread that was never started raises RuntimeError,
            # which would hide the original failure; ``ident`` is None
            # until the thread has actually been started.
            if thread.ident is not None:
                thread.join()
        end = time.monotonic()
        dur = end - start
        assert 0.9 < dur < 3
    finally:
        # Put back whatever SIGALRM handler was installed before the test.
        signal.signal(signal.SIGALRM, old_handler)

    assert interrupt_count[0] > 0
|