summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'kafka')
-rw-r--r--kafka/common.py39
-rw-r--r--kafka/future.py7
2 files changed, 33 insertions, 13 deletions
diff --git a/kafka/common.py b/kafka/common.py
index 6a32372..cd93ff6 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -90,7 +90,9 @@ RetryOptions = namedtuple("RetryOptions",
class KafkaError(RuntimeError):
- pass
+ retriable = False
+ # whether metadata should be refreshed on error
+ invalid_metadata = False
class IllegalStateError(KafkaError):
@@ -101,24 +103,30 @@ class IllegalArgumentError(KafkaError):
pass
-class RetriableError(KafkaError):
- pass
+class DisconnectError(KafkaError):
+ retriable = True
+ invalid_metadata = True
-class DisconnectError(KafkaError):
- pass
+class NodeNotReadyError(KafkaError):
+ retriable = True
class CorrelationIdError(KafkaError):
- pass
+ retriable = True
class Cancelled(KafkaError):
- pass
+ retriable = True
class TooManyInFlightRequests(KafkaError):
- pass
+ retriable = True
+
+
+class StaleMetadata(KafkaError):
+ retriable = True
+ invalid_metadata = True
class BrokerResponseError(KafkaError):
@@ -161,6 +169,7 @@ class UnknownTopicOrPartitionError(BrokerResponseError):
message = 'UNKNOWN_TOPIC_OR_PARTITON'
description = ('This request is for a topic or partition that does not'
' exist on this broker.')
+ invalid_metadata = True
class InvalidFetchRequestError(BrokerResponseError):
@@ -173,8 +182,10 @@ class LeaderNotAvailableError(BrokerResponseError):
errno = 5
message = 'LEADER_NOT_AVAILABLE'
description = ('This error is thrown if we are in the middle of a'
- 'leadership election and there is currently no leader for'
- 'this partition and hence it is unavailable for writes.')
+ ' leadership election and there is currently no leader for'
+ ' this partition and hence it is unavailable for writes.')
+ retriable = True
+ invalid_metadata = True
class NotLeaderForPartitionError(BrokerResponseError):
@@ -184,6 +195,8 @@ class NotLeaderForPartitionError(BrokerResponseError):
' messages to a replica that is not the leader for some'
' partition. It indicates that the clients metadata is out'
' of date.')
+ retriable = True
+ invalid_metadata = True
class RequestTimedOutError(BrokerResponseError):
@@ -191,6 +204,7 @@ class RequestTimedOutError(BrokerResponseError):
message = 'REQUEST_TIMED_OUT'
description = ('This error is thrown if the request exceeds the'
' user-specified time limit in the request.')
+ retriable = True
class BrokerNotAvailableError(BrokerResponseError):
@@ -212,7 +226,7 @@ class MessageSizeTooLargeError(BrokerResponseError):
description = ('The server has a configurable maximum message size to avoid'
' unbounded memory allocation. This error is thrown if the'
' client attempt to produce a message larger than this'
- 'maximum.')
+ ' maximum.')
class StaleControllerEpochError(BrokerResponseError):
@@ -242,6 +256,7 @@ class GroupLoadInProgressError(BrokerResponseError):
' change for that offsets topic partition), or in response'
' to group membership requests (such as heartbeats) when'
' group metadata is being loaded by the coordinator.')
+ retriable = True
class GroupCoordinatorNotAvailableError(BrokerResponseError):
@@ -251,6 +266,7 @@ class GroupCoordinatorNotAvailableError(BrokerResponseError):
' requests, offset commits, and most group management'
' requests if the offsets topic has not yet been created, or'
' if the group coordinator is not active.')
+ retriable = True
class NotCoordinatorForGroupError(BrokerResponseError):
@@ -259,6 +275,7 @@ class NotCoordinatorForGroupError(BrokerResponseError):
description = ('The broker returns this error code if it receives an offset'
' fetch or commit request for a group that it is not a'
' coordinator for.')
+ retriable = True
class InvalidTopicError(BrokerResponseError):
diff --git a/kafka/future.py b/kafka/future.py
index 24173bb..20c31cf 100644
--- a/kafka/future.py
+++ b/kafka/future.py
@@ -1,4 +1,4 @@
-from kafka.common import RetriableError, IllegalStateError
+from kafka.common import IllegalStateError
class Future(object):
@@ -16,7 +16,10 @@ class Future(object):
return self.is_done and self.exception
def retriable(self):
- return isinstance(self.exception, RetriableError)
+ try:
+ return self.exception.retriable
+ except AttributeError:
+ return False
def success(self, value):
if self.is_done: