diff options
-rw-r--r-- | kafka/consumer/base.py | 1 | ||||
-rw-r--r-- | kafka/consumer/multiprocess.py | 15 |
2 files changed, 12 insertions, 4 deletions
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index 9cdcf89..efc9404 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -25,6 +25,7 @@ MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8 ITER_TIMEOUT_SECONDS = 60 NO_MESSAGES_WAIT_TIME_SECONDS = 0.1 +FULL_QUEUE_WAIT_TIME_SECONDS = 0.1 class Consumer(object): diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index a63b090..5ce8b4d 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -7,13 +7,14 @@ from collections import namedtuple from multiprocessing import Process, Manager as MPManager try: - from Queue import Empty + from Queue import Empty, Full except ImportError: # python 2 - from queue import Empty + from queue import Empty, Full from .base import ( AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL, - NO_MESSAGES_WAIT_TIME_SECONDS + NO_MESSAGES_WAIT_TIME_SECONDS, + FULL_QUEUE_WAIT_TIME_SECONDS ) from .simple import Consumer, SimpleConsumer @@ -59,7 +60,13 @@ def _mp_consume(client, group, topic, queue, size, events, **consumer_options): message = consumer.get_message() if message: - queue.put(message) + while True: + try: + queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS) + break + except Full: + if events.exit.is_set(): break + count += 1 # We have reached the required size. The controller might have |