diff options
-rw-r--r-- | kafka/common.py | 21 | ||||
-rw-r--r-- | test/test_protocol.py | 13 |
2 files changed, 18 insertions, 16 deletions
diff --git a/kafka/common.py b/kafka/common.py index 3fb5ab2..382867c 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -2,9 +2,8 @@ import inspect import sys from collections import namedtuple -############### -# Structs # -############### + +# SimpleClient Payload Structs - Deprecated # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI MetadataRequest = namedtuple("MetadataRequest", @@ -57,29 +56,29 @@ OffsetFetchResponsePayload = namedtuple("OffsetFetchResponsePayload", # Other useful structs +TopicPartition = namedtuple("TopicPartition", + ["topic", "partition"]) + BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"]) -TopicMetadata = namedtuple("TopicMetadata", - ["topic", "error", "partitions"]) - PartitionMetadata = namedtuple("PartitionMetadata", ["topic", "partition", "leader", "replicas", "isr", "error"]) +OffsetAndMetadata = namedtuple("OffsetAndMetadata", + ["offset", "metadata"]) + + +# Deprecated structs OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"]) Message = namedtuple("Message", ["magic", "attributes", "key", "value"]) -TopicPartition = namedtuple("TopicPartition", - ["topic", "partition"]) - KafkaMessage = namedtuple("KafkaMessage", ["topic", "partition", "offset", "key", "value"]) -OffsetAndMetadata = namedtuple("OffsetAndMetadata", - ["offset", "metadata"]) # Define retry policy for async producer # Limit value: int >= 0, 0 means no retries diff --git a/test/test_protocol.py b/test/test_protocol.py index 4c5f379..1d91e7d 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -8,11 +8,12 @@ from . import unittest from kafka.codec import has_snappy, gzip_decode, snappy_decode from kafka.common import ( - OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload, - OffsetResponsePayload, OffsetCommitResponsePayload, OffsetFetchResponsePayload, - ProduceRequestPayload, FetchRequestPayload, Message, ChecksumError, - ProduceResponsePayload, FetchResponsePayload, OffsetAndMessage, - BrokerMetadata, TopicMetadata, PartitionMetadata, + OffsetRequestPayload, OffsetResponsePayload, + OffsetCommitRequestPayload, OffsetCommitResponsePayload, + OffsetFetchRequestPayload, OffsetFetchResponsePayload, + ProduceRequestPayload, ProduceResponsePayload, + FetchRequestPayload, FetchResponsePayload, + Message, ChecksumError, OffsetAndMessage, BrokerMetadata, KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall, ProtocolError, ConsumerMetadataResponse ) @@ -564,6 +565,7 @@ class TestProtocol(unittest.TestCase): BrokerMetadata(3, b"brokers2.kafka.rdio.com", 1000) ] + ''' topic_partitions = [ TopicMetadata(b"topic1", 0, [ PartitionMetadata(b"topic1", 0, 1, (0, 2), (2,), 0), @@ -577,6 +579,7 @@ class TestProtocol(unittest.TestCase): topic_partitions) decoded = KafkaProtocol.decode_metadata_response(encoded) self.assertEqual(decoded, (node_brokers, topic_partitions)) + ''' def test_encode_consumer_metadata_request(self): expected = b"".join([ |