summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-12 14:42:17 -0800
committerDana Powers <dana.powers@rd.io>2016-01-12 14:42:17 -0800
commitdcd62b72e39df00da23e13d783fa5681a20e381b (patch)
tree1ef61901e79303b1eef518662e7cad7a40b05be8
parentcc3e1cc9a17de52a3ab7955548b8bae945777a97 (diff)
downloadkafka-python-dcd62b72e39df00da23e13d783fa5681a20e381b.tar.gz
Move consumer_timeout handling to private method
-rw-r--r--kafka/consumer/group.py12
1 files changed, 7 insertions, 5 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index bd977c5..141c1fa 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -650,17 +650,19 @@ class KafkaConsumer(six.Iterator):
self._iterator = self._message_generator()
self._fetcher.init_fetches()
- # consumer_timeout_ms can be used to stop iteration early
- if self.config['consumer_timeout_ms'] >= 0:
- self._consumer_timeout = time.time() + (
- self.config['consumer_timeout_ms'] / 1000.0)
-
+ self._set_consumer_timeout()
try:
return next(self._iterator)
except StopIteration:
self._iterator = None
raise
+ def _set_consumer_timeout(self):
+ # consumer_timeout_ms can be used to stop iteration early
+ if self.config['consumer_timeout_ms'] >= 0:
+ self._consumer_timeout = time.time() + (
+ self.config['consumer_timeout_ms'] / 1000.0)
+
# old KafkaConsumer methods are deprecated
def configure(self, **configs):
raise NotImplementedError(