-rw-r--r--  kafka/consumer/multiprocess.py  58
1 files changed, 32 insertions, 26 deletions
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index bec3100..2bb97f3 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -2,6 +2,8 @@ from __future__ import absolute_import
 
 import logging
 import time
+
+from collections import namedtuple
 from multiprocessing import Process, Queue as MPQueue, Event, Value
 
 try:
@@ -15,10 +17,11 @@ from .base import (
 )
 from .simple import Consumer, SimpleConsumer
 
-log = logging.getLogger("kafka")
+Events = namedtuple("Events", ["start", "pause", "exit"])
 
+log = logging.getLogger("kafka")
 
-def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
+def _mp_consume(client, group, topic, queue, size, events, consumer_options):
     """
     A child process worker which consumes messages based on the notifications
     given by the controller process
@@ -34,20 +37,20 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
     # We will start consumers without auto-commit. Auto-commit will be
     # done by the master controller process.
     consumer = SimpleConsumer(client, group, topic,
-                              partitions=chunk,
                               auto_commit=False,
                               auto_commit_every_n=None,
-                              auto_commit_every_t=None)
+                              auto_commit_every_t=None,
+                              **consumer_options)
 
     # Ensure that the consumer provides the partition information
     consumer.provide_partition_info()
 
     while True:
         # Wait till the controller indicates us to start consumption
-        start.wait()
+        events.start.wait()
 
         # If we are asked to quit, do so
-        if exit.is_set():
+        if events.exit.is_set():
             break
 
         # Consume messages and add them to the queue. If the controller
@@ -65,7 +68,7 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
             # loop consuming all available messages before the controller
             # can reset the 'start' event
             if count == size.value:
-                pause.wait()
+                events.pause.wait()
 
         else:
             # In case we did not receive any message, give up the CPU for
@@ -105,7 +108,8 @@ class MultiProcessConsumer(Consumer):
     def __init__(self, client, group, topic, auto_commit=True,
                  auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
                  auto_commit_every_t=AUTO_COMMIT_INTERVAL,
-                 num_procs=1, partitions_per_proc=0):
+                 num_procs=1, partitions_per_proc=0,
+                 simple_consumer_options=None):
 
         # Initiate the base consumer class
         super(MultiProcessConsumer, self).__init__(
@@ -118,9 +122,10 @@ class MultiProcessConsumer(Consumer):
         # Variables for managing and controlling the data flow from
         # consumer child process to master
        self.queue = MPQueue(1024)  # Child consumers dump messages into this
-        self.start = Event()        # Indicates the consumers to start fetch
-        self.exit = Event()         # Requests the consumers to shutdown
-        self.pause = Event()        # Requests the consumers to pause fetch
+        self.events = Events(
+            start = Event(),  # Indicates the consumers to start fetch
+            exit  = Event(),  # Requests the consumers to shutdown
+            pause = Event())  # Requests the consumers to pause fetch
         self.size = Value('i', 0)   # Indicator of number of messages to fetch
 
         # dict.keys() returns a view in py3 + it's not a thread-safe operation
@@ -143,11 +148,12 @@ class MultiProcessConsumer(Consumer):
 
         self.procs = []
         for chunk in chunks:
-            args = (client.copy(),
-                    group, topic, chunk,
-                    self.queue, self.start, self.exit,
-                    self.pause, self.size)
+            options = {'partitions': list(chunk)}
+            if simple_consumer_options:
+                options.update(simple_consumer_options)
+            args = (client.copy(), group, topic, self.queue,
+                    self.size, self.events, options)
             proc = Process(target=_mp_consume, args=args)
             proc.daemon = True
             proc.start()
             self.procs.append(proc)
@@ -159,9 +165,9 @@ class MultiProcessConsumer(Consumer):
     def stop(self):
         # Set exit and start off all waiting consumers
-        self.exit.set()
-        self.pause.set()
-        self.start.set()
+        self.events.exit.set()
+        self.events.pause.set()
+        self.events.start.set()
 
         for proc in self.procs:
             proc.join()
             proc.terminate()
@@ -176,10 +182,10 @@ class MultiProcessConsumer(Consumer):
         # Trigger the consumer procs to start off.
        # We will iterate till there are no more messages available
         self.size.value = 0
-        self.pause.set()
+        self.events.pause.set()
 
         while True:
-            self.start.set()
+            self.events.start.set()
             try:
                 # We will block for a small while so that the consumers get
                 # a chance to run and put some messages in the queue
@@ -191,12 +197,12 @@ class MultiProcessConsumer(Consumer):
 
             # Count, check and commit messages if necessary
             self.offsets[partition] = message.offset + 1
-            self.start.clear()
+            self.events.start.clear()
             self.count_since_commit += 1
             self._auto_commit()
             yield message
 
-        self.start.clear()
+        self.events.start.clear()
 
     def get_messages(self, count=1, block=True, timeout=10):
         """
@@ -216,7 +222,7 @@
         # necessary, but these will not be committed to kafka. Also, the extra
         # messages can be provided in subsequent runs
         self.size.value = count
-        self.pause.clear()
+        self.events.pause.clear()
 
         if timeout is not None:
             max_time = time.time() + timeout
@@ -228,7 +234,7 @@
             # go into overdrive and keep consuming thousands of
             # messages when the user might need only a few
             if self.queue.empty():
-                self.start.set()
+                self.events.start.set()
 
             try:
                 partition, message = self.queue.get(block, timeout)
@@ -242,8 +248,8 @@
                 timeout = max_time - time.time()
 
         self.size.value = 0
-        self.start.clear()
-        self.pause.set()
+        self.events.start.clear()
+        self.events.pause.set()
 
         # Update and commit offsets if necessary
         self.offsets.update(new_offsets)
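Usage note: with this change, per-child SimpleConsumer tuning can be passed through the new simple_consumer_options dict, which the controller forwards to each child process. A minimal sketch follows; the broker address, group/topic names, and the chosen option value are illustrative placeholders, not part of this patch:

    from kafka.client import KafkaClient
    from kafka.consumer.multiprocess import MultiProcessConsumer

    # Placeholder broker/group/topic names -- substitute real ones.
    client = KafkaClient('localhost:9092')

    # Keys in simple_consumer_options are forwarded to each child
    # SimpleConsumer via **consumer_options in _mp_consume().
    consumer = MultiProcessConsumer(
        client, 'my-group', 'my-topic',
        num_procs=2,
        simple_consumer_options={'max_buffer_size': None})

    for message in consumer:
        print(message)

One caveat of this version: the controller seeds each child's options with {'partitions': list(chunk)} before calling options.update(simple_consumer_options), so a caller-supplied 'partitions' key would silently override the per-process partition assignment and should be avoided.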