diff options (stat)
-rw-r--r--  kafka/consumer/base.py              1
-rw-r--r--  kafka/consumer/multiprocess.py     85
-rw-r--r--  pylint.rc                           2
-rw-r--r--  test/test_consumer_integration.py   5
-rw-r--r--  tox.ini                             2
5 files changed, 57 insertions(+), 38 deletions(-)
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index 9cdcf89..efc9404 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -25,6 +25,7 @@ MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8 ITER_TIMEOUT_SECONDS = 60 NO_MESSAGES_WAIT_TIME_SECONDS = 0.1 +FULL_QUEUE_WAIT_TIME_SECONDS = 0.1 class Consumer(object): diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index bec3100..5ce8b4d 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -2,23 +2,27 @@ from __future__ import absolute_import import logging import time -from multiprocessing import Process, Queue as MPQueue, Event, Value + +from collections import namedtuple +from multiprocessing import Process, Manager as MPManager try: - from Queue import Empty + from Queue import Empty, Full except ImportError: # python 2 - from queue import Empty + from queue import Empty, Full from .base import ( AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL, - NO_MESSAGES_WAIT_TIME_SECONDS + NO_MESSAGES_WAIT_TIME_SECONDS, + FULL_QUEUE_WAIT_TIME_SECONDS ) from .simple import Consumer, SimpleConsumer -log = logging.getLogger("kafka") +Events = namedtuple("Events", ["start", "pause", "exit"]) +log = logging.getLogger("kafka") -def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): +def _mp_consume(client, group, topic, queue, size, events, **consumer_options): """ A child process worker which consumes messages based on the notifications given by the controller process @@ -34,20 +38,20 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): # We will start consumers without auto-commit. Auto-commit will be # done by the master controller process. 
consumer = SimpleConsumer(client, group, topic, - partitions=chunk, auto_commit=False, auto_commit_every_n=None, - auto_commit_every_t=None) + auto_commit_every_t=None, + **consumer_options) # Ensure that the consumer provides the partition information consumer.provide_partition_info() while True: # Wait till the controller indicates us to start consumption - start.wait() + events.start.wait() # If we are asked to quit, do so - if exit.is_set(): + if events.exit.is_set(): break # Consume messages and add them to the queue. If the controller @@ -56,7 +60,13 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): message = consumer.get_message() if message: - queue.put(message) + while True: + try: + queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS) + break + except Full: + if events.exit.is_set(): break + count += 1 # We have reached the required size. The controller might have @@ -65,7 +75,7 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): # loop consuming all available messages before the controller # can reset the 'start' event if count == size.value: - pause.wait() + events.pause.wait() else: # In case we did not receive any message, give up the CPU for @@ -105,7 +115,8 @@ class MultiProcessConsumer(Consumer): def __init__(self, client, group, topic, auto_commit=True, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL, - num_procs=1, partitions_per_proc=0): + num_procs=1, partitions_per_proc=0, + **simple_consumer_options): # Initiate the base consumer class super(MultiProcessConsumer, self).__init__( @@ -117,11 +128,13 @@ class MultiProcessConsumer(Consumer): # Variables for managing and controlling the data flow from # consumer child process to master - self.queue = MPQueue(1024) # Child consumers dump messages into this - self.start = Event() # Indicates the consumers to start fetch - self.exit = Event() # Requests the consumers to shutdown - self.pause = 
Event() # Requests the consumers to pause fetch - self.size = Value('i', 0) # Indicator of number of messages to fetch + manager = MPManager() + self.queue = manager.Queue(1024) # Child consumers dump messages into this + self.events = Events( + start = manager.Event(), # Indicates the consumers to start fetch + exit = manager.Event(), # Requests the consumers to shutdown + pause = manager.Event()) # Requests the consumers to pause fetch + self.size = manager.Value('i', 0) # Indicator of number of messages to fetch # dict.keys() returns a view in py3 + it's not a thread-safe operation # http://blog.labix.org/2008/06/27/watch-out-for-listdictkeys-in-python-3 @@ -143,12 +156,14 @@ class MultiProcessConsumer(Consumer): self.procs = [] for chunk in chunks: - args = (client.copy(), - group, topic, chunk, - self.queue, self.start, self.exit, - self.pause, self.size) - - proc = Process(target=_mp_consume, args=args) + options = {'partitions': list(chunk)} + if simple_consumer_options: + simple_consumer_options.pop('partitions', None) + options.update(simple_consumer_options) + + args = (client.copy(), group, topic, self.queue, + self.size, self.events) + proc = Process(target=_mp_consume, args=args, kwargs=options) proc.daemon = True proc.start() self.procs.append(proc) @@ -159,9 +174,9 @@ class MultiProcessConsumer(Consumer): def stop(self): # Set exit and start off all waiting consumers - self.exit.set() - self.pause.set() - self.start.set() + self.events.exit.set() + self.events.pause.set() + self.events.start.set() for proc in self.procs: proc.join() @@ -176,10 +191,10 @@ class MultiProcessConsumer(Consumer): # Trigger the consumer procs to start off. 
# We will iterate till there are no more messages available self.size.value = 0 - self.pause.set() + self.events.pause.set() while True: - self.start.set() + self.events.start.set() try: # We will block for a small while so that the consumers get # a chance to run and put some messages in the queue @@ -191,12 +206,12 @@ class MultiProcessConsumer(Consumer): # Count, check and commit messages if necessary self.offsets[partition] = message.offset + 1 - self.start.clear() + self.events.start.clear() self.count_since_commit += 1 self._auto_commit() yield message - self.start.clear() + self.events.start.clear() def get_messages(self, count=1, block=True, timeout=10): """ @@ -216,7 +231,7 @@ class MultiProcessConsumer(Consumer): # necessary, but these will not be committed to kafka. Also, the extra # messages can be provided in subsequent runs self.size.value = count - self.pause.clear() + self.events.pause.clear() if timeout is not None: max_time = time.time() + timeout @@ -228,7 +243,7 @@ class MultiProcessConsumer(Consumer): # go into overdrive and keep consuming thousands of # messages when the user might need only a few if self.queue.empty(): - self.start.set() + self.events.start.set() try: partition, message = self.queue.get(block, timeout) @@ -242,8 +257,8 @@ class MultiProcessConsumer(Consumer): timeout = max_time - time.time() self.size.value = 0 - self.start.clear() - self.pause.set() + self.events.start.clear() + self.events.pause.set() # Update and commit offsets if necessary self.offsets.update(new_offsets) diff --git a/pylint.rc b/pylint.rc new file mode 100644 index 0000000..1e76d8c --- /dev/null +++ b/pylint.rc @@ -0,0 +1,2 @@ +[TYPECHECK] +ignored-classes=SyncManager diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 9c89190..d3df56a 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -61,7 +61,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): group = 
kwargs.pop('group', self.id().encode('utf-8')) topic = kwargs.pop('topic', self.topic) - if consumer_class == SimpleConsumer: + if consumer_class in [SimpleConsumer, MultiProcessConsumer]: kwargs.setdefault('iter_timeout', 0) return consumer_class(self.client, group, topic, **kwargs) @@ -243,7 +243,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.send_messages(0, range(0, 10)) self.send_messages(1, range(10, 20)) - consumer = MultiProcessConsumer(self.client, "group1", self.topic, auto_commit=False) + consumer = MultiProcessConsumer(self.client, "group1", self.topic, + auto_commit=False, iter_timeout=0) self.assertEqual(consumer.pending(), 20) self.assertEqual(consumer.pending(partitions=[0]), 10) diff --git a/tox.ini b/tox.ini --- a/tox.ini +++ b/tox.ini @@ -37,7 +37,7 @@ deps = unittest2 mock pylint -commands = pylint {posargs: -E kafka test} +commands = pylint --rcfile=pylint.rc {posargs: -E kafka test} [testenv:docs] deps =