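"""
A simple multiprocessing producer/consumer queue built on top of KafkaClient.

Usage sketch (illustrative only; the host, port, topic name, and partition
list are placeholders, the import paths assume this module is part of a
package importable as `kafka`, and KafkaClient is assumed to be constructed
from a host and port):

    from kafka.client import KafkaClient
    from kafka.queue import KafkaQueue

    client = KafkaClient("localhost", 9092)
    kafka = KafkaQueue(client, "my-topic", partitions=[0])

    kafka.put("a message")           # buffered, batched, and sent by the producer process
    payload = kafka.get(timeout=5)   # blocks until a consumer fetches a message

    kafka.close()                    # clears the barrier and joins all processes
"""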
from copy import copy
import logging
from multiprocessing import Process, Queue, Event
from Queue import Empty
import time

from .client import KafkaClient, FetchRequest, ProduceRequest

log = logging.getLogger("kafka")


class KafkaConsumerProcess(Process):
    def __init__(self, client, topic, partition, out_queue, barrier):
        self.client = copy(client)
        self.topic = topic
        self.partition = partition
        self.out_queue = out_queue
        self.barrier = barrier
        self.consumer_sleep = 0.2
        Process.__init__(self)

    def config(self, consumer_sleep):
        # consumer_sleep is given in milliseconds
        self.consumer_sleep = consumer_sleep / 1000.

    def run(self):
        self.barrier.wait()
        log.info("Starting Consumer")
        fetchRequest = FetchRequest(self.topic, self.partition,
                                    offset=0, size=self.client.bufsize)

        while True:
            # A cleared barrier signals shutdown
            if not self.barrier.is_set():
                self.client.close()
                break

            lastOffset = fetchRequest.offset
            (messages, fetchRequest) = self.client.get_message_set(fetchRequest)

            # The offset did not advance, so there is no new data yet
            if fetchRequest.offset == lastOffset:
                log.debug("No more data for this partition, sleeping a bit (200ms)")
                time.sleep(self.consumer_sleep)
                continue

            for message in messages:
                self.out_queue.put(message)


class KafkaProducerProcess(Process):
    def __init__(self, client, topic, in_queue, barrier):
        self.client = copy(client)
        self.topic = topic
        self.in_queue = in_queue
        self.barrier = barrier
        self.producer_flush_buffer = 100
        self.producer_flush_timeout = 2.0
        self.producer_timeout = 0.1
        Process.__init__(self)

    def config(self, producer_flush_buffer, producer_flush_timeout, producer_timeout):
        # Timeouts are given in milliseconds
        self.producer_flush_buffer = producer_flush_buffer
        self.producer_flush_timeout = producer_flush_timeout / 1000.
        self.producer_timeout = producer_timeout / 1000.

    def run(self):
        self.barrier.wait()
        log.info("Starting Producer")
        messages = []
        last_produce = time.time()

        def flush(messages):
            self.client.send_message_set(ProduceRequest(self.topic, -1, messages))
            del messages[:]

        while True:
            # A cleared barrier signals shutdown; flush anything still buffered
            if not self.barrier.is_set():
                log.info("Producer shut down. Flushing messages")
                flush(messages)
                self.client.close()
                break

            if len(messages) > self.producer_flush_buffer:
                log.debug("Message count threshold reached. Flushing messages")
                flush(messages)
                last_produce = time.time()
            elif (time.time() - last_produce) > self.producer_flush_timeout:
                log.debug("Producer timeout reached. Flushing messages")
                flush(messages)
                last_produce = time.time()

            try:
                messages.append(KafkaClient.create_message(
                    self.in_queue.get(True, self.producer_timeout)))
            except Empty:
                continue


class KafkaQueue(object):
    def __init__(self, client, topic, partitions):
        self.in_queue = Queue()
        self.out_queue = Queue()
        self.consumers = []
        self.barrier = Event()

        # Initialize and start one consumer process per partition
        for partition in partitions:
            consumer = KafkaConsumerProcess(client, topic, partition,
                                            self.in_queue, self.barrier)
            consumer.config(consumer_sleep=200)
            consumer.start()
            self.consumers.append(consumer)

        # Initialize and start the producer process
        self.producer = KafkaProducerProcess(client, topic, self.out_queue,
                                             self.barrier)
        self.producer.config(producer_flush_buffer=500,
                             producer_flush_timeout=2000,
                             producer_timeout=100)
        self.producer.start()

        # Trigger everything to start
        self.barrier.set()

    def get(self, block=True, timeout=None):
        # Return the payload of the next consumed message
        return self.in_queue.get(block, timeout).payload

    def put(self, msg, block=True, timeout=None):
        # Hand the raw message off to the producer process
        return self.out_queue.put(msg, block, timeout)

    def close(self):
        # Clearing the barrier tells the producer and consumer processes to
        # shut down; join them after closing the queues
        self.in_queue.close()
        self.out_queue.close()
        self.barrier.clear()
        self.producer.join()
        for consumer in self.consumers:
            consumer.join()