diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-10-24 16:05:50 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-24 16:05:50 -0700 |
commit | d2001e4b69c2b03202a44899b687b05e735261a8 (patch) | |
tree | 5662b887c203a843d4b0484aed32fb2bb59423c1 /kafka | |
parent | 8b05ee8da50b4c7b832676f4e38f9d92a86639cc (diff) | |
download | kafka-python-d2001e4b69c2b03202a44899b687b05e735261a8.tar.gz |
Handle lookup_coordinator send failures (#1279)
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/coordinator/base.py | 16 |
1 files changed, 11 insertions, 5 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 53b3e1d..a3055da 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -228,11 +228,17 @@ class BaseCoordinator(object): self._find_coordinator_future = None def lookup_coordinator(self): - if self._find_coordinator_future is None: - self._find_coordinator_future = self._send_group_coordinator_request() - - self._find_coordinator_future.add_both(self._reset_find_coordinator_future) - return self._find_coordinator_future + if self._find_coordinator_future is not None: + return self._find_coordinator_future + + # If there is an error sending the group coordinator request + # then _reset_find_coordinator_future will immediately fire and + # set _find_coordinator_future = None + # To avoid returning None, we capture the future in a local variable + self._find_coordinator_future = self._send_group_coordinator_request() + future = self._find_coordinator_future + self._find_coordinator_future.add_both(self._reset_find_coordinator_future) + return future def need_rejoin(self): """Check whether the group should be rejoined (e.g. if metadata changes) |