summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py18
1 files changed, 18 insertions, 0 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 603ea36..2af9f6a 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -155,6 +155,24 @@ class SimpleConsumer(object):
assert resp.error == 0
self.count_since_commit = 0
+ def get_messages(self, count=1):
+ """
+ Get the specified number of messages
+ """
+ if not hasattr(self, '_iterator'):
+ self._iterator = iter(self)
+
+ msgs = []
+ while count > 0:
+ try:
+ msgs.append(self._iterator.next())
+ count -= 1
+ except StopIteration:
+ delattr(self, '_iterator')
+ break
+
+ return msgs
+
def __iter__(self):
"""
Create an iterate per partition. Iterate through them calling next() until they are