summaryrefslogtreecommitdiff
path: root/kafka/consumer
diff options
context:
space:
mode:
authorOskari Saarenmaa <os@ohmu.fi>2015-09-18 14:06:51 +0300
committerOskari Saarenmaa <os@ohmu.fi>2015-09-18 14:06:51 +0300
commite74a8ba4942891c62ef35f70472f10ee067f89b6 (patch)
tree187e24f4eb26a8a7b6933f4c9d39a7b5aa4a21ae /kafka/consumer
parentb525e1a8d63e4fcb0ede43c05739bc84c85cc79c (diff)
downloadkafka-python-e74a8ba4942891c62ef35f70472f10ee067f89b6.tar.gz
Consumers get_messages: allow blocking until some messages are received
Modified MultiProcessConsumer's and SimpleConsumer's `block` argument to allow integer value which defines the number of messages to block for. This allows callers to ask for a high number of messages and block only until some of them are received. Otherwise callers would have to request messages one by one or block for some time.
Diffstat (limited to 'kafka/consumer')
-rw-r--r--kafka/consumer/multiprocess.py14
-rw-r--r--kafka/consumer/simple.py15
2 files changed, 18 insertions, 11 deletions
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index d03eb95..046271b 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -226,10 +226,12 @@ class MultiProcessConsumer(Consumer):
Keyword Arguments:
count: Indicates the maximum number of messages to be fetched
- block: If True, the API will block till some messages are fetched.
- timeout: If block is True, the function will block for the specified
- time (in seconds) until count messages is fetched. If None,
- it will block forever.
+ block: If True, the API will block till all messages are fetched.
+ If block is a positive integer the API will block until that
+ many messages are fetched.
+ timeout: When blocking is requested the function will block for
+ the specified time (in seconds) until count messages is
+ fetched. If None, it will block forever.
"""
messages = []
@@ -252,8 +254,10 @@ class MultiProcessConsumer(Consumer):
if self.queue.empty():
self.events.start.set()
+ block_next_call = block is True or block > len(messages)
try:
- partition, message = self.queue.get(block, timeout)
+ partition, message = self.queue.get(block_next_call,
+ timeout)
except Empty:
break
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index 733baa8..6e18290 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -272,10 +272,12 @@ class SimpleConsumer(Consumer):
Keyword Arguments:
count: Indicates the maximum number of messages to be fetched
- block: If True, the API will block till some messages are fetched.
- timeout: If block is True, the function will block for the specified
- time (in seconds) until count messages is fetched. If None,
- it will block forever.
+ block: If True, the API will block till all messages are fetched.
+ If block is a positive integer the API will block until that
+ many messages are fetched.
+ timeout: When blocking is requested the function will block for
+ the specified time (in seconds) until count messages is
+ fetched. If None, it will block forever.
"""
messages = []
if timeout is not None:
@@ -286,12 +288,13 @@ class SimpleConsumer(Consumer):
while len(messages) < count:
block_time = timeout - time.time()
log.debug('calling _get_message block=%s timeout=%s', block, block_time)
- result = self._get_message(block, block_time,
+ block_next_call = block is True or block > len(messages)
+ result = self._get_message(block_next_call, block_time,
get_partition_info=True,
update_offset=False)
log.debug('got %s from _get_messages', result)
if not result:
- if block and (timeout is None or time.time() <= timeout):
+ if block_next_call and (timeout is None or time.time() <= timeout):
continue
break