summaryrefslogtreecommitdiff
path: root/test/test_protocol.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-01 01:48:18 -0700
committerDana Powers <dana.powers@rd.io>2014-09-01 18:02:41 -0700
commit18ac14860791db2382c3e62715f11a6f657f265a (patch)
tree0616db85b4c8ca4bb3c9f7fb8d4c6a7ad9b63dcc /test/test_protocol.py
parenteddd1436c226545237aa057c35719950702466ed (diff)
downloadkafka-python-18ac14860791db2382c3e62715f11a6f657f265a.tar.gz
Improve metadata protocol handling
- add MetadataRequest and MetadataResponse namedtuples - add TopicMetadata namedtuple - add error codes to Topic and Partition Metadata - add KafkaClient.send_metadata_request() method - KafkaProtocol.decode_metadata_response changed to return a MetadataResponse object so that it is consistent with server api: [broker_list, topic_list]
Diffstat (limited to 'test/test_protocol.py')
-rw-r--r--test/test_protocol.py74
1 files changed, 33 insertions, 41 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 2089f48..a92d20e 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -11,10 +11,10 @@ from kafka.common import (
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
ProduceRequest, FetchRequest, Message, ChecksumError,
- ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage,
- BrokerMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError,
- ProtocolError, LeaderUnavailableError, PartitionUnavailableError,
- UnsupportedCodecError
+ ProduceResponse, FetchResponse, OffsetAndMessage,
+ BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition,
+ KafkaUnavailableError, PartitionUnavailableError,
+ UnsupportedCodecError, ConsumerFetchSizeTooSmall, ProtocolError,
)
from kafka.codec import (
has_snappy, gzip_encode, gzip_decode,
@@ -454,21 +454,22 @@ class TestProtocol(unittest2.TestCase):
self.assertEqual(encoded, expected)
- def _create_encoded_metadata_response(self, broker_data, topic_data,
- topic_errors, partition_errors):
- encoded = struct.pack('>ii', 3, len(broker_data))
- for node_id, broker in broker_data.iteritems():
- encoded += struct.pack('>ih%dsi' % len(broker.host), node_id,
+ def _create_encoded_metadata_response(self, brokers, topics):
+ encoded = struct.pack('>ii', 3, len(brokers))
+ for broker in brokers:
+ encoded += struct.pack('>ih%dsi' % len(broker.host), broker.nodeId,
len(broker.host), broker.host, broker.port)
- encoded += struct.pack('>i', len(topic_data))
- for topic, partitions in topic_data.iteritems():
- encoded += struct.pack('>hh%dsi' % len(topic), topic_errors[topic],
- len(topic), topic, len(partitions))
- for partition, metadata in partitions.iteritems():
+ encoded += struct.pack('>i', len(topics))
+ for topic in topics:
+ encoded += struct.pack('>hh%dsi' % len(topic.topic),
+ topic.error, len(topic.topic),
+ topic.topic, len(topic.partitions))
+ for metadata in topic.partitions:
encoded += struct.pack('>hiii',
- partition_errors[(topic, partition)],
- partition, metadata.leader,
+ metadata.error,
+ metadata.partition,
+ metadata.leader,
len(metadata.replicas))
if len(metadata.replicas) > 0:
encoded += struct.pack('>%di' % len(metadata.replicas),
@@ -478,35 +479,26 @@ class TestProtocol(unittest2.TestCase):
if len(metadata.isr) > 0:
encoded += struct.pack('>%di' % len(metadata.isr),
*metadata.isr)
-
return encoded
def test_decode_metadata_response(self):
- node_brokers = {
- 0: BrokerMetadata(0, "brokers1.kafka.rdio.com", 1000),
- 1: BrokerMetadata(1, "brokers1.kafka.rdio.com", 1001),
- 3: BrokerMetadata(3, "brokers2.kafka.rdio.com", 1000)
- }
-
- topic_partitions = {
- "topic1": {
- 0: PartitionMetadata("topic1", 0, 1, (0, 2), (2,)),
- 1: PartitionMetadata("topic1", 1, 3, (0, 1), (0, 1))
- },
- "topic2": {
- 0: PartitionMetadata("topic2", 0, 0, (), ())
- }
- }
- topic_errors = {"topic1": 0, "topic2": 1}
- partition_errors = {
- ("topic1", 0): 0,
- ("topic1", 1): 1,
- ("topic2", 0): 0
- }
+ node_brokers = [
+ BrokerMetadata(0, "brokers1.kafka.rdio.com", 1000),
+ BrokerMetadata(1, "brokers1.kafka.rdio.com", 1001),
+ BrokerMetadata(3, "brokers2.kafka.rdio.com", 1000)
+ ]
+
+ topic_partitions = [
+ TopicMetadata("topic1", 0, [
+ PartitionMetadata("topic1", 0, 1, (0, 2), (2,), 0),
+ PartitionMetadata("topic1", 1, 3, (0, 1), (0, 1), 1)
+ ]),
+ TopicMetadata("topic2", 1, [
+ PartitionMetadata("topic2", 0, 0, (), (), 0),
+ ]),
+ ]
encoded = self._create_encoded_metadata_response(node_brokers,
- topic_partitions,
- topic_errors,
- partition_errors)
+ topic_partitions)
decoded = KafkaProtocol.decode_metadata_response(encoded)
self.assertEqual(decoded, (node_brokers, topic_partitions))