diff options
Diffstat (limited to 'test/test_protocol.py')
-rw-r--r-- | test/test_protocol.py | 146 |
1 files changed, 146 insertions, 0 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py new file mode 100644 index 0000000..247fcc3 --- /dev/null +++ b/test/test_protocol.py @@ -0,0 +1,146 @@ +#pylint: skip-file +import struct + +import pytest +import six + +from kafka.protocol.api import RequestHeader +from kafka.protocol.commit import GroupCoordinatorRequest +from kafka.protocol.message import Message, MessageSet + + +def test_create_message(): + payload = b'test' + key = b'key' + msg = Message(payload, key=key) + assert msg.magic == 0 + assert msg.attributes == 0 + assert msg.key == key + assert msg.value == payload + + +def test_encode_message_v0(): + message = Message(b'test', key=b'key') + encoded = message.encode() + expect = b''.join([ + struct.pack('>i', -1427009701), # CRC + struct.pack('>bb', 0, 0), # Magic, flags + struct.pack('>i', 3), # Length of key + b'key', # key + struct.pack('>i', 4), # Length of value + b'test', # value + ]) + assert encoded == expect + + +def test_encode_message_v1(): + message = Message(b'test', key=b'key', magic=1, timestamp=1234) + encoded = message.encode() + expect = b''.join([ + struct.pack('>i', 1331087195), # CRC + struct.pack('>bb', 1, 0), # Magic, flags + struct.pack('>q', 1234), # Timestamp + struct.pack('>i', 3), # Length of key + b'key', # key + struct.pack('>i', 4), # Length of value + b'test', # value + ]) + assert encoded == expect + + +def test_decode_message(): + encoded = b''.join([ + struct.pack('>i', -1427009701), # CRC + struct.pack('>bb', 0, 0), # Magic, flags + struct.pack('>i', 3), # Length of key + b'key', # key + struct.pack('>i', 4), # Length of value + b'test', # value + ]) + decoded_message = Message.decode(encoded) + msg = Message(b'test', key=b'key') + msg.encode() # crc is recalculated during encoding + assert decoded_message == msg + + +def test_encode_message_set(): + messages = [ + Message(b'v1', key=b'k1'), + Message(b'v2', key=b'k2') + ] + encoded = MessageSet.encode([(0, msg.encode()) + for msg in messages], + size=False) + expect = b''.join([ + struct.pack('>q', 0), # MsgSet Offset + struct.pack('>i', 18), # Msg Size + struct.pack('>i', 1474775406), # CRC + struct.pack('>bb', 0, 0), # Magic, flags + struct.pack('>i', 2), # Length of key + b'k1', # Key + struct.pack('>i', 2), # Length of value + b'v1', # Value + + struct.pack('>q', 0), # MsgSet Offset + struct.pack('>i', 18), # Msg Size + struct.pack('>i', -16383415), # CRC + struct.pack('>bb', 0, 0), # Magic, flags + struct.pack('>i', 2), # Length of key + b'k2', # Key + struct.pack('>i', 2), # Length of value + b'v2', # Value + ]) + assert encoded == expect + + +def test_decode_message_set(): + encoded = b''.join([ + struct.pack('>q', 0), # MsgSet Offset + struct.pack('>i', 18), # Msg Size + struct.pack('>i', 1474775406), # CRC + struct.pack('>bb', 0, 0), # Magic, flags + struct.pack('>i', 2), # Length of key + b'k1', # Key + struct.pack('>i', 2), # Length of value + b'v1', # Value + + struct.pack('>q', 1), # MsgSet Offset + struct.pack('>i', 18), # Msg Size + struct.pack('>i', -16383415), # CRC + struct.pack('>bb', 0, 0), # Magic, flags + struct.pack('>i', 2), # Length of key + b'k2', # Key + struct.pack('>i', 2), # Length of value + b'v2', # Value + ]) + + msgs = MessageSet.decode(encoded, bytes_to_read=len(encoded)) + assert len(msgs) == 2 + msg1, msg2 = msgs + + returned_offset1, message1_size, decoded_message1 = msg1 + returned_offset2, message2_size, decoded_message2 = msg2 + + assert returned_offset1 == 0 + message1 = Message(b'v1', key=b'k1') + message1.encode() + assert decoded_message1 == message1 + + assert returned_offset2 == 1 + message2 = Message(b'v2', key=b'k2') + message2.encode() + assert decoded_message2 == message2 + + +def test_encode_message_header(): + expect = b''.join([ + struct.pack('>h', 10), # API Key + struct.pack('>h', 0), # API Version + struct.pack('>i', 4), # Correlation Id + struct.pack('>h', len('client3')), # Length of clientId + b'client3', # ClientId + ]) + + req = GroupCoordinatorRequest[0]('foo') + header = RequestHeader(req, correlation_id=4, client_id='client3') + assert header.encode() == expect |