diff options
Diffstat (limited to 'kafka/kafka.py')
| -rw-r--r-- | kafka/kafka.py | 23 | 
1 files changed, 2 insertions, 21 deletions
diff --git a/kafka/kafka.py b/kafka/kafka.py index ff9f53d..0cde87f 100644 --- a/kafka/kafka.py +++ b/kafka/kafka.py @@ -7,8 +7,9 @@ import socket  import struct  import zlib -log = logging.getLogger("org.apache.kafka") +from .codec import gzip_encode, gzip_decode +log = logging.getLogger("kafka")  error_codes = {     -1: "UnknownError", @@ -25,31 +26,11 @@ class KafkaException(Exception):      def __str__(self):          return str(errorType) -  Message = namedtuple("Message", ["magic", "attributes", "crc", "payload"])  FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "size"])  ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"])  OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "maxOffsets"]) -def gzip_encode(payload): -    buf = StringIO() -    f = gzip.GzipFile(fileobj=buf, mode='w', compresslevel=6) -    f.write(payload) -    f.close() -    buf.seek(0) -    out = buf.read() -    buf.close() -    return out - -def gzip_decode(payload): -    buf = StringIO(payload) -    f = gzip.GzipFile(fileobj=buf, mode='r') -    out = f.read() -    f.close() -    buf.close() -    return out - -  def length_prefix_message(msg):      """      Prefix a message with it's length as an int  | 
