1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
"""Channel testing"""
from test.testlib import *
from git.async.channel import *
import time
class TestChannels(TestBase):
def test_base(self):
# creating channel yields a write and a read channal
wc, rc = mkchannel()
assert isinstance(wc, Writer) # default args
assert isinstance(rc, Reader)
# TEST UNLIMITED SIZE CHANNEL - writing+reading is FIFO
item = 1
item2 = 2
wc.write(item)
wc.write(item2)
# read all - it blocks as its still open for writing
to = 0.2
st = time.time()
assert rc.read(timeout=to) == [item, item2]
assert time.time() - st >= to
# next read blocks. it waits a second
st = time.time()
assert len(rc.read(1, True, to)) == 0
assert time.time() - st >= to
# writing to a closed channel raises
assert not wc.closed()
wc.close()
assert wc.closed()
wc.close() # fine
assert wc.closed()
self.failUnlessRaises(ReadOnly, wc.write, 1)
# reading from a closed channel never blocks
assert len(rc.read()) == 0
assert len(rc.read(5)) == 0
assert len(rc.read(1)) == 0
# test callback channels
wc, rc = mkchannel(wtype = CallbackWriter, rtype = CallbackReader)
cb = [0, 0] # set slots to one if called
def pre_write(item):
cb[0] = 1
return item + 1
def pre_read(count):
cb[1] = 1
# set, verify it returns previous one
assert wc.set_pre_cb(pre_write) is None
assert rc.set_pre_cb(pre_read) is None
assert wc.set_pre_cb(pre_write) is pre_write
assert rc.set_pre_cb(pre_read) is pre_read
# writer transforms input
val = 5
wc.write(val)
assert cb[0] == 1 and cb[1] == 0
rval = rc.read(1)[0] # read one item, must not block
assert cb[0] == 1 and cb[1] == 1
assert rval == val + 1
|