diff options
Diffstat (limited to 'kafka/coordinator/base.py')
-rw-r--r-- | kafka/coordinator/base.py | 24 |
1 files changed, 13 insertions, 11 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 301c06d..820fc1f 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -344,23 +344,25 @@ class BaseCoordinator(object): def ensure_active_group(self): """Ensure that the group is active (i.e. joined and synced)""" with self._lock: - if not self.need_rejoin(): - return - - # call on_join_prepare if needed. We set a flag to make sure that - # we do not call it a second time if the client is woken up before - # a pending rebalance completes. - if not self.rejoining: - self._on_join_prepare(self._generation.generation_id, - self._generation.member_id) - self.rejoining = True - if self._heartbeat_thread is None: self._start_heartbeat_thread() while self.need_rejoin(): self.ensure_coordinator_ready() + # call on_join_prepare if needed. We set a flag + # to make sure that we do not call it a second + # time if the client is woken up before a pending + # rebalance completes. This must be called on each + # iteration of the loop because an event requiring + # a rebalance (such as a metadata refresh which + # changes the matched subscription set) can occur + # while another rebalance is still in progress. + if not self.rejoining: + self._on_join_prepare(self._generation.generation_id, + self._generation.member_id) + self.rejoining = True + # ensure that there are no pending requests to the coordinator. # This is important in particular to avoid resending a pending # JoinGroup request. |