summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2015-12-02 13:50:22 -0800
committerDana Powers <dana.powers@gmail.com>2015-12-02 13:50:22 -0800
commit2eca5538eef948ef3828be5c3cb5c78c1c958f6f (patch)
treed89eeea1f855244e3b7364286b15be7f881ffd4a
parentc2eccd5bd1c9c0d728bfa11d798a15c05c4deb9c (diff)
parente74a8ba4942891c62ef35f70472f10ee067f89b6 (diff)
downloadkafka-python-2eca5538eef948ef3828be5c3cb5c78c1c958f6f.tar.gz
Merge pull request #457 from saaros/block-for-number-of-messages
Consumers get_messages: allow blocking until some messages are received
-rw-r--r--kafka/consumer/multiprocess.py14
-rw-r--r--kafka/consumer/simple.py15
-rw-r--r--test/test_consumer_integration.py18
3 files changed, 36 insertions, 11 deletions
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index 0b09102..5e421d6 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -238,10 +238,12 @@ class MultiProcessConsumer(Consumer):
Keyword Arguments:
count: Indicates the maximum number of messages to be fetched
- block: If True, the API will block till some messages are fetched.
- timeout: If block is True, the function will block for the specified
- time (in seconds) until count messages is fetched. If None,
- it will block forever.
+ block: If True, the API will block till all messages are fetched.
+ If block is a positive integer the API will block until that
+ many messages are fetched.
+ timeout: When blocking is requested the function will block for
+ the specified time (in seconds) until count messages is
+ fetched. If None, it will block forever.
"""
messages = []
@@ -264,8 +266,10 @@ class MultiProcessConsumer(Consumer):
if self.queue.empty():
self.events.start.set()
+ block_next_call = block is True or block > len(messages)
try:
- partition, message = self.queue.get(block, timeout)
+ partition, message = self.queue.get(block_next_call,
+ timeout)
except Empty:
break
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index 9b85f8c..d8b5826 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -265,10 +265,12 @@ class SimpleConsumer(Consumer):
Keyword Arguments:
count: Indicates the maximum number of messages to be fetched
- block: If True, the API will block till some messages are fetched.
- timeout: If block is True, the function will block for the specified
- time (in seconds) until count messages is fetched. If None,
- it will block forever.
+ block: If True, the API will block till all messages are fetched.
+ If block is a positive integer the API will block until that
+ many messages are fetched.
+ timeout: When blocking is requested the function will block for
+ the specified time (in seconds) until count messages is
+ fetched. If None, it will block forever.
"""
messages = []
if timeout is not None:
@@ -279,12 +281,13 @@ class SimpleConsumer(Consumer):
while len(messages) < count:
block_time = timeout - time.time()
log.debug('calling _get_message block=%s timeout=%s', block, block_time)
- result = self._get_message(block, block_time,
+ block_next_call = block is True or block > len(messages)
+ result = self._get_message(block_next_call, block_time,
get_partition_info=True,
update_offset=False)
log.debug('got %s from _get_messages', result)
if not result:
- if block and (timeout is None or time.time() <= timeout):
+ if block_next_call and (timeout is None or time.time() <= timeout):
continue
break
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 52b3e85..fee53f5 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -204,6 +204,14 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.assert_message_count(messages, 5)
self.assertGreaterEqual(t.interval, 1)
+ # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1
+ # second, get 5 back, no blocking
+ self.send_messages(0, range(0, 5))
+ with Timer() as t:
+ messages = consumer.get_messages(count=10, block=1, timeout=1)
+ self.assert_message_count(messages, 5)
+ self.assertLessEqual(t.interval, 1)
+
consumer.stop()
@kafka_versions("all")
@@ -272,6 +280,16 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.assert_message_count(messages, 5)
self.assertGreaterEqual(t.interval, 1)
+ # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1
+ # second, get at least one back, no blocking
+ self.send_messages(0, range(0, 5))
+ with Timer() as t:
+ messages = consumer.get_messages(count=10, block=1, timeout=1)
+ received_message_count = len(messages)
+ self.assertGreaterEqual(received_message_count, 1)
+ self.assert_message_count(messages, received_message_count)
+ self.assertLessEqual(t.interval, 1)
+
consumer.stop()
@kafka_versions("all")