summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-09 13:56:49 -0800
committerDana Powers <dana.powers@rd.io>2015-12-09 14:06:00 -0800
commit58bdeb17d7e337c48ee2c14bf1f73b00eed0e727 (patch)
tree28acb494be69e5e14e2e286796fd5dfcc336a98f
parent3a0a8e1ee4c39655ba12900eb6bd6f7901262239 (diff)
downloadkafka-python-58bdeb17d7e337c48ee2c14bf1f73b00eed0e727.tar.gz
Fix _mp_consume queue variable name conflict
-rw-r--r--kafka/consumer/multiprocess.py4
1 files changed, 2 insertions, 2 deletions
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index 18a5014..d0e2920 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -25,7 +25,7 @@ log = logging.getLogger(__name__)
Events = namedtuple("Events", ["start", "pause", "exit"])
-def _mp_consume(client, group, topic, queue, size, events, **consumer_options):
+def _mp_consume(client, group, topic, message_queue, size, events, **consumer_options):
"""
A child process worker which consumes messages based on the
notifications given by the controller process
@@ -69,7 +69,7 @@ def _mp_consume(client, group, topic, queue, size, events, **consumer_options):
if message:
while True:
try:
- queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
+ message_queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
break
except queue.Full:
if events.exit.is_set(): break