summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/coordinator/base.py16
-rw-r--r--test/test_coordinator.py8
2 files changed, 19 insertions, 5 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 53b3e1d..a3055da 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -228,11 +228,17 @@ class BaseCoordinator(object):
self._find_coordinator_future = None
def lookup_coordinator(self):
- if self._find_coordinator_future is None:
- self._find_coordinator_future = self._send_group_coordinator_request()
-
- self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
- return self._find_coordinator_future
+ if self._find_coordinator_future is not None:
+ return self._find_coordinator_future
+
+ # If there is an error sending the group coordinator request
+ # then _reset_find_coordinator_future will immediately fire and
+ # set _find_coordinator_future = None
+ # To avoid returning None, we capture the future in a local variable
+ self._find_coordinator_future = self._send_group_coordinator_request()
+ future = self._find_coordinator_future
+ self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
+ return future
def need_rejoin(self):
"""Check whether the group should be rejoined (e.g. if metadata changes)
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index aea2662..0e96110 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -590,3 +590,11 @@ def test_heartbeat(patched_coord):
patched_coord.heartbeat_task()
assert patched_coord._client.schedule.call_count == 1
assert patched_coord.heartbeat_task._handle_heartbeat_failure.call_count == 1
+
+
+def test_lookup_coordinator_failure(mocker, coordinator):
+
+ mocker.patch.object(coordinator, '_send_group_coordinator_request',
+ return_value=Future().failure(Exception('foobar')))
+ future = coordinator.lookup_coordinator()
+ assert future.failed()