| author | Taras Voinarovskyi <voyn1991@gmail.com> | 2017-10-25 07:28:35 +0900 |
| committer | GitHub <noreply@github.com> | 2017-10-25 07:28:35 +0900 |
| commit | 8b05ee8da50b4c7b832676f4e38f9d92a86639cc (patch) | |
| tree | 91fe16e3c9aff44ca93633824b96da4b8ff19384 /test | |
| parent | 4213d53d4ccfd239addc1db07b5b3913b4c6547c (diff) | |
| download | kafka-python-8b05ee8da50b4c7b832676f4e38f9d92a86639cc.tar.gz | |
Add DefaultRecordBatch implementation aka V2 message format parser/builder. (#1185)
Added bytecode optimizations for the varint and append/read_msg functions, mostly based on avoiding LOAD_GLOBAL calls.
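The LOAD_GLOBAL point refers to the usual CPython trick of binding module-level names as default arguments so that the interpreter resolves them as locals. A minimal sketch of the idea (the names below are illustrative, not the exact kafka.record internals):

```python
import dis

MASK = 0x7F  # module-level constant

def with_global(value):
    return value & MASK             # MASK is resolved via LOAD_GLOBAL on every call

def with_local(value, _mask=MASK):
    return value & _mask            # _mask is bound once at def time, LOAD_FAST thereafter

dis.dis(with_global)  # disassembly shows LOAD_GLOBAL (MASK)
dis.dis(with_local)   # disassembly shows LOAD_FAST (_mask)
```

In hot per-byte loops such as varint encoding, skipping the dict lookups behind LOAD_GLOBAL is a measurable win.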
Diffstat (limited to 'test')
| -rw-r--r-- | test/record/test_default_records.py | 169 |
| -rw-r--r-- | test/record/test_records.py | 46 |
| -rw-r--r-- | test/record/test_util.py | 95 |
| -rw-r--r-- | test/test_fetcher.py | 2 |
| -rw-r--r-- | test/test_producer.py | 9 |
5 files changed, 315 insertions, 6 deletions
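For orientation before the diff itself, here is a minimal V2 round trip distilled from the new tests below (all argument values are taken directly from the test code):

```python
from kafka.record.default_records import (
    DefaultRecordBatch, DefaultRecordBatchBuilder
)

# Build a magic=2 (V2) batch containing a single record.
builder = DefaultRecordBatchBuilder(
    magic=2, compression_type=0, is_transactional=0,
    producer_id=-1, producer_epoch=-1, base_sequence=-1,
    batch_size=999999)
builder.append(0, timestamp=9999999, key=b"test", value=b"Super", headers=[])
buffer = builder.build()

# Parse it back with the reader side of the new implementation.
reader = DefaultRecordBatch(bytes(buffer))
for msg in reader:
    assert msg.key == b"test" and msg.value == b"Super"
```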
diff --git a/test/record/test_default_records.py b/test/record/test_default_records.py
new file mode 100644
index 0000000..193703e
--- /dev/null
+++ b/test/record/test_default_records.py
@@ -0,0 +1,169 @@
+# -*- coding: utf-8 -*-
+from __future__ import unicode_literals
+import pytest
+from kafka.record.default_records import (
+    DefaultRecordBatch, DefaultRecordBatchBuilder
+)
+
+
+@pytest.mark.parametrize("compression_type", [
+    DefaultRecordBatch.CODEC_NONE,
+    DefaultRecordBatch.CODEC_GZIP,
+    DefaultRecordBatch.CODEC_SNAPPY,
+    DefaultRecordBatch.CODEC_LZ4
+])
+def test_read_write_serde_v2(compression_type):
+    builder = DefaultRecordBatchBuilder(
+        magic=2, compression_type=compression_type, is_transactional=1,
+        producer_id=123456, producer_epoch=123, base_sequence=9999,
+        batch_size=999999)
+    headers = []  # [("header1", b"aaa"), ("header2", b"bbb")]
+    for offset in range(10):
+        builder.append(
+            offset, timestamp=9999999, key=b"test", value=b"Super",
+            headers=headers)
+    buffer = builder.build()
+    reader = DefaultRecordBatch(bytes(buffer))
+    msgs = list(reader)
+
+    assert reader.is_transactional is True
+    assert reader.compression_type == compression_type
+    assert reader.magic == 2
+    assert reader.timestamp_type == 0
+    assert reader.base_offset == 0
+    for offset, msg in enumerate(msgs):
+        assert msg.offset == offset
+        assert msg.timestamp == 9999999
+        assert msg.key == b"test"
+        assert msg.value == b"Super"
+        assert msg.headers == headers
+
+
+def test_written_bytes_equals_size_in_bytes_v2():
+    key = b"test"
+    value = b"Super"
+    headers = [("header1", b"aaa"), ("header2", b"bbb"), ("xx", None)]
+    builder = DefaultRecordBatchBuilder(
+        magic=2, compression_type=0, is_transactional=0,
+        producer_id=-1, producer_epoch=-1, base_sequence=-1,
+        batch_size=999999)
+
+    size_in_bytes = builder.size_in_bytes(
+        0, timestamp=9999999, key=key, value=value, headers=headers)
+
+    pos = builder.size()
+    meta = builder.append(
+        0, timestamp=9999999, key=key, value=value, headers=headers)
+
+    assert builder.size() - pos == size_in_bytes
+    assert meta.size == size_in_bytes
+
+
+def test_estimate_size_in_bytes_bigger_than_batch_v2():
+    key = b"Super Key"
+    value = b"1" * 100
+    headers = [("header1", b"aaa"), ("header2", b"bbb")]
+    estimate_size = DefaultRecordBatchBuilder.estimate_size_in_bytes(
+        key, value, headers)
+
+    builder = DefaultRecordBatchBuilder(
+        magic=2, compression_type=0, is_transactional=0,
+        producer_id=-1, producer_epoch=-1, base_sequence=-1,
+        batch_size=999999)
+    builder.append(
+        0, timestamp=9999999, key=key, value=value, headers=headers)
+    buf = builder.build()
+    assert len(buf) <= estimate_size, \
+        "Estimate should always be upper bound"
+
+
+def test_default_batch_builder_validates_arguments():
+    builder = DefaultRecordBatchBuilder(
+        magic=2, compression_type=0, is_transactional=0,
+        producer_id=-1, producer_epoch=-1, base_sequence=-1,
+        batch_size=999999)
+
+    # Key should not be str
+    with pytest.raises(TypeError):
+        builder.append(
+            0, timestamp=9999999, key="some string", value=None, headers=[])
+
+    # Value should not be str
+    with pytest.raises(TypeError):
+        builder.append(
+            0, timestamp=9999999, key=None, value="some string", headers=[])
+
+    # Timestamp should be of proper type
+    with pytest.raises(TypeError):
+        builder.append(
+            0, timestamp="1243812793", key=None, value=b"some string",
+            headers=[])
+
+    # Offset of invalid type
+    with pytest.raises(TypeError):
+        builder.append(
+            "0", timestamp=9999999, key=None, value=b"some string", headers=[])
+
+    # Ok to pass value as None
+    builder.append(
+        0, timestamp=9999999, key=b"123", value=None, headers=[])
+
+    # Timestamp can be None
+    builder.append(
+        1, timestamp=None, key=None, value=b"some string", headers=[])
+
+    # Ok to pass offsets in non-incremental order, though it should not happen
+    builder.append(
+        5, timestamp=9999999, key=b"123", value=None, headers=[])
+
+    # In case error handling code fails to restore the inner buffer in builder
+    assert len(builder.build()) == 104
+
+
+def test_default_correct_metadata_response():
+    builder = DefaultRecordBatchBuilder(
+        magic=2, compression_type=0, is_transactional=0,
+        producer_id=-1, producer_epoch=-1, base_sequence=-1,
+        batch_size=1024 * 1024)
+    meta = builder.append(
+        0, timestamp=9999999, key=b"test", value=b"Super", headers=[])
+
+    assert meta.offset == 0
+    assert meta.timestamp == 9999999
+    assert meta.crc is None
+    assert meta.size == 16
+    assert repr(meta) == (
+        "DefaultRecordMetadata(offset=0, size={}, timestamp={})"
+        .format(meta.size, meta.timestamp)
+    )
+
+
+def test_default_batch_size_limit():
+    # First message can be added even if it's too big
+    builder = DefaultRecordBatchBuilder(
+        magic=2, compression_type=0, is_transactional=0,
+        producer_id=-1, producer_epoch=-1, base_sequence=-1,
+        batch_size=1024)
+
+    meta = builder.append(
+        0, timestamp=None, key=None, value=b"M" * 2000, headers=[])
+    assert meta.size > 0
+    assert meta.crc is None
+    assert meta.offset == 0
+    assert meta.timestamp is not None
+    assert len(builder.build()) > 2000
+
+    builder = DefaultRecordBatchBuilder(
+        magic=2, compression_type=0, is_transactional=0,
+        producer_id=-1, producer_epoch=-1, base_sequence=-1,
+        batch_size=1024)
+    meta = builder.append(
+        0, timestamp=None, key=None, value=b"M" * 700, headers=[])
+    assert meta is not None
+    meta = builder.append(
+        1, timestamp=None, key=None, value=b"M" * 700, headers=[])
+    assert meta is None
+    meta = builder.append(
+        2, timestamp=None, key=None, value=b"M" * 700, headers=[])
+    assert meta is None
+    assert len(builder.build()) < 1000
diff --git a/test/record/test_records.py b/test/record/test_records.py
index fc3eaca..7306bbc 100644
--- a/test/record/test_records.py
+++ b/test/record/test_records.py
@@ -2,6 +2,26 @@ import pytest
 from kafka.record import MemoryRecords
 from kafka.errors import CorruptRecordException
 
+# This is real live data from a Kafka 0.11 broker
+record_batch_data_v2 = [
+    # First Batch value == "123"
+    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00;\x00\x00\x00\x01\x02\x03'
+    b'\x18\xa2p\x00\x00\x00\x00\x00\x00\x00\x00\x01]\xff{\x06<\x00\x00\x01]'
+    b'\xff{\x06<\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00'
+    b'\x00\x00\x01\x12\x00\x00\x00\x01\x06123\x00',
+    # Second Batch value = "" and value = "". 2 records
+    b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00@\x00\x00\x00\x02\x02\xc8'
+    b'\\\xbd#\x00\x00\x00\x00\x00\x01\x00\x00\x01]\xff|\xddl\x00\x00\x01]\xff'
+    b'|\xde\x14\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00'
+    b'\x00\x00\x02\x0c\x00\x00\x00\x01\x00\x00\x0e\x00\xd0\x02\x02\x01\x00'
+    b'\x00',
+    # Third batch value = "123"
+    b'\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00;\x00\x00\x00\x02\x02.\x0b'
+    b'\x85\xb7\x00\x00\x00\x00\x00\x00\x00\x00\x01]\xff|\xe7\x9d\x00\x00\x01]'
+    b'\xff|\xe7\x9d\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'
+    b'\x00\x00\x00\x01\x12\x00\x00\x00\x01\x06123\x00'
+]
+
 record_batch_data_v1 = [
     # First Message value == "123"
     b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x19G\x86(\xc2\x01\x00\x00'
@@ -34,6 +54,32 @@ record_batch_data_v0 = [
 ]
 
 
+def test_memory_records_v2():
+    data_bytes = b"".join(record_batch_data_v2) + b"\x00" * 4
+    records = MemoryRecords(data_bytes)
+
+    assert records.size_in_bytes() == 222
+    assert records.valid_bytes() == 218
+
+    assert records.has_next() is True
+    batch = records.next_batch()
+    recs = list(batch)
+    assert len(recs) == 1
+    assert recs[0].value == b"123"
+    assert recs[0].key is None
+    assert recs[0].timestamp == 1503229838908
+    assert recs[0].timestamp_type == 0
+    assert recs[0].checksum is None
+    assert recs[0].headers == []
+
+    assert records.next_batch() is not None
+    assert records.next_batch() is not None
+
+    assert records.has_next() is False
+    assert records.next_batch() is None
+    assert records.next_batch() is None
+
+
 def test_memory_records_v1():
     data_bytes = b"".join(record_batch_data_v1) + b"\x00" * 4
     records = MemoryRecords(data_bytes)
diff --git a/test/record/test_util.py b/test/record/test_util.py
new file mode 100644
index 0000000..bfe0fcc
--- /dev/null
+++ b/test/record/test_util.py
@@ -0,0 +1,95 @@
+import struct
+
+import pytest
+
+from kafka.record import util
+
+
+varint_data = [
+    (b"\x00", 0),
+    (b"\x01", -1),
+    (b"\x02", 1),
+    (b"\x7E", 63),
+    (b"\x7F", -64),
+    (b"\x80\x01", 64),
+    (b"\x81\x01", -65),
+    (b"\xFE\x7F", 8191),
+    (b"\xFF\x7F", -8192),
+    (b"\x80\x80\x01", 8192),
+    (b"\x81\x80\x01", -8193),
+    (b"\xFE\xFF\x7F", 1048575),
+    (b"\xFF\xFF\x7F", -1048576),
+    (b"\x80\x80\x80\x01", 1048576),
+    (b"\x81\x80\x80\x01", -1048577),
+    (b"\xFE\xFF\xFF\x7F", 134217727),
+    (b"\xFF\xFF\xFF\x7F", -134217728),
+    (b"\x80\x80\x80\x80\x01", 134217728),
+    (b"\x81\x80\x80\x80\x01", -134217729),
+    (b"\xFE\xFF\xFF\xFF\x7F", 17179869183),
+    (b"\xFF\xFF\xFF\xFF\x7F", -17179869184),
+    (b"\x80\x80\x80\x80\x80\x01", 17179869184),
+    (b"\x81\x80\x80\x80\x80\x01", -17179869185),
+    (b"\xFE\xFF\xFF\xFF\xFF\x7F", 2199023255551),
+    (b"\xFF\xFF\xFF\xFF\xFF\x7F", -2199023255552),
+    (b"\x80\x80\x80\x80\x80\x80\x01", 2199023255552),
+    (b"\x81\x80\x80\x80\x80\x80\x01", -2199023255553),
+    (b"\xFE\xFF\xFF\xFF\xFF\xFF\x7F", 281474976710655),
+    (b"\xFF\xFF\xFF\xFF\xFF\xFF\x7F", -281474976710656),
+    (b"\x80\x80\x80\x80\x80\x80\x80\x01", 281474976710656),
+    (b"\x81\x80\x80\x80\x80\x80\x80\x01", -281474976710657),
+    (b"\xFE\xFF\xFF\xFF\xFF\xFF\xFF\x7F", 36028797018963967),
+    (b"\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x7F", -36028797018963968),
+    (b"\x80\x80\x80\x80\x80\x80\x80\x80\x01", 36028797018963968),
+    (b"\x81\x80\x80\x80\x80\x80\x80\x80\x01", -36028797018963969),
+    (b"\xFE\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x7F", 4611686018427387903),
+    (b"\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x7F", -4611686018427387904),
+    (b"\x80\x80\x80\x80\x80\x80\x80\x80\x80\x01", 4611686018427387904),
+    (b"\x81\x80\x80\x80\x80\x80\x80\x80\x80\x01", -4611686018427387905),
+]
+
+
+@pytest.mark.parametrize("encoded, decoded", varint_data)
+def test_encode_varint(encoded, decoded):
+    res = bytearray()
+    util.encode_varint(decoded, res.append)
+    assert res == encoded
+
+
+@pytest.mark.parametrize("encoded, decoded", varint_data)
+def test_decode_varint(encoded, decoded):
+    # We add a few bytes around the payload just to check that the position
+    # is calculated correctly
+    value, pos = util.decode_varint(
+        bytearray(b"\x01\xf0" + encoded + b"\xff\x01"), 2)
+    assert value == decoded
+    assert pos - 2 == len(encoded)
+
+
+@pytest.mark.parametrize("encoded, decoded", varint_data)
+def test_size_of_varint(encoded, decoded):
+    assert util.size_of_varint(decoded) == len(encoded)
+
+
+def test_crc32c():
+    def make_crc(data):
+        crc = util.calc_crc32c(data)
+        return struct.pack(">I", crc)
+    assert make_crc(b"") == b"\x00\x00\x00\x00"
+    assert make_crc(b"a") == b"\xc1\xd0\x43\x30"
+
+    # Taken from a librdkafka test case
+    long_text = b"""\
+    This software is provided 'as-is', without any express or implied
+    warranty. In no event will the author be held liable for any damages
+    arising from the use of this software.
+
+    Permission is granted to anyone to use this software for any purpose,
+    including commercial applications, and to alter it and redistribute it
+    freely, subject to the following restrictions:
+
+    1. The origin of this software must not be misrepresented; you must not
+       claim that you wrote the original software. If you use this software
+       in a product, an acknowledgment in the product documentation would be
+       appreciated but is not required.
+    2. Altered source versions must be plainly marked as such, and must not be
+       misrepresented as being the original software.
+    3. This notice may not be removed or altered from any source distribution."""
+    assert make_crc(long_text) == b"\x7d\xcd\xe1\x13"
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index 364a808..ef3f686 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -54,7 +54,7 @@ def _build_record_batch(msgs, compression=0):
         magic=1, compression_type=0, batch_size=9999999)
     for msg in msgs:
         key, value, timestamp = msg
-        builder.append(key=key, value=value, timestamp=timestamp)
+        builder.append(key=key, value=value, timestamp=timestamp, headers=[])
     builder.close()
     return builder.buffer()
diff --git a/test/test_producer.py b/test/test_producer.py
index 41bd52e..20dffc2 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -88,10 +88,7 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
         retries=5,
         max_block_ms=10000,
         compression_type=compression)
-    if producer.config['api_version'] >= (0, 10):
-        magic = 1
-    else:
-        magic = 0
+    magic = producer._max_usable_produce_magic()
 
     topic = random_string(5)
     future = producer.send(
@@ -109,7 +106,9 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
     else:
         assert record.timestamp == -1  # NO_TIMESTAMP
 
-    if magic == 1:
+    if magic >= 2:
+        assert record.checksum is None
+    elif magic == 1:
         assert record.checksum == 1370034956
     else:
         assert record.checksum == 3296137851
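The varint_data table in test_util.py above is a useful reference for the V2 wire format: values are zig-zag mapped (0 → 0, -1 → 1, 1 → 2, …) and then emitted as base-128 groups with a continuation bit. As a reading aid for the table, here is a sketch of an encoder with the same (value, write) call shape that the tests exercise via util.encode_varint (an illustration, not the library's implementation):

```python
def zigzag_varint_encode(value, write):
    # Zig-zag map a signed 64-bit int to unsigned: 0 -> 0, -1 -> 1, 1 -> 2, ...
    value = (value << 1) ^ (value >> 63)
    # Emit base-128 groups, least significant first; the high bit of each
    # byte signals that more bytes follow.
    while value > 0x7F:
        write((value & 0x7F) | 0x80)
        value >>= 7
    write(value)

# Reproduces rows of varint_data, e.g. -65 encodes to b"\x81\x01":
buf = bytearray()
zigzag_varint_encode(-65, buf.append)
assert bytes(buf) == b"\x81\x01"
```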