diff options
-rw-r--r-- | test/test_protocol.py | 69 |
1 files changed, 63 insertions, 6 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py index 7459ca0..555fe10 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -4,6 +4,7 @@ import unittest from kafka import KafkaClient from kafka.common import ( OffsetRequest, OffsetCommitRequest, OffsetFetchRequest, + OffsetResponse, OffsetCommitResponse, OffsetFetchResponse, ProduceRequest, FetchRequest, Message, ChecksumError, ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage, BrokerMetadata, PartitionMetadata, @@ -549,9 +550,29 @@ class TestProtocol(unittest.TestCase): self.assertEqual(encoded, expected) - @unittest.skip("Not Implemented") def test_decode_offset_response(self): - pass + encoded = "".join([ + struct.pack(">i", 42), # Correlation ID + struct.pack(">i", 1), # One topics + struct.pack(">h6s", 6, "topic1"), # First topic + struct.pack(">i", 2), # Two partitions + + struct.pack(">i", 2), # Partition 2 + struct.pack(">h", 0), # No error + struct.pack(">i", 1), # One offset + struct.pack(">q", 4), # Offset 4 + + struct.pack(">i", 4), # Partition 4 + struct.pack(">h", 0), # No error + struct.pack(">i", 1), # One offset + struct.pack(">q", 8), # Offset 8 + ]) + + results = KafkaProtocol.decode_offset_response(encoded) + self.assertEqual(set(results), { + OffsetResponse(topic = 'topic1', partition = 2, error = 0, offsets=(4,)), + OffsetResponse(topic = 'topic1', partition = 4, error = 0, offsets=(8,)), + }) def test_encode_offset_commit_request(self): header = "".join([ @@ -595,9 +616,25 @@ class TestProtocol(unittest.TestCase): self.assertIn(encoded, [ expected1, expected2 ]) - @unittest.skip("Not Implemented") def test_decode_offset_commit_response(self): - pass + encoded = "".join([ + struct.pack(">i", 42), # Correlation ID + struct.pack(">i", 1), # One topic + struct.pack(">h6s", 6, "topic1"), # First topic + struct.pack(">i", 2), # Two partitions + + struct.pack(">i", 2), # Partition 2 + struct.pack(">h", 0), # No error + + struct.pack(">i", 4), # Partition 4 + struct.pack(">h", 0), # No error + ]) + + results = KafkaProtocol.decode_offset_commit_response(encoded) + self.assertEqual(set(results), { + OffsetCommitResponse(topic = 'topic1', partition = 2, error = 0), + OffsetCommitResponse(topic = 'topic1', partition = 4, error = 0), + }) def test_encode_offset_fetch_request(self): header = "".join([ @@ -634,6 +671,26 @@ class TestProtocol(unittest.TestCase): self.assertIn(encoded, [ expected1, expected2 ]) - @unittest.skip("Not Implemented") def test_decode_offset_fetch_response(self): - pass + encoded = "".join([ + struct.pack(">i", 42), # Correlation ID + struct.pack(">i", 1), # One topics + struct.pack(">h6s", 6, "topic1"), # First topic + struct.pack(">i", 2), # Two partitions + + struct.pack(">i", 2), # Partition 2 + struct.pack(">q", 4), # Offset 4 + struct.pack(">h4s", 4, "meta"), # Metadata + struct.pack(">h", 0), # No error + + struct.pack(">i", 4), # Partition 4 + struct.pack(">q", 8), # Offset 8 + struct.pack(">h4s", 4, "meta"), # Metadata + struct.pack(">h", 0), # No error + ]) + + results = KafkaProtocol.decode_offset_fetch_response(encoded) + self.assertEqual(set(results), { + OffsetFetchResponse(topic = 'topic1', partition = 2, offset = 4, error = 0, metadata = "meta"), + OffsetFetchResponse(topic = 'topic1', partition = 4, offset = 8, error = 0, metadata = "meta"), + }) |