summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/conn.py9
1 files changed, 7 insertions, 2 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index d4c5464..c2b38b5 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -1151,6 +1151,10 @@ class BrokerConnection(object):
stashed[key] = self.config[key]
self.config[key] = override_config[key]
+ def reset_override_configs():
+ for key in stashed:
+ self.config[key] = stashed[key]
+
# kafka kills the connection when it doesn't recognize an API request
# so we can send a test request and then follow immediately with a
# vanilla MetadataRequest. If the server did not recognize the first
@@ -1170,6 +1174,7 @@ class BrokerConnection(object):
for version, request in test_cases:
if not self.connect_blocking(timeout_at - time.time()):
+ reset_override_configs()
raise Errors.NodeNotReadyError()
f = self.send(request)
# HACK: sleeping to wait for socket to send bytes
@@ -1226,10 +1231,10 @@ class BrokerConnection(object):
log.info("Broker is not v%s -- it did not recognize %s",
version, request.__class__.__name__)
else:
+ reset_override_configs()
raise Errors.UnrecognizedBrokerVersion()
- for key in stashed:
- self.config[key] = stashed[key]
+ reset_override_configs()
return version
def __str__(self):