author     Dana Powers <dana.powers@gmail.com>  2015-06-04 13:21:12 -0700
committer  Dana Powers <dana.powers@gmail.com>  2015-06-04 13:21:12 -0700
commit     474aeaa833a8aebb9a115008de9b1ebd2926948d (patch)
tree       9d858a24df0231433204e44738e2f5a9f1be8a66 /kafka
parent     67424a22869b1906f7a02e2d895f68170f6d0f1d (diff)
parent     7d6f3f541e0c380c0600eb607d927ec8f8cc966f (diff)
download   kafka-python-474aeaa833a8aebb9a115008de9b1ebd2926948d.tar.gz
Merge pull request #331 from vshlapakov/feature-producer-retries
Async producer retries for failed messages
Diffstat (limited to 'kafka')
-rw-r--r--  kafka/common.py            26
-rw-r--r--  kafka/producer/base.py    120
-rw-r--r--  kafka/producer/keyed.py    17
-rw-r--r--  kafka/producer/simple.py   17
4 files changed, 152 insertions(+), 28 deletions(-)
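
This merge gives the async producer a bounded queue and configurable retries for failed messages. A minimal usage sketch, assuming the top-level imports kafka-python exposed at the time and a Python version where `async` was not yet a reserved word; the broker address and option values are illustrative:

    from kafka import KafkaClient, SimpleProducer

    client = KafkaClient('localhost:9092')  # illustrative broker address
    producer = SimpleProducer(
        client,
        async=True,                    # retries only apply to the async path
        async_retry_limit=3,           # re-send a failed request up to 3 times
        async_retry_backoff_ms=100,    # sleep 100 ms before each retry round
        async_retry_on_timeouts=True,  # also retry RequestTimedOutError
        async_queue_maxsize=1000,      # bound the send queue (0 = unlimited)
        async_queue_put_timeout=1.0)   # block up to 1 s when the queue is full
    producer.send_messages(b'my-topic', b'payload')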
diff --git a/kafka/common.py b/kafka/common.py
index 8207bec..8c13798 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -71,6 +71,11 @@ TopicAndPartition = namedtuple("TopicAndPartition",
KafkaMessage = namedtuple("KafkaMessage",
["topic", "partition", "offset", "key", "value"])
+# Define retry policy for async producer
+# Limit value: int >= 0, 0 means no retries
+RetryOptions = namedtuple("RetryOptions",
+ ["limit", "backoff_ms", "retry_on_timeouts"])
+
#################
# Exceptions #
@@ -205,6 +210,12 @@ class KafkaConfigurationError(KafkaError):
pass
+class AsyncProducerQueueFull(KafkaError):
+ def __init__(self, failed_msgs, *args):
+ super(AsyncProducerQueueFull, self).__init__(*args)
+ self.failed_msgs = failed_msgs
+
+
def _iter_broker_errors():
for name, obj in inspect.getmembers(sys.modules[__name__]):
if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError:
@@ -218,3 +229,18 @@ def check_error(response):
if response.error:
error_class = kafka_errors.get(response.error, UnknownError)
raise error_class(response)
+
+
+RETRY_BACKOFF_ERROR_TYPES = (
+ KafkaUnavailableError, LeaderNotAvailableError,
+ ConnectionError, FailedPayloadsError
+)
+
+
+RETRY_REFRESH_ERROR_TYPES = (
+ NotLeaderForPartitionError, UnknownTopicOrPartitionError,
+ LeaderNotAvailableError, ConnectionError
+)
+
+
+RETRY_ERROR_TYPES = RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES
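
These three tuples drive the retry decision in _send_upstream (see the base.py diff below): errors in RETRY_BACKOFF_ERROR_TYPES trigger a sleep before the retry, errors in RETRY_REFRESH_ERROR_TYPES trigger a metadata refresh first, and RequestTimedOutError is retried only when RetryOptions.retry_on_timeouts is set. A standalone sketch of the same classification; the helper name is hypothetical:

    from kafka.common import (
        RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES,
        RETRY_REFRESH_ERROR_TYPES, RequestTimedOutError, RetryOptions)

    def classify_error(error_cls, retry_options):
        # Mirrors _handle_error in producer/base.py: should we retry, and
        # does the retry need a backoff and/or a metadata refresh?
        retryable = (error_cls in RETRY_ERROR_TYPES or
                     (error_cls is RequestTimedOutError and
                      retry_options.retry_on_timeouts))
        do_backoff = error_cls in RETRY_BACKOFF_ERROR_TYPES
        do_refresh = error_cls in RETRY_REFRESH_ERROR_TYPES
        return retryable, do_backoff, do_refresh

    opts = RetryOptions(limit=3, backoff_ms=100, retry_on_timeouts=True)
    print(classify_error(RequestTimedOutError, opts))  # (True, False, False)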
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 4bd3de4..2edeace 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -5,9 +5,9 @@ import logging
import time
try:
- from queue import Empty, Queue
+ from queue import Empty, Full, Queue
except ImportError:
- from Queue import Empty, Queue
+ from Queue import Empty, Full, Queue
from collections import defaultdict
from threading import Thread, Event
@@ -15,8 +15,13 @@ from threading import Thread, Event
import six
from kafka.common import (
- ProduceRequest, TopicAndPartition, UnsupportedCodecError
+ ProduceRequest, TopicAndPartition, RetryOptions,
+ kafka_errors, UnsupportedCodecError, FailedPayloadsError,
+ RequestTimedOutError, AsyncProducerQueueFull, UnknownError
)
+from kafka.common import (
+ RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)
+
from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
from kafka.util import kafka_bytestring
@@ -25,21 +30,33 @@ log = logging.getLogger("kafka")
BATCH_SEND_DEFAULT_INTERVAL = 20
BATCH_SEND_MSG_COUNT = 20
+# unlimited
+ASYNC_QUEUE_MAXSIZE = 0
+ASYNC_QUEUE_PUT_TIMEOUT = 0
+# no retries by default
+ASYNC_RETRY_LIMIT = 0
+ASYNC_RETRY_BACKOFF_MS = 0
+ASYNC_RETRY_ON_TIMEOUTS = False
+
STOP_ASYNC_PRODUCER = -1
def _send_upstream(queue, client, codec, batch_time, batch_size,
- req_acks, ack_timeout, stop_event):
+ req_acks, ack_timeout, retry_options, stop_event):
"""
Listen on the queue for a specified number of messages or till
a specified timeout and send them upstream to the brokers in one
request
"""
- stop = False
+ reqs = {}
+ client.reinit()
while not stop_event.is_set():
timeout = batch_time
- count = batch_size
+
+ # it's a simplification: we're comparing message sets and
+ # messages: each set can contain [1..batch_size] messages
+ count = batch_size - len(reqs)
send_at = time.time() + timeout
msgset = defaultdict(list)
@@ -48,7 +65,6 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
while count > 0 and timeout >= 0:
try:
topic_partition, msg, key = queue.get(timeout=timeout)
-
except Empty:
break
@@ -63,20 +79,60 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
msgset[topic_partition].append((msg, key))
# Send collected requests upstream
- reqs = []
for topic_partition, msg in msgset.items():
messages = create_message_set(msg, codec, key)
req = ProduceRequest(topic_partition.topic,
topic_partition.partition,
- messages)
- reqs.append(req)
+ tuple(messages))
+ reqs[req] = 0
+
+ if not reqs:
+ continue
+
+ reqs_to_retry, error_cls = [], None
+ do_backoff, do_refresh = False, False
+
+ def _handle_error(error_cls, reqs, all_retries):
+ if ((error_cls == RequestTimedOutError and
+ retry_options.retry_on_timeouts) or
+ error_cls in RETRY_ERROR_TYPES):
+ all_retries += reqs
+ if error_cls in RETRY_BACKOFF_ERROR_TYPES:
+ do_backoff = True
+ if error_cls in RETRY_REFRESH_ERROR_TYPES:
+ do_refresh = True
try:
- client.send_produce_request(reqs,
- acks=req_acks,
- timeout=ack_timeout)
- except Exception:
- log.exception("Unable to send message")
+ reply = client.send_produce_request(reqs.keys(),
+ acks=req_acks,
+ timeout=ack_timeout,
+ fail_on_error=False)
+ for i, response in enumerate(reply):
+ if isinstance(response, FailedPayloadsError):
+ _handle_error(FailedPayloadsError, response.failed_payloads, reqs_to_retry)
+ elif isinstance(response, ProduceResponse) and response.error:
+ error_cls = kafka_errors.get(response.error, UnknownError)
+ _handle_error(error_cls, [reqs.keys()[i]], reqs_to_retry)
+
+ except Exception as ex:
+ error_cls = kafka_errors.get(type(ex), UnknownError)
+ _handle_error(error_cls, reqs.keys(), reqs_to_retry)
+
+ if not reqs_to_retry:
+ reqs = {}
+ continue
+
+ # doing backoff before next retry
+ if do_backoff and retry_options.backoff_ms:
+ log.info("Doing backoff for %s(ms)." % retry_options.backoff_ms)
+ time.sleep(float(retry_options.backoff_ms) / 1000)
+
+ # refresh topic metadata before next retry
+ if do_refresh:
+ client.load_metadata_for_topics()
+
+ reqs = dict((key, count + 1) for (key, count) in reqs.items()
+ if key in reqs_to_retry and count < retry_options.limit)
class Producer(object):
@@ -111,12 +167,18 @@ class Producer(object):
codec=None,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
- batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+ batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
+ async_retry_limit=ASYNC_RETRY_LIMIT,
+ async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+ async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
+ async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
+ async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
if batch_send:
async = True
assert batch_send_every_n > 0
assert batch_send_every_t > 0
+ assert async_queue_maxsize >= 0
else:
batch_send_every_n = 1
batch_send_every_t = 3600
@@ -135,10 +197,13 @@ class Producer(object):
self.codec = codec
if self.async:
- log.warning("async producer does not guarantee message delivery!")
- log.warning("Current implementation does not retry Failed messages")
- log.warning("Use at your own risk! (or help improve with a PR!)")
- self.queue = Queue() # Messages are sent through this queue
+ # Messages are sent through this queue
+ self.queue = Queue(async_queue_maxsize)
+ self.async_queue_put_timeout = async_queue_put_timeout
+ async_retry_options = RetryOptions(
+ limit=async_retry_limit,
+ backoff_ms=async_retry_backoff_ms,
+ retry_on_timeouts=async_retry_on_timeouts)
self.thread_stop_event = Event()
self.thread = Thread(target=_send_upstream,
args=(self.queue,
@@ -148,6 +213,7 @@ class Producer(object):
batch_send_every_n,
self.req_acks,
self.ack_timeout,
+ async_retry_options,
self.thread_stop_event))
# Thread will die if main thread exits
@@ -199,8 +265,18 @@ class Producer(object):
raise TypeError("the key must be type bytes")
if self.async:
- for m in msg:
- self.queue.put((TopicAndPartition(topic, partition), m, key))
+ for idx, m in enumerate(msg):
+ try:
+ item = (TopicAndPartition(topic, partition), m, key)
+ if self.async_queue_put_timeout == 0:
+ self.queue.put_nowait(item)
+ else:
+ self.queue.put(item, True, self.async_queue_put_timeout)
+ except Full:
+ raise AsyncProducerQueueFull(
+ msg[idx:],
+ 'Producer async queue overfilled. '
+ 'Current queue size %d.' % self.queue.qsize())
resp = []
else:
messages = create_message_set([(m, key) for m in msg], self.codec, key)
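
When the bounded queue fills, the send path above raises AsyncProducerQueueFull carrying the not-yet-enqueued tail of the batch (msg[idx:]) in failed_msgs, so a caller can resend exactly the messages that were dropped. A sketch of one way to use that, assuming a SimpleProducer; the helper name and fixed backoff are illustrative:

    import time

    from kafka.common import AsyncProducerQueueFull

    def send_with_requeue(producer, topic, messages, backoff_secs=0.1):
        # Keep retrying only the messages that did not fit in the queue.
        while messages:
            try:
                producer.send_messages(topic, *messages)
                return
            except AsyncProducerQueueFull as e:
                messages = e.failed_msgs   # unsent tail of the batch
                time.sleep(backoff_secs)   # illustrative backoff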
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index 333b6c0..5252976 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -7,7 +7,8 @@ from kafka.util import kafka_bytestring
from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
- BATCH_SEND_MSG_COUNT
+ BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
+ ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
)
log = logging.getLogger("kafka")
@@ -37,7 +38,12 @@ class KeyedProducer(Producer):
codec=None,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
- batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+ batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
+ async_retry_limit=ASYNC_RETRY_LIMIT,
+ async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+ async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
+ async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
+ async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
if not partitioner:
partitioner = HashedPartitioner
self.partitioner_class = partitioner
@@ -46,7 +52,12 @@ class KeyedProducer(Producer):
super(KeyedProducer, self).__init__(client, async, req_acks,
ack_timeout, codec, batch_send,
batch_send_every_n,
- batch_send_every_t)
+ batch_send_every_t,
+ async_retry_limit,
+ async_retry_backoff_ms,
+ async_retry_on_timeouts,
+ async_queue_maxsize,
+ async_queue_put_timeout)
def _next_partition(self, topic, key):
if topic not in self.partitioners:
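
KeyedProducer simply threads the new keyword arguments through to the base class, so keyed sends get the same retry behaviour. For example (address and option values illustrative; send() is assumed to be the era's keyed-send method):

    from kafka import KafkaClient, KeyedProducer

    client = KafkaClient('localhost:9092')   # illustrative broker address
    producer = KeyedProducer(client,
                             async=True,
                             async_retry_limit=5,
                             async_retry_backoff_ms=200)
    producer.send(b'my-topic', b'key1', b'hello')  # partition chosen by key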
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index 2699cf2..ded6eb6 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -10,7 +10,8 @@ from six.moves import xrange
from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
- BATCH_SEND_MSG_COUNT
+ BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
+ ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
)
log = logging.getLogger("kafka")
@@ -45,13 +46,23 @@ class SimpleProducer(Producer):
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
- random_start=True):
+ random_start=True,
+ async_retry_limit=ASYNC_RETRY_LIMIT,
+ async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+ async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
+ async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
+ async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
self.partition_cycles = {}
self.random_start = random_start
super(SimpleProducer, self).__init__(client, async, req_acks,
ack_timeout, codec, batch_send,
batch_send_every_n,
- batch_send_every_t)
+ batch_send_every_t,
+ async_retry_limit,
+ async_retry_backoff_ms,
+ async_retry_on_timeouts,
+ async_queue_maxsize,
+ async_queue_put_timeout)
def _next_partition(self, topic):
if topic not in self.partition_cycles: