diff options
author | David Arthur <mumrah@gmail.com> | 2012-11-16 15:02:12 -0500 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2012-11-16 15:02:49 -0500 |
commit | 26414d4a993b6d0fd9074b2e7c4abc4230ae52eb (patch) | |
tree | 79787ce084071488c92760b91570bd5920786f35 /test/integration.py | |
parent | 03abf98c0936a1d48af077ba9c41a11cd074ec5b (diff) | |
download | kafka-python-26414d4a993b6d0fd9074b2e7c4abc4230ae52eb.tar.gz |
Add Snappy support0.1-alpha
Fixes #2
Diffstat (limited to 'test/integration.py')
-rw-r--r-- | test/integration.py | 43 |
1 files changed, 26 insertions, 17 deletions
diff --git a/test/integration.py b/test/integration.py index 232700f..7680682 100644 --- a/test/integration.py +++ b/test/integration.py @@ -118,32 +118,41 @@ class IntegrationTest(unittest.TestCase): self.kafka.send_message_set(req) self.assertTrue(self.server.wait_for("Created log for 'test-produce'-1")) - def test_produce_consume(self): + def _test_produce_consume(self, topic, create_func): # Send two messages and consume them - message1 = KafkaClient.create_message("testing 1") - message2 = KafkaClient.create_message("testing 2") - req = ProduceRequest("test-produce-consume", 0, [message1, message2]) + message1 = create_func("testing 1") + message2 = create_func("testing 2") + req = ProduceRequest(topic, 0, [message1, message2]) self.kafka.send_message_set(req) - self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-0")) - self.assertTrue(self.server.wait_for("Flushing log 'test-produce-consume-0'")) - req = FetchRequest("test-produce-consume", 0, 0, 1024) + self.assertTrue(self.server.wait_for("Created log for '%s'-0" % topic)) + self.assertTrue(self.server.wait_for("Flushing log '%s-0'" % topic)) + req = FetchRequest(topic, 0, 0, 1024) (messages, req) = self.kafka.get_message_set(req) self.assertEquals(len(messages), 2) - self.assertEquals(messages[0], message1) - self.assertEquals(messages[1], message2) + self.assertEquals(messages[0].payload, "testing 1") + self.assertEquals(messages[1].payload, "testing 2") # Do the same, but for a different partition - message3 = KafkaClient.create_message("testing 3") - message4 = KafkaClient.create_message("testing 4") - req = ProduceRequest("test-produce-consume", 1, [message3, message4]) + message3 = create_func("testing 3") + message4 = create_func("testing 4") + req = ProduceRequest(topic, 1, [message3, message4]) self.kafka.send_message_set(req) - self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-1")) - self.assertTrue(self.server.wait_for("Flushing log 'test-produce-consume-1'")) - req = FetchRequest("test-produce-consume", 1, 0, 1024) + self.assertTrue(self.server.wait_for("Created log for '%s'-1" % topic)) + self.assertTrue(self.server.wait_for("Flushing log '%s-1'" % topic)) + req = FetchRequest(topic, 1, 0, 1024) (messages, req) = self.kafka.get_message_set(req) self.assertEquals(len(messages), 2) - self.assertEquals(messages[0], message3) - self.assertEquals(messages[1], message4) + self.assertEquals(messages[0].payload, "testing 3") + self.assertEquals(messages[1].payload, "testing 4") + + def test_produce_consume(self): + self._test_produce_consume("test-produce-consume", KafkaClient.create_message) + + def test_produce_consume_snappy(self): + self._test_produce_consume("test-produce-consume-snappy", KafkaClient.create_snappy_message) + + def test_produce_consume_gzip(self): + self._test_produce_consume("test-produce-consume-gzip", KafkaClient.create_gzip_message) def test_check_offset(self): # Produce/consume a message, check that the next offset looks correct |