summaryrefslogtreecommitdiff
path: root/kafka/coordinator/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/coordinator/base.py')
-rw-r--r--kafka/coordinator/base.py12
1 files changed, 8 insertions, 4 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 30b9c40..24412c9 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -377,19 +377,23 @@ class BaseCoordinator(object):
# before the pending rebalance has completed.
if self.join_future is None:
self.state = MemberState.REBALANCING
- self.join_future = self._send_join_group_request()
+ future = self._send_join_group_request()
+
+ self.join_future = future # this should happen before adding callbacks
# handle join completion in the callback so that the
# callback will be invoked even if the consumer is woken up
# before finishing the rebalance
- self.join_future.add_callback(self._handle_join_success)
+ future.add_callback(self._handle_join_success)
# we handle failures below after the request finishes.
# If the join completes after having been woken up, the
# exception is ignored and we will rejoin
- self.join_future.add_errback(self._handle_join_failure)
+ future.add_errback(self._handle_join_failure)
+
+ else:
+ future = self.join_future
- future = self.join_future
self._client.poll(future=future)
if future.failed():