diff options
author | Dana Powers <dana.powers@gmail.com> | 2018-03-22 15:15:25 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2018-03-22 15:15:25 -0700 |
commit | f7a2e1294eb5d61083a136b35f71837eb9c28222 (patch) | |
tree | 7a10c7607ef552c4674b0bb71858021487d37ddd | |
parent | 18e48dce240eaa7cf714c780c02d1d5cf0b8fca2 (diff) | |
download | kafka-python-heartbeat_wait_brokers_down.tar.gz |
Check for immediate failure when looking up coordinator in heartbeat threadheartbeat_wait_brokers_down
-rw-r--r-- | kafka/coordinator/base.py | 6 |
1 files changed, 5 insertions, 1 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index bff6286..9f67d6b 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -945,7 +945,11 @@ class HeartbeatThread(threading.Thread): self.coordinator._client.poll(timeout_ms=0) if self.coordinator.coordinator_unknown(): - if not self.coordinator.lookup_coordinator().is_done: + future = self.coordinator.lookup_coordinator() + if not future.is_done or future.failed(): + # the immediate future check ensures that we backoff + # properly in the case that no brokers are available + # to connect to (and the future is automatically failed). self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000) elif self.coordinator.heartbeat.session_timeout_expired(): |