diff options
author | Dana Powers <dana.powers@gmail.com> | 2015-12-02 13:50:22 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2015-12-02 13:50:22 -0800 |
commit | 2eca5538eef948ef3828be5c3cb5c78c1c958f6f (patch) | |
tree | d89eeea1f855244e3b7364286b15be7f881ffd4a | |
parent | c2eccd5bd1c9c0d728bfa11d798a15c05c4deb9c (diff) | |
parent | e74a8ba4942891c62ef35f70472f10ee067f89b6 (diff) | |
download | kafka-python-2eca5538eef948ef3828be5c3cb5c78c1c958f6f.tar.gz |
Merge pull request #457 from saaros/block-for-number-of-messages
Consumers get_messages: allow blocking until some messages are received
-rw-r--r-- | kafka/consumer/multiprocess.py | 14 | ||||
-rw-r--r-- | kafka/consumer/simple.py | 15 | ||||
-rw-r--r-- | test/test_consumer_integration.py | 18 |
3 files changed, 36 insertions, 11 deletions
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index 0b09102..5e421d6 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -238,10 +238,12 @@ class MultiProcessConsumer(Consumer): Keyword Arguments: count: Indicates the maximum number of messages to be fetched - block: If True, the API will block till some messages are fetched. - timeout: If block is True, the function will block for the specified - time (in seconds) until count messages is fetched. If None, - it will block forever. + block: If True, the API will block till all messages are fetched. + If block is a positive integer the API will block until that + many messages are fetched. + timeout: When blocking is requested the function will block for + the specified time (in seconds) until count messages is + fetched. If None, it will block forever. """ messages = [] @@ -264,8 +266,10 @@ class MultiProcessConsumer(Consumer): if self.queue.empty(): self.events.start.set() + block_next_call = block is True or block > len(messages) try: - partition, message = self.queue.get(block, timeout) + partition, message = self.queue.get(block_next_call, + timeout) except Empty: break diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index 9b85f8c..d8b5826 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -265,10 +265,12 @@ class SimpleConsumer(Consumer): Keyword Arguments: count: Indicates the maximum number of messages to be fetched - block: If True, the API will block till some messages are fetched. - timeout: If block is True, the function will block for the specified - time (in seconds) until count messages is fetched. If None, - it will block forever. + block: If True, the API will block till all messages are fetched. + If block is a positive integer the API will block until that + many messages are fetched. + timeout: When blocking is requested the function will block for + the specified time (in seconds) until count messages is + fetched. If None, it will block forever. """ messages = [] if timeout is not None: @@ -279,12 +281,13 @@ class SimpleConsumer(Consumer): while len(messages) < count: block_time = timeout - time.time() log.debug('calling _get_message block=%s timeout=%s', block, block_time) - result = self._get_message(block, block_time, + block_next_call = block is True or block > len(messages) + result = self._get_message(block_next_call, block_time, get_partition_info=True, update_offset=False) log.debug('got %s from _get_messages', result) if not result: - if block and (timeout is None or time.time() <= timeout): + if block_next_call and (timeout is None or time.time() <= timeout): continue break diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 52b3e85..fee53f5 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -204,6 +204,14 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assert_message_count(messages, 5) self.assertGreaterEqual(t.interval, 1) + # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1 + # second, get 5 back, no blocking + self.send_messages(0, range(0, 5)) + with Timer() as t: + messages = consumer.get_messages(count=10, block=1, timeout=1) + self.assert_message_count(messages, 5) + self.assertLessEqual(t.interval, 1) + consumer.stop() @kafka_versions("all") @@ -272,6 +280,16 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assert_message_count(messages, 5) self.assertGreaterEqual(t.interval, 1) + # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1 + # second, get at least one back, no blocking + self.send_messages(0, range(0, 5)) + with Timer() as t: + messages = consumer.get_messages(count=10, block=1, timeout=1) + received_message_count = len(messages) + self.assertGreaterEqual(received_message_count, 1) + self.assert_message_count(messages, received_message_count) + self.assertLessEqual(t.interval, 1) + consumer.stop() @kafka_versions("all") |