summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorViktor Shlapakov <vshlapakov@gmail.com>2015-03-25 14:35:23 +0300
committerViktor Shlapakov <vshlapakov@gmail.com>2015-03-25 14:35:23 +0300
commitfb118fb75c818a32d0bb81fe725faca0a714b580 (patch)
tree01284283176011a8f003c7eeba4dfc8c9f486f51
parent9d5c93e902f093872c7475bb010f8e4a5b97aa40 (diff)
downloadkafka-python-fb118fb75c818a32d0bb81fe725faca0a714b580.tar.gz
Manageable queue.put() operation for MPConsumer processes
-rw-r--r--kafka/consumer/base.py1
-rw-r--r--kafka/consumer/multiprocess.py15
2 files changed, 12 insertions, 4 deletions
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index 9cdcf89..efc9404 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -25,6 +25,7 @@ MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8
ITER_TIMEOUT_SECONDS = 60
NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
+FULL_QUEUE_WAIT_TIME_SECONDS = 0.1
class Consumer(object):
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index a63b090..5ce8b4d 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -7,13 +7,14 @@ from collections import namedtuple
from multiprocessing import Process, Manager as MPManager
try:
- from Queue import Empty
+ from Queue import Empty, Full
except ImportError: # python 2
- from queue import Empty
+ from queue import Empty, Full
from .base import (
AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
- NO_MESSAGES_WAIT_TIME_SECONDS
+ NO_MESSAGES_WAIT_TIME_SECONDS,
+ FULL_QUEUE_WAIT_TIME_SECONDS
)
from .simple import Consumer, SimpleConsumer
@@ -59,7 +60,13 @@ def _mp_consume(client, group, topic, queue, size, events, **consumer_options):
message = consumer.get_message()
if message:
- queue.put(message)
+ while True:
+ try:
+ queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
+ break
+ except Full:
+ if events.exit.is_set(): break
+
count += 1
# We have reached the required size. The controller might have