summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'kafka')
-rw-r--r--kafka/consumer/group.py12
1 files changed, 7 insertions, 5 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index bd977c5..141c1fa 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -650,17 +650,19 @@ class KafkaConsumer(six.Iterator):
self._iterator = self._message_generator()
self._fetcher.init_fetches()
- # consumer_timeout_ms can be used to stop iteration early
- if self.config['consumer_timeout_ms'] >= 0:
- self._consumer_timeout = time.time() + (
- self.config['consumer_timeout_ms'] / 1000.0)
-
+ self._set_consumer_timeout()
try:
return next(self._iterator)
except StopIteration:
self._iterator = None
raise
+ def _set_consumer_timeout(self):
+ # consumer_timeout_ms can be used to stop iteration early
+ if self.config['consumer_timeout_ms'] >= 0:
+ self._consumer_timeout = time.time() + (
+ self.config['consumer_timeout_ms'] / 1000.0)
+
# old KafkaConsumer methods are deprecated
def configure(self, **configs):
raise NotImplementedError(