diff options
author | Mark Roberts <wizzat@gmail.com> | 2014-04-09 02:30:43 -0700 |
---|---|---|
committer | Mark Roberts <wizzat@gmail.com> | 2014-04-09 02:30:43 -0700 |
commit | 1cb27f99d695ce52058e41699b6dc2c99b40913a (patch) | |
tree | c79c94023f6d072029c9e4ebcbe7c56235a12b27 /test/test_protocol.py | |
parent | 5c58151e6f3722be2b9a2af4aedf9caa70be7189 (diff) | |
download | kafka-python-1cb27f99d695ce52058e41699b6dc2c99b40913a.tar.gz |
Add tests for encode_offset_request
Diffstat (limited to 'test/test_protocol.py')
-rw-r--r-- | test/test_protocol.py | 46 |
1 files changed, 43 insertions, 3 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py index 507cc8b..d255172 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -3,7 +3,7 @@ import unittest from kafka import KafkaClient from kafka.common import ( - ProduceRequest, FetchRequest, Message, ChecksumError, + OffsetRequest, ProduceRequest, FetchRequest, Message, ChecksumError, ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage, BrokerMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError, ProtocolError, @@ -522,9 +522,49 @@ class TestProtocol(unittest.TestCase): decoded = KafkaProtocol.decode_metadata_response(encoded) self.assertEqual(decoded, (node_brokers, topic_partitions)) - @unittest.skip("Not Implemented") def test_encode_offset_request(self): - pass + expected = "".join([ + struct.pack(">i", 21), # Total length of the request + struct.pack('>h', 2), # Message type = offset fetch + struct.pack('>h', 0), # API version + struct.pack('>i', 4), # Correlation ID + struct.pack('>h3s', 3, "cid"), # The client ID + struct.pack('>i', -1), # Replica Id + struct.pack('>i', 0), # No topic/partitions + ]) + + encoded = KafkaProtocol.encode_offset_request("cid", 4) + + self.assertEqual(encoded, expected) + + def test_encode_offset_request__no_payload(self): + expected = "".join([ + struct.pack(">i", 65), # Total length of the request + + struct.pack('>h', 2), # Message type = offset fetch + struct.pack('>h', 0), # API version + struct.pack('>i', 4), # Correlation ID + struct.pack('>h3s', 3, "cid"), # The client ID + struct.pack('>i', -1), # Replica Id + struct.pack('>i', 1), # Num topics + struct.pack(">h6s", 6, "topic1"), # Topic for the request + struct.pack(">i", 2), # Two partitions + + struct.pack(">i", 3), # Partition 3 + struct.pack(">q", -1), # No time offset + struct.pack(">i", 1), # One offset requested + + struct.pack(">i", 4), # Partition 3 + struct.pack(">q", -1), # No time offset + struct.pack(">i", 1), # One offset requested + ]) + + encoded = KafkaProtocol.encode_offset_request("cid", 4, [ + OffsetRequest('topic1', 3, -1, 1), + OffsetRequest('topic1', 4, -1, 1), + ]) + + self.assertEqual(encoded, expected) @unittest.skip("Not Implemented") def test_decode_offset_response(self): |