summaryrefslogtreecommitdiff
path: root/test/test_protocol.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_protocol.py')
-rw-r--r--test/test_protocol.py63
1 files changed, 36 insertions, 27 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py
index a0b8b39..507cc8b 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -65,6 +65,7 @@ class TestProtocol(unittest.TestCase):
self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK &
KafkaProtocol.CODEC_SNAPPY)
self.assertEqual(msg.key, None)
+ decoded = snappy_decode(msg.value)
expect = "".join([
struct.pack(">q", 0), # MsgSet offset
struct.pack(">i", 16), # MsgSet size
@@ -83,7 +84,7 @@ class TestProtocol(unittest.TestCase):
"v2", # Message contents
])
- self.assertEqual(msg.value, expect)
+ self.assertEqual(decoded, expect)
def test_encode_message_header(self):
expect = "".join([
@@ -202,12 +203,16 @@ class TestProtocol(unittest.TestCase):
'\x80$wu\x1aW\x05\x92\x9c\x11\x00z\xc0h\x888\x00\x00'
'\x00')
offset = 11
- decoded = list(KafkaProtocol._decode_message(gzip_encoded, offset))
- self.assertEqual(len(decoded), 2)
- (returned_offset1, decoded_message1) = decoded[0]
+ messages = list(KafkaProtocol._decode_message(gzip_encoded, offset))
+
+ self.assertEqual(len(messages), 2)
+ msg1, msg2 = messages
+
+ returned_offset1, decoded_message1 = msg1
self.assertEqual(returned_offset1, 0)
self.assertEqual(decoded_message1, create_message("v1"))
- (returned_offset2, decoded_message2) = decoded[1]
+
+ returned_offset2, decoded_message2 = msg2
self.assertEqual(returned_offset2, 0)
self.assertEqual(decoded_message2, create_message("v2"))
@@ -218,12 +223,16 @@ class TestProtocol(unittest.TestCase):
'\xff\xff\xff\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5'
'\x96\nx\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v2')
offset = 11
- decoded = list(KafkaProtocol._decode_message(snappy_encoded, offset))
- self.assertEqual(len(decoded), 2)
- (returned_offset1, decoded_message1) = decoded[0]
+ messages = list(KafkaProtocol._decode_message(snappy_encoded, offset))
+ self.assertEqual(len(messages), 2)
+
+ msg1, msg2 = messages
+
+ returned_offset1, decoded_message1 = msg1
self.assertEqual(returned_offset1, 0)
self.assertEqual(decoded_message1, create_message("v1"))
- (returned_offset2, decoded_message2) = decoded[1]
+
+ returned_offset2, decoded_message2 = msg2
self.assertEqual(returned_offset2, 0)
self.assertEqual(decoded_message2, create_message("v2"))
@@ -289,28 +298,28 @@ class TestProtocol(unittest.TestCase):
msg_c_binary = KafkaProtocol._encode_message(create_message("c"))
header = "".join([
- struct.pack('>i', 0x94), # The length of the message overall
- struct.pack('>h', 0), # Msg Header, Message type = Produce
- struct.pack('>h', 0), # Msg Header, API version
- struct.pack('>i', 2), # Msg Header, Correlation ID
- struct.pack('>h7s', 7, "client1"), # Msg Header, The client ID
- struct.pack('>h', 2), # Num acks required
- struct.pack('>i', 100), # Request Timeout
- struct.pack('>i', 2), # The number of requests
+ struct.pack('>i', 0x94), # The length of the message overall
+ struct.pack('>h', 0), # Msg Header, Message type = Produce
+ struct.pack('>h', 0), # Msg Header, API version
+ struct.pack('>i', 2), # Msg Header, Correlation ID
+ struct.pack('>h7s', 7, "client1"), # Msg Header, The client ID
+ struct.pack('>h', 2), # Num acks required
+ struct.pack('>i', 100), # Request Timeout
+ struct.pack('>i', 2), # The number of requests
])
total_len = len(msg_a_binary) + len(msg_b_binary)
topic1 = "".join([
- struct.pack('>h6s', 6, 'topic1'), # The topic1
- struct.pack('>i', 1), # One message set
- struct.pack('>i', 0), # Partition 0
- struct.pack('>i', total_len + 24), # Size of the incoming message set
- struct.pack('>q', 0), # No offset specified
- struct.pack('>i', len(msg_a_binary)), # Length of message
- msg_a_binary, # Actual message
- struct.pack('>q', 0), # No offset specified
- struct.pack('>i', len(msg_b_binary)), # Length of message
- msg_b_binary, # Actual message
+ struct.pack('>h6s', 6, 'topic1'), # The topic1
+ struct.pack('>i', 1), # One message set
+ struct.pack('>i', 0), # Partition 0
+ struct.pack('>i', total_len + 24), # Size of the incoming message set
+ struct.pack('>q', 0), # No offset specified
+ struct.pack('>i', len(msg_a_binary)), # Length of message
+ msg_a_binary, # Actual message
+ struct.pack('>q', 0), # No offset specified
+ struct.pack('>i', len(msg_b_binary)), # Length of message
+ msg_b_binary, # Actual message
])
topic2 = "".join([