summaryrefslogtreecommitdiff
path: root/test/integration.py
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2013-03-30 00:28:00 -0400
committerDavid Arthur <mumrah@gmail.com>2013-04-02 20:19:30 -0400
commitb6d98c07b418b16061ae92392947d5dd6958a708 (patch)
treee777fcf3019ef0ddc6c278ef733c487f5b0532c3 /test/integration.py
parent3499e2f6ead76e1c2db6ac754358bd57f9a15268 (diff)
downloadkafka-python-b6d98c07b418b16061ae92392947d5dd6958a708.tar.gz
Big code re-org
Diffstat (limited to 'test/integration.py')
-rw-r--r--test/integration.py65
1 files changed, 43 insertions, 22 deletions
diff --git a/test/integration.py b/test/integration.py
index 0f4d9f1..b7ad056 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -12,7 +12,8 @@ import time
import unittest
from urlparse import urlparse
-from kafka.client import *
+from kafka import *
+from kafka.common import *
def get_open_port():
sock = socket.socket()
@@ -146,7 +147,7 @@ class TestKafkaClient(unittest.TestCase):
def test_produce_many_simple(self):
produce = ProduceRequest("test_produce_many_simple", 0, messages=[
- KafkaProtocol.create_message("Test message %d" % i) for i in range(100)
+ create_message("Test message %d" % i) for i in range(100)
])
for resp in self.client.send_produce_request([produce]):
@@ -172,7 +173,7 @@ class TestKafkaClient(unittest.TestCase):
def test_produce_10k_simple(self):
produce = ProduceRequest("test_produce_10k_simple", 0, messages=[
- KafkaProtocol.create_message("Test message %d" % i) for i in range(10000)
+ create_message("Test message %d" % i) for i in range(10000)
])
for resp in self.client.send_produce_request([produce]):
@@ -183,8 +184,8 @@ class TestKafkaClient(unittest.TestCase):
self.assertEquals(offset.offsets[0], 10000)
def test_produce_many_gzip(self):
- message1 = KafkaProtocol.create_gzip_message(["Gzipped 1 %d" % i for i in range(100)])
- message2 = KafkaProtocol.create_gzip_message(["Gzipped 2 %d" % i for i in range(100)])
+ message1 = create_gzip_message(["Gzipped 1 %d" % i for i in range(100)])
+ message2 = create_gzip_message(["Gzipped 2 %d" % i for i in range(100)])
produce = ProduceRequest("test_produce_many_gzip", 0, messages=[message1, message2])
@@ -196,8 +197,8 @@ class TestKafkaClient(unittest.TestCase):
self.assertEquals(offset.offsets[0], 200)
def test_produce_many_snappy(self):
- message1 = KafkaProtocol.create_snappy_message(["Snappy 1 %d" % i for i in range(100)])
- message2 = KafkaProtocol.create_snappy_message(["Snappy 2 %d" % i for i in range(100)])
+ message1 = create_snappy_message(["Snappy 1 %d" % i for i in range(100)])
+ message2 = create_snappy_message(["Snappy 2 %d" % i for i in range(100)])
produce = ProduceRequest("test_produce_many_snappy", 0, messages=[message1, message2])
@@ -209,9 +210,9 @@ class TestKafkaClient(unittest.TestCase):
self.assertEquals(offset.offsets[0], 200)
def test_produce_mixed(self):
- message1 = KafkaProtocol.create_message("Just a plain message")
- message2 = KafkaProtocol.create_gzip_message(["Gzipped %d" % i for i in range(100)])
- message3 = KafkaProtocol.create_snappy_message(["Snappy %d" % i for i in range(100)])
+ message1 = create_message("Just a plain message")
+ message2 = create_gzip_message(["Gzipped %d" % i for i in range(100)])
+ message3 = create_snappy_message(["Snappy %d" % i for i in range(100)])
produce = ProduceRequest("test_produce_mixed", 0, messages=[message1, message2, message3])
@@ -225,7 +226,7 @@ class TestKafkaClient(unittest.TestCase):
def test_produce_100k_gzipped(self):
produce = ProduceRequest("test_produce_100k_gzipped", 0, messages=[
- KafkaProtocol.create_gzip_message(["Gzipped %d" % i for i in range(100000)])
+ create_gzip_message(["Gzipped %d" % i for i in range(100000)])
])
for resp in self.client.send_produce_request([produce]):
@@ -252,8 +253,8 @@ class TestKafkaClient(unittest.TestCase):
def test_produce_consume(self):
produce = ProduceRequest("test_produce_consume", 0, messages=[
- KafkaProtocol.create_message("Just a test message"),
- KafkaProtocol.create_message("Message with a key", "foo"),
+ create_message("Just a test message"),
+ create_message("Message with a key", "foo"),
])
for resp in self.client.send_produce_request([produce]):
@@ -276,7 +277,7 @@ class TestKafkaClient(unittest.TestCase):
def test_produce_consume_many(self):
produce = ProduceRequest("test_produce_consume_many", 0, messages=[
- KafkaProtocol.create_message("Test message %d" % i) for i in range(100)
+ create_message("Test message %d" % i) for i in range(100)
])
for resp in self.client.send_produce_request([produce]):
@@ -308,10 +309,10 @@ class TestKafkaClient(unittest.TestCase):
def test_produce_consume_two_partitions(self):
produce1 = ProduceRequest("test_produce_consume_two_partitions", 0, messages=[
- KafkaProtocol.create_message("Partition 0 %d" % i) for i in range(10)
+ create_message("Partition 0 %d" % i) for i in range(10)
])
produce2 = ProduceRequest("test_produce_consume_two_partitions", 1, messages=[
- KafkaProtocol.create_message("Partition 1 %d" % i) for i in range(10)
+ create_message("Partition 1 %d" % i) for i in range(10)
])
for resp in self.client.send_produce_request([produce1, produce2]):
@@ -400,22 +401,25 @@ class TestConsumer(unittest.TestCase):
cls.server2.close()
def test_consumer(self):
+ # Produce 100 messages to partition 0
produce1 = ProduceRequest("test_consumer", 0, messages=[
- KafkaProtocol.create_message("Test message 0 %d" % i) for i in range(100)
- ])
-
- produce2 = ProduceRequest("test_consumer", 1, messages=[
- KafkaProtocol.create_message("Test message 1 %d" % i) for i in range(100)
+ create_message("Test message 0 %d" % i) for i in range(100)
])
for resp in self.client.send_produce_request([produce1]):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
+ # Produce 100 messages to partition 1
+ produce2 = ProduceRequest("test_consumer", 1, messages=[
+ create_message("Test message 1 %d" % i) for i in range(100)
+ ])
+
for resp in self.client.send_produce_request([produce2]):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
+ # Start a consumer
consumer = SimpleConsumer(self.client, "group1", "test_consumer")
all_messages = []
for message in consumer:
@@ -424,6 +428,23 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(len(all_messages), 200)
self.assertEquals(len(all_messages), len(set(all_messages))) # make sure there are no dupes
+ # Produce more messages
+ produce3 = ProduceRequest("test_consumer", 1, messages=[
+ create_message("Test message 3 %d" % i) for i in range(10)
+ ])
+
+ for resp in self.client.send_produce_request([produce3]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 100)
+
+ # Start a new consumer, make sure we only get the newly produced messages
+ consumer = SimpleConsumer(self.client, "group1", "test_consumer")
+
+ all_messages = []
+ for message in consumer:
+ all_messages.append(message)
+ self.assertEquals(len(all_messages), 10)
+
if __name__ == "__main__":
- logging.basicConfig(level=logging.INFO)
+ logging.basicConfig(level=logging.DEBUG)
unittest.main()