diff options
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/coordinator/base.py | 34 |
1 files changed, 23 insertions, 11 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 8ce9a24..1435183 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -331,18 +331,13 @@ class BaseCoordinator(object): with self._lock: log.info("Successfully joined group %s with generation %s", self.group_id, self._generation.generation_id) - self.join_future = None self.state = MemberState.STABLE - self.rejoining = False - self._heartbeat_thread.enable() - self._on_join_complete(self._generation.generation_id, - self._generation.member_id, - self._generation.protocol, - member_assignment_bytes) + self.rejoin_needed = False + if self._heartbeat_thread: + self._heartbeat_thread.enable() def _handle_join_failure(self, _): with self._lock: - self.join_future = None self.state = MemberState.UNJOINED def ensure_active_group(self): @@ -351,7 +346,7 @@ class BaseCoordinator(object): if self._heartbeat_thread is None: self._start_heartbeat_thread() - while self.need_rejoin(): + while self.need_rejoin() or self._rejoin_incomplete(): self.ensure_coordinator_ready() # call on_join_prepare if needed. We set a flag @@ -382,6 +377,12 @@ class BaseCoordinator(object): # This ensures that we do not mistakenly attempt to rejoin # before the pending rebalance has completed. if self.join_future is None: + # Fence off the heartbeat thread explicitly so that it cannot + # interfere with the join group. Note that this must come after + # the call to _on_join_prepare since we must be able to continue + # sending heartbeats if that callback takes some time. + self._heartbeat_thread.disable() + self.state = MemberState.REBALANCING future = self._send_join_group_request() @@ -402,7 +403,16 @@ class BaseCoordinator(object): self._client.poll(future=future) - if future.failed(): + if future.succeeded(): + self._on_join_complete(self._generation.generation_id, + self._generation.member_id, + self._generation.protocol, + future.value) + self.join_future = None + self.rejoining = False + + else: + self.join_future = None exception = future.exception if isinstance(exception, (Errors.UnknownMemberIdError, Errors.RebalanceInProgressError, @@ -412,6 +422,9 @@ class BaseCoordinator(object): raise exception # pylint: disable-msg=raising-bad-type time.sleep(self.config['retry_backoff_ms'] / 1000) + def _rejoin_incomplete(self): + return self.join_future is not None + def _send_join_group_request(self): """Join the group and return the assignment for the next generation. @@ -497,7 +510,6 @@ class BaseCoordinator(object): self._generation = Generation(response.generation_id, response.member_id, response.group_protocol) - self.rejoin_needed = False if response.leader_id == response.member_id: log.info("Elected group leader -- performing partition" |