summaryrefslogtreecommitdiff
path: root/kafka/protocol/message.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-08-15 13:00:02 -0700
committerGitHub <noreply@github.com>2017-08-15 13:00:02 -0700
commitba7afd9bc9362055ec0bedcf53eb6f8909dc22d2 (patch)
treef68b4dc2653df1e379da7b497e0fa76a19d6c5a5 /kafka/protocol/message.py
parentcbc6fdc4b973a6a94953c9ce9c33e54e415e45bf (diff)
downloadkafka-python-ba7afd9bc9362055ec0bedcf53eb6f8909dc22d2.tar.gz
BrokerConnection receive bytes pipe (#1032)
Diffstat (limited to 'kafka/protocol/message.py')
-rw-r--r--kafka/protocol/message.py7
1 files changed, 4 insertions, 3 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index efdf4fc..70d5b36 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -6,6 +6,7 @@ import time
from ..codec import (has_gzip, has_snappy, has_lz4,
gzip_decode, snappy_decode,
lz4_decode, lz4_decode_old_kafka)
+from .frame import KafkaBytes
from .struct import Struct
from .types import (
Int8, Int32, Int64, Bytes, Schema, AbstractType
@@ -155,10 +156,10 @@ class MessageSet(AbstractType):
@classmethod
def encode(cls, items):
# RecordAccumulator encodes messagesets internally
- if isinstance(items, io.BytesIO):
+ if isinstance(items, (io.BytesIO, KafkaBytes)):
size = Int32.decode(items)
# rewind and return all the bytes
- items.seek(-4, 1)
+ items.seek(items.tell() - 4)
return items.read(size + 4)
encoded_values = []
@@ -198,7 +199,7 @@ class MessageSet(AbstractType):
@classmethod
def repr(cls, messages):
- if isinstance(messages, io.BytesIO):
+ if isinstance(messages, (KafkaBytes, io.BytesIO)):
offset = messages.tell()
decoded = cls.decode(messages)
messages.seek(offset)