diff options
author | Dana Powers <dana.powers@rd.io> | 2015-06-08 15:20:54 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-06-08 16:12:01 -0700 |
commit | c28c8a31c36696544d81495e0bf9e2c425ba3786 (patch) | |
tree | d3cd531bde9d9047d43d7f072c39022cef643d10 /test/test_consumer.py | |
parent | f1dc01e63bf174558d791b211b545428c984ae2b (diff) | |
download | kafka-python-c28c8a31c36696544d81495e0bf9e2c425ba3786.tar.gz |
Add unit tests for SimpleConsumer error handling
Diffstat (limited to 'test/test_consumer.py')
-rw-r--r-- | test/test_consumer.py | 75 |
1 files changed, 74 insertions, 1 deletions
diff --git a/test/test_consumer.py b/test/test_consumer.py index a3d09a8..08fd620 100644 --- a/test/test_consumer.py +++ b/test/test_consumer.py @@ -3,7 +3,12 @@ from mock import MagicMock, patch from . import unittest from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer -from kafka.common import KafkaConfigurationError +from kafka.common import ( + KafkaConfigurationError, FetchResponse, + FailedPayloadsError, OffsetAndMessage, + NotLeaderForPartitionError, UnknownTopicOrPartitionError +) + class TestKafkaConsumer(unittest.TestCase): def test_non_integer_partitions(self): @@ -14,6 +19,7 @@ class TestKafkaConsumer(unittest.TestCase): with self.assertRaises(KafkaConfigurationError): KafkaConsumer() + class TestMultiProcessConsumer(unittest.TestCase): def test_partition_list(self): client = MagicMock() @@ -22,3 +28,70 @@ class TestMultiProcessConsumer(unittest.TestCase): consumer = MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions) self.assertEqual(fetch_last_known_offsets.call_args[0], (partitions,) ) self.assertEqual(client.get_partition_ids_for_topic.call_count, 0) # pylint: disable=no-member + + def test_simple_consumer_failed_payloads(self): + client = MagicMock() + consumer = SimpleConsumer(client, group=None, + topic='topic', partitions=[0, 1], + auto_commit=False) + + def failed_payloads(payload): + return FailedPayloadsError(payload) + + client.send_fetch_request.side_effect = self.fail_requests_factory(failed_payloads) + + # This should not raise an exception + consumer.get_messages(5) + + def test_simple_consumer_leader_change(self): + client = MagicMock() + consumer = SimpleConsumer(client, group=None, + topic='topic', partitions=[0, 1], + auto_commit=False) + + # Mock so that only the first request gets a valid response + def not_leader(request): + return FetchResponse(request.topic, request.partition, + NotLeaderForPartitionError.errno, -1, ()) + + client.send_fetch_request.side_effect = self.fail_requests_factory(not_leader) + + # This should not raise an exception + consumer.get_messages(20) + + # client should have updated metadata + self.assertGreaterEqual(client.reset_topic_metadata.call_count, 1) + self.assertGreaterEqual(client.load_metadata_for_topics.call_count, 1) + + def test_simple_consumer_unknown_topic_partition(self): + client = MagicMock() + consumer = SimpleConsumer(client, group=None, + topic='topic', partitions=[0, 1], + auto_commit=False) + + # Mock so that only the first request gets a valid response + def unknown_topic_partition(request): + return FetchResponse(request.topic, request.partition, + UnknownTopicOrPartitionError.errno, -1, ()) + + client.send_fetch_request.side_effect = self.fail_requests_factory(unknown_topic_partition) + + # This should not raise an exception + with self.assertRaises(UnknownTopicOrPartitionError): + consumer.get_messages(20) + + @staticmethod + def fail_requests_factory(error_factory): + # Mock so that only the first request gets a valid response + def fail_requests(payloads, **kwargs): + responses = [ + FetchResponse(payloads[0].topic, payloads[0].partition, 0, 0, + (OffsetAndMessage( + payloads[0].offset + i, + "msg %d" % (payloads[0].offset + i)) + for i in range(10))), + ] + for failure in payloads[1:]: + responses.append(error_factory(failure)) + return responses + return fail_requests |