Diffstat (limited to 'test/test_protocol.py')
-rw-r--r--  test/test_protocol.py  73
1 file changed, 33 insertions(+), 40 deletions(-)
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 11d4687..a028be9 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -10,9 +10,10 @@ from kafka.common import (
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
ProduceRequest, FetchRequest, Message, ChecksumError,
- ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage,
- BrokerMetadata, PartitionMetadata, ProtocolError,
- UnsupportedCodecError
+ ProduceResponse, FetchResponse, OffsetAndMessage,
+ BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition,
+ KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall,
+ ProtocolError
)
from kafka.protocol import (
ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
@@ -451,21 +452,22 @@ class TestProtocol(unittest.TestCase):
self.assertEqual(encoded, expected)
- def _create_encoded_metadata_response(self, broker_data, topic_data,
- topic_errors, partition_errors):
- encoded = struct.pack('>ii', 3, len(broker_data))
- for node_id, broker in six.iteritems(broker_data):
- encoded += struct.pack('>ih%dsi' % len(broker.host), node_id,
+ def _create_encoded_metadata_response(self, brokers, topics):
+ encoded = struct.pack('>ii', 3, len(brokers))
+ for broker in brokers:
+ encoded += struct.pack('>ih%dsi' % len(broker.host), broker.nodeId,
len(broker.host), broker.host, broker.port)
- encoded += struct.pack('>i', len(topic_data))
- for topic, partitions in six.iteritems(topic_data):
- encoded += struct.pack('>hh%dsi' % len(topic), topic_errors[topic],
- len(topic), topic, len(partitions))
- for partition, metadata in six.iteritems(partitions):
+ encoded += struct.pack('>i', len(topics))
+ for topic in topics:
+ encoded += struct.pack('>hh%dsi' % len(topic.topic),
+ topic.error, len(topic.topic),
+ topic.topic, len(topic.partitions))
+ for metadata in topic.partitions:
encoded += struct.pack('>hiii',
- partition_errors[(topic, partition)],
- partition, metadata.leader,
+ metadata.error,
+ metadata.partition,
+ metadata.leader,
len(metadata.replicas))
if len(metadata.replicas) > 0:
encoded += struct.pack('>%di' % len(metadata.replicas),
@@ -475,35 +477,26 @@ class TestProtocol(unittest.TestCase):
if len(metadata.isr) > 0:
encoded += struct.pack('>%di' % len(metadata.isr),
*metadata.isr)
-
return encoded
def test_decode_metadata_response(self):
- node_brokers = {
- 0: BrokerMetadata(0, b"brokers1.kafka.rdio.com", 1000),
- 1: BrokerMetadata(1, b"brokers1.kafka.rdio.com", 1001),
- 3: BrokerMetadata(3, b"brokers2.kafka.rdio.com", 1000)
- }
-
- topic_partitions = {
- b"topic1": {
- 0: PartitionMetadata(b"topic1", 0, 1, (0, 2), (2,)),
- 1: PartitionMetadata(b"topic1", 1, 3, (0, 1), (0, 1))
- },
- b"topic2": {
- 0: PartitionMetadata(b"topic2", 0, 0, (), ())
- }
- }
- topic_errors = {b"topic1": 0, b"topic2": 1}
- partition_errors = {
- (b"topic1", 0): 0,
- (b"topic1", 1): 1,
- (b"topic2", 0): 0
- }
+ node_brokers = [
+ BrokerMetadata(0, b"brokers1.kafka.rdio.com", 1000),
+ BrokerMetadata(1, b"brokers1.kafka.rdio.com", 1001),
+ BrokerMetadata(3, b"brokers2.kafka.rdio.com", 1000)
+ ]
+
+ topic_partitions = [
+ TopicMetadata(b"topic1", 0, [
+ PartitionMetadata(b"topic1", 0, 1, (0, 2), (2,), 0),
+ PartitionMetadata(b"topic1", 1, 3, (0, 1), (0, 1), 1)
+ ]),
+ TopicMetadata(b"topic2", 1, [
+ PartitionMetadata(b"topic2", 0, 0, (), (), 0),
+ ]),
+ ]
encoded = self._create_encoded_metadata_response(node_brokers,
- topic_partitions,
- topic_errors,
- partition_errors)
+ topic_partitions)
decoded = KafkaProtocol.decode_metadata_response(encoded)
self.assertEqual(decoded, (node_brokers, topic_partitions))
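
Below, for reference, is a minimal standalone sketch of the metadata-response wire layout that the rewritten _create_encoded_metadata_response helper builds (big-endian correlation id, broker array, then per-topic and per-partition metadata). The namedtuples and the encode_metadata_response function are simplified stand-ins for illustration, not kafka-python's own classes; the field order follows the struct.pack calls visible in the diff, and the ISR length prefix is assumed since those lines fall between the two hunks above.

    import struct
    from collections import namedtuple

    # Simplified stand-ins for kafka-python's BrokerMetadata / TopicMetadata /
    # PartitionMetadata namedtuples (hypothetical, for illustration only).
    Broker = namedtuple('Broker', ['nodeId', 'host', 'port'])
    Topic = namedtuple('Topic', ['topic', 'error', 'partitions'])
    Partition = namedtuple('Partition',
                           ['topic', 'partition', 'leader', 'replicas', 'isr', 'error'])

    def encode_metadata_response(correlation_id, brokers, topics):
        # int32 correlation id, int32 broker count
        buf = struct.pack('>ii', correlation_id, len(brokers))
        for b in brokers:
            # int32 node id, int16-length-prefixed host string, int32 port
            buf += struct.pack('>ih%dsi' % len(b.host),
                               b.nodeId, len(b.host), b.host, b.port)
        # int32 topic count
        buf += struct.pack('>i', len(topics))
        for t in topics:
            # int16 topic error code, int16-length-prefixed topic name,
            # int32 partition count
            buf += struct.pack('>hh%dsi' % len(t.topic),
                               t.error, len(t.topic), t.topic, len(t.partitions))
            for p in t.partitions:
                # int16 partition error, int32 partition id, int32 leader,
                # int32 replica count
                buf += struct.pack('>hiii',
                                   p.error, p.partition, p.leader, len(p.replicas))
                if p.replicas:
                    buf += struct.pack('>%di' % len(p.replicas), *p.replicas)
                # int32 ISR count followed by the ISR node ids (assumed layout;
                # the corresponding source lines are not shown in this diff)
                buf += struct.pack('>i', len(p.isr))
                if p.isr:
                    buf += struct.pack('>%di' % len(p.isr), *p.isr)
        return buf

    # Usage mirroring the fixtures in test_decode_metadata_response
    brokers = [Broker(0, b'brokers1.kafka.rdio.com', 1000)]
    topics = [Topic(b'topic1', 0, [Partition(b'topic1', 0, 1, (0, 2), (2,), 0)])]
    print(len(encode_metadata_response(3, brokers, topics)))

Bytes produced this way could then be fed to KafkaProtocol.decode_metadata_response, as the updated test does, to check that the decoded brokers and topics equal the namedtuple fixtures.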