author    | Taras <voyn1991@gmail.com> | 2018-03-18 15:56:47 +0200
committer | Taras <voyn1991@gmail.com> | 2018-03-18 15:56:47 +0200
commit    | ef955155f89e9a665a4b636e4535d7b50fce869d (patch)
tree      | cfe10509abaa41d0efa9f841d3a4aa4e7558ec81
parent    | 18e48dce240eaa7cf714c780c02d1d5cf0b8fca2 (diff)
download  | kafka-python-fix_records.tar.gz
Add codec validators to record parser and builder for all formats. (branch: fix_records)
Also removed the `decorator` dependency from the tests.
Fixes #1443
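The core of the change is the `_assert_has_codec` guard added to both `DefaultRecordBase` and `LegacyRecordBase` (see the hunks below): before compressing or decompressing, it verifies that the library backing the requested codec is importable and raises `UnsupportedCodecError` up front instead of a late `ImportError`. A minimal standalone sketch of that pattern; the inlined codec constants and the dict dispatch are illustrative simplifications of the patch's class attributes and if/elif chain:

```python
# Sketch of the _assert_has_codec guard this commit introduces.
# Codec constant values are assumed here for illustration; the patch
# reads them from class attributes on the record-format base classes.
import kafka.codec as codecs
from kafka.errors import UnsupportedCodecError

CODEC_GZIP, CODEC_SNAPPY, CODEC_LZ4 = 0x01, 0x02, 0x03

_CHECKERS = {
    CODEC_GZIP: (codecs.has_gzip, "gzip"),
    CODEC_SNAPPY: (codecs.has_snappy, "snappy"),
    CODEC_LZ4: (codecs.has_lz4, "lz4"),
}


def assert_has_codec(compression_type):
    # Fail fast with a clear error when the backing library is missing.
    checker, name = _CHECKERS[compression_type]
    if not checker():
        raise UnsupportedCodecError(
            "Libraries for {} compression codec not found".format(name))
```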
-rw-r--r-- | Makefile                            |  8
-rw-r--r-- | kafka/record/default_records.py     | 22
-rw-r--r-- | kafka/record/legacy_records.py      | 18
-rw-r--r-- | kafka/record/memory_records.py      |  2
-rw-r--r-- | requirements-dev.txt                |  2
-rw-r--r-- | test/conftest.py                    |  2
-rw-r--r-- | test/record/test_default_records.py | 37
-rw-r--r-- | test/record/test_legacy_records.py  | 31
-rw-r--r-- | test/test_consumer_integration.py   | 23
-rw-r--r-- | test/testutil.py                    |  7
10 files changed, 136 insertions, 16 deletions
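The new tests in `test/record/` (hunks below) share one pattern: build a valid compressed batch while the codec is available, then patch the availability checker in `kafka.codec` so the codec appears uninstalled, and assert that both the builder and the reader raise `UnsupportedCodecError`. A condensed sketch of that pattern for the legacy format, assuming `kafka-python`, `mock`, and `pytest` are importable:

```python
# Condensed version of the new test_unavailable_codec tests: write a
# gzip batch while gzip is available, then pretend gzip is missing and
# check that reading the batch back fails loudly.
import pytest
from mock import patch

import kafka.codec
from kafka.errors import UnsupportedCodecError
from kafka.record.legacy_records import (
    LegacyRecordBatch, LegacyRecordBatchBuilder
)


def test_gzip_unavailable_sketch():
    builder = LegacyRecordBatchBuilder(
        magic=1, compression_type=LegacyRecordBatch.CODEC_GZIP,
        batch_size=1024)
    builder.append(0, timestamp=None, key=None, value=b"M")
    buf = builder.build()

    with patch.object(kafka.codec, "has_gzip", return_value=False):
        with pytest.raises(UnsupportedCodecError):
            list(LegacyRecordBatch(bytes(buf), 1))
```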
```diff
diff --git a/Makefile b/Makefile
--- a/Makefile
+++ b/Makefile
@@ -23,11 +23,13 @@ test27: build-integration
 # Test using py.test directly if you want to use local python. Useful for other
 # platforms that require manual installation for C libraries, ie. Windows.
 test-local: build-integration
-	py.test --pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF kafka test
+	KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) py.test \
+		--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF $(FLAGS) kafka test
 
 cov-local: build-integration
-	py.test --pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka \
-		--cov-config=.covrc --cov-report html kafka test
+	KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) py.test \
+		--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka \
+		--cov-config=.covrc --cov-report html $(FLAGS) kafka test
 	@echo "open file://`pwd`/htmlcov/index.html"
 
 # Check the readme for syntax errors, which can lead to invalid formatting on
diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py
index 2bbd47e..2ea29dd 100644
--- a/kafka/record/default_records.py
+++ b/kafka/record/default_records.py
@@ -54,17 +54,18 @@
 # * Timestamp Type (3)
 # * Compression Type (0-2)
 
-import io
 import struct
 import time
 from kafka.record.abc import ABCRecord, ABCRecordBatch, ABCRecordBatchBuilder
-from kafka.record.util import decode_varint, encode_varint, calc_crc32c, size_of_varint
-
-from kafka.errors import CorruptRecordException
+from kafka.record.util import (
+    decode_varint, encode_varint, calc_crc32c, size_of_varint
+)
+from kafka.errors import CorruptRecordException, UnsupportedCodecError
 from kafka.codec import (
     gzip_encode, snappy_encode, lz4_encode,
     gzip_decode, snappy_decode, lz4_decode
 )
+import kafka.codec as codecs
 
 
 class DefaultRecordBase(object):
@@ -101,6 +102,17 @@ class DefaultRecordBase(object):
     LOG_APPEND_TIME = 1
     CREATE_TIME = 0
 
+    def _assert_has_codec(self, compression_type):
+        if compression_type == self.CODEC_GZIP:
+            checker, name = codecs.has_gzip, "gzip"
+        elif compression_type == self.CODEC_SNAPPY:
+            checker, name = codecs.has_snappy, "snappy"
+        elif compression_type == self.CODEC_LZ4:
+            checker, name = codecs.has_lz4, "lz4"
+        if not checker():
+            raise UnsupportedCodecError(
+                "Libraries for {} compression codec not found".format(name))
+
 
 class DefaultRecordBatch(DefaultRecordBase, ABCRecordBatch):
 
@@ -156,6 +168,7 @@ class DefaultRecordBatch(DefaultRecordBase, ABCRecordBatch):
         if not self._decompressed:
             compression_type = self.compression_type
             if compression_type != self.CODEC_NONE:
+                self._assert_has_codec(compression_type)
                 data = memoryview(self._buffer)[self._pos:]
                 if compression_type == self.CODEC_GZIP:
                     uncompressed = gzip_decode(data)
@@ -481,6 +494,7 @@ class DefaultRecordBatchBuilder(DefaultRecordBase, ABCRecordBatchBuilder):
 
     def _maybe_compress(self):
         if self._compression_type != self.CODEC_NONE:
+            self._assert_has_codec(self._compression_type)
             header_size = self.HEADER_STRUCT.size
             data = bytes(self._buffer[header_size:])
             if self._compression_type == self.CODEC_GZIP:
diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py
index 036e6c4..1bdba81 100644
--- a/kafka/record/legacy_records.py
+++ b/kafka/record/legacy_records.py
@@ -49,9 +49,10 @@ from kafka.record.util import calc_crc32
 
 from kafka.codec import (
     gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka,
-    gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka
+    gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka,
 )
-from kafka.errors import CorruptRecordException
+import kafka.codec as codecs
+from kafka.errors import CorruptRecordException, UnsupportedCodecError
 
 
 class LegacyRecordBase(object):
@@ -112,6 +113,17 @@ class LegacyRecordBase(object):
 
     NO_TIMESTAMP = -1
 
+    def _assert_has_codec(self, compression_type):
+        if compression_type == self.CODEC_GZIP:
+            checker, name = codecs.has_gzip, "gzip"
+        elif compression_type == self.CODEC_SNAPPY:
+            checker, name = codecs.has_snappy, "snappy"
+        elif compression_type == self.CODEC_LZ4:
+            checker, name = codecs.has_lz4, "lz4"
+        if not checker():
+            raise UnsupportedCodecError(
+                "Libraries for {} compression codec not found".format(name))
+
 
 class LegacyRecordBatch(ABCRecordBatch, LegacyRecordBase):
 
@@ -166,6 +178,7 @@ class LegacyRecordBatch(ABCRecordBatch, LegacyRecordBase):
         data = self._buffer[pos:pos + value_size]
 
         compression_type = self.compression_type
+        self._assert_has_codec(compression_type)
         if compression_type == self.CODEC_GZIP:
             uncompressed = gzip_decode(data)
         elif compression_type == self.CODEC_SNAPPY:
@@ -419,6 +432,7 @@ class LegacyRecordBatchBuilder(ABCRecordBatchBuilder, LegacyRecordBase):
 
     def _maybe_compress(self):
         if self._compression_type:
+            self._assert_has_codec(self._compression_type)
             data = bytes(self._buffer)
             if self._compression_type == self.CODEC_GZIP:
                 compressed = gzip_encode(data)
diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py
index cb1cc01..844dc84 100644
--- a/kafka/record/memory_records.py
+++ b/kafka/record/memory_records.py
@@ -132,7 +132,7 @@ class MemoryRecordsBuilder(object):
         """ Append a message to the buffer.
 
         Returns:
-            (int, int): checksum and bytes written
+            RecordMetadata: object containing checksum, bytes written, etc.
         """
         if self._closed:
             return None, 0
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 88153e0..b98b58a 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -1,7 +1,6 @@
 flake8==3.4.1
 pytest==3.4.0
 pytest-cov==2.5.1
-pytest-catchlog==1.2.2
 docker-py==1.10.6
 coveralls==1.2.0
 Sphinx==1.6.4
@@ -13,3 +12,4 @@ pylint==1.8.2
 pytest-pylint==0.7.1
 pytest-mock==1.6.3
 sphinx-rtd-theme==0.2.4
+crc32c==1.2
diff --git a/test/conftest.py b/test/conftest.py
index 52ebfb4..dbc2378 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -3,7 +3,6 @@ from __future__ import absolute_import
 import inspect
 
 import pytest
-from decorator import decorate
 
 from test.fixtures import KafkaFixture, ZookeeperFixture
 from test.testutil import kafka_version, random_string
@@ -73,6 +72,7 @@ def kafka_consumer_factory(kafka_broker, topic, request):
     def factory(**kafka_consumer_params):
         params = {} if kafka_consumer_params is None else kafka_consumer_params.copy()
         params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
+        params.setdefault('auto_offset_reset', 'earliest')
         _consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=[topic], **params))
         return _consumer[0]
 
diff --git a/test/record/test_default_records.py b/test/record/test_default_records.py
index 193703e..6e2f5e8 100644
--- a/test/record/test_default_records.py
+++ b/test/record/test_default_records.py
@@ -1,9 +1,12 @@
 # -*- coding: utf-8 -*-
 from __future__ import unicode_literals
 import pytest
+from mock import patch
+import kafka.codec
 from kafka.record.default_records import (
     DefaultRecordBatch, DefaultRecordBatchBuilder
 )
+from kafka.errors import UnsupportedCodecError
 
 
 @pytest.mark.parametrize("compression_type", [
@@ -17,7 +20,7 @@ def test_read_write_serde_v2(compression_type):
         magic=2, compression_type=compression_type, is_transactional=1,
         producer_id=123456, producer_epoch=123, base_sequence=9999,
         batch_size=999999)
-    headers = []  # [("header1", b"aaa"), ("header2", b"bbb")]
+    headers = [("header1", b"aaa"), ("header2", b"bbb")]
     for offset in range(10):
         builder.append(
             offset, timestamp=9999999, key=b"test", value=b"Super",
@@ -167,3 +170,35 @@ def test_default_batch_size_limit():
         2, timestamp=None, key=None, value=b"M" * 700, headers=[])
     assert meta is None
     assert len(builder.build()) < 1000
+
+
+@pytest.mark.parametrize("compression_type,name,checker_name", [
+    (DefaultRecordBatch.CODEC_GZIP, "gzip", "has_gzip"),
+    (DefaultRecordBatch.CODEC_SNAPPY, "snappy", "has_snappy"),
+    (DefaultRecordBatch.CODEC_LZ4, "lz4", "has_lz4")
+])
+@pytest.mark.parametrize("magic", [0, 1])
+def test_unavailable_codec(magic, compression_type, name, checker_name):
+    builder = DefaultRecordBatchBuilder(
+        magic=2, compression_type=compression_type, is_transactional=0,
+        producer_id=-1, producer_epoch=-1, base_sequence=-1,
+        batch_size=1024)
+    builder.append(0, timestamp=None, key=None, value=b"M" * 2000, headers=[])
+    correct_buffer = builder.build()
+
+    with patch.object(kafka.codec, checker_name) as mocked:
+        mocked.return_value = False
+        # Check that builder raises error
+        builder = DefaultRecordBatchBuilder(
+            magic=2, compression_type=compression_type, is_transactional=0,
+            producer_id=-1, producer_epoch=-1, base_sequence=-1,
+            batch_size=1024)
+        error_msg = "Libraries for {} compression codec not found".format(name)
+        with pytest.raises(UnsupportedCodecError, match=error_msg):
+            builder.append(0, timestamp=None, key=None, value=b"M", headers=[])
+            builder.build()
+
+        # Check that reader raises same error
+        batch = DefaultRecordBatch(bytes(correct_buffer))
+        with pytest.raises(UnsupportedCodecError, match=error_msg):
+            list(batch)
diff --git a/test/record/test_legacy_records.py b/test/record/test_legacy_records.py
index ffe8a35..23b8636 100644
--- a/test/record/test_legacy_records.py
+++ b/test/record/test_legacy_records.py
@@ -1,8 +1,11 @@
 from __future__ import unicode_literals
 import pytest
+from mock import patch
 from kafka.record.legacy_records import (
     LegacyRecordBatch, LegacyRecordBatchBuilder
 )
+import kafka.codec
+from kafka.errors import UnsupportedCodecError
 
 
 @pytest.mark.parametrize("magic", [0, 1])
@@ -164,3 +167,31 @@ def test_legacy_batch_size_limit(magic):
     meta = builder.append(2, timestamp=None, key=None, value=b"M" * 700)
     assert meta is None
     assert len(builder.build()) < 1000
+
+
+@pytest.mark.parametrize("compression_type,name,checker_name", [
+    (LegacyRecordBatch.CODEC_GZIP, "gzip", "has_gzip"),
+    (LegacyRecordBatch.CODEC_SNAPPY, "snappy", "has_snappy"),
+    (LegacyRecordBatch.CODEC_LZ4, "lz4", "has_lz4")
+])
+@pytest.mark.parametrize("magic", [0, 1])
+def test_unavailable_codec(magic, compression_type, name, checker_name):
+    builder = LegacyRecordBatchBuilder(
+        magic=magic, compression_type=compression_type, batch_size=1024)
+    builder.append(0, timestamp=None, key=None, value=b"M")
+    correct_buffer = builder.build()
+
+    with patch.object(kafka.codec, checker_name) as mocked:
+        mocked.return_value = False
+        # Check that builder raises error
+        builder = LegacyRecordBatchBuilder(
+            magic=magic, compression_type=compression_type, batch_size=1024)
+        error_msg = "Libraries for {} compression codec not found".format(name)
+        with pytest.raises(UnsupportedCodecError, match=error_msg):
+            builder.append(0, timestamp=None, key=None, value=b"M")
+            builder.build()
+
+        # Check that reader raises same error
+        batch = LegacyRecordBatch(bytes(correct_buffer), magic)
+        with pytest.raises(UnsupportedCodecError, match=error_msg):
+            list(batch)
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 78a8a3c..c1b2479 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -1,6 +1,9 @@
 import logging
 import os
 import time
+from mock import patch
+import pytest
+import kafka.codec
 
 from six.moves import xrange
 import six
@@ -13,7 +16,7 @@ from kafka import (
 from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
 from kafka.errors import (
     ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError,
-    KafkaTimeoutError
+    KafkaTimeoutError, UnsupportedCodecError
 )
 from kafka.structs import (
     ProduceRequestPayload, TopicPartition, OffsetAndTimestamp
@@ -25,6 +28,7 @@ from test.testutil import (
     send_messages
 )
 
+
 def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
     """Test KafkaConsumer
     """
@@ -47,6 +51,23 @@ def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
     kafka_consumer.close()
 
 
+def test_kafka_consumer_unsupported_encoding(
+        topic, kafka_producer_factory, kafka_consumer_factory):
+    # Send a compressed message
+    producer = kafka_producer_factory(compression_type="gzip")
+    fut = producer.send(topic, b"simple message" * 200)
+    fut.get(timeout=5)
+    producer.close()
+
+    # Consume, but with the related compression codec not available
+    with patch.object(kafka.codec, "has_gzip") as mocked:
+        mocked.return_value = False
+        consumer = kafka_consumer_factory(auto_offset_reset='earliest')
+        error_msg = "Libraries for gzip compression codec not found"
+        with pytest.raises(UnsupportedCodecError, match=error_msg):
+            consumer.poll(timeout_ms=2000)
+
+
 class TestConsumerIntegration(KafkaIntegrationTestCase):
     maxDiff = None
 
diff --git a/test/testutil.py b/test/testutil.py
index 4e5db47..365e47f 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -1,10 +1,12 @@
+from __future__ import absolute_import
+
+import functools
 import operator
 import os
 import socket
 import time
 import uuid
 
-import decorator
 import pytest
 from . import unittest
 
@@ -45,6 +47,7 @@ def kafka_versions(*versions):
     validators = map(construct_lambda, versions)
 
     def real_kafka_versions(func):
+        @functools.wraps(func)
        def wrapper(func, *args, **kwargs):
             version = kafka_version()
 
@@ -56,7 +59,7 @@ def kafka_versions(*versions):
                 pytest.skip("unsupported kafka version")
 
             return func(*args, **kwargs)
-        return decorator.decorator(wrapper, func)
+        return wrapper
 
     return real_kafka_versions
```
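For applications, the practical effect is that consuming messages compressed with a codec whose backing library is not installed now surfaces as `UnsupportedCodecError` at poll time, as exercised by `test_kafka_consumer_unsupported_encoding` above. A hedged usage sketch; the topic name and bootstrap address are placeholders:

```python
# How the new error surfaces to a consumer application. "my-topic"
# and the bootstrap address below are placeholder values.
from kafka import KafkaConsumer
from kafka.errors import UnsupportedCodecError

consumer = KafkaConsumer(
    "my-topic",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
)
try:
    records = consumer.poll(timeout_ms=2000)
except UnsupportedCodecError as exc:
    # e.g. "Libraries for snappy compression codec not found":
    # install the missing backend (python-snappy / lz4) and retry.
    raise SystemExit("Missing compression backend: {}".format(exc))
```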