Diffstat (limited to 'kafka/coordinator/consumer.py')
-rw-r--r--  kafka/coordinator/consumer.py  102
1 file changed, 63 insertions(+), 39 deletions(-)
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index ab30883..9438a7e 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -84,6 +84,8 @@ class ConsumerCoordinator(BaseCoordinator):
self.config[key] = configs[key]
self._subscription = subscription
+ self._is_leader = False
+ self._joined_subscription = set()
self._metadata_snapshot = self._build_metadata_snapshot(subscription, client.cluster)
self._assignment_snapshot = None
self._cluster = client.cluster
@@ -132,11 +134,22 @@ class ConsumerCoordinator(BaseCoordinator):
def group_protocols(self):
"""Returns list of preferred (protocols, metadata)"""
- topics = self._subscription.subscription
- assert topics is not None, 'Consumer has not subscribed to topics'
+ if self._subscription.subscription is None:
+ raise Errors.IllegalStateError('Consumer has not subscribed to topics')
+ # dpkp note: I really dislike this.
+ # Why? Because we are using this seemingly innocuous method, group_protocols,
+ # to set internal state (_joined_subscription) that is later used to check
+ # whether metadata has changed since we joined a group. There is no guarantee
+ # that group_protocols will be called in the correct sequence, or only when
+ # we want it to be, so this really should be moved elsewhere. I don't have
+ # the energy to work that out right now. If you are reading this at some
+ # later date, after the mutable state has bitten you... I'm sorry! It mimics
+ # the java client, and that's the best I've got for now.
+ self._joined_subscription = set(self._subscription.subscription)
metadata_list = []
for assignor in self.config['assignors']:
- metadata = assignor.metadata(topics)
+ metadata = assignor.metadata(self._joined_subscription)
group_protocol = (assignor.name, metadata)
metadata_list.append(group_protocol)
return metadata_list
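As an illustration of the new flow above: group_protocols records the subscription that was current at join time in _joined_subscription and then asks each configured assignor for its (name, metadata) pair. A minimal, self-contained sketch of that shape, using a hypothetical FakeAssignor rather than kafka-python's real assignor classes:

    # Minimal sketch; FakeAssignor/FakeMetadata are hypothetical stand-ins,
    # not kafka-python classes.
    from collections import namedtuple

    FakeMetadata = namedtuple('FakeMetadata', ['version', 'subscription'])

    class FakeAssignor(object):
        """Exposes the same .name / .metadata(topics) shape the coordinator expects."""
        name = 'fake-range'

        def metadata(self, topics):
            return FakeMetadata(version=0, subscription=sorted(topics))

    def group_protocols(assignors, subscription):
        # remember the subscription as it looked when we joined, so a later
        # subscribe() change can be detected and trigger a rejoin
        joined_subscription = set(subscription)
        protocols = [(a.name, a.metadata(joined_subscription)) for a in assignors]
        return joined_subscription, protocols

    joined, protocols = group_protocols([FakeAssignor()], {'topic-a', 'topic-b'})
    print(protocols)
    # [('fake-range', FakeMetadata(version=0, subscription=['topic-a', 'topic-b']))]

The captured set is exactly what the reworked need_rejoin() later compares against the live subscription.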
@@ -158,21 +171,29 @@ class ConsumerCoordinator(BaseCoordinator):
# check if there are any changes to the metadata which should trigger
# a rebalance
- if self._subscription_metadata_changed(cluster):
-
- if (self.config['api_version'] >= (0, 9)
- and self.config['group_id'] is not None):
-
- self._subscription.mark_for_reassignment()
-
- # If we haven't got group coordinator support,
- # just assign all partitions locally
- else:
- self._subscription.assign_from_subscribed([
- TopicPartition(topic, partition)
- for topic in self._subscription.subscription
- for partition in self._metadata_snapshot[topic]
- ])
+ if self._subscription.partitions_auto_assigned():
+ metadata_snapshot = self._build_metadata_snapshot(self._subscription, cluster)
+ if self._metadata_snapshot != metadata_snapshot:
+ self._metadata_snapshot = metadata_snapshot
+
+ # If we haven't got group coordinator support,
+ # just assign all partitions locally
+ if self._auto_assign_all_partitions():
+ self._subscription.assign_from_subscribed([
+ TopicPartition(topic, partition)
+ for topic in self._subscription.subscription
+ for partition in self._metadata_snapshot[topic]
+ ])
+
+ def _auto_assign_all_partitions(self):
+ # For users that use "subscribe" without group support,
+ # we will simply assign all partitions to this consumer
+ if self.config['api_version'] < (0, 9):
+ return True
+ elif self.config['group_id'] is None:
+ return True
+ else:
+ return False
def _build_metadata_snapshot(self, subscription, cluster):
metadata_snapshot = {}
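The new _auto_assign_all_partitions() helper encodes the fallback for consumers without group coordinator support (api_version below (0, 9), or no group_id): every partition of every subscribed topic is assigned locally. A rough, standalone sketch of that decision and the expansion above, with a plain dict standing in for the real metadata snapshot (the function names here are illustrative, not the library's API):

    # Illustrative names only; the real coordinator reads these values from
    # self.config and self._metadata_snapshot.
    def auto_assign_all_partitions(api_version, group_id):
        # no broker-side group support, or no group at all
        return api_version < (0, 9) or group_id is None

    def assign_all_locally(subscription, metadata_snapshot):
        # expand {topic: {partitions}} into flat (topic, partition) pairs
        return [(topic, partition)
                for topic in subscription
                for partition in metadata_snapshot[topic]]

    snapshot = {'topic-a': {0, 1}, 'topic-b': {0}}
    if auto_assign_all_partitions(api_version=(0, 8, 2), group_id=None):
        print(sorted(assign_all_locally(['topic-a', 'topic-b'], snapshot)))
        # [('topic-a', 0), ('topic-a', 1), ('topic-b', 0)]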
@@ -181,16 +202,6 @@ class ConsumerCoordinator(BaseCoordinator):
metadata_snapshot[topic] = set(partitions)
return metadata_snapshot
- def _subscription_metadata_changed(self, cluster):
- if not self._subscription.partitions_auto_assigned():
- return False
-
- metadata_snapshot = self._build_metadata_snapshot(self._subscription, cluster)
- if self._metadata_snapshot != metadata_snapshot:
- self._metadata_snapshot = metadata_snapshot
- return True
- return False
-
def _lookup_assignor(self, name):
for assignor in self.config['assignors']:
if assignor.name == name:
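For context on the removed _subscription_metadata_changed(): its compare-and-mutate behavior now lives inline in the metadata-update handler shown earlier, and the rejoin decision moved into need_rejoin(). The snapshot itself is just a mapping of topic to partition set, so a plain inequality detects partition changes. A small sketch under that assumption, with a lambda standing in for the cluster's partition lookup:

    # Assumption: partitions_for_topic mimics cluster.partitions_for_topic(),
    # returning an iterable of partition ids (or None for unknown topics).
    def build_metadata_snapshot(topics, partitions_for_topic):
        return {topic: set(partitions_for_topic(topic) or []) for topic in topics}

    old = {'topic-a': {0, 1}}
    new = build_metadata_snapshot(['topic-a'], lambda topic: [0, 1, 2])
    print(old != new)  # True -> snapshot is replaced; need_rejoin() sees the drift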
@@ -199,12 +210,10 @@ class ConsumerCoordinator(BaseCoordinator):
def _on_join_complete(self, generation, member_id, protocol,
member_assignment_bytes):
- # if we were the assignor, then we need to make sure that there have
- # been no metadata updates since the rebalance begin. Otherwise, we
- # won't rebalance again until the next metadata change
- if self._assignment_snapshot is not None and self._assignment_snapshot != self._metadata_snapshot:
- self._subscription.mark_for_reassignment()
- return
+ # only the leader is responsible for monitoring for metadata changes
+ # (i.e. partition changes)
+ if not self._is_leader:
+ self._assignment_snapshot = None
assignor = self._lookup_assignor(protocol)
assert assignor, 'Coordinator selected invalid assignment protocol: %s' % protocol
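The hunk above splits responsibility by role: only the member that performed the assignment keeps _assignment_snapshot, so followers can never trip the metadata-change check. A simplified, hypothetical MiniCoordinator (not the real class) showing that effect:

    # Hypothetical miniature, not the real ConsumerCoordinator.
    class MiniCoordinator(object):
        def __init__(self):
            self._is_leader = False
            self._assignment_snapshot = None
            self._metadata_snapshot = {'topic-a': {0, 1}}

        def on_join_complete(self):
            # followers drop the snapshot; only the leader watches partitions
            if not self._is_leader:
                self._assignment_snapshot = None

        def metadata_changed_since_assignment(self):
            return (self._assignment_snapshot is not None
                    and self._assignment_snapshot != self._metadata_snapshot)

    follower = MiniCoordinator()
    follower.on_join_complete()
    follower._metadata_snapshot['topic-a'].add(2)   # partitions change later
    print(follower.metadata_changed_since_assignment())  # False for a follower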
@@ -307,6 +316,7 @@ class ConsumerCoordinator(BaseCoordinator):
# keep track of the metadata used for assignment so that we can check
# after rebalance completion whether anything has changed
self._cluster.request_update()
+ self._is_leader = True
self._assignment_snapshot = self._metadata_snapshot
log.debug("Performing assignment for group %s using strategy %s"
@@ -338,8 +348,8 @@ class ConsumerCoordinator(BaseCoordinator):
" for group %s failed on_partitions_revoked",
self._subscription.listener, self.group_id)
- self._assignment_snapshot = None
- self._subscription.mark_for_reassignment()
+ self._is_leader = False
+ self._subscription.reset_group_subscription()
def need_rejoin(self):
"""Check whether the group should be rejoined
@@ -347,9 +357,23 @@ class ConsumerCoordinator(BaseCoordinator):
Returns:
bool: True if consumer should rejoin group, False otherwise
"""
- return (self._subscription.partitions_auto_assigned() and
- (super(ConsumerCoordinator, self).need_rejoin() or
- self._subscription.needs_partition_assignment))
+ if not self._subscription.partitions_auto_assigned():
+ return False
+
+ if self._auto_assign_all_partitions():
+ return False
+
+ # we need to rejoin if we performed the assignment and metadata has changed
+ if (self._assignment_snapshot is not None
+ and self._assignment_snapshot != self._metadata_snapshot):
+ return True
+
+ # we need to join if our subscription has changed since the last join
+ if (self._joined_subscription is not None
+ and self._joined_subscription != self._subscription.subscription):
+ return True
+
+ return super(ConsumerCoordinator, self).need_rejoin()
def refresh_committed_offsets_if_needed(self):
"""Fetch committed offsets for assigned partitions."""