summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2018-04-18 08:29:19 -0700
committerGitHub <noreply@github.com>2018-04-18 08:29:19 -0700
commit1c71dfc3c321372c808f45f569ae41352f420e8f (patch)
tree3987e9c17e954e80cd49806cb6ac2e4eb694aece
parente23676d6c03b87f14ec8992de583f673dc8a1a3e (diff)
downloadkafka-python-1c71dfc3c321372c808f45f569ae41352f420e8f.tar.gz
Always acquire client lock before coordinator lock to avoid deadlocks (#1464)
-rw-r--r--kafka/coordinator/base.py123
1 file changed, 64 insertions, 59 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index b177567..7deeaf0 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -231,20 +231,19 @@ class BaseCoordinator(object):
Returns: the current coordinator id or None if it is unknown
"""
- with self._lock:
- if self.coordinator_id is None:
- return None
- elif self._client.is_disconnected(self.coordinator_id):
- self.coordinator_dead('Node Disconnected')
- return None
- else:
- return self.coordinator_id
+ if self.coordinator_id is None:
+ return None
+ elif self._client.is_disconnected(self.coordinator_id):
+ self.coordinator_dead('Node Disconnected')
+ return None
+ else:
+ return self.coordinator_id
def ensure_coordinator_ready(self):
"""Block until the coordinator for this group is known
(and we have an active connection -- java client uses unsent queue).
"""
- with self._lock:
+ with self._client._lock, self._lock:
while self.coordinator_unknown():
# Prior to 0.8.2 there was no group coordinator
@@ -274,17 +273,18 @@ class BaseCoordinator(object):
self._find_coordinator_future = None
def lookup_coordinator(self):
- if self._find_coordinator_future is not None:
- return self._find_coordinator_future
-
- # If there is an error sending the group coordinator request
- # then _reset_find_coordinator_future will immediately fire and
- # set _find_coordinator_future = None
- # To avoid returning None, we capture the future in a local variable
- self._find_coordinator_future = self._send_group_coordinator_request()
- future = self._find_coordinator_future
- self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
- return future
+ with self._client._lock, self._lock:
+ if self._find_coordinator_future is not None:
+ return self._find_coordinator_future
+
+ # If there is an error sending the group coordinator request
+ # then _reset_find_coordinator_future will immediately fire and
+ # set _find_coordinator_future = None
+ # To avoid returning None, we capture the future in a local variable
+ future = self._send_group_coordinator_request()
+ self._find_coordinator_future = future
+ self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
+ return future
def need_rejoin(self):
"""Check whether the group should be rejoined (e.g. if metadata changes)
@@ -487,7 +487,7 @@ class BaseCoordinator(object):
log.debug("Received successful JoinGroup response for group %s: %s",
self.group_id, response)
self.sensors.join_latency.record((time.time() - send_time) * 1000)
- with self._lock:
+ with self._client._lock, self._lock:
if self.state is not MemberState.REBALANCING:
# if the consumer was woken up before a rebalance completes,
# we may have already left the group. In this case, we do
@@ -663,7 +663,7 @@ class BaseCoordinator(object):
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
- with self._lock:
+ with self._client._lock, self._lock:
ok = self._client.cluster.add_group_coordinator(self.group_id, response)
if not ok:
# This could happen if coordinator metadata is different
@@ -693,11 +693,10 @@ class BaseCoordinator(object):
def coordinator_dead(self, error):
"""Mark the current coordinator as dead."""
- with self._lock:
- if self.coordinator_id is not None:
- log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
- self.coordinator_id, self.group_id, error)
- self.coordinator_id = None
+ if self.coordinator_id is not None:
+ log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
+ self.coordinator_id, self.group_id, error)
+ self.coordinator_id = None
def generation(self):
"""Get the current generation state if the group is stable.
@@ -741,13 +740,13 @@ class BaseCoordinator(object):
def close(self):
"""Close the coordinator, leave the current group,
and reset local generation / member_id"""
- with self._lock:
+ with self._client._lock, self._lock:
self._close_heartbeat_thread()
self.maybe_leave_group()
def maybe_leave_group(self):
"""Leave the current group and reset local generation/memberId."""
- with self._lock:
+ with self._client._lock, self._lock:
if (not self.coordinator_unknown()
and self.state is not MemberState.UNJOINED
and self._generation is not Generation.NO_GENERATION):
@@ -941,40 +940,46 @@ class HeartbeatThread(threading.Thread):
self.disable()
return
- # TODO: When consumer.wakeup() is implemented, we need to
- # disable here to prevent propagating an exception to this
- # heartbeat thread
- self.coordinator._client.poll(timeout_ms=0)
-
- if self.coordinator.coordinator_unknown():
- future = self.coordinator.lookup_coordinator()
- if not future.is_done or future.failed():
- # the immediate future check ensures that we backoff
- # properly in the case that no brokers are available
- # to connect to (and the future is automatically failed).
+ # TODO: When consumer.wakeup() is implemented, we need to
+ # disable here to prevent propagating an exception to this
+ # heartbeat thread
+ #
+ # Release coordinator lock during client poll to avoid deadlocks
+ # if/when connection errback needs coordinator lock
+ self.coordinator._client.poll(timeout_ms=0)
+
+ if self.coordinator.coordinator_unknown():
+ future = self.coordinator.lookup_coordinator()
+ if not future.is_done or future.failed():
+ # the immediate future check ensures that we backoff
+ # properly in the case that no brokers are available
+ # to connect to (and the future is automatically failed).
+ with self.coordinator._lock:
self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
- elif self.coordinator.heartbeat.session_timeout_expired():
- # the session timeout has expired without seeing a
- # successful heartbeat, so we should probably make sure
- # the coordinator is still healthy.
- log.warning('Heartbeat session expired, marking coordinator dead')
- self.coordinator.coordinator_dead('Heartbeat session expired')
-
- elif self.coordinator.heartbeat.poll_timeout_expired():
- # the poll timeout has expired, which means that the
- # foreground thread has stalled in between calls to
- # poll(), so we explicitly leave the group.
- log.warning('Heartbeat poll expired, leaving group')
- self.coordinator.maybe_leave_group()
-
- elif not self.coordinator.heartbeat.should_heartbeat():
- # poll again after waiting for the retry backoff in case
- # the heartbeat failed or the coordinator disconnected
- log.log(0, 'Not ready to heartbeat, waiting')
+ elif self.coordinator.heartbeat.session_timeout_expired():
+ # the session timeout has expired without seeing a
+ # successful heartbeat, so we should probably make sure
+ # the coordinator is still healthy.
+ log.warning('Heartbeat session expired, marking coordinator dead')
+ self.coordinator.coordinator_dead('Heartbeat session expired')
+
+ elif self.coordinator.heartbeat.poll_timeout_expired():
+ # the poll timeout has expired, which means that the
+ # foreground thread has stalled in between calls to
+ # poll(), so we explicitly leave the group.
+ log.warning('Heartbeat poll expired, leaving group')
+ self.coordinator.maybe_leave_group()
+
+ elif not self.coordinator.heartbeat.should_heartbeat():
+ # poll again after waiting for the retry backoff in case
+ # the heartbeat failed or the coordinator disconnected
+ log.log(0, 'Not ready to heartbeat, waiting')
+ with self.coordinator._lock:
self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
- else:
+ else:
+ with self.coordinator._client._lock, self.coordinator._lock:
self.coordinator.heartbeat.sent_heartbeat()
future = self.coordinator._send_heartbeat_request()
future.add_callback(self._handle_heartbeat_success)