author     Taras <voyn1991@gmail.com>    2018-03-18 15:56:47 +0200
committer  Taras <voyn1991@gmail.com>    2018-03-18 15:56:47 +0200
commit     ef955155f89e9a665a4b636e4535d7b50fce869d (patch)
tree       cfe10509abaa41d0efa9f841d3a4aa4e7558ec81
parent     18e48dce240eaa7cf714c780c02d1d5cf0b8fca2 (diff)
Add codec validators to record parser and builder for all formats.

Also removed the `decorator` dependency from the tests. Fixes #1443
-rw-r--r--  Makefile                             |  8
-rw-r--r--  kafka/record/default_records.py      | 22
-rw-r--r--  kafka/record/legacy_records.py       | 18
-rw-r--r--  kafka/record/memory_records.py       |  2
-rw-r--r--  requirements-dev.txt                 |  2
-rw-r--r--  test/conftest.py                     |  2
-rw-r--r--  test/record/test_default_records.py  | 37
-rw-r--r--  test/record/test_legacy_records.py   | 31
-rw-r--r--  test/test_consumer_integration.py    | 23
-rw-r--r--  test/testutil.py                     |  7
10 files changed, 136 insertions(+), 16 deletions(-)
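The net effect is that building or reading a compressed batch now raises kafka.errors.UnsupportedCodecError at append/build or read time whenever the matching optional codec library is missing. A minimal sketch from a caller's point of view (assumes the lz4 library is not installed; the payload and batch size are illustrative):

    from kafka.record.default_records import DefaultRecordBatchBuilder
    from kafka.errors import UnsupportedCodecError

    builder = DefaultRecordBatchBuilder(
        magic=2, compression_type=DefaultRecordBatchBuilder.CODEC_LZ4,
        is_transactional=0, producer_id=-1, producer_epoch=-1,
        base_sequence=-1, batch_size=16384)
    try:
        builder.append(0, timestamp=None, key=None, value=b"payload", headers=[])
        builder.build()
    except UnsupportedCodecError as exc:
        print(exc)  # "Libraries for lz4 compression codec not found"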
diff --git a/Makefile b/Makefile
index 5f80ccd..7dfd305 100644
--- a/Makefile
+++ b/Makefile
@@ -23,11 +23,13 @@ test27: build-integration
# Test using py.test directly if you want to use local python. Useful for other
# platforms that require manual installation for C libraries, ie. Windows.
test-local: build-integration
- py.test --pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF kafka test
+ KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) py.test \
+ --pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF $(FLAGS) kafka test
cov-local: build-integration
- py.test --pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka \
- --cov-config=.covrc --cov-report html kafka test
+ KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) py.test \
+ --pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka \
+ --cov-config=.covrc --cov-report html $(FLAGS) kafka test
@echo "open file://`pwd`/htmlcov/index.html"
# Check the readme for syntax errors, which can lead to invalid formatting on
diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py
index 2bbd47e..2ea29dd 100644
--- a/kafka/record/default_records.py
+++ b/kafka/record/default_records.py
@@ -54,17 +54,18 @@
# * Timestamp Type (3)
# * Compression Type (0-2)
-import io
import struct
import time
from kafka.record.abc import ABCRecord, ABCRecordBatch, ABCRecordBatchBuilder
-from kafka.record.util import decode_varint, encode_varint, calc_crc32c, size_of_varint
-
-from kafka.errors import CorruptRecordException
+from kafka.record.util import (
+ decode_varint, encode_varint, calc_crc32c, size_of_varint
+)
+from kafka.errors import CorruptRecordException, UnsupportedCodecError
from kafka.codec import (
gzip_encode, snappy_encode, lz4_encode,
gzip_decode, snappy_decode, lz4_decode
)
+import kafka.codec as codecs
class DefaultRecordBase(object):
@@ -101,6 +102,17 @@ class DefaultRecordBase(object):
LOG_APPEND_TIME = 1
CREATE_TIME = 0
+ def _assert_has_codec(self, compression_type):
+ if compression_type == self.CODEC_GZIP:
+ checker, name = codecs.has_gzip, "gzip"
+ elif compression_type == self.CODEC_SNAPPY:
+ checker, name = codecs.has_snappy, "snappy"
+ elif compression_type == self.CODEC_LZ4:
+ checker, name = codecs.has_lz4, "lz4"
+ if not checker():
+ raise UnsupportedCodecError(
+ "Libraries for {} compression codec not found".format(name))
+
class DefaultRecordBatch(DefaultRecordBase, ABCRecordBatch):
@@ -156,6 +168,7 @@ class DefaultRecordBatch(DefaultRecordBase, ABCRecordBatch):
if not self._decompressed:
compression_type = self.compression_type
if compression_type != self.CODEC_NONE:
+ self._assert_has_codec(compression_type)
data = memoryview(self._buffer)[self._pos:]
if compression_type == self.CODEC_GZIP:
uncompressed = gzip_decode(data)
@@ -481,6 +494,7 @@ class DefaultRecordBatchBuilder(DefaultRecordBase, ABCRecordBatchBuilder):
def _maybe_compress(self):
if self._compression_type != self.CODEC_NONE:
+ self._assert_has_codec(self._compression_type)
header_size = self.HEADER_STRUCT.size
data = bytes(self._buffer[header_size:])
if self._compression_type == self.CODEC_GZIP:
diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py
index 036e6c4..1bdba81 100644
--- a/kafka/record/legacy_records.py
+++ b/kafka/record/legacy_records.py
@@ -49,9 +49,10 @@ from kafka.record.util import calc_crc32
from kafka.codec import (
gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka,
- gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka
+ gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka,
)
-from kafka.errors import CorruptRecordException
+import kafka.codec as codecs
+from kafka.errors import CorruptRecordException, UnsupportedCodecError
class LegacyRecordBase(object):
@@ -112,6 +113,17 @@ class LegacyRecordBase(object):
NO_TIMESTAMP = -1
+ def _assert_has_codec(self, compression_type):
+ if compression_type == self.CODEC_GZIP:
+ checker, name = codecs.has_gzip, "gzip"
+ elif compression_type == self.CODEC_SNAPPY:
+ checker, name = codecs.has_snappy, "snappy"
+ elif compression_type == self.CODEC_LZ4:
+ checker, name = codecs.has_lz4, "lz4"
+ if not checker():
+ raise UnsupportedCodecError(
+ "Libraries for {} compression codec not found".format(name))
+
class LegacyRecordBatch(ABCRecordBatch, LegacyRecordBase):
@@ -166,6 +178,7 @@ class LegacyRecordBatch(ABCRecordBatch, LegacyRecordBase):
data = self._buffer[pos:pos + value_size]
compression_type = self.compression_type
+ self._assert_has_codec(compression_type)
if compression_type == self.CODEC_GZIP:
uncompressed = gzip_decode(data)
elif compression_type == self.CODEC_SNAPPY:
@@ -419,6 +432,7 @@ class LegacyRecordBatchBuilder(ABCRecordBatchBuilder, LegacyRecordBase):
def _maybe_compress(self):
if self._compression_type:
+ self._assert_has_codec(self._compression_type)
data = bytes(self._buffer)
if self._compression_type == self.CODEC_GZIP:
compressed = gzip_encode(data)
diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py
index cb1cc01..844dc84 100644
--- a/kafka/record/memory_records.py
+++ b/kafka/record/memory_records.py
@@ -132,7 +132,7 @@ class MemoryRecordsBuilder(object):
""" Append a message to the buffer.
Returns:
- (int, int): checksum and bytes written
+ RecordMetadata: object containing checksum, bytes written, etc.
"""
if self._closed:
return None, 0
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 88153e0..b98b58a 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -1,7 +1,6 @@
flake8==3.4.1
pytest==3.4.0
pytest-cov==2.5.1
-pytest-catchlog==1.2.2
docker-py==1.10.6
coveralls==1.2.0
Sphinx==1.6.4
@@ -13,3 +12,4 @@ pylint==1.8.2
pytest-pylint==0.7.1
pytest-mock==1.6.3
sphinx-rtd-theme==0.2.4
+crc32c==1.2
diff --git a/test/conftest.py b/test/conftest.py
index 52ebfb4..dbc2378 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -3,7 +3,6 @@ from __future__ import absolute_import
import inspect
import pytest
-from decorator import decorate
from test.fixtures import KafkaFixture, ZookeeperFixture
from test.testutil import kafka_version, random_string
@@ -73,6 +72,7 @@ def kafka_consumer_factory(kafka_broker, topic, request):
def factory(**kafka_consumer_params):
params = {} if kafka_consumer_params is None else kafka_consumer_params.copy()
params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
+ params.setdefault('auto_offset_reset', 'earliest')
_consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=[topic], **params))
return _consumer[0]
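With this change, consumers created through the fixture default to reading from the beginning of the topic; individual tests can still override the setting per consumer. A hypothetical test using the fixture (the test name and timeout are illustrative):

    def test_reads_only_new_messages(kafka_consumer_factory):
        # Override the fixture's new 'earliest' default for this one consumer.
        consumer = kafka_consumer_factory(auto_offset_reset='latest')
        records = consumer.poll(timeout_ms=500)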
diff --git a/test/record/test_default_records.py b/test/record/test_default_records.py
index 193703e..6e2f5e8 100644
--- a/test/record/test_default_records.py
+++ b/test/record/test_default_records.py
@@ -1,9 +1,12 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import pytest
+from mock import patch
+import kafka.codec
from kafka.record.default_records import (
DefaultRecordBatch, DefaultRecordBatchBuilder
)
+from kafka.errors import UnsupportedCodecError
@pytest.mark.parametrize("compression_type", [
@@ -17,7 +20,7 @@ def test_read_write_serde_v2(compression_type):
magic=2, compression_type=compression_type, is_transactional=1,
producer_id=123456, producer_epoch=123, base_sequence=9999,
batch_size=999999)
- headers = [] # [("header1", b"aaa"), ("header2", b"bbb")]
+ headers = [("header1", b"aaa"), ("header2", b"bbb")]
for offset in range(10):
builder.append(
offset, timestamp=9999999, key=b"test", value=b"Super",
@@ -167,3 +170,35 @@ def test_default_batch_size_limit():
2, timestamp=None, key=None, value=b"M" * 700, headers=[])
assert meta is None
assert len(builder.build()) < 1000
+
+
+@pytest.mark.parametrize("compression_type,name,checker_name", [
+ (DefaultRecordBatch.CODEC_GZIP, "gzip", "has_gzip"),
+ (DefaultRecordBatch.CODEC_SNAPPY, "snappy", "has_snappy"),
+ (DefaultRecordBatch.CODEC_LZ4, "lz4", "has_lz4")
+])
+@pytest.mark.parametrize("magic", [0, 1])
+def test_unavailable_codec(magic, compression_type, name, checker_name):
+ builder = DefaultRecordBatchBuilder(
+ magic=2, compression_type=compression_type, is_transactional=0,
+ producer_id=-1, producer_epoch=-1, base_sequence=-1,
+ batch_size=1024)
+ builder.append(0, timestamp=None, key=None, value=b"M" * 2000, headers=[])
+ correct_buffer = builder.build()
+
+ with patch.object(kafka.codec, checker_name) as mocked:
+ mocked.return_value = False
+ # Check that builder raises error
+ builder = DefaultRecordBatchBuilder(
+ magic=2, compression_type=compression_type, is_transactional=0,
+ producer_id=-1, producer_epoch=-1, base_sequence=-1,
+ batch_size=1024)
+ error_msg = "Libraries for {} compression codec not found".format(name)
+ with pytest.raises(UnsupportedCodecError, match=error_msg):
+ builder.append(0, timestamp=None, key=None, value=b"M", headers=[])
+ builder.build()
+
+ # Check that reader raises same error
+ batch = DefaultRecordBatch(bytes(correct_buffer))
+ with pytest.raises(UnsupportedCodecError, match=error_msg):
+ list(batch)
diff --git a/test/record/test_legacy_records.py b/test/record/test_legacy_records.py
index ffe8a35..23b8636 100644
--- a/test/record/test_legacy_records.py
+++ b/test/record/test_legacy_records.py
@@ -1,8 +1,11 @@
from __future__ import unicode_literals
import pytest
+from mock import patch
from kafka.record.legacy_records import (
LegacyRecordBatch, LegacyRecordBatchBuilder
)
+import kafka.codec
+from kafka.errors import UnsupportedCodecError
@pytest.mark.parametrize("magic", [0, 1])
@@ -164,3 +167,31 @@ def test_legacy_batch_size_limit(magic):
meta = builder.append(2, timestamp=None, key=None, value=b"M" * 700)
assert meta is None
assert len(builder.build()) < 1000
+
+
+@pytest.mark.parametrize("compression_type,name,checker_name", [
+ (LegacyRecordBatch.CODEC_GZIP, "gzip", "has_gzip"),
+ (LegacyRecordBatch.CODEC_SNAPPY, "snappy", "has_snappy"),
+ (LegacyRecordBatch.CODEC_LZ4, "lz4", "has_lz4")
+])
+@pytest.mark.parametrize("magic", [0, 1])
+def test_unavailable_codec(magic, compression_type, name, checker_name):
+ builder = LegacyRecordBatchBuilder(
+ magic=magic, compression_type=compression_type, batch_size=1024)
+ builder.append(0, timestamp=None, key=None, value=b"M")
+ correct_buffer = builder.build()
+
+ with patch.object(kafka.codec, checker_name) as mocked:
+ mocked.return_value = False
+ # Check that builder raises error
+ builder = LegacyRecordBatchBuilder(
+ magic=magic, compression_type=compression_type, batch_size=1024)
+ error_msg = "Libraries for {} compression codec not found".format(name)
+ with pytest.raises(UnsupportedCodecError, match=error_msg):
+ builder.append(0, timestamp=None, key=None, value=b"M")
+ builder.build()
+
+ # Check that reader raises same error
+ batch = LegacyRecordBatch(bytes(correct_buffer), magic)
+ with pytest.raises(UnsupportedCodecError, match=error_msg):
+ list(batch)
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 78a8a3c..c1b2479 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -1,6 +1,9 @@
import logging
import os
import time
+from mock import patch
+import pytest
+import kafka.codec
from six.moves import xrange
import six
@@ -13,7 +16,7 @@ from kafka import (
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
from kafka.errors import (
ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError,
- KafkaTimeoutError
+ KafkaTimeoutError, UnsupportedCodecError
)
from kafka.structs import (
ProduceRequestPayload, TopicPartition, OffsetAndTimestamp
@@ -25,6 +28,7 @@ from test.testutil import (
send_messages
)
+
def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
"""Test KafkaConsumer
"""
@@ -47,6 +51,23 @@ def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
kafka_consumer.close()
+def test_kafka_consumer_unsupported_encoding(
+ topic, kafka_producer_factory, kafka_consumer_factory):
+ # Send a compressed message
+ producer = kafka_producer_factory(compression_type="gzip")
+ fut = producer.send(topic, b"simple message" * 200)
+ fut.get(timeout=5)
+ producer.close()
+
+ # Consume, but with the related compression codec not available
+ with patch.object(kafka.codec, "has_gzip") as mocked:
+ mocked.return_value = False
+ consumer = kafka_consumer_factory(auto_offset_reset='earliest')
+ error_msg = "Libraries for gzip compression codec not found"
+ with pytest.raises(UnsupportedCodecError, match=error_msg):
+ consumer.poll(timeout_ms=2000)
+
+
class TestConsumerIntegration(KafkaIntegrationTestCase):
maxDiff = None
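The new integration test exercises the consumer-side failure mode; application code can handle the same error explicitly rather than letting the poll loop crash. A hedged sketch (broker address and topic name are placeholders):

    from kafka import KafkaConsumer
    from kafka.errors import UnsupportedCodecError

    consumer = KafkaConsumer('my-topic', bootstrap_servers='localhost:9092',
                             auto_offset_reset='earliest')
    try:
        records = consumer.poll(timeout_ms=2000)
    except UnsupportedCodecError:
        # The topic holds batches compressed with a codec whose optional
        # library (e.g. python-snappy or lz4) is not installed locally.
        raise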
diff --git a/test/testutil.py b/test/testutil.py
index 4e5db47..365e47f 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -1,10 +1,12 @@
+from __future__ import absolute_import
+
+import functools
import operator
import os
import socket
import time
import uuid
-import decorator
import pytest
from . import unittest
@@ -45,6 +47,7 @@ def kafka_versions(*versions):
validators = map(construct_lambda, versions)
def real_kafka_versions(func):
+ @functools.wraps(func)
def wrapper(func, *args, **kwargs):
version = kafka_version()
@@ -56,7 +59,7 @@ def kafka_versions(*versions):
pytest.skip("unsupported kafka version")
return func(*args, **kwargs)
- return decorator.decorator(wrapper, func)
+ return wrapper
return real_kafka_versions
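The dropped `decorator` package was only there to keep the wrapped test's name and docstring intact for pytest; `functools.wraps` from the standard library achieves the same without the extra dependency. A general sketch of the pattern (hypothetical helper name; `kafka_version()` refers to the existing helper in test/testutil.py):

    import functools

    import pytest

    def kafka_versions_sketch(predicate):
        def real_decorator(func):
            @functools.wraps(func)  # preserves __name__/__doc__ for pytest reporting
            def wrapper(*args, **kwargs):
                # Skip the wrapped test when the broker version fails the predicate.
                if not predicate(kafka_version()):
                    pytest.skip("unsupported kafka version")
                return func(*args, **kwargs)
            return wrapper
        return real_decorator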