summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-04 16:29:29 -0800
committerDana Powers <dana.powers@rd.io>2015-12-04 16:29:29 -0800
commit7dec992a4ebd4d98008aaa1e65a32f46db1b064a (patch)
treea59b4f175ce839ffc1132f445f7cf9736e4e2587
parent8a74c20345e80c28cd92a0eec86fb3f593da6ed3 (diff)
downloadkafka-python-7dec992a4ebd4d98008aaa1e65a32f46db1b064a.tar.gz
client.reinit() can raise an exception; catch in async producer
-rw-r--r--kafka/producer/base.py12
1 files changed, 10 insertions, 2 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 8774c66..f2c7cfe 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -78,9 +78,17 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
retrying messages after stop_event is set, defaults to 30.
"""
request_tries = {}
- client.reinit()
- stop_at = None
+ while not stop_event.is_set():
+ try:
+ client.reinit()
+ except Exception as e:
+ log.warn('Async producer failed to connect to brokers; backoff for %s(ms) before retrying', retry_options.backoff_ms)
+ time.sleep(float(retry_options.backoff_ms) / 1000)
+ else:
+ break
+
+ stop_at = None
while not (stop_event.is_set() and queue.empty() and not request_tries):
# Handle stop_timeout