diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-01-07 10:52:01 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-08-13 19:47:36 -0700 |
commit | df227a6015992d8ddb79f5faa3f782d0042edd6b (patch) | |
tree | 064e31324df666ecc7575d5033948600f5df52fd /kafka/protocol/message.py | |
parent | f13ce1d87919ab763b02e38c17080580e199b4af (diff) | |
download | kafka-python-receive_bytes_pipe.tar.gz |
BrokerConnection.receive_bytes(data) -> response eventsreceive_bytes_pipe
Diffstat (limited to 'kafka/protocol/message.py')
-rw-r--r-- | kafka/protocol/message.py | 7 |
1 files changed, 4 insertions, 3 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index efdf4fc..70d5b36 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -6,6 +6,7 @@ import time from ..codec import (has_gzip, has_snappy, has_lz4, gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka) +from .frame import KafkaBytes from .struct import Struct from .types import ( Int8, Int32, Int64, Bytes, Schema, AbstractType @@ -155,10 +156,10 @@ class MessageSet(AbstractType): @classmethod def encode(cls, items): # RecordAccumulator encodes messagesets internally - if isinstance(items, io.BytesIO): + if isinstance(items, (io.BytesIO, KafkaBytes)): size = Int32.decode(items) # rewind and return all the bytes - items.seek(-4, 1) + items.seek(items.tell() - 4) return items.read(size + 4) encoded_values = [] @@ -198,7 +199,7 @@ class MessageSet(AbstractType): @classmethod def repr(cls, messages): - if isinstance(messages, io.BytesIO): + if isinstance(messages, (KafkaBytes, io.BytesIO)): offset = messages.tell() decoded = cls.decode(messages) messages.seek(offset) |