| author    | Dana Powers <dana.powers@rd.io>                                  | 2014-09-08 13:15:56 -0700 |
|-----------|------------------------------------------------------------------|---------------------------|
| committer | Dana Powers <dana.powers@rd.io>                                  | 2014-09-08 13:17:43 -0700 |
| commit    | fff812ddc80243208233f785b3f005904cf33482 (patch)                 |                           |
| tree      | 30f1eb703dcd8ce1063c413fd70ac11f3fff5072 /test/test_protocol.py  |                           |
| parent    | 42a7ab18bb84fea60deed5f7e3a6cfdfaaaeecd6 (diff)                  |                           |
| parent    | 0dabb1fbe8a9f538527a03c2903475ed77a12c10 (diff)                  |                           |
| download  | kafka-python-fff812ddc80243208233f785b3f005904cf33482.tar.gz    |                           |
Merge pull request #223 from dpkp/metadata_refactor
Metadata Refactor
* add MetadataRequest and MetadataResponse namedtuples (field layout
  sketched after this list)
* add TopicMetadata namedtuple
* add error codes to Topic and Partition Metadata
* add KafkaClient.send_metadata_request() method
* KafkaProtocol.decode_metadata_response changed to return a
  MetadataResponse object so that it is consistent with server api:
  [broker_list, topic_list]
* raise server exceptions in load_metadata_for_topics(*topics)
  unless topics is null (full refresh)
* Replace non-standard exceptions (LeaderUnavailable,
  PartitionUnavailable) with server standard exceptions
  (LeaderNotAvailableError, UnknownTopicOrPartitionError); the
  caller-side effect is sketched after the Conflicts list below
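The new structures are plain namedtuples. The field layout below is inferred from how the updated test builds and unpacks these values in the diff further down; the authoritative definitions live in kafka/common.py, so treat the exact field names (especially for MetadataResponse) as assumptions.

```python
from collections import namedtuple

# Sketch of the metadata namedtuples, inferred from the test changes in this
# commit (broker.nodeId/host/port, topic.topic/error/partitions, and the
# six-field PartitionMetadata constructor). Not the authoritative source.
BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"])
TopicMetadata = namedtuple("TopicMetadata", ["topic", "error", "partitions"])
PartitionMetadata = namedtuple("PartitionMetadata",
                               ["topic", "partition", "leader",
                                "replicas", "isr", "error"])

# Per the commit message, decode_metadata_response now returns a response
# shaped like the server's own metadata response: brokers then topics.
# The field names here are an assumption.
MetadataResponse = namedtuple("MetadataResponse", ["brokers", "topics"])
```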
Conflicts:
	kafka/client.py
	test/test_client.py
	test/test_producer_integration.py
	test/test_protocol.py
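For callers, the exception change means a topic-specific metadata refresh can now raise the standard server errors instead of the old client-only ones. A minimal sketch of the caller-side effect, assuming the exception classes named in the bullets above are importable from kafka.common and using a hypothetical broker address:

```python
from kafka.client import KafkaClient
from kafka.common import (
    LeaderNotAvailableError, UnknownTopicOrPartitionError
)

client = KafkaClient("localhost:9092")  # hypothetical broker address

try:
    # Asking for specific topics surfaces server errors for those topics.
    client.load_metadata_for_topics(b"topic1")
except LeaderNotAvailableError:
    # Topic exists (or is being auto-created) but has no leader yet.
    pass
except UnknownTopicOrPartitionError:
    # Topic is unknown to the cluster.
    pass

# With no arguments the call is a full refresh and, per the commit message,
# does not raise per-topic errors.
client.load_metadata_for_topics()
```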
Diffstat (limited to 'test/test_protocol.py')
-rw-r--r--  test/test_protocol.py  73
1 file changed, 33 insertions(+), 40 deletions(-)
```diff
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 11d4687..a028be9 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -10,9 +10,10 @@ from kafka.common import (
     OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
     OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
     ProduceRequest, FetchRequest, Message, ChecksumError,
-    ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage,
-    BrokerMetadata, PartitionMetadata, ProtocolError,
-    UnsupportedCodecError
+    ProduceResponse, FetchResponse, OffsetAndMessage,
+    BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition,
+    KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall,
+    ProtocolError
 )
 from kafka.protocol import (
     ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
@@ -451,21 +452,22 @@ class TestProtocol(unittest.TestCase):
 
         self.assertEqual(encoded, expected)
 
-    def _create_encoded_metadata_response(self, broker_data, topic_data,
-                                          topic_errors, partition_errors):
-        encoded = struct.pack('>ii', 3, len(broker_data))
-        for node_id, broker in six.iteritems(broker_data):
-            encoded += struct.pack('>ih%dsi' % len(broker.host), node_id,
+    def _create_encoded_metadata_response(self, brokers, topics):
+        encoded = struct.pack('>ii', 3, len(brokers))
+        for broker in brokers:
+            encoded += struct.pack('>ih%dsi' % len(broker.host), broker.nodeId,
                                    len(broker.host), broker.host, broker.port)
 
-        encoded += struct.pack('>i', len(topic_data))
-        for topic, partitions in six.iteritems(topic_data):
-            encoded += struct.pack('>hh%dsi' % len(topic), topic_errors[topic],
-                                   len(topic), topic, len(partitions))
-            for partition, metadata in six.iteritems(partitions):
+        encoded += struct.pack('>i', len(topics))
+        for topic in topics:
+            encoded += struct.pack('>hh%dsi' % len(topic.topic),
+                                   topic.error, len(topic.topic),
+                                   topic.topic, len(topic.partitions))
+            for metadata in topic.partitions:
                 encoded += struct.pack('>hiii',
-                                       partition_errors[(topic, partition)],
-                                       partition, metadata.leader,
+                                       metadata.error,
+                                       metadata.partition,
+                                       metadata.leader,
                                        len(metadata.replicas))
                 if len(metadata.replicas) > 0:
                     encoded += struct.pack('>%di' % len(metadata.replicas),
@@ -475,35 +477,26 @@ class TestProtocol(unittest.TestCase):
                 if len(metadata.isr) > 0:
                     encoded += struct.pack('>%di' % len(metadata.isr),
                                            *metadata.isr)
-
         return encoded
 
     def test_decode_metadata_response(self):
-        node_brokers = {
-            0: BrokerMetadata(0, b"brokers1.kafka.rdio.com", 1000),
-            1: BrokerMetadata(1, b"brokers1.kafka.rdio.com", 1001),
-            3: BrokerMetadata(3, b"brokers2.kafka.rdio.com", 1000)
-        }
-
-        topic_partitions = {
-            b"topic1": {
-                0: PartitionMetadata(b"topic1", 0, 1, (0, 2), (2,)),
-                1: PartitionMetadata(b"topic1", 1, 3, (0, 1), (0, 1))
-            },
-            b"topic2": {
-                0: PartitionMetadata(b"topic2", 0, 0, (), ())
-            }
-        }
-        topic_errors = {b"topic1": 0, b"topic2": 1}
-        partition_errors = {
-            (b"topic1", 0): 0,
-            (b"topic1", 1): 1,
-            (b"topic2", 0): 0
-        }
+        node_brokers = [
+            BrokerMetadata(0, b"brokers1.kafka.rdio.com", 1000),
+            BrokerMetadata(1, b"brokers1.kafka.rdio.com", 1001),
+            BrokerMetadata(3, b"brokers2.kafka.rdio.com", 1000)
+        ]
+
+        topic_partitions = [
+            TopicMetadata(b"topic1", 0, [
+                PartitionMetadata(b"topic1", 0, 1, (0, 2), (2,), 0),
+                PartitionMetadata(b"topic1", 1, 3, (0, 1), (0, 1), 1)
+            ]),
+            TopicMetadata(b"topic2", 1, [
+                PartitionMetadata(b"topic2", 0, 0, (), (), 0),
+            ]),
+        ]
 
         encoded = self._create_encoded_metadata_response(node_brokers,
-                                                         topic_partitions,
-                                                         topic_errors,
-                                                         partition_errors)
+                                                         topic_partitions)
         decoded = KafkaProtocol.decode_metadata_response(encoded)
         self.assertEqual(decoded, (node_brokers, topic_partitions))
```
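The test asserts that the decoded value round-trips to the same broker and topic lists. As a usage note, code consuming a decoded metadata response can now read the per-topic and per-partition error codes straight off the namedtuples. A sketch under the field-name assumptions from the earlier namedtuple block; report_metadata_errors is a hypothetical helper, not part of the library:

```python
from kafka.protocol import KafkaProtocol


def report_metadata_errors(encoded):
    """Decode a raw metadata response and print any non-zero error codes.

    Sketch only: assumes the namedtuple field names shown earlier and that
    the decoded response unpacks into (brokers, topics).
    """
    brokers, topics = KafkaProtocol.decode_metadata_response(encoded)
    for topic in topics:
        if topic.error != 0:
            print("topic %r -> error code %d" % (topic.topic, topic.error))
        for partition in topic.partitions:
            if partition.error != 0:
                print("partition %d of %r -> error code %d"
                      % (partition.partition, partition.topic, partition.error))
    return brokers, topics
```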