blob: 17c709e92c2a25b5445930f1a2a55f05c734fa61 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
from eventlet import spawn, sleep, with_timeout
from eventlet.event import Event
import eventlet
from tests import LimitedTestCase
# Timeout value handed to with_timeout() in TestEvent.test_send while
# checking that nothing is delivered to an event nobody sent to.
DELAY = 0.01
class TestEvent(LimitedTestCase):
    """Tests for delivering values and exceptions through ``Event``."""

    def test_send_exc(self):
        """An exception passed via ``send(exc=...)`` is raised from ``wait()``.

        A spawned waiter blocks on the event, records whether it received a
        value or caught an exception, and the test asserts that exactly the
        sent exception object was caught.
        """
        log = []
        e = Event()

        def waiter():
            try:
                result = e.wait()
                log.append(('received', result))
            except Exception as ex:
                log.append(('catched', ex))

        spawn(waiter)
        sleep(0)  # let waiter block on e.wait()
        obj = Exception()
        e.send(exc=obj)
        # Yield twice so the waiter gets a chance to run and record its
        # outcome before the log is inspected.
        sleep(0)
        sleep(0)
        assert log == [('catched', obj)], log

    def test_send(self):
        """A value sent to one event must not show up on a different event.

        ``event1`` has a sender spawned for it, but the wait on it is
        interrupted by a zero-second timeout raising ``ValueError``. If that
        happens, a wait on the untouched ``event2`` must time out and return
        the sentinel rather than any sent value.
        """
        event1 = Event()
        event2 = Event()

        spawn(event1.send, 'hello event1')
        # Keep a handle on the timeout so it can always be cancelled; an
        # un-cancelled Timeout that has not fired yet would stay scheduled.
        timeout = eventlet.Timeout(0, ValueError('interrupted'))
        try:
            result = event1.wait()
        except ValueError:
            X = object()
            result = with_timeout(DELAY, event2.wait, timeout_value=X)
            assert result is X, 'Nobody sent anything to event2 yet it received %r' % (result, )
        finally:
            # Cancelling is harmless if the timeout has already fired.
            timeout.cancel()
|