diff options
Diffstat (limited to 'kafka/common.py')
-rw-r--r-- | kafka/common.py | 114 |
1 files changed, 89 insertions, 25 deletions
diff --git a/kafka/common.py b/kafka/common.py index 830e34d..d288b89 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -48,29 +48,6 @@ Message = namedtuple("Message", ["magic", "attributes", "key", "value"]) TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"]) -ErrorStrings = { - -1 : 'UNKNOWN', - 0 : 'NO_ERROR', - 1 : 'OFFSET_OUT_OF_RANGE', - 2 : 'INVALID_MESSAGE', - 3 : 'UNKNOWN_TOPIC_OR_PARTITON', - 4 : 'INVALID_FETCH_SIZE', - 5 : 'LEADER_NOT_AVAILABLE', - 6 : 'NOT_LEADER_FOR_PARTITION', - 7 : 'REQUEST_TIMED_OUT', - 8 : 'BROKER_NOT_AVAILABLE', - 9 : 'REPLICA_NOT_AVAILABLE', - 10 : 'MESSAGE_SIZE_TOO_LARGE', - 11 : 'STALE_CONTROLLER_EPOCH', - 12 : 'OFFSET_METADATA_TOO_LARGE', -} - -class ErrorMapping(object): - pass - -for k, v in ErrorStrings.items(): - setattr(ErrorMapping, v, k) - ################# # Exceptions # ################# @@ -80,11 +57,76 @@ class KafkaError(RuntimeError): pass -class KafkaUnavailableError(KafkaError): +class BrokerResponseError(KafkaError): pass -class BrokerResponseError(KafkaError): +class UnknownError(BrokerResponseError): + errno = -1 + message = 'UNKNOWN' + + +class OffsetOutOfRangeError(BrokerResponseError): + errno = 1 + message = 'OFFSET_OUT_OF_RANGE' + + +class InvalidMessageError(BrokerResponseError): + errno = 2 + message = 'INVALID_MESSAGE' + + +class UnknownTopicOrPartitionError(BrokerResponseError): + errno = 3 + message = 'UNKNOWN_TOPIC_OR_PARTITON' + + +class InvalidFetchRequestError(BrokerResponseError): + errno = 4 + message = 'INVALID_FETCH_SIZE' + + +class LeaderNotAvailableError(BrokerResponseError): + errno = 5 + message = 'LEADER_NOT_AVAILABLE' + + +class NotLeaderForPartitionError(BrokerResponseError): + errno = 6 + message = 'NOT_LEADER_FOR_PARTITION' + + +class RequestTimedOutError(BrokerResponseError): + errno = 7 + message = 'REQUEST_TIMED_OUT' + + +class BrokerNotAvailableError(BrokerResponseError): + errno = 8 + message = 'BROKER_NOT_AVAILABLE' + + +class ReplicaNotAvailableError(BrokerResponseError): + errno = 9 + message = 'REPLICA_NOT_AVAILABLE' + + +class MessageSizeTooLargeError(BrokerResponseError): + errno = 10 + message = 'MESSAGE_SIZE_TOO_LARGE' + + +class StaleControllerEpochError(BrokerResponseError): + errno = 11 + message = 'STALE_CONTROLLER_EPOCH' + + +class OffsetMetadataTooLarge(BrokerResponseError): + errno = 12 + message = 'OFFSET_METADATA_TOO_LARGE' + + +class KafkaUnavailableError(KafkaError): pass @@ -122,3 +164,25 @@ class ConsumerNoMoreData(KafkaError): class ProtocolError(KafkaError): pass + +kafka_errors = { + -1 : UnknownError, + 1 : OffsetOutOfRangeError, + 2 : InvalidMessageError, + 3 : UnknownTopicOrPartitionError, + 4 : InvalidFetchRequestError, + 5 : LeaderNotAvailableError, + 6 : NotLeaderForPartitionError, + 7 : RequestTimedOutError, + 8 : BrokerNotAvailableError, + 9 : ReplicaNotAvailableError, + 10 : MessageSizeTooLargeError, + 11 : StaleControllerEpochError, + 12 : OffsetMetadataTooLarge, +} + +def check_error(response): + error = kafka_errors.get(response.error) + if error: + raise error(response) + |