summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--test/test_protocol.py69
1 files changed, 63 insertions, 6 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 7459ca0..555fe10 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -4,6 +4,7 @@ import unittest
from kafka import KafkaClient
from kafka.common import (
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
+ OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
ProduceRequest, FetchRequest, Message, ChecksumError,
ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
OffsetAndMessage, BrokerMetadata, PartitionMetadata,
@@ -549,9 +550,29 @@ class TestProtocol(unittest.TestCase):
self.assertEqual(encoded, expected)
- @unittest.skip("Not Implemented")
def test_decode_offset_response(self):
- pass
+ encoded = "".join([
+ struct.pack(">i", 42), # Correlation ID
+ struct.pack(">i", 1), # One topics
+ struct.pack(">h6s", 6, "topic1"), # First topic
+ struct.pack(">i", 2), # Two partitions
+
+ struct.pack(">i", 2), # Partition 2
+ struct.pack(">h", 0), # No error
+ struct.pack(">i", 1), # One offset
+ struct.pack(">q", 4), # Offset 4
+
+ struct.pack(">i", 4), # Partition 4
+ struct.pack(">h", 0), # No error
+ struct.pack(">i", 1), # One offset
+ struct.pack(">q", 8), # Offset 8
+ ])
+
+ results = KafkaProtocol.decode_offset_response(encoded)
+ self.assertEqual(set(results), {
+ OffsetResponse(topic = 'topic1', partition = 2, error = 0, offsets=(4,)),
+ OffsetResponse(topic = 'topic1', partition = 4, error = 0, offsets=(8,)),
+ })
def test_encode_offset_commit_request(self):
header = "".join([
@@ -595,9 +616,25 @@ class TestProtocol(unittest.TestCase):
self.assertIn(encoded, [ expected1, expected2 ])
- @unittest.skip("Not Implemented")
def test_decode_offset_commit_response(self):
- pass
+ encoded = "".join([
+ struct.pack(">i", 42), # Correlation ID
+ struct.pack(">i", 1), # One topic
+ struct.pack(">h6s", 6, "topic1"), # First topic
+ struct.pack(">i", 2), # Two partitions
+
+ struct.pack(">i", 2), # Partition 2
+ struct.pack(">h", 0), # No error
+
+ struct.pack(">i", 4), # Partition 4
+ struct.pack(">h", 0), # No error
+ ])
+
+ results = KafkaProtocol.decode_offset_commit_response(encoded)
+ self.assertEqual(set(results), {
+ OffsetCommitResponse(topic = 'topic1', partition = 2, error = 0),
+ OffsetCommitResponse(topic = 'topic1', partition = 4, error = 0),
+ })
def test_encode_offset_fetch_request(self):
header = "".join([
@@ -634,6 +671,26 @@ class TestProtocol(unittest.TestCase):
self.assertIn(encoded, [ expected1, expected2 ])
- @unittest.skip("Not Implemented")
def test_decode_offset_fetch_response(self):
- pass
+ encoded = "".join([
+ struct.pack(">i", 42), # Correlation ID
+ struct.pack(">i", 1), # One topics
+ struct.pack(">h6s", 6, "topic1"), # First topic
+ struct.pack(">i", 2), # Two partitions
+
+ struct.pack(">i", 2), # Partition 2
+ struct.pack(">q", 4), # Offset 4
+ struct.pack(">h4s", 4, "meta"), # Metadata
+ struct.pack(">h", 0), # No error
+
+ struct.pack(">i", 4), # Partition 4
+ struct.pack(">q", 8), # Offset 8
+ struct.pack(">h4s", 4, "meta"), # Metadata
+ struct.pack(">h", 0), # No error
+ ])
+
+ results = KafkaProtocol.decode_offset_fetch_response(encoded)
+ self.assertEqual(set(results), {
+ OffsetFetchResponse(topic = 'topic1', partition = 2, offset = 4, error = 0, metadata = "meta"),
+ OffsetFetchResponse(topic = 'topic1', partition = 4, offset = 8, error = 0, metadata = "meta"),
+ })