diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-15 21:49:46 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-12-15 12:42:55 -0800 |
commit | 7caf9bef491b368fd1dec4430c38332fec3dc1b6 (patch) | |
tree | a98703cb28e70f1ae871be4d2cda4e342743c1a5 | |
parent | 08f6ad94556256d710a5d4b517986111de32ffa1 (diff) | |
download | kafka-python-7caf9bef491b368fd1dec4430c38332fec3dc1b6.tar.gz |
Add private methods to manage internal _msg_iter
-rw-r--r-- | kafka/consumer/new.py | 30 |
1 files changed, 20 insertions, 10 deletions
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py index abafae8..8f243cd 100644 --- a/kafka/consumer/new.py +++ b/kafka/consumer/new.py @@ -122,7 +122,6 @@ class KafkaConsumer(object): def __init__(self, *topics, **configs): self.configure(**configs) self.set_topic_partitions(*topics) - self._msg_iter = None def configure(self, **configs): """ @@ -280,6 +279,8 @@ class KafkaConsumer(object): self._reset_highwater_offsets() self._reset_task_done_offsets() + # Reset message iterator in case we were in the middle of one + self._reset_message_iterator() def next(self): """ @@ -297,20 +298,16 @@ class KafkaConsumer(object): self._set_consumer_timeout_start() while True: - # Fetch a new batch if needed - if self._msg_iter is None: - self._msg_iter = self.fetch_messages() - # Check for auto-commit if self._should_auto_commit(): self.commit() try: - return self._msg_iter.next() + return self._get_message_iterator().next() # Handle batch completion except StopIteration: - self._msg_iter = None + self._reset_message_iterator() self._check_consumer_timeout() @@ -649,12 +646,25 @@ class KafkaConsumer(object): self._next_commit = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0) # + # Message iterator private methods + # + def __iter__(self): + return self + + def _get_message_iterator(self): + # Fetch a new batch if needed + if self._msg_iter is None: + self._msg_iter = self.fetch_messages() + + return self._msg_iter + + def _reset_message_iterator(self): + self._msg_iter = None + + # # python private methods # def __repr__(self): return '<KafkaConsumer topics=(%s)>' % ', '.join(["%s-%d" % topic_partition for topic_partition in self._topics]) - - def __iter__(self): - return self |