From c99d4119f2d6d0fdee38a159e1c845e42cf398b1 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Tue, 2 Oct 2012 12:14:48 -0400 Subject: Moved codec stuff into it's own module Snappy will go there when I get around to it --- .gitignore | 1 + kafka/__init__.py | 2 +- kafka/codec.py | 23 +++++++++++++++++++++++ kafka/kafka.py | 23 ++--------------------- test/unit.py | 4 ++-- 5 files changed, 29 insertions(+), 24 deletions(-) create mode 100644 kafka/codec.py diff --git a/.gitignore b/.gitignore index 0d20b64..27ffc2f 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ *.pyc +build diff --git a/kafka/__init__.py b/kafka/__init__.py index 69bc75f..1e01ea9 100644 --- a/kafka/__init__.py +++ b/kafka/__init__.py @@ -1 +1 @@ -__all__ = ["kafka"] +__all__ = ["kafka","codec"] diff --git a/kafka/codec.py b/kafka/codec.py new file mode 100644 index 0000000..47ab074 --- /dev/null +++ b/kafka/codec.py @@ -0,0 +1,23 @@ +from cStringIO import StringIO +import gzip +import logging + +log = logging.getLogger("kafka.codec") + +def gzip_encode(payload): + buf = StringIO() + f = gzip.GzipFile(fileobj=buf, mode='w', compresslevel=6) + f.write(payload) + f.close() + buf.seek(0) + out = buf.read() + buf.close() + return out + +def gzip_decode(payload): + buf = StringIO(payload) + f = gzip.GzipFile(fileobj=buf, mode='r') + out = f.read() + f.close() + buf.close() + return out diff --git a/kafka/kafka.py b/kafka/kafka.py index ff9f53d..0cde87f 100644 --- a/kafka/kafka.py +++ b/kafka/kafka.py @@ -7,8 +7,9 @@ import socket import struct import zlib -log = logging.getLogger("org.apache.kafka") +from .codec import gzip_encode, gzip_decode +log = logging.getLogger("kafka") error_codes = { -1: "UnknownError", @@ -25,31 +26,11 @@ class KafkaException(Exception): def __str__(self): return str(errorType) - Message = namedtuple("Message", ["magic", "attributes", "crc", "payload"]) FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "size"]) ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"]) OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "maxOffsets"]) -def gzip_encode(payload): - buf = StringIO() - f = gzip.GzipFile(fileobj=buf, mode='w', compresslevel=6) - f.write(payload) - f.close() - buf.seek(0) - out = buf.read() - buf.close() - return out - -def gzip_decode(payload): - buf = StringIO(payload) - f = gzip.GzipFile(fileobj=buf, mode='r') - out = f.read() - f.close() - buf.close() - return out - - def length_prefix_message(msg): """ Prefix a message with it's length as an int diff --git a/test/unit.py b/test/unit.py index c77aa81..2db0857 100644 --- a/test/unit.py +++ b/test/unit.py @@ -3,8 +3,8 @@ import random import struct import unittest -from kafka.kafka import KafkaClient, ProduceRequest, FetchRequest -from kafka.kafka import gzip_encode, gzip_decode, length_prefix_message +from kafka.kafka import KafkaClient, ProduceRequest, FetchRequest, length_prefix_message +from kafka.codec import gzip_encode, gzip_decode ITERATIONS = 1000 STRLEN = 100 -- cgit v1.2.1