summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/coordinator/base.py22
1 files changed, 14 insertions, 8 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index dca809e..b0a0981 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -186,7 +186,7 @@ class BaseCoordinator(object):
self.coordinator_dead()
return True
- return not self._client.ready(self.coordinator_id)
+ return False
def ensure_coordinator_known(self):
"""Block until the coordinator for this group is known
@@ -288,9 +288,13 @@ class BaseCoordinator(object):
return future
def _failed_request(self, node_id, request, future, error):
- log.error('Error sending %s to node %s [%s] -- marking coordinator dead',
+ log.error('Error sending %s to node %s [%s]',
request.__class__.__name__, node_id, error)
- self.coordinator_dead()
+ # Marking coordinator dead
+ # unless the error is caused by internal client pipelining
+ if not isinstance(error, (Errors.NodeNotReadyError,
+ Errors.TooManyInFlightRequests)):
+ self.coordinator_dead()
future.failure(error)
def _handle_join_group_response(self, future, response):
@@ -388,7 +392,8 @@ class BaseCoordinator(object):
def _send_sync_group_request(self, request):
if self.coordinator_unknown():
- return Future().failure(Errors.GroupCoordinatorNotAvailableError())
+ e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
+ return Future().failure(e)
future = Future()
_f = self._client.send(self.coordinator_id, request)
_f.add_callback(self._handle_sync_group_response, future)
@@ -439,7 +444,7 @@ class BaseCoordinator(object):
Future: resolves to the node id of the coordinator
"""
node_id = self._client.least_loaded_node()
- if node_id is None or not self._client.ready(node_id):
+ if node_id is None:
return Future().failure(Errors.NoBrokersAvailable())
log.debug("Issuing group metadata request to broker %s", node_id)
@@ -490,8 +495,8 @@ class BaseCoordinator(object):
def coordinator_dead(self, error=None):
"""Mark the current coordinator as dead."""
if self.coordinator_id is not None:
- log.info("Marking the coordinator dead (node %s): %s.",
- self.coordinator_id, error)
+ log.warning("Marking the coordinator dead (node %s): %s.",
+ self.coordinator_id, error)
self.coordinator_id = None
def close(self):
@@ -501,6 +506,7 @@ class BaseCoordinator(object):
self._client.unschedule(self.heartbeat_task)
except KeyError:
pass
+
if not self.coordinator_unknown() and self.generation > 0:
# this is a minimal effort attempt to leave the group. we do not
# attempt any resending if the request fails or times out.
@@ -634,7 +640,7 @@ class HeartbeatTask(object):
self._client.schedule(self, time.time() + ttl)
def _handle_heartbeat_failure(self, e):
- log.warning("Heartbeat failed; retrying")
+ log.warning("Heartbeat failed (%s); retrying", e)
self._request_in_flight = False
etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000.0
self._client.schedule(self, etd)