summaryrefslogtreecommitdiff
path: root/kafka/cluster.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/cluster.py')
-rw-r--r--kafka/cluster.py19
1 files changed, 11 insertions, 8 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 9aabec1..c3b8f3c 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -189,7 +189,7 @@ class ClusterMetadata(object):
for node_id, host, port in metadata.brokers:
self._brokers.update({
- node_id: BrokerMetadata(node_id, host, port)
+ node_id: BrokerMetadata(node_id, host, port, None)
})
_new_partitions = {}
@@ -272,7 +272,8 @@ class ClusterMetadata(object):
coordinator = BrokerMetadata(
response.coordinator_id,
response.host,
- response.port)
+ response.port,
+ None)
# Assume that group coordinators are just brokers
# (this is true now, but could diverge in future)
@@ -281,12 +282,14 @@ class ClusterMetadata(object):
# If this happens, either brokers have moved without
# changing IDs, or our assumption above is wrong
- elif coordinator != self._brokers[node_id]:
- log.error("GroupCoordinator metadata conflicts with existing"
- " broker metadata. Coordinator: %s, Broker: %s",
- coordinator, self._brokers[node_id])
- self._groups[group] = node_id
- return False
+ else:
+ node = self._brokers[node_id]
+ if coordinator.host != node.host or coordinator.port != node.port:
+ log.error("GroupCoordinator metadata conflicts with existing"
+ " broker metadata. Coordinator: %s, Broker: %s",
+ coordinator, node)
+ self._groups[group] = node_id
+ return False
log.info("Group coordinator for %s is %s", group, coordinator)
self._groups[group] = node_id