summaryrefslogtreecommitdiff
path: root/kafka/common.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2014-05-07 00:04:04 -0700
committerDana Powers <dana.powers@gmail.com>2014-05-07 00:04:04 -0700
commitb47bf781eb0e96c3fef59cbe554325155062e129 (patch)
treec74db904c37ba1e70dab6dd2c78f4c1a4abd173f /kafka/common.py
parent3b18043821f37242bde2b186684fa05d36c61921 (diff)
parentb81bf5f69e24b0d0106693b6e47906669873ec18 (diff)
downloadkafka-python-b47bf781eb0e96c3fef59cbe554325155062e129.tar.gz
Merge pull request #158 from wizzat/add_tests
Improve Tests, fix connection error timeout, other issues
Diffstat (limited to 'kafka/common.py')
-rw-r--r--kafka/common.py124
1 files changed, 99 insertions, 25 deletions
diff --git a/kafka/common.py b/kafka/common.py
index 005e6dd..d515532 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -48,29 +48,6 @@ Message = namedtuple("Message", ["magic", "attributes", "key", "value"])
TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
-ErrorStrings = {
- -1 : 'UNKNOWN',
- 0 : 'NO_ERROR',
- 1 : 'OFFSET_OUT_OF_RANGE',
- 2 : 'INVALID_MESSAGE',
- 3 : 'UNKNOWN_TOPIC_OR_PARTITON',
- 4 : 'INVALID_FETCH_SIZE',
- 5 : 'LEADER_NOT_AVAILABLE',
- 6 : 'NOT_LEADER_FOR_PARTITION',
- 7 : 'REQUEST_TIMED_OUT',
- 8 : 'BROKER_NOT_AVAILABLE',
- 9 : 'REPLICA_NOT_AVAILABLE',
- 10 : 'MESSAGE_SIZE_TOO_LARGE',
- 11 : 'STALE_CONTROLLER_EPOCH',
- 12 : 'OFFSET_METADATA_TOO_LARGE',
-}
-
-class ErrorMapping(object):
- pass
-
-for k, v in ErrorStrings.items():
- setattr(ErrorMapping, v, k)
-
#################
# Exceptions #
#################
@@ -80,11 +57,81 @@ class KafkaError(RuntimeError):
pass
-class KafkaUnavailableError(KafkaError):
+class BrokerResponseError(KafkaError):
pass
-class BrokerResponseError(KafkaError):
+class UnknownError(BrokerResponseError):
+ errno = -1
+ message = 'UNKNOWN'
+
+
+class OffsetOutOfRangeError(BrokerResponseError):
+ errno = 1
+ message = 'OFFSET_OUT_OF_RANGE'
+
+
+class InvalidMessageError(BrokerResponseError):
+ errno = 2
+ message = 'INVALID_MESSAGE'
+
+
+class UnknownTopicOrPartitionError(BrokerResponseError):
+ errno = 3
+ message = 'UNKNOWN_TOPIC_OR_PARTITON'
+
+
+class InvalidFetchRequestError(BrokerResponseError):
+ errno = 4
+ message = 'INVALID_FETCH_SIZE'
+
+
+class LeaderNotAvailableError(BrokerResponseError):
+ errno = 5
+ message = 'LEADER_NOT_AVAILABLE'
+
+
+class NotLeaderForPartitionError(BrokerResponseError):
+ errno = 6
+ message = 'NOT_LEADER_FOR_PARTITION'
+
+
+class RequestTimedOutError(BrokerResponseError):
+ errno = 7
+ message = 'REQUEST_TIMED_OUT'
+
+
+class BrokerNotAvailableError(BrokerResponseError):
+ errno = 8
+ message = 'BROKER_NOT_AVAILABLE'
+
+
+class ReplicaNotAvailableError(BrokerResponseError):
+ errno = 9
+ message = 'REPLICA_NOT_AVAILABLE'
+
+
+class MessageSizeTooLargeError(BrokerResponseError):
+ errno = 10
+ message = 'MESSAGE_SIZE_TOO_LARGE'
+
+
+class StaleControllerEpochError(BrokerResponseError):
+ errno = 11
+ message = 'STALE_CONTROLLER_EPOCH'
+
+
+class OffsetMetadataTooLargeError(BrokerResponseError):
+ errno = 12
+ message = 'OFFSET_METADATA_TOO_LARGE'
+
+
+class StaleLeaderEpochCodeError(BrokerResponseError):
+ errno = 13
+ message = 'STALE_LEADER_EPOCH_CODE'
+
+
+class KafkaUnavailableError(KafkaError):
pass
@@ -118,3 +165,30 @@ class ConsumerFetchSizeTooSmall(KafkaError):
class ConsumerNoMoreData(KafkaError):
pass
+
+
+class ProtocolError(KafkaError):
+ pass
+
+kafka_errors = {
+ -1 : UnknownError,
+ 1 : OffsetOutOfRangeError,
+ 2 : InvalidMessageError,
+ 3 : UnknownTopicOrPartitionError,
+ 4 : InvalidFetchRequestError,
+ 5 : LeaderNotAvailableError,
+ 6 : NotLeaderForPartitionError,
+ 7 : RequestTimedOutError,
+ 8 : BrokerNotAvailableError,
+ 9 : ReplicaNotAvailableError,
+ 10 : MessageSizeTooLargeError,
+ 11 : StaleControllerEpochError,
+ 12 : OffsetMetadataTooLargeError,
+ 13 : StaleLeaderEpochCodeError,
+}
+
+def check_error(response):
+ error = kafka_errors.get(response.error)
+ if error:
+ raise error(response)
+