Diffstat (limited to 'test')
-rw-r--r--  test/conftest.py                     |  2 +-
-rw-r--r--  test/record/test_default_records.py  | 37 ++++++++++++++++++++++++++++++++++++-
-rw-r--r--  test/record/test_legacy_records.py   | 31 +++++++++++++++++++++++++++++++
-rw-r--r--  test/test_consumer_integration.py    | 23 ++++++++++++++++++++++-
-rw-r--r--  test/testutil.py                     |  7 +++++--
5 files changed, 95 insertions(+), 5 deletions(-)
diff --git a/test/conftest.py b/test/conftest.py
index 52ebfb4..dbc2378 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -3,7 +3,6 @@ from __future__ import absolute_import
 import inspect
 
 import pytest
-from decorator import decorate
 
 from test.fixtures import KafkaFixture, ZookeeperFixture
 from test.testutil import kafka_version, random_string
@@ -73,6 +72,7 @@ def kafka_consumer_factory(kafka_broker, topic, request):
     def factory(**kafka_consumer_params):
         params = {} if kafka_consumer_params is None else kafka_consumer_params.copy()
         params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
+        params.setdefault('auto_offset_reset', 'earliest')
         _consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=[topic], **params))
         return _consumer[0]
 
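
The `auto_offset_reset='earliest'` default matters because the factory's consumers typically attach to topics that were populated before the consumer joined: with no committed offset, 'earliest' rewinds to the start of the log, while kafka-python's library default of 'latest' would silently skip those messages. A minimal sketch of the same setting on a directly constructed consumer (topic name and broker address are placeholders, not from this test suite):

    from kafka import KafkaConsumer

    # With no committed offset, 'earliest' starts from the beginning of each
    # partition; the library default 'latest' only sees new messages.
    consumer = KafkaConsumer(
        'example-topic',
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
        consumer_timeout_ms=5000,  # stop iterating if the topic goes idle
    )
    for message in consumer:
        print(message.offset, message.value)
    consumer.close()
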
diff --git a/test/record/test_default_records.py b/test/record/test_default_records.py
index 193703e..6e2f5e8 100644
--- a/test/record/test_default_records.py
+++ b/test/record/test_default_records.py
@@ -1,9 +1,12 @@
 # -*- coding: utf-8 -*-
 from __future__ import unicode_literals
 import pytest
+from mock import patch
+import kafka.codec
 from kafka.record.default_records import (
     DefaultRecordBatch, DefaultRecordBatchBuilder
 )
+from kafka.errors import UnsupportedCodecError
 
 
 @pytest.mark.parametrize("compression_type", [
@@ -17,7 +20,7 @@ def test_read_write_serde_v2(compression_type):
         magic=2, compression_type=compression_type, is_transactional=1,
         producer_id=123456, producer_epoch=123, base_sequence=9999,
         batch_size=999999)
-    headers = [] # [("header1", b"aaa"), ("header2", b"bbb")]
+    headers = [("header1", b"aaa"), ("header2", b"bbb")]
     for offset in range(10):
         builder.append(
             offset, timestamp=9999999, key=b"test", value=b"Super",
@@ -167,3 +170,35 @@ def test_default_batch_size_limit():
         2, timestamp=None, key=None, value=b"M" * 700, headers=[])
     assert meta is None
     assert len(builder.build()) < 1000
+
+
+@pytest.mark.parametrize("compression_type,name,checker_name", [
+    (DefaultRecordBatch.CODEC_GZIP, "gzip", "has_gzip"),
+    (DefaultRecordBatch.CODEC_SNAPPY, "snappy", "has_snappy"),
+    (DefaultRecordBatch.CODEC_LZ4, "lz4", "has_lz4")
+])
+@pytest.mark.parametrize("magic", [0, 1])
+def test_unavailable_codec(magic, compression_type, name, checker_name):
+    builder = DefaultRecordBatchBuilder(
+        magic=2, compression_type=compression_type, is_transactional=0,
+        producer_id=-1, producer_epoch=-1, base_sequence=-1,
+        batch_size=1024)
+    builder.append(0, timestamp=None, key=None, value=b"M" * 2000, headers=[])
+    correct_buffer = builder.build()
+
+    with patch.object(kafka.codec, checker_name) as mocked:
+        mocked.return_value = False
+        # Check that builder raises error
+        builder = DefaultRecordBatchBuilder(
+            magic=2, compression_type=compression_type, is_transactional=0,
+            producer_id=-1, producer_epoch=-1, base_sequence=-1,
+            batch_size=1024)
+        error_msg = "Libraries for {} compression codec not found".format(name)
+        with pytest.raises(UnsupportedCodecError, match=error_msg):
+            builder.append(0, timestamp=None, key=None, value=b"M", headers=[])
+            builder.build()
+
+        # Check that reader raises same error
+        batch = DefaultRecordBatch(bytes(correct_buffer))
+        with pytest.raises(UnsupportedCodecError, match=error_msg):
+            list(batch)
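
These assertions lean on the availability probes in kafka.codec (has_gzip(), has_snappy(), has_lz4()), which report whether the optional compression libraries are importable; the batch builder and reader consult them before compressing or decompressing, so patching a probe to return False simulates a missing dependency. The parametrized magic value is incidental here, since DefaultRecordBatch is the v2 format and the builder is always constructed with magic=2. A standalone sketch of the same guard pattern, using only public kafka.codec helpers:

    import kafka.codec
    from kafka.errors import UnsupportedCodecError

    def gzip_compress_checked(data):
        # Fail fast with a clear error instead of hitting an ImportError
        # deep inside the batch encoder.
        if not kafka.codec.has_gzip():
            raise UnsupportedCodecError(
                "Libraries for gzip compression codec not found")
        return kafka.codec.gzip_encode(data)
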
diff --git a/test/record/test_legacy_records.py b/test/record/test_legacy_records.py
index ffe8a35..23b8636 100644
--- a/test/record/test_legacy_records.py
+++ b/test/record/test_legacy_records.py
@@ -1,8 +1,11 @@
 from __future__ import unicode_literals
 import pytest
+from mock import patch
 from kafka.record.legacy_records import (
     LegacyRecordBatch, LegacyRecordBatchBuilder
 )
+import kafka.codec
+from kafka.errors import UnsupportedCodecError
 
 
 @pytest.mark.parametrize("magic", [0, 1])
@@ -164,3 +167,31 @@ def test_legacy_batch_size_limit(magic):
     meta = builder.append(2, timestamp=None, key=None, value=b"M" * 700)
     assert meta is None
     assert len(builder.build()) < 1000
+
+
+@pytest.mark.parametrize("compression_type,name,checker_name", [
+    (LegacyRecordBatch.CODEC_GZIP, "gzip", "has_gzip"),
+    (LegacyRecordBatch.CODEC_SNAPPY, "snappy", "has_snappy"),
+    (LegacyRecordBatch.CODEC_LZ4, "lz4", "has_lz4")
+])
+@pytest.mark.parametrize("magic", [0, 1])
+def test_unavailable_codec(magic, compression_type, name, checker_name):
+    builder = LegacyRecordBatchBuilder(
+        magic=magic, compression_type=compression_type, batch_size=1024)
+    builder.append(0, timestamp=None, key=None, value=b"M")
+    correct_buffer = builder.build()
+
+    with patch.object(kafka.codec, checker_name) as mocked:
+        mocked.return_value = False
+        # Check that builder raises error
+        builder = LegacyRecordBatchBuilder(
+            magic=magic, compression_type=compression_type, batch_size=1024)
+        error_msg = "Libraries for {} compression codec not found".format(name)
+        with pytest.raises(UnsupportedCodecError, match=error_msg):
+            builder.append(0, timestamp=None, key=None, value=b"M")
+            builder.build()
+
+        # Check that reader raises same error
+        batch = LegacyRecordBatch(bytes(correct_buffer), magic)
+        with pytest.raises(UnsupportedCodecError, match=error_msg):
+            list(batch)
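
The reader-side check passes because decompression is lazy: constructing LegacyRecordBatch only parses the header, and the codec probe fires when the records are iterated, which is why the error surfaces at list(batch). The patching itself swaps the attribute on the module object, so every caller that looks the probe up through kafka.codec sees the mock. A minimal sketch of that mechanism with the same mock library:

    import kafka.codec
    from mock import patch

    with patch.object(kafka.codec, "has_snappy", return_value=False):
        # Inside the block, any code consulting kafka.codec.has_snappy()
        # believes the snappy library is unavailable.
        assert kafka.codec.has_snappy() is False
    # On exit, the original function is restored automatically.
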
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 78a8a3c..c1b2479 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -1,6 +1,9 @@
 import logging
 import os
 import time
+from mock import patch
+import pytest
+import kafka.codec
 
 from six.moves import xrange
 import six
@@ -13,7 +16,7 @@ from kafka import (
 from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
 from kafka.errors import (
     ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError,
-    KafkaTimeoutError
+    KafkaTimeoutError, UnsupportedCodecError
 )
 from kafka.structs import (
     ProduceRequestPayload, TopicPartition, OffsetAndTimestamp
@@ -25,6 +28,7 @@ from test.testutil import (
     send_messages
 )
 
+
 def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
     """Test KafkaConsumer
     """
@@ -47,6 +51,23 @@ def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
     kafka_consumer.close()
 
 
+def test_kafka_consumer_unsupported_encoding(
+        topic, kafka_producer_factory, kafka_consumer_factory):
+    # Send a compressed message
+    producer = kafka_producer_factory(compression_type="gzip")
+    fut = producer.send(topic, b"simple message" * 200)
+    fut.get(timeout=5)
+    producer.close()
+
+    # Consume, but with the related compression codec not available
+    with patch.object(kafka.codec, "has_gzip") as mocked:
+        mocked.return_value = False
+        consumer = kafka_consumer_factory(auto_offset_reset='earliest')
+        error_msg = "Libraries for gzip compression codec not found"
+        with pytest.raises(UnsupportedCodecError, match=error_msg):
+            consumer.poll(timeout_ms=2000)
+
+
 class TestConsumerIntegration(KafkaIntegrationTestCase):
     maxDiff = None
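
This integration test exercises the full path: the producer compresses the record batch with gzip on the way in, and the consumer's fetcher hits the codec probe while decoding the fetch response inside poll(). For reference, a sketch of producing compressed messages with kafka-python's public API (broker address and topic are placeholders):

    from kafka import KafkaProducer

    # compression_type='gzip' compresses whole record batches, so a consumer
    # needs the matching codec available to decode them.
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        compression_type='gzip',
    )
    future = producer.send('example-topic', b'simple message' * 200)
    metadata = future.get(timeout=5)  # block until the broker acks the batch
    producer.close()
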
diff --git a/test/testutil.py b/test/testutil.py
index 4e5db47..365e47f 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -1,10 +1,12 @@
+from __future__ import absolute_import
+
+import functools
 import operator
 import os
 import socket
 import time
 import uuid
 
-import decorator
 import pytest
 
 from . import unittest
@@ -45,6 +47,7 @@ def kafka_versions(*versions):
     validators = map(construct_lambda, versions)
 
     def real_kafka_versions(func):
+        @functools.wraps(func)
         def wrapper(func, *args, **kwargs):
             version = kafka_version()
 
@@ -56,7 +59,7 @@ def kafka_versions(*versions):
                     pytest.skip("unsupported kafka version")
 
             return func(*args, **kwargs)
-        return decorator.decorator(wrapper, func)
+        return wrapper
     return real_kafka_versions
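
Dropping the third-party decorator package works because functools.wraps copies the wrapped function's metadata (__name__, __doc__, __module__) onto the wrapper, so pytest collection and test reports still show the original test names. A generic sketch of the pattern (the environment-variable condition is invented for illustration, not from this test suite):

    import functools
    import os

    import pytest

    def require_env(name):
        def decorate(func):
            @functools.wraps(func)  # preserve func.__name__ for pytest
            def wrapper(*args, **kwargs):
                if not os.environ.get(name):
                    pytest.skip("%s is not set" % name)
                return func(*args, **kwargs)
            return wrapper
        return decorate

    @require_env('KAFKA_VERSION')
    def test_something():
        assert True
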