diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-02 15:00:35 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-02 15:00:35 -0800 |
commit | ee6b9cb5b1310c48a3ed5b66be0dd0c4dd16dc43 (patch) | |
tree | cc7eed3a19c98d664186265cb9bfcdf468f0aac2 | |
parent | cfbdc05f27f4ba9f89d720c08015e48f7c43b2b2 (diff) | |
download | kafka-python-ee6b9cb5b1310c48a3ed5b66be0dd0c4dd16dc43.tar.gz |
Fix python3 / python2 comments re queue/Queue
-rw-r--r-- | kafka/consumer/multiprocess.py | 10 | ||||
-rw-r--r-- | kafka/consumer/simple.py | 10 |
2 files changed, 10 insertions, 10 deletions
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index 5e421d6..18a5014 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -4,9 +4,9 @@ from collections import namedtuple import logging from multiprocessing import Process, Manager as MPManager try: - from Queue import Empty, Full # python 3 + import queue # python 3 except ImportError: - from queue import Empty, Full # python 2 + import Queue as queue # python 2 import time from ..common import KafkaError @@ -71,7 +71,7 @@ def _mp_consume(client, group, topic, queue, size, events, **consumer_options): try: queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS) break - except Full: + except queue.Full: if events.exit.is_set(): break count += 1 @@ -220,7 +220,7 @@ class MultiProcessConsumer(Consumer): # TODO: This is a hack and will make the consumer block for # at least one second. Need to find a better way of doing this partition, message = self.queue.get(block=True, timeout=1) - except Empty: + except queue.Empty: break # Count, check and commit messages if necessary @@ -270,7 +270,7 @@ class MultiProcessConsumer(Consumer): try: partition, message = self.queue.get(block_next_call, timeout) - except Empty: + except queue.Empty: break _msg = (partition, message) if self.partition_info else message diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index d8b5826..aad229a 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -6,9 +6,9 @@ except ImportError: from itertools import izip_longest as izip_longest, repeat # python 2 import logging try: - from Queue import Empty, Queue # python 3 + import queue # python 3 except ImportError: - from queue import Empty, Queue # python 2 + import Queue as queue # python 2 import sys import time @@ -136,7 +136,7 @@ class SimpleConsumer(Consumer): self.fetch_offsets = self.offsets.copy() self.iter_timeout = iter_timeout self.auto_offset_reset = auto_offset_reset - self.queue = Queue() + self.queue = queue.Queue() def __repr__(self): return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \ @@ -257,7 +257,7 @@ class SimpleConsumer(Consumer): if self.auto_commit: self.commit() - self.queue = Queue() + self.queue = queue.Queue() def get_messages(self, count=1, block=True, timeout=0.1): """ @@ -341,7 +341,7 @@ class SimpleConsumer(Consumer): return partition, message else: return message - except Empty: + except queue.Empty: log.debug('internal queue empty after fetch - returning None') return None |