diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-28 13:08:23 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-28 13:09:59 -0800 |
commit | 7c76138640a9ab1390211abc0fb0d5d604c15621 (patch) | |
tree | 04590c4fbc514a8fc99b7a8df58f6aac9624868c | |
parent | cda2e17cd115f76f4992a34bab2b684ed08d4fef (diff) | |
download | kafka-python-7c76138640a9ab1390211abc0fb0d5d604c15621.tar.gz |
Add ClusterMetadata.add_group_coordinator()
-rw-r--r-- | kafka/cluster.py | 39 |
1 files changed, 39 insertions, 0 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py index 2e9e117..5b5fd8e 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -141,6 +141,45 @@ class ClusterMetadata(object): """Remove a previously added listener callback""" self._listeners.remove(listener) + def add_group_coordinator(self, group, response): + """Update with metadata for a group coordinator + + group: name of group from GroupCoordinatorRequest + response: GroupCoordinatorResponse + + returns True if metadata is updated, False on error + """ + log.debug("Updating coordinator for %s: %s", group, response) + error_type = Errors.for_code(response.error_code) + if error_type is not Errors.NoError: + log.error("GroupCoordinatorResponse error: %s", error_type) + self._groups[group] = -1 + return False + + node_id = response.coordinator_id + coordinator = BrokerMetadata( + response.coordinator_id, + response.host, + response.port) + + # Assume that group coordinators are just brokers + # (this is true now, but could diverge in future) + if node_id not in self._brokers: + self._brokers[node_id] = coordinator + + # If this happens, either brokers have moved without + # changing IDs, or our assumption above is wrong + elif coordinator != self._brokers[node_id]: + log.error("GroupCoordinator metadata conflicts with existing" + " broker metadata. Coordinator: %s, Broker: %s", + coordinator, self._brokers[node_id]) + self._groups[group] = node_id + return False + + log.info("Group coordinator for %s is %s", group, coordinator) + self._groups[group] = node_id + return True + def __str__(self): return 'Cluster(brokers: %d, topics: %d, groups: %d)' % \ (len(self._brokers), len(self._partitions), len(self._groups)) |