author     Taras Voinarovskyi <voyn1991@gmail.com>    2017-10-25 07:28:35 +0900
committer  GitHub <noreply@github.com>                2017-10-25 07:28:35 +0900
commit     8b05ee8da50b4c7b832676f4e38f9d92a86639cc (patch)
tree       91fe16e3c9aff44ca93633824b96da4b8ff19384 /test
parent     4213d53d4ccfd239addc1db07b5b3913b4c6547c (diff)
download   kafka-python-8b05ee8da50b4c7b832676f4e38f9d92a86639cc.tar.gz
Add DefaultRecordBatch implementation aka V2 message format parser/builder. (#1185)
Added a bytecode optimization for the varint and append/read_msg functions, mostly based on avoiding LOAD_GLOBAL calls.
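
The LOAD_GLOBAL-avoidance trick mentioned above is a standard CPython micro-optimization: names looked up on every loop iteration are bound once as default arguments (or local variables), so the interpreter emits LOAD_FAST instead of a dictionary-backed LOAD_GLOBAL. A minimal standalone sketch of the idea (the function names here are illustrative, not the library's actual code):

    import dis

    def total_len_slow(items):
        total = 0
        for item in items:
            total += len(item)        # `len` resolved via LOAD_GLOBAL every iteration
        return total

    def total_len_fast(items, len=len):  # bind the builtin once as a default argument
        total = 0
        for item in items:
            total += len(item)        # now a LOAD_FAST, cheaper in hot loops
        return total

    # dis.dis(total_len_slow) vs dis.dis(total_len_fast) shows the bytecode difference.

The record append/read paths are exactly this kind of hot loop, which is presumably where the commit applies the technique.
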
Diffstat (limited to 'test')
-rw-r--r--  test/record/test_default_records.py  169
-rw-r--r--  test/record/test_records.py           46
-rw-r--r--  test/record/test_util.py              95
-rw-r--r--  test/test_fetcher.py                   2
-rw-r--r--  test/test_producer.py                  9
5 files changed, 315 insertions, 6 deletions
diff --git a/test/record/test_default_records.py b/test/record/test_default_records.py
new file mode 100644
index 0000000..193703e
--- /dev/null
+++ b/test/record/test_default_records.py
@@ -0,0 +1,169 @@
+# -*- coding: utf-8 -*-
+from __future__ import unicode_literals
+import pytest
+from kafka.record.default_records import (
+ DefaultRecordBatch, DefaultRecordBatchBuilder
+)
+
+
+@pytest.mark.parametrize("compression_type", [
+ DefaultRecordBatch.CODEC_NONE,
+ DefaultRecordBatch.CODEC_GZIP,
+ DefaultRecordBatch.CODEC_SNAPPY,
+ DefaultRecordBatch.CODEC_LZ4
+])
+def test_read_write_serde_v2(compression_type):
+ builder = DefaultRecordBatchBuilder(
+ magic=2, compression_type=compression_type, is_transactional=1,
+ producer_id=123456, producer_epoch=123, base_sequence=9999,
+ batch_size=999999)
+ headers = [] # [("header1", b"aaa"), ("header2", b"bbb")]
+ for offset in range(10):
+ builder.append(
+ offset, timestamp=9999999, key=b"test", value=b"Super",
+ headers=headers)
+ buffer = builder.build()
+ reader = DefaultRecordBatch(bytes(buffer))
+ msgs = list(reader)
+
+ assert reader.is_transactional is True
+ assert reader.compression_type == compression_type
+ assert reader.magic == 2
+ assert reader.timestamp_type == 0
+ assert reader.base_offset == 0
+ for offset, msg in enumerate(msgs):
+ assert msg.offset == offset
+ assert msg.timestamp == 9999999
+ assert msg.key == b"test"
+ assert msg.value == b"Super"
+ assert msg.headers == headers
+
+
+def test_written_bytes_equals_size_in_bytes_v2():
+ key = b"test"
+ value = b"Super"
+ headers = [("header1", b"aaa"), ("header2", b"bbb"), ("xx", None)]
+ builder = DefaultRecordBatchBuilder(
+ magic=2, compression_type=0, is_transactional=0,
+ producer_id=-1, producer_epoch=-1, base_sequence=-1,
+ batch_size=999999)
+
+ size_in_bytes = builder.size_in_bytes(
+ 0, timestamp=9999999, key=key, value=value, headers=headers)
+
+ pos = builder.size()
+ meta = builder.append(
+ 0, timestamp=9999999, key=key, value=value, headers=headers)
+
+ assert builder.size() - pos == size_in_bytes
+ assert meta.size == size_in_bytes
+
+
+def test_estimate_size_in_bytes_bigger_than_batch_v2():
+ key = b"Super Key"
+ value = b"1" * 100
+ headers = [("header1", b"aaa"), ("header2", b"bbb")]
+ estimate_size = DefaultRecordBatchBuilder.estimate_size_in_bytes(
+ key, value, headers)
+
+ builder = DefaultRecordBatchBuilder(
+ magic=2, compression_type=0, is_transactional=0,
+ producer_id=-1, producer_epoch=-1, base_sequence=-1,
+ batch_size=999999)
+ builder.append(
+ 0, timestamp=9999999, key=key, value=value, headers=headers)
+ buf = builder.build()
+ assert len(buf) <= estimate_size, \
+ "Estimate should always be upper bound"
+
+
+def test_default_batch_builder_validates_arguments():
+ builder = DefaultRecordBatchBuilder(
+ magic=2, compression_type=0, is_transactional=0,
+ producer_id=-1, producer_epoch=-1, base_sequence=-1,
+ batch_size=999999)
+
+ # Key should not be str
+ with pytest.raises(TypeError):
+ builder.append(
+ 0, timestamp=9999999, key="some string", value=None, headers=[])
+
+ # Value should not be str
+ with pytest.raises(TypeError):
+ builder.append(
+ 0, timestamp=9999999, key=None, value="some string", headers=[])
+
+ # Timestamp should be of proper type
+ with pytest.raises(TypeError):
+ builder.append(
+ 0, timestamp="1243812793", key=None, value=b"some string",
+ headers=[])
+
+ # Offset of invalid type
+ with pytest.raises(TypeError):
+ builder.append(
+ "0", timestamp=9999999, key=None, value=b"some string", headers=[])
+
+ # Ok to pass value as None
+ builder.append(
+ 0, timestamp=9999999, key=b"123", value=None, headers=[])
+
+ # Timestamp can be None
+ builder.append(
+ 1, timestamp=None, key=None, value=b"some string", headers=[])
+
+    # Ok to pass offsets in non-incremental order. This should not happen, though
+ builder.append(
+ 5, timestamp=9999999, key=b"123", value=None, headers=[])
+
+    # In case error handling code fails to fix the inner buffer in the builder
+ assert len(builder.build()) == 104
+
+
+def test_default_correct_metadata_response():
+ builder = DefaultRecordBatchBuilder(
+ magic=2, compression_type=0, is_transactional=0,
+ producer_id=-1, producer_epoch=-1, base_sequence=-1,
+ batch_size=1024 * 1024)
+ meta = builder.append(
+ 0, timestamp=9999999, key=b"test", value=b"Super", headers=[])
+
+ assert meta.offset == 0
+ assert meta.timestamp == 9999999
+ assert meta.crc is None
+ assert meta.size == 16
+ assert repr(meta) == (
+ "DefaultRecordMetadata(offset=0, size={}, timestamp={})"
+ .format(meta.size, meta.timestamp)
+ )
+
+
+def test_default_batch_size_limit():
+ # First message can be added even if it's too big
+ builder = DefaultRecordBatchBuilder(
+ magic=2, compression_type=0, is_transactional=0,
+ producer_id=-1, producer_epoch=-1, base_sequence=-1,
+ batch_size=1024)
+
+ meta = builder.append(
+ 0, timestamp=None, key=None, value=b"M" * 2000, headers=[])
+ assert meta.size > 0
+ assert meta.crc is None
+ assert meta.offset == 0
+ assert meta.timestamp is not None
+ assert len(builder.build()) > 2000
+
+ builder = DefaultRecordBatchBuilder(
+ magic=2, compression_type=0, is_transactional=0,
+ producer_id=-1, producer_epoch=-1, base_sequence=-1,
+ batch_size=1024)
+ meta = builder.append(
+ 0, timestamp=None, key=None, value=b"M" * 700, headers=[])
+ assert meta is not None
+ meta = builder.append(
+ 1, timestamp=None, key=None, value=b"M" * 700, headers=[])
+ assert meta is None
+ meta = builder.append(
+ 2, timestamp=None, key=None, value=b"M" * 700, headers=[])
+ assert meta is None
+ assert len(builder.build()) < 1000
diff --git a/test/record/test_records.py b/test/record/test_records.py
index fc3eaca..7306bbc 100644
--- a/test/record/test_records.py
+++ b/test/record/test_records.py
@@ -2,6 +2,26 @@ import pytest
from kafka.record import MemoryRecords
from kafka.errors import CorruptRecordException
+# This is real live data from a Kafka 0.11 broker
+record_batch_data_v2 = [
+ # First Batch value == "123"
+ b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00;\x00\x00\x00\x01\x02\x03'
+ b'\x18\xa2p\x00\x00\x00\x00\x00\x00\x00\x00\x01]\xff{\x06<\x00\x00\x01]'
+ b'\xff{\x06<\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00'
+ b'\x00\x00\x01\x12\x00\x00\x00\x01\x06123\x00',
+ # Second Batch value = "" and value = "". 2 records
+ b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00@\x00\x00\x00\x02\x02\xc8'
+ b'\\\xbd#\x00\x00\x00\x00\x00\x01\x00\x00\x01]\xff|\xddl\x00\x00\x01]\xff'
+ b'|\xde\x14\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00'
+ b'\x00\x00\x02\x0c\x00\x00\x00\x01\x00\x00\x0e\x00\xd0\x02\x02\x01\x00'
+ b'\x00',
+ # Third batch value = "123"
+ b'\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00;\x00\x00\x00\x02\x02.\x0b'
+ b'\x85\xb7\x00\x00\x00\x00\x00\x00\x00\x00\x01]\xff|\xe7\x9d\x00\x00\x01]'
+ b'\xff|\xe7\x9d\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'
+ b'\x00\x00\x00\x01\x12\x00\x00\x00\x01\x06123\x00'
+]
+
record_batch_data_v1 = [
# First Message value == "123"
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x19G\x86(\xc2\x01\x00\x00'
@@ -34,6 +54,32 @@ record_batch_data_v0 = [
]
+def test_memory_records_v2():
+ data_bytes = b"".join(record_batch_data_v2) + b"\x00" * 4
+ records = MemoryRecords(data_bytes)
+
+ assert records.size_in_bytes() == 222
+ assert records.valid_bytes() == 218
+
+ assert records.has_next() is True
+ batch = records.next_batch()
+ recs = list(batch)
+ assert len(recs) == 1
+ assert recs[0].value == b"123"
+ assert recs[0].key is None
+ assert recs[0].timestamp == 1503229838908
+ assert recs[0].timestamp_type == 0
+ assert recs[0].checksum is None
+ assert recs[0].headers == []
+
+ assert records.next_batch() is not None
+ assert records.next_batch() is not None
+
+ assert records.has_next() is False
+ assert records.next_batch() is None
+ assert records.next_batch() is None
+
+
def test_memory_records_v1():
data_bytes = b"".join(record_batch_data_v1) + b"\x00" * 4
records = MemoryRecords(data_bytes)
diff --git a/test/record/test_util.py b/test/record/test_util.py
new file mode 100644
index 0000000..bfe0fcc
--- /dev/null
+++ b/test/record/test_util.py
@@ -0,0 +1,95 @@
+import struct
+import pytest
+from kafka.record import util
+
+
+varint_data = [
+ (b"\x00", 0),
+ (b"\x01", -1),
+ (b"\x02", 1),
+ (b"\x7E", 63),
+ (b"\x7F", -64),
+ (b"\x80\x01", 64),
+ (b"\x81\x01", -65),
+ (b"\xFE\x7F", 8191),
+ (b"\xFF\x7F", -8192),
+ (b"\x80\x80\x01", 8192),
+ (b"\x81\x80\x01", -8193),
+ (b"\xFE\xFF\x7F", 1048575),
+ (b"\xFF\xFF\x7F", -1048576),
+ (b"\x80\x80\x80\x01", 1048576),
+ (b"\x81\x80\x80\x01", -1048577),
+ (b"\xFE\xFF\xFF\x7F", 134217727),
+ (b"\xFF\xFF\xFF\x7F", -134217728),
+ (b"\x80\x80\x80\x80\x01", 134217728),
+ (b"\x81\x80\x80\x80\x01", -134217729),
+ (b"\xFE\xFF\xFF\xFF\x7F", 17179869183),
+ (b"\xFF\xFF\xFF\xFF\x7F", -17179869184),
+ (b"\x80\x80\x80\x80\x80\x01", 17179869184),
+ (b"\x81\x80\x80\x80\x80\x01", -17179869185),
+ (b"\xFE\xFF\xFF\xFF\xFF\x7F", 2199023255551),
+ (b"\xFF\xFF\xFF\xFF\xFF\x7F", -2199023255552),
+ (b"\x80\x80\x80\x80\x80\x80\x01", 2199023255552),
+ (b"\x81\x80\x80\x80\x80\x80\x01", -2199023255553),
+ (b"\xFE\xFF\xFF\xFF\xFF\xFF\x7F", 281474976710655),
+ (b"\xFF\xFF\xFF\xFF\xFF\xFF\x7F", -281474976710656),
+ (b"\x80\x80\x80\x80\x80\x80\x80\x01", 281474976710656),
+ (b"\x81\x80\x80\x80\x80\x80\x80\x01", -281474976710657),
+ (b"\xFE\xFF\xFF\xFF\xFF\xFF\xFF\x7F", 36028797018963967),
+ (b"\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x7F", -36028797018963968),
+ (b"\x80\x80\x80\x80\x80\x80\x80\x80\x01", 36028797018963968),
+ (b"\x81\x80\x80\x80\x80\x80\x80\x80\x01", -36028797018963969),
+ (b"\xFE\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x7F", 4611686018427387903),
+ (b"\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x7F", -4611686018427387904),
+ (b"\x80\x80\x80\x80\x80\x80\x80\x80\x80\x01", 4611686018427387904),
+ (b"\x81\x80\x80\x80\x80\x80\x80\x80\x80\x01", -4611686018427387905),
+]
+
+
+@pytest.mark.parametrize("encoded, decoded", varint_data)
+def test_encode_varint(encoded, decoded):
+ res = bytearray()
+ util.encode_varint(decoded, res.append)
+ assert res == encoded
+
+
+@pytest.mark.parametrize("encoded, decoded", varint_data)
+def test_decode_varint(encoded, decoded):
+    # We add a few extra bytes on both sides just to check that the position is
+    # calculated correctly
+ value, pos = util.decode_varint(
+ bytearray(b"\x01\xf0" + encoded + b"\xff\x01"), 2)
+ assert value == decoded
+ assert pos - 2 == len(encoded)
+
+
+@pytest.mark.parametrize("encoded, decoded", varint_data)
+def test_size_of_varint(encoded, decoded):
+ assert util.size_of_varint(decoded) == len(encoded)
+
+
+def test_crc32c():
+ def make_crc(data):
+ crc = util.calc_crc32c(data)
+ return struct.pack(">I", crc)
+ assert make_crc(b"") == b"\x00\x00\x00\x00"
+ assert make_crc(b"a") == b"\xc1\xd0\x43\x30"
+
+    # Taken from a librdkafka testcase
+ long_text = b"""\
+ This software is provided 'as-is', without any express or implied
+ warranty. In no event will the author be held liable for any damages
+ arising from the use of this software.
+
+ Permission is granted to anyone to use this software for any purpose,
+ including commercial applications, and to alter it and redistribute it
+ freely, subject to the following restrictions:
+
+ 1. The origin of this software must not be misrepresented; you must not
+ claim that you wrote the original software. If you use this software
+ in a product, an acknowledgment in the product documentation would be
+ appreciated but is not required.
+ 2. Altered source versions must be plainly marked as such, and must not be
+ misrepresented as being the original software.
+ 3. This notice may not be removed or altered from any source distribution."""
+ assert make_crc(long_text) == b"\x7d\xcd\xe1\x13"
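
For reference, the varint_data table above follows Kafka's zigzag varint layout: a signed value is first zigzag-mapped to an unsigned one, then emitted 7 bits at a time, low bits first, with the high bit of each byte marking continuation. A short standalone sketch that reproduces a few table entries (independent of kafka.record.util, whose actual interface may differ):

    def zigzag_varint(value):
        # Zigzag-map a signed 64-bit value: 0, -1, 1, -2, ... -> 0, 1, 2, 3, ...
        unsigned = (value << 1) ^ (value >> 63)
        out = bytearray()
        while unsigned > 0x7F:
            out.append(0x80 | (unsigned & 0x7F))  # continuation bit on all but the last byte
            unsigned >>= 7
        out.append(unsigned)
        return bytes(out)

    assert zigzag_varint(0) == b"\x00"
    assert zigzag_varint(-1) == b"\x01"
    assert zigzag_varint(64) == b"\x80\x01"
    assert zigzag_varint(-8192) == b"\xff\x7f"
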
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index 364a808..ef3f686 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -54,7 +54,7 @@ def _build_record_batch(msgs, compression=0):
magic=1, compression_type=0, batch_size=9999999)
for msg in msgs:
key, value, timestamp = msg
- builder.append(key=key, value=value, timestamp=timestamp)
+ builder.append(key=key, value=value, timestamp=timestamp, headers=[])
builder.close()
return builder.buffer()
diff --git a/test/test_producer.py b/test/test_producer.py
index 41bd52e..20dffc2 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -88,10 +88,7 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
retries=5,
max_block_ms=10000,
compression_type=compression)
- if producer.config['api_version'] >= (0, 10):
- magic = 1
- else:
- magic = 0
+ magic = producer._max_usable_produce_magic()
topic = random_string(5)
future = producer.send(
@@ -109,7 +106,9 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
else:
assert record.timestamp == -1 # NO_TIMESTAMP
- if magic == 1:
+ if magic >= 2:
+ assert record.checksum is None
+ elif magic == 1:
assert record.checksum == 1370034956
else:
assert record.checksum == 3296137851