1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
|
from __future__ import absolute_import
from collections import namedtuple
import logging
from multiprocessing import Process, Manager as MPManager
import time
from six.moves import queue
from ..common import KafkaError
from .base import (
Consumer,
AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
NO_MESSAGES_WAIT_TIME_SECONDS,
FULL_QUEUE_WAIT_TIME_SECONDS,
MAX_BACKOFF_SECONDS,
)
from .simple import SimpleConsumer
log = logging.getLogger(__name__)
Events = namedtuple("Events", ["start", "pause", "exit"])
def _mp_consume(client, group, topic, message_queue, size, events, **consumer_options):
"""
A child process worker which consumes messages based on the
notifications given by the controller process
NOTE: Ideally, this should have been a method inside the Consumer
class. However, multiprocessing module has issues in windows. The
functionality breaks unless this function is kept outside of a class
"""
# Initial interval for retries in seconds.
interval = 1
while not events.exit.is_set():
try:
# Make the child processes open separate socket connections
client.reinit()
# We will start consumers without auto-commit. Auto-commit will be
# done by the master controller process.
consumer = SimpleConsumer(client, group, topic,
auto_commit=False,
auto_commit_every_n=None,
auto_commit_every_t=None,
**consumer_options)
# Ensure that the consumer provides the partition information
consumer.provide_partition_info()
while True:
# Wait till the controller indicates us to start consumption
events.start.wait()
# If we are asked to quit, do so
if events.exit.is_set():
break
# Consume messages and add them to the queue. If the controller
# indicates a specific number of messages, follow that advice
count = 0
message = consumer.get_message()
if message:
while True:
try:
message_queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
break
except queue.Full:
if events.exit.is_set(): break
count += 1
# We have reached the required size. The controller might have
# more than what he needs. Wait for a while.
# Without this logic, it is possible that we run into a big
# loop consuming all available messages before the controller
# can reset the 'start' event
if count == size.value:
events.pause.wait()
else:
# In case we did not receive any message, give up the CPU for
# a while before we try again
time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
consumer.stop()
except KafkaError as e:
# Retry with exponential backoff
log.error("Problem communicating with Kafka (%s), retrying in %d seconds..." % (e, interval))
time.sleep(interval)
interval = interval*2 if interval*2 < MAX_BACKOFF_SECONDS else MAX_BACKOFF_SECONDS
class MultiProcessConsumer(Consumer):
"""
A consumer implementation that consumes partitions for a topic in
parallel using multiple processes
Arguments:
client: a connected SimpleClient
group: a name for this consumer, used for offset storage and must be unique
If you are connecting to a server that does not support offset
commit/fetch (any prior to 0.8.1.1), then you *must* set this to None
topic: the topic to consume
Keyword Arguments:
partitions: An optional list of partitions to consume the data from
auto_commit: default True. Whether or not to auto commit the offsets
auto_commit_every_n: default 100. How many messages to consume
before a commit
auto_commit_every_t: default 5000. How much time (in milliseconds) to
wait before commit
num_procs: Number of processes to start for consuming messages.
The available partitions will be divided among these processes
partitions_per_proc: Number of partitions to be allocated per process
(overrides num_procs)
Auto commit details:
If both auto_commit_every_n and auto_commit_every_t are set, they will
reset one another when one is triggered. These triggers simply call the
commit method on this class. A manual call to commit will also reset
these triggers
"""
def __init__(self, client, group, topic,
partitions=None,
auto_commit=True,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
num_procs=1,
partitions_per_proc=0,
**simple_consumer_options):
# Initiate the base consumer class
super(MultiProcessConsumer, self).__init__(
client, group, topic,
partitions=partitions,
auto_commit=auto_commit,
auto_commit_every_n=auto_commit_every_n,
auto_commit_every_t=auto_commit_every_t)
# Variables for managing and controlling the data flow from
# consumer child process to master
manager = MPManager()
self.queue = manager.Queue(1024) # Child consumers dump messages into this
self.events = Events(
start = manager.Event(), # Indicates the consumers to start fetch
exit = manager.Event(), # Requests the consumers to shutdown
pause = manager.Event()) # Requests the consumers to pause fetch
self.size = manager.Value('i', 0) # Indicator of number of messages to fetch
# dict.keys() returns a view in py3 + it's not a thread-safe operation
# http://blog.labix.org/2008/06/27/watch-out-for-listdictkeys-in-python-3
# It's safer to copy dict as it only runs during the init.
partitions = list(self.offsets.copy().keys())
# By default, start one consumer process for all partitions
# The logic below ensures that
# * we do not cross the num_procs limit
# * we have an even distribution of partitions among processes
if partitions_per_proc:
num_procs = len(partitions) / partitions_per_proc
if num_procs * partitions_per_proc < len(partitions):
num_procs += 1
# The final set of chunks
chunks = [partitions[proc::num_procs] for proc in range(num_procs)]
self.procs = []
for chunk in chunks:
options = {'partitions': list(chunk)}
if simple_consumer_options:
simple_consumer_options.pop('partitions', None)
options.update(simple_consumer_options)
args = (client.copy(), self.group, self.topic, self.queue,
self.size, self.events)
proc = Process(target=_mp_consume, args=args, kwargs=options)
proc.daemon = True
proc.start()
self.procs.append(proc)
def __repr__(self):
return '<MultiProcessConsumer group=%s, topic=%s, consumers=%d>' % \
(self.group, self.topic, len(self.procs))
def stop(self):
# Set exit and start off all waiting consumers
self.events.exit.set()
self.events.pause.set()
self.events.start.set()
for proc in self.procs:
proc.join()
proc.terminate()
super(MultiProcessConsumer, self).stop()
def __iter__(self):
"""
Iterator to consume the messages available on this consumer
"""
# Trigger the consumer procs to start off.
# We will iterate till there are no more messages available
self.size.value = 0
self.events.pause.set()
while True:
self.events.start.set()
try:
# We will block for a small while so that the consumers get
# a chance to run and put some messages in the queue
# TODO: This is a hack and will make the consumer block for
# at least one second. Need to find a better way of doing this
partition, message = self.queue.get(block=True, timeout=1)
except queue.Empty:
break
# Count, check and commit messages if necessary
self.offsets[partition] = message.offset + 1
self.events.start.clear()
self.count_since_commit += 1
self._auto_commit()
yield message
self.events.start.clear()
def get_messages(self, count=1, block=True, timeout=10):
"""
Fetch the specified number of messages
Keyword Arguments:
count: Indicates the maximum number of messages to be fetched
block: If True, the API will block till all messages are fetched.
If block is a positive integer the API will block until that
many messages are fetched.
timeout: When blocking is requested the function will block for
the specified time (in seconds) until count messages is
fetched. If None, it will block forever.
"""
messages = []
# Give a size hint to the consumers. Each consumer process will fetch
# a maximum of "count" messages. This will fetch more messages than
# necessary, but these will not be committed to kafka. Also, the extra
# messages can be provided in subsequent runs
self.size.value = count
self.events.pause.clear()
if timeout is not None:
max_time = time.time() + timeout
new_offsets = {}
while count > 0 and (timeout is None or timeout > 0):
# Trigger consumption only if the queue is empty
# By doing this, we will ensure that consumers do not
# go into overdrive and keep consuming thousands of
# messages when the user might need only a few
if self.queue.empty():
self.events.start.set()
block_next_call = block is True or block > len(messages)
try:
partition, message = self.queue.get(block_next_call,
timeout)
except queue.Empty:
break
_msg = (partition, message) if self.partition_info else message
messages.append(_msg)
new_offsets[partition] = message.offset + 1
count -= 1
if timeout is not None:
timeout = max_time - time.time()
self.size.value = 0
self.events.start.clear()
self.events.pause.set()
# Update and commit offsets if necessary
self.offsets.update(new_offsets)
self.count_since_commit += len(messages)
self._auto_commit()
return messages
|