summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore1
-rw-r--r--kafka/__init__.py2
-rw-r--r--kafka/codec.py23
-rw-r--r--kafka/kafka.py23
-rw-r--r--test/unit.py4
5 files changed, 29 insertions, 24 deletions
diff --git a/.gitignore b/.gitignore
index 0d20b64..27ffc2f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,2 @@
*.pyc
+build
diff --git a/kafka/__init__.py b/kafka/__init__.py
index 69bc75f..1e01ea9 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -1 +1 @@
-__all__ = ["kafka"]
+__all__ = ["kafka","codec"]
diff --git a/kafka/codec.py b/kafka/codec.py
new file mode 100644
index 0000000..47ab074
--- /dev/null
+++ b/kafka/codec.py
@@ -0,0 +1,23 @@
+from cStringIO import StringIO
+import gzip
+import logging
+
+log = logging.getLogger("kafka.codec")
+
+def gzip_encode(payload):
+ buf = StringIO()
+ f = gzip.GzipFile(fileobj=buf, mode='w', compresslevel=6)
+ f.write(payload)
+ f.close()
+ buf.seek(0)
+ out = buf.read()
+ buf.close()
+ return out
+
+def gzip_decode(payload):
+ buf = StringIO(payload)
+ f = gzip.GzipFile(fileobj=buf, mode='r')
+ out = f.read()
+ f.close()
+ buf.close()
+ return out
diff --git a/kafka/kafka.py b/kafka/kafka.py
index ff9f53d..0cde87f 100644
--- a/kafka/kafka.py
+++ b/kafka/kafka.py
@@ -7,8 +7,9 @@ import socket
import struct
import zlib
-log = logging.getLogger("org.apache.kafka")
+from .codec import gzip_encode, gzip_decode
+log = logging.getLogger("kafka")
error_codes = {
-1: "UnknownError",
@@ -25,31 +26,11 @@ class KafkaException(Exception):
def __str__(self):
return str(errorType)
-
Message = namedtuple("Message", ["magic", "attributes", "crc", "payload"])
FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "size"])
ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"])
OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "maxOffsets"])
-def gzip_encode(payload):
- buf = StringIO()
- f = gzip.GzipFile(fileobj=buf, mode='w', compresslevel=6)
- f.write(payload)
- f.close()
- buf.seek(0)
- out = buf.read()
- buf.close()
- return out
-
-def gzip_decode(payload):
- buf = StringIO(payload)
- f = gzip.GzipFile(fileobj=buf, mode='r')
- out = f.read()
- f.close()
- buf.close()
- return out
-
-
def length_prefix_message(msg):
"""
Prefix a message with it's length as an int
diff --git a/test/unit.py b/test/unit.py
index c77aa81..2db0857 100644
--- a/test/unit.py
+++ b/test/unit.py
@@ -3,8 +3,8 @@ import random
import struct
import unittest
-from kafka.kafka import KafkaClient, ProduceRequest, FetchRequest
-from kafka.kafka import gzip_encode, gzip_decode, length_prefix_message
+from kafka.kafka import KafkaClient, ProduceRequest, FetchRequest, length_prefix_message
+from kafka.codec import gzip_encode, gzip_decode
ITERATIONS = 1000
STRLEN = 100