diff options
| author | Dana Powers <dana.powers@rd.io> | 2015-06-08 15:20:54 -0700 | 
|---|---|---|
| committer | Dana Powers <dana.powers@rd.io> | 2015-06-08 16:12:01 -0700 | 
| commit | c28c8a31c36696544d81495e0bf9e2c425ba3786 (patch) | |
| tree | d3cd531bde9d9047d43d7f072c39022cef643d10 /test/test_consumer.py | |
| parent | f1dc01e63bf174558d791b211b545428c984ae2b (diff) | |
| download | kafka-python-c28c8a31c36696544d81495e0bf9e2c425ba3786.tar.gz | |
Add unit tests for SimpleConsumer error handling
Diffstat (limited to 'test/test_consumer.py')
| -rw-r--r-- | test/test_consumer.py | 75 | 
1 files changed, 74 insertions, 1 deletions
diff --git a/test/test_consumer.py b/test/test_consumer.py index a3d09a8..08fd620 100644 --- a/test/test_consumer.py +++ b/test/test_consumer.py @@ -3,7 +3,12 @@ from mock import MagicMock, patch  from . import unittest  from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer -from kafka.common import KafkaConfigurationError +from kafka.common import ( +    KafkaConfigurationError, FetchResponse, +    FailedPayloadsError, OffsetAndMessage, +    NotLeaderForPartitionError, UnknownTopicOrPartitionError +) +  class TestKafkaConsumer(unittest.TestCase):      def test_non_integer_partitions(self): @@ -14,6 +19,7 @@ class TestKafkaConsumer(unittest.TestCase):          with self.assertRaises(KafkaConfigurationError):              KafkaConsumer() +  class TestMultiProcessConsumer(unittest.TestCase):      def test_partition_list(self):          client = MagicMock() @@ -22,3 +28,70 @@ class TestMultiProcessConsumer(unittest.TestCase):              consumer = MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions)              self.assertEqual(fetch_last_known_offsets.call_args[0], (partitions,) )          self.assertEqual(client.get_partition_ids_for_topic.call_count, 0) # pylint: disable=no-member + +    def test_simple_consumer_failed_payloads(self): +        client = MagicMock() +        consumer = SimpleConsumer(client, group=None, +                                  topic='topic', partitions=[0, 1], +                                  auto_commit=False) + +        def failed_payloads(payload): +            return FailedPayloadsError(payload) + +        client.send_fetch_request.side_effect = self.fail_requests_factory(failed_payloads) + +        # This should not raise an exception +        consumer.get_messages(5) + +    def test_simple_consumer_leader_change(self): +        client = MagicMock() +        consumer = SimpleConsumer(client, group=None, +                                  topic='topic', partitions=[0, 1], +                                  auto_commit=False) + +        # Mock so that only the first request gets a valid response +        def not_leader(request): +            return FetchResponse(request.topic, request.partition, +                                 NotLeaderForPartitionError.errno, -1, ()) + +        client.send_fetch_request.side_effect = self.fail_requests_factory(not_leader) + +        # This should not raise an exception +        consumer.get_messages(20) + +        # client should have updated metadata +        self.assertGreaterEqual(client.reset_topic_metadata.call_count, 1) +        self.assertGreaterEqual(client.load_metadata_for_topics.call_count, 1) + +    def test_simple_consumer_unknown_topic_partition(self): +        client = MagicMock() +        consumer = SimpleConsumer(client, group=None, +                                  topic='topic', partitions=[0, 1], +                                  auto_commit=False) + +        # Mock so that only the first request gets a valid response +        def unknown_topic_partition(request): +            return FetchResponse(request.topic, request.partition, +                                 UnknownTopicOrPartitionError.errno, -1, ()) + +        client.send_fetch_request.side_effect = self.fail_requests_factory(unknown_topic_partition) + +        # This should not raise an exception +        with self.assertRaises(UnknownTopicOrPartitionError): +            consumer.get_messages(20) + +    @staticmethod +    def fail_requests_factory(error_factory): +        # Mock so that only the first request gets a valid response +        def fail_requests(payloads, **kwargs): +            responses = [ +                FetchResponse(payloads[0].topic, payloads[0].partition, 0, 0, +                              (OffsetAndMessage( +                                  payloads[0].offset + i, +                                  "msg %d" % (payloads[0].offset + i)) +                               for i in range(10))), +            ] +            for failure in payloads[1:]: +                responses.append(error_factory(failure)) +            return responses +        return fail_requests  | 
