author | Mahendra M <mahendra.m@gmail.com> | 2013-06-25 18:17:39 +0530 |
---|---|---|
committer | Mahendra M <mahendra.m@gmail.com> | 2013-06-25 18:17:39 +0530 |
commit | 65c8eb1f9f3a309f924cf469abb16af98bbe5d6d (patch) | |
tree | b116b9c287b7e6ef49ef93bf5ffb11178ee1f6eb /kafka/consumer.py | |
parent | 99da57f98a65a457481dcf5c1edcca95dfd464a5 (diff) | |
download | kafka-python-65c8eb1f9f3a309f924cf469abb16af98bbe5d6d.tar.gz |
Got MultiProcessConsumer working
Other changes:
* Put a size limit on the shared queue (it now buffers a bounded number of
  messages) to prevent message overload
* Wait for a while after each process is started (in constructor)
* Wait for a while in each child if the consumer does not return any
  messages, just to be nice to the CPU
* Control the start event more granularly - this prevents infinite loops
  if control does not return to the generator (see the sketch after this
  list). For example:
      for msg in consumer:
          assert False
* Update the consumed offset for each message before yield
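
The start-event handling above is easier to see in isolation. Below is a minimal, self-contained sketch of the same coordination pattern using only the standard library; `worker`, `consume` and `fetch_batch` are illustrative names, not kafka-python APIs. The controller sets `start` at the top of every loop iteration and clears it before yielding, so a caller that never resumes the generator leaves the children parked on `start.wait()` instead of fetching forever.

```python
import time
from multiprocessing import Process, Queue, Event
from Queue import Empty          # Python 2, as in the diff below


def fetch_batch():
    """Stand-in for the real Kafka fetch; returns a list of messages."""
    return []


def worker(queue, start, exit_event):
    """Child process: fetch only while the controller wants messages."""
    while True:
        # Block until the controller asks for a fetch round
        start.wait()
        if exit_event.is_set():
            break

        count = 0
        for msg in fetch_batch():
            # Bounded queue: put() blocks if the controller stops draining
            queue.put(msg)
            count += 1

        # Nothing fetched: give up the CPU for a while before retrying
        if count == 0:
            time.sleep(0.1)


def consume(queue, start):
    """Controller-side generator."""
    while True:
        start.set()              # ask the workers to fetch for this round
        try:
            msg = queue.get(block=True, timeout=1)
        except Empty:
            break
        start.clear()            # cleared *before* yield: if the caller
        yield msg                # abandons the generator, workers block
                                 # on start.wait() instead of fetching forever


if __name__ == '__main__':
    queue = Queue(1024)          # size cap prevents message overload
    start, exit_event = Event(), Event()

    for _ in range(2):
        proc = Process(target=worker, args=(queue, start, exit_event))
        proc.daemon = True
        proc.start()
        time.sleep(0.2)          # give each child a moment to come up

    for msg in consume(queue, start):
        print msg
```
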
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 30 |
1 files changed, 20 insertions, 10 deletions
```diff
diff --git a/kafka/consumer.py b/kafka/consumer.py
index ca66e87..1fa51a2 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -1,7 +1,9 @@
 from itertools import izip_longest, repeat
 import logging
+import time
 from threading import Lock
 from multiprocessing import Process, Queue, Event, Value
+from Queue import Empty
 
 from kafka.common import (
     ErrorMapping, FetchRequest,
@@ -412,14 +414,14 @@ class MultiProcessConsumer(Consumer):
 
         # Initiate the base consumer class
         super(MultiProcessConsumer, self).__init__(client, group, topic,
-                                            partitions=partitions,
+                                            partitions=None,
                                             auto_commit=auto_commit,
                                             auto_commit_every_n=auto_commit_every_n,
                                             auto_commit_every_t=auto_commit_every_t)
 
         # Variables for managing and controlling the data flow from
         # consumer child process to master
-        self.queue = Queue()        # Child consumers dump messages into this
+        self.queue = Queue(1024)    # Child consumers dump messages into this
         self.start = Event()        # Indicates the consumers to start fetch
         self.exit = Event()         # Requests the consumers to shutdown
         self.pause = Event()        # Requests the consumers to pause fetch
@@ -441,9 +443,11 @@ class MultiProcessConsumer(Consumer):
 
         self.procs = []
         for chunk in chunks:
-            proc = Process(target=_self._consume, args=(chunk,))
+            chunk = filter(lambda x: x is not None, list(chunk))
+            proc = Process(target=self._consume, args=(chunk,))
             proc.daemon = True
             proc.start()
+            time.sleep(0.2)
             self.procs.append(proc)
 
     def _consume(self, partitions):
@@ -468,7 +472,7 @@ class MultiProcessConsumer(Consumer):
             self.start.wait()
 
             # If we are asked to quit, do so
-            if self.exit.isSet():
+            if self.exit.is_set():
                 break
 
             # Consume messages and add them to the queue. If the controller
@@ -488,6 +492,11 @@ class MultiProcessConsumer(Consumer):
                     self.pause.wait()
                     break
 
+            # In case we did not receive any message, give up the CPU for
+            # a while before we try again
+            if count == 0:
+                time.sleep(0.1)
+
         consumer.stop()
 
     def stop(self):
@@ -507,21 +516,22 @@ class MultiProcessConsumer(Consumer):
         # Trigger the consumer procs to start off.
         # We will iterate till there are no more messages available
         self.size.value = 0
-        self.start.set()
         self.pause.set()
 
         while True:
+            self.start.set()
             try:
                 # We will block for a small while so that the consumers get
                 # a chance to run and put some messages in the queue
-                partition, message = self.queue.get(block=True, timeout=0.1)
-            except Queue.Empty:
+                partition, message = self.queue.get(block=True, timeout=1)
+            except Empty:
                 break
 
-            yield message
-
             # Count, check and commit messages if necessary
             self.offsets[partition] = message.offset
+            self.start.clear()
+            yield message
+
             self.count_since_commit += 1
             self._auto_commit()
 
@@ -555,7 +565,7 @@ class MultiProcessConsumer(Consumer):
 
             try:
                 partition, message = self.queue.get(block, timeout)
-            except Queue.Empty:
+            except Empty:
                 break
 
             messages.append(message)
```
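
For context, a hypothetical usage sketch of the consumer this diff touches. The import paths and the (host, port) client constructor follow kafka-python's README of the period and may differ at this exact commit; only the client, group and topic arguments appear in the diff itself.

```python
from kafka.client import KafkaClient            # assumed import path of the era
from kafka.consumer import MultiProcessConsumer

# Assumed (host, port) constructor; later releases take "host:port" strings
kafka = KafkaClient("localhost", 9092)

# client, group and topic are the arguments visible in the diff above;
# any further keyword arguments (number of processes etc.) are omitted here
consumer = MultiProcessConsumer(kafka, "my-group", "my-topic")

# Each iteration sets the shared start event, pulls one message from the
# bounded queue, records its offset, clears start and then yields
for message in consumer:
    print message
```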