diff options
Diffstat (limited to 'test/test_protocol.py')
-rw-r--r-- | test/test_protocol.py | 174 |
1 files changed, 93 insertions, 81 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py index e86b6f0..42cf808 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -37,24 +37,24 @@ class TestProtocol(unittest.TestCase): self.assertEqual(msg.key, None) # Need to decode to check since gzipped payload is non-deterministic decoded = gzip_decode(msg.value) - expect = ( - "\x00\x00\x00\x00\x00\x00\x00\x00" # MsgSet1 Offset - "\x00\x00\x00\x10" # MsgSet1 Size - "\x4c\x9f\x5b\xc2" # Msg1 CRC - "\x00" # Msg1 Magic - "\x00" # Msg1 Flags - "\xff\xff\xff\xff" # Msg1, null key - "\x00\x00\x00\x02" # Msg1, msg Size - "v1" # Msg1, contents - "\x00\x00\x00\x00\x00\x00\x00\x00" # MsgSet2 Offset - "\x00\x00\x00\x10" # MsgSet2 Size - "\xd5\x96\x0a\x78" # Msg2, CRC - "\x00" # Msg2, magic - "\x00" # Msg2, flags - "\xff\xff\xff\xff" # Msg2, null key - "\x00\x00\x00\x02" # Msg2, msg size - "v2" # Msg2, contents - ) + expect = "".join([ + struct.pack(">q", 0), # MsgSet offset + struct.pack(">i", 16), # MsgSet size + struct.pack(">i", 1285512130), # CRC + struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", -1), # -1 indicates a null key + struct.pack(">i", 2), # Msg length (bytes) + "v1", # Message contents + + struct.pack(">q", 0), # MsgSet offset + struct.pack(">i", 16), # MsgSet size + struct.pack(">i", -711587208), # CRC + struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", -1), # -1 indicates a null key + struct.pack(">i", 2), # Msg length (bytes) + "v2", # Message contents + ]) + self.assertEqual(decoded, expect) @unittest.skipUnless(has_snappy(), "Snappy not available") @@ -65,89 +65,101 @@ class TestProtocol(unittest.TestCase): self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY) self.assertEqual(msg.key, None) - expect = ( - "\x00\x00\x00\x00\x00\x00\x00\x00" # MsgSet1 Offset - "\x00\x00\x00\x10" # MsgSet1 Size - "\x4c\x9f\x5b\xc2" # Msg1 CRC - "\x00" # Msg1 Magic - "\x00" # Msg1 Flags - "\xff\xff\xff\xff" # Msg1, null key - "\x00\x00\x00\x02" # Msg1, msg Size - "v1" # Msg1, contents - "\x00\x00\x00\x00\x00\x00\x00\x00" # MsgSet2 Offset - "\x00\x00\x00\x10" # MsgSet2 Size - "\xd5\x96\x0a\x78" # Msg2, CRC - "\x00" # Msg2, magic - "\x00" # Msg2, flags - "\xff\xff\xff\xff" # Msg2, null key - "\x00\x00\x00\x02" # Msg2, msg size - "v2" # Msg2, contents - ) + expect = "".join([ + struct.pack(">q", 0), # MsgSet offset + struct.pack(">i", 16), # MsgSet size + struct.pack(">i", 1285512130), # CRC + struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", -1), # -1 indicates a null key + struct.pack(">i", 2), # Msg length (bytes) + "v1", # Message contents + + struct.pack(">q", 0), # MsgSet offset + struct.pack(">i", 16), # MsgSet size + struct.pack(">i", -711587208), # CRC + struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", -1), # -1 indicates a null key + struct.pack(">i", 2), # Msg length (bytes) + "v2", # Message contents + ]) + self.assertEqual(msg.value, expect) def test_encode_message_header(self): - expect = ( - "\x00\n" # API Key - "\x00\x00" # API Version - "\x00\x00\x00\x04" # CorrelationId - "\x00\x07" # Client length - "client3" # Client Id - ) + expect = "".join([ + struct.pack(">h", 10), # API Key + struct.pack(">h", 0), # API Version + struct.pack(">i", 4), # Correlation Id + struct.pack(">h", len("client3")), # Length of clientId + "client3", # ClientId + ]) + encoded = KafkaProtocol._encode_message_header("client3", 4, 10) self.assertEqual(encoded, expect) def test_encode_message(self): message = create_message("test", "key") encoded = KafkaProtocol._encode_message(message) - expect = ( - "\xaa\xf1\x8f\x5b" # CRC - "\x00" # Magic - "\x00" # Flags - "\x00\x00\x00\x03" # Key Length - "key" # Key contents - "\x00\x00\x00\x04" # Msg Length - "test" # Msg contents - ) + expect = "".join([ + struct.pack(">i", -1427009701), # CRC + struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", 3), # Length of key + "key", # key + struct.pack(">i", 4), # Length of value + "test", # value + ]) + self.assertEqual(encoded, expect) + def test_decode_message(self): + encoded = "".join([ + struct.pack(">i", -1427009701), # CRC + struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", 3), # Length of key + "key", # key + struct.pack(">i", 4), # Length of value + "test", # value + ]) + + offset = 10 + (returned_offset, decoded_message) = list(KafkaProtocol._decode_message(encoded, offset))[0] + + self.assertEqual(returned_offset, offset) + self.assertEqual(decoded_message, create_message("test", "key")) + def test_encode_message_failure(self): with self.assertRaises(ProtocolError): KafkaProtocol._encode_message(Message(1, 0, "key", "test")) def test_encode_message_set(self): - message_set = [create_message("v1", "k1"), create_message("v2", "k2")] + message_set = [ + create_message("v1", "k1"), + create_message("v2", "k2") + ] + encoded = KafkaProtocol._encode_message_set(message_set) - expect = ( - "\x00\x00\x00\x00\x00\x00\x00\x00" # Msgset1, Offset (Meaningless) - "\x00\x00\x00\x12" # Msgset1, Msg Size - "\x57\xe7\x49\x6e" # Msg1, CRC - "\x00" # Msg1, Magic - "\x00" # Msg1, Flags - "\x00\x00\x00\x02" # Msg1, key size - "k1" # Msg1, key - "\x00\x00\x00\x02" # Msg1, value size - "v1" # Msg1, value - "\x00\x00\x00\x00\x00\x00\x00\x00" # Msgset2, Offset (Meaningless) - "\x00\x00\x00\x12" # Msgset2, Msg Size - "\xff\x06\x02\x49" # Msg2, CRC - "\x00" # Msg2, Magic - "\x00" # Msg2, flags - "\x00\x00\x00\x02" # Msg2, key size - "k2" # Msg2, key - "\x00\x00\x00\x02" # Msg2, value size - "v2" # MSg2, value - ) + expect = "".join([ + struct.pack(">q", 0), # MsgSet Offset + struct.pack(">i", 18), # Msg Size + struct.pack(">i", 1474775406), # CRC + struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", 2), # Length of key + "k1", # Key + struct.pack(">i", 2), # Length of value + "v1", # Value + + struct.pack(">q", 0), # MsgSet Offset + struct.pack(">i", 18), # Msg Size + struct.pack(">i", -16383415), # CRC + struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", 2), # Length of key + "k2", # Key + struct.pack(">i", 2), # Length of value + "v2", # Value + ]) self.assertEqual(encoded, expect) - def test_decode_message(self): - encoded = "\xaa\xf1\x8f[\x00\x00\x00\x00\x00\x03key\x00\x00\x00\x04test" - offset = 10 - (returned_offset, decoded_message) = \ - list(KafkaProtocol._decode_message(encoded, offset))[0] - self.assertEqual(returned_offset, offset) - self.assertEqual(decoded_message, create_message("test", "key")) - def test_decode_message_set(self): encoded = ('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10L\x9f[\xc2' '\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v1\x00\x00\x00\x00' |