author    Taras <voyn1991@gmail.com>           2018-03-19 00:09:29 +0200
committer Dana Powers <dana.powers@gmail.com>  2018-04-18 13:41:14 -0700
commit    d9e41c8e8fb7033a3e9a9a7654bc2b0125f337a0 (patch)
tree      1e2c0f300a8a8981c1e170505899a205f67308e5
parent    1c71dfc3c321372c808f45f569ae41352f420e8f (diff)
download  kafka-python-d9e41c8e8fb7033a3e9a9a7654bc2b0125f337a0.tar.gz
Fix MemoryRecord bugs re error handling and add test coverage (#1448)
-rw-r--r--  kafka/record/__init__.py          4
-rw-r--r--  kafka/record/default_records.py   2
-rw-r--r--  kafka/record/memory_records.py    8
-rw-r--r--  test/record/test_records.py      69
4 files changed, 75 insertions, 8 deletions
diff --git a/kafka/record/__init__.py b/kafka/record/__init__.py
index cbd70d9..93936df 100644
--- a/kafka/record/__init__.py
+++ b/kafka/record/__init__.py
@@ -1,3 +1,3 @@
-from kafka.record.memory_records import MemoryRecords
+from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder
 
-__all__ = ["MemoryRecords"]
+__all__ = ["MemoryRecords", "MemoryRecordsBuilder"]
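
For context, with MemoryRecordsBuilder now exported from the package root, the two classes support a local build-and-parse round trip. A minimal usage sketch based on the API exercised by the new tests below (parameter values are illustrative; no broker is involved):

from kafka.record import MemoryRecords, MemoryRecordsBuilder

# Build a v2 batch in memory, close it, then re-parse the raw bytes.
builder = MemoryRecordsBuilder(magic=2, compression_type=0, batch_size=1024)
builder.append(timestamp=None, key=b"key", value=b"value")
builder.close()

records = MemoryRecords(builder.buffer())
assert records.has_next()
batch = records.next_batch()
print([(record.offset, record.value) for record in batch])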
diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py
index 2bbd47e..840868a 100644
--- a/kafka/record/default_records.py
+++ b/kafka/record/default_records.py
@@ -237,7 +237,7 @@ class DefaultRecordBatch(DefaultRecordBase, ABCRecordBatch):
 
         # validate whether we have read all header bytes in the current record
         if pos - start_pos != length:
-            CorruptRecordException(
+            raise CorruptRecordException(
                 "Invalid record size: expected to read {} bytes in record "
                 "payload, but instead read {}".format(length, pos - start_pos))
         self._pos = pos
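
The bug fixed here is easy to miss in review: the exception was constructed but never raised, so the size check was a silent no-op and corrupt records passed validation. A standalone sketch of the failure mode (hypothetical names, not the library's code):

class CorruptRecordException(Exception):
    pass

def validate_buggy(length, bytes_read):
    if bytes_read != length:
        CorruptRecordException("size mismatch")  # constructed, then discarded

def validate_fixed(length, bytes_read):
    if bytes_read != length:
        raise CorruptRecordException("size mismatch")

validate_buggy(10, 7)    # returns normally -- the corruption goes unnoticed
# validate_fixed(10, 7)  # would raise CorruptRecordException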
diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py
index cb1cc01..f67c4fe 100644
--- a/kafka/record/memory_records.py
+++ b/kafka/record/memory_records.py
@@ -18,6 +18,7 @@
 #
 # So we can iterate over batches just by knowing offsets of Length. Magic is
 # used to construct the correct class for Batch itself.
+from __future__ import division
 
 import struct
 
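
The __future__ import makes `/` perform true division on Python 2 as well, which matters wherever the builder computes ratios such as compression_rate(); that is presumably why it is added here. A toy illustration with a hypothetical helper (not the library's code):

from __future__ import division

def compression_ratio(compressed_size, uncompressed_size):
    # With true division this returns 0.5; Python 2's classic integer
    # division would silently floor 5 / 10 to 0.
    return compressed_size / uncompressed_size

assert compression_ratio(5, 10) == 0.5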
@@ -131,15 +132,14 @@ class MemoryRecordsBuilder(object):
     def append(self, timestamp, key, value, headers=[]):
         """ Append a message to the buffer.
 
-        Returns:
-            (int, int): checksum and bytes written
+        Returns: RecordMetadata or None if unable to append
         """
         if self._closed:
-            return None, 0
+            return None
 
         offset = self._next_offset
         metadata = self._builder.append(offset, timestamp, key, value, headers)
-        # Return of 0 size means there's no space to add a new message
+        # Return of None means there's no space to add a new message
         if metadata is None:
             return None
 
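
The clarified contract: append() now returns a RecordMetadata object on success, and None both when the builder is already closed and when the batch has no room left, replacing the old (checksum, bytes written) tuple. A sketch of caller-side handling, using only attributes exercised by the tests below:

from kafka.record import MemoryRecordsBuilder

builder = MemoryRecordsBuilder(magic=2, compression_type=0, batch_size=1024 * 10)

meta = builder.append(timestamp=10000, key=b"test", value=b"Super")
assert meta is not None    # appended successfully
assert meta.offset == 0    # first message in the batch
assert meta.size > 0       # bytes this record contributed

builder.close()
assert builder.append(timestamp=None, key=b"k", value=b"v") is None  # closed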
diff --git a/test/record/test_records.py b/test/record/test_records.py
index 7306bbc..224989f 100644
--- a/test/record/test_records.py
+++ b/test/record/test_records.py
@@ -1,5 +1,7 @@
+# -*- coding: utf-8 -*-
+from __future__ import unicode_literals
 import pytest
-from kafka.record import MemoryRecords
+from kafka.record import MemoryRecords, MemoryRecordsBuilder
 from kafka.errors import CorruptRecordException
 
 # This is real live data from Kafka 11 broker
@@ -152,3 +154,68 @@ def test_memory_records_corrupt():
     )
     with pytest.raises(CorruptRecordException):
         records.next_batch()
+
+
+@pytest.mark.parametrize("compression_type", [0, 1, 2, 3])
+@pytest.mark.parametrize("magic", [0, 1, 2])
+def test_memory_records_builder(magic, compression_type):
+    builder = MemoryRecordsBuilder(
+        magic=magic, compression_type=compression_type, batch_size=1024 * 10)
+    base_size = builder.size_in_bytes()  # V2 has a batch header before the records
+
+    msg_sizes = []
+    for offset in range(10):
+        metadata = builder.append(
+            timestamp=10000 + offset, key=b"test", value=b"Super")
+        msg_sizes.append(metadata.size)
+        assert metadata.offset == offset
+        if magic > 0:
+            assert metadata.timestamp == 10000 + offset
+        else:
+            assert metadata.timestamp == -1
+        assert builder.next_offset() == offset + 1
+
+    # Failed appends should not leave junk behind, like null bytes
+    with pytest.raises(TypeError):
+        builder.append(
+            timestamp=None, key="test", value="Super")  # Not bytes, but str
+
+    assert not builder.is_full()
+    size_before_close = builder.size_in_bytes()
+    assert size_before_close == sum(msg_sizes) + base_size
+
+    # Size should remain the same after closing. No trailing bytes
+    builder.close()
+    assert builder.compression_rate() > 0
+    expected_size = size_before_close * builder.compression_rate()
+    assert builder.is_full()
+    assert builder.size_in_bytes() == expected_size
+    buffer = builder.buffer()
+    assert len(buffer) == expected_size
+
+    # We can close a second time, as in a retry
+    builder.close()
+    assert builder.size_in_bytes() == expected_size
+    assert builder.buffer() == buffer
+
+    # Can't append after close
+    meta = builder.append(timestamp=None, key=b"test", value=b"Super")
+    assert meta is None
+
+
+@pytest.mark.parametrize("compression_type", [0, 1, 2, 3])
+@pytest.mark.parametrize("magic", [0, 1, 2])
+def test_memory_records_builder_full(magic, compression_type):
+    builder = MemoryRecordsBuilder(
+        magic=magic, compression_type=compression_type, batch_size=1024 * 10)
+
+    # The first message should always be appended, even if it fills the batch
+    metadata = builder.append(
+        key=None, timestamp=None, value=b"M" * 10240)
+    assert metadata is not None
+    assert builder.is_full()
+
+    metadata = builder.append(
+        key=None, timestamp=None, value=b"M")
+    assert metadata is None
+    assert builder.next_offset() == 1
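
The new tests can be run in isolation with pytest's keyword filter:

pytest test/record/test_records.py -k memory_records_builder

This selects both parametrized functions, 12 magic/compression combinations each; compression types 2 and 3 (snappy, lz4) need the corresponding codec libraries installed.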