author    | Dana Powers <dana.powers@gmail.com> | 2018-03-28 16:40:57 -0700
committer | Dana Powers <dana.powers@gmail.com> | 2018-03-28 16:40:57 -0700
commit    | a556f5c140d3777cb11706fd3854c37def7387f2 (patch)
tree      | c85b8a24833004b6e04461d2b04806426ddae93a
parent    | 4c87d11c26e2aa5a60de0b2213dd8caa3b16d553 (diff)
download  | kafka-python-coordinator_client_deadlock.tar.gz
Always acquire client lock before coordinator lock to avoid deadlocks (branch: coordinator_client_deadlock)
-rw-r--r-- | kafka/coordinator/base.py | 123
1 file changed, 64 insertions(+), 59 deletions(-)
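The commit subject states the locking rule this patch enforces: any thread that needs both locks must take the client lock (`KafkaClient._lock`) before the coordinator lock (`BaseCoordinator._lock`). The sketch below is not kafka-python code; the lock names and the two placeholder operations are illustrative stand-ins. It shows why a single global acquisition order prevents the deadlock: with the foreground thread and an I/O-side callback both taking the client lock first, no cycle of lock waits can form.

```python
import threading

# Stand-ins for KafkaClient._lock and BaseCoordinator._lock (illustrative only).
client_lock = threading.RLock()
coordinator_lock = threading.RLock()

def foreground_thread_op():
    # e.g. ensure_coordinator_ready() / lookup_coordinator() after the patch:
    # client lock first, then coordinator lock.
    with client_lock, coordinator_lock:
        pass  # coordinator bookkeeping that may also touch client state

def io_thread_callback():
    # e.g. a connection errback running inside client poll(): the client lock
    # is already held, and taking the coordinator lock second keeps the same
    # global order, so the two threads can never wait on each other in a cycle.
    with client_lock:
        with coordinator_lock:
            pass  # e.g. marking the coordinator dead

if __name__ == '__main__':
    threads = [threading.Thread(target=foreground_thread_op),
               threading.Thread(target=io_thread_callback)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```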
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index b177567..7deeaf0 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -231,20 +231,19 @@ class BaseCoordinator(object):
 
         Returns: the current coordinator id or None if it is unknown
         """
-        with self._lock:
-            if self.coordinator_id is None:
-                return None
-            elif self._client.is_disconnected(self.coordinator_id):
-                self.coordinator_dead('Node Disconnected')
-                return None
-            else:
-                return self.coordinator_id
+        if self.coordinator_id is None:
+            return None
+        elif self._client.is_disconnected(self.coordinator_id):
+            self.coordinator_dead('Node Disconnected')
+            return None
+        else:
+            return self.coordinator_id
 
     def ensure_coordinator_ready(self):
         """Block until the coordinator for this group is known
         (and we have an active connection -- java client uses unsent queue).
         """
-        with self._lock:
+        with self._client._lock, self._lock:
             while self.coordinator_unknown():
 
                 # Prior to 0.8.2 there was no group coordinator
@@ -274,17 +273,18 @@ class BaseCoordinator(object):
         self._find_coordinator_future = None
 
     def lookup_coordinator(self):
-        if self._find_coordinator_future is not None:
-            return self._find_coordinator_future
-
-        # If there is an error sending the group coordinator request
-        # then _reset_find_coordinator_future will immediately fire and
-        # set _find_coordinator_future = None
-        # To avoid returning None, we capture the future in a local variable
-        self._find_coordinator_future = self._send_group_coordinator_request()
-        future = self._find_coordinator_future
-        self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
-        return future
+        with self._client._lock, self._lock:
+            if self._find_coordinator_future is not None:
+                return self._find_coordinator_future
+
+            # If there is an error sending the group coordinator request
+            # then _reset_find_coordinator_future will immediately fire and
+            # set _find_coordinator_future = None
+            # To avoid returning None, we capture the future in a local variable
+            future = self._send_group_coordinator_request()
+            self._find_coordinator_future = future
+            self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
+            return future
 
     def need_rejoin(self):
         """Check whether the group should be rejoined (e.g. if metadata changes)
@@ -487,7 +487,7 @@ class BaseCoordinator(object):
             log.debug("Received successful JoinGroup response for group %s: %s",
                       self.group_id, response)
             self.sensors.join_latency.record((time.time() - send_time) * 1000)
-            with self._lock:
+            with self._client._lock, self._lock:
                 if self.state is not MemberState.REBALANCING:
                     # if the consumer was woken up before a rebalance completes,
                     # we may have already left the group. In this case, we do
@@ -663,7 +663,7 @@ class BaseCoordinator(object):
 
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
-            with self._lock:
+            with self._client._lock, self._lock:
                 ok = self._client.cluster.add_group_coordinator(self.group_id, response)
                 if not ok:
                     # This could happen if coordinator metadata is different
@@ -693,11 +693,10 @@ class BaseCoordinator(object):
 
     def coordinator_dead(self, error):
         """Mark the current coordinator as dead."""
-        with self._lock:
-            if self.coordinator_id is not None:
-                log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
-                            self.coordinator_id, self.group_id, error)
-                self.coordinator_id = None
+        if self.coordinator_id is not None:
+            log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
+                        self.coordinator_id, self.group_id, error)
+            self.coordinator_id = None
 
     def generation(self):
         """Get the current generation state if the group is stable.
@@ -741,13 +740,13 @@ class BaseCoordinator(object):
     def close(self):
         """Close the coordinator, leave the current group,
         and reset local generation / member_id"""
-        with self._lock:
+        with self._client._lock, self._lock:
             self._close_heartbeat_thread()
             self.maybe_leave_group()
 
     def maybe_leave_group(self):
         """Leave the current group and reset local generation/memberId."""
-        with self._lock:
+        with self._client._lock, self._lock:
             if (not self.coordinator_unknown()
                 and self.state is not MemberState.UNJOINED
                 and self._generation is not Generation.NO_GENERATION):
@@ -941,40 +940,46 @@ class HeartbeatThread(threading.Thread):
                 self.disable()
                 return
 
-            # TODO: When consumer.wakeup() is implemented, we need to
-            # disable here to prevent propagating an exception to this
-            # heartbeat thread
-            self.coordinator._client.poll(timeout_ms=0)
-
-            if self.coordinator.coordinator_unknown():
-                future = self.coordinator.lookup_coordinator()
-                if not future.is_done or future.failed():
-                    # the immediate future check ensures that we backoff
-                    # properly in the case that no brokers are available
-                    # to connect to (and the future is automatically failed).
+        # TODO: When consumer.wakeup() is implemented, we need to
+        # disable here to prevent propagating an exception to this
+        # heartbeat thread
+        #
+        # Release coordinator lock during client poll to avoid deadlocks
+        # if/when connection errback needs coordinator lock
+        self.coordinator._client.poll(timeout_ms=0)
+
+        if self.coordinator.coordinator_unknown():
+            future = self.coordinator.lookup_coordinator()
+            if not future.is_done or future.failed():
+                # the immediate future check ensures that we backoff
+                # properly in the case that no brokers are available
+                # to connect to (and the future is automatically failed).
+                with self.coordinator._lock:
                     self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
 
-            elif self.coordinator.heartbeat.session_timeout_expired():
-                # the session timeout has expired without seeing a
-                # successful heartbeat, so we should probably make sure
-                # the coordinator is still healthy.
-                log.warning('Heartbeat session expired, marking coordinator dead')
-                self.coordinator.coordinator_dead('Heartbeat session expired')
-
-            elif self.coordinator.heartbeat.poll_timeout_expired():
-                # the poll timeout has expired, which means that the
-                # foreground thread has stalled in between calls to
-                # poll(), so we explicitly leave the group.
-                log.warning('Heartbeat poll expired, leaving group')
-                self.coordinator.maybe_leave_group()
-
-            elif not self.coordinator.heartbeat.should_heartbeat():
-                # poll again after waiting for the retry backoff in case
-                # the heartbeat failed or the coordinator disconnected
-                log.log(0, 'Not ready to heartbeat, waiting')
+        elif self.coordinator.heartbeat.session_timeout_expired():
+            # the session timeout has expired without seeing a
+            # successful heartbeat, so we should probably make sure
+            # the coordinator is still healthy.
+            log.warning('Heartbeat session expired, marking coordinator dead')
+            self.coordinator.coordinator_dead('Heartbeat session expired')
+
+        elif self.coordinator.heartbeat.poll_timeout_expired():
+            # the poll timeout has expired, which means that the
+            # foreground thread has stalled in between calls to
+            # poll(), so we explicitly leave the group.
+            log.warning('Heartbeat poll expired, leaving group')
+            self.coordinator.maybe_leave_group()
+
+        elif not self.coordinator.heartbeat.should_heartbeat():
+            # poll again after waiting for the retry backoff in case
+            # the heartbeat failed or the coordinator disconnected
+            log.log(0, 'Not ready to heartbeat, waiting')
+            with self.coordinator._lock:
                 self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
-            else:
+        else:
+            with self.coordinator._client._lock, self.coordinator._lock:
                 self.coordinator.heartbeat.sent_heartbeat()
                 future = self.coordinator._send_heartbeat_request()
                 future.add_callback(self._handle_heartbeat_success)
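For the heartbeat thread, the patch takes the complementary approach: it drops the coordinator lock entirely while calling client poll(), and reacquires it only around the Condition wait for the retry backoff (and around sending the heartbeat itself, client lock first). Below is a self-contained sketch of that pattern with a plain `threading.Condition`; the names `client_poll`, `retry_backoff_ms`, and `wake_heartbeat` are hypothetical stand-ins, not kafka-python APIs. Blocking I/O runs lock-free, and another thread can still wake the waiting heartbeat loop at any time.

```python
import threading
import time

coordinator_lock = threading.Condition()  # stands in for BaseCoordinator._lock
retry_backoff_ms = 100                    # stands in for config['retry_backoff_ms']

def client_poll(timeout_ms=0):
    # Placeholder for the client's poll(); real network I/O would run here with
    # no coordinator lock held, so its callbacks cannot deadlock against us.
    time.sleep(timeout_ms / 1000.0)

def heartbeat_iteration():
    client_poll(timeout_ms=0)             # no coordinator lock held during I/O
    with coordinator_lock:                # lock held only for the backoff wait
        coordinator_lock.wait(retry_backoff_ms / 1000.0)

def wake_heartbeat():
    # e.g. close() notifying the heartbeat thread; the wait above returns
    # early when notified, or after the backoff timeout otherwise.
    with coordinator_lock:
        coordinator_lock.notify_all()

if __name__ == '__main__':
    t = threading.Thread(target=heartbeat_iteration)
    t.start()
    wake_heartbeat()
    t.join()
```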