diff options
-rw-r--r-- | kafka/consumer/simple.py | 5 | ||||
-rw-r--r-- | test/test_consumer.py | 75 |
2 files changed, 78 insertions, 2 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index 88eeada..384fa8e 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -344,9 +344,12 @@ class SimpleConsumer(Consumer): try: check_error(resp) - except (UnknownTopicOrPartitionError, NotLeaderForPartitionError): + except UnknownTopicOrPartitionError: self.client.reset_topic_metadata(resp.topic) raise + except NotLeaderForPartitionError: + self.client.reset_topic_metadata(resp.topic) + continue except OffsetOutOfRangeError: log.warning("OffsetOutOfRangeError for %s - %d. " "Resetting partition offset...", diff --git a/test/test_consumer.py b/test/test_consumer.py index a3d09a8..08fd620 100644 --- a/test/test_consumer.py +++ b/test/test_consumer.py @@ -3,7 +3,12 @@ from mock import MagicMock, patch from . import unittest from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer -from kafka.common import KafkaConfigurationError +from kafka.common import ( + KafkaConfigurationError, FetchResponse, + FailedPayloadsError, OffsetAndMessage, + NotLeaderForPartitionError, UnknownTopicOrPartitionError +) + class TestKafkaConsumer(unittest.TestCase): def test_non_integer_partitions(self): @@ -14,6 +19,7 @@ class TestKafkaConsumer(unittest.TestCase): with self.assertRaises(KafkaConfigurationError): KafkaConsumer() + class TestMultiProcessConsumer(unittest.TestCase): def test_partition_list(self): client = MagicMock() @@ -22,3 +28,70 @@ class TestMultiProcessConsumer(unittest.TestCase): consumer = MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions) self.assertEqual(fetch_last_known_offsets.call_args[0], (partitions,) ) self.assertEqual(client.get_partition_ids_for_topic.call_count, 0) # pylint: disable=no-member + + def test_simple_consumer_failed_payloads(self): + client = MagicMock() + consumer = SimpleConsumer(client, group=None, + topic='topic', partitions=[0, 1], + auto_commit=False) + + def failed_payloads(payload): + return FailedPayloadsError(payload) + + client.send_fetch_request.side_effect = self.fail_requests_factory(failed_payloads) + + # This should not raise an exception + consumer.get_messages(5) + + def test_simple_consumer_leader_change(self): + client = MagicMock() + consumer = SimpleConsumer(client, group=None, + topic='topic', partitions=[0, 1], + auto_commit=False) + + # Mock so that only the first request gets a valid response + def not_leader(request): + return FetchResponse(request.topic, request.partition, + NotLeaderForPartitionError.errno, -1, ()) + + client.send_fetch_request.side_effect = self.fail_requests_factory(not_leader) + + # This should not raise an exception + consumer.get_messages(20) + + # client should have updated metadata + self.assertGreaterEqual(client.reset_topic_metadata.call_count, 1) + self.assertGreaterEqual(client.load_metadata_for_topics.call_count, 1) + + def test_simple_consumer_unknown_topic_partition(self): + client = MagicMock() + consumer = SimpleConsumer(client, group=None, + topic='topic', partitions=[0, 1], + auto_commit=False) + + # Mock so that only the first request gets a valid response + def unknown_topic_partition(request): + return FetchResponse(request.topic, request.partition, + UnknownTopicOrPartitionError.errno, -1, ()) + + client.send_fetch_request.side_effect = self.fail_requests_factory(unknown_topic_partition) + + # This should not raise an exception + with self.assertRaises(UnknownTopicOrPartitionError): + consumer.get_messages(20) + + @staticmethod + def fail_requests_factory(error_factory): + # Mock so that only the first request gets a valid response + def fail_requests(payloads, **kwargs): + responses = [ + FetchResponse(payloads[0].topic, payloads[0].partition, 0, 0, + (OffsetAndMessage( + payloads[0].offset + i, + "msg %d" % (payloads[0].offset + i)) + for i in range(10))), + ] + for failure in payloads[1:]: + responses.append(error_factory(failure)) + return responses + return fail_requests |