diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/conftest.py | 2 | ||||
-rw-r--r-- | test/record/test_default_records.py | 37 | ||||
-rw-r--r-- | test/record/test_legacy_records.py | 31 | ||||
-rw-r--r-- | test/test_consumer_integration.py | 23 | ||||
-rw-r--r-- | test/testutil.py | 7 |
5 files changed, 95 insertions, 5 deletions
diff --git a/test/conftest.py b/test/conftest.py index 52ebfb4..dbc2378 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -3,7 +3,6 @@ from __future__ import absolute_import import inspect import pytest -from decorator import decorate from test.fixtures import KafkaFixture, ZookeeperFixture from test.testutil import kafka_version, random_string @@ -73,6 +72,7 @@ def kafka_consumer_factory(kafka_broker, topic, request): def factory(**kafka_consumer_params): params = {} if kafka_consumer_params is None else kafka_consumer_params.copy() params.setdefault('client_id', 'consumer_%s' % (request.node.name,)) + params.setdefault('auto_offset_reset', 'earliest') _consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=[topic], **params)) return _consumer[0] diff --git a/test/record/test_default_records.py b/test/record/test_default_records.py index 193703e..6e2f5e8 100644 --- a/test/record/test_default_records.py +++ b/test/record/test_default_records.py @@ -1,9 +1,12 @@ # -*- coding: utf-8 -*- from __future__ import unicode_literals import pytest +from mock import patch +import kafka.codec from kafka.record.default_records import ( DefaultRecordBatch, DefaultRecordBatchBuilder ) +from kafka.errors import UnsupportedCodecError @pytest.mark.parametrize("compression_type", [ @@ -17,7 +20,7 @@ def test_read_write_serde_v2(compression_type): magic=2, compression_type=compression_type, is_transactional=1, producer_id=123456, producer_epoch=123, base_sequence=9999, batch_size=999999) - headers = [] # [("header1", b"aaa"), ("header2", b"bbb")] + headers = [("header1", b"aaa"), ("header2", b"bbb")] for offset in range(10): builder.append( offset, timestamp=9999999, key=b"test", value=b"Super", @@ -167,3 +170,35 @@ def test_default_batch_size_limit(): 2, timestamp=None, key=None, value=b"M" * 700, headers=[]) assert meta is None assert len(builder.build()) < 1000 + + +@pytest.mark.parametrize("compression_type,name,checker_name", [ + (DefaultRecordBatch.CODEC_GZIP, "gzip", "has_gzip"), + (DefaultRecordBatch.CODEC_SNAPPY, "snappy", "has_snappy"), + (DefaultRecordBatch.CODEC_LZ4, "lz4", "has_lz4") +]) +@pytest.mark.parametrize("magic", [0, 1]) +def test_unavailable_codec(magic, compression_type, name, checker_name): + builder = DefaultRecordBatchBuilder( + magic=2, compression_type=compression_type, is_transactional=0, + producer_id=-1, producer_epoch=-1, base_sequence=-1, + batch_size=1024) + builder.append(0, timestamp=None, key=None, value=b"M" * 2000, headers=[]) + correct_buffer = builder.build() + + with patch.object(kafka.codec, checker_name) as mocked: + mocked.return_value = False + # Check that builder raises error + builder = DefaultRecordBatchBuilder( + magic=2, compression_type=compression_type, is_transactional=0, + producer_id=-1, producer_epoch=-1, base_sequence=-1, + batch_size=1024) + error_msg = "Libraries for {} compression codec not found".format(name) + with pytest.raises(UnsupportedCodecError, match=error_msg): + builder.append(0, timestamp=None, key=None, value=b"M", headers=[]) + builder.build() + + # Check that reader raises same error + batch = DefaultRecordBatch(bytes(correct_buffer)) + with pytest.raises(UnsupportedCodecError, match=error_msg): + list(batch) diff --git a/test/record/test_legacy_records.py b/test/record/test_legacy_records.py index ffe8a35..23b8636 100644 --- a/test/record/test_legacy_records.py +++ b/test/record/test_legacy_records.py @@ -1,8 +1,11 @@ from __future__ import unicode_literals import pytest +from mock import patch from kafka.record.legacy_records import ( LegacyRecordBatch, LegacyRecordBatchBuilder ) +import kafka.codec +from kafka.errors import UnsupportedCodecError @pytest.mark.parametrize("magic", [0, 1]) @@ -164,3 +167,31 @@ def test_legacy_batch_size_limit(magic): meta = builder.append(2, timestamp=None, key=None, value=b"M" * 700) assert meta is None assert len(builder.build()) < 1000 + + +@pytest.mark.parametrize("compression_type,name,checker_name", [ + (LegacyRecordBatch.CODEC_GZIP, "gzip", "has_gzip"), + (LegacyRecordBatch.CODEC_SNAPPY, "snappy", "has_snappy"), + (LegacyRecordBatch.CODEC_LZ4, "lz4", "has_lz4") +]) +@pytest.mark.parametrize("magic", [0, 1]) +def test_unavailable_codec(magic, compression_type, name, checker_name): + builder = LegacyRecordBatchBuilder( + magic=magic, compression_type=compression_type, batch_size=1024) + builder.append(0, timestamp=None, key=None, value=b"M") + correct_buffer = builder.build() + + with patch.object(kafka.codec, checker_name) as mocked: + mocked.return_value = False + # Check that builder raises error + builder = LegacyRecordBatchBuilder( + magic=magic, compression_type=compression_type, batch_size=1024) + error_msg = "Libraries for {} compression codec not found".format(name) + with pytest.raises(UnsupportedCodecError, match=error_msg): + builder.append(0, timestamp=None, key=None, value=b"M") + builder.build() + + # Check that reader raises same error + batch = LegacyRecordBatch(bytes(correct_buffer), magic) + with pytest.raises(UnsupportedCodecError, match=error_msg): + list(batch) diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 78a8a3c..c1b2479 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -1,6 +1,9 @@ import logging import os import time +from mock import patch +import pytest +import kafka.codec from six.moves import xrange import six @@ -13,7 +16,7 @@ from kafka import ( from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES from kafka.errors import ( ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError, - KafkaTimeoutError + KafkaTimeoutError, UnsupportedCodecError ) from kafka.structs import ( ProduceRequestPayload, TopicPartition, OffsetAndTimestamp @@ -25,6 +28,7 @@ from test.testutil import ( send_messages ) + def test_kafka_consumer(simple_client, topic, kafka_consumer_factory): """Test KafkaConsumer """ @@ -47,6 +51,23 @@ def test_kafka_consumer(simple_client, topic, kafka_consumer_factory): kafka_consumer.close() +def test_kafka_consumer_unsupported_encoding( + topic, kafka_producer_factory, kafka_consumer_factory): + # Send a compressed message + producer = kafka_producer_factory(compression_type="gzip") + fut = producer.send(topic, b"simple message" * 200) + fut.get(timeout=5) + producer.close() + + # Consume, but with the related compression codec not available + with patch.object(kafka.codec, "has_gzip") as mocked: + mocked.return_value = False + consumer = kafka_consumer_factory(auto_offset_reset='earliest') + error_msg = "Libraries for gzip compression codec not found" + with pytest.raises(UnsupportedCodecError, match=error_msg): + consumer.poll(timeout_ms=2000) + + class TestConsumerIntegration(KafkaIntegrationTestCase): maxDiff = None diff --git a/test/testutil.py b/test/testutil.py index 4e5db47..365e47f 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -1,10 +1,12 @@ +from __future__ import absolute_import + +import functools import operator import os import socket import time import uuid -import decorator import pytest from . import unittest @@ -45,6 +47,7 @@ def kafka_versions(*versions): validators = map(construct_lambda, versions) def real_kafka_versions(func): + @functools.wraps(func) def wrapper(func, *args, **kwargs): version = kafka_version() @@ -56,7 +59,7 @@ def kafka_versions(*versions): pytest.skip("unsupported kafka version") return func(*args, **kwargs) - return decorator.decorator(wrapper, func) + return wrapper return real_kafka_versions |