diff options
author | Dana Powers <dana.powers@gmail.com> | 2014-05-07 00:04:04 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2014-05-07 00:04:04 -0700 |
commit | b47bf781eb0e96c3fef59cbe554325155062e129 (patch) | |
tree | c74db904c37ba1e70dab6dd2c78f4c1a4abd173f /kafka/common.py | |
parent | 3b18043821f37242bde2b186684fa05d36c61921 (diff) | |
parent | b81bf5f69e24b0d0106693b6e47906669873ec18 (diff) | |
download | kafka-python-b47bf781eb0e96c3fef59cbe554325155062e129.tar.gz |
Merge pull request #158 from wizzat/add_tests
Improve Tests, fix connection error timeout, other issues
Diffstat (limited to 'kafka/common.py')
-rw-r--r-- | kafka/common.py | 124 |
1 files changed, 99 insertions, 25 deletions
diff --git a/kafka/common.py b/kafka/common.py index 005e6dd..d515532 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -48,29 +48,6 @@ Message = namedtuple("Message", ["magic", "attributes", "key", "value"]) TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"]) -ErrorStrings = { - -1 : 'UNKNOWN', - 0 : 'NO_ERROR', - 1 : 'OFFSET_OUT_OF_RANGE', - 2 : 'INVALID_MESSAGE', - 3 : 'UNKNOWN_TOPIC_OR_PARTITON', - 4 : 'INVALID_FETCH_SIZE', - 5 : 'LEADER_NOT_AVAILABLE', - 6 : 'NOT_LEADER_FOR_PARTITION', - 7 : 'REQUEST_TIMED_OUT', - 8 : 'BROKER_NOT_AVAILABLE', - 9 : 'REPLICA_NOT_AVAILABLE', - 10 : 'MESSAGE_SIZE_TOO_LARGE', - 11 : 'STALE_CONTROLLER_EPOCH', - 12 : 'OFFSET_METADATA_TOO_LARGE', -} - -class ErrorMapping(object): - pass - -for k, v in ErrorStrings.items(): - setattr(ErrorMapping, v, k) - ################# # Exceptions # ################# @@ -80,11 +57,81 @@ class KafkaError(RuntimeError): pass -class KafkaUnavailableError(KafkaError): +class BrokerResponseError(KafkaError): pass -class BrokerResponseError(KafkaError): +class UnknownError(BrokerResponseError): + errno = -1 + message = 'UNKNOWN' + + +class OffsetOutOfRangeError(BrokerResponseError): + errno = 1 + message = 'OFFSET_OUT_OF_RANGE' + + +class InvalidMessageError(BrokerResponseError): + errno = 2 + message = 'INVALID_MESSAGE' + + +class UnknownTopicOrPartitionError(BrokerResponseError): + errno = 3 + message = 'UNKNOWN_TOPIC_OR_PARTITON' + + +class InvalidFetchRequestError(BrokerResponseError): + errno = 4 + message = 'INVALID_FETCH_SIZE' + + +class LeaderNotAvailableError(BrokerResponseError): + errno = 5 + message = 'LEADER_NOT_AVAILABLE' + + +class NotLeaderForPartitionError(BrokerResponseError): + errno = 6 + message = 'NOT_LEADER_FOR_PARTITION' + + +class RequestTimedOutError(BrokerResponseError): + errno = 7 + message = 'REQUEST_TIMED_OUT' + + +class BrokerNotAvailableError(BrokerResponseError): + errno = 8 + message = 'BROKER_NOT_AVAILABLE' + + +class ReplicaNotAvailableError(BrokerResponseError): + errno = 9 + message = 'REPLICA_NOT_AVAILABLE' + + +class MessageSizeTooLargeError(BrokerResponseError): + errno = 10 + message = 'MESSAGE_SIZE_TOO_LARGE' + + +class StaleControllerEpochError(BrokerResponseError): + errno = 11 + message = 'STALE_CONTROLLER_EPOCH' + + +class OffsetMetadataTooLargeError(BrokerResponseError): + errno = 12 + message = 'OFFSET_METADATA_TOO_LARGE' + + +class StaleLeaderEpochCodeError(BrokerResponseError): + errno = 13 + message = 'STALE_LEADER_EPOCH_CODE' + + +class KafkaUnavailableError(KafkaError): pass @@ -118,3 +165,30 @@ class ConsumerFetchSizeTooSmall(KafkaError): class ConsumerNoMoreData(KafkaError): pass + + +class ProtocolError(KafkaError): + pass + +kafka_errors = { + -1 : UnknownError, + 1 : OffsetOutOfRangeError, + 2 : InvalidMessageError, + 3 : UnknownTopicOrPartitionError, + 4 : InvalidFetchRequestError, + 5 : LeaderNotAvailableError, + 6 : NotLeaderForPartitionError, + 7 : RequestTimedOutError, + 8 : BrokerNotAvailableError, + 9 : ReplicaNotAvailableError, + 10 : MessageSizeTooLargeError, + 11 : StaleControllerEpochError, + 12 : OffsetMetadataTooLargeError, + 13 : StaleLeaderEpochCodeError, +} + +def check_error(response): + error = kafka_errors.get(response.error) + if error: + raise error(response) + |