summaryrefslogtreecommitdiff
path: root/kafka/queue.py
blob: 6fe9eaa342d4969d3e9d2d65f5ecb148650ccaf3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
from copy import copy
import logging
from multiprocessing import Process, Queue, Event
from Queue import Empty
import time

from .client import KafkaClient, FetchRequest, ProduceRequest

# Module-level logger used by the consumer and producer process classes below.
log = logging.getLogger("kafka")

# Importing this module fails on purpose: nothing below this line is reachable
# until the refactoring is done and this guard is removed.
raise NotImplementedError("Still need to refactor this class")

class KafkaConsumerProcess(Process):
    """
    Child process that fetches one partition of a topic and feeds a queue.

    Params
    ======
    client: KafkaClient object; a shallow copy is kept for this process
    topic: str, the topic name
    partition: the partition to fetch from
    out_queue: queue every fetched message is put on
    barrier: Event; run() waits for it to be set before starting and shuts
        down once it has been cleared again
    consumer_fetch_size: int, size passed to each FetchRequest. Default is 1024
    consumer_sleep: int, time in milliseconds to sleep when a fetch did not
        advance the offset. Default is 200
    """
    def __init__(self, client, topic, partition, out_queue, barrier, consumer_fetch_size=1024, consumer_sleep=200):
        Process.__init__(self)
        self.client = copy(client)
        self.topic = topic
        self.partition = partition
        self.out_queue = out_queue
        self.barrier = barrier
        self.consumer_fetch_size = consumer_fetch_size
        # Stored in seconds because that is what time.sleep() expects
        self.consumer_sleep = consumer_sleep / 1000.
        log.info("Initializing %s", self)

    def __str__(self):
        return "[KafkaConsumerProcess: topic=%s, partition=%s, sleep=%s]" % (
                self.topic, self.partition, self.consumer_sleep)

    def run(self):
        """
        Fetch messages starting at offset 0 until the barrier is cleared
        """
        self.barrier.wait()
        log.info("Starting %s", self)
        fetch_request = FetchRequest(self.topic, self.partition, offset=0, size=self.consumer_fetch_size)
        while self.barrier.is_set():
            last_offset = fetch_request.offset
            (messages, fetch_request) = self.client.get_message_set(fetch_request)
            if fetch_request.offset == last_offset:
                # An unchanged offset means the fetch returned nothing new;
                # report the configured sleep rather than a hard-coded value
                log.debug("No more data for this partition, sleeping a bit (%gms)",
                          self.consumer_sleep * 1000)
                time.sleep(self.consumer_sleep)
                continue
            for message in messages:
                self.out_queue.put(message)

        # The barrier was cleared: release the connection before exiting
        log.info("Shutdown %s", self)
        self.client.close()

class KafkaProducerProcess(Process):
    """
    Child process that batches messages from a queue and sends them to Kafka.

    Params
    ======
    client: KafkaClient object; a shallow copy is kept for this process
    topic: str, the topic name
    in_queue: queue the raw messages to send are read from
    barrier: Event; run() waits for it to be set before starting and shuts
        down once it has been cleared again
    producer_flush_buffer: int, buffered message count above which the batch
        is sent. Default is 500
    producer_flush_timeout: int, time in milliseconds between forced sends.
        Default is 2000
    producer_timeout: int, time in milliseconds to block waiting for one
        message on in_queue. Default is 100
    """
    def __init__(self, client, topic, in_queue, barrier, producer_flush_buffer=500, producer_flush_timeout=2000, producer_timeout=100):
        Process.__init__(self)
        self.client = copy(client)
        self.topic = topic
        self.in_queue = in_queue
        self.barrier = barrier
        self.producer_flush_buffer = producer_flush_buffer
        # Both timeouts are stored in seconds to match time.time() and Queue.get()
        self.producer_flush_timeout = producer_flush_timeout / 1000.
        self.producer_timeout = producer_timeout / 1000.
        log.info("Initializing %s", self)

    def __str__(self):
        return "[KafkaProducerProcess: topic=%s, flush_buffer=%s, flush_timeout=%s, timeout=%s]" % (
                self.topic, self.producer_flush_buffer, self.producer_flush_timeout, self.producer_timeout)

    def run(self):
        """
        Buffer messages from in_queue and send them in batches until the
        barrier is cleared, then send whatever is still buffered
        """
        self.barrier.wait()
        log.info("Starting %s", self)
        messages = []
        last_produce = time.time()

        def flush(messages):
            # Nothing buffered: skip the request instead of sending an empty
            # message set on every idle timeout and at shutdown
            if not messages:
                return
            # NOTE(review): partition -1 presumably leaves the partition
            # choice to the client/broker -- confirm against ProduceRequest
            self.client.send_message_set(ProduceRequest(self.topic, -1, messages))
            del messages[:]

        while self.barrier.is_set():
            if len(messages) > self.producer_flush_buffer:
                log.debug("Message count threashold reached. Flushing messages")
                flush(messages)
                last_produce = time.time()
            elif (time.time() - last_produce) > self.producer_flush_timeout:
                log.debug("Producer timeout reached. Flushing messages")
                flush(messages)
                last_produce = time.time()
            try:
                # The bounded wait keeps the loop re-checking the barrier and
                # the flush conditions even when no messages arrive
                messages.append(KafkaClient.create_message(self.in_queue.get(True, self.producer_timeout)))
            except Empty:
                continue

        # The barrier was cleared: send what is left and release the connection
        log.info("Shutdown %s, flushing messages", self)
        flush(messages)
        self.client.close()

class KafkaQueue(object):
    def __init__(self, client, topic, partitions, producer_config=None, consumer_config=None):
        """
        KafkaQueue is a Queue-like object backed by a Kafka producer and some number of consumers

        Messages are eagerly loaded by the consumers in batches of size consumer_fetch_size.
        Messages are buffered in the producer process until producer_flush_timeout or
        producer_flush_buffer is reached.

        Params
        ======
        client: KafkaClient object
        topic: str, the topic name
        partitions: list of ints, the partitions to consume from
        producer_config: dict, see below. Default is None (no overrides)
        consumer_config: dict, see below. Default is None (no overrides)

        Consumer Config
        ===============
        consumer_fetch_size: int, number of bytes to fetch in one call to Kafka. Default
            is 1024
        consumer_sleep: int, time in milliseconds a consumer should sleep when it reaches
            the end of a partition. Default is 200

        Producer Config
        ===============
        producer_timeout: int, time in milliseconds a producer should wait for messages to
            enqueue for producing. Default is 100
        producer_flush_timeout: int, time in milliseconds a producer should allow messages
            to accumulate before sending to Kafka. Default is 2000
        producer_flush_buffer: int, number of messages a producer should allow to accumulate.
            Default is 500

        """
        # None instead of {} so no dict object is shared between calls
        if producer_config is None:
            producer_config = {}
        if consumer_config is None:
            consumer_config = {}

        # in_queue carries messages coming in from Kafka (filled by the
        # consumers); out_queue carries messages going out to Kafka (drained
        # by the producer)
        self.in_queue = Queue()
        self.out_queue = Queue()
        self.consumers = []
        self.barrier = Event()

        # Initialize and start consumer processes, one per partition
        for partition in partitions:
            consumer = KafkaConsumerProcess(client, topic, partition, self.in_queue, self.barrier, **consumer_config)
            consumer.start()
            self.consumers.append(consumer)

        # Initialize and start producer process
        self.producer = KafkaProducerProcess(client, topic, self.out_queue, self.barrier, **producer_config)
        self.producer.start()

        # Trigger everything to start: every child is blocked on the barrier
        self.barrier.set()

    def get(self, block=True, timeout=None):
        """
        Consume a message from Kafka

        Params
        ======
        block: boolean, default True
        timeout: int, number of seconds to wait when blocking, default None

        Returns
        =======
        msg: str, the payload from Kafka

        Raises
        ======
        Empty: if no message is available (immediately when not blocking,
            or within timeout when blocking)
        """
        return self.in_queue.get(block, timeout).payload

    def put(self, msg, block=True, timeout=None):
        """
        Send a message to Kafka

        Params
        ======
        msg: str, the message to send
        block: boolean, default True
        timeout: int, number of seconds to wait when blocking, default None
        """
        self.out_queue.put(msg, block, timeout)

    def close(self):
        """
        Close the internal queues and Kafka consumers/producer

        Messages that were fetched by the consumers but never read with
        get() are discarded.
        """
        # Clearing the barrier tells every child process to shut down
        self.barrier.clear()
        self.out_queue.close()
        self.producer.join()

        # A process that has put items on a multiprocessing queue does not
        # exit until those items are flushed to the pipe, so joining the
        # consumers without reading in_queue can block forever. Keep
        # draining until each consumer is gone.
        for consumer in self.consumers:
            while consumer.is_alive():
                try:
                    self.in_queue.get(True, 0.1)
                except Empty:
                    pass
            consumer.join()
        self.in_queue.close()