Diffstat (limited to 'test/test_fetcher.py')
 test/test_fetcher.py | 316 ++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 300 insertions(+), 16 deletions(-)
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index 86d154f..5da597c 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -3,20 +3,26 @@ from __future__ import absolute_import
 import pytest
-import itertools
 from collections import OrderedDict
+import itertools
+import time
 from kafka.client_async import KafkaClient
-from kafka.consumer.fetcher import ConsumerRecord, Fetcher, NoOffsetForPartitionError
+from kafka.codec import gzip_encode
+from kafka.consumer.fetcher import (
+    CompletedFetch, ConsumerRecord, Fetcher, NoOffsetForPartitionError
+)
 from kafka.consumer.subscription_state import SubscriptionState
 from kafka.metrics import Metrics
-from kafka.protocol.fetch import FetchRequest
+from kafka.protocol.fetch import FetchRequest, FetchResponse
+from kafka.protocol.message import Message
 from kafka.protocol.offset import OffsetResponse
+from kafka.protocol.types import Int64, Int32
 from kafka.structs import TopicPartition
 from kafka.future import Future
 from kafka.errors import (
     StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError,
-    UnknownTopicOrPartitionError
+    UnknownTopicOrPartitionError, OffsetOutOfRangeError
 )
@@ -31,28 +37,33 @@ def subscription_state():
 @pytest.fixture
-def fetcher(client, subscription_state):
-    subscription_state.subscribe(topics=['foobar'])
-    assignment = [TopicPartition('foobar', i) for i in range(3)]
+def topic():
+    return 'foobar'
+
+
+@pytest.fixture
+def fetcher(client, subscription_state, topic):
+    subscription_state.subscribe(topics=[topic])
+    assignment = [TopicPartition(topic, i) for i in range(3)]
     subscription_state.assign_from_subscribed(assignment)
     for tp in assignment:
         subscription_state.seek(tp, 0)
     return Fetcher(client, subscription_state, Metrics())
-def test_send_fetches(fetcher, mocker):
+def test_send_fetches(fetcher, topic, mocker):
     fetch_requests = [
         FetchRequest[0](
             -1, fetcher.config['fetch_max_wait_ms'],
             fetcher.config['fetch_min_bytes'],
-            [('foobar', [
+            [(topic, [
                 (0, 0, fetcher.config['max_partition_fetch_bytes']),
                 (1, 0, fetcher.config['max_partition_fetch_bytes']),
             ])]),
         FetchRequest[0](
             -1, fetcher.config['fetch_max_wait_ms'],
             fetcher.config['fetch_min_bytes'],
-            [('foobar', [
+            [(topic, [
                 (2, 0, fetcher.config['max_partition_fetch_bytes']),
             ])])
     ]
@@ -80,9 +91,9 @@ def test_create_fetch_requests(fetcher, mocker, api_version, fetch_version):
     assert all([isinstance(r, FetchRequest[fetch_version]) for r in requests])
-def test_update_fetch_positions(fetcher, mocker):
+def test_update_fetch_positions(fetcher, topic, mocker):
     mocker.patch.object(fetcher, '_reset_offset')
-    partition = TopicPartition('foobar', 0)
+    partition = TopicPartition(topic, 0)
     # unassigned partition
     fetcher.update_fetch_positions([TopicPartition('fizzbuzz', 0)])
@@ -285,7 +296,7 @@ def test__handle_offset_response(fetcher, mocker):
 def test_partition_records_offset():
-    """Test that compressed messagesets are handle correctly
+    """Test that compressed messagesets are handled correctly
     when fetch offset is in the middle of the message list
     """
     batch_start = 120
@@ -296,12 +307,285 @@ def test_partition_records_offset():
                                None, None, 'key', 'value', 'checksum', 0, 0)
                 for i in range(batch_start, batch_end)]
     records = Fetcher.PartitionRecords(fetch_offset, None, messages)
-    assert records.has_more()
+    assert len(records) > 0
     msgs = records.take(1)
     assert msgs[0].offset == 123
     assert records.fetch_offset == 124
     msgs = records.take(2)
     assert len(msgs) == 2
-    assert records.has_more()
+    assert len(records) > 0
     records.discard()
-    assert not records.has_more()
+    assert len(records) == 0
+
+
+def test_fetched_records(fetcher, topic, mocker):
+    fetcher.config['check_crcs'] = False
+    tp = TopicPartition(topic, 0)
+    msgs = []
+    for i in range(10):
+        msg = Message(b'foo')
+        msgs.append((i, -1, msg))
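+    # partition_data here mimics (error_code, highwater_offset, message_set)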
+    completed_fetch = CompletedFetch(
+        tp, 0, 0, [0, 100, msgs],
+        mocker.MagicMock()
+    )
+    fetcher._completed_fetches.append(completed_fetch)
+    records, partial = fetcher.fetched_records()
+    assert tp in records
+    assert len(records[tp]) == len(msgs)
+    assert all(map(lambda x: isinstance(x, ConsumerRecord), records[tp]))
+    assert partial is False
+
+
+@pytest.mark.parametrize(("fetch_request", "fetch_response", "num_partitions"), [
+    (
+        FetchRequest[0](
+            -1, 100, 100,
+            [('foo', [(0, 0, 1000),])]),
+        FetchResponse[0](
+            [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]),
+        1,
+    ),
+    (
+        FetchRequest[1](
+            -1, 100, 100,
+            [('foo', [(0, 0, 1000), (1, 0, 1000),])]),
+        FetchResponse[1](
+            0,
+            [("foo", [
+                (0, 0, 1000, [(0, b'xxx'),]),
+                (1, 0, 1000, [(0, b'xxx'),]),
+            ]),]),
+        2,
+    ),
+    (
+        FetchRequest[2](
+            -1, 100, 100,
+            [('foo', [(0, 0, 1000),])]),
+        FetchResponse[2](
+            0, [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]),
+        1,
+    ),
+    (
+        FetchRequest[3](
+            -1, 100, 100, 10000,
+            [('foo', [(0, 0, 1000),])]),
+        FetchResponse[3](
+            0, [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]),
+        1,
+    ),
+    (
+        FetchRequest[4](
+            -1, 100, 100, 10000, 0,
+            [('foo', [(0, 0, 1000),])]),
+        FetchResponse[4](
+            0, [("foo", [(0, 0, 1000, 0, [], [(0, b'xxx'),])]),]),
+        1,
+    ),
+    (
+        # This may only be used in broker-to-broker API calls
+        FetchRequest[5](
+            -1, 100, 100, 10000, 0,
+            [('foo', [(0, 0, 1000),])]),
+        FetchResponse[5](
+            0, [("foo", [(0, 0, 1000, 0, 0, [], [(0, b'xxx'),])]),]),
+        1,
+    ),
+])
+def test__handle_fetch_response(fetcher, fetch_request, fetch_response, num_partitions):
+    fetcher._handle_fetch_response(fetch_request, time.time(), fetch_response)
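+    # each fetched partition in the response should yield one CompletedFetch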
+    assert len(fetcher._completed_fetches) == num_partitions
+
+
+def test__unpack_message_set(fetcher):
+    fetcher.config['check_crcs'] = False
+    tp = TopicPartition('foo', 0)
+    messages = [
+        (0, None, Message(b'a')),
+        (1, None, Message(b'b')),
+        (2, None, Message(b'c'))
+    ]
+    records = list(fetcher._unpack_message_set(tp, messages))
+    assert len(records) == 3
+    assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
+    assert records[0].value == b'a'
+    assert records[1].value == b'b'
+    assert records[2].value == b'c'
+    assert records[0].offset == 0
+    assert records[1].offset == 1
+    assert records[2].offset == 2
+
+
+def test__unpack_message_set_compressed_v0(fetcher):
+    fetcher.config['check_crcs'] = False
+    tp = TopicPartition('foo', 0)
+    messages = [
+        (0, None, Message(b'a')),
+        (1, None, Message(b'b')),
+        (2, None, Message(b'c')),
+    ]
+    message_bytes = []
+    for offset, _, m in messages:
+        encoded = m.encode()
+        message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded)
+    compressed_bytes = gzip_encode(b''.join(message_bytes))
+    compressed_base_offset = 0
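+    # magic v0 wrapper: inner message offsets are absolute and unpack unchanged (0, 1, 2)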
+    compressed_msgs = [
+        (compressed_base_offset, None,
+         Message(compressed_bytes,
+                 magic=0,
+                 attributes=Message.CODEC_GZIP))
+    ]
+    records = list(fetcher._unpack_message_set(tp, compressed_msgs))
+    assert len(records) == 3
+    assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
+    assert records[0].value == b'a'
+    assert records[1].value == b'b'
+    assert records[2].value == b'c'
+    assert records[0].offset == 0
+    assert records[1].offset == 1
+    assert records[2].offset == 2
+
+
+def test__unpack_message_set_compressed_v1(fetcher):
+    fetcher.config['check_crcs'] = False
+    tp = TopicPartition('foo', 0)
+    messages = [
+        (0, None, Message(b'a')),
+        (1, None, Message(b'b')),
+        (2, None, Message(b'c')),
+    ]
+    message_bytes = []
+    for offset, _, m in messages:
+        encoded = m.encode()
+        message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded)
+    compressed_bytes = gzip_encode(b''.join(message_bytes))
+    compressed_base_offset = 10
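+    # magic v1 wrapper: the wrapper offset (10) is the absolute offset of the last
+    # inner message, so the relative inner offsets 0-2 unpack to 8, 9, 10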
+    compressed_msgs = [
+        (compressed_base_offset, None,
+         Message(compressed_bytes,
+                 magic=1,
+                 attributes=Message.CODEC_GZIP))
+    ]
+    records = list(fetcher._unpack_message_set(tp, compressed_msgs))
+    assert len(records) == 3
+    assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
+    assert records[0].value == b'a'
+    assert records[1].value == b'b'
+    assert records[2].value == b'c'
+    assert records[0].offset == 8
+    assert records[1].offset == 9
+    assert records[2].offset == 10
+
+
+def test__parse_record(fetcher):
+    tp = TopicPartition('foo', 0)
+    record = fetcher._parse_record(tp, 123, 456, Message(b'abc'))
+    assert record.topic == 'foo'
+    assert record.partition == 0
+    assert record.offset == 123
+    assert record.timestamp == 456
+    assert record.value == b'abc'
+    assert record.key is None
+
+
+def test__message_generator(fetcher, topic, mocker):
+    fetcher.config['check_crcs'] = False
+    tp = TopicPartition(topic, 0)
+    msgs = []
+    for i in range(10):
+        msg = Message(b'foo')
+        msgs.append((i, -1, msg))
+    completed_fetch = CompletedFetch(
+        tp, 0, 0, [0, 100, msgs],
+        mocker.MagicMock()
+    )
+    fetcher._completed_fetches.append(completed_fetch)
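+    # the fetcher is itself an iterator, yielding one ConsumerRecord at a time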
+    for i in range(10):
+        msg = next(fetcher)
+        assert isinstance(msg, ConsumerRecord)
+        assert msg.offset == i
+        assert msg.value == b'foo'
+
+
+def test__parse_fetched_data(fetcher, topic, mocker):
+    fetcher.config['check_crcs'] = False
+    tp = TopicPartition(topic, 0)
+    msgs = []
+    for i in range(10):
+        msg = Message(b'foo')
+        msgs.append((i, -1, msg))
+    completed_fetch = CompletedFetch(
+        tp, 0, 0, [0, 100, msgs],
+        mocker.MagicMock()
+    )
+    partition_record = fetcher._parse_fetched_data(completed_fetch)
+    assert isinstance(partition_record, fetcher.PartitionRecords)
+    assert len(partition_record) == 10
+
+
+def test__parse_fetched_data__paused(fetcher, topic, mocker):
+    fetcher.config['check_crcs'] = False
+    tp = TopicPartition(topic, 0)
+    msgs = []
+    for i in range(10):
+        msg = Message(b'foo')
+        msgs.append((i, -1, msg))
+    completed_fetch = CompletedFetch(
+        tp, 0, 0, [0, 100, msgs],
+        mocker.MagicMock()
+    )
+    fetcher._subscriptions.pause(tp)
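+    # records fetched for a paused partition should not be returned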
+    partition_record = fetcher._parse_fetched_data(completed_fetch)
+    assert partition_record is None
+
+
+def test__parse_fetched_data__stale_offset(fetcher, topic, mocker):
+    fetcher.config['check_crcs'] = False
+    tp = TopicPartition(topic, 0)
+    msgs = []
+    for i in range(10):
+        msg = Message(b'foo')
+        msgs.append((i, -1, msg))
+    completed_fetch = CompletedFetch(
+        tp, 10, 0, [0, 100, msgs],
+        mocker.MagicMock()
+    )
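+    # the fetched offset (10) no longer matches the partition's position (0),
+    # so the fetch is treated as stale and discarded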
+    partition_record = fetcher._parse_fetched_data(completed_fetch)
+    assert partition_record is None
+
+
+def test__parse_fetched_data__not_leader(fetcher, topic, mocker):
+    fetcher.config['check_crcs'] = False
+    tp = TopicPartition(topic, 0)
+    completed_fetch = CompletedFetch(
+        tp, 0, 0, [NotLeaderForPartitionError.errno, -1, None],
+        mocker.MagicMock()
+    )
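+    # a partition-level NotLeaderForPartition error yields no records
+    # and should trigger a metadata refresh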
+    partition_record = fetcher._parse_fetched_data(completed_fetch)
+    assert partition_record is None
+    fetcher._client.cluster.request_update.assert_called_with()
+
+
+def test__parse_fetched_data__unknown_tp(fetcher, topic, mocker):
+    fetcher.config['check_crcs'] = False
+    tp = TopicPartition(topic, 0)
+    completed_fetch = CompletedFetch(
+        tp, 0, 0, [UnknownTopicOrPartitionError.errno, -1, None],
+        mocker.MagicMock()
+    )
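+    # an UnknownTopicOrPartition error likewise yields no records
+    # and should trigger a metadata refresh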
+    partition_record = fetcher._parse_fetched_data(completed_fetch)
+    assert partition_record is None
+    fetcher._client.cluster.request_update.assert_called_with()
+
+
+def test__parse_fetched_data__out_of_range(fetcher, topic, mocker):
+    fetcher.config['check_crcs'] = False
+    tp = TopicPartition(topic, 0)
+    completed_fetch = CompletedFetch(
+        tp, 0, 0, [OffsetOutOfRangeError.errno, -1, None],
+        mocker.MagicMock()
+    )
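+    # an OffsetOutOfRange error yields no records and flags the partition
+    # as awaiting an offset reset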
+    partition_record = fetcher._parse_fetched_data(completed_fetch)
+    assert partition_record is None
+    assert fetcher._subscriptions.assignment[tp].awaiting_reset is True