diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-29 17:27:36 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-29 17:27:36 -0800 |
commit | 39e7562b31a7058bdeeb6d61abbbbd7627799546 (patch) | |
tree | 49d332b67f2ae6da260d44feebb3a5635e2ff8da | |
parent | 8811326443496d5efcc784c67b9d39824ac0ecee (diff) | |
download | kafka-python-39e7562b31a7058bdeeb6d61abbbbd7627799546.tar.gz |
Log request failures in AbstractCoordinator._failed_request
-rw-r--r-- | kafka/coordinator/abstract.py | 15 |
1 files changed, 10 insertions, 5 deletions
diff --git a/kafka/coordinator/abstract.py b/kafka/coordinator/abstract.py index 2431132..03302a3 100644 --- a/kafka/coordinator/abstract.py +++ b/kafka/coordinator/abstract.py @@ -254,10 +254,13 @@ class AbstractCoordinator(object): future = Future() _f = self._client.send(self.coordinator_id, request) _f.add_callback(self._handle_join_group_response, future) - _f.add_errback(self._failed_request, future) + _f.add_errback(self._failed_request, self.coordinator_id, + request, future) return future - def _failed_request(self, future, error): + def _failed_request(self, node_id, request, future, error): + log.error('Error sending %s to node %s [%s] -- marking coordinator dead', + request.__class__.__name__, node_id, error) self.coordinator_dead() future.failure(error) @@ -360,7 +363,8 @@ class AbstractCoordinator(object): future = Future() _f = self._client.send(self.coordinator_id, request) _f.add_callback(self._handle_sync_group_response, future) - _f.add_errback(self._failed_request, future) + _f.add_errback(self._failed_request, self.coordinator_id, + request, future) return future def _handle_sync_group_response(self, future, response): @@ -414,7 +418,7 @@ class AbstractCoordinator(object): future = Future() _f = self._client.send(node_id, request) _f.add_callback(self._handle_group_coordinator_response, future) - _f.add_errback(self._failed_request, future) + _f.add_errback(self._failed_request, node_id, request, future) return future def _handle_group_coordinator_response(self, future, response): @@ -493,7 +497,8 @@ class AbstractCoordinator(object): future = Future() _f = self._client.send(self.coordinator_id, request) _f.add_callback(self._handle_heartbeat_response, future) - _f.add_errback(self._failed_request, future) + _f.add_errback(self._failed_request, self.coordinator_id, + request, future) return future def _handle_heartbeat_response(self, future, response): |