-rw-r--r--   kafka/client.py     |   3
-rw-r--r--   kafka/queue.py      | 119
-rw-r--r--   test/integration.py |  26
3 files changed, 148 insertions, 0 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 212f5c3..c25e2d2 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -73,6 +73,9 @@ class KafkaClient(object):
         self._sock.settimeout(10)
         log.debug("Connected to %s on %d", host, port)
 
+    def __copy__(self):
+        return KafkaClient(self.host, self.port, self.bufsize)
+
     ######################
     #   Protocol Stuff   #
     ######################
diff --git a/kafka/queue.py b/kafka/queue.py
new file mode 100644
index 0000000..cefb381
--- /dev/null
+++ b/kafka/queue.py
@@ -0,0 +1,119 @@
+from copy import copy
+import logging
+from multiprocessing import Process, Queue, Event
+from Queue import Empty
+import time
+
+from .client import KafkaClient, FetchRequest, ProduceRequest
+
+log = logging.getLogger("kafka")
+
+class KafkaConsumerProcess(Process):
+    def __init__(self, client, topic, partition, out_queue, barrier):
+        self.client = copy(client)
+        self.topic = topic
+        self.partition = partition
+        self.out_queue = out_queue
+        self.barrier = barrier
+        self.consumer_sleep = 0.2
+        Process.__init__(self)
+
+    def config(self, consumer_sleep):
+        self.consumer_sleep = consumer_sleep / 1000.
+
+    def run(self):
+        self.barrier.wait()
+        log.info("Starting Consumer")
+        fetchRequest = FetchRequest(self.topic, self.partition, offset=0, size=self.client.bufsize)
+        while True:
+            if self.barrier.is_set() == False:
+                self.client.close()
+                break
+            lastOffset = fetchRequest.offset
+            (messages, fetchRequest) = self.client.get_message_set(fetchRequest)
+            if fetchRequest.offset == lastOffset:
+                log.debug("No more data for this partition, sleeping a bit (200ms)")
+                time.sleep(self.consumer_sleep)
+                continue
+            for message in messages:
+                self.out_queue.put(message)
+
+class KafkaProducerProcess(Process):
+    def __init__(self, client, topic, in_queue, barrier):
+        self.client = copy(client)
+        self.topic = topic
+        self.in_queue = in_queue
+        self.barrier = barrier
+        self.producer_flush_buffer = 100
+        self.producer_flush_timeout = 2.0
+        self.producer_timeout = 0.1
+        Process.__init__(self)
+
+    def config(self, producer_flush_buffer, producer_flush_timeout, producer_timeout):
+        self.producer_flush_buffer = producer_flush_buffer
+        self.producer_flush_timeout = producer_flush_timeout / 1000.
+        self.producer_timeout = producer_timeout / 1000.
+
+    def run(self):
+        self.barrier.wait()
+        log.info("Starting Producer")
+        messages = []
+        last_produce = time.time()
+
+        def flush(messages):
+            self.client.send_message_set(ProduceRequest(self.topic, -1, messages))
+            del messages[:]
+
+        while True:
+            if self.barrier.is_set() == False:
+                log.info("Producer shut down. Flushing messages")
+                flush(messages)
+                self.client.close()
+                break
+            if len(messages) > self.producer_flush_buffer:
+                log.debug("Message count threshold reached. Flushing messages")
+                flush(messages)
+            elif (time.time() - last_produce) > self.producer_flush_timeout:
+                log.debug("Producer timeout reached. Flushing messages")
+                flush(messages)
+                last_produce = time.time()
+            try:
+                messages.append(KafkaClient.create_message(self.in_queue.get(True, self.producer_timeout)))
+            except Empty:
+                continue
+
+class KafkaQueue(object):
+    def __init__(self, client, topic, partitions):
+        self.in_queue = Queue()
+        self.out_queue = Queue()
+        self.consumers = []
+        self.barrier = Event()
+
+        # Initialize and start consumer processes
+        for partition in partitions:
+            consumer = KafkaConsumerProcess(client, topic, partition, self.in_queue, self.barrier)
+            consumer.config(consumer_sleep=200)
+            consumer.start()
+            self.consumers.append(consumer)
+
+        # Initialize and start producer process
+        self.producer = KafkaProducerProcess(client, topic, self.out_queue, self.barrier)
+        self.producer.config(producer_flush_buffer=500, producer_flush_timeout=2000, producer_timeout=100)
+        self.producer.start()
+
+        # Trigger everything to start
+        self.barrier.set()
+
+    def get(self, block=True, timeout=None):
+        return self.in_queue.get(block, timeout).payload
+
+    def put(self, msg, block=True, timeout=None):
+        return self.out_queue.put(msg, block, timeout)
+
+    def close(self):
+        self.in_queue.close()
+        self.out_queue.close()
+        self.barrier.clear()
+        self.producer.join()
+        for consumer in self.consumers:
+            consumer.join()
diff --git a/test/integration.py b/test/integration.py
index 7680682..e11b33b 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -12,6 +12,7 @@ import time
 import unittest
 
 from kafka.client import KafkaClient, ProduceRequest, FetchRequest, OffsetRequest
+from kafka.queue import KafkaQueue
 
 def get_open_port():
     sock = socket.socket()
@@ -231,5 +232,30 @@ class IntegrationTest(unittest.TestCase):
         #self.assertTrue(self.server.wait_for("Created log for 'test-10k'-1"))
         #self.assertTrue(self.server.wait_for("Flushing log 'test-10k-1'"))
 
+    def test_queue(self):
+        # Send 1000 messages
+        q = KafkaQueue(self.kafka, "test-queue", [0,1])
+        t1 = time.time()
+        for i in range(1000):
+            q.put("test %d" % i)
+        t2 = time.time()
+
+        # Wait for the producer to fully flush
+        time.sleep(2)
+
+        # Copy all the messages into a list
+        t1 = time.time()
+        consumed = []
+        for i in range(1000):
+            consumed.append(q.get())
+        t2 = time.time()
+
+        # Verify everything is there
+        for i in range(1000):
+            self.assertTrue("test %d" % i in consumed)
+
+        # Shutdown the queue
+        q.close()
+
 if __name__ == "__main__":
     unittest.main()
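
For reference, a minimal usage sketch of the KafkaQueue API introduced above. The broker host, port, and topic name are placeholders (they are not part of this change), and the KafkaClient constructor is assumed to take (host, port, bufsize) as implied by the new __copy__ method; adjust the arguments to your setup.

    from kafka.client import KafkaClient
    from kafka.queue import KafkaQueue

    # Placeholder broker address; pass bufsize too if your KafkaClient requires it.
    client = KafkaClient("localhost", 9092)

    # One KafkaConsumerProcess is started per listed partition; a single
    # KafkaProducerProcess batches puts and flushes them to the topic.
    q = KafkaQueue(client, "my-topic", [0, 1])

    q.put("hello world")   # buffered by the producer process, flushed in batches
    msg = q.get()          # blocks until a consumer process delivers a message payload

    q.close()              # clears the barrier, flushes pending messages, joins the processes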