summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTaras Voinarovskiy <voyn1991@gmail.com>2017-08-06 10:50:16 +0000
committerTaras Voinarovskiy <voyn1991@gmail.com>2017-08-06 10:50:16 +0000
commit61668aa89167c330ca6f0e4ba8bcc5f3944d1824 (patch)
tree26e07fec173bcf77247660f8f6242f80e4c4cc83
parent414e286d0367bbfffc454a92deb4e0f3da7cb81c (diff)
downloadkafka-python-issue1036_offset_by_time.tar.gz
Added unit tests for fetcher's `_reset_offset` and related functions.issue1036_offset_by_time
-rw-r--r--kafka/consumer/fetcher.py21
-rw-r--r--test/test_consumer_integration.py2
-rw-r--r--test/test_fetcher.py183
3 files changed, 199 insertions, 7 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 86bcd08..a22b18e 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -227,7 +227,8 @@ class Fetcher(six.Iterator):
log.debug("Resetting offset for partition %s to %s offset.",
partition, strategy)
offsets = self._retrieve_offsets({partition: timestamp})
- assert partition in offsets
+ if partition not in offsets:
+ raise NoOffsetForPartitionError(partition)
offset = offsets[partition][0]
# we might lose the assignment while fetching the offset,
@@ -660,10 +661,14 @@ class Fetcher(six.Iterator):
offsets.update(r)
list_offsets_future.success(offsets)
+ def on_fail(err):
+ if not list_offsets_future.is_done:
+ list_offsets_future.failure(err)
+
for node_id, timestamps in six.iteritems(timestamps_by_node):
_f = self._send_offset_request(node_id, timestamps)
_f.add_callback(on_success)
- _f.add_errback(lambda e: list_offsets_future.failure(e))
+ _f.add_errback(on_fail)
return list_offsets_future
def _send_offset_request(self, node_id, timestamps):
@@ -710,10 +715,13 @@ class Fetcher(six.Iterator):
if response.API_VERSION == 0:
offsets = partition_info[2]
assert len(offsets) <= 1, 'Expected OffsetResponse with one offset'
- if offsets:
+ if not offsets:
+ offset = UNKNOWN_OFFSET
+ else:
offset = offsets[0]
- log.debug("Handling v0 ListOffsetResponse response for %s. "
- "Fetched offset %s", partition, offset)
+ log.debug("Handling v0 ListOffsetResponse response for %s. "
+ "Fetched offset %s", partition, offset)
+ if offset != UNKNOWN_OFFSET:
timestamp_offset_map[partition] = (offset, None)
else:
timestamp, offset = partition_info[2:]
@@ -732,16 +740,19 @@ class Fetcher(six.Iterator):
" to obsolete leadership information, retrying.",
partition)
future.failure(error_type(partition))
+ return
elif error_type is Errors.UnknownTopicOrPartitionError:
log.warn("Received unknown topic or partition error in ListOffset "
"request for partition %s. The topic/partition " +
"may not exist or the user may not have Describe access "
"to it.", partition)
future.failure(error_type(partition))
+ return
else:
log.warning("Attempt to fetch offsets for partition %s failed due to:"
" %s", partition, error_type)
future.failure(error_type(partition))
+ return
if not future.is_done:
future.success(timestamp_offset_map)
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 803b16a..4b5e78a 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -741,7 +741,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
with self.assertRaises(UnsupportedVersionError):
consumer.end_offsets([tp])
- @kafka_versions('<0.10.1')
+ @kafka_versions('>=0.10.1')
def test_kafka_consumer_offsets_for_times_errors(self):
consumer = self.kafka_consumer()
tp = TopicPartition(self.topic, 0)
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index dcfba78..0562ec5 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -3,12 +3,21 @@ from __future__ import absolute_import
import pytest
+import itertools
+from collections import OrderedDict
+
from kafka.client_async import KafkaClient
-from kafka.consumer.fetcher import Fetcher
+from kafka.consumer.fetcher import Fetcher, NoOffsetForPartitionError
from kafka.consumer.subscription_state import SubscriptionState
from kafka.metrics import Metrics
from kafka.protocol.fetch import FetchRequest
+from kafka.protocol.offset import OffsetResponse
from kafka.structs import TopicPartition
+from kafka.future import Future
+from kafka.errors import (
+ StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError,
+ UnknownTopicOrPartitionError
+)
@pytest.fixture
@@ -101,3 +110,175 @@ def test_update_fetch_positions(fetcher, mocker):
fetcher.update_fetch_positions([partition])
assert fetcher._reset_offset.call_count == 0
fetcher._subscriptions.seek.assert_called_with(partition, 123)
+
+
+def test__reset_offset(fetcher, mocker):
+ tp = TopicPartition("topic", 0)
+ fetcher._subscriptions.subscribe(topics="topic")
+ fetcher._subscriptions.assign_from_subscribed([tp])
+ fetcher._subscriptions.need_offset_reset(tp)
+ mocked = mocker.patch.object(fetcher, '_retrieve_offsets')
+
+ mocked.return_value = {}
+ with pytest.raises(NoOffsetForPartitionError):
+ fetcher._reset_offset(tp)
+
+ mocked.return_value = {tp: (1001, None)}
+ fetcher._reset_offset(tp)
+ assert not fetcher._subscriptions.assignment[tp].awaiting_reset
+ assert fetcher._subscriptions.assignment[tp].position == 1001
+
+
+def test__send_offset_requests(fetcher, mocker):
+ tp = TopicPartition("topic_send_offset", 1)
+ mocked_send = mocker.patch.object(fetcher, "_send_offset_request")
+ send_futures = []
+
+ def send_side_effect(*args, **kw):
+ f = Future()
+ send_futures.append(f)
+ return f
+ mocked_send.side_effect = send_side_effect
+
+ mocked_leader = mocker.patch.object(
+ fetcher._client.cluster, "leader_for_partition")
+ # First we report unavailable leader 2 times different ways and later
+ # always as available
+ mocked_leader.side_effect = itertools.chain(
+ [None, -1], itertools.cycle([0]))
+
+ # Leader == None
+ fut = fetcher._send_offset_requests({tp: 0})
+ assert fut.failed()
+ assert isinstance(fut.exception, StaleMetadata)
+ assert not mocked_send.called
+
+ # Leader == -1
+ fut = fetcher._send_offset_requests({tp: 0})
+ assert fut.failed()
+ assert isinstance(fut.exception, LeaderNotAvailableError)
+ assert not mocked_send.called
+
+ # Leader == 0, send failed
+ fut = fetcher._send_offset_requests({tp: 0})
+ assert not fut.is_done
+ assert mocked_send.called
+ # Check that we bound the futures correctly to chain failure
+ send_futures.pop().failure(NotLeaderForPartitionError(tp))
+ assert fut.failed()
+ assert isinstance(fut.exception, NotLeaderForPartitionError)
+
+ # Leader == 0, send success
+ fut = fetcher._send_offset_requests({tp: 0})
+ assert not fut.is_done
+ assert mocked_send.called
+ # Check that we bound the futures correctly to chain success
+ send_futures.pop().success({tp: (10, 10000)})
+ assert fut.succeeded()
+ assert fut.value == {tp: (10, 10000)}
+
+
+def test__send_offset_requests_multiple_nodes(fetcher, mocker):
+ tp1 = TopicPartition("topic_send_offset", 1)
+ tp2 = TopicPartition("topic_send_offset", 2)
+ tp3 = TopicPartition("topic_send_offset", 3)
+ tp4 = TopicPartition("topic_send_offset", 4)
+ mocked_send = mocker.patch.object(fetcher, "_send_offset_request")
+ send_futures = []
+
+ def send_side_effect(node_id, timestamps):
+ f = Future()
+ send_futures.append((node_id, timestamps, f))
+ return f
+ mocked_send.side_effect = send_side_effect
+
+ mocked_leader = mocker.patch.object(
+ fetcher._client.cluster, "leader_for_partition")
+ mocked_leader.side_effect = itertools.cycle([0, 1])
+
+ # -- All node succeeded case
+ tss = OrderedDict([(tp1, 0), (tp2, 0), (tp3, 0), (tp4, 0)])
+ fut = fetcher._send_offset_requests(tss)
+ assert not fut.is_done
+ assert mocked_send.call_count == 2
+
+ req_by_node = {}
+ second_future = None
+ for node, timestamps, f in send_futures:
+ req_by_node[node] = timestamps
+ if node == 0:
+ # Say tp3 does not have any messages so it's missing
+ f.success({tp1: (11, 1001)})
+ else:
+ second_future = f
+ assert req_by_node == {
+ 0: {tp1: 0, tp3: 0},
+ 1: {tp2: 0, tp4: 0}
+ }
+
+ # We only resolved 1 future so far, so result future is not yet ready
+ assert not fut.is_done
+ second_future.success({tp2: (12, 1002), tp4: (14, 1004)})
+ assert fut.succeeded()
+ assert fut.value == {tp1: (11, 1001), tp2: (12, 1002), tp4: (14, 1004)}
+
+ # -- First succeeded second not
+ del send_futures[:]
+ fut = fetcher._send_offset_requests(tss)
+ assert len(send_futures) == 2
+ send_futures[0][2].success({tp1: (11, 1001)})
+ send_futures[1][2].failure(UnknownTopicOrPartitionError(tp1))
+ assert fut.failed()
+ assert isinstance(fut.exception, UnknownTopicOrPartitionError)
+
+ # -- First fails second succeeded
+ del send_futures[:]
+ fut = fetcher._send_offset_requests(tss)
+ assert len(send_futures) == 2
+ send_futures[0][2].failure(UnknownTopicOrPartitionError(tp1))
+ send_futures[1][2].success({tp1: (11, 1001)})
+ assert fut.failed()
+ assert isinstance(fut.exception, UnknownTopicOrPartitionError)
+
+
+def test__handle_offset_response(fetcher, mocker):
+ # Broker returns UnsupportedForMessageFormatError, will omit partition
+ fut = Future()
+ res = OffsetResponse[1]([
+ ("topic", [(0, 43, -1, -1)]),
+ ("topic", [(1, 0, 1000, 9999)])
+ ])
+ fetcher._handle_offset_response(fut, res)
+ assert fut.succeeded()
+ assert fut.value == {TopicPartition("topic", 1): (9999, 1000)}
+
+ # Broker returns NotLeaderForPartitionError
+ fut = Future()
+ res = OffsetResponse[1]([
+ ("topic", [(0, 6, -1, -1)]),
+ ])
+ fetcher._handle_offset_response(fut, res)
+ assert fut.failed()
+ assert isinstance(fut.exception, NotLeaderForPartitionError)
+
+ # Broker returns UnknownTopicOrPartitionError
+ fut = Future()
+ res = OffsetResponse[1]([
+ ("topic", [(0, 3, -1, -1)]),
+ ])
+ fetcher._handle_offset_response(fut, res)
+ assert fut.failed()
+ assert isinstance(fut.exception, UnknownTopicOrPartitionError)
+
+ # Broker returns many errors and 1 result
+ # Will fail on 1st error and return
+ fut = Future()
+ res = OffsetResponse[1]([
+ ("topic", [(0, 43, -1, -1)]),
+ ("topic", [(1, 6, -1, -1)]),
+ ("topic", [(2, 3, -1, -1)]),
+ ("topic", [(3, 0, 1000, 9999)])
+ ])
+ fetcher._handle_offset_response(fut, res)
+ assert fut.failed()
+ assert isinstance(fut.exception, NotLeaderForPartitionError)