-rw-r--r--  kafka/consumer/multiprocess.py | 58
1 file changed, 32 insertions(+), 26 deletions(-)
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index bec3100..2bb97f3 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -2,6 +2,8 @@ from __future__ import absolute_import
import logging
import time
+
+from collections import namedtuple
from multiprocessing import Process, Queue as MPQueue, Event, Value
try:
@@ -15,10 +17,11 @@ from .base import (
)
from .simple import Consumer, SimpleConsumer
-log = logging.getLogger("kafka")
+Events = namedtuple("Events", ["start", "pause", "exit"])
+log = logging.getLogger("kafka")
-def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
+def _mp_consume(client, group, topic, queue, size, events, consumer_options):
"""
A child process worker which consumes messages based on the
notifications given by the controller process
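
For orientation (a sketch, not part of the diff): the new Events namedtuple simply bundles the three multiprocessing.Event controls that were previously threaded through as separate arguments.

    from collections import namedtuple
    from multiprocessing import Event

    Events = namedtuple("Events", ["start", "pause", "exit"])

    # One shared bundle instead of three loose arguments
    events = Events(start=Event(), pause=Event(), exit=Event())
    events.start.set()        # controller: tell workers to begin fetching
    events.start.is_set()     # worker sees True and proceeds
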
@@ -34,20 +37,20 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
# We will start consumers without auto-commit. Auto-commit will be
# done by the master controller process.
consumer = SimpleConsumer(client, group, topic,
- partitions=chunk,
auto_commit=False,
auto_commit_every_n=None,
- auto_commit_every_t=None)
+ auto_commit_every_t=None,
+ **consumer_options)
# Ensure that the consumer provides the partition information
consumer.provide_partition_info()
while True:
# Wait till the controller indicates us to start consumption
- start.wait()
+ events.start.wait()
# If we are asked to quit, do so
- if exit.is_set():
+ if events.exit.is_set():
break
# Consume messages and add them to the queue. If the controller
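
To see what the **consumer_options expansion amounts to, here is a minimal sketch; option names such as fetch_size_bytes are illustrative SimpleConsumer keyword arguments, assumed rather than taken from this diff.

    # The parent always injects the worker's partition chunk; any
    # caller-supplied options ride along and expand into keyword args.
    consumer_options = {'partitions': [0, 1], 'fetch_size_bytes': 4096}
    consumer = SimpleConsumer(client, group, topic,
                              auto_commit=False,
                              auto_commit_every_n=None,
                              auto_commit_every_t=None,
                              **consumer_options)
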
@@ -65,7 +68,7 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
# loop consuming all available messages before the controller
# can reset the 'start' event
if count == size.value:
- pause.wait()
+ events.pause.wait()
else:
# In case we did not receive any message, give up the CPU for
@@ -105,7 +108,8 @@ class MultiProcessConsumer(Consumer):
def __init__(self, client, group, topic, auto_commit=True,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
- num_procs=1, partitions_per_proc=0):
+ num_procs=1, partitions_per_proc=0,
+ simple_consumer_options=None):
# Initiate the base consumer class
super(MultiProcessConsumer, self).__init__(
@@ -118,9 +122,10 @@ class MultiProcessConsumer(Consumer):
# Variables for managing and controlling the data flow from
# consumer child process to master
self.queue = MPQueue(1024) # Child consumers dump messages into this
- self.start = Event() # Indicates the consumers to start fetch
- self.exit = Event() # Requests the consumers to shutdown
- self.pause = Event() # Requests the consumers to pause fetch
+ self.events = Events(
+ start = Event(), # Indicates the consumers to start fetch
+ exit = Event(), # Requests the consumers to shutdown
+ pause = Event()) # Requests the consumers to pause fetch
self.size = Value('i', 0) # Indicator of number of messages to fetch
# dict.keys() returns a view in py3 + it's not a thread-safe operation
@@ -143,11 +148,12 @@ class MultiProcessConsumer(Consumer):
self.procs = []
for chunk in chunks:
- args = (client.copy(),
- group, topic, chunk,
- self.queue, self.start, self.exit,
- self.pause, self.size)
+ options = {'partitions': list(chunk)}
+ if simple_consumer_options:
+ options.update(simple_consumer_options)
+ args = (client.copy(), group, topic, self.queue,
+ self.size, self.events, options)
proc = Process(target=_mp_consume, args=args)
proc.daemon = True
proc.start()
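
A hypothetical end-to-end use of the new simple_consumer_options parameter; the client setup and the max_buffer_size option are assumptions about the kafka-python API of this era, not part of the diff.

    from kafka import KafkaClient
    from kafka.consumer.multiprocess import MultiProcessConsumer

    client = KafkaClient('localhost:9092')
    consumer = MultiProcessConsumer(
        client, 'my-group', 'my-topic',
        num_procs=2,
        # merged into each child's options after 'partitions' is set
        simple_consumer_options={'max_buffer_size': None})
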
@@ -159,9 +165,9 @@ class MultiProcessConsumer(Consumer):
def stop(self):
# Set exit and start off all waiting consumers
- self.exit.set()
- self.pause.set()
- self.start.set()
+ self.events.exit.set()
+ self.events.pause.set()
+ self.events.start.set()
for proc in self.procs:
proc.join()
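
The ordering in stop() matters: a worker may be blocked in events.start.wait() or events.pause.wait(), so all three flags are raised before joining. Worker side, schematically:

    while True:
        events.start.wait()        # released because stop() set start
        if events.exit.is_set():   # exit was set first, so the worker
            break                  # breaks and join() cannot hang
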
@@ -176,10 +182,10 @@ class MultiProcessConsumer(Consumer):
# Trigger the consumer procs to start off.
# We will iterate till there are no more messages available
self.size.value = 0
- self.pause.set()
+ self.events.pause.set()
while True:
- self.start.set()
+ self.events.start.set()
try:
# We will block for a small while so that the consumers get
# a chance to run and put some messages in the queue
@@ -191,12 +197,12 @@ class MultiProcessConsumer(Consumer):
# Count, check and commit messages if necessary
self.offsets[partition] = message.offset + 1
- self.start.clear()
+ self.events.start.clear()
self.count_since_commit += 1
self._auto_commit()
yield message
- self.start.clear()
+ self.events.start.clear()
def get_messages(self, count=1, block=True, timeout=10):
"""
@@ -216,7 +222,7 @@ class MultiProcessConsumer(Consumer):
# necessary, but these will not be committed to kafka. Also, the extra
# messages can be provided in subsequent runs
self.size.value = count
- self.pause.clear()
+ self.events.pause.clear()
if timeout is not None:
max_time = time.time() + timeout
@@ -228,7 +234,7 @@ class MultiProcessConsumer(Consumer):
# go into overdrive and keep consuming thousands of
# messages when the user might need only a few
if self.queue.empty():
- self.start.set()
+ self.events.start.set()
try:
partition, message = self.queue.get(block, timeout)
@@ -242,8 +248,8 @@ class MultiProcessConsumer(Consumer):
timeout = max_time - time.time()
self.size.value = 0
- self.start.clear()
- self.pause.set()
+ self.events.start.clear()
+ self.events.pause.set()
# Update and commit offsets if necessary
self.offsets.update(new_offsets)
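
Putting the hunks together, the controller-side handshake that get_messages() drives after this refactor looks roughly like this (a summary sketch, reconstructed from the diff):

    # 1. size.value = count; events.pause.clear()  -> allow up to `count` fetches
    # 2. while more messages are needed:
    #        if queue.empty(): events.start.set()  -> wake idle workers
    #        partition, message = queue.get(block, timeout)
    # 3. size.value = 0; events.start.clear(); events.pause.set()
    messages = consumer.get_messages(count=100, block=True, timeout=10)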