summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-15 21:49:46 -0700
committerDana Powers <dana.powers@rd.io>2014-12-15 12:42:55 -0800
commit7caf9bef491b368fd1dec4430c38332fec3dc1b6 (patch)
treea98703cb28e70f1ae871be4d2cda4e342743c1a5
parent08f6ad94556256d710a5d4b517986111de32ffa1 (diff)
downloadkafka-python-7caf9bef491b368fd1dec4430c38332fec3dc1b6.tar.gz
Add private methods to manage internal _msg_iter
-rw-r--r--kafka/consumer/new.py30
1 files changed, 20 insertions, 10 deletions
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py
index abafae8..8f243cd 100644
--- a/kafka/consumer/new.py
+++ b/kafka/consumer/new.py
@@ -122,7 +122,6 @@ class KafkaConsumer(object):
def __init__(self, *topics, **configs):
self.configure(**configs)
self.set_topic_partitions(*topics)
- self._msg_iter = None
def configure(self, **configs):
"""
@@ -280,6 +279,8 @@ class KafkaConsumer(object):
self._reset_highwater_offsets()
self._reset_task_done_offsets()
+ # Reset message iterator in case we were in the middle of one
+ self._reset_message_iterator()
def next(self):
"""
@@ -297,20 +298,16 @@ class KafkaConsumer(object):
self._set_consumer_timeout_start()
while True:
- # Fetch a new batch if needed
- if self._msg_iter is None:
- self._msg_iter = self.fetch_messages()
-
# Check for auto-commit
if self._should_auto_commit():
self.commit()
try:
- return self._msg_iter.next()
+ return self._get_message_iterator().next()
# Handle batch completion
except StopIteration:
- self._msg_iter = None
+ self._reset_message_iterator()
self._check_consumer_timeout()
@@ -649,12 +646,25 @@ class KafkaConsumer(object):
self._next_commit = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0)
#
+ # Message iterator private methods
+ #
+ def __iter__(self):
+ return self
+
+ def _get_message_iterator(self):
+ # Fetch a new batch if needed
+ if self._msg_iter is None:
+ self._msg_iter = self.fetch_messages()
+
+ return self._msg_iter
+
+ def _reset_message_iterator(self):
+ self._msg_iter = None
+
+ #
# python private methods
#
def __repr__(self):
return '<KafkaConsumer topics=(%s)>' % ', '.join(["%s-%d" % topic_partition
for topic_partition in
self._topics])
-
- def __iter__(self):
- return self