summaryrefslogtreecommitdiff
path: root/kafka/coordinator/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/coordinator/base.py')
-rw-r--r--kafka/coordinator/base.py24
1 files changed, 13 insertions, 11 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 301c06d..820fc1f 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -344,23 +344,25 @@ class BaseCoordinator(object):
def ensure_active_group(self):
"""Ensure that the group is active (i.e. joined and synced)"""
with self._lock:
- if not self.need_rejoin():
- return
-
- # call on_join_prepare if needed. We set a flag to make sure that
- # we do not call it a second time if the client is woken up before
- # a pending rebalance completes.
- if not self.rejoining:
- self._on_join_prepare(self._generation.generation_id,
- self._generation.member_id)
- self.rejoining = True
-
if self._heartbeat_thread is None:
self._start_heartbeat_thread()
while self.need_rejoin():
self.ensure_coordinator_ready()
+ # call on_join_prepare if needed. We set a flag
+ # to make sure that we do not call it a second
+ # time if the client is woken up before a pending
+ # rebalance completes. This must be called on each
+ # iteration of the loop because an event requiring
+ # a rebalance (such as a metadata refresh which
+ # changes the matched subscription set) can occur
+ # while another rebalance is still in progress.
+ if not self.rejoining:
+ self._on_join_prepare(self._generation.generation_id,
+ self._generation.member_id)
+ self.rejoining = True
+
# ensure that there are no pending requests to the coordinator.
# This is important in particular to avoid resending a pending
# JoinGroup request.