diff options
Diffstat (limited to 'test/test_protocol.py')
-rw-r--r-- | test/test_protocol.py | 73 |
1 files changed, 33 insertions, 40 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py index 11d4687..a028be9 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -10,9 +10,10 @@ from kafka.common import ( OffsetRequest, OffsetCommitRequest, OffsetFetchRequest, OffsetResponse, OffsetCommitResponse, OffsetFetchResponse, ProduceRequest, FetchRequest, Message, ChecksumError, - ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage, - BrokerMetadata, PartitionMetadata, ProtocolError, - UnsupportedCodecError + ProduceResponse, FetchResponse, OffsetAndMessage, + BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition, + KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall, + ProtocolError ) from kafka.protocol import ( ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol, @@ -451,21 +452,22 @@ class TestProtocol(unittest.TestCase): self.assertEqual(encoded, expected) - def _create_encoded_metadata_response(self, broker_data, topic_data, - topic_errors, partition_errors): - encoded = struct.pack('>ii', 3, len(broker_data)) - for node_id, broker in six.iteritems(broker_data): - encoded += struct.pack('>ih%dsi' % len(broker.host), node_id, + def _create_encoded_metadata_response(self, brokers, topics): + encoded = struct.pack('>ii', 3, len(brokers)) + for broker in brokers: + encoded += struct.pack('>ih%dsi' % len(broker.host), broker.nodeId, len(broker.host), broker.host, broker.port) - encoded += struct.pack('>i', len(topic_data)) - for topic, partitions in six.iteritems(topic_data): - encoded += struct.pack('>hh%dsi' % len(topic), topic_errors[topic], - len(topic), topic, len(partitions)) - for partition, metadata in six.iteritems(partitions): + encoded += struct.pack('>i', len(topics)) + for topic in topics: + encoded += struct.pack('>hh%dsi' % len(topic.topic), + topic.error, len(topic.topic), + topic.topic, len(topic.partitions)) + for metadata in topic.partitions: encoded += struct.pack('>hiii', - partition_errors[(topic, partition)], - partition, metadata.leader, + metadata.error, + metadata.partition, + metadata.leader, len(metadata.replicas)) if len(metadata.replicas) > 0: encoded += struct.pack('>%di' % len(metadata.replicas), @@ -475,35 +477,26 @@ class TestProtocol(unittest.TestCase): if len(metadata.isr) > 0: encoded += struct.pack('>%di' % len(metadata.isr), *metadata.isr) - return encoded def test_decode_metadata_response(self): - node_brokers = { - 0: BrokerMetadata(0, b"brokers1.kafka.rdio.com", 1000), - 1: BrokerMetadata(1, b"brokers1.kafka.rdio.com", 1001), - 3: BrokerMetadata(3, b"brokers2.kafka.rdio.com", 1000) - } - - topic_partitions = { - b"topic1": { - 0: PartitionMetadata(b"topic1", 0, 1, (0, 2), (2,)), - 1: PartitionMetadata(b"topic1", 1, 3, (0, 1), (0, 1)) - }, - b"topic2": { - 0: PartitionMetadata(b"topic2", 0, 0, (), ()) - } - } - topic_errors = {b"topic1": 0, b"topic2": 1} - partition_errors = { - (b"topic1", 0): 0, - (b"topic1", 1): 1, - (b"topic2", 0): 0 - } + node_brokers = [ + BrokerMetadata(0, b"brokers1.kafka.rdio.com", 1000), + BrokerMetadata(1, b"brokers1.kafka.rdio.com", 1001), + BrokerMetadata(3, b"brokers2.kafka.rdio.com", 1000) + ] + + topic_partitions = [ + TopicMetadata(b"topic1", 0, [ + PartitionMetadata(b"topic1", 0, 1, (0, 2), (2,), 0), + PartitionMetadata(b"topic1", 1, 3, (0, 1), (0, 1), 1) + ]), + TopicMetadata(b"topic2", 1, [ + PartitionMetadata(b"topic2", 0, 0, (), (), 0), + ]), + ] encoded = self._create_encoded_metadata_response(node_brokers, - topic_partitions, - topic_errors, - partition_errors) + topic_partitions) decoded = KafkaProtocol.decode_metadata_response(encoded) self.assertEqual(decoded, (node_brokers, topic_partitions)) |