diff options
| -rw-r--r-- | kafka/queue.py | 215 | 
1 file changed, 0 insertions(+), 215 deletions(-)
"""Queue-like interface over Kafka using one producer and N consumer processes.

NOTE(review): this module is deliberately disabled — it raises
NotImplementedError at import time (see below) pending a refactor.
It is Python 2 era code (``from Queue import Empty``).
"""
from __future__ import absolute_import

from copy import copy
import logging
from multiprocessing import Process, Queue, Event
from Queue import Empty
import time

from kafka.client import KafkaClient, FetchRequest, ProduceRequest

log = logging.getLogger("kafka")

# Guard: the module is not usable in its current state. Any import fails here.
raise NotImplementedError("Still need to refactor this class")


class KafkaConsumerProcess(Process):
    """Background process that fetches messages from one topic/partition
    and puts each message onto ``out_queue``.

    Shutdown is signalled by clearing ``barrier`` (a multiprocessing.Event);
    startup is gated on ``barrier.wait()``.
    """

    def __init__(self, client, topic, partition, out_queue, barrier,
                 consumer_fetch_size=1024, consumer_sleep=200):
        # copy() the client so this process has its own connection state —
        # presumably KafkaClient is not safe to share across processes
        # (TODO confirm against kafka.client).
        self.client = copy(client)
        self.topic = topic
        self.partition = partition
        self.out_queue = out_queue
        self.barrier = barrier
        self.consumer_fetch_size = consumer_fetch_size
        # consumer_sleep is given in milliseconds; stored as seconds for time.sleep().
        self.consumer_sleep = consumer_sleep / 1000.
        log.info("Initializing %s" % self)
        Process.__init__(self)

    def __str__(self):
        return "[KafkaConsumerProcess: topic=%s, \
            partition=%s, sleep=%s]" % \
            (self.topic, self.partition, self.consumer_sleep)

    def run(self):
        # Block until KafkaQueue sets the barrier, so all workers start together.
        self.barrier.wait()
        log.info("Starting %s" % self)
        # Always start from offset 0; get_message_set() returns the follow-up
        # request with the advanced offset, which we reuse each iteration.
        fetchRequest = FetchRequest(self.topic, self.partition,
                                    offset=0, size=self.consumer_fetch_size)

        while True:
            # Cleared barrier == shutdown requested by KafkaQueue.close().
            if self.barrier.is_set() is False:
                log.info("Shutdown %s" % self)
                self.client.close()
                break

            lastOffset = fetchRequest.offset
            (messages, fetchRequest) = self.client.get_message_set(fetchRequest)

            # Offset unchanged means the partition had no new data; back off.
            # (The "200ms" in the message is the default; actual sleep is
            # self.consumer_sleep.)
            if fetchRequest.offset == lastOffset:
                log.debug("No more data for this partition, "
                          "sleeping a bit (200ms)")
                time.sleep(self.consumer_sleep)
                continue

            for message in messages:
                self.out_queue.put(message)


class KafkaProducerProcess(Process):
    """Background process that drains ``in_queue`` and sends messages to
    Kafka, flushing when a count threshold or a time threshold is reached.
    """

    def __init__(self, client, topic, in_queue, barrier,
                 producer_flush_buffer=500,
                 producer_flush_timeout=2000,
                 producer_timeout=100):

        # Private client copy, same rationale as KafkaConsumerProcess.
        self.client = copy(client)
        self.topic = topic
        self.in_queue = in_queue
        self.barrier = barrier
        self.producer_flush_buffer = producer_flush_buffer
        # Millisecond arguments converted to seconds once, up front.
        self.producer_flush_timeout = producer_flush_timeout / 1000.
        self.producer_timeout = producer_timeout / 1000.
        log.info("Initializing %s" % self)
        Process.__init__(self)

    def __str__(self):
        return "[KafkaProducerProcess: topic=%s, \
            flush_buffer=%s, flush_timeout=%s, timeout=%s]" % \
            (self.topic,
                self.producer_flush_buffer,
                self.producer_flush_timeout,
                self.producer_timeout)

    def run(self):
        # Wait for KafkaQueue to release all workers at once.
        self.barrier.wait()
        log.info("Starting %s" % self)
        messages = []
        last_produce = time.time()

        def flush(messages):
            # Partition -1: presumably "let the broker/client choose" —
            # TODO confirm ProduceRequest semantics in kafka.client.
            self.client.send_message_set(ProduceRequest(self.topic, -1,
                                                        messages))
            # Clear in place so the enclosing run() sees the emptied buffer.
            del messages[:]

        while True:
            # Cleared barrier == shutdown; flush what we have, then exit.
            if self.barrier.is_set() is False:
                log.info("Shutdown %s, flushing messages" % self)
                flush(messages)
                self.client.close()
                break

            # Flush on buffered-message count...
            if len(messages) > self.producer_flush_buffer:
                log.debug("Message count threshold reached. Flushing messages")
                flush(messages)
                last_produce = time.time()

            # ...or on elapsed time since the last flush.
            elif (time.time() - last_produce) > self.producer_flush_timeout:
                log.debug("Producer timeout reached. Flushing messages")
                flush(messages)
                last_produce = time.time()

            # Blocking get with a short timeout keeps the loop responsive to
            # the barrier and the time-based flush even when the queue is idle.
            try:
                msg = KafkaClient.create_message(
                    self.in_queue.get(True, self.producer_timeout))
                messages.append(msg)

            except Empty:
                continue


class KafkaQueue(object):
    def __init__(self, client, topic, partitions,
                 producer_config=None, consumer_config=None):
        """
        KafkaQueue a Queue-like object backed by a Kafka producer and some
        number of consumers

        Messages are eagerly loaded by the consumer in batches of size
        consumer_fetch_size.
        Messages are buffered in the producer thread until
        producer_flush_timeout or producer_flush_buffer is reached.

        Arguments:
            client: KafkaClient object
            topic: str, the topic name
            partitions: list of ints, the partions to consume from
            producer_config: dict, see below
            consumer_config: dict, see below

        Consumer Config
        ===============
        consumer_fetch_size: int, number of bytes to fetch in one call
                             to Kafka. Default is 1024
        consumer_sleep: int, time in milliseconds a consumer should sleep
                        when it reaches the end of a partition. Default is 200

        Producer Config
        ===============
        producer_timeout: int, time in milliseconds a producer should
                          wait for messages to enqueue for producing.
                          Default is 100
        producer_flush_timeout: int, time in milliseconds a producer
                                should allow messages to accumulate before
                                sending to Kafka. Default is 2000
        producer_flush_buffer: int, number of messages a producer should
                               allow to accumulate. Default is 500

        """
        producer_config = {} if producer_config is None else producer_config
        consumer_config = {} if consumer_config is None else consumer_config

        # NOTE: naming is from this object's point of view — in_queue is
        # what get() reads (filled by consumers), out_queue is what put()
        # writes (drained by the producer process).
        self.in_queue = Queue()
        self.out_queue = Queue()
        self.consumers = []
        # Set barrier => run; cleared barrier => workers shut down.
        self.barrier = Event()

        # Initialize and start consumer threads
        for partition in partitions:
            consumer = KafkaConsumerProcess(client, topic, partition,
                                            self.in_queue, self.barrier,
                                            **consumer_config)
            consumer.start()
            self.consumers.append(consumer)

        # Initialize and start producer thread
        self.producer = KafkaProducerProcess(client, topic, self.out_queue,
                                             self.barrier, **producer_config)
        self.producer.start()

        # Trigger everything to start
        self.barrier.set()

    def get(self, block=True, timeout=None):
        """
        Consume a message from Kafka

        Arguments:
            block: boolean, default True
            timeout: int, number of seconds to wait when blocking, default None

        Returns:
            msg: str, the payload from Kafka
        """
        return self.in_queue.get(block, timeout).payload

    def put(self, msg, block=True, timeout=None):
        """
        Send a message to Kafka

        Arguments:
            msg: std, the message to send
            block: boolean, default True
            timeout: int, number of seconds to wait when blocking, default None
        """
        self.out_queue.put(msg, block, timeout)

    def close(self):
        """
        Close the internal queues and Kafka consumers/producer
        """
        self.in_queue.close()
        self.out_queue.close()
        # Clearing the barrier tells every worker process to flush/close and
        # exit; then wait for them all to terminate.
        self.barrier.clear()
        self.producer.join()
        for consumer in self.consumers:
            consumer.join()
