summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--test/test_protocol.py49
1 files changed, 40 insertions, 9 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py
index f307ce8..7459ca0 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -3,7 +3,7 @@ import unittest
from kafka import KafkaClient
from kafka.common import (
- OffsetRequest, OffsetCommitRequest,
+ OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
ProduceRequest, FetchRequest, Message, ChecksumError,
ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
OffsetAndMessage, BrokerMetadata, PartitionMetadata,
@@ -557,7 +557,7 @@ class TestProtocol(unittest.TestCase):
header = "".join([
struct.pack('>i', 99), # Total message length
- struct.pack('>h', 8), # Message type = offset fetch
+ struct.pack('>h', 8), # Message type = offset commit
struct.pack('>h', 0), # API version
struct.pack('>i', 42), # Correlation ID
struct.pack('>h9s', 9, "client_id"), # The client ID
@@ -571,16 +571,16 @@ class TestProtocol(unittest.TestCase):
struct.pack(">i", 0), # Partition 0
struct.pack(">q", 123), # Offset 123
struct.pack(">h", -1), # Null metadata
- struct.pack(">i", 1), # Partition 0
- struct.pack(">q", 234), # Offset 123
+ struct.pack(">i", 1), # Partition 1
+ struct.pack(">q", 234), # Offset 234
struct.pack(">h", -1), # Null metadata
])
topic2 = "".join([
struct.pack(">h6s", 6, "topic2"), # Topic for the request
- struct.pack(">i", 1), # Two partitions
- struct.pack(">i", 2), # Partition 0
- struct.pack(">q", 345), # Offset 123
+ struct.pack(">i", 1), # One partition
+ struct.pack(">i", 2), # Partition 2
+ struct.pack(">q", 345), # Offset 345
struct.pack(">h", -1), # Null metadata
])
@@ -599,9 +599,40 @@ class TestProtocol(unittest.TestCase):
def test_decode_offset_commit_response(self):
pass
- @unittest.skip("Not Implemented")
def test_encode_offset_fetch_request(self):
- pass
+ header = "".join([
+ struct.pack('>i', 69), # Total message length
+ struct.pack('>h', 9), # Message type = offset fetch
+ struct.pack('>h', 0), # API version
+ struct.pack('>i', 42), # Correlation ID
+ struct.pack('>h9s', 9, "client_id"), # The client ID
+ struct.pack('>h8s', 8, "group_id"), # The group to commit for
+ struct.pack('>i', 2), # Num topics
+ ])
+
+ topic1 = "".join([
+ struct.pack(">h6s", 6, "topic1"), # Topic for the request
+ struct.pack(">i", 2), # Two partitions
+ struct.pack(">i", 0), # Partition 0
+ struct.pack(">i", 1), # Partition 1
+ ])
+
+ topic2 = "".join([
+ struct.pack(">h6s", 6, "topic2"), # Topic for the request
+ struct.pack(">i", 1), # One partitions
+ struct.pack(">i", 2), # Partition 2
+ ])
+
+ expected1 = "".join([ header, topic1, topic2 ])
+ expected2 = "".join([ header, topic2, topic1 ])
+
+ encoded = KafkaProtocol.encode_offset_fetch_request("client_id", 42, "group_id", [
+ OffsetFetchRequest("topic1", 0),
+ OffsetFetchRequest("topic1", 1),
+ OffsetFetchRequest("topic2", 2),
+ ])
+
+ self.assertIn(encoded, [ expected1, expected2 ])
@unittest.skip("Not Implemented")
def test_decode_offset_fetch_response(self):