diff options
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 17 |
1 files changed, 11 insertions, 6 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 9a41d90..6af0d8f 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -738,11 +738,15 @@ class BrokerConnection(object): Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ... """ - # Monkeypatch the connection request timeout - # Generally this timeout should not get triggered - # but in case it does, we want it to be reasonably short - stashed_request_timeout_ms = self.config['request_timeout_ms'] - self.config['request_timeout_ms'] = timeout * 1000 + # Monkeypatch some connection configurations to avoid timeouts + override_config = { + 'request_timeout_ms': timeout * 1000, + 'max_in_flight_requests_per_connection': 5 + } + stashed = {} + for key in override_config: + stashed[key] = self.config[key] + self.config[key] = override_config[key] # kafka kills the connection when it doesnt recognize an API request # so we can send a test request and then follow immediately with a @@ -837,7 +841,8 @@ class BrokerConnection(object): raise Errors.UnrecognizedBrokerVersion() log.removeFilter(log_filter) - self.config['request_timeout_ms'] = stashed_request_timeout_ms + for key in stashed: + self.config[key] = stashed[key] return version def __repr__(self): |