summaryrefslogtreecommitdiff
path: root/kafka/common.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/common.py')
-rw-r--r--kafka/common.py114
1 files changed, 89 insertions, 25 deletions
diff --git a/kafka/common.py b/kafka/common.py
index 830e34d..d288b89 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -48,29 +48,6 @@ Message = namedtuple("Message", ["magic", "attributes", "key", "value"])
TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
-ErrorStrings = {
- -1 : 'UNKNOWN',
- 0 : 'NO_ERROR',
- 1 : 'OFFSET_OUT_OF_RANGE',
- 2 : 'INVALID_MESSAGE',
- 3 : 'UNKNOWN_TOPIC_OR_PARTITON',
- 4 : 'INVALID_FETCH_SIZE',
- 5 : 'LEADER_NOT_AVAILABLE',
- 6 : 'NOT_LEADER_FOR_PARTITION',
- 7 : 'REQUEST_TIMED_OUT',
- 8 : 'BROKER_NOT_AVAILABLE',
- 9 : 'REPLICA_NOT_AVAILABLE',
- 10 : 'MESSAGE_SIZE_TOO_LARGE',
- 11 : 'STALE_CONTROLLER_EPOCH',
- 12 : 'OFFSET_METADATA_TOO_LARGE',
-}
-
-class ErrorMapping(object):
- pass
-
-for k, v in ErrorStrings.items():
- setattr(ErrorMapping, v, k)
-
#################
# Exceptions #
#################
@@ -80,11 +57,76 @@ class KafkaError(RuntimeError):
pass
-class KafkaUnavailableError(KafkaError):
+class BrokerResponseError(KafkaError):
pass
-class BrokerResponseError(KafkaError):
+class UnknownError(BrokerResponseError):
+ errno = -1
+ message = 'UNKNOWN'
+
+
+class OffsetOutOfRangeError(BrokerResponseError):
+ errno = 1
+ message = 'OFFSET_OUT_OF_RANGE'
+
+
+class InvalidMessageError(BrokerResponseError):
+ errno = 2
+ message = 'INVALID_MESSAGE'
+
+
+class UnknownTopicOrPartitionError(BrokerResponseError):
+ errno = 3
+ message = 'UNKNOWN_TOPIC_OR_PARTITON'
+
+
+class InvalidFetchRequestError(BrokerResponseError):
+ errno = 4
+ message = 'INVALID_FETCH_SIZE'
+
+
+class LeaderNotAvailableError(BrokerResponseError):
+ errno = 5
+ message = 'LEADER_NOT_AVAILABLE'
+
+
+class NotLeaderForPartitionError(BrokerResponseError):
+ errno = 6
+ message = 'NOT_LEADER_FOR_PARTITION'
+
+
+class RequestTimedOutError(BrokerResponseError):
+ errno = 7
+ message = 'REQUEST_TIMED_OUT'
+
+
+class BrokerNotAvailableError(BrokerResponseError):
+ errno = 8
+ message = 'BROKER_NOT_AVAILABLE'
+
+
+class ReplicaNotAvailableError(BrokerResponseError):
+ errno = 9
+ message = 'REPLICA_NOT_AVAILABLE'
+
+
+class MessageSizeTooLargeError(BrokerResponseError):
+ errno = 10
+ message = 'MESSAGE_SIZE_TOO_LARGE'
+
+
+class StaleControllerEpochError(BrokerResponseError):
+ errno = 11
+ message = 'STALE_CONTROLLER_EPOCH'
+
+
+class OffsetMetadataTooLarge(BrokerResponseError):
+ errno = 12
+ message = 'OFFSET_METADATA_TOO_LARGE'
+
+
+class KafkaUnavailableError(KafkaError):
pass
@@ -122,3 +164,25 @@ class ConsumerNoMoreData(KafkaError):
class ProtocolError(KafkaError):
pass
+
+kafka_errors = {
+ -1 : UnknownError,
+ 1 : OffsetOutOfRangeError,
+ 2 : InvalidMessageError,
+ 3 : UnknownTopicOrPartitionError,
+ 4 : InvalidFetchRequestError,
+ 5 : LeaderNotAvailableError,
+ 6 : NotLeaderForPartitionError,
+ 7 : RequestTimedOutError,
+ 8 : BrokerNotAvailableError,
+ 9 : ReplicaNotAvailableError,
+ 10 : MessageSizeTooLargeError,
+ 11 : StaleControllerEpochError,
+ 12 : OffsetMetadataTooLarge,
+}
+
+def check_error(response):
+ error = kafka_errors.get(response.error)
+ if error:
+ raise error(response)
+