summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py30
1 files changed, 20 insertions, 10 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index ca66e87..1fa51a2 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -1,7 +1,9 @@
from itertools import izip_longest, repeat
import logging
+import time
from threading import Lock
from multiprocessing import Process, Queue, Event, Value
+from Queue import Empty
from kafka.common import (
ErrorMapping, FetchRequest,
@@ -412,14 +414,14 @@ class MultiProcessConsumer(Consumer):
# Initiate the base consumer class
super(MultiProcessConsumer, self).__init__(client, group, topic,
- partitions=partitions,
+ partitions=None,
auto_commit=auto_commit,
auto_commit_every_n=auto_commit_every_n,
auto_commit_every_t=auto_commit_every_t)
# Variables for managing and controlling the data flow from
# consumer child process to master
- self.queue = Queue() # Child consumers dump messages into this
+ self.queue = Queue(1024) # Child consumers dump messages into this
self.start = Event() # Indicates the consumers to start fetch
self.exit = Event() # Requests the consumers to shutdown
self.pause = Event() # Requests the consumers to pause fetch
@@ -441,9 +443,11 @@ class MultiProcessConsumer(Consumer):
self.procs = []
for chunk in chunks:
- proc = Process(target=_self._consume, args=(chunk,))
+ chunk = filter(lambda x: x is not None, list(chunk))
+ proc = Process(target=self._consume, args=(chunk,))
proc.daemon = True
proc.start()
+ time.sleep(0.2)
self.procs.append(proc)
def _consume(self, partitions):
@@ -468,7 +472,7 @@ class MultiProcessConsumer(Consumer):
self.start.wait()
# If we are asked to quit, do so
- if self.exit.isSet():
+ if self.exit.is_set():
break
# Consume messages and add them to the queue. If the controller
@@ -488,6 +492,11 @@ class MultiProcessConsumer(Consumer):
self.pause.wait()
break
+ # In case we did not receive any message, give up the CPU for
+ # a while before we try again
+ if count == 0:
+ time.sleep(0.1)
+
consumer.stop()
def stop(self):
@@ -507,21 +516,22 @@ class MultiProcessConsumer(Consumer):
# Trigger the consumer procs to start off.
# We will iterate till there are no more messages available
self.size.value = 0
- self.start.set()
self.pause.set()
while True:
+ self.start.set()
try:
# We will block for a small while so that the consumers get
# a chance to run and put some messages in the queue
- partition, message = self.queue.get(block=True, timeout=0.1)
- except Queue.Empty:
+ partition, message = self.queue.get(block=True, timeout=1)
+ except Empty:
break
- yield message
-
# Count, check and commit messages if necessary
self.offsets[partition] = message.offset
+ self.start.clear()
+ yield message
+
self.count_since_commit += 1
self._auto_commit()
@@ -555,7 +565,7 @@ class MultiProcessConsumer(Consumer):
try:
partition, message = self.queue.get(block, timeout)
- except Queue.Empty:
+ except Empty:
break
messages.append(message)