diff options
-rw-r--r-- | kafka/consumer/multiprocess.py | 4 |
1 files changed, 2 insertions, 2 deletions
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index 18a5014..d0e2920 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -25,7 +25,7 @@ log = logging.getLogger(__name__) Events = namedtuple("Events", ["start", "pause", "exit"]) -def _mp_consume(client, group, topic, queue, size, events, **consumer_options): +def _mp_consume(client, group, topic, message_queue, size, events, **consumer_options): """ A child process worker which consumes messages based on the notifications given by the controller process @@ -69,7 +69,7 @@ def _mp_consume(client, group, topic, queue, size, events, **consumer_options): if message: while True: try: - queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS) + message_queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS) break except queue.Full: if events.exit.is_set(): break |