summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-08-13 11:47:33 -0700
committerGitHub <noreply@github.com>2017-08-13 11:47:33 -0700
commit4b32b2e733294f0ee2447ca239e5cc9e2fef2fe4 (patch)
treeb9403fabb66311fb129046d3fdfa2e69cf5e2e0b
parent422189bf04bd5cd8c76e8cbf9d48fd19a78e9ba9 (diff)
downloadkafka-python-4b32b2e733294f0ee2447ca239e5cc9e2fef2fe4.tar.gz
Initialize metadata_snapshot in group coordinator (#1174)
-rw-r--r--kafka/coordinator/consumer.py15
1 files changed, 9 insertions, 6 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 71a93ec..123699f 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -87,7 +87,7 @@ class ConsumerCoordinator(BaseCoordinator):
assert self.config['assignors'], 'Coordinator requires assignors'
self._subscription = subscription
- self._metadata_snapshot = {}
+ self._metadata_snapshot = self._build_metadata_snapshot(subscription, client.cluster)
self._assignment_snapshot = None
self._cluster = client.cluster
self._cluster.request_update()
@@ -162,15 +162,18 @@ class ConsumerCoordinator(BaseCoordinator):
for partition in self._metadata_snapshot[topic]
])
- def _subscription_metadata_changed(self, cluster):
- if not self._subscription.partitions_auto_assigned():
- return False
-
+ def _build_metadata_snapshot(self, subscription, cluster):
metadata_snapshot = {}
- for topic in self._subscription.group_subscription():
+ for topic in subscription.group_subscription():
partitions = cluster.partitions_for_topic(topic) or []
metadata_snapshot[topic] = set(partitions)
+ return metadata_snapshot
+
+ def _subscription_metadata_changed(self, cluster):
+ if not self._subscription.partitions_auto_assigned():
+ return False
+ metadata_snapshot = self._build_metadata_snapshot(self._subscription, cluster)
if self._metadata_snapshot != metadata_snapshot:
self._metadata_snapshot = metadata_snapshot
return True