diff options
author | Mark Roberts <wizzat@gmail.com> | 2014-04-09 02:11:39 -0700 |
---|---|---|
committer | Mark Roberts <wizzat@gmail.com> | 2014-04-09 02:11:39 -0700 |
commit | 5c58151e6f3722be2b9a2af4aedf9caa70be7189 (patch) | |
tree | 8a50ebd535aea2048f4ee93e63d94036fa461a5a /test/test_protocol.py | |
parent | 115c20ced3b0b0cd3c2b0c3b62a58e3b8b4c1021 (diff) | |
download | kafka-python-5c58151e6f3722be2b9a2af4aedf9caa70be7189.tar.gz |
Add python-snappy to tox dependencies. Fix snappy protocol test
Diffstat (limited to 'test/test_protocol.py')
-rw-r--r-- | test/test_protocol.py | 63 |
1 files changed, 36 insertions, 27 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py index a0b8b39..507cc8b 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -65,6 +65,7 @@ class TestProtocol(unittest.TestCase): self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY) self.assertEqual(msg.key, None) + decoded = snappy_decode(msg.value) expect = "".join([ struct.pack(">q", 0), # MsgSet offset struct.pack(">i", 16), # MsgSet size @@ -83,7 +84,7 @@ class TestProtocol(unittest.TestCase): "v2", # Message contents ]) - self.assertEqual(msg.value, expect) + self.assertEqual(decoded, expect) def test_encode_message_header(self): expect = "".join([ @@ -202,12 +203,16 @@ class TestProtocol(unittest.TestCase): '\x80$wu\x1aW\x05\x92\x9c\x11\x00z\xc0h\x888\x00\x00' '\x00') offset = 11 - decoded = list(KafkaProtocol._decode_message(gzip_encoded, offset)) - self.assertEqual(len(decoded), 2) - (returned_offset1, decoded_message1) = decoded[0] + messages = list(KafkaProtocol._decode_message(gzip_encoded, offset)) + + self.assertEqual(len(messages), 2) + msg1, msg2 = messages + + returned_offset1, decoded_message1 = msg1 self.assertEqual(returned_offset1, 0) self.assertEqual(decoded_message1, create_message("v1")) - (returned_offset2, decoded_message2) = decoded[1] + + returned_offset2, decoded_message2 = msg2 self.assertEqual(returned_offset2, 0) self.assertEqual(decoded_message2, create_message("v2")) @@ -218,12 +223,16 @@ class TestProtocol(unittest.TestCase): '\xff\xff\xff\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5' '\x96\nx\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v2') offset = 11 - decoded = list(KafkaProtocol._decode_message(snappy_encoded, offset)) - self.assertEqual(len(decoded), 2) - (returned_offset1, decoded_message1) = decoded[0] + messages = list(KafkaProtocol._decode_message(snappy_encoded, offset)) + self.assertEqual(len(messages), 2) + + msg1, msg2 = messages + + returned_offset1, decoded_message1 = msg1 self.assertEqual(returned_offset1, 0) self.assertEqual(decoded_message1, create_message("v1")) - (returned_offset2, decoded_message2) = decoded[1] + + returned_offset2, decoded_message2 = msg2 self.assertEqual(returned_offset2, 0) self.assertEqual(decoded_message2, create_message("v2")) @@ -289,28 +298,28 @@ class TestProtocol(unittest.TestCase): msg_c_binary = KafkaProtocol._encode_message(create_message("c")) header = "".join([ - struct.pack('>i', 0x94), # The length of the message overall - struct.pack('>h', 0), # Msg Header, Message type = Produce - struct.pack('>h', 0), # Msg Header, API version - struct.pack('>i', 2), # Msg Header, Correlation ID - struct.pack('>h7s', 7, "client1"), # Msg Header, The client ID - struct.pack('>h', 2), # Num acks required - struct.pack('>i', 100), # Request Timeout - struct.pack('>i', 2), # The number of requests + struct.pack('>i', 0x94), # The length of the message overall + struct.pack('>h', 0), # Msg Header, Message type = Produce + struct.pack('>h', 0), # Msg Header, API version + struct.pack('>i', 2), # Msg Header, Correlation ID + struct.pack('>h7s', 7, "client1"), # Msg Header, The client ID + struct.pack('>h', 2), # Num acks required + struct.pack('>i', 100), # Request Timeout + struct.pack('>i', 2), # The number of requests ]) total_len = len(msg_a_binary) + len(msg_b_binary) topic1 = "".join([ - struct.pack('>h6s', 6, 'topic1'), # The topic1 - struct.pack('>i', 1), # One message set - struct.pack('>i', 0), # Partition 0 - struct.pack('>i', total_len + 24), # Size of the incoming message set - struct.pack('>q', 0), # No offset specified - struct.pack('>i', len(msg_a_binary)), # Length of message - msg_a_binary, # Actual message - struct.pack('>q', 0), # No offset specified - struct.pack('>i', len(msg_b_binary)), # Length of message - msg_b_binary, # Actual message + struct.pack('>h6s', 6, 'topic1'), # The topic1 + struct.pack('>i', 1), # One message set + struct.pack('>i', 0), # Partition 0 + struct.pack('>i', total_len + 24), # Size of the incoming message set + struct.pack('>q', 0), # No offset specified + struct.pack('>i', len(msg_a_binary)), # Length of message + msg_a_binary, # Actual message + struct.pack('>q', 0), # No offset specified + struct.pack('>i', len(msg_b_binary)), # Length of message + msg_b_binary, # Actual message ]) topic2 = "".join([ |