diff options
Diffstat (limited to 'kafka/protocol')
-rw-r--r-- | kafka/protocol/fetch.py | 11 | ||||
-rw-r--r-- | kafka/protocol/legacy.py | 6 | ||||
-rw-r--r-- | kafka/protocol/message.py | 12 | ||||
-rw-r--r-- | kafka/protocol/produce.py | 7 |
4 files changed, 20 insertions, 16 deletions
diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py index 359f197..0b03845 100644 --- a/kafka/protocol/fetch.py +++ b/kafka/protocol/fetch.py @@ -1,8 +1,7 @@ from __future__ import absolute_import from .api import Request, Response -from .message import MessageSet -from .types import Array, Int8, Int16, Int32, Int64, Schema, String +from .types import Array, Int8, Int16, Int32, Int64, Schema, String, Bytes class FetchResponse_v0(Response): @@ -15,7 +14,7 @@ class FetchResponse_v0(Response): ('partition', Int32), ('error_code', Int16), ('highwater_offset', Int64), - ('message_set', MessageSet))))) + ('message_set', Bytes))))) ) @@ -30,7 +29,7 @@ class FetchResponse_v1(Response): ('partition', Int32), ('error_code', Int16), ('highwater_offset', Int64), - ('message_set', MessageSet))))) + ('message_set', Bytes))))) ) @@ -61,7 +60,7 @@ class FetchResponse_v4(Response): ('aborted_transactions', Array( ('producer_id', Int64), ('first_offset', Int64))), - ('message_set', MessageSet))))) + ('message_set', Bytes))))) ) @@ -81,7 +80,7 @@ class FetchResponse_v5(Response): ('aborted_transactions', Array( ('producer_id', Int64), ('first_offset', Int64))), - ('message_set', MessageSet))))) + ('message_set', Bytes))))) ) diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py index 37145b7..b8f84e7 100644 --- a/kafka/protocol/legacy.py +++ b/kafka/protocol/legacy.py @@ -19,6 +19,7 @@ from kafka.structs import ConsumerMetadataResponse from kafka.util import ( crc32, read_short_string, relative_unpack, write_int_string, group_by_topic_and_partition) +from kafka.protocol.message import MessageSet log = logging.getLogger(__name__) @@ -144,7 +145,7 @@ class KafkaProtocol(object): magic=msg.magic, attributes=msg.attributes ) partition_msgs.append((0, m.encode())) - topic_msgs.append((partition, partition_msgs)) + topic_msgs.append((partition, MessageSet.encode(partition_msgs, prepend_size=False))) topics.append((topic, topic_msgs)) @@ -215,7 +216,8 @@ class KafkaProtocol(object): ] @classmethod - def decode_message_set(cls, messages): + def decode_message_set(cls, raw_data): + messages = MessageSet.decode(raw_data, bytes_to_read=len(raw_data)) for offset, _, message in messages: if isinstance(message, kafka.protocol.message.Message) and message.is_compressed(): inner_messages = message.decompress() diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index 70d5b36..f5a51a9 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -154,12 +154,13 @@ class MessageSet(AbstractType): HEADER_SIZE = 12 # offset + message_size @classmethod - def encode(cls, items): + def encode(cls, items, prepend_size=True): # RecordAccumulator encodes messagesets internally if isinstance(items, (io.BytesIO, KafkaBytes)): size = Int32.decode(items) - # rewind and return all the bytes - items.seek(items.tell() - 4) + if prepend_size: + # rewind and return all the bytes + items.seek(items.tell() - 4) return items.read(size + 4) encoded_values = [] @@ -167,7 +168,10 @@ class MessageSet(AbstractType): encoded_values.append(Int64.encode(offset)) encoded_values.append(Bytes.encode(message)) encoded = b''.join(encoded_values) - return Bytes.encode(encoded) + if prepend_size: + return Bytes.encode(encoded) + else: + return encoded @classmethod def decode(cls, data, bytes_to_read=None): diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py index da1f308..34ff949 100644 --- a/kafka/protocol/produce.py +++ b/kafka/protocol/produce.py @@ -1,8 +1,7 @@ from __future__ import absolute_import from .api import Request, Response -from .message import MessageSet -from .types import Int16, Int32, Int64, String, Array, Schema +from .types import Int16, Int32, Int64, String, Array, Schema, Bytes class ProduceResponse_v0(Response): @@ -64,7 +63,7 @@ class ProduceRequest_v0(Request): ('topic', String('utf-8')), ('partitions', Array( ('partition', Int32), - ('messages', MessageSet))))) + ('messages', Bytes))))) ) def expect_response(self): @@ -109,7 +108,7 @@ class ProduceRequest_v3(Request): ('topic', String('utf-8')), ('partitions', Array( ('partition', Int32), - ('messages', MessageSet))))) + ('messages', Bytes))))) ) def expect_response(self): |