author    | Mark Roberts <wizzat@gmail.com> | 2014-04-30 17:39:39 -0700
committer | Mark Roberts <wizzat@gmail.com> | 2014-04-30 17:39:39 -0700
commit    | 0d57c2718fcf3819f2c18911126f245e9e9ce3e0 (patch)
tree      | 378fd3c881fccdec3b284c6a356e44dfe2e66e60 /kafka/consumer.py
parent    | 57913f9f914a959f52bc9040a172f8c9ff77e491 (diff)
download  | kafka-python-0d57c2718fcf3819f2c18911126f245e9e9ce3e0.tar.gz
Make BrokerRequestError a base class, make subclasses for each broker error
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 16
1 file changed, 7 insertions(+), 9 deletions(-)
```diff
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 98f18a0..43b8797 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -8,8 +8,9 @@ from threading import Lock
 from multiprocessing import Process, Queue as MPQueue, Event, Value
 from Queue import Empty, Queue
 
+import kafka
 from kafka.common import (
-    ErrorMapping, FetchRequest,
+    FetchRequest,
     OffsetRequest, OffsetCommitRequest,
     OffsetFetchRequest,
     ConsumerFetchSizeTooSmall, ConsumerNoMoreData
@@ -100,14 +101,11 @@ class Consumer(object):
             self.commit_timer.start()
 
         def get_or_init_offset_callback(resp):
-            if resp.error == ErrorMapping.NO_ERROR:
+            try:
+                kafka.common.check_error(resp)
                 return resp.offset
-            elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
+            except kafka.common.UnknownTopicOrPartitionError:
                 return 0
-            else:
-                raise ProtocolError("OffsetFetchRequest for topic=%s, "
-                                    "partition=%d failed with errorcode=%s" % (
-                                        resp.topic, resp.partition, resp.error))
 
         if auto_commit:
             for partition in partitions:
@@ -432,7 +430,7 @@ class SimpleConsumer(Consumer):
                         # Put the message in our queue
                         self.queue.put((partition, message))
                         self.fetch_offsets[partition] = message.offset + 1
-                except ConsumerFetchSizeTooSmall, e:
+                except ConsumerFetchSizeTooSmall as e:
                     if (self.max_buffer_size is not None and
                             self.buffer_size == self.max_buffer_size):
                         log.error("Max fetch size %d too small",
@@ -446,7 +444,7 @@ class SimpleConsumer(Consumer):
                     log.warn("Fetch size too small, increase to %d (2x) "
                             "and retry", self.buffer_size)
                     retry_partitions.add(partition)
-                except ConsumerNoMoreData, e:
+                except ConsumerNoMoreData as e:
                     log.debug("Iteration was ended by %r", e)
                 except StopIteration:
                     # Stop iterating through this partition
```
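The core of the change is visible in `get_or_init_offset_callback`: instead of comparing `resp.error` against `ErrorMapping` constants, the consumer calls `kafka.common.check_error(resp)` and catches only the specific exception it can handle. A minimal sketch of that pattern follows, assuming an illustrative error-code registry; the class and function names come from the commit message and the diff, but the registry and the exception constructor details are assumptions, not the library's exact code:

```python
# Sketch of the exception hierarchy and check_error() pattern used above.
# BrokerRequestError, UnknownTopicOrPartitionError, and check_error follow the
# commit message and diff; the code-to-class registry here is illustrative.

class BrokerRequestError(Exception):
    """Base class for errors the broker reports in a response."""
    errno = -1


class UnknownTopicOrPartitionError(BrokerRequestError):
    errno = 3  # Kafka protocol error code for "unknown topic or partition"


# Hypothetical registry mapping broker error codes to exception subclasses.
kafka_errors = {
    UnknownTopicOrPartitionError.errno: UnknownTopicOrPartitionError,
}


def check_error(response):
    """Raise the matching BrokerRequestError subclass for a failed response."""
    if response.error:  # 0 means no error
        error_class = kafka_errors.get(response.error, BrokerRequestError)
        raise error_class(response)
```

With broker errors organized under one base class, a caller like `get_or_init_offset_callback` can catch a narrow subclass (`UnknownTopicOrPartitionError` above) for the recoverable case and let every other error propagate, rather than branching on raw error codes.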