summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-09 13:56:49 -0800
committerDana Powers <dana.powers@rd.io>2015-12-09 14:00:10 -0800
commit1856063f4e0c36a8ec6266358d82432adf879170 (patch)
tree12412911fc9273ceedd45bcd9fc37001cffa7003
parent035b936d7cf62dd5c08d0f3a317431e7ea81297d (diff)
downloadkafka-python-1856063f4e0c36a8ec6266358d82432adf879170.tar.gz
Fix _mp_consume queue variable name conflict
-rw-r--r--kafka/consumer/multiprocess.py4
1 files changed, 2 insertions, 2 deletions
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index 18a5014..d0e2920 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -25,7 +25,7 @@ log = logging.getLogger(__name__)
Events = namedtuple("Events", ["start", "pause", "exit"])
-def _mp_consume(client, group, topic, queue, size, events, **consumer_options):
+def _mp_consume(client, group, topic, message_queue, size, events, **consumer_options):
"""
A child process worker which consumes messages based on the
notifications given by the controller process
@@ -69,7 +69,7 @@ def _mp_consume(client, group, topic, queue, size, events, **consumer_options):
if message:
while True:
try:
- queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
+ message_queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
break
except queue.Full:
if events.exit.is_set(): break