diff options
author | David Arthur <mumrah@gmail.com> | 2013-03-30 00:28:00 -0400 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2013-04-02 20:19:30 -0400 |
commit | b6d98c07b418b16061ae92392947d5dd6958a708 (patch) | |
tree | e777fcf3019ef0ddc6c278ef733c487f5b0532c3 /test/integration.py | |
parent | 3499e2f6ead76e1c2db6ac754358bd57f9a15268 (diff) | |
download | kafka-python-b6d98c07b418b16061ae92392947d5dd6958a708.tar.gz |
Big code re-org
Diffstat (limited to 'test/integration.py')
-rw-r--r-- | test/integration.py | 65 |
1 files changed, 43 insertions, 22 deletions
diff --git a/test/integration.py b/test/integration.py index 0f4d9f1..b7ad056 100644 --- a/test/integration.py +++ b/test/integration.py @@ -12,7 +12,8 @@ import time import unittest from urlparse import urlparse -from kafka.client import * +from kafka import * +from kafka.common import * def get_open_port(): sock = socket.socket() @@ -146,7 +147,7 @@ class TestKafkaClient(unittest.TestCase): def test_produce_many_simple(self): produce = ProduceRequest("test_produce_many_simple", 0, messages=[ - KafkaProtocol.create_message("Test message %d" % i) for i in range(100) + create_message("Test message %d" % i) for i in range(100) ]) for resp in self.client.send_produce_request([produce]): @@ -172,7 +173,7 @@ class TestKafkaClient(unittest.TestCase): def test_produce_10k_simple(self): produce = ProduceRequest("test_produce_10k_simple", 0, messages=[ - KafkaProtocol.create_message("Test message %d" % i) for i in range(10000) + create_message("Test message %d" % i) for i in range(10000) ]) for resp in self.client.send_produce_request([produce]): @@ -183,8 +184,8 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(offset.offsets[0], 10000) def test_produce_many_gzip(self): - message1 = KafkaProtocol.create_gzip_message(["Gzipped 1 %d" % i for i in range(100)]) - message2 = KafkaProtocol.create_gzip_message(["Gzipped 2 %d" % i for i in range(100)]) + message1 = create_gzip_message(["Gzipped 1 %d" % i for i in range(100)]) + message2 = create_gzip_message(["Gzipped 2 %d" % i for i in range(100)]) produce = ProduceRequest("test_produce_many_gzip", 0, messages=[message1, message2]) @@ -196,8 +197,8 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(offset.offsets[0], 200) def test_produce_many_snappy(self): - message1 = KafkaProtocol.create_snappy_message(["Snappy 1 %d" % i for i in range(100)]) - message2 = KafkaProtocol.create_snappy_message(["Snappy 2 %d" % i for i in range(100)]) + message1 = create_snappy_message(["Snappy 1 %d" % i for i in range(100)]) + message2 = create_snappy_message(["Snappy 2 %d" % i for i in range(100)]) produce = ProduceRequest("test_produce_many_snappy", 0, messages=[message1, message2]) @@ -209,9 +210,9 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(offset.offsets[0], 200) def test_produce_mixed(self): - message1 = KafkaProtocol.create_message("Just a plain message") - message2 = KafkaProtocol.create_gzip_message(["Gzipped %d" % i for i in range(100)]) - message3 = KafkaProtocol.create_snappy_message(["Snappy %d" % i for i in range(100)]) + message1 = create_message("Just a plain message") + message2 = create_gzip_message(["Gzipped %d" % i for i in range(100)]) + message3 = create_snappy_message(["Snappy %d" % i for i in range(100)]) produce = ProduceRequest("test_produce_mixed", 0, messages=[message1, message2, message3]) @@ -225,7 +226,7 @@ class TestKafkaClient(unittest.TestCase): def test_produce_100k_gzipped(self): produce = ProduceRequest("test_produce_100k_gzipped", 0, messages=[ - KafkaProtocol.create_gzip_message(["Gzipped %d" % i for i in range(100000)]) + create_gzip_message(["Gzipped %d" % i for i in range(100000)]) ]) for resp in self.client.send_produce_request([produce]): @@ -252,8 +253,8 @@ class TestKafkaClient(unittest.TestCase): def test_produce_consume(self): produce = ProduceRequest("test_produce_consume", 0, messages=[ - KafkaProtocol.create_message("Just a test message"), - KafkaProtocol.create_message("Message with a key", "foo"), + create_message("Just a test message"), + create_message("Message with a key", "foo"), ]) for resp in self.client.send_produce_request([produce]): @@ -276,7 +277,7 @@ class TestKafkaClient(unittest.TestCase): def test_produce_consume_many(self): produce = ProduceRequest("test_produce_consume_many", 0, messages=[ - KafkaProtocol.create_message("Test message %d" % i) for i in range(100) + create_message("Test message %d" % i) for i in range(100) ]) for resp in self.client.send_produce_request([produce]): @@ -308,10 +309,10 @@ class TestKafkaClient(unittest.TestCase): def test_produce_consume_two_partitions(self): produce1 = ProduceRequest("test_produce_consume_two_partitions", 0, messages=[ - KafkaProtocol.create_message("Partition 0 %d" % i) for i in range(10) + create_message("Partition 0 %d" % i) for i in range(10) ]) produce2 = ProduceRequest("test_produce_consume_two_partitions", 1, messages=[ - KafkaProtocol.create_message("Partition 1 %d" % i) for i in range(10) + create_message("Partition 1 %d" % i) for i in range(10) ]) for resp in self.client.send_produce_request([produce1, produce2]): @@ -400,22 +401,25 @@ class TestConsumer(unittest.TestCase): cls.server2.close() def test_consumer(self): + # Produce 100 messages to partition 0 produce1 = ProduceRequest("test_consumer", 0, messages=[ - KafkaProtocol.create_message("Test message 0 %d" % i) for i in range(100) - ]) - - produce2 = ProduceRequest("test_consumer", 1, messages=[ - KafkaProtocol.create_message("Test message 1 %d" % i) for i in range(100) + create_message("Test message 0 %d" % i) for i in range(100) ]) for resp in self.client.send_produce_request([produce1]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) + # Produce 100 messages to partition 1 + produce2 = ProduceRequest("test_consumer", 1, messages=[ + create_message("Test message 1 %d" % i) for i in range(100) + ]) + for resp in self.client.send_produce_request([produce2]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) + # Start a consumer consumer = SimpleConsumer(self.client, "group1", "test_consumer") all_messages = [] for message in consumer: @@ -424,6 +428,23 @@ class TestConsumer(unittest.TestCase): self.assertEquals(len(all_messages), 200) self.assertEquals(len(all_messages), len(set(all_messages))) # make sure there are no dupes + # Produce more messages + produce3 = ProduceRequest("test_consumer", 1, messages=[ + create_message("Test message 3 %d" % i) for i in range(10) + ]) + + for resp in self.client.send_produce_request([produce3]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 100) + + # Start a new consumer, make sure we only get the newly produced messages + consumer = SimpleConsumer(self.client, "group1", "test_consumer") + + all_messages = [] + for message in consumer: + all_messages.append(message) + self.assertEquals(len(all_messages), 10) + if __name__ == "__main__": - logging.basicConfig(level=logging.INFO) + logging.basicConfig(level=logging.DEBUG) unittest.main() |