diff options
Diffstat (limited to 'test/test_protocol.py')
-rw-r--r-- | test/test_protocol.py | 74 |
1 files changed, 33 insertions, 41 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py index 2089f48..a92d20e 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -11,10 +11,10 @@ from kafka.common import ( OffsetRequest, OffsetCommitRequest, OffsetFetchRequest, OffsetResponse, OffsetCommitResponse, OffsetFetchResponse, ProduceRequest, FetchRequest, Message, ChecksumError, - ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage, - BrokerMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError, - ProtocolError, LeaderUnavailableError, PartitionUnavailableError, - UnsupportedCodecError + ProduceResponse, FetchResponse, OffsetAndMessage, + BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition, + KafkaUnavailableError, PartitionUnavailableError, + UnsupportedCodecError, ConsumerFetchSizeTooSmall, ProtocolError, ) from kafka.codec import ( has_snappy, gzip_encode, gzip_decode, @@ -454,21 +454,22 @@ class TestProtocol(unittest2.TestCase): self.assertEqual(encoded, expected) - def _create_encoded_metadata_response(self, broker_data, topic_data, - topic_errors, partition_errors): - encoded = struct.pack('>ii', 3, len(broker_data)) - for node_id, broker in broker_data.iteritems(): - encoded += struct.pack('>ih%dsi' % len(broker.host), node_id, + def _create_encoded_metadata_response(self, brokers, topics): + encoded = struct.pack('>ii', 3, len(brokers)) + for broker in brokers: + encoded += struct.pack('>ih%dsi' % len(broker.host), broker.nodeId, len(broker.host), broker.host, broker.port) - encoded += struct.pack('>i', len(topic_data)) - for topic, partitions in topic_data.iteritems(): - encoded += struct.pack('>hh%dsi' % len(topic), topic_errors[topic], - len(topic), topic, len(partitions)) - for partition, metadata in partitions.iteritems(): + encoded += struct.pack('>i', len(topics)) + for topic in topics: + encoded += struct.pack('>hh%dsi' % len(topic.topic), + topic.error, len(topic.topic), + topic.topic, len(topic.partitions)) + for metadata in topic.partitions: encoded += struct.pack('>hiii', - partition_errors[(topic, partition)], - partition, metadata.leader, + metadata.error, + metadata.partition, + metadata.leader, len(metadata.replicas)) if len(metadata.replicas) > 0: encoded += struct.pack('>%di' % len(metadata.replicas), @@ -478,35 +479,26 @@ class TestProtocol(unittest2.TestCase): if len(metadata.isr) > 0: encoded += struct.pack('>%di' % len(metadata.isr), *metadata.isr) - return encoded def test_decode_metadata_response(self): - node_brokers = { - 0: BrokerMetadata(0, "brokers1.kafka.rdio.com", 1000), - 1: BrokerMetadata(1, "brokers1.kafka.rdio.com", 1001), - 3: BrokerMetadata(3, "brokers2.kafka.rdio.com", 1000) - } - - topic_partitions = { - "topic1": { - 0: PartitionMetadata("topic1", 0, 1, (0, 2), (2,)), - 1: PartitionMetadata("topic1", 1, 3, (0, 1), (0, 1)) - }, - "topic2": { - 0: PartitionMetadata("topic2", 0, 0, (), ()) - } - } - topic_errors = {"topic1": 0, "topic2": 1} - partition_errors = { - ("topic1", 0): 0, - ("topic1", 1): 1, - ("topic2", 0): 0 - } + node_brokers = [ + BrokerMetadata(0, "brokers1.kafka.rdio.com", 1000), + BrokerMetadata(1, "brokers1.kafka.rdio.com", 1001), + BrokerMetadata(3, "brokers2.kafka.rdio.com", 1000) + ] + + topic_partitions = [ + TopicMetadata("topic1", 0, [ + PartitionMetadata("topic1", 0, 1, (0, 2), (2,), 0), + PartitionMetadata("topic1", 1, 3, (0, 1), (0, 1), 1) + ]), + TopicMetadata("topic2", 1, [ + PartitionMetadata("topic2", 0, 0, (), (), 0), + ]), + ] encoded = self._create_encoded_metadata_response(node_brokers, - topic_partitions, - topic_errors, - partition_errors) + topic_partitions) decoded = KafkaProtocol.decode_metadata_response(encoded) self.assertEqual(decoded, (node_brokers, topic_partitions)) |