summaryrefslogtreecommitdiff
path: root/test/test_protocol.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_protocol.py')
-rw-r--r--test/test_protocol.py174
1 files changed, 93 insertions, 81 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py
index e86b6f0..42cf808 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -37,24 +37,24 @@ class TestProtocol(unittest.TestCase):
self.assertEqual(msg.key, None)
# Need to decode to check since gzipped payload is non-deterministic
decoded = gzip_decode(msg.value)
- expect = (
- "\x00\x00\x00\x00\x00\x00\x00\x00" # MsgSet1 Offset
- "\x00\x00\x00\x10" # MsgSet1 Size
- "\x4c\x9f\x5b\xc2" # Msg1 CRC
- "\x00" # Msg1 Magic
- "\x00" # Msg1 Flags
- "\xff\xff\xff\xff" # Msg1, null key
- "\x00\x00\x00\x02" # Msg1, msg Size
- "v1" # Msg1, contents
- "\x00\x00\x00\x00\x00\x00\x00\x00" # MsgSet2 Offset
- "\x00\x00\x00\x10" # MsgSet2 Size
- "\xd5\x96\x0a\x78" # Msg2, CRC
- "\x00" # Msg2, magic
- "\x00" # Msg2, flags
- "\xff\xff\xff\xff" # Msg2, null key
- "\x00\x00\x00\x02" # Msg2, msg size
- "v2" # Msg2, contents
- )
+ expect = "".join([
+ struct.pack(">q", 0), # MsgSet offset
+ struct.pack(">i", 16), # MsgSet size
+ struct.pack(">i", 1285512130), # CRC
+ struct.pack(">bb", 0, 0), # Magic, flags
+ struct.pack(">i", -1), # -1 indicates a null key
+ struct.pack(">i", 2), # Msg length (bytes)
+ "v1", # Message contents
+
+ struct.pack(">q", 0), # MsgSet offset
+ struct.pack(">i", 16), # MsgSet size
+ struct.pack(">i", -711587208), # CRC
+ struct.pack(">bb", 0, 0), # Magic, flags
+ struct.pack(">i", -1), # -1 indicates a null key
+ struct.pack(">i", 2), # Msg length (bytes)
+ "v2", # Message contents
+ ])
+
self.assertEqual(decoded, expect)
@unittest.skipUnless(has_snappy(), "Snappy not available")
@@ -65,89 +65,101 @@ class TestProtocol(unittest.TestCase):
self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK &
KafkaProtocol.CODEC_SNAPPY)
self.assertEqual(msg.key, None)
- expect = (
- "\x00\x00\x00\x00\x00\x00\x00\x00" # MsgSet1 Offset
- "\x00\x00\x00\x10" # MsgSet1 Size
- "\x4c\x9f\x5b\xc2" # Msg1 CRC
- "\x00" # Msg1 Magic
- "\x00" # Msg1 Flags
- "\xff\xff\xff\xff" # Msg1, null key
- "\x00\x00\x00\x02" # Msg1, msg Size
- "v1" # Msg1, contents
- "\x00\x00\x00\x00\x00\x00\x00\x00" # MsgSet2 Offset
- "\x00\x00\x00\x10" # MsgSet2 Size
- "\xd5\x96\x0a\x78" # Msg2, CRC
- "\x00" # Msg2, magic
- "\x00" # Msg2, flags
- "\xff\xff\xff\xff" # Msg2, null key
- "\x00\x00\x00\x02" # Msg2, msg size
- "v2" # Msg2, contents
- )
+ expect = "".join([
+ struct.pack(">q", 0), # MsgSet offset
+ struct.pack(">i", 16), # MsgSet size
+ struct.pack(">i", 1285512130), # CRC
+ struct.pack(">bb", 0, 0), # Magic, flags
+ struct.pack(">i", -1), # -1 indicates a null key
+ struct.pack(">i", 2), # Msg length (bytes)
+ "v1", # Message contents
+
+ struct.pack(">q", 0), # MsgSet offset
+ struct.pack(">i", 16), # MsgSet size
+ struct.pack(">i", -711587208), # CRC
+ struct.pack(">bb", 0, 0), # Magic, flags
+ struct.pack(">i", -1), # -1 indicates a null key
+ struct.pack(">i", 2), # Msg length (bytes)
+ "v2", # Message contents
+ ])
+
self.assertEqual(msg.value, expect)
def test_encode_message_header(self):
- expect = (
- "\x00\n" # API Key
- "\x00\x00" # API Version
- "\x00\x00\x00\x04" # CorrelationId
- "\x00\x07" # Client length
- "client3" # Client Id
- )
+ expect = "".join([
+ struct.pack(">h", 10), # API Key
+ struct.pack(">h", 0), # API Version
+ struct.pack(">i", 4), # Correlation Id
+ struct.pack(">h", len("client3")), # Length of clientId
+ "client3", # ClientId
+ ])
+
encoded = KafkaProtocol._encode_message_header("client3", 4, 10)
self.assertEqual(encoded, expect)
def test_encode_message(self):
message = create_message("test", "key")
encoded = KafkaProtocol._encode_message(message)
- expect = (
- "\xaa\xf1\x8f\x5b" # CRC
- "\x00" # Magic
- "\x00" # Flags
- "\x00\x00\x00\x03" # Key Length
- "key" # Key contents
- "\x00\x00\x00\x04" # Msg Length
- "test" # Msg contents
- )
+ expect = "".join([
+ struct.pack(">i", -1427009701), # CRC
+ struct.pack(">bb", 0, 0), # Magic, flags
+ struct.pack(">i", 3), # Length of key
+ "key", # key
+ struct.pack(">i", 4), # Length of value
+ "test", # value
+ ])
+
self.assertEqual(encoded, expect)
+ def test_decode_message(self):
+ encoded = "".join([
+ struct.pack(">i", -1427009701), # CRC
+ struct.pack(">bb", 0, 0), # Magic, flags
+ struct.pack(">i", 3), # Length of key
+ "key", # key
+ struct.pack(">i", 4), # Length of value
+ "test", # value
+ ])
+
+ offset = 10
+ (returned_offset, decoded_message) = list(KafkaProtocol._decode_message(encoded, offset))[0]
+
+ self.assertEqual(returned_offset, offset)
+ self.assertEqual(decoded_message, create_message("test", "key"))
+
def test_encode_message_failure(self):
with self.assertRaises(ProtocolError):
KafkaProtocol._encode_message(Message(1, 0, "key", "test"))
def test_encode_message_set(self):
- message_set = [create_message("v1", "k1"), create_message("v2", "k2")]
+ message_set = [
+ create_message("v1", "k1"),
+ create_message("v2", "k2")
+ ]
+
encoded = KafkaProtocol._encode_message_set(message_set)
- expect = (
- "\x00\x00\x00\x00\x00\x00\x00\x00" # Msgset1, Offset (Meaningless)
- "\x00\x00\x00\x12" # Msgset1, Msg Size
- "\x57\xe7\x49\x6e" # Msg1, CRC
- "\x00" # Msg1, Magic
- "\x00" # Msg1, Flags
- "\x00\x00\x00\x02" # Msg1, key size
- "k1" # Msg1, key
- "\x00\x00\x00\x02" # Msg1, value size
- "v1" # Msg1, value
- "\x00\x00\x00\x00\x00\x00\x00\x00" # Msgset2, Offset (Meaningless)
- "\x00\x00\x00\x12" # Msgset2, Msg Size
- "\xff\x06\x02\x49" # Msg2, CRC
- "\x00" # Msg2, Magic
- "\x00" # Msg2, flags
- "\x00\x00\x00\x02" # Msg2, key size
- "k2" # Msg2, key
- "\x00\x00\x00\x02" # Msg2, value size
- "v2" # MSg2, value
- )
+ expect = "".join([
+ struct.pack(">q", 0), # MsgSet Offset
+ struct.pack(">i", 18), # Msg Size
+ struct.pack(">i", 1474775406), # CRC
+ struct.pack(">bb", 0, 0), # Magic, flags
+ struct.pack(">i", 2), # Length of key
+ "k1", # Key
+ struct.pack(">i", 2), # Length of value
+ "v1", # Value
+
+ struct.pack(">q", 0), # MsgSet Offset
+ struct.pack(">i", 18), # Msg Size
+ struct.pack(">i", -16383415), # CRC
+ struct.pack(">bb", 0, 0), # Magic, flags
+ struct.pack(">i", 2), # Length of key
+ "k2", # Key
+ struct.pack(">i", 2), # Length of value
+ "v2", # Value
+ ])
self.assertEqual(encoded, expect)
- def test_decode_message(self):
- encoded = "\xaa\xf1\x8f[\x00\x00\x00\x00\x00\x03key\x00\x00\x00\x04test"
- offset = 10
- (returned_offset, decoded_message) = \
- list(KafkaProtocol._decode_message(encoded, offset))[0]
- self.assertEqual(returned_offset, offset)
- self.assertEqual(decoded_message, create_message("test", "key"))
-
def test_decode_message_set(self):
encoded = ('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10L\x9f[\xc2'
'\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v1\x00\x00\x00\x00'