# summary | refs | log | tree | commit | diff
# path: root/test/test_consumer.py
# blob: 08fd62045d84b8ddef6b0a4c79475cc5d395bab9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97

from mock import MagicMock, patch
from . import unittest

from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer
from kafka.common import (
    KafkaConfigurationError, FetchResponse,
    FailedPayloadsError, OffsetAndMessage,
    NotLeaderForPartitionError, UnknownTopicOrPartitionError
)


class TestKafkaConsumer(unittest.TestCase):
    """Checks that consumer constructors reject invalid arguments."""

    def test_non_integer_partitions(self):
        # A partition id passed as a string is expected to make the
        # SimpleConsumer constructor raise AssertionError.
        string_partitions = ['0']
        self.assertRaises(AssertionError, SimpleConsumer,
                          MagicMock(), 'group', 'topic',
                          partitions=string_partitions)

    def test_broker_list_required(self):
        # Constructing a KafkaConsumer with no arguments at all is expected
        # to raise KafkaConfigurationError.
        self.assertRaises(KafkaConfigurationError, KafkaConsumer)


class TestMultiProcessConsumer(unittest.TestCase):
    """Consumer behaviour checks that run against a MagicMock client.

    NOTE(review): only ``test_partition_list`` constructs a
    MultiProcessConsumer; the ``test_simple_consumer_*`` cases build a
    SimpleConsumer. They are kept in this class so existing test ids keep
    resolving -- consider a dedicated TestCase for them.
    """

    def test_partition_list(self):
        # Explicitly supplied partitions should be handed to
        # fetch_last_known_offsets as-is, without asking the client for the
        # topic's partition ids.
        client = MagicMock()
        partitions = (0,)
        with patch.object(MultiProcessConsumer, 'fetch_last_known_offsets') as fetch_last_known_offsets:
            consumer = MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions)
            # call_args is None on a mock that was never called; check first
            # so that case is reported as a failure instead of a TypeError.
            self.assertTrue(fetch_last_known_offsets.called)
            self.assertEqual(fetch_last_known_offsets.call_args[0], (partitions,))
        self.assertEqual(client.get_partition_ids_for_topic.call_count, 0) # pylint: disable=no-member

    def test_simple_consumer_failed_payloads(self):
        client = MagicMock()
        consumer = SimpleConsumer(client, group=None,
                                  topic='topic', partitions=[0, 1],
                                  auto_commit=False)

        # Stand-in response for every payload after the first: a
        # FailedPayloadsError instance wrapping that payload.
        def failed_payloads(payload):
            return FailedPayloadsError(payload)

        client.send_fetch_request.side_effect = self.fail_requests_factory(failed_payloads)

        # This should not raise an exception
        consumer.get_messages(5)

    def test_simple_consumer_leader_change(self):
        client = MagicMock()
        consumer = SimpleConsumer(client, group=None,
                                  topic='topic', partitions=[0, 1],
                                  auto_commit=False)

        # Stand-in response for every payload after the first: a
        # FetchResponse carrying the NotLeaderForPartitionError errno and an
        # empty final field.
        def not_leader(request):
            return FetchResponse(request.topic, request.partition,
                                 NotLeaderForPartitionError.errno, -1, ())

        client.send_fetch_request.side_effect = self.fail_requests_factory(not_leader)

        # This should not raise an exception
        consumer.get_messages(20)

        # client should have updated metadata
        self.assertGreaterEqual(client.reset_topic_metadata.call_count, 1)
        self.assertGreaterEqual(client.load_metadata_for_topics.call_count, 1)

    def test_simple_consumer_unknown_topic_partition(self):
        client = MagicMock()
        consumer = SimpleConsumer(client, group=None,
                                  topic='topic', partitions=[0, 1],
                                  auto_commit=False)

        # Stand-in response for every payload after the first: a
        # FetchResponse carrying the UnknownTopicOrPartitionError errno and
        # an empty final field.
        def unknown_topic_partition(request):
            return FetchResponse(request.topic, request.partition,
                                 UnknownTopicOrPartitionError.errno, -1, ())

        client.send_fetch_request.side_effect = self.fail_requests_factory(unknown_topic_partition)

        # Unlike the cases above, this error is expected to propagate to the
        # caller of get_messages.
        with self.assertRaises(UnknownTopicOrPartitionError):
            consumer.get_messages(20)

    @staticmethod
    def fail_requests_factory(error_factory):
        """Build a ``send_fetch_request`` side effect that mostly fails.

        The returned function answers the first payload with a successful
        FetchResponse holding ten OffsetAndMessage entries (offsets
        ``payload.offset`` .. ``payload.offset + 9``, each paired with the
        string ``"msg <offset>"``) and answers every remaining payload with
        ``error_factory(payload)``.

        :param error_factory: callable taking one payload and returning the
            object to use as that payload's response.
        :returns: function ``fail_requests(payloads, **kwargs)`` returning the
            list of responses, one per payload, in payload order.
        """
        # Mock so that only the first request gets a valid response
        def fail_requests(payloads, **kwargs):
            # The messages are passed as a generator expression, so they can
            # be iterated only once by whoever consumes the response.
            responses = [
                FetchResponse(payloads[0].topic, payloads[0].partition, 0, 0,
                              (OffsetAndMessage(
                                  payloads[0].offset + i,
                                  "msg %d" % (payloads[0].offset + i))
                               for i in range(10))),
            ]
            for failure in payloads[1:]:
                responses.append(error_factory(failure))
            return responses
        return fail_requests