summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/client.py3
-rw-r--r--kafka/queue.py119
-rw-r--r--test/integration.py26
3 files changed, 148 insertions, 0 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 212f5c3..c25e2d2 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -73,6 +73,9 @@ class KafkaClient(object):
self._sock.settimeout(10)
log.debug("Connected to %s on %d", host, port)
+ def __copy__(self):
+ return KafkaClient(self.host, self.port, self.bufsize)
+
######################
# Protocol Stuff #
######################
diff --git a/kafka/queue.py b/kafka/queue.py
new file mode 100644
index 0000000..cefb381
--- /dev/null
+++ b/kafka/queue.py
@@ -0,0 +1,119 @@
+from copy import copy
+import logging
+from multiprocessing import Process, Queue, Event
+from Queue import Empty
+import time
+
+from .client import KafkaClient, FetchRequest, ProduceRequest
+
+log = logging.getLogger("kafka")
+
+class KafkaConsumerProcess(Process):
+ def __init__(self, client, topic, partition, out_queue, barrier):
+ self.client = copy(client)
+ self.topic = topic
+ self.partition = partition
+ self.out_queue = out_queue
+ self.barrier = barrier
+ self.consumer_sleep = 0.2
+ Process.__init__(self)
+
+ def config(self, consumer_sleep):
+ self.consumer_sleep = consumer_sleep / 1000.
+
+ def run(self):
+ self.barrier.wait()
+ log.info("Starting Consumer")
+ fetchRequest = FetchRequest(self.topic, self.partition, offset=0, size=self.client.bufsize)
+ while True:
+ if self.barrier.is_set() == False:
+ self.client.close()
+ break
+ lastOffset = fetchRequest.offset
+ (messages, fetchRequest) = self.client.get_message_set(fetchRequest)
+ if fetchRequest.offset == lastOffset:
+ log.debug("No more data for this partition, sleeping a bit (200ms)")
+ time.sleep(self.consumer_sleep)
+ continue
+ for message in messages:
+ self.out_queue.put(message)
+
+class KafkaProducerProcess(Process):
+ def __init__(self, client, topic, in_queue, barrier):
+ self.client = copy(client)
+ self.topic = topic
+ self.in_queue = in_queue
+ self.barrier = barrier
+ self.producer_flush_buffer = 100
+ self.producer_flush_timeout = 2.0
+ self.producer_timeout = 0.1
+ Process.__init__(self)
+
+ def config(self, producer_flush_buffer, producer_flush_timeout, producer_timeout):
+ self.producer_flush_buffer = producer_flush_buffer
+ self.producer_flush_timeout = producer_flush_timeout / 1000.
+ self.producer_timeout = producer_timeout / 1000.
+
+ def run(self):
+ self.barrier.wait()
+ log.info("Starting Producer")
+ messages = []
+ last_produce = time.time()
+
+ def flush(messages):
+ self.client.send_message_set(ProduceRequest(self.topic, -1, messages))
+ del messages[:]
+
+ while True:
+ if self.barrier.is_set() == False:
+ log.info("Producer shut down. Flushing messages")
+ flush(messages)
+ self.client.close()
+ break
+ if len(messages) > self.producer_flush_buffer:
+ log.debug("Message count threashold reached. Flushing messages")
+ flush(messages)
+ elif (time.time() - last_produce) > self.producer_flush_timeout:
+ log.debug("Producer timeout reached. Flushing messages")
+ flush(messages)
+ last_produce = time.time()
+ try:
+ messages.append(KafkaClient.create_message(self.in_queue.get(True, self.producer_timeout)))
+ except Empty:
+ continue
+
+class KafkaQueue(object):
+ def __init__(self, client, topic, partitions):
+ self.in_queue = Queue()
+ self.out_queue = Queue()
+ self.consumers = []
+ self.barrier = Event()
+
+ # Initialize and start consumer threads
+ for partition in partitions:
+ consumer = KafkaConsumerProcess(client, topic, partition, self.in_queue, self.barrier)
+ consumer.config(consumer_sleep=200)
+ consumer.start()
+ self.consumers.append(consumer)
+
+ # Initialize and start producer thread
+ self.producer = KafkaProducerProcess(client, topic, self.out_queue, self.barrier)
+ self.producer.config(producer_flush_buffer=500, producer_flush_timeout=2000, producer_timeout=100)
+ self.producer.start()
+
+ # Trigger everything to start
+ self.barrier.set()
+
+ def get(self, block=True, timeout=None):
+ return self.in_queue.get(block, timeout).payload
+
+ def put(self, msg, block=True, timeout=None):
+ return self.out_queue.put(msg, block, timeout)
+
+ def close(self):
+ self.in_queue.close()
+ self.out_queue.close()
+ self.barrier.clear()
+ self.producer.join()
+ for consumer in self.consumers:
+ consumer.join()
diff --git a/test/integration.py b/test/integration.py
index 7680682..e11b33b 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -12,6 +12,7 @@ import time
import unittest
from kafka.client import KafkaClient, ProduceRequest, FetchRequest, OffsetRequest
+from kafka.queue import KafkaQueue
def get_open_port():
sock = socket.socket()
@@ -231,5 +232,30 @@ class IntegrationTest(unittest.TestCase):
#self.assertTrue(self.server.wait_for("Created log for 'test-10k'-1"))
#self.assertTrue(self.server.wait_for("Flushing log 'test-10k-1'"))
+ def test_queue(self):
+ # Send 1000 messages
+ q = KafkaQueue(self.kafka, "test-queue", [0,1])
+ t1 = time.time()
+ for i in range(1000):
+ q.put("test %d" % i)
+ t2 = time.time()
+
+ # Wait for the producer to fully flush
+ time.sleep(2)
+
+ # Copy all the messages into a list
+ t1 = time.time()
+ consumed = []
+ for i in range(1000):
+ consumed.append(q.get())
+ t2 = time.time()
+
+ # Verify everything is there
+ for i in range(1000):
+ self.assertTrue("test %d" % i in consumed)
+
+ # Shutdown the queue
+ q.close()
+
if __name__ == "__main__":
unittest.main()