Diffstat (limited to 'kafka/coordinator/consumer.py')
-rw-r--r-- | kafka/coordinator/consumer.py | 102 |
1 file changed, 63 insertions, 39 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index ab30883..9438a7e 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -84,6 +84,8 @@ class ConsumerCoordinator(BaseCoordinator):
                 self.config[key] = configs[key]
 
         self._subscription = subscription
+        self._is_leader = False
+        self._joined_subscription = set()
         self._metadata_snapshot = self._build_metadata_snapshot(subscription, client.cluster)
         self._assignment_snapshot = None
         self._cluster = client.cluster
@@ -132,11 +134,22 @@ class ConsumerCoordinator(BaseCoordinator):
     def group_protocols(self):
         """Returns list of preferred (protocols, metadata)"""
-        topics = self._subscription.subscription
-        assert topics is not None, 'Consumer has not subscribed to topics'
+        if self._subscription.subscription is None:
+            raise Errors.IllegalStateError('Consumer has not subscribed to topics')
+        # dpkp note: I really dislike this.
+        # why? because we are using this strange method group_protocols,
+        # which is seemingly innocuous, to set internal state (_joined_subscription)
+        # that is later used to check whether metadata has changed since we joined a group
+        # but there is no guarantee that this method, group_protocols, will get called
+        # in the correct sequence or that it will only be called when we want it to be.
+        # So this really should be moved elsewhere, but I don't have the energy to
+        # work that out right now. If you read this at some later date after the mutable
+        # state has bitten you... I'm sorry! It mimics the java client, and that's the
+        # best I've got for now.
+        self._joined_subscription = set(self._subscription.subscription)
         metadata_list = []
         for assignor in self.config['assignors']:
-            metadata = assignor.metadata(topics)
+            metadata = assignor.metadata(self._joined_subscription)
             group_protocol = (assignor.name, metadata)
             metadata_list.append(group_protocol)
         return metadata_list
@@ -158,21 +171,29 @@ class ConsumerCoordinator(BaseCoordinator):
         # check if there are any changes to the metadata which should trigger
         # a rebalance
-        if self._subscription_metadata_changed(cluster):
-
-            if (self.config['api_version'] >= (0, 9)
-                and self.config['group_id'] is not None):
-
-                self._subscription.mark_for_reassignment()
-
-            # If we haven't got group coordinator support,
-            # just assign all partitions locally
-            else:
-                self._subscription.assign_from_subscribed([
-                    TopicPartition(topic, partition)
-                    for topic in self._subscription.subscription
-                    for partition in self._metadata_snapshot[topic]
-                ])
+        if self._subscription.partitions_auto_assigned():
+            metadata_snapshot = self._build_metadata_snapshot(self._subscription, cluster)
+            if self._metadata_snapshot != metadata_snapshot:
+                self._metadata_snapshot = metadata_snapshot
+
+                # If we haven't got group coordinator support,
+                # just assign all partitions locally
+                if self._auto_assign_all_partitions():
+                    self._subscription.assign_from_subscribed([
+                        TopicPartition(topic, partition)
+                        for topic in self._subscription.subscription
+                        for partition in self._metadata_snapshot[topic]
+                    ])
+
+    def _auto_assign_all_partitions(self):
+        # For users that use "subscribe" without group support,
+        # we will simply assign all partitions to this consumer
+        if self.config['api_version'] < (0, 9):
+            return True
+        elif self.config['group_id'] is None:
+            return True
+        else:
+            return False
 
     def _build_metadata_snapshot(self, subscription, cluster):
         metadata_snapshot = {}
@@ -181,16 +202,6 @@ class ConsumerCoordinator(BaseCoordinator):
             metadata_snapshot[topic] = set(partitions)
         return metadata_snapshot
 
-    def _subscription_metadata_changed(self, cluster):
-        if not self._subscription.partitions_auto_assigned():
-            return False
-
-        metadata_snapshot = self._build_metadata_snapshot(self._subscription, cluster)
-        if self._metadata_snapshot != metadata_snapshot:
-            self._metadata_snapshot = metadata_snapshot
-            return True
-        return False
-
     def _lookup_assignor(self, name):
         for assignor in self.config['assignors']:
             if assignor.name == name:
@@ -199,12 +210,10 @@ class ConsumerCoordinator(BaseCoordinator):
     def _on_join_complete(self, generation, member_id, protocol,
                           member_assignment_bytes):
-        # if we were the assignor, then we need to make sure that there have
-        # been no metadata updates since the rebalance begin. Otherwise, we
-        # won't rebalance again until the next metadata change
-        if self._assignment_snapshot is not None and self._assignment_snapshot != self._metadata_snapshot:
-            self._subscription.mark_for_reassignment()
-            return
+        # only the leader is responsible for monitoring for metadata changes
+        # (i.e. partition changes)
+        if not self._is_leader:
+            self._assignment_snapshot = None
 
         assignor = self._lookup_assignor(protocol)
         assert assignor, 'Coordinator selected invalid assignment protocol: %s' % protocol
@@ -307,6 +316,7 @@ class ConsumerCoordinator(BaseCoordinator):
         # keep track of the metadata used for assignment so that we can check
         # after rebalance completion whether anything has changed
         self._cluster.request_update()
+        self._is_leader = True
         self._assignment_snapshot = self._metadata_snapshot
 
         log.debug("Performing assignment for group %s using strategy %s"
@@ -338,8 +348,8 @@ class ConsumerCoordinator(BaseCoordinator):
                               " for group %s failed on_partitions_revoked",
                               self._subscription.listener, self.group_id)
 
-        self._assignment_snapshot = None
-        self._subscription.mark_for_reassignment()
+        self._is_leader = False
+        self._subscription.reset_group_subscription()
 
     def need_rejoin(self):
         """Check whether the group should be rejoined
@@ -347,9 +357,23 @@ class ConsumerCoordinator(BaseCoordinator):
         Returns:
             bool: True if consumer should rejoin group, False otherwise
         """
-        return (self._subscription.partitions_auto_assigned() and
-                (super(ConsumerCoordinator, self).need_rejoin() or
-                 self._subscription.needs_partition_assignment))
+        if not self._subscription.partitions_auto_assigned():
+            return False
+
+        if self._auto_assign_all_partitions():
+            return False
+
+        # we need to rejoin if we performed the assignment and metadata has changed
+        if (self._assignment_snapshot is not None
+            and self._assignment_snapshot != self._metadata_snapshot):
+            return True
+
+        # we need to join if our subscription has changed since the last join
+        if (self._joined_subscription is not None
+            and self._joined_subscription != self._subscription.subscription):
+            return True
+
+        return super(ConsumerCoordinator, self).need_rejoin()
 
     def refresh_committed_offsets_if_needed(self):
         """Fetch committed offsets for assigned partitions."""
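
Not part of the patch: a minimal standalone sketch of the rejoin decision that the new need_rejoin()/_auto_assign_all_partitions() code above encodes. The free functions and plain-value parameters below (auto_assigned, api_version, group_id, the snapshot/subscription arguments, base_need_rejoin) are hypothetical stand-ins for the coordinator's internal state, used only to trace the control flow.

    # Illustration only; simplified stand-ins for ConsumerCoordinator state.

    def auto_assign_all_partitions(api_version, group_id):
        # Pre-0.9 brokers, or consumers without a group_id, assign all
        # partitions locally and never participate in group rebalances.
        return api_version < (0, 9) or group_id is None

    def need_rejoin(auto_assigned, api_version, group_id,
                    assignment_snapshot, metadata_snapshot,
                    joined_subscription, current_subscription,
                    base_need_rejoin=False):
        if not auto_assigned:
            return False
        if auto_assign_all_partitions(api_version, group_id):
            return False
        # leader only: partition metadata changed since we performed assignment
        if assignment_snapshot is not None and assignment_snapshot != metadata_snapshot:
            return True
        # any member: subscription changed since the last successful join
        if joined_subscription is not None and joined_subscription != current_subscription:
            return True
        return base_need_rejoin

    # Example: the leader assigned with 3 partitions of 'my-topic', but the
    # cluster metadata now reports 4, so a rebalance is needed.
    print(need_rejoin(
        auto_assigned=True, api_version=(0, 10), group_id='my-group',
        assignment_snapshot={'my-topic': {0, 1, 2}},
        metadata_snapshot={'my-topic': {0, 1, 2, 3}},
        joined_subscription={'my-topic'}, current_subscription={'my-topic'},
    ))  # True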