summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-06-05 23:22:37 -0700
committerDana Powers <dana.powers@rd.io>2015-06-06 19:27:56 -0700
commit1d5f4b1f889737ef3ba04d8303a02a4957a2d183 (patch)
treec2eb33cb630d0c698796f3f5e404cb0ec7d3b40d /kafka
parent2ba22bf4cebf5e25351816b38cd3cb70e2ea4cb8 (diff)
downloadkafka-python-1d5f4b1f889737ef3ba04d8303a02a4957a2d183.tar.gz
Add async_stop_timeout parameter to tune how long to let the producer
keep trying to send messages before timing out. Log an error if async producer was stopped before all messages sent.
Diffstat (limited to 'kafka')
-rw-r--r--kafka/producer/base.py29
1 files changed, 26 insertions, 3 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 0fd742d..18af342 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -39,11 +39,13 @@ ASYNC_RETRY_ON_TIMEOUTS = True
ASYNC_LOG_MESSAGES_ON_ERROR = True
STOP_ASYNC_PRODUCER = -1
+ASYNC_STOP_TIMEOUT_SECS = 30
def _send_upstream(queue, client, codec, batch_time, batch_size,
req_acks, ack_timeout, retry_options, stop_event,
- log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR):
+ log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
+ stop_timeout=ASYNC_STOP_TIMEOUT_SECS):
"""Private method to manage producing messages asynchronously
Listens on the queue for a specified number of messages or until
@@ -69,11 +71,23 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
log_messages_on_error (bool, optional): log stringified message-contents
on any produce error, otherwise only log a hash() of the contents,
defaults to True.
+ stop_timeout (int or float, optional): number of seconds to continue
+ retrying messages after stop_event is set, defaults to 30.
"""
request_tries = {}
client.reinit()
+ stop_at = None
while not (stop_event.is_set() and queue.empty() and not request_tries):
+
+ # Handle stop_timeout
+ if stop_event.is_set():
+ if not stop_at:
+ stop_at = stop_timeout + time.time()
+ if time.time() > stop_at:
+ log.debug('Async producer stopping due to stop_timeout')
+ break
+
timeout = batch_time
count = batch_size
send_at = time.time() + timeout
@@ -181,6 +195,10 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
orig_req.messages if log_messages_on_error
else hash(orig_req.messages))
+ if request_tries or not queue.empty():
+ log.error('Stopped producer with {0} unsent messages'
+ .format(len(request_tries) + queue.qsize()))
+
class Producer(object):
"""
@@ -219,6 +237,9 @@ class Producer(object):
requests, defaults to True (log full messages). Hash logging
will not allow you to identify the specific message that failed,
but it will allow you to match failures with retries.
+ async_stop_timeout (int or float, optional): seconds to continue
+ attempting to send queued messages after producer.stop(),
+ defaults to 30.
Deprecated Arguments:
batch_send (bool, optional): If True, messages are sent by a background
@@ -242,7 +263,8 @@ class Producer(object):
async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT,
- async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR):
+ async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
+ async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS):
if async:
assert batch_send_every_n > 0
@@ -277,7 +299,8 @@ class Producer(object):
batch_send_every_t, batch_send_every_n,
self.req_acks, self.ack_timeout,
async_retry_options, self.thread_stop_event),
- kwargs={'log_messages_on_error': async_log_messages_on_error}
+ kwargs={'log_messages_on_error': async_log_messages_on_error,
+ 'stop_timeout': async_stop_timeout}
)
# Thread will die if main thread exits