diff options
author | David Arthur <mumrah@gmail.com> | 2012-11-16 15:02:12 -0500 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2012-11-16 15:02:49 -0500 |
commit | 26414d4a993b6d0fd9074b2e7c4abc4230ae52eb (patch) | |
tree | 79787ce084071488c92760b91570bd5920786f35 | |
parent | 03abf98c0936a1d48af077ba9c41a11cd074ec5b (diff) | |
download | kafka-python-26414d4a993b6d0fd9074b2e7c4abc4230ae52eb.tar.gz |
Add Snappy support0.1-alpha
Fixes #2
-rw-r--r-- | README.md | 33 | ||||
-rw-r--r-- | kafka/__init__.py | 1 | ||||
-rw-r--r-- | kafka/client.py | 24 | ||||
-rw-r--r-- | kafka/codec.py | 17 | ||||
-rw-r--r-- | test/integration.py | 43 | ||||
-rw-r--r-- | test/unit.py | 47 |
6 files changed, 144 insertions, 21 deletions
@@ -38,8 +38,29 @@ cd kafka-python python setup.py install ``` +## Optional Snappy install + +Download and build Snappy from http://code.google.com/p/snappy/downloads/list + +```shell +wget http://snappy.googlecode.com/files/snappy-1.0.5.tar.gz +tar xzvf snappy-1.0.5.tar.gz +cd snappy-1.0.5 +./configure +make +sudo make install +``` + +Install the `python-snappy` module +```shell +pip install python-snappy +``` + # Tests +Some of the tests will fail if Snappy is not installed. These tests will throw NotImplementedError. If you see other failures, +they might be bugs - so please report them! + ## Run the unit tests ```shell @@ -137,5 +158,15 @@ for msg in kafka.iter_messages(FetchRequest("my-topic", 0, 0, 1024*1024), False) print(msg.payload) kafka.close() ``` - This will only iterate through messages in the byte range of (0, 1024\*1024) + +## Create some compressed messages + +```python +kafka = KafkaClient("localhost", 9092) +messages = [kafka.create_snappy_message("testing 1"), + kafka.create_snappy_message("testing 2")] +req = ProduceRequest(topic, 1, messages) +kafka.send_message_set(req) +kafka.close() +``` diff --git a/kafka/__init__.py b/kafka/__init__.py index 41038d7..1dcae86 100644 --- a/kafka/__init__.py +++ b/kafka/__init__.py @@ -9,3 +9,4 @@ from .client import ( Message, ProduceRequest, FetchRequest, OffsetRequest ) from .codec import gzip_encode, gzip_decode +from .codec import snappy_encode, snappy_decode diff --git a/kafka/client.py b/kafka/client.py index 9d23568..212f5c3 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -5,6 +5,7 @@ import struct import zlib from .codec import gzip_encode, gzip_decode +from .codec import snappy_encode, snappy_decode log = logging.getLogger("kafka") @@ -340,7 +341,10 @@ class KafkaClient(object): yield msg elif att & KafkaClient.ATTRIBUTE_CODEC_MASK == 2: # Snappy encoded Message - raise NotImplementedError("Snappy codec is not yet supported") + snp = snappy_decode(payload) + (msgs, _) = cls.read_message_set(snp) + for msg in msgs: + yield msg else: raise RuntimeError("Unsupported compression type: %d" % (att & KafkaClient.ATTRIBUTE_CODEC_MASK)) @@ -437,6 +441,24 @@ class KafkaClient(object): gzipped = gzip_encode(message_set) return Message(1, 0x00 | (KafkaClient.ATTRIBUTE_CODEC_MASK & 0x01), zlib.crc32(gzipped), gzipped) + @classmethod + def create_snappy_message(cls, *payloads): + """ + Create a Snappy encoded Message + + Params + ====== + payloads, list of messages (bytes) to be encoded + + Returns + ======= + A Message tuple + """ + messages = [cls.create_message(payload) for payload in payloads] + message_set = cls.encode_message_set(messages) + snapped = snappy_encode(message_set) + return Message(1, 0x00 | (KafkaClient.ATTRIBUTE_CODEC_MASK & 0x02), zlib.crc32(snapped), snapped) + def send_message_set(self, produceRequest): """ Send a ProduceRequest diff --git a/kafka/codec.py b/kafka/codec.py index 47ab074..83f3c0b 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -4,6 +4,13 @@ import logging log = logging.getLogger("kafka.codec") +try: + import snappy + hasSnappy=True +except ImportError: + log.warn("Snappy codec not available") + hasSnappy=False + def gzip_encode(payload): buf = StringIO() f = gzip.GzipFile(fileobj=buf, mode='w', compresslevel=6) @@ -21,3 +28,13 @@ def gzip_decode(payload): f.close() buf.close() return out + +def snappy_encode(payload): + if not hasSnappy: + raise NotImplementedError("Snappy codec not available") + return snappy.compress(payload) + +def snappy_decode(payload): + if not hasSnappy: + raise NotImplementedError("Snappy codec not available") + return snappy.decompress(payload) diff --git a/test/integration.py b/test/integration.py index 232700f..7680682 100644 --- a/test/integration.py +++ b/test/integration.py @@ -118,32 +118,41 @@ class IntegrationTest(unittest.TestCase): self.kafka.send_message_set(req) self.assertTrue(self.server.wait_for("Created log for 'test-produce'-1")) - def test_produce_consume(self): + def _test_produce_consume(self, topic, create_func): # Send two messages and consume them - message1 = KafkaClient.create_message("testing 1") - message2 = KafkaClient.create_message("testing 2") - req = ProduceRequest("test-produce-consume", 0, [message1, message2]) + message1 = create_func("testing 1") + message2 = create_func("testing 2") + req = ProduceRequest(topic, 0, [message1, message2]) self.kafka.send_message_set(req) - self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-0")) - self.assertTrue(self.server.wait_for("Flushing log 'test-produce-consume-0'")) - req = FetchRequest("test-produce-consume", 0, 0, 1024) + self.assertTrue(self.server.wait_for("Created log for '%s'-0" % topic)) + self.assertTrue(self.server.wait_for("Flushing log '%s-0'" % topic)) + req = FetchRequest(topic, 0, 0, 1024) (messages, req) = self.kafka.get_message_set(req) self.assertEquals(len(messages), 2) - self.assertEquals(messages[0], message1) - self.assertEquals(messages[1], message2) + self.assertEquals(messages[0].payload, "testing 1") + self.assertEquals(messages[1].payload, "testing 2") # Do the same, but for a different partition - message3 = KafkaClient.create_message("testing 3") - message4 = KafkaClient.create_message("testing 4") - req = ProduceRequest("test-produce-consume", 1, [message3, message4]) + message3 = create_func("testing 3") + message4 = create_func("testing 4") + req = ProduceRequest(topic, 1, [message3, message4]) self.kafka.send_message_set(req) - self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-1")) - self.assertTrue(self.server.wait_for("Flushing log 'test-produce-consume-1'")) - req = FetchRequest("test-produce-consume", 1, 0, 1024) + self.assertTrue(self.server.wait_for("Created log for '%s'-1" % topic)) + self.assertTrue(self.server.wait_for("Flushing log '%s-1'" % topic)) + req = FetchRequest(topic, 1, 0, 1024) (messages, req) = self.kafka.get_message_set(req) self.assertEquals(len(messages), 2) - self.assertEquals(messages[0], message3) - self.assertEquals(messages[1], message4) + self.assertEquals(messages[0].payload, "testing 3") + self.assertEquals(messages[1].payload, "testing 4") + + def test_produce_consume(self): + self._test_produce_consume("test-produce-consume", KafkaClient.create_message) + + def test_produce_consume_snappy(self): + self._test_produce_consume("test-produce-consume-snappy", KafkaClient.create_snappy_message) + + def test_produce_consume_gzip(self): + self._test_produce_consume("test-produce-consume-gzip", KafkaClient.create_gzip_message) def test_check_offset(self): # Produce/consume a message, check that the next offset looks correct diff --git a/test/unit.py b/test/unit.py index 7cb1aed..5961509 100644 --- a/test/unit.py +++ b/test/unit.py @@ -5,18 +5,20 @@ import unittest from kafka.client import KafkaClient, ProduceRequest, FetchRequest, length_prefix_message from kafka.codec import gzip_encode, gzip_decode +from kafka.codec import snappy_encode, snappy_decode ITERATIONS = 1000 STRLEN = 100 def random_string(): - return os.urandom(random.randint(0, STRLEN)) + return os.urandom(random.randint(1, STRLEN)) class TestPackage(unittest.TestCase): def test_top_level_namespace(self): import kafka as kafka1 self.assertEquals(kafka1.KafkaClient.__name__, "KafkaClient") self.assertEquals(kafka1.gzip_encode.__name__, "gzip_encode") + self.assertEquals(kafka1.snappy_encode.__name__, "snappy_encode") self.assertEquals(kafka1.client.__name__, "kafka.client") self.assertEquals(kafka1.codec.__name__, "kafka.codec") @@ -41,6 +43,9 @@ class TestPackage(unittest.TestCase): from kafka import gzip_encode as gzip_encode2 self.assertEquals(gzip_encode2.__name__, "gzip_encode") + from kafka import snappy_encode as snappy_encode2 + self.assertEquals(snappy_encode2.__name__, "snappy_encode") + class TestMisc(unittest.TestCase): def test_length_prefix(self): for i in xrange(ITERATIONS): @@ -55,6 +60,12 @@ class TestCodec(unittest.TestCase): s2 = gzip_decode(gzip_encode(s1)) self.assertEquals(s1,s2) + def test_snappy(self): + for i in xrange(ITERATIONS): + s1 = random_string() + s2 = snappy_decode(snappy_encode(s1)) + self.assertEquals(s1,s2) + class TestMessage(unittest.TestCase): def test_create(self): msg = KafkaClient.create_message("testing") @@ -75,6 +86,18 @@ class TestMessage(unittest.TestCase): self.assertEquals(inner.payload, "testing") self.assertEquals(inner.crc, -386704890) + def test_create_snappy(self): + msg = KafkaClient.create_snappy_message("testing") + self.assertEquals(msg.magic, 1) + self.assertEquals(msg.attributes, 2) + self.assertEquals(msg.crc, -62350868) + (messages, _) = KafkaClient.read_message_set(snappy_decode(msg.payload)) + inner = messages[0] + self.assertEquals(inner.magic, 1) + self.assertEquals(inner.attributes, 0) + self.assertEquals(inner.payload, "testing") + self.assertEquals(inner.crc, -386704890) + def test_message_simple(self): msg = KafkaClient.create_message("testing") enc = KafkaClient.encode_message(msg) @@ -110,6 +133,15 @@ class TestMessage(unittest.TestCase): self.assertEquals(messages[1].payload, "two") self.assertEquals(messages[2].payload, "three") + def test_message_snappy(self): + msg = KafkaClient.create_snappy_message("one", "two", "three") + enc = KafkaClient.encode_message(msg) + (messages, read) = KafkaClient.read_message_set(enc) + self.assertEquals(len(messages), 3) + self.assertEquals(messages[0].payload, "one") + self.assertEquals(messages[1].payload, "two") + self.assertEquals(messages[2].payload, "three") + def test_message_simple_random(self): for i in xrange(ITERATIONS): n = random.randint(0, 10) @@ -122,7 +154,7 @@ class TestMessage(unittest.TestCase): def test_message_gzip_random(self): for i in xrange(ITERATIONS): - n = random.randint(0, 10) + n = random.randint(1, 10) strings = [random_string() for j in range(n)] msg = KafkaClient.create_gzip_message(*strings) enc = KafkaClient.encode_message(msg) @@ -131,6 +163,17 @@ class TestMessage(unittest.TestCase): for j in range(n): self.assertEquals(messages[j].payload, strings[j]) + def test_message_snappy_random(self): + for i in xrange(ITERATIONS): + n = random.randint(1, 10) + strings = [random_string() for j in range(n)] + msg = KafkaClient.create_snappy_message(*strings) + enc = KafkaClient.encode_message(msg) + (messages, read) = KafkaClient.read_message_set(enc) + self.assertEquals(len(messages), n) + for j in range(n): + self.assertEquals(messages[j].payload, strings[j]) + class TestRequests(unittest.TestCase): def test_produce_request(self): req = ProduceRequest("my-topic", 0, [KafkaClient.create_message("testing")]) |