diff options
author | Dana Powers <dana.powers@rd.io> | 2015-06-05 23:22:37 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-06-06 19:27:56 -0700 |
commit | 1d5f4b1f889737ef3ba04d8303a02a4957a2d183 (patch) | |
tree | c2eb33cb630d0c698796f3f5e404cb0ec7d3b40d /kafka | |
parent | 2ba22bf4cebf5e25351816b38cd3cb70e2ea4cb8 (diff) | |
download | kafka-python-1d5f4b1f889737ef3ba04d8303a02a4957a2d183.tar.gz |
Add async_stop_timeout parameter to tune how long to let the producer
keep trying to send messages before timing out.
Log an error if async producer was stopped before all messages sent.
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/producer/base.py | 29 |
1 file changed, 26 insertions, 3 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 0fd742d..18af342 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -39,11 +39,13 @@ ASYNC_RETRY_ON_TIMEOUTS = True ASYNC_LOG_MESSAGES_ON_ERROR = True STOP_ASYNC_PRODUCER = -1 +ASYNC_STOP_TIMEOUT_SECS = 30 def _send_upstream(queue, client, codec, batch_time, batch_size, req_acks, ack_timeout, retry_options, stop_event, - log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR): + log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR, + stop_timeout=ASYNC_STOP_TIMEOUT_SECS): """Private method to manage producing messages asynchronously Listens on the queue for a specified number of messages or until @@ -69,11 +71,23 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, log_messages_on_error (bool, optional): log stringified message-contents on any produce error, otherwise only log a hash() of the contents, defaults to True. + stop_timeout (int or float, optional): number of seconds to continue + retrying messages after stop_event is set, defaults to 30. """ request_tries = {} client.reinit() + stop_at = None while not (stop_event.is_set() and queue.empty() and not request_tries): + + # Handle stop_timeout + if stop_event.is_set(): + if not stop_at: + stop_at = stop_timeout + time.time() + if time.time() > stop_at: + log.debug('Async producer stopping due to stop_timeout') + break + timeout = batch_time count = batch_size send_at = time.time() + timeout @@ -181,6 +195,10 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, orig_req.messages if log_messages_on_error else hash(orig_req.messages)) + if request_tries or not queue.empty(): + log.error('Stopped producer with {0} unsent messages' + .format(len(request_tries) + queue.qsize())) + class Producer(object): """ @@ -219,6 +237,9 @@ class Producer(object): requests, defaults to True (log full messages). 
Hash logging will not allow you to identify the specific message that failed, but it will allow you to match failures with retries. + async_stop_timeout (int or float, optional): seconds to continue + attempting to send queued messages after producer.stop(), + defaults to 30. Deprecated Arguments: batch_send (bool, optional): If True, messages are sent by a background @@ -242,7 +263,8 @@ class Producer(object): async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS, async_queue_maxsize=ASYNC_QUEUE_MAXSIZE, async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT, - async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR): + async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR, + async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS): if async: assert batch_send_every_n > 0 @@ -277,7 +299,8 @@ class Producer(object): batch_send_every_t, batch_send_every_n, self.req_acks, self.ack_timeout, async_retry_options, self.thread_stop_event), - kwargs={'log_messages_on_error': async_log_messages_on_error} + kwargs={'log_messages_on_error': async_log_messages_on_error, + 'stop_timeout': async_stop_timeout} ) # Thread will die if main thread exits |