diff options
author | David Arthur <mumrah@gmail.com> | 2012-09-28 20:46:50 -0400 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2012-09-28 20:46:50 -0400 |
commit | 478de24d5b4a9c73c2fc969f3fd7cf8ff0710ecb (patch) | |
tree | 44d2490febea4ab53a3f365e6eacc585b2a63ddd /test/integration.py | |
parent | 51d8bbb231b29c006323d0705f8819f592aeb6e7 (diff) | |
download | kafka-python-478de24d5b4a9c73c2fc969f3fd7cf8ff0710ecb.tar.gz |
Adding some integration tests
Diffstat (limited to 'test/integration.py')
-rw-r--r-- | test/integration.py | 45 |
1 files changed, 41 insertions, 4 deletions
diff --git a/test/integration.py b/test/integration.py index 0ce8b66..2779898 100644 --- a/test/integration.py +++ b/test/integration.py @@ -83,13 +83,50 @@ class IntegrationTest(unittest.TestCase): self.kafka = KafkaClient("localhost", port) def test_produce(self): - req = ProduceRequest("my-topic", 0, [KafkaClient.create_message("testing")]) + req = ProduceRequest("test-produce", 0, [KafkaClient.create_message("testing")]) self.kafka.send_message_set(req) - self.assertTrue(self.server.wait_for("Created log for 'my-topic'-0")) + self.assertTrue(self.server.wait_for("Created log for 'test-produce'-0")) - req = ProduceRequest("my-topic", 1, [KafkaClient.create_message("testing")]) + req = ProduceRequest("test-produce", 1, [KafkaClient.create_message("testing")]) self.kafka.send_message_set(req) - self.assertTrue(self.server.wait_for("Created log for 'my-topic'-1")) + self.assertTrue(self.server.wait_for("Created log for 'test-produce'-1")) + + def test_produce_consume(self): + message1 = KafkaClient.create_message("testing 1") + message2 = KafkaClient.create_message("testing 2") + req = ProduceRequest("test-produce-consume", 0, [message1, message2]) + self.kafka.send_message_set(req) + self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-0")) + time.sleep(1) + req = FetchRequest("test-produce-consume", 0, 0, 1024) + (messages, req) = self.kafka.get_message_set(req) + self.assertEquals(len(messages), 2) + self.assertEquals(messages[0], message1) + self.assertEquals(messages[1], message2) + + message3 = KafkaClient.create_message("testing 3") + message4 = KafkaClient.create_message("testing 4") + req = ProduceRequest("test-produce-consume", 1, [message3, message4]) + self.kafka.send_message_set(req) + self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-1")) + time.sleep(1) + req = FetchRequest("test-produce-consume", 1, 0, 1024) + (messages, req) = self.kafka.get_message_set(req) + self.assertEquals(len(messages), 2) + self.assertEquals(messages[0], message3) + self.assertEquals(messages[1], message4) + + def test_check_offset(self): + message1 = KafkaClient.create_message("testing 1") + req = ProduceRequest("test-check-offset", 0, [message1]) + self.kafka.send_message_set(req) + self.assertTrue(self.server.wait_for("Created log for 'test-check-offset'-0")) + time.sleep(1) + req = FetchRequest("test-check-offset", 0, 0, 1024) + (messages, req) = self.kafka.get_message_set(req) + self.assertEquals(len(messages), 1) + self.assertEquals(messages[0], message1) + assertEquals(req.offset, len(KafkaClient.encode_message(message1))) def tearDown(self): self.kafka.close() |