summaryrefslogtreecommitdiff
path: root/kafka/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/protocol')
-rw-r--r--kafka/protocol/fetch.py11
-rw-r--r--kafka/protocol/legacy.py6
-rw-r--r--kafka/protocol/message.py12
-rw-r--r--kafka/protocol/produce.py7
4 files changed, 20 insertions, 16 deletions
diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py
index 359f197..0b03845 100644
--- a/kafka/protocol/fetch.py
+++ b/kafka/protocol/fetch.py
@@ -1,8 +1,7 @@
from __future__ import absolute_import
from .api import Request, Response
-from .message import MessageSet
-from .types import Array, Int8, Int16, Int32, Int64, Schema, String
+from .types import Array, Int8, Int16, Int32, Int64, Schema, String, Bytes
class FetchResponse_v0(Response):
@@ -15,7 +14,7 @@ class FetchResponse_v0(Response):
('partition', Int32),
('error_code', Int16),
('highwater_offset', Int64),
- ('message_set', MessageSet)))))
+ ('message_set', Bytes)))))
)
@@ -30,7 +29,7 @@ class FetchResponse_v1(Response):
('partition', Int32),
('error_code', Int16),
('highwater_offset', Int64),
- ('message_set', MessageSet)))))
+ ('message_set', Bytes)))))
)
@@ -61,7 +60,7 @@ class FetchResponse_v4(Response):
('aborted_transactions', Array(
('producer_id', Int64),
('first_offset', Int64))),
- ('message_set', MessageSet)))))
+ ('message_set', Bytes)))))
)
@@ -81,7 +80,7 @@ class FetchResponse_v5(Response):
('aborted_transactions', Array(
('producer_id', Int64),
('first_offset', Int64))),
- ('message_set', MessageSet)))))
+ ('message_set', Bytes)))))
)
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index 37145b7..b8f84e7 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -19,6 +19,7 @@ from kafka.structs import ConsumerMetadataResponse
from kafka.util import (
crc32, read_short_string, relative_unpack,
write_int_string, group_by_topic_and_partition)
+from kafka.protocol.message import MessageSet
log = logging.getLogger(__name__)
@@ -144,7 +145,7 @@ class KafkaProtocol(object):
magic=msg.magic, attributes=msg.attributes
)
partition_msgs.append((0, m.encode()))
- topic_msgs.append((partition, partition_msgs))
+ topic_msgs.append((partition, MessageSet.encode(partition_msgs, prepend_size=False)))
topics.append((topic, topic_msgs))
@@ -215,7 +216,8 @@ class KafkaProtocol(object):
]
@classmethod
- def decode_message_set(cls, messages):
+ def decode_message_set(cls, raw_data):
+ messages = MessageSet.decode(raw_data, bytes_to_read=len(raw_data))
for offset, _, message in messages:
if isinstance(message, kafka.protocol.message.Message) and message.is_compressed():
inner_messages = message.decompress()
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index 70d5b36..f5a51a9 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -154,12 +154,13 @@ class MessageSet(AbstractType):
HEADER_SIZE = 12 # offset + message_size
@classmethod
- def encode(cls, items):
+ def encode(cls, items, prepend_size=True):
# RecordAccumulator encodes messagesets internally
if isinstance(items, (io.BytesIO, KafkaBytes)):
size = Int32.decode(items)
- # rewind and return all the bytes
- items.seek(items.tell() - 4)
+ if prepend_size:
+ # rewind and return all the bytes
+ items.seek(items.tell() - 4)
return items.read(size + 4)
encoded_values = []
@@ -167,7 +168,10 @@ class MessageSet(AbstractType):
encoded_values.append(Int64.encode(offset))
encoded_values.append(Bytes.encode(message))
encoded = b''.join(encoded_values)
- return Bytes.encode(encoded)
+ if prepend_size:
+ return Bytes.encode(encoded)
+ else:
+ return encoded
@classmethod
def decode(cls, data, bytes_to_read=None):
diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py
index da1f308..34ff949 100644
--- a/kafka/protocol/produce.py
+++ b/kafka/protocol/produce.py
@@ -1,8 +1,7 @@
from __future__ import absolute_import
from .api import Request, Response
-from .message import MessageSet
-from .types import Int16, Int32, Int64, String, Array, Schema
+from .types import Int16, Int32, Int64, String, Array, Schema, Bytes
class ProduceResponse_v0(Response):
@@ -64,7 +63,7 @@ class ProduceRequest_v0(Request):
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
- ('messages', MessageSet)))))
+ ('messages', Bytes)))))
)
def expect_response(self):
@@ -109,7 +108,7 @@ class ProduceRequest_v3(Request):
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
- ('messages', MessageSet)))))
+ ('messages', Bytes)))))
)
def expect_response(self):