summaryrefslogtreecommitdiff
path: root/test/test_protocol.py
diff options
context:
space:
mode:
authortpalino <tpalino@linkedin.com>2015-07-01 22:29:56 -0700
committertpalino <tpalino@linkedin.com>2015-07-01 22:29:56 -0700
commitc01963682e1ec16dbde9b55cda4ec95ad2346476 (patch)
treeb4de368ad49a6d0e23be9ff7f3af64d1f3c4f890 /test/test_protocol.py
parentadbd4ac052e4a5b40cfc2a3589b7adbcb656afe5 (diff)
downloadkafka-python-c01963682e1ec16dbde9b55cda4ec95ad2346476.tar.gz
Support consumer metadata requests
Support added for ConsumerMetadataRequest and ConsumerMetadataResponse Added consumer-aware request routine for supporting the consumer coordinator Added separate client method for fetching Kafka-committed offsets from the coordinator
Diffstat (limited to 'test/test_protocol.py')
-rw-r--r--test/test_protocol.py28
1 files changed, 28 insertions, 0 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 0938228..1f967f2 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -560,6 +560,34 @@ class TestProtocol(unittest.TestCase):
decoded = KafkaProtocol.decode_metadata_response(encoded)
self.assertEqual(decoded, (node_brokers, topic_partitions))
+ def test_encode_consumer_metadata_request(self):
+ expected = b"".join([
+ struct.pack(">i", 17), # Total length of the request
+ struct.pack('>h', 10), # API key consumer metadata
+ struct.pack('>h', 0), # API version
+ struct.pack('>i', 4), # Correlation ID
+ struct.pack('>h3s', 3, b"cid"),# The client ID
+ struct.pack('>h2s', 2, b"g1"), # Group "g1"
+ ])
+
+ encoded = KafkaProtocol.encode_consumer_metadata_request(b"cid", 4, b"g1")
+
+ self.assertEqual(encoded, expected)
+
+ def test_decode_consumer_metadata_response(self):
+ encoded = b"".join([
+ struct.pack(">i", 42), # Correlation ID
+ struct.pack(">h", 0), # No Error
+ struct.pack(">i", 1), # Broker ID
+ struct.pack(">h23s", 23, b"brokers1.kafka.rdio.com"), # Broker Host
+ struct.pack(">i", 1000), # Broker Port
+ ])
+
+ results = KafkaProtocol.decode_consumer_metadata_response(encoded)
+ self.assertEqual(results,
+ ConsumerMetadataResponse(error = 0, node = 1, host = b'brokers1.kafka.rdio.com', port = 1000)
+ )
+
def test_encode_offset_request(self):
expected = b"".join([
struct.pack(">i", 21), # Total length of the request