diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-07-17 11:48:19 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-07-17 12:55:42 -0700 |
commit | d8bb5d2efa9f2df09421e77040edd0caa2643cec (patch) | |
tree | b3530170dada8b1b0cccb3d217dee00da8cf519f | |
parent | cd9aa73c8463fa0214d6ee6109c63396e623dbaa (diff) | |
download | kafka-python-message_set_encode_size.tar.gz |
Always encode size with MessageSetmessage_set_encode_size
-rw-r--r-- | kafka/protocol/message.py | 4 | ||||
-rw-r--r-- | test/test_protocol.py | 4 |
2 files changed, 3 insertions, 5 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index 656c131..795495d 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -143,7 +143,7 @@ class MessageSet(AbstractType): HEADER_SIZE = 12 # offset + message_size @classmethod - def encode(cls, items, size=True, recalc_message_size=True): + def encode(cls, items): # RecordAccumulator encodes messagesets internally if isinstance(items, io.BytesIO): size = Int32.decode(items) @@ -156,8 +156,6 @@ class MessageSet(AbstractType): encoded_values.append(Int64.encode(offset)) encoded_values.append(Bytes.encode(message)) encoded = b''.join(encoded_values) - if not size: - return encoded return Bytes.encode(encoded) @classmethod diff --git a/test/test_protocol.py b/test/test_protocol.py index 2b52f48..1c9f0f9 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -72,8 +72,7 @@ def test_encode_message_set(): Message(b'v2', key=b'k2') ] encoded = MessageSet.encode([(0, msg.encode()) - for msg in messages], - size=False) + for msg in messages]) expect = b''.join([ struct.pack('>q', 0), # MsgSet Offset struct.pack('>i', 18), # Msg Size @@ -93,6 +92,7 @@ def test_encode_message_set(): struct.pack('>i', 2), # Length of value b'v2', # Value ]) + expect = struct.pack('>i', len(expect)) + expect assert encoded == expect |