diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-03 16:07:24 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-03 16:07:24 -0800 |
commit | 79aa0f04892ce4f5b0e27a80654e3689ac9d7e32 (patch) | |
tree | 7e9a35ddc80ac2884e0c6ecf02947c7e80cf3cab /kafka/consumer | |
parent | 9b07bfb5298f961b965ee4a295b0bceb52803852 (diff) | |
download | kafka-python-79aa0f04892ce4f5b0e27a80654e3689ac9d7e32.tar.gz |
Support consumer_timeout_ms in new KafkaConsumer
Diffstat (limited to 'kafka/consumer')
-rw-r--r-- | kafka/consumer/group.py | 19 |
1 files changed, 17 insertions, 2 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index a9a4ac0..6a5084d 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -42,6 +42,7 @@ class KafkaConsumer(six.Iterator): 'session_timeout_ms': 30000, 'send_buffer_bytes': 128 * 1024, 'receive_buffer_bytes': 32 * 1024, + 'consumer_timeout_ms': -1, 'api_version': 'auto', 'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet #'metric_reporters': None, @@ -145,6 +146,9 @@ class KafkaConsumer(six.Iterator): (SO_SNDBUF) to use when sending data. Default: 131072 receive_buffer_bytes (int): The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. Default: 32768 + consumer_timeout_ms (int): number of millisecond to throw a timeout + exception to the consumer if no message is available for + consumption. Default: -1 (dont throw exception) api_version (str): specify which kafka API version to use. 0.9 enables full group coordination features; 0.8.2 enables kafka-storage offset commits; 0.8.1 enables zookeeper-storage @@ -183,6 +187,7 @@ class KafkaConsumer(six.Iterator): **self.config) self._closed = False self._iterator = None + self._consumer_timeout = float('inf') #self.metrics = None if topics: @@ -595,7 +600,7 @@ class KafkaConsumer(six.Iterator): self._fetcher.update_fetch_positions(partitions) def _message_generator(self): - while True: + while time.time() < self._consumer_timeout: if self.config['api_version'] >= (0, 8, 2): self._coordinator.ensure_coordinator_known() @@ -612,7 +617,11 @@ class KafkaConsumer(six.Iterator): # init any new fetches (won't resend pending fetches) self._fetcher.init_fetches() self._client.poll(self.config['request_timeout_ms'] / 1000.0) - timeout = time.time() + self.config['heartbeat_interval_ms'] / 1000.0 + timeout = self._consumer_timeout + if self.config['api_version'] >= (0, 9): + heartbeat_timeout = time.time() + ( + self.config['heartbeat_interval_ms'] / 1000.0) + timeout = min(heartbeat_timeout, timeout) for msg in self._fetcher: yield msg if time.time() > timeout: @@ -624,6 +633,12 @@ class KafkaConsumer(six.Iterator): def __next__(self): if not self._iterator: self._iterator = self._message_generator() + + # consumer_timeout_ms can be used to stop iteration early + if self.config['consumer_timeout_ms'] >= 0: + self._consumer_timeout = time.time() + ( + self.config['consumer_timeout_ms'] / 1000.0) + try: return next(self._iterator) except StopIteration: |