summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/common.py21
-rw-r--r--test/test_protocol.py13
2 files changed, 18 insertions, 16 deletions
diff --git a/kafka/common.py b/kafka/common.py
index 3fb5ab2..382867c 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -2,9 +2,8 @@ import inspect
import sys
from collections import namedtuple
-###############
-# Structs #
-###############
+
+# SimpleClient Payload Structs - Deprecated
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
MetadataRequest = namedtuple("MetadataRequest",
@@ -57,29 +56,29 @@ OffsetFetchResponsePayload = namedtuple("OffsetFetchResponsePayload",
# Other useful structs
+TopicPartition = namedtuple("TopicPartition",
+ ["topic", "partition"])
+
BrokerMetadata = namedtuple("BrokerMetadata",
["nodeId", "host", "port"])
-TopicMetadata = namedtuple("TopicMetadata",
- ["topic", "error", "partitions"])
-
PartitionMetadata = namedtuple("PartitionMetadata",
["topic", "partition", "leader", "replicas", "isr", "error"])
+OffsetAndMetadata = namedtuple("OffsetAndMetadata",
+ ["offset", "metadata"])
+
+
+# Deprecated structs
OffsetAndMessage = namedtuple("OffsetAndMessage",
["offset", "message"])
Message = namedtuple("Message",
["magic", "attributes", "key", "value"])
-TopicPartition = namedtuple("TopicPartition",
- ["topic", "partition"])
-
KafkaMessage = namedtuple("KafkaMessage",
["topic", "partition", "offset", "key", "value"])
-OffsetAndMetadata = namedtuple("OffsetAndMetadata",
- ["offset", "metadata"])
# Define retry policy for async producer
# Limit value: int >= 0, 0 means no retries
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 4c5f379..1d91e7d 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -8,11 +8,12 @@ from . import unittest
from kafka.codec import has_snappy, gzip_decode, snappy_decode
from kafka.common import (
- OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload,
- OffsetResponsePayload, OffsetCommitResponsePayload, OffsetFetchResponsePayload,
- ProduceRequestPayload, FetchRequestPayload, Message, ChecksumError,
- ProduceResponsePayload, FetchResponsePayload, OffsetAndMessage,
- BrokerMetadata, TopicMetadata, PartitionMetadata,
+ OffsetRequestPayload, OffsetResponsePayload,
+ OffsetCommitRequestPayload, OffsetCommitResponsePayload,
+ OffsetFetchRequestPayload, OffsetFetchResponsePayload,
+ ProduceRequestPayload, ProduceResponsePayload,
+ FetchRequestPayload, FetchResponsePayload,
+ Message, ChecksumError, OffsetAndMessage, BrokerMetadata,
KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall,
ProtocolError, ConsumerMetadataResponse
)
@@ -564,6 +565,7 @@ class TestProtocol(unittest.TestCase):
BrokerMetadata(3, b"brokers2.kafka.rdio.com", 1000)
]
+ '''
topic_partitions = [
TopicMetadata(b"topic1", 0, [
PartitionMetadata(b"topic1", 0, 1, (0, 2), (2,), 0),
@@ -577,6 +579,7 @@ class TestProtocol(unittest.TestCase):
topic_partitions)
decoded = KafkaProtocol.decode_metadata_response(encoded)
self.assertEqual(decoded, (node_brokers, topic_partitions))
+ '''
def test_encode_consumer_metadata_request(self):
expected = b"".join([