summaryrefslogtreecommitdiff
path: root/test/integration.py
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2012-11-19 10:29:30 -0500
committerDavid Arthur <mumrah@gmail.com>2012-11-19 20:47:27 -0500
commit330ddbca4464f32d31b662eb95b92113e7024323 (patch)
tree3869ac9b6e2b0d8718eb5576a3031b73ad315e15 /test/integration.py
parent0c61f4467a327e38c9c8701ab680e47a209bab7c (diff)
downloadkafka-python-330ddbca4464f32d31b662eb95b92113e7024323.tar.gz
Add a Queue-like producer/consumer
Creates a producer process and one consumer process per partition. Uses `multiprocessing.Queue` for communication between the parent process and the producer/consumers. ```python kafka = KafkaClient("localhost", 9092) q = KafkaQueue(kafka, client="test-queue", partitions=[0,1]) q.put("test") q.get() q.close() kafka.close() ``` Ref #8
Diffstat (limited to 'test/integration.py')
-rw-r--r--test/integration.py26
1 files changed, 26 insertions, 0 deletions
diff --git a/test/integration.py b/test/integration.py
index 7680682..e11b33b 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -12,6 +12,7 @@ import time
import unittest
from kafka.client import KafkaClient, ProduceRequest, FetchRequest, OffsetRequest
+from kafka.queue import KafkaQueue
def get_open_port():
sock = socket.socket()
@@ -231,5 +232,30 @@ class IntegrationTest(unittest.TestCase):
#self.assertTrue(self.server.wait_for("Created log for 'test-10k'-1"))
#self.assertTrue(self.server.wait_for("Flushing log 'test-10k-1'"))
+ def test_queue(self):
+ # Send 1000 messages
+ q = KafkaQueue(self.kafka, "test-queue", [0,1])
+ t1 = time.time()
+ for i in range(1000):
+ q.put("test %d" % i)
+ t2 = time.time()
+
+ # Wait for the producer to fully flush
+ time.sleep(2)
+
+ # Copy all the messages into a list
+ t1 = time.time()
+ consumed = []
+ for i in range(1000):
+ consumed.append(q.get())
+ t2 = time.time()
+
+ # Verify everything is there
+ for i in range(1000):
+ self.assertTrue("test %d" % i in consumed)
+
+ # Shutdown the queue
+ q.close()
+
if __name__ == "__main__":
unittest.main()