diff options
-rw-r--r-- | kafka/coordinator/base.py | 9 |
1 files changed, 6 insertions, 3 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 664e8d2..e538fda 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -752,9 +752,8 @@ class BaseCoordinator(object): def close(self): """Close the coordinator, leave the current group, and reset local generation / member_id""" - with self._client._lock, self._lock: - self._close_heartbeat_thread() - self.maybe_leave_group() + self._close_heartbeat_thread() + self.maybe_leave_group() def maybe_leave_group(self): """Leave the current group and reset local generation/memberId.""" @@ -918,6 +917,10 @@ class HeartbeatThread(threading.Thread): self.closed = True with self.coordinator._lock: self.coordinator._lock.notify() + if self.is_alive(): + self.join(self.coordinator.config['heartbeat_interval_ms'] / 1000) + if self.is_alive(): + log.warning("Heartbeat thread did not fully terminate during close") def run(self): try: |