summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/test_consumer.py44
1 files changed, 42 insertions, 2 deletions
diff --git a/test/test_consumer.py b/test/test_consumer.py
index 08fd620..df15115 100644
--- a/test/test_consumer.py
+++ b/test/test_consumer.py
@@ -4,7 +4,7 @@ from . import unittest
from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer
from kafka.common import (
- KafkaConfigurationError, FetchResponse,
+ KafkaConfigurationError, FetchResponse, OffsetFetchResponse,
FailedPayloadsError, OffsetAndMessage,
NotLeaderForPartitionError, UnknownTopicOrPartitionError
)
@@ -25,10 +25,11 @@ class TestMultiProcessConsumer(unittest.TestCase):
client = MagicMock()
partitions = (0,)
with patch.object(MultiProcessConsumer, 'fetch_last_known_offsets') as fetch_last_known_offsets:
- consumer = MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions)
+ MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions)
self.assertEqual(fetch_last_known_offsets.call_args[0], (partitions,) )
self.assertEqual(client.get_partition_ids_for_topic.call_count, 0) # pylint: disable=no-member
+class TestSimpleConsumer(unittest.TestCase):
def test_simple_consumer_failed_payloads(self):
client = MagicMock()
consumer = SimpleConsumer(client, group=None,
@@ -80,6 +81,45 @@ class TestMultiProcessConsumer(unittest.TestCase):
with self.assertRaises(UnknownTopicOrPartitionError):
consumer.get_messages(20)
+ def test_simple_consumer_commit_does_not_raise(self):
+ client = MagicMock()
+ client.get_partition_ids_for_topic.return_value = [0, 1]
+
+ def mock_offset_fetch_request(group, payloads, **kwargs):
+ return [OffsetFetchResponse(p.topic, p.partition, 0, b'', 0) for p in payloads]
+
+ client.send_offset_fetch_request.side_effect = mock_offset_fetch_request
+
+ def mock_offset_commit_request(group, payloads, **kwargs):
+ raise FailedPayloadsError(payloads[0])
+
+ client.send_offset_commit_request.side_effect = mock_offset_commit_request
+
+ consumer = SimpleConsumer(client, group='foobar',
+ topic='topic', partitions=[0, 1],
+ auto_commit=False)
+
+ # Mock internal commit check
+ consumer.count_since_commit = 10
+
+ # This should not raise an exception
+ self.assertFalse(consumer.commit(partitions=[0, 1]))
+
+ def test_simple_consumer_reset_partition_offset(self):
+ client = MagicMock()
+
+ def mock_offset_request(payloads, **kwargs):
+ raise FailedPayloadsError(payloads[0])
+
+ client.send_offset_request.side_effect = mock_offset_request
+
+ consumer = SimpleConsumer(client, group='foobar',
+ topic='topic', partitions=[0, 1],
+ auto_commit=False)
+
+ # This should not raise an exception
+ self.assertEqual(consumer.reset_partition_offset(0), None)
+
@staticmethod
def fail_requests_factory(error_factory):
# Mock so that only the first request gets a valid response