diff options
author | Mark Roberts <wizzat@gmail.com> | 2014-04-09 02:58:36 -0700 |
---|---|---|
committer | Mark Roberts <wizzat@gmail.com> | 2014-04-09 02:58:36 -0700 |
commit | 8f179d8607c5632be84d86f07c004777865be00d (patch) | |
tree | 141704a9072aa768592530b5f247fee644df3b1f | |
parent | 58b4d0ff7a956d62047d06b336cd6e1f66df8270 (diff) | |
download | kafka-python-8f179d8607c5632be84d86f07c004777865be00d.tar.gz |
Add encode_offset_fetch_request test
-rw-r--r-- | test/test_protocol.py | 49 |
1 files changed, 40 insertions, 9 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py index f307ce8..7459ca0 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -3,7 +3,7 @@ import unittest from kafka import KafkaClient from kafka.common import ( - OffsetRequest, OffsetCommitRequest, + OffsetRequest, OffsetCommitRequest, OffsetFetchRequest, ProduceRequest, FetchRequest, Message, ChecksumError, ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage, BrokerMetadata, PartitionMetadata, @@ -557,7 +557,7 @@ class TestProtocol(unittest.TestCase): header = "".join([ struct.pack('>i', 99), # Total message length - struct.pack('>h', 8), # Message type = offset fetch + struct.pack('>h', 8), # Message type = offset commit struct.pack('>h', 0), # API version struct.pack('>i', 42), # Correlation ID struct.pack('>h9s', 9, "client_id"), # The client ID @@ -571,16 +571,16 @@ class TestProtocol(unittest.TestCase): struct.pack(">i", 0), # Partition 0 struct.pack(">q", 123), # Offset 123 struct.pack(">h", -1), # Null metadata - struct.pack(">i", 1), # Partition 0 - struct.pack(">q", 234), # Offset 123 + struct.pack(">i", 1), # Partition 1 + struct.pack(">q", 234), # Offset 234 struct.pack(">h", -1), # Null metadata ]) topic2 = "".join([ struct.pack(">h6s", 6, "topic2"), # Topic for the request - struct.pack(">i", 1), # Two partitions - struct.pack(">i", 2), # Partition 0 - struct.pack(">q", 345), # Offset 123 + struct.pack(">i", 1), # One partition + struct.pack(">i", 2), # Partition 2 + struct.pack(">q", 345), # Offset 345 struct.pack(">h", -1), # Null metadata ]) @@ -599,9 +599,40 @@ class TestProtocol(unittest.TestCase): def test_decode_offset_commit_response(self): pass - @unittest.skip("Not Implemented") def test_encode_offset_fetch_request(self): - pass + header = "".join([ + struct.pack('>i', 69), # Total message length + struct.pack('>h', 9), # Message type = offset fetch + struct.pack('>h', 0), # API version + struct.pack('>i', 42), # Correlation ID + struct.pack('>h9s', 9, "client_id"), # The client ID + struct.pack('>h8s', 8, "group_id"), # The group to commit for + struct.pack('>i', 2), # Num topics + ]) + + topic1 = "".join([ + struct.pack(">h6s", 6, "topic1"), # Topic for the request + struct.pack(">i", 2), # Two partitions + struct.pack(">i", 0), # Partition 0 + struct.pack(">i", 1), # Partition 1 + ]) + + topic2 = "".join([ + struct.pack(">h6s", 6, "topic2"), # Topic for the request + struct.pack(">i", 1), # One partitions + struct.pack(">i", 2), # Partition 2 + ]) + + expected1 = "".join([ header, topic1, topic2 ]) + expected2 = "".join([ header, topic2, topic1 ]) + + encoded = KafkaProtocol.encode_offset_fetch_request("client_id", 42, "group_id", [ + OffsetFetchRequest("topic1", 0), + OffsetFetchRequest("topic1", 1), + OffsetFetchRequest("topic2", 2), + ]) + + self.assertIn(encoded, [ expected1, expected2 ]) @unittest.skip("Not Implemented") def test_decode_offset_fetch_response(self): |