summaryrefslogtreecommitdiff
path: root/test/test_protocol.py
diff options
context:
space:
mode:
authorMark Roberts <wizzat@gmail.com>2014-04-09 02:47:07 -0700
committerMark Roberts <wizzat@gmail.com>2014-04-09 02:47:07 -0700
commit58b4d0ff7a956d62047d06b336cd6e1f66df8270 (patch)
treed2a51fae98b02469c7b1fc19cf2a73b2dfa7af89 /test/test_protocol.py
parent1cb27f99d695ce52058e41699b6dc2c99b40913a (diff)
downloadkafka-python-58b4d0ff7a956d62047d06b336cd6e1f66df8270.tar.gz
Add commit offset request test
Diffstat (limited to 'test/test_protocol.py')
-rw-r--r--test/test_protocol.py73
1 files changed, 47 insertions, 26 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py
index d255172..f307ce8 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -3,7 +3,8 @@ import unittest
from kafka import KafkaClient
from kafka.common import (
- OffsetRequest, ProduceRequest, FetchRequest, Message, ChecksumError,
+ OffsetRequest, OffsetCommitRequest,
+ ProduceRequest, FetchRequest, Message, ChecksumError,
ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
OffsetAndMessage, BrokerMetadata, PartitionMetadata,
TopicAndPartition, KafkaUnavailableError, ProtocolError,
@@ -332,20 +333,11 @@ class TestProtocol(unittest.TestCase):
msg_c_binary, # Actual message
])
- expect1 = "".join([
- header,
- topic1,
- topic2
- ])
-
- expect2 = "".join([
- header,
- topic2,
- topic1
- ])
+ expected1 = "".join([ header, topic1, topic2 ])
+ expected2 = "".join([ header, topic2, topic1 ])
encoded = KafkaProtocol.encode_produce_request("client1", 2, requests, 2, 100)
- self.assertIn(encoded, [ expect1, expect2 ])
+ self.assertIn(encoded, [ expected1, expected2 ])
def test_decode_produce_response(self):
t1 = "topic1"
@@ -393,17 +385,8 @@ class TestProtocol(unittest.TestCase):
struct.pack('>i', 100), # Max Bytes
])
- expected1 = "".join([
- header,
- topic1,
- topic2,
- ])
-
- expected2 = "".join([
- header,
- topic2,
- topic1,
- ])
+ expected1 = "".join([ header, topic1, topic2 ])
+ expected2 = "".join([ header, topic2, topic1 ])
encoded = KafkaProtocol.encode_fetch_request("client1", 3, requests, 2, 100)
self.assertIn(encoded, [ expected1, expected2 ])
@@ -570,9 +553,47 @@ class TestProtocol(unittest.TestCase):
def test_decode_offset_response(self):
pass
- @unittest.skip("Not Implemented")
def test_encode_offset_commit_request(self):
- pass
+ header = "".join([
+ struct.pack('>i', 99), # Total message length
+
+ struct.pack('>h', 8), # Message type = offset fetch
+ struct.pack('>h', 0), # API version
+ struct.pack('>i', 42), # Correlation ID
+ struct.pack('>h9s', 9, "client_id"), # The client ID
+ struct.pack('>h8s', 8, "group_id"), # The group to commit for
+ struct.pack('>i', 2), # Num topics
+ ])
+
+ topic1 = "".join([
+ struct.pack(">h6s", 6, "topic1"), # Topic for the request
+ struct.pack(">i", 2), # Two partitions
+ struct.pack(">i", 0), # Partition 0
+ struct.pack(">q", 123), # Offset 123
+ struct.pack(">h", -1), # Null metadata
+ struct.pack(">i", 1), # Partition 0
+ struct.pack(">q", 234), # Offset 123
+ struct.pack(">h", -1), # Null metadata
+ ])
+
+ topic2 = "".join([
+ struct.pack(">h6s", 6, "topic2"), # Topic for the request
+ struct.pack(">i", 1), # Two partitions
+ struct.pack(">i", 2), # Partition 0
+ struct.pack(">q", 345), # Offset 123
+ struct.pack(">h", -1), # Null metadata
+ ])
+
+ expected1 = "".join([ header, topic1, topic2 ])
+ expected2 = "".join([ header, topic2, topic1 ])
+
+ encoded = KafkaProtocol.encode_offset_commit_request("client_id", 42, "group_id", [
+ OffsetCommitRequest("topic1", 0, 123, None),
+ OffsetCommitRequest("topic1", 1, 234, None),
+ OffsetCommitRequest("topic2", 2, 345, None),
+ ])
+
+ self.assertIn(encoded, [ expected1, expected2 ])
@unittest.skip("Not Implemented")
def test_decode_offset_commit_response(self):