summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/client.py6
-rw-r--r--kafka/cluster.py19
-rw-r--r--kafka/structs.py2
-rw-r--r--test/test_client.py61
-rw-r--r--test/test_client_async.py4
5 files changed, 52 insertions, 40 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 891ae03..8a34cc4 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -137,7 +137,7 @@ class SimpleClient(object):
kafka.errors.check_error(resp)
# Otherwise return the BrokerMetadata
- return BrokerMetadata(resp.nodeId, resp.host, resp.port)
+ return BrokerMetadata(resp.nodeId, resp.host, resp.port, None)
def _next_id(self):
"""Generate a new correlation id"""
@@ -525,7 +525,7 @@ class SimpleClient(object):
log.debug('Updating broker metadata: %s', resp.brokers)
log.debug('Updating topic metadata: %s', [topic for _, topic, _ in resp.topics])
- self.brokers = dict([(nodeId, BrokerMetadata(nodeId, host, port))
+ self.brokers = dict([(nodeId, BrokerMetadata(nodeId, host, port, None))
for nodeId, host, port in resp.brokers])
for error, topic, partitions in resp.topics:
@@ -577,7 +577,7 @@ class SimpleClient(object):
# (not sure how this could happen. server could be in bad state)
else:
self.topics_to_brokers[topic_part] = BrokerMetadata(
- leader, None, None
+ leader, None, None, None
)
def send_metadata_request(self, payloads=[], fail_on_error=True,
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 9aabec1..c3b8f3c 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -189,7 +189,7 @@ class ClusterMetadata(object):
for node_id, host, port in metadata.brokers:
self._brokers.update({
- node_id: BrokerMetadata(node_id, host, port)
+ node_id: BrokerMetadata(node_id, host, port, None)
})
_new_partitions = {}
@@ -272,7 +272,8 @@ class ClusterMetadata(object):
coordinator = BrokerMetadata(
response.coordinator_id,
response.host,
- response.port)
+ response.port,
+ None)
# Assume that group coordinators are just brokers
# (this is true now, but could diverge in future)
@@ -281,12 +282,14 @@ class ClusterMetadata(object):
# If this happens, either brokers have moved without
# changing IDs, or our assumption above is wrong
- elif coordinator != self._brokers[node_id]:
- log.error("GroupCoordinator metadata conflicts with existing"
- " broker metadata. Coordinator: %s, Broker: %s",
- coordinator, self._brokers[node_id])
- self._groups[group] = node_id
- return False
+ else:
+ node = self._brokers[node_id]
+ if coordinator.host != node.host or coordinator.port != node.port:
+ log.error("GroupCoordinator metadata conflicts with existing"
+ " broker metadata. Coordinator: %s, Broker: %s",
+ coordinator, node)
+ self._groups[group] = node_id
+ return False
log.info("Group coordinator for %s is %s", group, coordinator)
self._groups[group] = node_id
diff --git a/kafka/structs.py b/kafka/structs.py
index 5902930..3188516 100644
--- a/kafka/structs.py
+++ b/kafka/structs.py
@@ -58,7 +58,7 @@ TopicPartition = namedtuple("TopicPartition",
["topic", "partition"])
BrokerMetadata = namedtuple("BrokerMetadata",
- ["nodeId", "host", "port"])
+ ["nodeId", "host", "port", "rack"])
PartitionMetadata = namedtuple("PartitionMetadata",
["topic", "partition", "leader", "replicas", "isr", "error"])
diff --git a/test/test_client.py b/test/test_client.py
index 660af61..79ac8be 100644
--- a/test/test_client.py
+++ b/test/test_client.py
@@ -1,6 +1,7 @@
import socket
from mock import ANY, MagicMock, patch
+from operator import itemgetter
import six
from . import unittest
@@ -117,9 +118,10 @@ class TestSimpleClient(unittest.TestCase):
mock_conn(conn)
brokers = [
- BrokerMetadata(0, 'broker_1', 4567),
- BrokerMetadata(1, 'broker_2', 5678)
+ BrokerMetadata(0, 'broker_1', 4567, None),
+ BrokerMetadata(1, 'broker_2', 5678, None)
]
+ resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
topics = [
(NO_ERROR, 'topic_1', [
@@ -137,7 +139,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_ERROR, 2, 0, [0, 1], [0, 1])
])
]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
# client loads metadata at init
client = SimpleClient(hosts=['broker_1:4567'])
@@ -167,9 +169,10 @@ class TestSimpleClient(unittest.TestCase):
mock_conn(conn)
brokers = [
- BrokerMetadata(0, 'broker_1', 4567),
- BrokerMetadata(1, 'broker_2', 5678)
+ BrokerMetadata(0, 'broker_1', 4567, None),
+ BrokerMetadata(1, 'broker_2', 5678, None)
]
+ resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
topics = [
(NO_LEADER, 'topic_still_creating', []),
@@ -179,7 +182,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_LEADER, 1, -1, [], []),
]),
]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
@@ -197,9 +200,10 @@ class TestSimpleClient(unittest.TestCase):
mock_conn(conn)
brokers = [
- BrokerMetadata(0, 'broker_1', 4567),
- BrokerMetadata(1, 'broker_2', 5678)
+ BrokerMetadata(0, 'broker_1', 4567, None),
+ BrokerMetadata(1, 'broker_2', 5678, None)
]
+ resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
topics = [
(NO_LEADER, 'topic_still_creating', []),
@@ -209,7 +213,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_LEADER, 1, -1, [], []),
]),
]
- decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
+ decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
@@ -230,14 +234,15 @@ class TestSimpleClient(unittest.TestCase):
mock_conn(conn)
brokers = [
- BrokerMetadata(0, 'broker_1', 4567),
- BrokerMetadata(1, 'broker_2', 5678)
+ BrokerMetadata(0, 'broker_1', 4567, None),
+ BrokerMetadata(1, 'broker_2', 5678, None)
]
+ resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
topics = [
(NO_LEADER, 'topic_no_partitions', [])
]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
@@ -249,7 +254,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_ERROR, 0, 0, [0, 1], [0, 1])
])
]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
# calling _get_leader_for_partition (from any broker aware request)
# will try loading metadata again for the same topic
@@ -267,15 +272,16 @@ class TestSimpleClient(unittest.TestCase):
mock_conn(conn)
brokers = [
- BrokerMetadata(0, 'broker_1', 4567),
- BrokerMetadata(1, 'broker_2', 5678)
+ BrokerMetadata(0, 'broker_1', 4567, None),
+ BrokerMetadata(1, 'broker_2', 5678, None)
]
+ resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
topics = [
(NO_LEADER, 'topic_no_partitions', []),
(UNKNOWN_TOPIC_OR_PARTITION, 'topic_unknown', []),
]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
@@ -294,9 +300,10 @@ class TestSimpleClient(unittest.TestCase):
mock_conn(conn)
brokers = [
- BrokerMetadata(0, 'broker_1', 4567),
- BrokerMetadata(1, 'broker_2', 5678)
+ BrokerMetadata(0, 'broker_1', 4567, None),
+ BrokerMetadata(1, 'broker_2', 5678, None)
]
+ resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
topics = [
(NO_ERROR, 'topic_noleader', [
@@ -304,7 +311,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_LEADER, 1, -1, [], []),
]),
]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
self.assertDictEqual(
@@ -330,7 +337,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_ERROR, 1, 1, [1, 0], [1, 0])
]),
]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0))
self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1))
@@ -340,9 +347,10 @@ class TestSimpleClient(unittest.TestCase):
mock_conn(conn)
brokers = [
- BrokerMetadata(0, 'broker_1', 4567),
- BrokerMetadata(1, 'broker_2', 5678)
+ BrokerMetadata(0, 'broker_1', 4567, None),
+ BrokerMetadata(1, 'broker_2', 5678, None)
]
+ resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
topics = [
(NO_ERROR, 'topic_noleader', [
@@ -350,7 +358,7 @@ class TestSimpleClient(unittest.TestCase):
(NO_LEADER, 1, -1, [], []),
]),
]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
@@ -368,14 +376,15 @@ class TestSimpleClient(unittest.TestCase):
mock_conn(conn)
brokers = [
- BrokerMetadata(0, 'broker_1', 4567),
- BrokerMetadata(1, 'broker_2', 5678)
+ BrokerMetadata(0, 'broker_1', 4567, None),
+ BrokerMetadata(1, 'broker_2', 5678, None)
]
+ resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
topics = [
(UNKNOWN_TOPIC_OR_PARTITION, 'topic_doesnt_exist', []),
]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](brokers, topics)
+ protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
client = SimpleClient(hosts=['broker_1:4567'])
diff --git a/test/test_client_async.py b/test/test_client_async.py
index dfe11ea..aa91704 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -53,8 +53,8 @@ def test_bootstrap_success(conn):
conn.connect.assert_called_with()
conn.send.assert_called_once_with(MetadataRequest[0]([]))
assert cli._bootstrap_fails == 0
- assert cli.cluster.brokers() == set([BrokerMetadata(0, 'foo', 12),
- BrokerMetadata(1, 'bar', 34)])
+ assert cli.cluster.brokers() == set([BrokerMetadata(0, 'foo', 12, None),
+ BrokerMetadata(1, 'bar', 34, None)])
def test_bootstrap_failure(conn):
conn.state = ConnectionStates.DISCONNECTED