diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/test_consumer_integration.py | 134 | ||||
-rw-r--r-- | test/test_fetcher.py | 183 |
2 files changed, 313 insertions, 4 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 193a570..4b5e78a 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -1,16 +1,23 @@ import logging import os +import time from six.moves import xrange import six from . import unittest from kafka import ( - KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message, create_gzip_message + KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message, + create_gzip_message, KafkaProducer ) from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES -from kafka.errors import ConsumerFetchSizeTooSmall, OffsetOutOfRangeError -from kafka.structs import ProduceRequestPayload, TopicPartition +from kafka.errors import ( + ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError, + KafkaTimeoutError +) +from kafka.structs import ( + ProduceRequestPayload, TopicPartition, OffsetAndTimestamp +) from test.fixtures import ZookeeperFixture, KafkaFixture from test.testutil import ( @@ -88,6 +95,12 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): **configs) return consumer + def kafka_producer(self, **configs): + brokers = '%s:%d' % (self.server.host, self.server.port) + producer = KafkaProducer( + bootstrap_servers=brokers, **configs) + return producer + def test_simple_consumer(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -624,3 +637,118 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): fetched_msgs = [next(consumer) for i in range(10)] self.assertEqual(len(fetched_msgs), 10) + + @kafka_versions('>=0.10.1') + def test_kafka_consumer_offsets_for_time(self): + late_time = int(time.time()) * 1000 + middle_time = late_time - 1000 + early_time = late_time - 2000 + tp = TopicPartition(self.topic, 0) + + kafka_producer = self.kafka_producer() + early_msg = kafka_producer.send( + self.topic, partition=0, value=b"first", + timestamp_ms=early_time).get() + late_msg = kafka_producer.send( + self.topic, partition=0, value=b"last", + timestamp_ms=late_time).get() + + consumer = self.kafka_consumer() + offsets = consumer.offsets_for_times({tp: early_time}) + self.assertEqual(len(offsets), 1) + self.assertEqual(offsets[tp].offset, early_msg.offset) + self.assertEqual(offsets[tp].timestamp, early_time) + + offsets = consumer.offsets_for_times({tp: middle_time}) + self.assertEqual(offsets[tp].offset, late_msg.offset) + self.assertEqual(offsets[tp].timestamp, late_time) + + offsets = consumer.offsets_for_times({tp: late_time}) + self.assertEqual(offsets[tp].offset, late_msg.offset) + self.assertEqual(offsets[tp].timestamp, late_time) + + offsets = consumer.offsets_for_times({}) + self.assertEqual(offsets, {}) + + # Out of bound timestamps check + + offsets = consumer.offsets_for_times({tp: 0}) + self.assertEqual(offsets[tp].offset, early_msg.offset) + self.assertEqual(offsets[tp].timestamp, early_time) + + offsets = consumer.offsets_for_times({tp: 9999999999999}) + self.assertEqual(offsets[tp], None) + + # Beginning/End offsets + + offsets = consumer.beginning_offsets([tp]) + self.assertEqual(offsets, { + tp: early_msg.offset, + }) + offsets = consumer.end_offsets([tp]) + self.assertEqual(offsets, { + tp: late_msg.offset + 1 + }) + + @kafka_versions('>=0.10.1') + def test_kafka_consumer_offsets_search_many_partitions(self): + tp0 = TopicPartition(self.topic, 0) + tp1 = TopicPartition(self.topic, 1) + + kafka_producer = self.kafka_producer() + send_time = int(time.time() * 1000) + p0msg = kafka_producer.send( + self.topic, partition=0, value=b"XXX", + timestamp_ms=send_time).get() + p1msg = kafka_producer.send( + self.topic, partition=1, value=b"XXX", + timestamp_ms=send_time).get() + + consumer = self.kafka_consumer() + offsets = consumer.offsets_for_times({ + tp0: send_time, + tp1: send_time + }) + + self.assertEqual(offsets, { + tp0: OffsetAndTimestamp(p0msg.offset, send_time), + tp1: OffsetAndTimestamp(p1msg.offset, send_time) + }) + + offsets = consumer.beginning_offsets([tp0, tp1]) + self.assertEqual(offsets, { + tp0: p0msg.offset, + tp1: p1msg.offset + }) + + offsets = consumer.end_offsets([tp0, tp1]) + self.assertEqual(offsets, { + tp0: p0msg.offset + 1, + tp1: p1msg.offset + 1 + }) + + @kafka_versions('<0.10.1') + def test_kafka_consumer_offsets_for_time_old(self): + consumer = self.kafka_consumer() + tp = TopicPartition(self.topic, 0) + + with self.assertRaises(UnsupportedVersionError): + consumer.offsets_for_times({tp: int(time.time())}) + + with self.assertRaises(UnsupportedVersionError): + consumer.beginning_offsets([tp]) + + with self.assertRaises(UnsupportedVersionError): + consumer.end_offsets([tp]) + + @kafka_versions('>=0.10.1') + def test_kafka_consumer_offsets_for_times_errors(self): + consumer = self.kafka_consumer() + tp = TopicPartition(self.topic, 0) + bad_tp = TopicPartition(self.topic, 100) + + with self.assertRaises(ValueError): + consumer.offsets_for_times({tp: -1}) + + with self.assertRaises(KafkaTimeoutError): + consumer.offsets_for_times({bad_tp: 0}) diff --git a/test/test_fetcher.py b/test/test_fetcher.py index dcfba78..0562ec5 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -3,12 +3,21 @@ from __future__ import absolute_import import pytest +import itertools +from collections import OrderedDict + from kafka.client_async import KafkaClient -from kafka.consumer.fetcher import Fetcher +from kafka.consumer.fetcher import Fetcher, NoOffsetForPartitionError from kafka.consumer.subscription_state import SubscriptionState from kafka.metrics import Metrics from kafka.protocol.fetch import FetchRequest +from kafka.protocol.offset import OffsetResponse from kafka.structs import TopicPartition +from kafka.future import Future +from kafka.errors import ( + StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError, + UnknownTopicOrPartitionError +) @pytest.fixture @@ -101,3 +110,175 @@ def test_update_fetch_positions(fetcher, mocker): fetcher.update_fetch_positions([partition]) assert fetcher._reset_offset.call_count == 0 fetcher._subscriptions.seek.assert_called_with(partition, 123) + + +def test__reset_offset(fetcher, mocker): + tp = TopicPartition("topic", 0) + fetcher._subscriptions.subscribe(topics="topic") + fetcher._subscriptions.assign_from_subscribed([tp]) + fetcher._subscriptions.need_offset_reset(tp) + mocked = mocker.patch.object(fetcher, '_retrieve_offsets') + + mocked.return_value = {} + with pytest.raises(NoOffsetForPartitionError): + fetcher._reset_offset(tp) + + mocked.return_value = {tp: (1001, None)} + fetcher._reset_offset(tp) + assert not fetcher._subscriptions.assignment[tp].awaiting_reset + assert fetcher._subscriptions.assignment[tp].position == 1001 + + +def test__send_offset_requests(fetcher, mocker): + tp = TopicPartition("topic_send_offset", 1) + mocked_send = mocker.patch.object(fetcher, "_send_offset_request") + send_futures = [] + + def send_side_effect(*args, **kw): + f = Future() + send_futures.append(f) + return f + mocked_send.side_effect = send_side_effect + + mocked_leader = mocker.patch.object( + fetcher._client.cluster, "leader_for_partition") + # First we report unavailable leader 2 times different ways and later + # always as available + mocked_leader.side_effect = itertools.chain( + [None, -1], itertools.cycle([0])) + + # Leader == None + fut = fetcher._send_offset_requests({tp: 0}) + assert fut.failed() + assert isinstance(fut.exception, StaleMetadata) + assert not mocked_send.called + + # Leader == -1 + fut = fetcher._send_offset_requests({tp: 0}) + assert fut.failed() + assert isinstance(fut.exception, LeaderNotAvailableError) + assert not mocked_send.called + + # Leader == 0, send failed + fut = fetcher._send_offset_requests({tp: 0}) + assert not fut.is_done + assert mocked_send.called + # Check that we bound the futures correctly to chain failure + send_futures.pop().failure(NotLeaderForPartitionError(tp)) + assert fut.failed() + assert isinstance(fut.exception, NotLeaderForPartitionError) + + # Leader == 0, send success + fut = fetcher._send_offset_requests({tp: 0}) + assert not fut.is_done + assert mocked_send.called + # Check that we bound the futures correctly to chain success + send_futures.pop().success({tp: (10, 10000)}) + assert fut.succeeded() + assert fut.value == {tp: (10, 10000)} + + +def test__send_offset_requests_multiple_nodes(fetcher, mocker): + tp1 = TopicPartition("topic_send_offset", 1) + tp2 = TopicPartition("topic_send_offset", 2) + tp3 = TopicPartition("topic_send_offset", 3) + tp4 = TopicPartition("topic_send_offset", 4) + mocked_send = mocker.patch.object(fetcher, "_send_offset_request") + send_futures = [] + + def send_side_effect(node_id, timestamps): + f = Future() + send_futures.append((node_id, timestamps, f)) + return f + mocked_send.side_effect = send_side_effect + + mocked_leader = mocker.patch.object( + fetcher._client.cluster, "leader_for_partition") + mocked_leader.side_effect = itertools.cycle([0, 1]) + + # -- All node succeeded case + tss = OrderedDict([(tp1, 0), (tp2, 0), (tp3, 0), (tp4, 0)]) + fut = fetcher._send_offset_requests(tss) + assert not fut.is_done + assert mocked_send.call_count == 2 + + req_by_node = {} + second_future = None + for node, timestamps, f in send_futures: + req_by_node[node] = timestamps + if node == 0: + # Say tp3 does not have any messages so it's missing + f.success({tp1: (11, 1001)}) + else: + second_future = f + assert req_by_node == { + 0: {tp1: 0, tp3: 0}, + 1: {tp2: 0, tp4: 0} + } + + # We only resolved 1 future so far, so result future is not yet ready + assert not fut.is_done + second_future.success({tp2: (12, 1002), tp4: (14, 1004)}) + assert fut.succeeded() + assert fut.value == {tp1: (11, 1001), tp2: (12, 1002), tp4: (14, 1004)} + + # -- First succeeded second not + del send_futures[:] + fut = fetcher._send_offset_requests(tss) + assert len(send_futures) == 2 + send_futures[0][2].success({tp1: (11, 1001)}) + send_futures[1][2].failure(UnknownTopicOrPartitionError(tp1)) + assert fut.failed() + assert isinstance(fut.exception, UnknownTopicOrPartitionError) + + # -- First fails second succeeded + del send_futures[:] + fut = fetcher._send_offset_requests(tss) + assert len(send_futures) == 2 + send_futures[0][2].failure(UnknownTopicOrPartitionError(tp1)) + send_futures[1][2].success({tp1: (11, 1001)}) + assert fut.failed() + assert isinstance(fut.exception, UnknownTopicOrPartitionError) + + +def test__handle_offset_response(fetcher, mocker): + # Broker returns UnsupportedForMessageFormatError, will omit partition + fut = Future() + res = OffsetResponse[1]([ + ("topic", [(0, 43, -1, -1)]), + ("topic", [(1, 0, 1000, 9999)]) + ]) + fetcher._handle_offset_response(fut, res) + assert fut.succeeded() + assert fut.value == {TopicPartition("topic", 1): (9999, 1000)} + + # Broker returns NotLeaderForPartitionError + fut = Future() + res = OffsetResponse[1]([ + ("topic", [(0, 6, -1, -1)]), + ]) + fetcher._handle_offset_response(fut, res) + assert fut.failed() + assert isinstance(fut.exception, NotLeaderForPartitionError) + + # Broker returns UnknownTopicOrPartitionError + fut = Future() + res = OffsetResponse[1]([ + ("topic", [(0, 3, -1, -1)]), + ]) + fetcher._handle_offset_response(fut, res) + assert fut.failed() + assert isinstance(fut.exception, UnknownTopicOrPartitionError) + + # Broker returns many errors and 1 result + # Will fail on 1st error and return + fut = Future() + res = OffsetResponse[1]([ + ("topic", [(0, 43, -1, -1)]), + ("topic", [(1, 6, -1, -1)]), + ("topic", [(2, 3, -1, -1)]), + ("topic", [(3, 0, 1000, 9999)]) + ]) + fetcher._handle_offset_response(fut, res) + assert fut.failed() + assert isinstance(fut.exception, NotLeaderForPartitionError) |