diff options
author | huangcuiyang <harlan51360@outlook.com> | 2020-09-08 05:59:24 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-07 14:59:24 -0700 |
commit | 91daea329bb40ed80bddef4635770d24b670b0c6 (patch) | |
tree | a48912cf14390f8747e09155efd1ec281499fb38 /kafka | |
parent | bd557dabd487cc44c11bf003600c82477ea5de11 (diff) | |
download | kafka-python-91daea329bb40ed80bddef4635770d24b670b0c6.tar.gz |
Fix #1985: fix consumer deadlock when heartbeat thread request timeout (#2064)
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/coordinator/base.py | 20 |
1 files changed, 12 insertions, 8 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index cd110ce..5e41309 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -242,7 +242,7 @@ class BaseCoordinator(object): """Block until the coordinator for this group is known (and we have an active connection -- java client uses unsent queue). """ - with self._lock: + with self._client._lock, self._lock: while self.coordinator_unknown(): # Prior to 0.8.2 there was no group coordinator @@ -345,7 +345,7 @@ class BaseCoordinator(object): def ensure_active_group(self): """Ensure that the group is active (i.e. joined and synced)""" - with self._lock: + with self._client._lock, self._lock: if self._heartbeat_thread is None: self._start_heartbeat_thread() @@ -763,7 +763,7 @@ class BaseCoordinator(object): def maybe_leave_group(self): """Leave the current group and reset local generation/memberId.""" - with self._lock: + with self._client._lock, self._lock: if (not self.coordinator_unknown() and self.state is not MemberState.UNJOINED and self._generation is not Generation.NO_GENERATION): @@ -946,6 +946,15 @@ class HeartbeatThread(threading.Thread): log.debug('Heartbeat thread closed') def _run_once(self): + with self.coordinator._client._lock, self.coordinator._lock: + if self.enabled and self.coordinator.state is MemberState.STABLE: + # TODO: When consumer.wakeup() is implemented, we need to + # disable here to prevent propagating an exception to this + # heartbeat thread + # must get client._lock, or maybe deadlock at heartbeat + # failure callbak in consumer poll + self.coordinator._client.poll(timeout_ms=0) + with self.coordinator._lock: if not self.enabled: log.debug('Heartbeat disabled. Waiting') @@ -961,11 +970,6 @@ class HeartbeatThread(threading.Thread): self.disable() return - # TODO: When consumer.wakeup() is implemented, we need to - # disable here to prevent propagating an exception to this - # heartbeat thread - self.coordinator._client.poll(timeout_ms=0) - if self.coordinator.coordinator_unknown(): future = self.coordinator.lookup_coordinator() if not future.is_done or future.failed(): |