summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2012-11-16 15:02:12 -0500
committerDavid Arthur <mumrah@gmail.com>2012-11-16 15:02:49 -0500
commit26414d4a993b6d0fd9074b2e7c4abc4230ae52eb (patch)
tree79787ce084071488c92760b91570bd5920786f35
parent03abf98c0936a1d48af077ba9c41a11cd074ec5b (diff)
downloadkafka-python-26414d4a993b6d0fd9074b2e7c4abc4230ae52eb.tar.gz
Add Snappy support0.1-alpha
Fixes #2
-rw-r--r--README.md33
-rw-r--r--kafka/__init__.py1
-rw-r--r--kafka/client.py24
-rw-r--r--kafka/codec.py17
-rw-r--r--test/integration.py43
-rw-r--r--test/unit.py47
6 files changed, 144 insertions, 21 deletions
diff --git a/README.md b/README.md
index a1cec1b..c2dfa99 100644
--- a/README.md
+++ b/README.md
@@ -38,8 +38,29 @@ cd kafka-python
python setup.py install
```
+## Optional Snappy install
+
+Download and build Snappy from http://code.google.com/p/snappy/downloads/list
+
+```shell
+wget http://snappy.googlecode.com/files/snappy-1.0.5.tar.gz
+tar xzvf snappy-1.0.5.tar.gz
+cd snappy-1.0.5
+./configure
+make
+sudo make install
+```
+
+Install the `python-snappy` module
+```shell
+pip install python-snappy
+```
+
# Tests
+Some of the tests will fail if Snappy is not installed. These tests will throw NotImplementedError. If you see other failures,
+they might be bugs - so please report them!
+
## Run the unit tests
```shell
@@ -137,5 +158,15 @@ for msg in kafka.iter_messages(FetchRequest("my-topic", 0, 0, 1024*1024), False)
print(msg.payload)
kafka.close()
```
-
This will only iterate through messages in the byte range of (0, 1024\*1024)
+
+## Create some compressed messages
+
+```python
+kafka = KafkaClient("localhost", 9092)
+messages = [kafka.create_snappy_message("testing 1"),
+ kafka.create_snappy_message("testing 2")]
+req = ProduceRequest(topic, 1, messages)
+kafka.send_message_set(req)
+kafka.close()
+```
diff --git a/kafka/__init__.py b/kafka/__init__.py
index 41038d7..1dcae86 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -9,3 +9,4 @@ from .client import (
Message, ProduceRequest, FetchRequest, OffsetRequest
)
from .codec import gzip_encode, gzip_decode
+from .codec import snappy_encode, snappy_decode
diff --git a/kafka/client.py b/kafka/client.py
index 9d23568..212f5c3 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -5,6 +5,7 @@ import struct
import zlib
from .codec import gzip_encode, gzip_decode
+from .codec import snappy_encode, snappy_decode
log = logging.getLogger("kafka")
@@ -340,7 +341,10 @@ class KafkaClient(object):
yield msg
elif att & KafkaClient.ATTRIBUTE_CODEC_MASK == 2:
# Snappy encoded Message
- raise NotImplementedError("Snappy codec is not yet supported")
+ snp = snappy_decode(payload)
+ (msgs, _) = cls.read_message_set(snp)
+ for msg in msgs:
+ yield msg
else:
raise RuntimeError("Unsupported compression type: %d" % (att & KafkaClient.ATTRIBUTE_CODEC_MASK))
@@ -437,6 +441,24 @@ class KafkaClient(object):
gzipped = gzip_encode(message_set)
return Message(1, 0x00 | (KafkaClient.ATTRIBUTE_CODEC_MASK & 0x01), zlib.crc32(gzipped), gzipped)
+ @classmethod
+ def create_snappy_message(cls, *payloads):
+ """
+ Create a Snappy encoded Message
+
+ Params
+ ======
+ payloads, list of messages (bytes) to be encoded
+
+ Returns
+ =======
+ A Message tuple
+ """
+ messages = [cls.create_message(payload) for payload in payloads]
+ message_set = cls.encode_message_set(messages)
+ snapped = snappy_encode(message_set)
+ return Message(1, 0x00 | (KafkaClient.ATTRIBUTE_CODEC_MASK & 0x02), zlib.crc32(snapped), snapped)
+
def send_message_set(self, produceRequest):
"""
Send a ProduceRequest
diff --git a/kafka/codec.py b/kafka/codec.py
index 47ab074..83f3c0b 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -4,6 +4,13 @@ import logging
log = logging.getLogger("kafka.codec")
+try:
+ import snappy
+ hasSnappy=True
+except ImportError:
+ log.warn("Snappy codec not available")
+ hasSnappy=False
+
def gzip_encode(payload):
buf = StringIO()
f = gzip.GzipFile(fileobj=buf, mode='w', compresslevel=6)
@@ -21,3 +28,13 @@ def gzip_decode(payload):
f.close()
buf.close()
return out
+
+def snappy_encode(payload):
+ if not hasSnappy:
+ raise NotImplementedError("Snappy codec not available")
+ return snappy.compress(payload)
+
+def snappy_decode(payload):
+ if not hasSnappy:
+ raise NotImplementedError("Snappy codec not available")
+ return snappy.decompress(payload)
diff --git a/test/integration.py b/test/integration.py
index 232700f..7680682 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -118,32 +118,41 @@ class IntegrationTest(unittest.TestCase):
self.kafka.send_message_set(req)
self.assertTrue(self.server.wait_for("Created log for 'test-produce'-1"))
- def test_produce_consume(self):
+ def _test_produce_consume(self, topic, create_func):
# Send two messages and consume them
- message1 = KafkaClient.create_message("testing 1")
- message2 = KafkaClient.create_message("testing 2")
- req = ProduceRequest("test-produce-consume", 0, [message1, message2])
+ message1 = create_func("testing 1")
+ message2 = create_func("testing 2")
+ req = ProduceRequest(topic, 0, [message1, message2])
self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-0"))
- self.assertTrue(self.server.wait_for("Flushing log 'test-produce-consume-0'"))
- req = FetchRequest("test-produce-consume", 0, 0, 1024)
+ self.assertTrue(self.server.wait_for("Created log for '%s'-0" % topic))
+ self.assertTrue(self.server.wait_for("Flushing log '%s-0'" % topic))
+ req = FetchRequest(topic, 0, 0, 1024)
(messages, req) = self.kafka.get_message_set(req)
self.assertEquals(len(messages), 2)
- self.assertEquals(messages[0], message1)
- self.assertEquals(messages[1], message2)
+ self.assertEquals(messages[0].payload, "testing 1")
+ self.assertEquals(messages[1].payload, "testing 2")
# Do the same, but for a different partition
- message3 = KafkaClient.create_message("testing 3")
- message4 = KafkaClient.create_message("testing 4")
- req = ProduceRequest("test-produce-consume", 1, [message3, message4])
+ message3 = create_func("testing 3")
+ message4 = create_func("testing 4")
+ req = ProduceRequest(topic, 1, [message3, message4])
self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-1"))
- self.assertTrue(self.server.wait_for("Flushing log 'test-produce-consume-1'"))
- req = FetchRequest("test-produce-consume", 1, 0, 1024)
+ self.assertTrue(self.server.wait_for("Created log for '%s'-1" % topic))
+ self.assertTrue(self.server.wait_for("Flushing log '%s-1'" % topic))
+ req = FetchRequest(topic, 1, 0, 1024)
(messages, req) = self.kafka.get_message_set(req)
self.assertEquals(len(messages), 2)
- self.assertEquals(messages[0], message3)
- self.assertEquals(messages[1], message4)
+ self.assertEquals(messages[0].payload, "testing 3")
+ self.assertEquals(messages[1].payload, "testing 4")
+
+ def test_produce_consume(self):
+ self._test_produce_consume("test-produce-consume", KafkaClient.create_message)
+
+ def test_produce_consume_snappy(self):
+ self._test_produce_consume("test-produce-consume-snappy", KafkaClient.create_snappy_message)
+
+ def test_produce_consume_gzip(self):
+ self._test_produce_consume("test-produce-consume-gzip", KafkaClient.create_gzip_message)
def test_check_offset(self):
# Produce/consume a message, check that the next offset looks correct
diff --git a/test/unit.py b/test/unit.py
index 7cb1aed..5961509 100644
--- a/test/unit.py
+++ b/test/unit.py
@@ -5,18 +5,20 @@ import unittest
from kafka.client import KafkaClient, ProduceRequest, FetchRequest, length_prefix_message
from kafka.codec import gzip_encode, gzip_decode
+from kafka.codec import snappy_encode, snappy_decode
ITERATIONS = 1000
STRLEN = 100
def random_string():
- return os.urandom(random.randint(0, STRLEN))
+ return os.urandom(random.randint(1, STRLEN))
class TestPackage(unittest.TestCase):
def test_top_level_namespace(self):
import kafka as kafka1
self.assertEquals(kafka1.KafkaClient.__name__, "KafkaClient")
self.assertEquals(kafka1.gzip_encode.__name__, "gzip_encode")
+ self.assertEquals(kafka1.snappy_encode.__name__, "snappy_encode")
self.assertEquals(kafka1.client.__name__, "kafka.client")
self.assertEquals(kafka1.codec.__name__, "kafka.codec")
@@ -41,6 +43,9 @@ class TestPackage(unittest.TestCase):
from kafka import gzip_encode as gzip_encode2
self.assertEquals(gzip_encode2.__name__, "gzip_encode")
+ from kafka import snappy_encode as snappy_encode2
+ self.assertEquals(snappy_encode2.__name__, "snappy_encode")
+
class TestMisc(unittest.TestCase):
def test_length_prefix(self):
for i in xrange(ITERATIONS):
@@ -55,6 +60,12 @@ class TestCodec(unittest.TestCase):
s2 = gzip_decode(gzip_encode(s1))
self.assertEquals(s1,s2)
+ def test_snappy(self):
+ for i in xrange(ITERATIONS):
+ s1 = random_string()
+ s2 = snappy_decode(snappy_encode(s1))
+ self.assertEquals(s1,s2)
+
class TestMessage(unittest.TestCase):
def test_create(self):
msg = KafkaClient.create_message("testing")
@@ -75,6 +86,18 @@ class TestMessage(unittest.TestCase):
self.assertEquals(inner.payload, "testing")
self.assertEquals(inner.crc, -386704890)
+ def test_create_snappy(self):
+ msg = KafkaClient.create_snappy_message("testing")
+ self.assertEquals(msg.magic, 1)
+ self.assertEquals(msg.attributes, 2)
+ self.assertEquals(msg.crc, -62350868)
+ (messages, _) = KafkaClient.read_message_set(snappy_decode(msg.payload))
+ inner = messages[0]
+ self.assertEquals(inner.magic, 1)
+ self.assertEquals(inner.attributes, 0)
+ self.assertEquals(inner.payload, "testing")
+ self.assertEquals(inner.crc, -386704890)
+
def test_message_simple(self):
msg = KafkaClient.create_message("testing")
enc = KafkaClient.encode_message(msg)
@@ -110,6 +133,15 @@ class TestMessage(unittest.TestCase):
self.assertEquals(messages[1].payload, "two")
self.assertEquals(messages[2].payload, "three")
+ def test_message_snappy(self):
+ msg = KafkaClient.create_snappy_message("one", "two", "three")
+ enc = KafkaClient.encode_message(msg)
+ (messages, read) = KafkaClient.read_message_set(enc)
+ self.assertEquals(len(messages), 3)
+ self.assertEquals(messages[0].payload, "one")
+ self.assertEquals(messages[1].payload, "two")
+ self.assertEquals(messages[2].payload, "three")
+
def test_message_simple_random(self):
for i in xrange(ITERATIONS):
n = random.randint(0, 10)
@@ -122,7 +154,7 @@ class TestMessage(unittest.TestCase):
def test_message_gzip_random(self):
for i in xrange(ITERATIONS):
- n = random.randint(0, 10)
+ n = random.randint(1, 10)
strings = [random_string() for j in range(n)]
msg = KafkaClient.create_gzip_message(*strings)
enc = KafkaClient.encode_message(msg)
@@ -131,6 +163,17 @@ class TestMessage(unittest.TestCase):
for j in range(n):
self.assertEquals(messages[j].payload, strings[j])
+ def test_message_snappy_random(self):
+ for i in xrange(ITERATIONS):
+ n = random.randint(1, 10)
+ strings = [random_string() for j in range(n)]
+ msg = KafkaClient.create_snappy_message(*strings)
+ enc = KafkaClient.encode_message(msg)
+ (messages, read) = KafkaClient.read_message_set(enc)
+ self.assertEquals(len(messages), n)
+ for j in range(n):
+ self.assertEquals(messages[j].payload, strings[j])
+
class TestRequests(unittest.TestCase):
def test_produce_request(self):
req = ProduceRequest("my-topic", 0, [KafkaClient.create_message("testing")])