diff options
author | Mark Roberts <wizzat@gmail.com> | 2014-04-09 02:47:07 -0700 |
---|---|---|
committer | Mark Roberts <wizzat@gmail.com> | 2014-04-09 02:47:07 -0700 |
commit | 58b4d0ff7a956d62047d06b336cd6e1f66df8270 (patch) | |
tree | d2a51fae98b02469c7b1fc19cf2a73b2dfa7af89 /test/test_protocol.py | |
parent | 1cb27f99d695ce52058e41699b6dc2c99b40913a (diff) | |
download | kafka-python-58b4d0ff7a956d62047d06b336cd6e1f66df8270.tar.gz |
Add commit offset request test
Diffstat (limited to 'test/test_protocol.py')
-rw-r--r-- | test/test_protocol.py | 73 |
1 files changed, 47 insertions, 26 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py index d255172..f307ce8 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -3,7 +3,8 @@ import unittest from kafka import KafkaClient from kafka.common import ( - OffsetRequest, ProduceRequest, FetchRequest, Message, ChecksumError, + OffsetRequest, OffsetCommitRequest, + ProduceRequest, FetchRequest, Message, ChecksumError, ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage, BrokerMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError, ProtocolError, @@ -332,20 +333,11 @@ class TestProtocol(unittest.TestCase): msg_c_binary, # Actual message ]) - expect1 = "".join([ - header, - topic1, - topic2 - ]) - - expect2 = "".join([ - header, - topic2, - topic1 - ]) + expected1 = "".join([ header, topic1, topic2 ]) + expected2 = "".join([ header, topic2, topic1 ]) encoded = KafkaProtocol.encode_produce_request("client1", 2, requests, 2, 100) - self.assertIn(encoded, [ expect1, expect2 ]) + self.assertIn(encoded, [ expected1, expected2 ]) def test_decode_produce_response(self): t1 = "topic1" @@ -393,17 +385,8 @@ class TestProtocol(unittest.TestCase): struct.pack('>i', 100), # Max Bytes ]) - expected1 = "".join([ - header, - topic1, - topic2, - ]) - - expected2 = "".join([ - header, - topic2, - topic1, - ]) + expected1 = "".join([ header, topic1, topic2 ]) + expected2 = "".join([ header, topic2, topic1 ]) encoded = KafkaProtocol.encode_fetch_request("client1", 3, requests, 2, 100) self.assertIn(encoded, [ expected1, expected2 ]) @@ -570,9 +553,47 @@ class TestProtocol(unittest.TestCase): def test_decode_offset_response(self): pass - @unittest.skip("Not Implemented") def test_encode_offset_commit_request(self): - pass + header = "".join([ + struct.pack('>i', 99), # Total message length + + struct.pack('>h', 8), # Message type = offset fetch + struct.pack('>h', 0), # API version + struct.pack('>i', 42), # Correlation ID + struct.pack('>h9s', 9, "client_id"), # The client ID + struct.pack('>h8s', 8, "group_id"), # The group to commit for + struct.pack('>i', 2), # Num topics + ]) + + topic1 = "".join([ + struct.pack(">h6s", 6, "topic1"), # Topic for the request + struct.pack(">i", 2), # Two partitions + struct.pack(">i", 0), # Partition 0 + struct.pack(">q", 123), # Offset 123 + struct.pack(">h", -1), # Null metadata + struct.pack(">i", 1), # Partition 0 + struct.pack(">q", 234), # Offset 123 + struct.pack(">h", -1), # Null metadata + ]) + + topic2 = "".join([ + struct.pack(">h6s", 6, "topic2"), # Topic for the request + struct.pack(">i", 1), # Two partitions + struct.pack(">i", 2), # Partition 0 + struct.pack(">q", 345), # Offset 123 + struct.pack(">h", -1), # Null metadata + ]) + + expected1 = "".join([ header, topic1, topic2 ]) + expected2 = "".join([ header, topic2, topic1 ]) + + encoded = KafkaProtocol.encode_offset_commit_request("client_id", 42, "group_id", [ + OffsetCommitRequest("topic1", 0, 123, None), + OffsetCommitRequest("topic1", 1, 234, None), + OffsetCommitRequest("topic2", 2, 345, None), + ]) + + self.assertIn(encoded, [ expected1, expected2 ]) @unittest.skip("Not Implemented") def test_decode_offset_commit_response(self): |