1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
|
#pylint: skip-file
import struct
import pytest
import six
from kafka.protocol.api import RequestHeader
from kafka.protocol.commit import GroupCoordinatorRequest
from kafka.protocol.message import Message, MessageSet
def test_create_message():
    """Check the attributes of a Message built from a value and a key.

    Neither magic nor attributes is passed to the constructor; the test
    expects both to read back as 0 and the key/value to read back unchanged.
    """
    value, key = b'test', b'key'
    message = Message(value, key=key)

    assert message.magic == 0
    assert message.attributes == 0
    assert message.key == key
    assert message.value == value
def test_encode_message_v0():
    """Check the encoded bytes of a Message created without an explicit magic."""
    wire = Message(b'test', key=b'key').encode()

    # '>' selects big-endian standard sizes with no padding, so a single
    # pack call lays the fields out back to back.
    expected = struct.pack(
        '>ibbi3si4s',
        -1427009701,  # CRC
        0, 0,         # Magic, flags
        3, b'key',    # Length of key, key
        4, b'test',   # Length of value, value
    )
    assert wire == expected
def test_encode_message_v1():
    """Check the encoded bytes of a Message created with magic=1 and a timestamp."""
    wire = Message(b'test', key=b'key', magic=1, timestamp=1234).encode()

    # '>' selects big-endian standard sizes with no padding, so a single
    # pack call lays the fields out back to back.
    expected = struct.pack(
        '>ibbqi3si4s',
        1331087195,  # CRC
        1, 0,        # Magic, flags
        1234,        # Timestamp
        3, b'key',   # Length of key, key
        4, b'test',  # Length of value, value
    )
    assert wire == expected
def test_decode_message():
    """Check that Message.decode on raw bytes yields an equal Message."""
    # '>' selects big-endian standard sizes with no padding, so a single
    # pack call lays the fields out back to back.
    raw = struct.pack(
        '>ibbi3si4s',
        -1427009701,  # CRC
        0, 0,         # Magic, flags
        3, b'key',    # Length of key, key
        4, b'test',   # Length of value, value
    )
    decoded = Message.decode(raw)

    reference = Message(b'test', key=b'key')
    reference.encode()  # crc is recalculated during encoding
    assert decoded == reference
def test_encode_message_set():
    """Check the bytes MessageSet.encode produces for two (offset, bytes) entries.

    Both entries are submitted with offset 0 and the set is encoded with
    size=False.
    """
    messages = [Message(b'v1', key=b'k1'), Message(b'v2', key=b'k2')]
    entries = []
    for message in messages:
        entries.append((0, message.encode()))
    wire = MessageSet.encode(entries, size=False)

    # Each entry shares the same layout; only CRC, key and value differ.
    chunks = []
    for crc, key, value in ((1474775406, b'k1', b'v1'),
                            (-16383415, b'k2', b'v2')):
        chunks.append(struct.pack(
            '>qiibbi2si2s',
            0,         # MsgSet Offset
            18,        # Msg Size
            crc,       # CRC
            0, 0,      # Magic, flags
            2, key,    # Length of key, Key
            2, value,  # Length of value, Value
        ))
    assert wire == b''.join(chunks)
def test_decode_message_set():
    """Check that MessageSet.decode returns both entries with their offsets.

    The input holds two entries at offsets 0 and 1; each decoded entry is
    unpacked as (offset, size, message) and compared against a freshly
    built and encoded Message.
    """
    # Each entry shares the same layout; only offset, CRC, key and value differ.
    chunks = []
    for offset, crc, key, value in ((0, 1474775406, b'k1', b'v1'),
                                    (1, -16383415, b'k2', b'v2')):
        chunks.append(struct.pack(
            '>qiibbi2si2s',
            offset,    # MsgSet Offset
            18,        # Msg Size
            crc,       # CRC
            0, 0,      # Magic, flags
            2, key,    # Length of key, Key
            2, value,  # Length of value, Value
        ))
    raw = b''.join(chunks)

    msgs = MessageSet.decode(raw, bytes_to_read=len(raw))
    assert len(msgs) == 2

    # The size element of each entry is not checked by this test.
    (offset1, _size1, decoded1), (offset2, _size2, decoded2) = msgs

    checks = (
        (offset1, decoded1, 0, b'k1', b'v1'),
        (offset2, decoded2, 1, b'k2', b'v2'),
    )
    for returned_offset, decoded, want_offset, key, value in checks:
        assert returned_offset == want_offset
        reference = Message(value, key=key)
        reference.encode()
        assert decoded == reference
def test_encode_message_header():
    """Check the bytes of a RequestHeader wrapping a GroupCoordinatorRequest[0].

    The header is built with correlation_id=4 and client_id='client3'.
    """
    # '>' selects big-endian standard sizes with no padding, so a single
    # pack call lays the fields out back to back.
    expected = struct.pack(
        '>hhih7s',
        10,              # API Key
        0,               # API Version
        4,               # Correlation Id
        len('client3'),  # Length of clientId
        b'client3',      # ClientId
    )

    request = GroupCoordinatorRequest[0]('foo')
    encoded = RequestHeader(request, correlation_id=4, client_id='client3').encode()
    assert encoded == expected
|