diff options
Diffstat (limited to 'test/test_protocol.py')
| -rw-r--r-- | test/test_protocol.py | 46 | 
1 files changed, 43 insertions, 3 deletions
| diff --git a/test/test_protocol.py b/test/test_protocol.py index 507cc8b..d255172 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -3,7 +3,7 @@ import unittest  from kafka import KafkaClient  from kafka.common import ( -    ProduceRequest, FetchRequest, Message, ChecksumError, +    OffsetRequest, ProduceRequest, FetchRequest, Message, ChecksumError,      ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,      OffsetAndMessage, BrokerMetadata, PartitionMetadata,      TopicAndPartition, KafkaUnavailableError, ProtocolError, @@ -522,9 +522,49 @@ class TestProtocol(unittest.TestCase):          decoded = KafkaProtocol.decode_metadata_response(encoded)          self.assertEqual(decoded, (node_brokers, topic_partitions)) -    @unittest.skip("Not Implemented")      def test_encode_offset_request(self): -        pass +        expected = "".join([ +            struct.pack(">i", 21),         # Total length of the request +            struct.pack('>h', 2),          # Message type = offset fetch +            struct.pack('>h', 0),          # API version +            struct.pack('>i', 4),          # Correlation ID +            struct.pack('>h3s', 3, "cid"), # The client ID +            struct.pack('>i', -1),         # Replica Id +            struct.pack('>i', 0),          # No topic/partitions +        ]) + +        encoded = KafkaProtocol.encode_offset_request("cid", 4) + +        self.assertEqual(encoded, expected) + +    def test_encode_offset_request__no_payload(self): +        expected = "".join([ +            struct.pack(">i", 65),            # Total length of the request + +            struct.pack('>h', 2),             # Message type = offset fetch +            struct.pack('>h', 0),             # API version +            struct.pack('>i', 4),             # Correlation ID +            struct.pack('>h3s', 3, "cid"),    # The client ID +            struct.pack('>i', -1),            # Replica Id +            struct.pack('>i', 1),             # Num topics +            struct.pack(">h6s", 6, "topic1"), # Topic for the request +            struct.pack(">i", 2),             # Two partitions + +            struct.pack(">i", 3),             # Partition 3 +            struct.pack(">q", -1),            # No time offset +            struct.pack(">i", 1),             # One offset requested + +            struct.pack(">i", 4),             # Partition 3 +            struct.pack(">q", -1),            # No time offset +            struct.pack(">i", 1),             # One offset requested +        ]) + +        encoded = KafkaProtocol.encode_offset_request("cid", 4, [ +            OffsetRequest('topic1', 3, -1, 1), +            OffsetRequest('topic1', 4, -1, 1), +        ]) + +        self.assertEqual(encoded, expected)      @unittest.skip("Not Implemented")      def test_decode_offset_response(self): | 
