diff options
-rw-r--r-- | kafka/coordinator/base.py | 22 |
1 files changed, 14 insertions, 8 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index dca809e..b0a0981 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -186,7 +186,7 @@ class BaseCoordinator(object): self.coordinator_dead() return True - return not self._client.ready(self.coordinator_id) + return False def ensure_coordinator_known(self): """Block until the coordinator for this group is known @@ -288,9 +288,13 @@ class BaseCoordinator(object): return future def _failed_request(self, node_id, request, future, error): - log.error('Error sending %s to node %s [%s] -- marking coordinator dead', + log.error('Error sending %s to node %s [%s]', request.__class__.__name__, node_id, error) - self.coordinator_dead() + # Marking coordinator dead + # unless the error is caused by internal client pipelining + if not isinstance(error, (Errors.NodeNotReadyError, + Errors.TooManyInFlightRequests)): + self.coordinator_dead() future.failure(error) def _handle_join_group_response(self, future, response): @@ -388,7 +392,8 @@ class BaseCoordinator(object): def _send_sync_group_request(self, request): if self.coordinator_unknown(): - return Future().failure(Errors.GroupCoordinatorNotAvailableError()) + e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id) + return Future().failure(e) future = Future() _f = self._client.send(self.coordinator_id, request) _f.add_callback(self._handle_sync_group_response, future) @@ -439,7 +444,7 @@ class BaseCoordinator(object): Future: resolves to the node id of the coordinator """ node_id = self._client.least_loaded_node() - if node_id is None or not self._client.ready(node_id): + if node_id is None: return Future().failure(Errors.NoBrokersAvailable()) log.debug("Issuing group metadata request to broker %s", node_id) @@ -490,8 +495,8 @@ class BaseCoordinator(object): def coordinator_dead(self, error=None): """Mark the current coordinator as dead.""" if self.coordinator_id is not None: - log.info("Marking the coordinator dead (node %s): %s.", - self.coordinator_id, error) + log.warning("Marking the coordinator dead (node %s): %s.", + self.coordinator_id, error) self.coordinator_id = None def close(self): @@ -501,6 +506,7 @@ class BaseCoordinator(object): self._client.unschedule(self.heartbeat_task) except KeyError: pass + if not self.coordinator_unknown() and self.generation > 0: # this is a minimal effort attempt to leave the group. we do not # attempt any resending if the request fails or times out. @@ -634,7 +640,7 @@ class HeartbeatTask(object): self._client.schedule(self, time.time() + ttl) def _handle_heartbeat_failure(self, e): - log.warning("Heartbeat failed; retrying") + log.warning("Heartbeat failed (%s); retrying", e) self._request_in_flight = False etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000.0 self._client.schedule(self, etd) |