summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
authorMark Roberts <wizzat@gmail.com>2014-04-30 17:39:39 -0700
committerMark Roberts <wizzat@gmail.com>2014-04-30 17:39:39 -0700
commit0d57c2718fcf3819f2c18911126f245e9e9ce3e0 (patch)
tree378fd3c881fccdec3b284c6a356e44dfe2e66e60 /kafka/consumer.py
parent57913f9f914a959f52bc9040a172f8c9ff77e491 (diff)
downloadkafka-python-0d57c2718fcf3819f2c18911126f245e9e9ce3e0.tar.gz
Make BrokerRequestError a base class, make subclasses for each broker error
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py16
1 files changed, 7 insertions, 9 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 98f18a0..43b8797 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -8,8 +8,9 @@ from threading import Lock
from multiprocessing import Process, Queue as MPQueue, Event, Value
from Queue import Empty, Queue
+import kafka
from kafka.common import (
- ErrorMapping, FetchRequest,
+ FetchRequest,
OffsetRequest, OffsetCommitRequest,
OffsetFetchRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData
@@ -100,14 +101,11 @@ class Consumer(object):
self.commit_timer.start()
def get_or_init_offset_callback(resp):
- if resp.error == ErrorMapping.NO_ERROR:
+ try:
+ kafka.common.check_error(resp)
return resp.offset
- elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
+ except kafka.common.UnknownTopicOrPartitionError:
return 0
- else:
- raise ProtocolError("OffsetFetchRequest for topic=%s, "
- "partition=%d failed with errorcode=%s" % (
- resp.topic, resp.partition, resp.error))
if auto_commit:
for partition in partitions:
@@ -432,7 +430,7 @@ class SimpleConsumer(Consumer):
# Put the message in our queue
self.queue.put((partition, message))
self.fetch_offsets[partition] = message.offset + 1
- except ConsumerFetchSizeTooSmall, e:
+ except ConsumerFetchSizeTooSmall as e:
if (self.max_buffer_size is not None and
self.buffer_size == self.max_buffer_size):
log.error("Max fetch size %d too small",
@@ -446,7 +444,7 @@ class SimpleConsumer(Consumer):
log.warn("Fetch size too small, increase to %d (2x) "
"and retry", self.buffer_size)
retry_partitions.add(partition)
- except ConsumerNoMoreData, e:
+ except ConsumerNoMoreData as e:
log.debug("Iteration was ended by %r", e)
except StopIteration:
# Stop iterating through this partition