summaryrefslogtreecommitdiff
path: root/kafka/consumer
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer')
-rw-r--r--kafka/consumer/fetcher.py6
-rw-r--r--kafka/consumer/group.py10
-rw-r--r--kafka/consumer/subscription_state.py30
3 files changed, 24 insertions, 22 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index afb8f52..f9fcb37 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -326,9 +326,6 @@ class Fetcher(six.Iterator):
max_records = self.config['max_poll_records']
assert max_records > 0
- if self._subscriptions.needs_partition_assignment:
- return {}, False
-
drained = collections.defaultdict(list)
records_remaining = max_records
@@ -397,9 +394,6 @@ class Fetcher(six.Iterator):
def _message_generator(self):
"""Iterate over fetched_records"""
- if self._subscriptions.needs_partition_assignment:
- raise StopIteration('Subscription needs partition assignment')
-
while self._next_partition_records or self._completed_fetches:
if not self._next_partition_records:
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 0224d16..1c1f1e8 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -644,6 +644,11 @@ class KafkaConsumer(six.Iterator):
timeout_ms = min(timeout_ms, self._coordinator.time_to_next_poll())
self._client.poll(timeout_ms=timeout_ms)
+ # after the long poll, we should check whether the group needs to rebalance
+ # prior to returning data so that the group can stabilize faster
+ if self._coordinator.need_rejoin():
+ return {}
+
records, _ = self._fetcher.fetched_records(max_records)
return records
@@ -1055,6 +1060,11 @@ class KafkaConsumer(six.Iterator):
poll_ms = 0
self._client.poll(timeout_ms=poll_ms)
+ # after the long poll, we should check whether the group needs to rebalance
+ # prior to returning data so that the group can stabilize faster
+ if self._coordinator.need_rejoin():
+ continue
+
# We need to make sure we at least keep up with scheduled tasks,
# like heartbeats, auto-commits, and metadata refreshes
timeout_at = self._next_timeout()
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index 3d4dfef..10d722e 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -68,7 +68,6 @@ class SubscriptionState(object):
self._group_subscription = set()
self._user_assignment = set()
self.assignment = dict()
- self.needs_partition_assignment = False
self.listener = None
# initialize to true for the consumers to fetch offset upon starting up
@@ -172,7 +171,6 @@ class SubscriptionState(object):
log.info('Updating subscribed topics to: %s', topics)
self.subscription = set(topics)
self._group_subscription.update(topics)
- self.needs_partition_assignment = True
# Remove any assigned partitions which are no longer subscribed to
for tp in set(self.assignment.keys()):
@@ -192,12 +190,12 @@ class SubscriptionState(object):
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
self._group_subscription.update(topics)
- def mark_for_reassignment(self):
+ def reset_group_subscription(self):
+ """Reset the group's subscription to only contain topics subscribed by this consumer."""
if self._user_assignment:
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
assert self.subscription is not None, 'Subscription required'
self._group_subscription.intersection_update(self.subscription)
- self.needs_partition_assignment = True
def assign_from_user(self, partitions):
"""Manually assign a list of TopicPartitions to this consumer.
@@ -220,18 +218,17 @@ class SubscriptionState(object):
if self.subscription is not None:
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
- self._user_assignment.clear()
- self._user_assignment.update(partitions)
+ if self._user_assignment != set(partitions):
+ self._user_assignment = set(partitions)
- for partition in partitions:
- if partition not in self.assignment:
- self._add_assigned_partition(partition)
+ for partition in partitions:
+ if partition not in self.assignment:
+ self._add_assigned_partition(partition)
- for tp in set(self.assignment.keys()) - self._user_assignment:
- del self.assignment[tp]
+ for tp in set(self.assignment.keys()) - self._user_assignment:
+ del self.assignment[tp]
- self.needs_partition_assignment = False
- self.needs_fetch_committed_offsets = True
+ self.needs_fetch_committed_offsets = True
def assign_from_subscribed(self, assignments):
"""Update the assignment to the specified partitions
@@ -245,16 +242,18 @@ class SubscriptionState(object):
assignments (list of TopicPartition): partitions to assign to this
consumer instance.
"""
- if self.subscription is None:
+ if not self.partitions_auto_assigned():
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
for tp in assignments:
if tp.topic not in self.subscription:
raise ValueError("Assigned partition %s for non-subscribed topic." % str(tp))
+
+ # after rebalancing, we always reinitialize the assignment state
self.assignment.clear()
for tp in assignments:
self._add_assigned_partition(tp)
- self.needs_partition_assignment = False
+ self.needs_fetch_committed_offsets = True
log.info("Updated partition assignment: %s", assignments)
def unsubscribe(self):
@@ -262,7 +261,6 @@ class SubscriptionState(object):
self.subscription = None
self._user_assignment.clear()
self.assignment.clear()
- self.needs_partition_assignment = True
self.subscribed_pattern = None
def group_subscription(self):