author      Taras <voyn1991@gmail.com>              2018-03-19 00:09:29 +0200
committer   Dana Powers <dana.powers@gmail.com>     2018-04-18 13:41:14 -0700
commit      d9e41c8e8fb7033a3e9a9a7654bc2b0125f337a0 (patch)
tree        1e2c0f300a8a8981c1e170505899a205f67308e5
parent      1c71dfc3c321372c808f45f569ae41352f420e8f (diff)
download    kafka-python-d9e41c8e8fb7033a3e9a9a7654bc2b0125f337a0.tar.gz
Fix MemoryRecord bugs re error handling and add test coverage (#1448)
-rw-r--r--  kafka/record/__init__.py         |  4
-rw-r--r--  kafka/record/default_records.py  |  2
-rw-r--r--  kafka/record/memory_records.py   |  8
-rw-r--r--  test/record/test_records.py      | 69
4 files changed, 75 insertions, 8 deletions
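The one-line change to kafka/record/default_records.py in the diff below is the heart of the error-handling fix: the old code constructed CorruptRecordException but never raised it, so a record-size mismatch was silently discarded and corrupt batches passed validation. A minimal standalone sketch of the pattern (illustrative names, not code from the patch):

class CorruptRecordException(Exception):
    """Stand-in for kafka.errors.CorruptRecordException."""

def validate_buggy(length, bytes_read):
    if bytes_read != length:
        # Bug: the exception object is created, then immediately discarded.
        CorruptRecordException("Invalid record size")
    return True  # "passes" even for corrupt input

def validate_fixed(length, bytes_read):
    if bytes_read != length:
        # Fix: actually raise, so callers see the corruption.
        raise CorruptRecordException("Invalid record size")
    return True

assert validate_buggy(10, 7)   # silently succeeds despite the mismatch
try:
    validate_fixed(10, 7)      # now surfaces the error
except CorruptRecordException:
    pass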
diff --git a/kafka/record/__init__.py b/kafka/record/__init__.py
index cbd70d9..93936df 100644
--- a/kafka/record/__init__.py
+++ b/kafka/record/__init__.py
@@ -1,3 +1,3 @@
-from kafka.record.memory_records import MemoryRecords
+from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder
 
-__all__ = ["MemoryRecords"]
+__all__ = ["MemoryRecords", "MemoryRecordsBuilder"]
diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py
index 2bbd47e..840868a 100644
--- a/kafka/record/default_records.py
+++ b/kafka/record/default_records.py
@@ -237,7 +237,7 @@ class DefaultRecordBatch(DefaultRecordBase, ABCRecordBatch):
         # validate whether we have read all header bytes in the current record
         if pos - start_pos != length:
-            CorruptRecordException(
+            raise CorruptRecordException(
                 "Invalid record size: expected to read {} bytes in record "
                 "payload, but instead read {}".format(length, pos - start_pos))
         self._pos = pos
diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py
index cb1cc01..f67c4fe 100644
--- a/kafka/record/memory_records.py
+++ b/kafka/record/memory_records.py
@@ -18,6 +18,7 @@
 #
 # So we can iterate over batches just by knowing offsets of Length. Magic is
 # used to construct the correct class for Batch itself.
+from __future__ import division
 
 import struct
@@ -131,15 +132,14 @@ class MemoryRecordsBuilder(object):
 
     def append(self, timestamp, key, value, headers=[]):
         """ Append a message to the buffer.
 
-        Returns:
-            (int, int): checksum and bytes written
+        Returns: RecordMetadata or None if unable to append
         """
         if self._closed:
-            return None, 0
+            return None
 
         offset = self._next_offset
         metadata = self._builder.append(offset, timestamp, key, value, headers)
-        # Return of 0 size means there's no space to add a new message
+        # Return of None means there's no space to add a new message
         if metadata is None:
             return None
diff --git a/test/record/test_records.py b/test/record/test_records.py
index 7306bbc..224989f 100644
--- a/test/record/test_records.py
+++ b/test/record/test_records.py
@@ -1,5 +1,7 @@
+# -*- coding: utf-8 -*-
+from __future__ import unicode_literals
 import pytest
-from kafka.record import MemoryRecords
+from kafka.record import MemoryRecords, MemoryRecordsBuilder
 from kafka.errors import CorruptRecordException
 
 # This is real live data from Kafka 11 broker
@@ -152,3 +154,68 @@ def test_memory_records_corrupt():
     )
     with pytest.raises(CorruptRecordException):
         records.next_batch()
+
+
+@pytest.mark.parametrize("compression_type", [0, 1, 2, 3])
+@pytest.mark.parametrize("magic", [0, 1, 2])
+def test_memory_records_builder(magic, compression_type):
+    builder = MemoryRecordsBuilder(
+        magic=magic, compression_type=compression_type, batch_size=1024 * 10)
+    base_size = builder.size_in_bytes()  # V2 has a header before
+
+    msg_sizes = []
+    for offset in range(10):
+        metadata = builder.append(
+            timestamp=10000 + offset, key=b"test", value=b"Super")
+        msg_sizes.append(metadata.size)
+        assert metadata.offset == offset
+        if magic > 0:
+            assert metadata.timestamp == 10000 + offset
+        else:
+            assert metadata.timestamp == -1
+        assert builder.next_offset() == offset + 1
+
+    # Error appends should not leave junk behind, like null bytes or something
+    with pytest.raises(TypeError):
+        builder.append(
+            timestamp=None, key="test", value="Super")  # Not bytes, but str
+
+    assert not builder.is_full()
+    size_before_close = builder.size_in_bytes()
+    assert size_before_close == sum(msg_sizes) + base_size
+
+    # Size should remain the same after closing. No trailing bytes
+    builder.close()
+    assert builder.compression_rate() > 0
+    expected_size = size_before_close * builder.compression_rate()
+    assert builder.is_full()
+    assert builder.size_in_bytes() == expected_size
+    buffer = builder.buffer()
+    assert len(buffer) == expected_size
+
+    # We can close second time, as in retry
+    builder.close()
+    assert builder.size_in_bytes() == expected_size
+    assert builder.buffer() == buffer
+
+    # Can't append after close
+    meta = builder.append(timestamp=None, key=b"test", value=b"Super")
+    assert meta is None
+
+
+@pytest.mark.parametrize("compression_type", [0, 1, 2, 3])
+@pytest.mark.parametrize("magic", [0, 1, 2])
+def test_memory_records_builder_full(magic, compression_type):
+    builder = MemoryRecordsBuilder(
+        magic=magic, compression_type=compression_type, batch_size=1024 * 10)
+
+    # 1 message should always be appended
+    metadata = builder.append(
+        key=None, timestamp=None, value=b"M" * 10240)
+    assert metadata is not None
+    assert builder.is_full()
+
+    metadata = builder.append(
+        key=None, timestamp=None, value=b"M")
+    assert metadata is None
+    assert builder.next_offset() == 1
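For context, a short round-trip sketch of the newly exported MemoryRecordsBuilder, mirroring what the tests above exercise. This is an illustrative sketch, not code from the patch: it assumes kafka-python at this commit is importable, and that batches from the reader side are iterable records with a value attribute, as the pre-existing read tests use.

from kafka.record import MemoryRecords, MemoryRecordsBuilder

# Build a batch of three messages (magic=2 is the v2 format, no compression)
builder = MemoryRecordsBuilder(magic=2, compression_type=0, batch_size=1024 * 10)
for i in range(3):
    metadata = builder.append(timestamp=10000 + i, key=b"key", value=b"value")
    assert metadata is not None and metadata.offset == i
builder.close()  # seals the batch; per this patch, later appends return None

assert builder.append(timestamp=None, key=b"k", value=b"v") is None

# Parse the finished buffer back with the reader side
records = MemoryRecords(bytes(builder.buffer()))
batch = records.next_batch()
assert [rec.value for rec in batch] == [b"value"] * 3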