summaryrefslogtreecommitdiff
path: root/kafka/consumer
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-03 16:07:24 -0800
committerDana Powers <dana.powers@rd.io>2016-01-03 16:07:24 -0800
commit79aa0f04892ce4f5b0e27a80654e3689ac9d7e32 (patch)
tree7e9a35ddc80ac2884e0c6ecf02947c7e80cf3cab /kafka/consumer
parent9b07bfb5298f961b965ee4a295b0bceb52803852 (diff)
downloadkafka-python-79aa0f04892ce4f5b0e27a80654e3689ac9d7e32.tar.gz
Support consumer_timeout_ms in new KafkaConsumer
Diffstat (limited to 'kafka/consumer')
-rw-r--r--kafka/consumer/group.py19
1 files changed, 17 insertions, 2 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index a9a4ac0..6a5084d 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -42,6 +42,7 @@ class KafkaConsumer(six.Iterator):
'session_timeout_ms': 30000,
'send_buffer_bytes': 128 * 1024,
'receive_buffer_bytes': 32 * 1024,
+ 'consumer_timeout_ms': -1,
'api_version': 'auto',
'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
#'metric_reporters': None,
@@ -145,6 +146,9 @@ class KafkaConsumer(six.Iterator):
(SO_SNDBUF) to use when sending data. Default: 131072
receive_buffer_bytes (int): The size of the TCP receive buffer
(SO_RCVBUF) to use when reading data. Default: 32768
+ consumer_timeout_ms (int): number of millisecond to throw a timeout
+ exception to the consumer if no message is available for
+ consumption. Default: -1 (dont throw exception)
api_version (str): specify which kafka API version to use.
0.9 enables full group coordination features; 0.8.2 enables
kafka-storage offset commits; 0.8.1 enables zookeeper-storage
@@ -183,6 +187,7 @@ class KafkaConsumer(six.Iterator):
**self.config)
self._closed = False
self._iterator = None
+ self._consumer_timeout = float('inf')
#self.metrics = None
if topics:
@@ -595,7 +600,7 @@ class KafkaConsumer(six.Iterator):
self._fetcher.update_fetch_positions(partitions)
def _message_generator(self):
- while True:
+ while time.time() < self._consumer_timeout:
if self.config['api_version'] >= (0, 8, 2):
self._coordinator.ensure_coordinator_known()
@@ -612,7 +617,11 @@ class KafkaConsumer(six.Iterator):
# init any new fetches (won't resend pending fetches)
self._fetcher.init_fetches()
self._client.poll(self.config['request_timeout_ms'] / 1000.0)
- timeout = time.time() + self.config['heartbeat_interval_ms'] / 1000.0
+ timeout = self._consumer_timeout
+ if self.config['api_version'] >= (0, 9):
+ heartbeat_timeout = time.time() + (
+ self.config['heartbeat_interval_ms'] / 1000.0)
+ timeout = min(heartbeat_timeout, timeout)
for msg in self._fetcher:
yield msg
if time.time() > timeout:
@@ -624,6 +633,12 @@ class KafkaConsumer(six.Iterator):
def __next__(self):
if not self._iterator:
self._iterator = self._message_generator()
+
+ # consumer_timeout_ms can be used to stop iteration early
+ if self.config['consumer_timeout_ms'] >= 0:
+ self._consumer_timeout = time.time() + (
+ self.config['consumer_timeout_ms'] / 1000.0)
+
try:
return next(self._iterator)
except StopIteration: