diff options
-rw-r--r-- | kafka/coordinator/base.py | 12 |
1 files changed, 8 insertions, 4 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 30b9c40..24412c9 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -377,19 +377,23 @@ class BaseCoordinator(object): # before the pending rebalance has completed. if self.join_future is None: self.state = MemberState.REBALANCING - self.join_future = self._send_join_group_request() + future = self._send_join_group_request() + + self.join_future = future # this should happen before adding callbacks # handle join completion in the callback so that the # callback will be invoked even if the consumer is woken up # before finishing the rebalance - self.join_future.add_callback(self._handle_join_success) + future.add_callback(self._handle_join_success) # we handle failures below after the request finishes. # If the join completes after having been woken up, the # exception is ignored and we will rejoin - self.join_future.add_errback(self._handle_join_failure) + future.add_errback(self._handle_join_failure) + + else: + future = self.join_future - future = self.join_future self._client.poll(future=future) if future.failed(): |