diff options
Diffstat (limited to 'test/test_fetcher.py')
-rw-r--r-- | test/test_fetcher.py | 316 |
1 files changed, 300 insertions, 16 deletions
diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 86d154f..5da597c 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -3,20 +3,26 @@ from __future__ import absolute_import import pytest -import itertools from collections import OrderedDict +import itertools +import time from kafka.client_async import KafkaClient -from kafka.consumer.fetcher import ConsumerRecord, Fetcher, NoOffsetForPartitionError +from kafka.codec import gzip_encode +from kafka.consumer.fetcher import ( + CompletedFetch, ConsumerRecord, Fetcher, NoOffsetForPartitionError +) from kafka.consumer.subscription_state import SubscriptionState from kafka.metrics import Metrics -from kafka.protocol.fetch import FetchRequest +from kafka.protocol.fetch import FetchRequest, FetchResponse +from kafka.protocol.message import Message from kafka.protocol.offset import OffsetResponse +from kafka.protocol.types import Int64, Int32 from kafka.structs import TopicPartition from kafka.future import Future from kafka.errors import ( StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError, - UnknownTopicOrPartitionError + UnknownTopicOrPartitionError, OffsetOutOfRangeError ) @@ -31,28 +37,33 @@ def subscription_state(): @pytest.fixture -def fetcher(client, subscription_state): - subscription_state.subscribe(topics=['foobar']) - assignment = [TopicPartition('foobar', i) for i in range(3)] +def topic(): + return 'foobar' + + +@pytest.fixture +def fetcher(client, subscription_state, topic): + subscription_state.subscribe(topics=[topic]) + assignment = [TopicPartition(topic, i) for i in range(3)] subscription_state.assign_from_subscribed(assignment) for tp in assignment: subscription_state.seek(tp, 0) return Fetcher(client, subscription_state, Metrics()) -def test_send_fetches(fetcher, mocker): +def test_send_fetches(fetcher, topic, mocker): fetch_requests = [ FetchRequest[0]( -1, fetcher.config['fetch_max_wait_ms'], fetcher.config['fetch_min_bytes'], - [('foobar', [ + [(topic, [ (0, 0, fetcher.config['max_partition_fetch_bytes']), (1, 0, fetcher.config['max_partition_fetch_bytes']), ])]), FetchRequest[0]( -1, fetcher.config['fetch_max_wait_ms'], fetcher.config['fetch_min_bytes'], - [('foobar', [ + [(topic, [ (2, 0, fetcher.config['max_partition_fetch_bytes']), ])]) ] @@ -80,9 +91,9 @@ def test_create_fetch_requests(fetcher, mocker, api_version, fetch_version): assert all([isinstance(r, FetchRequest[fetch_version]) for r in requests]) -def test_update_fetch_positions(fetcher, mocker): +def test_update_fetch_positions(fetcher, topic, mocker): mocker.patch.object(fetcher, '_reset_offset') - partition = TopicPartition('foobar', 0) + partition = TopicPartition(topic, 0) # unassigned partition fetcher.update_fetch_positions([TopicPartition('fizzbuzz', 0)]) @@ -285,7 +296,7 @@ def test__handle_offset_response(fetcher, mocker): def test_partition_records_offset(): - """Test that compressed messagesets are handle correctly + """Test that compressed messagesets are handled correctly when fetch offset is in the middle of the message list """ batch_start = 120 @@ -296,12 +307,285 @@ def test_partition_records_offset(): None, None, 'key', 'value', 'checksum', 0, 0) for i in range(batch_start, batch_end)] records = Fetcher.PartitionRecords(fetch_offset, None, messages) - assert records.has_more() + assert len(records) > 0 msgs = records.take(1) assert msgs[0].offset == 123 assert records.fetch_offset == 124 msgs = records.take(2) assert len(msgs) == 2 - assert records.has_more() + assert len(records) > 0 records.discard() - assert not records.has_more() + assert len(records) == 0 + + +def test_fetched_records(fetcher, topic, mocker): + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + msgs = [] + for i in range(10): + msg = Message(b'foo') + msgs.append((i, -1, msg)) + completed_fetch = CompletedFetch( + tp, 0, 0, [0, 100, msgs], + mocker.MagicMock() + ) + fetcher._completed_fetches.append(completed_fetch) + records, partial = fetcher.fetched_records() + assert tp in records + assert len(records[tp]) == len(msgs) + assert all(map(lambda x: isinstance(x, ConsumerRecord), records[tp])) + assert partial is False + + +@pytest.mark.parametrize(("fetch_request", "fetch_response", "num_partitions"), [ + ( + FetchRequest[0]( + -1, 100, 100, + [('foo', [(0, 0, 1000),])]), + FetchResponse[0]( + [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]), + 1, + ), + ( + FetchRequest[1]( + -1, 100, 100, + [('foo', [(0, 0, 1000), (1, 0, 1000),])]), + FetchResponse[1]( + 0, + [("foo", [ + (0, 0, 1000, [(0, b'xxx'),]), + (1, 0, 1000, [(0, b'xxx'),]), + ]),]), + 2, + ), + ( + FetchRequest[2]( + -1, 100, 100, + [('foo', [(0, 0, 1000),])]), + FetchResponse[2]( + 0, [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]), + 1, + ), + ( + FetchRequest[3]( + -1, 100, 100, 10000, + [('foo', [(0, 0, 1000),])]), + FetchResponse[3]( + 0, [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]), + 1, + ), + ( + FetchRequest[4]( + -1, 100, 100, 10000, 0, + [('foo', [(0, 0, 1000),])]), + FetchResponse[4]( + 0, [("foo", [(0, 0, 1000, 0, [], [(0, b'xxx'),])]),]), + 1, + ), + ( + # This may only be used in broker-broker api calls + FetchRequest[5]( + -1, 100, 100, 10000, 0, + [('foo', [(0, 0, 1000),])]), + FetchResponse[5]( + 0, [("foo", [(0, 0, 1000, 0, 0, [], [(0, b'xxx'),])]),]), + 1, + ), +]) +def test__handle_fetch_response(fetcher, fetch_request, fetch_response, num_partitions): + fetcher._handle_fetch_response(fetch_request, time.time(), fetch_response) + assert len(fetcher._completed_fetches) == num_partitions + + +def test__unpack_message_set(fetcher): + fetcher.config['check_crcs'] = False + tp = TopicPartition('foo', 0) + messages = [ + (0, None, Message(b'a')), + (1, None, Message(b'b')), + (2, None, Message(b'c')) + ] + records = list(fetcher._unpack_message_set(tp, messages)) + assert len(records) == 3 + assert all(map(lambda x: isinstance(x, ConsumerRecord), records)) + assert records[0].value == b'a' + assert records[1].value == b'b' + assert records[2].value == b'c' + assert records[0].offset == 0 + assert records[1].offset == 1 + assert records[2].offset == 2 + + +def test__unpack_message_set_compressed_v0(fetcher): + fetcher.config['check_crcs'] = False + tp = TopicPartition('foo', 0) + messages = [ + (0, None, Message(b'a')), + (1, None, Message(b'b')), + (2, None, Message(b'c')), + ] + message_bytes = [] + for offset, _, m in messages: + encoded = m.encode() + message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded) + compressed_bytes = gzip_encode(b''.join(message_bytes)) + compressed_base_offset = 0 + compressed_msgs = [ + (compressed_base_offset, None, + Message(compressed_bytes, + magic=0, + attributes=Message.CODEC_GZIP)) + ] + records = list(fetcher._unpack_message_set(tp, compressed_msgs)) + assert len(records) == 3 + assert all(map(lambda x: isinstance(x, ConsumerRecord), records)) + assert records[0].value == b'a' + assert records[1].value == b'b' + assert records[2].value == b'c' + assert records[0].offset == 0 + assert records[1].offset == 1 + assert records[2].offset == 2 + + +def test__unpack_message_set_compressed_v1(fetcher): + fetcher.config['check_crcs'] = False + tp = TopicPartition('foo', 0) + messages = [ + (0, None, Message(b'a')), + (1, None, Message(b'b')), + (2, None, Message(b'c')), + ] + message_bytes = [] + for offset, _, m in messages: + encoded = m.encode() + message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded) + compressed_bytes = gzip_encode(b''.join(message_bytes)) + compressed_base_offset = 10 + compressed_msgs = [ + (compressed_base_offset, None, + Message(compressed_bytes, + magic=1, + attributes=Message.CODEC_GZIP)) + ] + records = list(fetcher._unpack_message_set(tp, compressed_msgs)) + assert len(records) == 3 + assert all(map(lambda x: isinstance(x, ConsumerRecord), records)) + assert records[0].value == b'a' + assert records[1].value == b'b' + assert records[2].value == b'c' + assert records[0].offset == 8 + assert records[1].offset == 9 + assert records[2].offset == 10 + + +def test__parse_record(fetcher): + tp = TopicPartition('foo', 0) + record = fetcher._parse_record(tp, 123, 456, Message(b'abc')) + assert record.topic == 'foo' + assert record.partition == 0 + assert record.offset == 123 + assert record.timestamp == 456 + assert record.value == b'abc' + assert record.key is None + + +def test__message_generator(fetcher, topic, mocker): + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + msgs = [] + for i in range(10): + msg = Message(b'foo') + msgs.append((i, -1, msg)) + completed_fetch = CompletedFetch( + tp, 0, 0, [0, 100, msgs], + mocker.MagicMock() + ) + fetcher._completed_fetches.append(completed_fetch) + for i in range(10): + msg = next(fetcher) + assert isinstance(msg, ConsumerRecord) + assert msg.offset == i + assert msg.value == b'foo' + + +def test__parse_fetched_data(fetcher, topic, mocker): + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + msgs = [] + for i in range(10): + msg = Message(b'foo') + msgs.append((i, -1, msg)) + completed_fetch = CompletedFetch( + tp, 0, 0, [0, 100, msgs], + mocker.MagicMock() + ) + partition_record = fetcher._parse_fetched_data(completed_fetch) + assert isinstance(partition_record, fetcher.PartitionRecords) + assert len(partition_record) == 10 + + +def test__parse_fetched_data__paused(fetcher, topic, mocker): + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + msgs = [] + for i in range(10): + msg = Message(b'foo') + msgs.append((i, -1, msg)) + completed_fetch = CompletedFetch( + tp, 0, 0, [0, 100, msgs], + mocker.MagicMock() + ) + fetcher._subscriptions.pause(tp) + partition_record = fetcher._parse_fetched_data(completed_fetch) + assert partition_record is None + + +def test__parse_fetched_data__stale_offset(fetcher, topic, mocker): + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + msgs = [] + for i in range(10): + msg = Message(b'foo') + msgs.append((i, -1, msg)) + completed_fetch = CompletedFetch( + tp, 10, 0, [0, 100, msgs], + mocker.MagicMock() + ) + partition_record = fetcher._parse_fetched_data(completed_fetch) + assert partition_record is None + + +def test__parse_fetched_data__not_leader(fetcher, topic, mocker): + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + completed_fetch = CompletedFetch( + tp, 0, 0, [NotLeaderForPartitionError.errno, -1, None], + mocker.MagicMock() + ) + partition_record = fetcher._parse_fetched_data(completed_fetch) + assert partition_record is None + fetcher._client.cluster.request_update.assert_called_with() + + +def test__parse_fetched_data__unknown_tp(fetcher, topic, mocker): + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + completed_fetch = CompletedFetch( + tp, 0, 0, [UnknownTopicOrPartitionError.errno, -1, None], + mocker.MagicMock() + ) + partition_record = fetcher._parse_fetched_data(completed_fetch) + assert partition_record is None + fetcher._client.cluster.request_update.assert_called_with() + + +def test__parse_fetched_data__out_of_range(fetcher, topic, mocker): + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + completed_fetch = CompletedFetch( + tp, 0, 0, [OffsetOutOfRangeError.errno, -1, None], + mocker.MagicMock() + ) + partition_record = fetcher._parse_fetched_data(completed_fetch) + assert partition_record is None + assert fetcher._subscriptions.assignment[tp].awaiting_reset is True |