diff options
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r-- | test/test_consumer_integration.py | 18 |
1 files changed, 18 insertions, 0 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 52b3e85..fee53f5 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -204,6 +204,14 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assert_message_count(messages, 5) self.assertGreaterEqual(t.interval, 1) + # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1 + # second, get 5 back, no blocking + self.send_messages(0, range(0, 5)) + with Timer() as t: + messages = consumer.get_messages(count=10, block=1, timeout=1) + self.assert_message_count(messages, 5) + self.assertLessEqual(t.interval, 1) + consumer.stop() @kafka_versions("all") @@ -272,6 +280,16 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assert_message_count(messages, 5) self.assertGreaterEqual(t.interval, 1) + # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1 + # second, get at least one back, no blocking + self.send_messages(0, range(0, 5)) + with Timer() as t: + messages = consumer.get_messages(count=10, block=1, timeout=1) + received_message_count = len(messages) + self.assertGreaterEqual(received_message_count, 1) + self.assert_message_count(messages, received_message_count) + self.assertLessEqual(t.interval, 1) + consumer.stop() @kafka_versions("all") |