-rw-r--r-- | kafka/consumer/fetcher.py         |  21
-rw-r--r-- | test/test_consumer_integration.py |   2
-rw-r--r-- | test/test_fetcher.py              | 183
3 files changed, 199 insertions, 7 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 86bcd08..a22b18e 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -227,7 +227,8 @@ class Fetcher(six.Iterator):
         log.debug("Resetting offset for partition %s to %s offset.",
                   partition, strategy)
         offsets = self._retrieve_offsets({partition: timestamp})
-        assert partition in offsets
+        if partition not in offsets:
+            raise NoOffsetForPartitionError(partition)
         offset = offsets[partition][0]
 
         # we might lose the assignment while fetching the offset,
@@ -660,10 +661,14 @@ class Fetcher(six.Iterator):
                     offsets.update(r)
                 list_offsets_future.success(offsets)
 
+        def on_fail(err):
+            if not list_offsets_future.is_done:
+                list_offsets_future.failure(err)
+
         for node_id, timestamps in six.iteritems(timestamps_by_node):
             _f = self._send_offset_request(node_id, timestamps)
             _f.add_callback(on_success)
-            _f.add_errback(lambda e: list_offsets_future.failure(e))
+            _f.add_errback(on_fail)
         return list_offsets_future
 
     def _send_offset_request(self, node_id, timestamps):
@@ -710,10 +715,13 @@ class Fetcher(six.Iterator):
                     if response.API_VERSION == 0:
                         offsets = partition_info[2]
                         assert len(offsets) <= 1, 'Expected OffsetResponse with one offset'
-                        if offsets:
+                        if not offsets:
+                            offset = UNKNOWN_OFFSET
+                        else:
                             offset = offsets[0]
-                            log.debug("Handling v0 ListOffsetResponse response for %s. "
-                                      "Fetched offset %s", partition, offset)
+                        log.debug("Handling v0 ListOffsetResponse response for %s. "
+                                  "Fetched offset %s", partition, offset)
+                        if offset != UNKNOWN_OFFSET:
                             timestamp_offset_map[partition] = (offset, None)
                     else:
                         timestamp, offset = partition_info[2:]
@@ -732,16 +740,19 @@ class Fetcher(six.Iterator):
                               " to obsolete leadership information, retrying.",
                               partition)
                     future.failure(error_type(partition))
+                    return
                 elif error_type is Errors.UnknownTopicOrPartitionError:
                     log.warn("Received unknown topic or partition error in ListOffset "
                              "request for partition %s. The topic/partition " +
                              "may not exist or the user may not have Describe access "
                              "to it.", partition)
                     future.failure(error_type(partition))
+                    return
                 else:
                     log.warning("Attempt to fetch offsets for partition %s failed due to:"
                                 " %s", partition, error_type)
                     future.failure(error_type(partition))
+                    return
         if not future.is_done:
             future.success(timestamp_offset_map)
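Note: the on_fail guard added in _send_offset_requests above is needed because every per-node offset request chains into the single shared list_offsets_future, and a kafka-python Future asserts that it is completed only once. With the old lambda errback, a second failing node would try to fail an already-failed future. Below is a minimal, self-contained sketch of the pattern; MiniFuture is a simplified stand-in for kafka.future.Future, not the library class:

    class MiniFuture(object):
        # Simplified stand-in for kafka.future.Future: like the real class,
        # it refuses to be completed twice.
        def __init__(self):
            self.is_done = False
            self.exception = None

        def failure(self, exc):
            assert not self.is_done, 'Future is already complete'
            self.is_done = True
            self.exception = exc

    agg = MiniFuture()  # shared aggregate future, one errback per node

    def on_fail(err):
        # Only the first per-node failure completes the aggregate future;
        # later failures are dropped instead of tripping the assertion.
        if not agg.is_done:
            agg.failure(err)

    on_fail(RuntimeError('node 0 request failed'))
    on_fail(RuntimeError('node 1 request failed'))  # safely ignored
    assert str(agg.exception) == 'node 0 request failed'

No such guard is needed in on_success, because it only completes the future once responses from every node have been collected, and a node that failed never contributes a response.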
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 803b16a..4b5e78a 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -741,7 +741,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         with self.assertRaises(UnsupportedVersionError):
             consumer.end_offsets([tp])
 
-    @kafka_versions('<0.10.1')
+    @kafka_versions('>=0.10.1')
     def test_kafka_consumer_offsets_for_times_errors(self):
         consumer = self.kafka_consumer()
         tp = TopicPartition(self.topic, 0)
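The decorator flip above is a plain bug fix: KafkaConsumer.offsets_for_times needs the timestamp-based ListOffsets API introduced in broker 0.10.1 and raises UnsupportedVersionError on anything older, so gating this error-path test to '<0.10.1' meant its assertions were unreachable. As a rough illustration of one error path the re-enabled test can now exercise (broker address and topic name are placeholders, and this assumes a reachable 0.10.1+ broker):

    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
    tp = TopicPartition('example-topic', 0)  # hypothetical topic

    # Negative target timestamps are rejected client-side with ValueError
    # before any request is sent, one of the cases the test covers.
    try:
        consumer.offsets_for_times({tp: -1})
    except ValueError as e:
        print('rejected as expected:', e)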
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index dcfba78..0562ec5 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -3,12 +3,21 @@ from __future__ import absolute_import
 
 import pytest
 
+import itertools
+from collections import OrderedDict
+
 from kafka.client_async import KafkaClient
-from kafka.consumer.fetcher import Fetcher
+from kafka.consumer.fetcher import Fetcher, NoOffsetForPartitionError
 from kafka.consumer.subscription_state import SubscriptionState
 from kafka.metrics import Metrics
 from kafka.protocol.fetch import FetchRequest
+from kafka.protocol.offset import OffsetResponse
 from kafka.structs import TopicPartition
+from kafka.future import Future
+from kafka.errors import (
+    StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError,
+    UnknownTopicOrPartitionError
+)
 
 
 @pytest.fixture
@@ -101,3 +110,175 @@ def test_update_fetch_positions(fetcher, mocker):
     fetcher.update_fetch_positions([partition])
     assert fetcher._reset_offset.call_count == 0
     fetcher._subscriptions.seek.assert_called_with(partition, 123)
+
+
+def test__reset_offset(fetcher, mocker):
+    tp = TopicPartition("topic", 0)
+    fetcher._subscriptions.subscribe(topics="topic")
+    fetcher._subscriptions.assign_from_subscribed([tp])
+    fetcher._subscriptions.need_offset_reset(tp)
+    mocked = mocker.patch.object(fetcher, '_retrieve_offsets')
+
+    mocked.return_value = {}
+    with pytest.raises(NoOffsetForPartitionError):
+        fetcher._reset_offset(tp)
+
+    mocked.return_value = {tp: (1001, None)}
+    fetcher._reset_offset(tp)
+    assert not fetcher._subscriptions.assignment[tp].awaiting_reset
+    assert fetcher._subscriptions.assignment[tp].position == 1001
+
+
+def test__send_offset_requests(fetcher, mocker):
+    tp = TopicPartition("topic_send_offset", 1)
+    mocked_send = mocker.patch.object(fetcher, "_send_offset_request")
+    send_futures = []
+
+    def send_side_effect(*args, **kw):
+        f = Future()
+        send_futures.append(f)
+        return f
+    mocked_send.side_effect = send_side_effect
+
+    mocked_leader = mocker.patch.object(
+        fetcher._client.cluster, "leader_for_partition")
+    # First we report unavailable leader 2 times different ways and later
+    # always as available
+    mocked_leader.side_effect = itertools.chain(
+        [None, -1], itertools.cycle([0]))
+
+    # Leader == None
+    fut = fetcher._send_offset_requests({tp: 0})
+    assert fut.failed()
+    assert isinstance(fut.exception, StaleMetadata)
+    assert not mocked_send.called
+
+    # Leader == -1
+    fut = fetcher._send_offset_requests({tp: 0})
+    assert fut.failed()
+    assert isinstance(fut.exception, LeaderNotAvailableError)
+    assert not mocked_send.called
+
+    # Leader == 0, send failed
+    fut = fetcher._send_offset_requests({tp: 0})
+    assert not fut.is_done
+    assert mocked_send.called
+    # Check that we bound the futures correctly to chain failure
+    send_futures.pop().failure(NotLeaderForPartitionError(tp))
+    assert fut.failed()
+    assert isinstance(fut.exception, NotLeaderForPartitionError)
+
+    # Leader == 0, send success
+    fut = fetcher._send_offset_requests({tp: 0})
+    assert not fut.is_done
+    assert mocked_send.called
+    # Check that we bound the futures correctly to chain success
+    send_futures.pop().success({tp: (10, 10000)})
+    assert fut.succeeded()
+    assert fut.value == {tp: (10, 10000)}
+
+
+def test__send_offset_requests_multiple_nodes(fetcher, mocker):
+    tp1 = TopicPartition("topic_send_offset", 1)
+    tp2 = TopicPartition("topic_send_offset", 2)
+    tp3 = TopicPartition("topic_send_offset", 3)
+    tp4 = TopicPartition("topic_send_offset", 4)
+    mocked_send = mocker.patch.object(fetcher, "_send_offset_request")
+    send_futures = []
+
+    def send_side_effect(node_id, timestamps):
+        f = Future()
+        send_futures.append((node_id, timestamps, f))
+        return f
+    mocked_send.side_effect = send_side_effect
+
+    mocked_leader = mocker.patch.object(
+        fetcher._client.cluster, "leader_for_partition")
+    mocked_leader.side_effect = itertools.cycle([0, 1])
+
+    # -- All node succeeded case
+    tss = OrderedDict([(tp1, 0), (tp2, 0), (tp3, 0), (tp4, 0)])
+    fut = fetcher._send_offset_requests(tss)
+    assert not fut.is_done
+    assert mocked_send.call_count == 2
+
+    req_by_node = {}
+    second_future = None
+    for node, timestamps, f in send_futures:
+        req_by_node[node] = timestamps
+        if node == 0:
+            # Say tp3 does not have any messages so it's missing
+            f.success({tp1: (11, 1001)})
+        else:
+            second_future = f
+    assert req_by_node == {
+        0: {tp1: 0, tp3: 0},
+        1: {tp2: 0, tp4: 0}
+    }
+
+    # We only resolved 1 future so far, so result future is not yet ready
+    assert not fut.is_done
+    second_future.success({tp2: (12, 1002), tp4: (14, 1004)})
+    assert fut.succeeded()
+    assert fut.value == {tp1: (11, 1001), tp2: (12, 1002), tp4: (14, 1004)}
+
+    # -- First succeeded second not
+    del send_futures[:]
+    fut = fetcher._send_offset_requests(tss)
+    assert len(send_futures) == 2
+    send_futures[0][2].success({tp1: (11, 1001)})
+    send_futures[1][2].failure(UnknownTopicOrPartitionError(tp1))
+    assert fut.failed()
+    assert isinstance(fut.exception, UnknownTopicOrPartitionError)
+
+    # -- First fails second succeeded
+    del send_futures[:]
+    fut = fetcher._send_offset_requests(tss)
+    assert len(send_futures) == 2
+    send_futures[0][2].failure(UnknownTopicOrPartitionError(tp1))
+    send_futures[1][2].success({tp1: (11, 1001)})
+    assert fut.failed()
+    assert isinstance(fut.exception, UnknownTopicOrPartitionError)
+
+
+def test__handle_offset_response(fetcher, mocker):
+    # Broker returns UnsupportedForMessageFormatError, will omit partition
+    fut = Future()
+    res = OffsetResponse[1]([
+        ("topic", [(0, 43, -1, -1)]),
+        ("topic", [(1, 0, 1000, 9999)])
+    ])
+    fetcher._handle_offset_response(fut, res)
+    assert fut.succeeded()
+    assert fut.value == {TopicPartition("topic", 1): (9999, 1000)}
+
+    # Broker returns NotLeaderForPartitionError
+    fut = Future()
+    res = OffsetResponse[1]([
+        ("topic", [(0, 6, -1, -1)]),
+    ])
+    fetcher._handle_offset_response(fut, res)
+    assert fut.failed()
+    assert isinstance(fut.exception, NotLeaderForPartitionError)
+
+    # Broker returns UnknownTopicOrPartitionError
+    fut = Future()
+    res = OffsetResponse[1]([
+        ("topic", [(0, 3, -1, -1)]),
+    ])
+    fetcher._handle_offset_response(fut, res)
+    assert fut.failed()
+    assert isinstance(fut.exception, UnknownTopicOrPartitionError)
+
+    # Broker returns many errors and 1 result
+    # Will fail on 1st error and return
+    fut = Future()
+    res = OffsetResponse[1]([
+        ("topic", [(0, 43, -1, -1)]),
+        ("topic", [(1, 6, -1, -1)]),
+        ("topic", [(2, 3, -1, -1)]),
+        ("topic", [(3, 0, 1000, 9999)])
+    ])
+    fetcher._handle_offset_response(fut, res)
+    assert fut.failed()
+    assert isinstance(fut.exception, NotLeaderForPartitionError)
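For reading the OffsetResponse fixtures in test__handle_offset_response: each v1 partition tuple is (partition, error_code, timestamp, offset), and _handle_offset_response picks its branch by mapping the integer error code to an exception class via Errors.for_code. A quick way to decode the codes used above, assuming a kafka-python version from this era where these classes are defined in kafka/errors.py:

    import kafka.errors as Errors

    for code in (0, 3, 6, 43):
        print(code, Errors.for_code(code).__name__)
    # 0  NoError
    # 3  UnknownTopicOrPartitionError
    # 6  NotLeaderForPartitionError
    # 43 UnsupportedForMessageFormatError

This also explains the last fixture: partition 1 carries code 6, so the handler fails the future with NotLeaderForPartitionError and returns before ever reaching the successful partition 3.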