diff options
Diffstat (limited to 'kafka/consumer/multiprocess.py')
-rw-r--r-- | kafka/consumer/multiprocess.py | 15 |
1 files changed, 11 insertions, 4 deletions
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index a63b090..5ce8b4d 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -7,13 +7,14 @@ from collections import namedtuple from multiprocessing import Process, Manager as MPManager try: - from Queue import Empty + from Queue import Empty, Full except ImportError: # python 2 - from queue import Empty + from queue import Empty, Full from .base import ( AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL, - NO_MESSAGES_WAIT_TIME_SECONDS + NO_MESSAGES_WAIT_TIME_SECONDS, + FULL_QUEUE_WAIT_TIME_SECONDS ) from .simple import Consumer, SimpleConsumer @@ -59,7 +60,13 @@ def _mp_consume(client, group, topic, queue, size, events, **consumer_options): message = consumer.get_message() if message: - queue.put(message) + while True: + try: + queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS) + break + except Full: + if events.exit.is_set(): break + count += 1 # We have reached the required size. The controller might have |