diff options
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/client_async.py | 4 | ||||
-rw-r--r-- | kafka/coordinator/base.py | 78 |
2 files changed, 39 insertions, 43 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index ac2d364..87922b2 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -595,7 +595,9 @@ class KafkaClient(object): self._poll(timeout / 1000) - responses.extend(self._fire_pending_completed_requests()) + # called without the lock to avoid deadlock potential + # if handlers need to acquire locks + responses.extend(self._fire_pending_completed_requests()) # If all we had was a timeout (future is None) - only do one poll # If we do have a future, we keep looping until it is done diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 5cdbdcf..700c31f 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -243,7 +243,7 @@ class BaseCoordinator(object): """Block until the coordinator for this group is known (and we have an active connection -- java client uses unsent queue). """ - with self._client._lock, self._lock: + with self._lock: while self.coordinator_unknown(): # Prior to 0.8.2 there was no group coordinator @@ -273,7 +273,7 @@ class BaseCoordinator(object): self._find_coordinator_future = None def lookup_coordinator(self): - with self._client._lock, self._lock: + with self._lock: if self._find_coordinator_future is not None: return self._find_coordinator_future @@ -346,7 +346,7 @@ class BaseCoordinator(object): def ensure_active_group(self): """Ensure that the group is active (i.e. joined and synced)""" - with self._client._lock, self._lock: + with self._lock: if self._heartbeat_thread is None: self._start_heartbeat_thread() @@ -504,7 +504,7 @@ class BaseCoordinator(object): log.debug("Received successful JoinGroup response for group %s: %s", self.group_id, response) self.sensors.join_latency.record((time.time() - send_time) * 1000) - with self._client._lock, self._lock: + with self._lock: if self.state is not MemberState.REBALANCING: # if the consumer was woken up before a rebalance completes, # we may have already left the group. In this case, we do @@ -679,7 +679,7 @@ class BaseCoordinator(object): error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: - with self._client._lock, self._lock: + with self._lock: coordinator_id = self._client.cluster.add_group_coordinator(self.group_id, response) if not coordinator_id: # This could happen if coordinator metadata is different @@ -761,7 +761,7 @@ class BaseCoordinator(object): def maybe_leave_group(self): """Leave the current group and reset local generation/memberId.""" - with self._client._lock, self._lock: + with self._lock: if (not self.coordinator_unknown() and self.state is not MemberState.UNJOINED and self._generation is not Generation.NO_GENERATION): @@ -959,46 +959,40 @@ class HeartbeatThread(threading.Thread): self.disable() return - # TODO: When consumer.wakeup() is implemented, we need to - # disable here to prevent propagating an exception to this - # heartbeat thread - # - # Release coordinator lock during client poll to avoid deadlocks - # if/when connection errback needs coordinator lock - self.coordinator._client.poll(timeout_ms=0) - - if self.coordinator.coordinator_unknown(): - future = self.coordinator.lookup_coordinator() - if not future.is_done or future.failed(): - # the immediate future check ensures that we backoff - # properly in the case that no brokers are available - # to connect to (and the future is automatically failed). - with self.coordinator._lock: + # TODO: When consumer.wakeup() is implemented, we need to + # disable here to prevent propagating an exception to this + # heartbeat thread + self.coordinator._client.poll(timeout_ms=0) + + if self.coordinator.coordinator_unknown(): + future = self.coordinator.lookup_coordinator() + if not future.is_done or future.failed(): + # the immediate future check ensures that we backoff + # properly in the case that no brokers are available + # to connect to (and the future is automatically failed). self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000) - elif self.coordinator.heartbeat.session_timeout_expired(): - # the session timeout has expired without seeing a - # successful heartbeat, so we should probably make sure - # the coordinator is still healthy. - log.warning('Heartbeat session expired, marking coordinator dead') - self.coordinator.coordinator_dead('Heartbeat session expired') - - elif self.coordinator.heartbeat.poll_timeout_expired(): - # the poll timeout has expired, which means that the - # foreground thread has stalled in between calls to - # poll(), so we explicitly leave the group. - log.warning('Heartbeat poll expired, leaving group') - self.coordinator.maybe_leave_group() - - elif not self.coordinator.heartbeat.should_heartbeat(): - # poll again after waiting for the retry backoff in case - # the heartbeat failed or the coordinator disconnected - log.log(0, 'Not ready to heartbeat, waiting') - with self.coordinator._lock: + elif self.coordinator.heartbeat.session_timeout_expired(): + # the session timeout has expired without seeing a + # successful heartbeat, so we should probably make sure + # the coordinator is still healthy. + log.warning('Heartbeat session expired, marking coordinator dead') + self.coordinator.coordinator_dead('Heartbeat session expired') + + elif self.coordinator.heartbeat.poll_timeout_expired(): + # the poll timeout has expired, which means that the + # foreground thread has stalled in between calls to + # poll(), so we explicitly leave the group. + log.warning('Heartbeat poll expired, leaving group') + self.coordinator.maybe_leave_group() + + elif not self.coordinator.heartbeat.should_heartbeat(): + # poll again after waiting for the retry backoff in case + # the heartbeat failed or the coordinator disconnected + log.log(0, 'Not ready to heartbeat, waiting') self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000) - else: - with self.coordinator._client._lock, self.coordinator._lock: + else: self.coordinator.heartbeat.sent_heartbeat() future = self.coordinator._send_heartbeat_request() future.add_callback(self._handle_heartbeat_success) |