diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-08-13 11:47:33 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-08-13 11:47:33 -0700 |
commit | 4b32b2e733294f0ee2447ca239e5cc9e2fef2fe4 (patch) | |
tree | b9403fabb66311fb129046d3fdfa2e69cf5e2e0b | |
parent | 422189bf04bd5cd8c76e8cbf9d48fd19a78e9ba9 (diff) | |
download | kafka-python-4b32b2e733294f0ee2447ca239e5cc9e2fef2fe4.tar.gz |
Initialize metadata_snapshot in group coordinator (#1174)
-rw-r--r-- | kafka/coordinator/consumer.py | 15 |
1 files changed, 9 insertions, 6 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 71a93ec..123699f 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -87,7 +87,7 @@ class ConsumerCoordinator(BaseCoordinator): assert self.config['assignors'], 'Coordinator requires assignors' self._subscription = subscription - self._metadata_snapshot = {} + self._metadata_snapshot = self._build_metadata_snapshot(subscription, client.cluster) self._assignment_snapshot = None self._cluster = client.cluster self._cluster.request_update() @@ -162,15 +162,18 @@ class ConsumerCoordinator(BaseCoordinator): for partition in self._metadata_snapshot[topic] ]) - def _subscription_metadata_changed(self, cluster): - if not self._subscription.partitions_auto_assigned(): - return False - + def _build_metadata_snapshot(self, subscription, cluster): metadata_snapshot = {} - for topic in self._subscription.group_subscription(): + for topic in subscription.group_subscription(): partitions = cluster.partitions_for_topic(topic) or [] metadata_snapshot[topic] = set(partitions) + return metadata_snapshot + + def _subscription_metadata_changed(self, cluster): + if not self._subscription.partitions_auto_assigned(): + return False + metadata_snapshot = self._build_metadata_snapshot(self._subscription, cluster) if self._metadata_snapshot != metadata_snapshot: self._metadata_snapshot = metadata_snapshot return True |