summaryrefslogtreecommitdiff
path: root/kafka/protocol/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/protocol/message.py')
-rw-r--r--kafka/protocol/message.py27
1 files changed, 25 insertions, 2 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index f6cbb33..f893912 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -1,5 +1,6 @@
import io
+from ..codec import gzip_decode, snappy_decode
from . import pickle
from .struct import Struct
from .types import (
@@ -16,6 +17,9 @@ class Message(Struct):
('key', Bytes),
('value', Bytes)
)
+ CODEC_MASK = 0x03
+ CODEC_GZIP = 0x01
+ CODEC_SNAPPY = 0x02
def __init__(self, value, key=None, magic=0, attributes=0, crc=0):
self.crc = crc
@@ -49,6 +53,19 @@ class Message(Struct):
return True
return False
+ def is_compressed(self):
+ return self.attributes & self.CODEC_MASK != 0
+
+ def decompress(self):
+ codec = self.attributes & self.CODEC_MASK
+ assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY)
+ if codec == self.CODEC_GZIP:
+ raw_bytes = gzip_decode(self.value)
+ else:
+ raw_bytes = snappy_decode(self.value)
+
+ return MessageSet.decode(raw_bytes, bytes_to_read=len(raw_bytes))
+
class PartialMessage(bytes):
def __repr__(self):
@@ -81,8 +98,14 @@ class MessageSet(AbstractType):
return Int32.encode(len(encoded)) + encoded
@classmethod
- def decode(cls, data):
- bytes_to_read = Int32.decode(data)
+ def decode(cls, data, bytes_to_read=None):
+ """Compressed messages should pass in bytes_to_read (via message size)
+ otherwise, we decode from data as Int32
+ """
+ if isinstance(data, bytes):
+ data = io.BytesIO(data)
+ if bytes_to_read is None:
+ bytes_to_read = Int32.decode(data)
items = []
# We need at least 8 + 4 + 14 bytes to read offset + message size + message