diff options
Diffstat (limited to 'kafka/consumer')
-rw-r--r-- | kafka/consumer/fetcher.py | 6 | ||||
-rw-r--r-- | kafka/consumer/group.py | 10 | ||||
-rw-r--r-- | kafka/consumer/subscription_state.py | 30 |
3 files changed, 24 insertions, 22 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index afb8f52..f9fcb37 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -326,9 +326,6 @@ class Fetcher(six.Iterator): max_records = self.config['max_poll_records'] assert max_records > 0 - if self._subscriptions.needs_partition_assignment: - return {}, False - drained = collections.defaultdict(list) records_remaining = max_records @@ -397,9 +394,6 @@ class Fetcher(six.Iterator): def _message_generator(self): """Iterate over fetched_records""" - if self._subscriptions.needs_partition_assignment: - raise StopIteration('Subscription needs partition assignment') - while self._next_partition_records or self._completed_fetches: if not self._next_partition_records: diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 0224d16..1c1f1e8 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -644,6 +644,11 @@ class KafkaConsumer(six.Iterator): timeout_ms = min(timeout_ms, self._coordinator.time_to_next_poll()) self._client.poll(timeout_ms=timeout_ms) + # after the long poll, we should check whether the group needs to rebalance + # prior to returning data so that the group can stabilize faster + if self._coordinator.need_rejoin(): + return {} + records, _ = self._fetcher.fetched_records(max_records) return records @@ -1055,6 +1060,11 @@ class KafkaConsumer(six.Iterator): poll_ms = 0 self._client.poll(timeout_ms=poll_ms) + # after the long poll, we should check whether the group needs to rebalance + # prior to returning data so that the group can stabilize faster + if self._coordinator.need_rejoin(): + continue + # We need to make sure we at least keep up with scheduled tasks, # like heartbeats, auto-commits, and metadata refreshes timeout_at = self._next_timeout() diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 3d4dfef..10d722e 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -68,7 +68,6 @@ class SubscriptionState(object): self._group_subscription = set() self._user_assignment = set() self.assignment = dict() - self.needs_partition_assignment = False self.listener = None # initialize to true for the consumers to fetch offset upon starting up @@ -172,7 +171,6 @@ class SubscriptionState(object): log.info('Updating subscribed topics to: %s', topics) self.subscription = set(topics) self._group_subscription.update(topics) - self.needs_partition_assignment = True # Remove any assigned partitions which are no longer subscribed to for tp in set(self.assignment.keys()): @@ -192,12 +190,12 @@ class SubscriptionState(object): raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) self._group_subscription.update(topics) - def mark_for_reassignment(self): + def reset_group_subscription(self): + """Reset the group's subscription to only contain topics subscribed by this consumer.""" if self._user_assignment: raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) assert self.subscription is not None, 'Subscription required' self._group_subscription.intersection_update(self.subscription) - self.needs_partition_assignment = True def assign_from_user(self, partitions): """Manually assign a list of TopicPartitions to this consumer. @@ -220,18 +218,17 @@ class SubscriptionState(object): if self.subscription is not None: raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) - self._user_assignment.clear() - self._user_assignment.update(partitions) + if self._user_assignment != set(partitions): + self._user_assignment = set(partitions) - for partition in partitions: - if partition not in self.assignment: - self._add_assigned_partition(partition) + for partition in partitions: + if partition not in self.assignment: + self._add_assigned_partition(partition) - for tp in set(self.assignment.keys()) - self._user_assignment: - del self.assignment[tp] + for tp in set(self.assignment.keys()) - self._user_assignment: + del self.assignment[tp] - self.needs_partition_assignment = False - self.needs_fetch_committed_offsets = True + self.needs_fetch_committed_offsets = True def assign_from_subscribed(self, assignments): """Update the assignment to the specified partitions @@ -245,16 +242,18 @@ class SubscriptionState(object): assignments (list of TopicPartition): partitions to assign to this consumer instance. """ - if self.subscription is None: + if not self.partitions_auto_assigned(): raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) for tp in assignments: if tp.topic not in self.subscription: raise ValueError("Assigned partition %s for non-subscribed topic." % str(tp)) + + # after rebalancing, we always reinitialize the assignment state self.assignment.clear() for tp in assignments: self._add_assigned_partition(tp) - self.needs_partition_assignment = False + self.needs_fetch_committed_offsets = True log.info("Updated partition assignment: %s", assignments) def unsubscribe(self): @@ -262,7 +261,6 @@ class SubscriptionState(object): self.subscription = None self._user_assignment.clear() self.assignment.clear() - self.needs_partition_assignment = True self.subscribed_pattern = None def group_subscription(self): |