diff options
author | Mahendra M <mahendra.m@gmail.com> | 2013-05-27 16:41:47 +0530 |
---|---|---|
committer | Mahendra M <mahendra.m@gmail.com> | 2013-05-27 16:41:47 +0530 |
commit | 117c576c93ebc3e55d63707c26872de2b95dce73 (patch) | |
tree | d09b71334499923d5bc77e020e31a062098232e1 /kafka/consumer.py | |
parent | 38215b66ebc70954c8448ce0397ac007ba35d697 (diff) | |
download | kafka-python-117c576c93ebc3e55d63707c26872de2b95dce73.tar.gz |
New API for getting a specified set of messages
This will be easier to use in some cases where we have to get
only a specified set of messages. This API uses the __iter__
API internally, but maintains the state to give back only the
required set of messages
API is - get_messages(count=1)
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 18 |
1 files changed, 18 insertions, 0 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index 603ea36..2af9f6a 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -155,6 +155,24 @@ class SimpleConsumer(object): assert resp.error == 0 self.count_since_commit = 0 + def get_messages(self, count=1): + """ + Get the specified number of messages + """ + if not hasattr(self, '_iterator'): + self._iterator = iter(self) + + msgs = [] + while count > 0: + try: + msgs.append(self._iterator.next()) + count -= 1 + except StopIteration: + delattr(self, '_iterator') + break + + return msgs + def __iter__(self): """ Create an iterate per partition. Iterate through them calling next() until they are |