summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2015-06-08 18:19:56 -0700
committerDana Powers <dana.powers@gmail.com>2015-06-08 18:19:56 -0700
commit00c6b8635bca62e4facca105d33fdd250a2d5eb4 (patch)
tree94902a62beb5aff1a71a6dba7da7d96a09d71bca
parentf1dc01e63bf174558d791b211b545428c984ae2b (diff)
parentb235ce89733e17997ad7192e10064d47da9ccfb4 (diff)
downloadkafka-python-00c6b8635bca62e4facca105d33fdd250a2d5eb4.tar.gz
Merge pull request #393 from dpkp/simple_consumer_leader_change
Simple consumer leader change
-rw-r--r--kafka/consumer/simple.py5
-rw-r--r--test/test_consumer.py75
2 files changed, 78 insertions, 2 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index 88eeada..384fa8e 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -344,9 +344,12 @@ class SimpleConsumer(Consumer):
try:
check_error(resp)
- except (UnknownTopicOrPartitionError, NotLeaderForPartitionError):
+ except UnknownTopicOrPartitionError:
self.client.reset_topic_metadata(resp.topic)
raise
+ except NotLeaderForPartitionError:
+ self.client.reset_topic_metadata(resp.topic)
+ continue
except OffsetOutOfRangeError:
log.warning("OffsetOutOfRangeError for %s - %d. "
"Resetting partition offset...",
diff --git a/test/test_consumer.py b/test/test_consumer.py
index a3d09a8..08fd620 100644
--- a/test/test_consumer.py
+++ b/test/test_consumer.py
@@ -3,7 +3,12 @@ from mock import MagicMock, patch
from . import unittest
from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer
-from kafka.common import KafkaConfigurationError
+from kafka.common import (
+ KafkaConfigurationError, FetchResponse,
+ FailedPayloadsError, OffsetAndMessage,
+ NotLeaderForPartitionError, UnknownTopicOrPartitionError
+)
+
class TestKafkaConsumer(unittest.TestCase):
def test_non_integer_partitions(self):
@@ -14,6 +19,7 @@ class TestKafkaConsumer(unittest.TestCase):
with self.assertRaises(KafkaConfigurationError):
KafkaConsumer()
+
class TestMultiProcessConsumer(unittest.TestCase):
def test_partition_list(self):
client = MagicMock()
@@ -22,3 +28,70 @@ class TestMultiProcessConsumer(unittest.TestCase):
consumer = MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions)
self.assertEqual(fetch_last_known_offsets.call_args[0], (partitions,) )
self.assertEqual(client.get_partition_ids_for_topic.call_count, 0) # pylint: disable=no-member
+
+ def test_simple_consumer_failed_payloads(self):
+ client = MagicMock()
+ consumer = SimpleConsumer(client, group=None,
+ topic='topic', partitions=[0, 1],
+ auto_commit=False)
+
+ def failed_payloads(payload):
+ return FailedPayloadsError(payload)
+
+ client.send_fetch_request.side_effect = self.fail_requests_factory(failed_payloads)
+
+ # This should not raise an exception
+ consumer.get_messages(5)
+
+ def test_simple_consumer_leader_change(self):
+ client = MagicMock()
+ consumer = SimpleConsumer(client, group=None,
+ topic='topic', partitions=[0, 1],
+ auto_commit=False)
+
+ # Mock so that only the first request gets a valid response
+ def not_leader(request):
+ return FetchResponse(request.topic, request.partition,
+ NotLeaderForPartitionError.errno, -1, ())
+
+ client.send_fetch_request.side_effect = self.fail_requests_factory(not_leader)
+
+ # This should not raise an exception
+ consumer.get_messages(20)
+
+ # client should have updated metadata
+ self.assertGreaterEqual(client.reset_topic_metadata.call_count, 1)
+ self.assertGreaterEqual(client.load_metadata_for_topics.call_count, 1)
+
+ def test_simple_consumer_unknown_topic_partition(self):
+ client = MagicMock()
+ consumer = SimpleConsumer(client, group=None,
+ topic='topic', partitions=[0, 1],
+ auto_commit=False)
+
+ # Mock so that only the first request gets a valid response
+ def unknown_topic_partition(request):
+ return FetchResponse(request.topic, request.partition,
+ UnknownTopicOrPartitionError.errno, -1, ())
+
+ client.send_fetch_request.side_effect = self.fail_requests_factory(unknown_topic_partition)
+
+ # This should not raise an exception
+ with self.assertRaises(UnknownTopicOrPartitionError):
+ consumer.get_messages(20)
+
+ @staticmethod
+ def fail_requests_factory(error_factory):
+ # Mock so that only the first request gets a valid response
+ def fail_requests(payloads, **kwargs):
+ responses = [
+ FetchResponse(payloads[0].topic, payloads[0].partition, 0, 0,
+ (OffsetAndMessage(
+ payloads[0].offset + i,
+ "msg %d" % (payloads[0].offset + i))
+ for i in range(10))),
+ ]
+ for failure in payloads[1:]:
+ responses.append(error_factory(failure))
+ return responses
+ return fail_requests