summaryrefslogtreecommitdiff
path: root/kafka/queue.py
blob: cefb381cf5c4f6e0cbef37d127ad93d6147ac191 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
from copy import copy
import logging
from multiprocessing import Process, Queue, Event
from Queue import Empty
import time

from .client import KafkaClient, FetchRequest, ProduceRequest

log = logging.getLogger("kafka")

class KafkaConsumerProcess(Process):
    """Worker process that consumes one Kafka topic/partition and puts each
    received message onto a shared output queue.

    The process blocks on `barrier` before it starts consuming, and shuts
    down (closing its client connection) when the barrier is cleared.
    """

    def __init__(self, client, topic, partition, out_queue, barrier):
        """
        Params
        ======
        client:    KafkaClient; copied so this process owns its own connection
        topic:     str, topic to consume
        partition: int, partition to consume
        out_queue: multiprocessing.Queue, consumed messages are put here
        barrier:   multiprocessing.Event, start/stop signal shared with peers
        """
        self.client = copy(client)
        self.topic = topic
        self.partition = partition
        self.out_queue = out_queue
        self.barrier = barrier
        self.consumer_sleep = 0.2  # seconds to sleep when no new data (see config)
        Process.__init__(self)

    def config(self, consumer_sleep):
        """Set the idle poll interval.

        Params
        ======
        consumer_sleep: int, milliseconds to sleep when the partition has
                        no new messages (stored internally in seconds)
        """
        self.consumer_sleep = consumer_sleep / 1000.

    def run(self):
        self.barrier.wait()
        log.info("Starting Consumer")
        fetchRequest = FetchRequest(self.topic, self.partition, offset=0,
                                    size=self.client.bufsize)
        while True:
            # A cleared barrier signals shutdown
            if not self.barrier.is_set():
                self.client.close()
                break
            lastOffset = fetchRequest.offset
            (messages, fetchRequest) = self.client.get_message_set(fetchRequest)
            if fetchRequest.offset == lastOffset:
                # Offset did not advance => no new data; report the actual
                # configured sleep instead of a hard-coded "200ms"
                log.debug("No more data for this partition, sleeping a bit (%.0fms)",
                          self.consumer_sleep * 1000)
                time.sleep(self.consumer_sleep)
                continue
            for message in messages:
                self.out_queue.put(message)

class KafkaProducerProcess(Process):
    """Worker process that drains an input queue and publishes the messages
    to Kafka, batching them and flushing on size or time thresholds.

    The process blocks on `barrier` before it starts, and performs a final
    flush and connection close when the barrier is cleared.
    """

    def __init__(self, client, topic, in_queue, barrier):
        """
        Params
        ======
        client:   KafkaClient; copied so this process owns its own connection
        topic:    str, topic to publish to
        in_queue: multiprocessing.Queue, messages to send are read from here
        barrier:  multiprocessing.Event, start/stop signal shared with peers
        """
        self.client = copy(client)
        self.topic = topic
        self.in_queue = in_queue
        self.barrier = barrier
        self.producer_flush_buffer = 100   # flush once more than this many buffered
        self.producer_flush_timeout = 2.0  # seconds between time-based flushes
        self.producer_timeout = 0.1        # seconds to block on the input queue
        Process.__init__(self)

    def config(self, producer_flush_buffer, producer_flush_timeout, producer_timeout):
        """Tune the batching behavior.

        Params
        ======
        producer_flush_buffer:  int, flush when the buffer exceeds this count
        producer_flush_timeout: int, milliseconds between time-based flushes
        producer_timeout:       int, milliseconds to wait for a new message
                                before re-checking the flush conditions
        """
        self.producer_flush_buffer = producer_flush_buffer
        self.producer_flush_timeout = producer_flush_timeout / 1000.
        self.producer_timeout = producer_timeout / 1000.

    def run(self):
        self.barrier.wait()
        log.info("Starting Producer")
        messages = []
        last_produce = time.time()

        def flush(messages):
            # Partition -1 lets the broker/client pick the partition
            self.client.send_message_set(ProduceRequest(self.topic, -1, messages))
            del messages[:]

        while True:
            # A cleared barrier signals shutdown: flush what's left and exit
            if not self.barrier.is_set():
                log.info("Producer shut down. Flushing messages")
                flush(messages)
                self.client.close()
                break
            if len(messages) > self.producer_flush_buffer:
                log.debug("Message count threshold reached. Flushing messages")
                flush(messages)
                # Reset the timer here too, otherwise the timeout branch can
                # fire immediately after a size-based flush
                last_produce = time.time()
            elif (time.time() - last_produce) > self.producer_flush_timeout:
                log.debug("Producer timeout reached. Flushing messages")
                flush(messages)
                last_produce = time.time()
            try:
                messages.append(KafkaClient.create_message(
                    self.in_queue.get(True, self.producer_timeout)))
            except Empty:
                continue

class KafkaQueue(object):
    """Queue-like facade over Kafka.

    Spawns one KafkaConsumerProcess per requested partition plus a single
    KafkaProducerProcess, all released simultaneously via a shared barrier.
    `get()` reads consumed message payloads; `put()` enqueues messages for
    the producer to publish.
    """

    def __init__(self, client, topic, partitions):
        """
        Params
        ======
        client:     KafkaClient, copied by each worker process
        topic:      str, topic to produce to and consume from
        partitions: iterable of int, partitions to consume
        """
        self.in_queue = Queue()
        self.out_queue = Queue()
        self.consumers = []
        self.barrier = Event()

        # One consumer worker per partition, all waiting on the barrier
        for part in partitions:
            worker = KafkaConsumerProcess(client, topic, part,
                                          self.in_queue, self.barrier)
            worker.config(consumer_sleep=200)
            worker.start()
            self.consumers.append(worker)

        # A single producer worker drains the outbound queue
        self.producer = KafkaProducerProcess(client, topic,
                                             self.out_queue, self.barrier)
        self.producer.config(producer_flush_buffer=500,
                             producer_flush_timeout=2000,
                             producer_timeout=100)
        self.producer.start()

        # Release every worker at once
        self.barrier.set()

    def get(self, block=True, timeout=None):
        """Return the payload of the next consumed message.

        Raises Queue.Empty when non-blocking or when the timeout expires.
        """
        return self.in_queue.get(block, timeout).payload

    def put(self, msg, block=True, timeout=None):
        """Enqueue a message for the producer to send to Kafka."""
        return self.out_queue.put(msg, block, timeout)

    def close(self):
        """Shut everything down: close both queues, clear the barrier so the
        workers exit their loops, then join them all."""
        self.in_queue.close()
        self.out_queue.close()
        self.barrier.clear()
        self.producer.join()
        for worker in self.consumers:
            worker.join()