summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'kafka')
-rw-r--r--kafka/producer/base.py12
1 files changed, 10 insertions, 2 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 8774c66..f2c7cfe 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -78,9 +78,17 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
retrying messages after stop_event is set, defaults to 30.
"""
request_tries = {}
- client.reinit()
- stop_at = None
+ while not stop_event.is_set():
+ try:
+ client.reinit()
+ except Exception as e:
+ log.warn('Async producer failed to connect to brokers; backoff for %s(ms) before retrying', retry_options.backoff_ms)
+ time.sleep(float(retry_options.backoff_ms) / 1000)
+ else:
+ break
+
+ stop_at = None
while not (stop_event.is_set() and queue.empty() and not request_tries):
# Handle stop_timeout