summaryrefslogtreecommitdiff
path: root/test/test_memoryview.py
blob: e1b63b8fb2441fbac6d731ccae9c064d8f12bada (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#!/usr/bin/env python
# coding: utf-8

from array import array
from msgpack import packb, unpackb
import sys


# For Python < 3:
#  - array type only supports old buffer interface
#  - array.frombytes is not available, must use deprecated array.fromstring
if sys.version_info[0] < 3:

    def make_memoryview(obj):
        return memoryview(buffer(obj))

    def make_array(f, data):
        a = array(f)
        a.fromstring(data)
        return a

    def get_data(a):
        return a.tostring()


else:
    make_memoryview = memoryview

    def make_array(f, data):
        a = array(f)
        a.frombytes(data)
        return a

    def get_data(a):
        return a.tobytes()


def _runtest(format, nbytes, expected_header, expected_prefix, use_bin_type):
    # create a new array
    original_array = array(format)
    original_array.fromlist([255] * (nbytes // original_array.itemsize))
    original_data = get_data(original_array)
    view = make_memoryview(original_array)

    # pack, unpack, and reconstruct array
    packed = packb(view, use_bin_type=use_bin_type)
    unpacked = unpackb(packed)
    reconstructed_array = make_array(format, unpacked)

    # check that we got the right amount of data
    assert len(original_data) == nbytes
    # check packed header
    assert packed[:1] == expected_header
    # check packed length prefix, if any
    assert packed[1 : 1 + len(expected_prefix)] == expected_prefix
    # check packed data
    assert packed[1 + len(expected_prefix) :] == original_data
    # check array unpacked correctly
    assert original_array == reconstructed_array


def test_fixstr_from_byte():
    _runtest("B", 1, b"\xa1", b"", False)
    _runtest("B", 31, b"\xbf", b"", False)


def test_fixstr_from_float():
    _runtest("f", 4, b"\xa4", b"", False)
    _runtest("f", 28, b"\xbc", b"", False)


def test_str16_from_byte():
    _runtest("B", 2 ** 8, b"\xda", b"\x01\x00", False)
    _runtest("B", 2 ** 16 - 1, b"\xda", b"\xff\xff", False)


def test_str16_from_float():
    _runtest("f", 2 ** 8, b"\xda", b"\x01\x00", False)
    _runtest("f", 2 ** 16 - 4, b"\xda", b"\xff\xfc", False)


def test_str32_from_byte():
    _runtest("B", 2 ** 16, b"\xdb", b"\x00\x01\x00\x00", False)


def test_str32_from_float():
    _runtest("f", 2 ** 16, b"\xdb", b"\x00\x01\x00\x00", False)


def test_bin8_from_byte():
    _runtest("B", 1, b"\xc4", b"\x01", True)
    _runtest("B", 2 ** 8 - 1, b"\xc4", b"\xff", True)


def test_bin8_from_float():
    _runtest("f", 4, b"\xc4", b"\x04", True)
    _runtest("f", 2 ** 8 - 4, b"\xc4", b"\xfc", True)


def test_bin16_from_byte():
    _runtest("B", 2 ** 8, b"\xc5", b"\x01\x00", True)
    _runtest("B", 2 ** 16 - 1, b"\xc5", b"\xff\xff", True)


def test_bin16_from_float():
    _runtest("f", 2 ** 8, b"\xc5", b"\x01\x00", True)
    _runtest("f", 2 ** 16 - 4, b"\xc5", b"\xff\xfc", True)


def test_bin32_from_byte():
    _runtest("B", 2 ** 16, b"\xc6", b"\x00\x01\x00\x00", True)


def test_bin32_from_float():
    _runtest("f", 2 ** 16, b"\xc6", b"\x00\x01\x00\x00", True)