summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
authorMahendra M <mahendra.m@gmail.com>2013-05-27 16:41:47 +0530
committerMahendra M <mahendra.m@gmail.com>2013-05-27 16:41:47 +0530
commit117c576c93ebc3e55d63707c26872de2b95dce73 (patch)
treed09b71334499923d5bc77e020e31a062098232e1 /kafka/consumer.py
parent38215b66ebc70954c8448ce0397ac007ba35d697 (diff)
downloadkafka-python-117c576c93ebc3e55d63707c26872de2b95dce73.tar.gz
New API for getting a specified set of messages
This will be easier to use in some cases where we have to get only a specified set of messages. This API uses the __iter__ API internally, but maintains the state to give back only the required set of messages API is - get_messages(count=1)
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py18
1 files changed, 18 insertions, 0 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 603ea36..2af9f6a 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -155,6 +155,24 @@ class SimpleConsumer(object):
assert resp.error == 0
self.count_since_commit = 0
+ def get_messages(self, count=1):
+ """
+ Get the specified number of messages
+ """
+ if not hasattr(self, '_iterator'):
+ self._iterator = iter(self)
+
+ msgs = []
+ while count > 0:
+ try:
+ msgs.append(self._iterator.next())
+ count -= 1
+ except StopIteration:
+ delattr(self, '_iterator')
+ break
+
+ return msgs
+
def __iter__(self):
"""
Create an iterate per partition. Iterate through them calling next() until they are