diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-09 13:56:49 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-09 14:06:00 -0800 |
commit | 58bdeb17d7e337c48ee2c14bf1f73b00eed0e727 (patch) | |
tree | 28acb494be69e5e14e2e286796fd5dfcc336a98f | |
parent | 3a0a8e1ee4c39655ba12900eb6bd6f7901262239 (diff) | |
download | kafka-python-58bdeb17d7e337c48ee2c14bf1f73b00eed0e727.tar.gz |
Fix _mp_consume queue variable name conflict
-rw-r--r-- | kafka/consumer/multiprocess.py | 4 |
1 files changed, 2 insertions, 2 deletions
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index 18a5014..d0e2920 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -25,7 +25,7 @@ log = logging.getLogger(__name__) Events = namedtuple("Events", ["start", "pause", "exit"]) -def _mp_consume(client, group, topic, queue, size, events, **consumer_options): +def _mp_consume(client, group, topic, message_queue, size, events, **consumer_options): """ A child process worker which consumes messages based on the notifications given by the controller process @@ -69,7 +69,7 @@ def _mp_consume(client, group, topic, queue, size, events, **consumer_options): if message: while True: try: - queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS) + message_queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS) break except queue.Full: if events.exit.is_set(): break |