summaryrefslogtreecommitdiff
path: root/test/test_protocol.py
diff options
context:
space:
mode:
authorMark Roberts <wizzat@gmail.com>2014-04-09 10:33:40 -0700
committerMark Roberts <wizzat@gmail.com>2014-04-09 10:33:40 -0700
commit12fae12ef2591b6129ed10431e6f4925682f7b1c (patch)
tree6eb44ddebd11295aff114e5883d3a4586b42d3d9 /test/test_protocol.py
parent8f179d8607c5632be84d86f07c004777865be00d (diff)
downloadkafka-python-12fae12ef2591b6129ed10431e6f4925682f7b1c.tar.gz
Add final tests for 100% coverage of protocol.py from test/test_protocol.py
Diffstat (limited to 'test/test_protocol.py')
-rw-r--r--test/test_protocol.py69
1 files changed, 63 insertions, 6 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 7459ca0..555fe10 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -4,6 +4,7 @@ import unittest
from kafka import KafkaClient
from kafka.common import (
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
+ OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
ProduceRequest, FetchRequest, Message, ChecksumError,
ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
OffsetAndMessage, BrokerMetadata, PartitionMetadata,
@@ -549,9 +550,29 @@ class TestProtocol(unittest.TestCase):
self.assertEqual(encoded, expected)
- @unittest.skip("Not Implemented")
def test_decode_offset_response(self):
- pass
+ encoded = "".join([
+ struct.pack(">i", 42), # Correlation ID
+ struct.pack(">i", 1), # One topics
+ struct.pack(">h6s", 6, "topic1"), # First topic
+ struct.pack(">i", 2), # Two partitions
+
+ struct.pack(">i", 2), # Partition 2
+ struct.pack(">h", 0), # No error
+ struct.pack(">i", 1), # One offset
+ struct.pack(">q", 4), # Offset 4
+
+ struct.pack(">i", 4), # Partition 4
+ struct.pack(">h", 0), # No error
+ struct.pack(">i", 1), # One offset
+ struct.pack(">q", 8), # Offset 8
+ ])
+
+ results = KafkaProtocol.decode_offset_response(encoded)
+ self.assertEqual(set(results), {
+ OffsetResponse(topic = 'topic1', partition = 2, error = 0, offsets=(4,)),
+ OffsetResponse(topic = 'topic1', partition = 4, error = 0, offsets=(8,)),
+ })
def test_encode_offset_commit_request(self):
header = "".join([
@@ -595,9 +616,25 @@ class TestProtocol(unittest.TestCase):
self.assertIn(encoded, [ expected1, expected2 ])
- @unittest.skip("Not Implemented")
def test_decode_offset_commit_response(self):
- pass
+ encoded = "".join([
+ struct.pack(">i", 42), # Correlation ID
+ struct.pack(">i", 1), # One topic
+ struct.pack(">h6s", 6, "topic1"), # First topic
+ struct.pack(">i", 2), # Two partitions
+
+ struct.pack(">i", 2), # Partition 2
+ struct.pack(">h", 0), # No error
+
+ struct.pack(">i", 4), # Partition 4
+ struct.pack(">h", 0), # No error
+ ])
+
+ results = KafkaProtocol.decode_offset_commit_response(encoded)
+ self.assertEqual(set(results), {
+ OffsetCommitResponse(topic = 'topic1', partition = 2, error = 0),
+ OffsetCommitResponse(topic = 'topic1', partition = 4, error = 0),
+ })
def test_encode_offset_fetch_request(self):
header = "".join([
@@ -634,6 +671,26 @@ class TestProtocol(unittest.TestCase):
self.assertIn(encoded, [ expected1, expected2 ])
- @unittest.skip("Not Implemented")
def test_decode_offset_fetch_response(self):
- pass
+ encoded = "".join([
+ struct.pack(">i", 42), # Correlation ID
+ struct.pack(">i", 1), # One topics
+ struct.pack(">h6s", 6, "topic1"), # First topic
+ struct.pack(">i", 2), # Two partitions
+
+ struct.pack(">i", 2), # Partition 2
+ struct.pack(">q", 4), # Offset 4
+ struct.pack(">h4s", 4, "meta"), # Metadata
+ struct.pack(">h", 0), # No error
+
+ struct.pack(">i", 4), # Partition 4
+ struct.pack(">q", 8), # Offset 8
+ struct.pack(">h4s", 4, "meta"), # Metadata
+ struct.pack(">h", 0), # No error
+ ])
+
+ results = KafkaProtocol.decode_offset_fetch_response(encoded)
+ self.assertEqual(set(results), {
+ OffsetFetchResponse(topic = 'topic1', partition = 2, offset = 4, error = 0, metadata = "meta"),
+ OffsetFetchResponse(topic = 'topic1', partition = 4, offset = 8, error = 0, metadata = "meta"),
+ })