summaryrefslogtreecommitdiff
path: root/kafka/cluster.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-28 13:08:23 -0800
committerDana Powers <dana.powers@rd.io>2015-12-28 13:09:59 -0800
commit7c76138640a9ab1390211abc0fb0d5d604c15621 (patch)
tree04590c4fbc514a8fc99b7a8df58f6aac9624868c /kafka/cluster.py
parentcda2e17cd115f76f4992a34bab2b684ed08d4fef (diff)
downloadkafka-python-7c76138640a9ab1390211abc0fb0d5d604c15621.tar.gz
Add ClusterMetadata.add_group_coordinator()
Diffstat (limited to 'kafka/cluster.py')
-rw-r--r--kafka/cluster.py39
1 files changed, 39 insertions, 0 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 2e9e117..5b5fd8e 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -141,6 +141,45 @@ class ClusterMetadata(object):
"""Remove a previously added listener callback"""
self._listeners.remove(listener)
+ def add_group_coordinator(self, group, response):
+ """Update with metadata for a group coordinator
+
+ group: name of group from GroupCoordinatorRequest
+ response: GroupCoordinatorResponse
+
+ returns True if metadata is updated, False on error
+ """
+ log.debug("Updating coordinator for %s: %s", group, response)
+ error_type = Errors.for_code(response.error_code)
+ if error_type is not Errors.NoError:
+ log.error("GroupCoordinatorResponse error: %s", error_type)
+ self._groups[group] = -1
+ return False
+
+ node_id = response.coordinator_id
+ coordinator = BrokerMetadata(
+ response.coordinator_id,
+ response.host,
+ response.port)
+
+ # Assume that group coordinators are just brokers
+ # (this is true now, but could diverge in future)
+ if node_id not in self._brokers:
+ self._brokers[node_id] = coordinator
+
+ # If this happens, either brokers have moved without
+ # changing IDs, or our assumption above is wrong
+ elif coordinator != self._brokers[node_id]:
+ log.error("GroupCoordinator metadata conflicts with existing"
+ " broker metadata. Coordinator: %s, Broker: %s",
+ coordinator, self._brokers[node_id])
+ self._groups[group] = node_id
+ return False
+
+ log.info("Group coordinator for %s is %s", group, coordinator)
+ self._groups[group] = node_id
+ return True
+
def __str__(self):
return 'Cluster(brokers: %d, topics: %d, groups: %d)' % \
(len(self._brokers), len(self._partitions), len(self._groups))