| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
 | import struct
import pytest
from six.moves import xrange
from kafka.codec import (
    has_snappy, has_gzip, has_lz4,
    gzip_encode, gzip_decode,
    snappy_encode, snappy_decode,
    lz4_encode, lz4_decode,
    lz4_encode_old_kafka, lz4_decode_old_kafka,
)
from test.testutil import random_string
def test_gzip():
    for i in xrange(1000):
        b1 = random_string(100).encode('utf-8')
        b2 = gzip_decode(gzip_encode(b1))
        assert b1 == b2
@pytest.mark.skipif(not has_snappy(), reason="Snappy not available")
def test_snappy():
    for i in xrange(1000):
        b1 = random_string(100).encode('utf-8')
        b2 = snappy_decode(snappy_encode(b1))
        assert b1 == b2
@pytest.mark.skipif(not has_snappy(), reason="Snappy not available")
def test_snappy_detect_xerial():
    import kafka as kafka1
    _detect_xerial_stream = kafka1.codec._detect_xerial_stream
    header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01Some extra bytes'
    false_header = b'\x01SNAPPY\x00\x00\x00\x01\x00\x00\x00\x01'
    default_snappy = snappy_encode(b'foobar' * 50)
    random_snappy = snappy_encode(b'SNAPPY' * 50, xerial_compatible=False)
    short_data = b'\x01\x02\x03\x04'
    assert _detect_xerial_stream(header) is True
    assert _detect_xerial_stream(b'') is False
    assert _detect_xerial_stream(b'\x00') is False
    assert _detect_xerial_stream(false_header) is False
    assert _detect_xerial_stream(default_snappy) is True
    assert _detect_xerial_stream(random_snappy) is False
    assert _detect_xerial_stream(short_data) is False
@pytest.mark.skipif(not has_snappy(), reason="Snappy not available")
def test_snappy_decode_xerial():
    header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
    random_snappy = snappy_encode(b'SNAPPY' * 50, xerial_compatible=False)
    block_len = len(random_snappy)
    random_snappy2 = snappy_encode(b'XERIAL' * 50, xerial_compatible=False)
    block_len2 = len(random_snappy2)
    to_test = header \
        + struct.pack('!i', block_len) + random_snappy \
        + struct.pack('!i', block_len2) + random_snappy2 \
    assert snappy_decode(to_test) == (b'SNAPPY' * 50) + (b'XERIAL' * 50)
@pytest.mark.skipif(not has_snappy(), reason="Snappy not available")
def test_snappy_encode_xerial():
    to_ensure = (
        b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
        b'\x00\x00\x00\x18'
        b'\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
        b'\x00\x00\x00\x18'
        b'\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
    )
    to_test = (b'SNAPPY' * 50) + (b'XERIAL' * 50)
    compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300)
    assert compressed == to_ensure
@pytest.mark.skipif(not has_lz4(), reason="LZ4 not available")
def test_lz4():
    for i in xrange(1000):
        b1 = random_string(100).encode('utf-8')
        b2 = lz4_decode(lz4_encode(b1))
        assert len(b1) == len(b2)
        assert b1 == b2
@pytest.mark.skipif(not has_lz4(), reason="LZ4 not available")
def test_lz4_old():
    for i in xrange(1000):
        b1 = random_string(100).encode('utf-8')
        b2 = lz4_decode_old_kafka(lz4_encode_old_kafka(b1))
        assert len(b1) == len(b2)
        assert b1 == b2
@pytest.mark.xfail(reason="lz4tools library doesnt support incremental decompression")
@pytest.mark.skipif(not has_lz4(), reason="LZ4 not available")
def test_lz4_incremental():
    for i in xrange(1000):
        # lz4 max single block size is 4MB
        # make sure we test with multiple-blocks
        b1 = random_string(100).encode('utf-8') * 50000
        b2 = lz4_decode(lz4_encode(b1))
        assert len(b1) == len(b2)
        assert b1 == b2
 |