diff options
-rw-r--r-- | kafka/coordinator/base.py | 16 | ||||
-rw-r--r-- | test/test_coordinator.py | 8 |
2 files changed, 19 insertions, 5 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 53b3e1d..a3055da 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -228,11 +228,17 @@ class BaseCoordinator(object): self._find_coordinator_future = None def lookup_coordinator(self): - if self._find_coordinator_future is None: - self._find_coordinator_future = self._send_group_coordinator_request() - - self._find_coordinator_future.add_both(self._reset_find_coordinator_future) - return self._find_coordinator_future + if self._find_coordinator_future is not None: + return self._find_coordinator_future + + # If there is an error sending the group coordinator request + # then _reset_find_coordinator_future will immediately fire and + # set _find_coordinator_future = None + # To avoid returning None, we capture the future in a local variable + self._find_coordinator_future = self._send_group_coordinator_request() + future = self._find_coordinator_future + self._find_coordinator_future.add_both(self._reset_find_coordinator_future) + return future def need_rejoin(self): """Check whether the group should be rejoined (e.g. if metadata changes) diff --git a/test/test_coordinator.py b/test/test_coordinator.py index aea2662..0e96110 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -590,3 +590,11 @@ def test_heartbeat(patched_coord): patched_coord.heartbeat_task() assert patched_coord._client.schedule.call_count == 1 assert patched_coord.heartbeat_task._handle_heartbeat_failure.call_count == 1 + + +def test_lookup_coordinator_failure(mocker, coordinator): + + mocker.patch.object(coordinator, '_send_group_coordinator_request', + return_value=Future().failure(Exception('foobar'))) + future = coordinator.lookup_coordinator() + assert future.failed() |