summaryrefslogtreecommitdiff
path: root/kafka/common.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2015-06-04 13:21:12 -0700
committerDana Powers <dana.powers@gmail.com>2015-06-04 13:21:12 -0700
commit474aeaa833a8aebb9a115008de9b1ebd2926948d (patch)
tree9d858a24df0231433204e44738e2f5a9f1be8a66 /kafka/common.py
parent67424a22869b1906f7a02e2d895f68170f6d0f1d (diff)
parent7d6f3f541e0c380c0600eb607d927ec8f8cc966f (diff)
downloadkafka-python-474aeaa833a8aebb9a115008de9b1ebd2926948d.tar.gz
Merge pull request #331 from vshlapakov/feature-producer-retries
Async producer retries for failed messages
Diffstat (limited to 'kafka/common.py')
-rw-r--r--kafka/common.py26
1 files changed, 26 insertions, 0 deletions
diff --git a/kafka/common.py b/kafka/common.py
index 8207bec..8c13798 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -71,6 +71,11 @@ TopicAndPartition = namedtuple("TopicAndPartition",
KafkaMessage = namedtuple("KafkaMessage",
["topic", "partition", "offset", "key", "value"])
+# Define retry policy for async producer
+# Limit value: int >= 0, 0 means no retries
+RetryOptions = namedtuple("RetryOptions",
+ ["limit", "backoff_ms", "retry_on_timeouts"])
+
#################
# Exceptions #
@@ -205,6 +210,12 @@ class KafkaConfigurationError(KafkaError):
pass
+class AsyncProducerQueueFull(KafkaError):
+ def __init__(self, failed_msgs, *args):
+ super(AsyncProducerQueueFull, self).__init__(*args)
+ self.failed_msgs = failed_msgs
+
+
def _iter_broker_errors():
for name, obj in inspect.getmembers(sys.modules[__name__]):
if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError:
@@ -218,3 +229,18 @@ def check_error(response):
if response.error:
error_class = kafka_errors.get(response.error, UnknownError)
raise error_class(response)
+
+
+RETRY_BACKOFF_ERROR_TYPES = (
+ KafkaUnavailableError, LeaderNotAvailableError,
+ ConnectionError, FailedPayloadsError
+)
+
+
+RETRY_REFRESH_ERROR_TYPES = (
+ NotLeaderForPartitionError, UnknownTopicOrPartitionError,
+ LeaderNotAvailableError, ConnectionError
+)
+
+
+RETRY_ERROR_TYPES = RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES