diff options
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r-- | kafka/consumer/group.py | 13 |
1 files changed, 13 insertions, 0 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 47c721f..a300c83 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -6,6 +6,8 @@ import socket import sys import time +from kafka.errors import KafkaConfigurationError + from kafka.vendor import six from kafka.client_async import KafkaClient, selectors @@ -267,6 +269,17 @@ class KafkaConsumer(six.Iterator): new_config, self.config['auto_offset_reset']) self.config['auto_offset_reset'] = new_config + request_timeout_ms = self.config['request_timeout_ms'] + session_timeout_ms = self.config['session_timeout_ms'] + fetch_max_wait_ms = self.config['fetch_max_wait_ms'] + if request_timeout_ms <= session_timeout_ms: + raise KafkaConfigurationError( + "Request timeout (%s) must be larger than session timeout (%s)" % + (request_timeout_ms, session_timeout_ms)) + if request_timeout_ms <= fetch_max_wait_ms: + raise KafkaConfigurationError("Request timeout (%s) must be larger than fetch-max-wait-ms (%s)" % + (request_timeout_ms, fetch_max_wait_ms)) + metrics_tags = {'client-id': self.config['client_id']} metric_config = MetricConfig(samples=self.config['metrics_num_samples'], time_window_ms=self.config['metrics_sample_window_ms'], |