summaryrefslogtreecommitdiff
path: root/test/test_protocol.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2015-12-02 15:50:05 -0800
committerDana Powers <dana.powers@gmail.com>2015-12-02 15:50:05 -0800
commit3e28b42e6691b864b6f940034a4ccdce0b69d406 (patch)
treeb2badd68fc5ac37ab50016f25a0efdb2abb76c55 /test/test_protocol.py
parentee6b9cb5b1310c48a3ed5b66be0dd0c4dd16dc43 (diff)
parentda03827d12520bd9c8c5b35bb43e35168f09771a (diff)
downloadkafka-python-3e28b42e6691b864b6f940034a4ccdce0b69d406.tar.gz
Merge pull request #420 from toddpalino/master
Initial support for consumer coordinator
Diffstat (limited to 'test/test_protocol.py')
-rw-r--r--test/test_protocol.py30
1 files changed, 29 insertions, 1 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 0938228..ac7bea6 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -13,7 +13,7 @@ from kafka.common import (
ProduceResponse, FetchResponse, OffsetAndMessage,
BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition,
KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall,
- ProtocolError
+ ProtocolError, ConsumerMetadataResponse
)
from kafka.protocol import (
ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
@@ -560,6 +560,34 @@ class TestProtocol(unittest.TestCase):
decoded = KafkaProtocol.decode_metadata_response(encoded)
self.assertEqual(decoded, (node_brokers, topic_partitions))
+ def test_encode_consumer_metadata_request(self):
+ expected = b"".join([
+ struct.pack(">i", 17), # Total length of the request
+ struct.pack('>h', 10), # API key consumer metadata
+ struct.pack('>h', 0), # API version
+ struct.pack('>i', 4), # Correlation ID
+ struct.pack('>h3s', 3, b"cid"),# The client ID
+ struct.pack('>h2s', 2, b"g1"), # Group "g1"
+ ])
+
+ encoded = KafkaProtocol.encode_consumer_metadata_request(b"cid", 4, b"g1")
+
+ self.assertEqual(encoded, expected)
+
+ def test_decode_consumer_metadata_response(self):
+ encoded = b"".join([
+ struct.pack(">i", 42), # Correlation ID
+ struct.pack(">h", 0), # No Error
+ struct.pack(">i", 1), # Broker ID
+ struct.pack(">h23s", 23, b"brokers1.kafka.rdio.com"), # Broker Host
+ struct.pack(">i", 1000), # Broker Port
+ ])
+
+ results = KafkaProtocol.decode_consumer_metadata_response(encoded)
+ self.assertEqual(results,
+ ConsumerMetadataResponse(error = 0, nodeId = 1, host = b'brokers1.kafka.rdio.com', port = 1000)
+ )
+
def test_encode_offset_request(self):
expected = b"".join([
struct.pack(">i", 21), # Total length of the request