-rw-r--r--  kafka/common.py            26
-rw-r--r--  kafka/producer/base.py    120
-rw-r--r--  kafka/producer/keyed.py    17
-rw-r--r--  kafka/producer/simple.py   17
-rw-r--r--  test/test_producer.py     144
-rw-r--r--  tox.ini                     1
6 files changed, 296 insertions, 29 deletions
diff --git a/kafka/common.py b/kafka/common.py
index 8207bec..8c13798 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -71,6 +71,11 @@ TopicAndPartition = namedtuple("TopicAndPartition",
KafkaMessage = namedtuple("KafkaMessage",
["topic", "partition", "offset", "key", "value"])
+# Define retry policy for async producer
+# Limit value: int >= 0, 0 means no retries
+RetryOptions = namedtuple("RetryOptions",
+ ["limit", "backoff_ms", "retry_on_timeouts"])
+
#################
# Exceptions #
@@ -205,6 +210,12 @@ class KafkaConfigurationError(KafkaError):
pass
+class AsyncProducerQueueFull(KafkaError):
+ def __init__(self, failed_msgs, *args):
+ super(AsyncProducerQueueFull, self).__init__(*args)
+ self.failed_msgs = failed_msgs
+
+
def _iter_broker_errors():
for name, obj in inspect.getmembers(sys.modules[__name__]):
if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError:
@@ -218,3 +229,18 @@ def check_error(response):
if response.error:
error_class = kafka_errors.get(response.error, UnknownError)
raise error_class(response)
+
+
+RETRY_BACKOFF_ERROR_TYPES = (
+ KafkaUnavailableError, LeaderNotAvailableError,
+ ConnectionError, FailedPayloadsError
+)
+
+
+RETRY_REFRESH_ERROR_TYPES = (
+ NotLeaderForPartitionError, UnknownTopicOrPartitionError,
+ LeaderNotAvailableError, ConnectionError
+)
+
+
+RETRY_ERROR_TYPES = RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES
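
The symbols added to kafka/common.py above are consumed by the async sender in kafka/producer/base.py below. A minimal sketch of how the classification is meant to be used (the classify() helper and the option values are illustrative, not part of the patch):

from kafka.common import (
    RetryOptions, RequestTimedOutError,
    RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)

# 3 retries per request, 100ms backoff, no retries on request timeouts
retry_options = RetryOptions(limit=3, backoff_ms=100, retry_on_timeouts=False)

def classify(error_cls):
    # Decide what the sender should do with a request that failed with error_cls
    retriable = (error_cls in RETRY_ERROR_TYPES or
                 (error_cls is RequestTimedOutError and
                  retry_options.retry_on_timeouts))
    return {
        'retry': retriable,
        'backoff': error_cls in RETRY_BACKOFF_ERROR_TYPES,
        'refresh_metadata': error_cls in RETRY_REFRESH_ERROR_TYPES,
    }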
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 4bd3de4..2edeace 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -5,9 +5,9 @@ import logging
import time
try:
- from queue import Empty, Queue
+ from queue import Empty, Full, Queue
except ImportError:
- from Queue import Empty, Queue
+ from Queue import Empty, Full, Queue
from collections import defaultdict
from threading import Thread, Event
@@ -15,8 +15,13 @@ from threading import Thread, Event
import six
from kafka.common import (
- ProduceRequest, TopicAndPartition, UnsupportedCodecError
+ ProduceRequest, TopicAndPartition, RetryOptions,
+ kafka_errors, UnsupportedCodecError, FailedPayloadsError,
+ RequestTimedOutError, AsyncProducerQueueFull, UnknownError
)
+from kafka.common import (
+ RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)
+
from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
from kafka.util import kafka_bytestring
@@ -25,21 +30,33 @@ log = logging.getLogger("kafka")
BATCH_SEND_DEFAULT_INTERVAL = 20
BATCH_SEND_MSG_COUNT = 20
+# unlimited
+ASYNC_QUEUE_MAXSIZE = 0
+ASYNC_QUEUE_PUT_TIMEOUT = 0
+# no retries by default
+ASYNC_RETRY_LIMIT = 0
+ASYNC_RETRY_BACKOFF_MS = 0
+ASYNC_RETRY_ON_TIMEOUTS = False
+
STOP_ASYNC_PRODUCER = -1
def _send_upstream(queue, client, codec, batch_time, batch_size,
- req_acks, ack_timeout, stop_event):
+ req_acks, ack_timeout, retry_options, stop_event):
"""
Listen on the queue for a specified number of messages or till
a specified timeout and send them upstream to the brokers in one
request
"""
- stop = False
+ reqs = {}
+ client.reinit()
while not stop_event.is_set():
timeout = batch_time
- count = batch_size
+
+ # it's a simplification: we're comparing message sets and
+ # messages: each set can contain [1..batch_size] messages
+ count = batch_size - len(reqs)
send_at = time.time() + timeout
msgset = defaultdict(list)
@@ -48,7 +65,6 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
while count > 0 and timeout >= 0:
try:
topic_partition, msg, key = queue.get(timeout=timeout)
-
except Empty:
break
@@ -63,20 +79,60 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
msgset[topic_partition].append((msg, key))
# Send collected requests upstream
- reqs = []
for topic_partition, msg in msgset.items():
messages = create_message_set(msg, codec, key)
req = ProduceRequest(topic_partition.topic,
topic_partition.partition,
- messages)
- reqs.append(req)
+ tuple(messages))
+ reqs[req] = 0
+
+ if not reqs:
+ continue
+
+ reqs_to_retry, error_cls = [], None
+ do_backoff, do_refresh = False, False
+
+ def _handle_error(error_cls, reqs, all_retries):
+ if ((error_cls == RequestTimedOutError and
+ retry_options.retry_on_timeouts) or
+ error_cls in RETRY_ERROR_TYPES):
+ all_retries += reqs
+ if error_cls in RETRY_BACKOFF_ERROR_TYPES:
+ do_backoff = True
+ if error_cls in RETRY_REFRESH_ERROR_TYPES:
+ do_refresh = True
try:
- client.send_produce_request(reqs,
- acks=req_acks,
- timeout=ack_timeout)
- except Exception:
- log.exception("Unable to send message")
+ reply = client.send_produce_request(reqs.keys(),
+ acks=req_acks,
+ timeout=ack_timeout,
+ fail_on_error=False)
+ for i, response in enumerate(reply):
+ if isinstance(response, FailedPayloadsError):
+ _handle_error(FailedPayloadsError, response.failed_payloads, reqs_to_retry)
+ elif isinstance(response, ProduceResponse) and response.error:
+ error_cls = kafka_errors.get(response.error, UnknownError)
+ _handle_error(error_cls, [reqs.keys()[i]], reqs_to_retry)
+
+ except Exception as ex:
+ error_cls = kafka_errors.get(type(ex), UnknownError)
+ _handle_error(error_cls, reqs.keys(), reqs_to_retry)
+
+ if not reqs_to_retry:
+ reqs = {}
+ continue
+
+ # doing backoff before next retry
+ if do_backoff and retry_options.backoff_ms:
+ log.info("Doing backoff for %s(ms)." % retry_options.backoff_ms)
+ time.sleep(float(retry_options.backoff_ms) / 1000)
+
+ # refresh topic metadata before next retry
+ if do_refresh:
+ client.load_metadata_for_topics()
+
+ reqs = dict((key, count + 1) for (key, count) in reqs.items()
+ if key in reqs_to_retry and count < retry_options.limit)
class Producer(object):
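
A small worked example of the retry bookkeeping on the last line of _send_upstream above, using placeholder keys instead of real ProduceRequest objects:

# reqs maps each outstanding request to the number of retries already used.
# Suppose both requests were marked for retry and the configured limit is 1:
reqs = {'req_a': 0, 'req_b': 1}
reqs_to_retry = ['req_a', 'req_b']
limit = 1
reqs = dict((key, count + 1) for (key, count) in reqs.items()
            if key in reqs_to_retry and count < limit)
# reqs is now {'req_a': 1}: 'req_b' has exhausted its retry budget and is
# dropped, while 'req_a' stays in reqs and is folded into the next batch.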
@@ -111,12 +167,18 @@ class Producer(object):
codec=None,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
- batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+ batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
+ async_retry_limit=ASYNC_RETRY_LIMIT,
+ async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+ async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
+ async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
+ async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
if batch_send:
async = True
assert batch_send_every_n > 0
assert batch_send_every_t > 0
+ assert async_queue_maxsize >= 0
else:
batch_send_every_n = 1
batch_send_every_t = 3600
@@ -135,10 +197,13 @@ class Producer(object):
self.codec = codec
if self.async:
- log.warning("async producer does not guarantee message delivery!")
- log.warning("Current implementation does not retry Failed messages")
- log.warning("Use at your own risk! (or help improve with a PR!)")
- self.queue = Queue() # Messages are sent through this queue
+ # Messages are sent through this queue
+ self.queue = Queue(async_queue_maxsize)
+ self.async_queue_put_timeout = async_queue_put_timeout
+ async_retry_options = RetryOptions(
+ limit=async_retry_limit,
+ backoff_ms=async_retry_backoff_ms,
+ retry_on_timeouts=async_retry_on_timeouts)
self.thread_stop_event = Event()
self.thread = Thread(target=_send_upstream,
args=(self.queue,
@@ -148,6 +213,7 @@ class Producer(object):
batch_send_every_n,
self.req_acks,
self.ack_timeout,
+ async_retry_options,
self.thread_stop_event))
# Thread will die if main thread exits
@@ -199,8 +265,18 @@ class Producer(object):
raise TypeError("the key must be type bytes")
if self.async:
- for m in msg:
- self.queue.put((TopicAndPartition(topic, partition), m, key))
+ for idx, m in enumerate(msg):
+ try:
+ item = (TopicAndPartition(topic, partition), m, key)
+ if self.async_queue_put_timeout == 0:
+ self.queue.put_nowait(item)
+ else:
+ self.queue.put(item, True, self.async_queue_put_timeout)
+ except Full:
+ raise AsyncProducerQueueFull(
+ msg[idx:],
+ 'Producer async queue overfilled. '
+ 'Current queue size %d.' % self.queue.qsize())
resp = []
else:
messages = create_message_set([(m, key) for m in msg], self.codec, key)
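
Putting the base.py changes together, a usage sketch of the new constructor options and the queue-overflow path (the broker address, option values, and handle_overflow() are assumptions for illustration):

from kafka.client import KafkaClient
from kafka.common import AsyncProducerQueueFull
from kafka.producer.base import Producer

client = KafkaClient('localhost:9092')            # illustrative broker address
producer = Producer(client,
                    async=True,
                    async_retry_limit=5,          # retry a failed batch up to 5 times
                    async_retry_backoff_ms=100,   # sleep 100ms before retrying
                    async_retry_on_timeouts=True, # also retry RequestTimedOutError
                    async_queue_maxsize=1000,     # bound the in-memory queue
                    async_queue_put_timeout=0)    # 0 means put_nowait()

try:
    producer.send_messages(b'my-topic', 0, b'hello', b'world')
except AsyncProducerQueueFull as e:
    # e.failed_msgs holds the messages that could not be enqueued, so the
    # caller can decide whether to retry, buffer, or drop them.
    handle_overflow(e.failed_msgs)                # hypothetical handler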
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index 333b6c0..5252976 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -7,7 +7,8 @@ from kafka.util import kafka_bytestring
from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
- BATCH_SEND_MSG_COUNT
+ BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
+ ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
)
log = logging.getLogger("kafka")
@@ -37,7 +38,12 @@ class KeyedProducer(Producer):
codec=None,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
- batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+ batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
+ async_retry_limit=ASYNC_RETRY_LIMIT,
+ async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+ async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
+ async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
+ async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
if not partitioner:
partitioner = HashedPartitioner
self.partitioner_class = partitioner
@@ -46,7 +52,12 @@ class KeyedProducer(Producer):
super(KeyedProducer, self).__init__(client, async, req_acks,
ack_timeout, codec, batch_send,
batch_send_every_n,
- batch_send_every_t)
+ batch_send_every_t,
+ async_retry_limit,
+ async_retry_backoff_ms,
+ async_retry_on_timeouts,
+ async_queue_maxsize,
+ async_queue_put_timeout)
def _next_partition(self, topic, key):
if topic not in self.partitioners:
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index 2699cf2..ded6eb6 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -10,7 +10,8 @@ from six.moves import xrange
from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
- BATCH_SEND_MSG_COUNT
+ BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
+ ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
)
log = logging.getLogger("kafka")
@@ -45,13 +46,23 @@ class SimpleProducer(Producer):
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
- random_start=True):
+ random_start=True,
+ async_retry_limit=ASYNC_RETRY_LIMIT,
+ async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+ async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
+ async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
+ async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
self.partition_cycles = {}
self.random_start = random_start
super(SimpleProducer, self).__init__(client, async, req_acks,
ack_timeout, codec, batch_send,
batch_send_every_n,
- batch_send_every_t)
+ batch_send_every_t,
+ async_retry_limit,
+ async_retry_backoff_ms,
+ async_retry_on_timeouts,
+ async_queue_maxsize,
+ async_queue_put_timeout)
def _next_partition(self, topic):
if topic not in self.partition_cycles:
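
KeyedProducer and SimpleProducer only forward the new keyword arguments to the base Producer, so they are configured the same way; for example (values illustrative):

from kafka.client import KafkaClient
from kafka.producer.simple import SimpleProducer

client = KafkaClient('localhost:9092')            # illustrative broker address
producer = SimpleProducer(client, async=True,
                          async_retry_limit=3,
                          async_retry_backoff_ms=200,
                          async_queue_maxsize=500)
producer.send_messages(b'my-topic', b'payload')   # SimpleProducer picks the partition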
diff --git a/test/test_producer.py b/test/test_producer.py
index f6b3d6a..258b9c3 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -1,11 +1,26 @@
# -*- coding: utf-8 -*-
+import time
import logging
-from mock import MagicMock
+from mock import MagicMock, patch
from . import unittest
+from kafka.common import TopicAndPartition, FailedPayloadsError, RetryOptions
+from kafka.common import AsyncProducerQueueFull
from kafka.producer.base import Producer
+from kafka.producer.base import _send_upstream
+from kafka.protocol import CODEC_NONE
+
+import threading
+try:
+ from queue import Empty, Queue
+except ImportError:
+ from Queue import Empty, Queue
+try:
+ xrange
+except NameError:
+ xrange = range
class TestKafkaProducer(unittest.TestCase):
@@ -40,3 +55,130 @@ class TestKafkaProducer(unittest.TestCase):
topic = b"test-topic"
producer.send_messages(topic, b'hi')
assert client.send_produce_request.called
+
+ @patch('kafka.producer.base._send_upstream')
+ def test_producer_async_queue_overfilled_batch_send(self, mock):
+ queue_size = 2
+ producer = Producer(MagicMock(), batch_send=True,
+ async_queue_maxsize=queue_size)
+
+ topic = b'test-topic'
+ partition = 0
+ message = b'test-message'
+
+ with self.assertRaises(AsyncProducerQueueFull):
+ message_list = [message] * (queue_size + 1)
+ producer.send_messages(topic, partition, *message_list)
+ self.assertEqual(producer.queue.qsize(), queue_size)
+ for _ in xrange(producer.queue.qsize()):
+ producer.queue.get()
+
+ @patch('kafka.producer.base._send_upstream')
+ def test_producer_async_queue_overfilled(self, mock):
+ queue_size = 2
+ producer = Producer(MagicMock(), async=True,
+ async_queue_maxsize=queue_size)
+
+ topic = b'test-topic'
+ partition = 0
+ message = b'test-message'
+
+ with self.assertRaises(AsyncProducerQueueFull):
+ message_list = [message] * (queue_size + 1)
+ producer.send_messages(topic, partition, *message_list)
+ self.assertEqual(producer.queue.qsize(), queue_size)
+ for _ in xrange(producer.queue.qsize()):
+ producer.queue.get()
+
+
+class TestKafkaProducerSendUpstream(unittest.TestCase):
+
+ def setUp(self):
+ self.client = MagicMock()
+ self.queue = Queue()
+
+ def _run_process(self, retries_limit=3, sleep_timeout=1):
+ # run _send_upstream process with the queue
+ stop_event = threading.Event()
+ retry_options = RetryOptions(limit=retries_limit,
+ backoff_ms=50,
+ retry_on_timeouts=False)
+ self.thread = threading.Thread(
+ target=_send_upstream,
+ args=(self.queue, self.client, CODEC_NONE,
+ 0.3, # batch time (seconds)
+ 3, # batch length
+ Producer.ACK_AFTER_LOCAL_WRITE,
+ Producer.DEFAULT_ACK_TIMEOUT,
+ retry_options,
+ stop_event))
+ self.thread.daemon = True
+ self.thread.start()
+ time.sleep(sleep_timeout)
+ stop_event.set()
+
+ def test_wo_retries(self):
+
+ # let's create a queue and add 10 messages for 1 partition
+ for i in range(10):
+ self.queue.put((TopicAndPartition("test", 0), "msg %i", "key %i"))
+
+ self._run_process()
+
+ # the queue should be empty at the end of the test
+ self.assertEqual(self.queue.empty(), True)
+
+ # there should be 4 non-empty calls:
+ # 3 batches of 3 msgs each + 1 batch of 1 message
+ self.assertEqual(self.client.send_produce_request.call_count, 4)
+
+ def test_first_send_failed(self):
+
+ # let's create a queue and add 10 messages for 10 different partitions
+ # to show how retries should work ideally
+ for i in range(10):
+ self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
+
+ self.client.is_first_time = True
+ def send_side_effect(reqs, *args, **kwargs):
+ if self.client.is_first_time:
+ self.client.is_first_time = False
+ return [FailedPayloadsError(reqs)]
+ return []
+
+ self.client.send_produce_request.side_effect = send_side_effect
+
+ self._run_process(2)
+
+ # the queue should be empty at the end of the test
+ self.assertEqual(self.queue.empty(), True)
+
+ # there should be 5 non-empty calls: 1st failed batch of 3 msgs
+ # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5
+ self.assertEqual(self.client.send_produce_request.call_count, 5)
+
+ def test_with_limited_retries(self):
+
+ # let's create a queue and add 10 messages for 10 different partitions
+ # to show how retries should work ideally
+ for i in range(10):
+ self.queue.put((TopicAndPartition("test", i), "msg %i" % i, "key %i" % i))
+
+ def send_side_effect(reqs, *args, **kwargs):
+ return [FailedPayloadsError(reqs)]
+
+ self.client.send_produce_request.side_effect = send_side_effect
+
+ self._run_process(3, 3)
+
+ # the queue should be empty at the end of the test
+ self.assertEqual(self.queue.empty(), True)
+
+ # there should be 16 non-empty calls:
+ # 3 initial batches of 3 msgs each + 1 initial batch of 1 msg +
+ # 3 retries of the batches above = 4 + 3 * 4 = 16, all failed
+ self.assertEqual(self.client.send_produce_request.call_count, 16)
+
+ def tearDown(self):
+ for _ in xrange(self.queue.qsize()):
+ self.queue.get()
diff --git a/tox.ini b/tox.ini
index fba7d8e..e3e8568 100644
--- a/tox.ini
+++ b/tox.ini
@@ -14,6 +14,7 @@ commands =
nosetests {posargs:-v --with-id --id-file={envdir}/.noseids --with-timer --timer-top-n 10 --with-coverage --cover-erase --cover-package kafka}
setenv =
PROJECT_ROOT = {toxinidir}
+passenv = KAFKA_VERSION
[testenv:py33]
deps =