diff options
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/coordinator/base.py | 16 |
1 files changed, 11 insertions, 5 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 53b3e1d..a3055da 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -228,11 +228,17 @@ class BaseCoordinator(object): self._find_coordinator_future = None def lookup_coordinator(self): - if self._find_coordinator_future is None: - self._find_coordinator_future = self._send_group_coordinator_request() - - self._find_coordinator_future.add_both(self._reset_find_coordinator_future) - return self._find_coordinator_future + if self._find_coordinator_future is not None: + return self._find_coordinator_future + + # If there is an error sending the group coordinator request + # then _reset_find_coordinator_future will immediately fire and + # set _find_coordinator_future = None + # To avoid returning None, we capture the future in a local variable + self._find_coordinator_future = self._send_group_coordinator_request() + future = self._find_coordinator_future + self._find_coordinator_future.add_both(self._reset_find_coordinator_future) + return future def need_rejoin(self): """Check whether the group should be rejoined (e.g. if metadata changes) |