summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/usage.rst2
-rw-r--r--kafka/producer/base.py259
-rw-r--r--kafka/producer/keyed.py4
-rw-r--r--kafka/producer/simple.py4
-rw-r--r--test/test_producer.py17
-rw-r--r--test/test_producer_integration.py4
6 files changed, 180 insertions, 110 deletions
diff --git a/docs/usage.rst b/docs/usage.rst
index 150d121..cdacfdc 100644
--- a/docs/usage.rst
+++ b/docs/usage.rst
@@ -47,7 +47,7 @@ SimpleProducer
# Notes:
# * If the producer dies before the messages are sent, there will be losses
# * Call producer.stop() to send the messages and cleanup
- producer = SimpleProducer(kafka, batch_send=True,
+ producer = SimpleProducer(kafka, async=True,
batch_send_every_n=20,
batch_send_every_t=60)
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 5cbc521..18af342 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -17,10 +17,9 @@ import six
from kafka.common import (
ProduceRequest, ProduceResponse, TopicAndPartition, RetryOptions,
kafka_errors, UnsupportedCodecError, FailedPayloadsError,
- RequestTimedOutError, AsyncProducerQueueFull, UnknownError
+ RequestTimedOutError, AsyncProducerQueueFull, UnknownError,
+ RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES
)
-from kafka.common import (
- RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)
from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
from kafka.util import kafka_bytestring
@@ -33,33 +32,75 @@ BATCH_SEND_MSG_COUNT = 20
# unlimited
ASYNC_QUEUE_MAXSIZE = 0
ASYNC_QUEUE_PUT_TIMEOUT = 0
-# no retries by default
-ASYNC_RETRY_LIMIT = 0
-ASYNC_RETRY_BACKOFF_MS = 0
-ASYNC_RETRY_ON_TIMEOUTS = False
+# unlimited retries by default
+ASYNC_RETRY_LIMIT = None
+ASYNC_RETRY_BACKOFF_MS = 100
+ASYNC_RETRY_ON_TIMEOUTS = True
+ASYNC_LOG_MESSAGES_ON_ERROR = True
STOP_ASYNC_PRODUCER = -1
+ASYNC_STOP_TIMEOUT_SECS = 30
def _send_upstream(queue, client, codec, batch_time, batch_size,
- req_acks, ack_timeout, retry_options, stop_event):
- """
- Listen on the queue for a specified number of messages or till
- a specified timeout and send them upstream to the brokers in one
- request
+ req_acks, ack_timeout, retry_options, stop_event,
+ log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
+ stop_timeout=ASYNC_STOP_TIMEOUT_SECS):
+ """Private method to manage producing messages asynchronously
+
+ Listens on the queue for a specified number of messages or until
+ a specified timeout and then sends messages to the brokers in grouped
+ requests (one per broker).
+
+ Messages placed on the queue should be tuples that conform to this format:
+ ((topic, partition), message, key)
+
+ Currently does not mark messages with task_done. Do not attempt to join()!
+
+ Arguments:
+ queue (threading.Queue): the queue from which to get messages
+ client (KafkaClient): instance to use for communicating with brokers
+ codec (kafka.protocol.ALL_CODECS): compression codec to use
+ batch_time (int): interval in seconds to send message batches
+ batch_size (int): count of messages that will trigger an immediate send
+ req_acks: required acks to use with ProduceRequests. see server protocol
+ ack_timeout: timeout to wait for required acks. see server protocol
+ retry_options (RetryOptions): settings for retry limits, backoff etc
+ stop_event (threading.Event): event to monitor for shutdown signal.
+ when this event is 'set', the producer will stop sending messages.
+ log_messages_on_error (bool, optional): log stringified message-contents
+ on any produce error, otherwise only log a hash() of the contents,
+ defaults to True.
+ stop_timeout (int or float, optional): number of seconds to continue
+ retrying messages after stop_event is set, defaults to 30.
"""
- reqs = {}
+ request_tries = {}
client.reinit()
+ stop_at = None
- while not stop_event.is_set():
- timeout = batch_time
+ while not (stop_event.is_set() and queue.empty() and not request_tries):
+
+ # Handle stop_timeout
+ if stop_event.is_set():
+ if not stop_at:
+ stop_at = stop_timeout + time.time()
+ if time.time() > stop_at:
+ log.debug('Async producer stopping due to stop_timeout')
+ break
- # it's a simplification: we're comparing message sets and
- # messages: each set can contain [1..batch_size] messages
- count = batch_size - len(reqs)
+ timeout = batch_time
+ count = batch_size
send_at = time.time() + timeout
msgset = defaultdict(list)
+ # Merging messages will require a bit more work to manage correctly
+ # for now, dont look for new batches if we have old ones to retry
+ if request_tries:
+ count = 0
+ log.debug('Skipping new batch collection to handle retries')
+ else:
+ log.debug('Batching size: {0}, timeout: {1}'.format(count, timeout))
+
# Keep fetching till we gather enough messages or a
# timeout is reached
while count > 0 and timeout >= 0:
@@ -84,55 +125,79 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
req = ProduceRequest(topic_partition.topic,
topic_partition.partition,
tuple(messages))
- reqs[req] = 0
+ request_tries[req] = 0
- if not reqs:
+ if not request_tries:
continue
reqs_to_retry, error_cls = [], None
- do_backoff, do_refresh = False, False
-
- def _handle_error(error_cls, reqs, all_retries):
- if ((error_cls == RequestTimedOutError and
- retry_options.retry_on_timeouts) or
- error_cls in RETRY_ERROR_TYPES):
- all_retries += reqs
- if error_cls in RETRY_BACKOFF_ERROR_TYPES:
- do_backoff = True
- if error_cls in RETRY_REFRESH_ERROR_TYPES:
- do_refresh = True
-
- try:
- reply = client.send_produce_request(reqs.keys(),
- acks=req_acks,
- timeout=ack_timeout,
- fail_on_error=False)
- for i, response in enumerate(reply):
- if isinstance(response, FailedPayloadsError):
- _handle_error(FailedPayloadsError, response.failed_payloads, reqs_to_retry)
- elif isinstance(response, ProduceResponse) and response.error:
- error_cls = kafka_errors.get(response.error, UnknownError)
- _handle_error(error_cls, [reqs.keys()[i]], reqs_to_retry)
-
- except Exception as ex:
- error_cls = kafka_errors.get(type(ex), UnknownError)
- _handle_error(error_cls, reqs.keys(), reqs_to_retry)
+ retry_state = {
+ 'do_backoff': False,
+ 'do_refresh': False
+ }
+
+ def _handle_error(error_cls, request):
+ if issubclass(error_cls, RETRY_ERROR_TYPES) or (retry_options.retry_on_timeouts and issubclass(error_cls, RequestTimedOutError)):
+ reqs_to_retry.append(request)
+ if issubclass(error_cls, RETRY_BACKOFF_ERROR_TYPES):
+ retry_state['do_backoff'] |= True
+ if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES):
+ retry_state['do_refresh'] |= True
+
+ reply = client.send_produce_request(request_tries.keys(),
+ acks=req_acks,
+ timeout=ack_timeout,
+ fail_on_error=False)
+ for i, response in enumerate(reply):
+ error_cls = None
+ if isinstance(response, FailedPayloadsError):
+ error_cls = response.__class__
+ orig_req = response.payload
+
+ elif isinstance(response, ProduceResponse) and response.error:
+ error_cls = kafka_errors.get(response.error, UnknownError)
+ orig_req = request_tries.keys()[i]
+
+ if error_cls:
+ _handle_error(error_cls, orig_req)
+ log.error('Error sending ProduceRequest to %s:%d with msgs %s',
+ orig_req.topic, orig_req.partition,
+ orig_req.messages if log_messages_on_error
+ else hash(orig_req.messages))
if not reqs_to_retry:
- reqs = {}
+ request_tries = {}
continue
# doing backoff before next retry
- if do_backoff and retry_options.backoff_ms:
- log.info("Doing backoff for %s(ms)." % retry_options.backoff_ms)
+ if retry_state['do_backoff'] and retry_options.backoff_ms:
+ log.warn('Async producer backoff for %s(ms) before retrying', retry_options.backoff_ms)
time.sleep(float(retry_options.backoff_ms) / 1000)
# refresh topic metadata before next retry
- if do_refresh:
+ if retry_state['do_refresh']:
+ log.warn('Async producer forcing metadata refresh metadata before retrying')
client.load_metadata_for_topics()
- reqs = dict((key, count + 1) for (key, count) in reqs.items()
- if key in reqs_to_retry and count < retry_options.limit)
+ # Apply retry limit, dropping messages that are over
+ request_tries = dict(
+ (key, count + 1)
+ for (key, count) in request_tries.items()
+ if key in reqs_to_retry
+ and (retry_options.limit is None
+ or (count < retry_options.limit))
+ )
+
+ # Log messages we are going to retry
+ for orig_req in request_tries.keys():
+ log.info('Retrying ProduceRequest to %s:%d with msgs %s',
+ orig_req.topic, orig_req.partition,
+ orig_req.messages if log_messages_on_error
+ else hash(orig_req.messages))
+
+ if request_tries or not queue.empty():
+ log.error('Stopped producer with {0} unsent messages'
+ .format(len(request_tries) + queue.qsize()))
class Producer(object):
@@ -140,48 +205,71 @@ class Producer(object):
Base class to be used by producers
Arguments:
- client: The Kafka client instance to use
- async: If set to true, the messages are sent asynchronously via another
- thread (process). We will not wait for a response to these
- WARNING!!! current implementation of async producer does not
- guarantee message delivery. Use at your own risk! Or help us
- improve with a PR!
- req_acks: A value indicating the acknowledgements that the server must
- receive before responding to the request
- ack_timeout: Value (in milliseconds) indicating a timeout for waiting
- for an acknowledgement
- batch_send: If True, messages are send in batches
- batch_send_every_n: If set, messages are send in batches of this size
- batch_send_every_t: If set, messages are send after this timeout
+ client (KafkaClient): instance to use for broker communications.
+ codec (kafka.protocol.ALL_CODECS): compression codec to use.
+ req_acks (int, optional): A value indicating the acknowledgements that
+ the server must receive before responding to the request,
+ defaults to 1 (local ack).
+ ack_timeout (int, optional): millisecond timeout to wait for the
+ configured req_acks, defaults to 1000.
+ async (bool, optional): send message using a background thread,
+ defaults to False.
+ batch_send_every_n (int, optional): If async is True, messages are
+ sent in batches of this size, defaults to 20.
+ batch_send_every_t (int or float, optional): If async is True,
+ messages are sent immediately after this timeout in seconds, even
+ if there are fewer than batch_send_every_n, defaults to 20.
+ async_retry_limit (int, optional): number of retries for failed messages
+ or None for unlimited, defaults to None / unlimited.
+ async_retry_backoff_ms (int, optional): milliseconds to backoff on
+ failed messages, defaults to 100.
+ async_retry_on_timeouts (bool, optional): whether to retry on
+ RequestTimeoutError, defaults to True.
+ async_queue_maxsize (int, optional): limit to the size of the
+ internal message queue in number of messages (not size), defaults
+ to 0 (no limit).
+ async_queue_put_timeout (int or float, optional): timeout seconds
+ for queue.put in send_messages for async producers -- will only
+ apply if async_queue_maxsize > 0 and the queue is Full,
+ defaults to 0 (fail immediately on full queue).
+ async_log_messages_on_error (bool, optional): set to False and the
+ async producer will only log hash() contents on failed produce
+ requests, defaults to True (log full messages). Hash logging
+ will not allow you to identify the specific message that failed,
+ but it will allow you to match failures with retries.
+ async_stop_timeout (int or float, optional): seconds to continue
+ attempting to send queued messages after producer.stop(),
+ defaults to 30.
+
+ Deprecated Arguments:
+ batch_send (bool, optional): If True, messages are sent by a background
+ thread in batches, defaults to False. Deprecated, use 'async'
"""
-
ACK_NOT_REQUIRED = 0 # No ack is required
ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log
ACK_AFTER_CLUSTER_COMMIT = -1 # Send response after data is committed
-
DEFAULT_ACK_TIMEOUT = 1000
- def __init__(self, client, async=False,
+ def __init__(self, client,
req_acks=ACK_AFTER_LOCAL_WRITE,
ack_timeout=DEFAULT_ACK_TIMEOUT,
codec=None,
- batch_send=False,
+ async=False,
+ batch_send=False, # deprecated, use async
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
async_retry_limit=ASYNC_RETRY_LIMIT,
async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
- async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
+ async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT,
+ async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
+ async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS):
- if batch_send:
- async = True
+ if async:
assert batch_send_every_n > 0
assert batch_send_every_t > 0
assert async_queue_maxsize >= 0
- else:
- batch_send_every_n = 1
- batch_send_every_t = 3600
self.client = client
self.async = async
@@ -205,16 +293,15 @@ class Producer(object):
backoff_ms=async_retry_backoff_ms,
retry_on_timeouts=async_retry_on_timeouts)
self.thread_stop_event = Event()
- self.thread = Thread(target=_send_upstream,
- args=(self.queue,
- self.client.copy(),
- self.codec,
- batch_send_every_t,
- batch_send_every_n,
- self.req_acks,
- self.ack_timeout,
- async_retry_options,
- self.thread_stop_event))
+ self.thread = Thread(
+ target=_send_upstream,
+ args=(self.queue, self.client.copy(), self.codec,
+ batch_send_every_t, batch_send_every_n,
+ self.req_acks, self.ack_timeout,
+ async_retry_options, self.thread_stop_event),
+ kwargs={'log_messages_on_error': async_log_messages_on_error,
+ 'stop_timeout': async_stop_timeout}
+ )
# Thread will die if main thread exits
self.thread.daemon = True
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index 6bb2285..2de4dcc 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -49,8 +49,8 @@ class KeyedProducer(Producer):
self.partitioner_class = partitioner
self.partitioners = {}
- super(KeyedProducer, self).__init__(client, async, req_acks,
- ack_timeout, codec, batch_send,
+ super(KeyedProducer, self).__init__(client, req_acks, ack_timeout,
+ codec, async, batch_send,
batch_send_every_n,
batch_send_every_t,
async_retry_limit,
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index 78cc21c..280a02e 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -54,8 +54,8 @@ class SimpleProducer(Producer):
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
self.partition_cycles = {}
self.random_start = random_start
- super(SimpleProducer, self).__init__(client, async, req_acks,
- ack_timeout, codec, batch_send,
+ super(SimpleProducer, self).__init__(client, req_acks, ack_timeout,
+ codec, async, batch_send,
batch_send_every_n,
batch_send_every_t,
async_retry_limit,
diff --git a/test/test_producer.py b/test/test_producer.py
index 85a5a2e..c12af02 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -57,23 +57,6 @@ class TestKafkaProducer(unittest.TestCase):
assert client.send_produce_request.called
@patch('kafka.producer.base._send_upstream')
- def test_producer_async_queue_overfilled_batch_send(self, mock):
- queue_size = 2
- producer = Producer(MagicMock(), batch_send=True,
- async_queue_maxsize=queue_size)
-
- topic = b'test-topic'
- partition = 0
- message = b'test-message'
-
- with self.assertRaises(AsyncProducerQueueFull):
- message_list = [message] * (queue_size + 1)
- producer.send_messages(topic, partition, *message_list)
- self.assertEqual(producer.queue.qsize(), queue_size)
- for _ in xrange(producer.queue.qsize()):
- producer.queue.get()
-
- @patch('kafka.producer.base._send_upstream')
def test_producer_async_queue_overfilled(self, mock):
queue_size = 2
producer = Producer(MagicMock(), async=True,
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 099b975..3c414e1 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -221,7 +221,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
batch_interval = 5
producer = SimpleProducer(
self.client,
- batch_send=True,
+ async=True,
batch_send_every_n=batch_messages,
batch_send_every_t=batch_interval,
random_start=False)
@@ -287,7 +287,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
batch_interval = 5
producer = SimpleProducer(
self.client,
- batch_send=True,
+ async=True,
batch_send_every_n=100,
batch_send_every_t=batch_interval,
random_start=False)