From c01963682e1ec16dbde9b55cda4ec95ad2346476 Mon Sep 17 00:00:00 2001 From: tpalino Date: Wed, 1 Jul 2015 22:29:56 -0700 Subject: Support consumer metadata requests Support added for ConsumerMetadataRequest and ConsumerMetadataResponse Added consumer-aware request routine for supporting the consumer coordinator Added separate client method for fetching Kafka-committed offsets from the coordinator --- test/test_protocol.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) (limited to 'test') diff --git a/test/test_protocol.py b/test/test_protocol.py index 0938228..1f967f2 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -560,6 +560,34 @@ class TestProtocol(unittest.TestCase): decoded = KafkaProtocol.decode_metadata_response(encoded) self.assertEqual(decoded, (node_brokers, topic_partitions)) + def test_encode_consumer_metadata_request(self): + expected = b"".join([ + struct.pack(">i", 17), # Total length of the request + struct.pack('>h', 10), # API key consumer metadata + struct.pack('>h', 0), # API version + struct.pack('>i', 4), # Correlation ID + struct.pack('>h3s', 3, b"cid"),# The client ID + struct.pack('>h2s', 2, b"g1"), # Group "g1" + ]) + + encoded = KafkaProtocol.encode_consumer_metadata_request(b"cid", 4, b"g1") + + self.assertEqual(encoded, expected) + + def test_decode_consumer_metadata_response(self): + encoded = b"".join([ + struct.pack(">i", 42), # Correlation ID + struct.pack(">h", 0), # No Error + struct.pack(">i", 1), # Broker ID + struct.pack(">h23s", 23, b"brokers1.kafka.rdio.com"), # Broker Host + struct.pack(">i", 1000), # Broker Port + ]) + + results = KafkaProtocol.decode_consumer_metadata_response(encoded) + self.assertEqual(results, + ConsumerMetadataResponse(error = 0, node = 1, host = b'brokers1.kafka.rdio.com', port = 1000) + ) + def test_encode_offset_request(self): expected = b"".join([ struct.pack(">i", 21), # Total length of the request -- cgit v1.2.1 From 5d8d5412e576c5514497be3809ea899378f40e56 Mon Sep 17 00:00:00 2001 From: tpalino Date: Wed, 1 Jul 2015 22:47:02 -0700 Subject: Add missing import for ConsumerMetadataResponse --- test/test_protocol.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'test') diff --git a/test/test_protocol.py b/test/test_protocol.py index 1f967f2..f5bc289 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -13,7 +13,7 @@ from kafka.common import ( ProduceResponse, FetchResponse, OffsetAndMessage, BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall, - ProtocolError + ProtocolError, ConsumerMetadataResponse ) from kafka.protocol import ( ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol, -- cgit v1.2.1 From da03827d12520bd9c8c5b35bb43e35168f09771a Mon Sep 17 00:00:00 2001 From: tpalino Date: Thu, 2 Jul 2015 09:32:37 -0700 Subject: Fix misnamed node to nodeId in test --- test/test_protocol.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'test') diff --git a/test/test_protocol.py b/test/test_protocol.py index f5bc289..ac7bea6 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -585,7 +585,7 @@ class TestProtocol(unittest.TestCase): results = KafkaProtocol.decode_consumer_metadata_response(encoded) self.assertEqual(results, - ConsumerMetadataResponse(error = 0, node = 1, host = b'brokers1.kafka.rdio.com', port = 1000) + ConsumerMetadataResponse(error = 0, nodeId = 1, host = b'brokers1.kafka.rdio.com', port = 1000) ) def test_encode_offset_request(self): -- cgit v1.2.1