summaryrefslogtreecommitdiff
path: root/test/test_protocol.py
diff options
context:
space:
mode:
authorMark Roberts <wizzat@gmail.com>2014-04-09 02:30:43 -0700
committerMark Roberts <wizzat@gmail.com>2014-04-09 02:30:43 -0700
commit1cb27f99d695ce52058e41699b6dc2c99b40913a (patch)
treec79c94023f6d072029c9e4ebcbe7c56235a12b27 /test/test_protocol.py
parent5c58151e6f3722be2b9a2af4aedf9caa70be7189 (diff)
downloadkafka-python-1cb27f99d695ce52058e41699b6dc2c99b40913a.tar.gz
Add tests for encode_offset_request
Diffstat (limited to 'test/test_protocol.py')
-rw-r--r--test/test_protocol.py46
1 files changed, 43 insertions, 3 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 507cc8b..d255172 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -3,7 +3,7 @@ import unittest
from kafka import KafkaClient
from kafka.common import (
- ProduceRequest, FetchRequest, Message, ChecksumError,
+ OffsetRequest, ProduceRequest, FetchRequest, Message, ChecksumError,
ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
OffsetAndMessage, BrokerMetadata, PartitionMetadata,
TopicAndPartition, KafkaUnavailableError, ProtocolError,
@@ -522,9 +522,49 @@ class TestProtocol(unittest.TestCase):
decoded = KafkaProtocol.decode_metadata_response(encoded)
self.assertEqual(decoded, (node_brokers, topic_partitions))
- @unittest.skip("Not Implemented")
def test_encode_offset_request(self):
- pass
+ expected = "".join([
+ struct.pack(">i", 21), # Total length of the request
+ struct.pack('>h', 2), # Message type = offset fetch
+ struct.pack('>h', 0), # API version
+ struct.pack('>i', 4), # Correlation ID
+ struct.pack('>h3s', 3, "cid"), # The client ID
+ struct.pack('>i', -1), # Replica Id
+ struct.pack('>i', 0), # No topic/partitions
+ ])
+
+ encoded = KafkaProtocol.encode_offset_request("cid", 4)
+
+ self.assertEqual(encoded, expected)
+
+ def test_encode_offset_request__no_payload(self):
+ expected = "".join([
+ struct.pack(">i", 65), # Total length of the request
+
+ struct.pack('>h', 2), # Message type = offset fetch
+ struct.pack('>h', 0), # API version
+ struct.pack('>i', 4), # Correlation ID
+ struct.pack('>h3s', 3, "cid"), # The client ID
+ struct.pack('>i', -1), # Replica Id
+ struct.pack('>i', 1), # Num topics
+ struct.pack(">h6s", 6, "topic1"), # Topic for the request
+ struct.pack(">i", 2), # Two partitions
+
+ struct.pack(">i", 3), # Partition 3
+ struct.pack(">q", -1), # No time offset
+ struct.pack(">i", 1), # One offset requested
+
+ struct.pack(">i", 4), # Partition 3
+ struct.pack(">q", -1), # No time offset
+ struct.pack(">i", 1), # One offset requested
+ ])
+
+ encoded = KafkaProtocol.encode_offset_request("cid", 4, [
+ OffsetRequest('topic1', 3, -1, 1),
+ OffsetRequest('topic1', 4, -1, 1),
+ ])
+
+ self.assertEqual(encoded, expected)
@unittest.skip("Not Implemented")
def test_decode_offset_response(self):