diff options
author | Dana Powers <dana.powers@gmail.com> | 2018-02-02 16:36:30 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-02-02 16:36:30 -0800 |
commit | 618c5051493693c1305aa9f08e8a0583d5fcf0e3 (patch) | |
tree | 3a2fcec8260915a83f19a603671c4a0e5461cca0 /kafka/coordinator/base.py | |
parent | 08a7fb7b754a754c6c64e96d4ba5c4f56cf38a5f (diff) | |
download | kafka-python-618c5051493693c1305aa9f08e8a0583d5fcf0e3.tar.gz |
KAFKA-3949: Avoid race condition when subscription changes during rebalance (#1364)
Diffstat (limited to 'kafka/coordinator/base.py')
-rw-r--r-- | kafka/coordinator/base.py | 24 |
1 files changed, 13 insertions, 11 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 301c06d..820fc1f 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -344,23 +344,25 @@ class BaseCoordinator(object): def ensure_active_group(self): """Ensure that the group is active (i.e. joined and synced)""" with self._lock: - if not self.need_rejoin(): - return - - # call on_join_prepare if needed. We set a flag to make sure that - # we do not call it a second time if the client is woken up before - # a pending rebalance completes. - if not self.rejoining: - self._on_join_prepare(self._generation.generation_id, - self._generation.member_id) - self.rejoining = True - if self._heartbeat_thread is None: self._start_heartbeat_thread() while self.need_rejoin(): self.ensure_coordinator_ready() + # call on_join_prepare if needed. We set a flag + # to make sure that we do not call it a second + # time if the client is woken up before a pending + # rebalance completes. This must be called on each + # iteration of the loop because an event requiring + # a rebalance (such as a metadata refresh which + # changes the matched subscription set) can occur + # while another rebalance is still in progress. + if not self.rejoining: + self._on_join_prepare(self._generation.generation_id, + self._generation.member_id) + self.rejoining = True + # ensure that there are no pending requests to the coordinator. # This is important in particular to avoid resending a pending # JoinGroup request. |