summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorTaras <voyn1991@gmail.com>2018-03-19 00:09:29 +0200
committerDana Powers <dana.powers@gmail.com>2018-04-18 13:41:14 -0700
commitd9e41c8e8fb7033a3e9a9a7654bc2b0125f337a0 (patch)
tree1e2c0f300a8a8981c1e170505899a205f67308e5 /test
parent1c71dfc3c321372c808f45f569ae41352f420e8f (diff)
downloadkafka-python-d9e41c8e8fb7033a3e9a9a7654bc2b0125f337a0.tar.gz
Fix MemoryRecord bugs re error handling and add test coverage (#1448)
Diffstat (limited to 'test')
-rw-r--r--test/record/test_records.py69
1 files changed, 68 insertions, 1 deletions
diff --git a/test/record/test_records.py b/test/record/test_records.py
index 7306bbc..224989f 100644
--- a/test/record/test_records.py
+++ b/test/record/test_records.py
@@ -1,5 +1,7 @@
+# -*- coding: utf-8 -*-
+from __future__ import unicode_literals
import pytest
-from kafka.record import MemoryRecords
+from kafka.record import MemoryRecords, MemoryRecordsBuilder
from kafka.errors import CorruptRecordException
# This is real live data from Kafka 11 broker
@@ -152,3 +154,68 @@ def test_memory_records_corrupt():
)
with pytest.raises(CorruptRecordException):
records.next_batch()
+
+
+@pytest.mark.parametrize("compression_type", [0, 1, 2, 3])
+@pytest.mark.parametrize("magic", [0, 1, 2])
+def test_memory_records_builder(magic, compression_type):
+ builder = MemoryRecordsBuilder(
+ magic=magic, compression_type=compression_type, batch_size=1024 * 10)
+ base_size = builder.size_in_bytes() # V2 has a header before
+
+ msg_sizes = []
+ for offset in range(10):
+ metadata = builder.append(
+ timestamp=10000 + offset, key=b"test", value=b"Super")
+ msg_sizes.append(metadata.size)
+ assert metadata.offset == offset
+ if magic > 0:
+ assert metadata.timestamp == 10000 + offset
+ else:
+ assert metadata.timestamp == -1
+ assert builder.next_offset() == offset + 1
+
+ # Error appends should not leave junk behind, like null bytes or something
+ with pytest.raises(TypeError):
+ builder.append(
+ timestamp=None, key="test", value="Super") # Not bytes, but str
+
+ assert not builder.is_full()
+ size_before_close = builder.size_in_bytes()
+ assert size_before_close == sum(msg_sizes) + base_size
+
+ # Size should remain the same after closing. No traling bytes
+ builder.close()
+ assert builder.compression_rate() > 0
+ expected_size = size_before_close * builder.compression_rate()
+ assert builder.is_full()
+ assert builder.size_in_bytes() == expected_size
+ buffer = builder.buffer()
+ assert len(buffer) == expected_size
+
+ # We can close second time, as in retry
+ builder.close()
+ assert builder.size_in_bytes() == expected_size
+ assert builder.buffer() == buffer
+
+ # Can't append after close
+ meta = builder.append(timestamp=None, key=b"test", value=b"Super")
+ assert meta is None
+
+
+@pytest.mark.parametrize("compression_type", [0, 1, 2, 3])
+@pytest.mark.parametrize("magic", [0, 1, 2])
+def test_memory_records_builder_full(magic, compression_type):
+ builder = MemoryRecordsBuilder(
+ magic=magic, compression_type=compression_type, batch_size=1024 * 10)
+
+ # 1 message should always be appended
+ metadata = builder.append(
+ key=None, timestamp=None, value=b"M" * 10240)
+ assert metadata is not None
+ assert builder.is_full()
+
+ metadata = builder.append(
+ key=None, timestamp=None, value=b"M")
+ assert metadata is None
+ assert builder.next_offset() == 1