summaryrefslogtreecommitdiff
path: root/test/test_consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_consumer.py')
-rw-r--r--test/test_consumer.py135
1 files changed, 2 insertions, 133 deletions
diff --git a/test/test_consumer.py b/test/test_consumer.py
index edcc2d8..436fe55 100644
--- a/test/test_consumer.py
+++ b/test/test_consumer.py
@@ -1,15 +1,7 @@
-import sys
-
-from mock import MagicMock, patch
-from . import unittest
import pytest
-from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer
-from kafka.errors import (
- FailedPayloadsError, KafkaConfigurationError, NotLeaderForPartitionError,
- UnknownTopicOrPartitionError)
-from kafka.structs import (
- FetchResponsePayload, OffsetAndMessage, OffsetFetchResponsePayload)
+from kafka import KafkaConsumer
+from kafka.errors import KafkaConfigurationError
class TestKafkaConsumer:
@@ -32,126 +24,3 @@ class TestKafkaConsumer:
assert sub == set(['foo'])
sub.add('fizz')
assert consumer.subscription() == set(['foo'])
-
-
-class TestMultiProcessConsumer(unittest.TestCase):
- @unittest.skipIf(sys.platform.startswith('win'), 'test mocking fails on windows')
- def test_partition_list(self):
- client = MagicMock()
- partitions = (0,)
- with patch.object(MultiProcessConsumer, 'fetch_last_known_offsets') as fetch_last_known_offsets:
- MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions)
- self.assertEqual(fetch_last_known_offsets.call_args[0], (partitions,) )
- self.assertEqual(client.get_partition_ids_for_topic.call_count, 0) # pylint: disable=no-member
-
-
-class TestSimpleConsumer(unittest.TestCase):
- def test_non_integer_partitions(self):
- with self.assertRaises(AssertionError):
- SimpleConsumer(MagicMock(), 'group', 'topic', partitions=['0'])
-
- def test_simple_consumer_failed_payloads(self):
- client = MagicMock()
- consumer = SimpleConsumer(client, group=None,
- topic='topic', partitions=[0, 1],
- auto_commit=False)
-
- def failed_payloads(payload):
- return FailedPayloadsError(payload)
-
- client.send_fetch_request.side_effect = self.fail_requests_factory(failed_payloads)
-
- # This should not raise an exception
- consumer.get_messages(5)
-
- def test_simple_consumer_leader_change(self):
- client = MagicMock()
- consumer = SimpleConsumer(client, group=None,
- topic='topic', partitions=[0, 1],
- auto_commit=False)
-
- # Mock so that only the first request gets a valid response
- def not_leader(request):
- return FetchResponsePayload(request.topic, request.partition,
- NotLeaderForPartitionError.errno, -1, ())
-
- client.send_fetch_request.side_effect = self.fail_requests_factory(not_leader)
-
- # This should not raise an exception
- consumer.get_messages(20)
-
- # client should have updated metadata
- self.assertGreaterEqual(client.reset_topic_metadata.call_count, 1)
- self.assertGreaterEqual(client.load_metadata_for_topics.call_count, 1)
-
- def test_simple_consumer_unknown_topic_partition(self):
- client = MagicMock()
- consumer = SimpleConsumer(client, group=None,
- topic='topic', partitions=[0, 1],
- auto_commit=False)
-
- # Mock so that only the first request gets a valid response
- def unknown_topic_partition(request):
- return FetchResponsePayload(request.topic, request.partition,
- UnknownTopicOrPartitionError.errno, -1, ())
-
- client.send_fetch_request.side_effect = self.fail_requests_factory(unknown_topic_partition)
-
- # This should not raise an exception
- with self.assertRaises(UnknownTopicOrPartitionError):
- consumer.get_messages(20)
-
- def test_simple_consumer_commit_does_not_raise(self):
- client = MagicMock()
- client.get_partition_ids_for_topic.return_value = [0, 1]
-
- def mock_offset_fetch_request(group, payloads, **kwargs):
- return [OffsetFetchResponsePayload(p.topic, p.partition, 0, b'', 0) for p in payloads]
-
- client.send_offset_fetch_request.side_effect = mock_offset_fetch_request
-
- def mock_offset_commit_request(group, payloads, **kwargs):
- raise FailedPayloadsError(payloads[0])
-
- client.send_offset_commit_request.side_effect = mock_offset_commit_request
-
- consumer = SimpleConsumer(client, group='foobar',
- topic='topic', partitions=[0, 1],
- auto_commit=False)
-
- # Mock internal commit check
- consumer.count_since_commit = 10
-
- # This should not raise an exception
- self.assertFalse(consumer.commit(partitions=[0, 1]))
-
- def test_simple_consumer_reset_partition_offset(self):
- client = MagicMock()
-
- def mock_offset_request(payloads, **kwargs):
- raise FailedPayloadsError(payloads[0])
-
- client.send_offset_request.side_effect = mock_offset_request
-
- consumer = SimpleConsumer(client, group='foobar',
- topic='topic', partitions=[0, 1],
- auto_commit=False)
-
- # This should not raise an exception
- self.assertEqual(consumer.reset_partition_offset(0), None)
-
- @staticmethod
- def fail_requests_factory(error_factory):
- # Mock so that only the first request gets a valid response
- def fail_requests(payloads, **kwargs):
- responses = [
- FetchResponsePayload(payloads[0].topic, payloads[0].partition, 0, 0,
- [OffsetAndMessage(
- payloads[0].offset + i,
- "msg %d" % (payloads[0].offset + i))
- for i in range(10)]),
- ]
- for failure in payloads[1:]:
- responses.append(error_factory(failure))
- return responses
- return fail_requests