summaryrefslogtreecommitdiff
path: root/test/record/test_legacy_records.py
blob: 43970f7c9548e977d501995a6e16b7c27ff8d981 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
from __future__ import unicode_literals
import pytest
from mock import patch
from kafka.record.legacy_records import (
    LegacyRecordBatch, LegacyRecordBatchBuilder
)
import kafka.codec
from kafka.errors import UnsupportedCodecError


@pytest.mark.parametrize("magic", [0, 1])
def test_read_write_serde_v0_v1_no_compression(magic):
    builder = LegacyRecordBatchBuilder(
        magic=magic, compression_type=0, batch_size=9999999)
    builder.append(
        0, timestamp=9999999, key=b"test", value=b"Super")
    buffer = builder.build()

    batch = LegacyRecordBatch(bytes(buffer), magic)
    msgs = list(batch)
    assert len(msgs) == 1
    msg = msgs[0]

    assert msg.offset == 0
    assert msg.timestamp == (9999999 if magic else None)
    assert msg.timestamp_type == (0 if magic else None)
    assert msg.key == b"test"
    assert msg.value == b"Super"
    assert msg.checksum == (-2095076219 if magic else 278251978) & 0xffffffff


@pytest.mark.parametrize("compression_type", [
    LegacyRecordBatch.CODEC_GZIP,
    LegacyRecordBatch.CODEC_SNAPPY,
    LegacyRecordBatch.CODEC_LZ4
])
@pytest.mark.parametrize("magic", [0, 1])
def test_read_write_serde_v0_v1_with_compression(compression_type, magic):
    builder = LegacyRecordBatchBuilder(
        magic=magic, compression_type=compression_type, batch_size=9999999)
    for offset in range(10):
        builder.append(
            offset, timestamp=9999999, key=b"test", value=b"Super")
    buffer = builder.build()

    batch = LegacyRecordBatch(bytes(buffer), magic)
    msgs = list(batch)

    for offset, msg in enumerate(msgs):
        assert msg.offset == offset
        assert msg.timestamp == (9999999 if magic else None)
        assert msg.timestamp_type == (0 if magic else None)
        assert msg.key == b"test"
        assert msg.value == b"Super"
        assert msg.checksum == (-2095076219 if magic else 278251978) & \
            0xffffffff


@pytest.mark.parametrize("magic", [0, 1])
def test_written_bytes_equals_size_in_bytes(magic):
    key = b"test"
    value = b"Super"
    builder = LegacyRecordBatchBuilder(
        magic=magic, compression_type=0, batch_size=9999999)

    size_in_bytes = builder.size_in_bytes(
        0, timestamp=9999999, key=key, value=value)

    pos = builder.size()
    builder.append(0, timestamp=9999999, key=key, value=value)

    assert builder.size() - pos == size_in_bytes


@pytest.mark.parametrize("magic", [0, 1])
def test_estimate_size_in_bytes_bigger_than_batch(magic):
    key = b"Super Key"
    value = b"1" * 100
    estimate_size = LegacyRecordBatchBuilder.estimate_size_in_bytes(
        magic, compression_type=0, key=key, value=value)

    builder = LegacyRecordBatchBuilder(
        magic=magic, compression_type=0, batch_size=9999999)
    builder.append(
        0, timestamp=9999999, key=key, value=value)
    buf = builder.build()
    assert len(buf) <= estimate_size, \
        "Estimate should always be upper bound"


@pytest.mark.parametrize("magic", [0, 1])
def test_legacy_batch_builder_validates_arguments(magic):
    builder = LegacyRecordBatchBuilder(
        magic=magic, compression_type=0, batch_size=1024 * 1024)

    # Key should not be str
    with pytest.raises(TypeError):
        builder.append(
            0, timestamp=9999999, key="some string", value=None)

    # Value should not be str
    with pytest.raises(TypeError):
        builder.append(
            0, timestamp=9999999, key=None, value="some string")

    # Timestamp should be of proper type
    if magic != 0:
        with pytest.raises(TypeError):
            builder.append(
                0, timestamp="1243812793", key=None, value=b"some string")

    # Offset of invalid type
    with pytest.raises(TypeError):
        builder.append(
            "0", timestamp=9999999, key=None, value=b"some string")

    # Ok to pass value as None
    builder.append(
        0, timestamp=9999999, key=b"123", value=None)

    # Timestamp can be None
    builder.append(
        1, timestamp=None, key=None, value=b"some string")

    # Ok to pass offsets in not incremental order. This should not happen thou
    builder.append(
        5, timestamp=9999999, key=b"123", value=None)

    # in case error handling code fails to fix inner buffer in builder
    assert len(builder.build()) == 119 if magic else 95


@pytest.mark.parametrize("magic", [0, 1])
def test_legacy_correct_metadata_response(magic):
    builder = LegacyRecordBatchBuilder(
        magic=magic, compression_type=0, batch_size=1024 * 1024)
    meta = builder.append(
        0, timestamp=9999999, key=b"test", value=b"Super")

    assert meta.offset == 0
    assert meta.timestamp == (9999999 if magic else -1)
    assert meta.crc == (-2095076219 if magic else 278251978) & 0xffffffff
    assert repr(meta) == (
        "LegacyRecordMetadata(offset=0, crc={!r}, size={}, "
        "timestamp={})".format(meta.crc, meta.size, meta.timestamp)
    )


@pytest.mark.parametrize("magic", [0, 1])
def test_legacy_batch_size_limit(magic):
    # First message can be added even if it's too big
    builder = LegacyRecordBatchBuilder(
        magic=magic, compression_type=0, batch_size=1024)
    meta = builder.append(0, timestamp=None, key=None, value=b"M" * 2000)
    assert meta.size > 0
    assert meta.crc is not None
    assert meta.offset == 0
    assert meta.timestamp is not None
    assert len(builder.build()) > 2000

    builder = LegacyRecordBatchBuilder(
        magic=magic, compression_type=0, batch_size=1024)
    meta = builder.append(0, timestamp=None, key=None, value=b"M" * 700)
    assert meta is not None
    meta = builder.append(1, timestamp=None, key=None, value=b"M" * 700)
    assert meta is None
    meta = builder.append(2, timestamp=None, key=None, value=b"M" * 700)
    assert meta is None
    assert len(builder.build()) < 1000


@pytest.mark.parametrize("compression_type,name,checker_name", [
    (LegacyRecordBatch.CODEC_GZIP, "gzip", "has_gzip"),
    (LegacyRecordBatch.CODEC_SNAPPY, "snappy", "has_snappy"),
    (LegacyRecordBatch.CODEC_LZ4, "lz4", "has_lz4")
])
@pytest.mark.parametrize("magic", [0, 1])
def test_unavailable_codec(magic, compression_type, name, checker_name):
    builder = LegacyRecordBatchBuilder(
        magic=magic, compression_type=compression_type, batch_size=1024)
    builder.append(0, timestamp=None, key=None, value=b"M")
    correct_buffer = builder.build()

    with patch.object(kafka.codec, checker_name) as mocked:
        mocked.return_value = False
        # Check that builder raises error
        builder = LegacyRecordBatchBuilder(
            magic=magic, compression_type=compression_type, batch_size=1024)
        error_msg = "Libraries for {} compression codec not found".format(name)
        with pytest.raises(UnsupportedCodecError, match=error_msg):
            builder.append(0, timestamp=None, key=None, value=b"M")
            builder.build()

        # Check that reader raises same error
        batch = LegacyRecordBatch(bytes(correct_buffer), magic)
        with pytest.raises(UnsupportedCodecError, match=error_msg):
            list(batch)