summaryrefslogtreecommitdiff
path: root/test/integration.py
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2012-11-16 15:02:12 -0500
committerDavid Arthur <mumrah@gmail.com>2012-11-16 15:02:49 -0500
commit26414d4a993b6d0fd9074b2e7c4abc4230ae52eb (patch)
tree79787ce084071488c92760b91570bd5920786f35 /test/integration.py
parent03abf98c0936a1d48af077ba9c41a11cd074ec5b (diff)
downloadkafka-python-26414d4a993b6d0fd9074b2e7c4abc4230ae52eb.tar.gz
Add Snappy support0.1-alpha
Fixes #2
Diffstat (limited to 'test/integration.py')
-rw-r--r--test/integration.py43
1 files changed, 26 insertions, 17 deletions
diff --git a/test/integration.py b/test/integration.py
index 232700f..7680682 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -118,32 +118,41 @@ class IntegrationTest(unittest.TestCase):
self.kafka.send_message_set(req)
self.assertTrue(self.server.wait_for("Created log for 'test-produce'-1"))
- def test_produce_consume(self):
+ def _test_produce_consume(self, topic, create_func):
# Send two messages and consume them
- message1 = KafkaClient.create_message("testing 1")
- message2 = KafkaClient.create_message("testing 2")
- req = ProduceRequest("test-produce-consume", 0, [message1, message2])
+ message1 = create_func("testing 1")
+ message2 = create_func("testing 2")
+ req = ProduceRequest(topic, 0, [message1, message2])
self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-0"))
- self.assertTrue(self.server.wait_for("Flushing log 'test-produce-consume-0'"))
- req = FetchRequest("test-produce-consume", 0, 0, 1024)
+ self.assertTrue(self.server.wait_for("Created log for '%s'-0" % topic))
+ self.assertTrue(self.server.wait_for("Flushing log '%s-0'" % topic))
+ req = FetchRequest(topic, 0, 0, 1024)
(messages, req) = self.kafka.get_message_set(req)
self.assertEquals(len(messages), 2)
- self.assertEquals(messages[0], message1)
- self.assertEquals(messages[1], message2)
+ self.assertEquals(messages[0].payload, "testing 1")
+ self.assertEquals(messages[1].payload, "testing 2")
# Do the same, but for a different partition
- message3 = KafkaClient.create_message("testing 3")
- message4 = KafkaClient.create_message("testing 4")
- req = ProduceRequest("test-produce-consume", 1, [message3, message4])
+ message3 = create_func("testing 3")
+ message4 = create_func("testing 4")
+ req = ProduceRequest(topic, 1, [message3, message4])
self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-1"))
- self.assertTrue(self.server.wait_for("Flushing log 'test-produce-consume-1'"))
- req = FetchRequest("test-produce-consume", 1, 0, 1024)
+ self.assertTrue(self.server.wait_for("Created log for '%s'-1" % topic))
+ self.assertTrue(self.server.wait_for("Flushing log '%s-1'" % topic))
+ req = FetchRequest(topic, 1, 0, 1024)
(messages, req) = self.kafka.get_message_set(req)
self.assertEquals(len(messages), 2)
- self.assertEquals(messages[0], message3)
- self.assertEquals(messages[1], message4)
+ self.assertEquals(messages[0].payload, "testing 3")
+ self.assertEquals(messages[1].payload, "testing 4")
+
+ def test_produce_consume(self):
+ self._test_produce_consume("test-produce-consume", KafkaClient.create_message)
+
+ def test_produce_consume_snappy(self):
+ self._test_produce_consume("test-produce-consume-snappy", KafkaClient.create_snappy_message)
+
+ def test_produce_consume_gzip(self):
+ self._test_produce_consume("test-produce-consume-gzip", KafkaClient.create_gzip_message)
def test_check_offset(self):
# Produce/consume a message, check that the next offset looks correct