summaryrefslogtreecommitdiff
path: root/kafka/common.py
diff options
context:
space:
mode:
authorEnrico Canzonieri <ecanzonieri@gmail.com>2015-01-26 15:01:51 -0800
committerEnrico Canzonieri <enrico@yelp.com>2015-01-26 15:01:51 -0800
commit9ab8415ed75b08c5de9f823708027bb4f10a0643 (patch)
treef2011cb5cbdc4d5cd3d9bff9c52b35c2a6aca2ad /kafka/common.py
parentf517ddf283a86947a15f95e5ec562e81f4c477e5 (diff)
parent587206ff6ad59ae01248d24ff9c9fadbdfc1c1fc (diff)
downloadkafka-python-9ab8415ed75b08c5de9f823708027bb4f10a0643.tar.gz
Merge branch 'master' of github.com:mumrah/kafka-python into validate_consumer_offset
Conflicts: kafka/consumer/simple.py
Diffstat (limited to 'kafka/common.py')
-rw-r--r--kafka/common.py36
1 files changed, 12 insertions, 24 deletions
diff --git a/kafka/common.py b/kafka/common.py
index e4b3b1b..b7bb06c 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -1,3 +1,5 @@
+import inspect
+import sys
from collections import namedtuple
###############
@@ -79,9 +81,6 @@ class KafkaError(RuntimeError):
class BrokerResponseError(KafkaError):
pass
-class NoError(BrokerResponseError):
- errno = 0
- message = 'SUCCESS'
class UnknownError(BrokerResponseError):
errno = -1
@@ -201,27 +200,16 @@ class KafkaConfigurationError(KafkaError):
pass
-kafka_errors = {
- -1 : UnknownError,
- 0 : NoError,
- 1 : OffsetOutOfRangeError,
- 2 : InvalidMessageError,
- 3 : UnknownTopicOrPartitionError,
- 4 : InvalidFetchRequestError,
- 5 : LeaderNotAvailableError,
- 6 : NotLeaderForPartitionError,
- 7 : RequestTimedOutError,
- 8 : BrokerNotAvailableError,
- 9 : ReplicaNotAvailableError,
- 10 : MessageSizeTooLargeError,
- 11 : StaleControllerEpochError,
- 12 : OffsetMetadataTooLargeError,
- 13 : StaleLeaderEpochCodeError,
-}
+def _iter_broker_errors():
+ for name, obj in inspect.getmembers(sys.modules[__name__]):
+ if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError:
+ yield obj
-def check_error(response):
- error = kafka_errors.get(response.error, UnknownError)
- if error is not NoError:
- raise error(response)
+kafka_errors = dict([(x.errno, x) for x in _iter_broker_errors()])
+
+def check_error(response):
+ if response.error:
+ error_class = kafka_errors.get(response.error, UnknownError)
+ raise error_class(response)