1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
from io import BytesIO
import sys
from msgpack import Unpacker, packb, OutOfData, ExtType
from pytest import raises, mark
def test_unpack_array_header_from_file():
f = BytesIO(packb([1,2,3,4]))
unpacker = Unpacker(f)
assert unpacker.read_array_header() == 4
assert unpacker.unpack() == 1
assert unpacker.unpack() == 2
assert unpacker.unpack() == 3
assert unpacker.unpack() == 4
with raises(OutOfData):
unpacker.unpack()
@mark.skipif("not hasattr(sys, 'getrefcount') == True",
reason='sys.getrefcount() is needed to pass this test')
def test_unpacker_hook_refcnt():
result = []
def hook(x):
result.append(x)
return x
basecnt = sys.getrefcount(hook)
up = Unpacker(object_hook=hook, list_hook=hook)
assert sys.getrefcount(hook) >= basecnt + 2
up.feed(packb([{}]))
up.feed(packb([{}]))
assert up.unpack() == [{}]
assert up.unpack() == [{}]
assert result == [{}, [{}], {}, [{}]]
del up
assert sys.getrefcount(hook) == basecnt
def test_unpacker_ext_hook():
class MyUnpacker(Unpacker):
def __init__(self):
super(MyUnpacker, self).__init__(
ext_hook=self._hook, raw=False)
def _hook(self, code, data):
if code == 1:
return int(data)
else:
return ExtType(code, data)
unpacker = MyUnpacker()
unpacker.feed(packb({'a': 1}))
assert unpacker.unpack() == {'a': 1}
unpacker.feed(packb({'a': ExtType(1, b'123')}))
assert unpacker.unpack() == {'a': 123}
unpacker.feed(packb({'a': ExtType(2, b'321')}))
assert unpacker.unpack() == {'a': ExtType(2, b'321')}
if __name__ == '__main__':
test_unpack_array_header_from_file()
test_unpacker_hook_refcnt()
test_unpacker_ext_hook()
|