summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/coordinator/base.py9
1 files changed, 6 insertions, 3 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 664e8d2..e538fda 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -752,9 +752,8 @@ class BaseCoordinator(object):
def close(self):
"""Close the coordinator, leave the current group,
and reset local generation / member_id"""
- with self._client._lock, self._lock:
- self._close_heartbeat_thread()
- self.maybe_leave_group()
+ self._close_heartbeat_thread()
+ self.maybe_leave_group()
def maybe_leave_group(self):
"""Leave the current group and reset local generation/memberId."""
@@ -918,6 +917,10 @@ class HeartbeatThread(threading.Thread):
self.closed = True
with self.coordinator._lock:
self.coordinator._lock.notify()
+ if self.is_alive():
+ self.join(self.coordinator.config['heartbeat_interval_ms'] / 1000)
+ if self.is_alive():
+ log.warning("Heartbeat thread did not fully terminate during close")
def run(self):
try: