summaryrefslogtreecommitdiff
path: root/kafka/consumer/multiprocess.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/multiprocess.py')
-rw-r--r--kafka/consumer/multiprocess.py10
1 files changed, 5 insertions, 5 deletions
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index 5e421d6..18a5014 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -4,9 +4,9 @@ from collections import namedtuple
import logging
from multiprocessing import Process, Manager as MPManager
try:
- from Queue import Empty, Full # python 3
+ import queue # python 3
except ImportError:
- from queue import Empty, Full # python 2
+ import Queue as queue # python 2
import time
from ..common import KafkaError
@@ -71,7 +71,7 @@ def _mp_consume(client, group, topic, queue, size, events, **consumer_options):
try:
queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
break
- except Full:
+ except queue.Full:
if events.exit.is_set(): break
count += 1
@@ -220,7 +220,7 @@ class MultiProcessConsumer(Consumer):
# TODO: This is a hack and will make the consumer block for
# at least one second. Need to find a better way of doing this
partition, message = self.queue.get(block=True, timeout=1)
- except Empty:
+ except queue.Empty:
break
# Count, check and commit messages if necessary
@@ -270,7 +270,7 @@ class MultiProcessConsumer(Consumer):
try:
partition, message = self.queue.get(block_next_call,
timeout)
- except Empty:
+ except queue.Empty:
break
_msg = (partition, message) if self.partition_info else message