diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-09-28 09:56:11 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-09-28 09:56:11 -0700 |
commit | 5c784890b6f323ea37c6171a59184e9304cbcb5c (patch) | |
tree | 1f60344b106a41a622c05cedb6b5f6c16c610a09 | |
parent | 2a7aca1630b81669595d753083239ec9fbf66ff5 (diff) | |
download | kafka-python-5c784890b6f323ea37c6171a59184e9304cbcb5c.tar.gz |
Monkeypatch max_in_flight_requests_per_connection when checking broker version (#834)
-rw-r--r-- | kafka/conn.py | 17 |
1 files changed, 11 insertions, 6 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 9a41d90..6af0d8f 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -738,11 +738,15 @@ class BrokerConnection(object): Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ... """ - # Monkeypatch the connection request timeout - # Generally this timeout should not get triggered - # but in case it does, we want it to be reasonably short - stashed_request_timeout_ms = self.config['request_timeout_ms'] - self.config['request_timeout_ms'] = timeout * 1000 + # Monkeypatch some connection configurations to avoid timeouts + override_config = { + 'request_timeout_ms': timeout * 1000, + 'max_in_flight_requests_per_connection': 5 + } + stashed = {} + for key in override_config: + stashed[key] = self.config[key] + self.config[key] = override_config[key] # kafka kills the connection when it doesnt recognize an API request # so we can send a test request and then follow immediately with a @@ -837,7 +841,8 @@ class BrokerConnection(object): raise Errors.UnrecognizedBrokerVersion() log.removeFilter(log_filter) - self.config['request_timeout_ms'] = stashed_request_timeout_ms + for key in stashed: + self.config[key] = stashed[key] return version def __repr__(self): |