diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-04 16:29:29 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-04 16:29:29 -0800 |
commit | 7dec992a4ebd4d98008aaa1e65a32f46db1b064a (patch) | |
tree | a59b4f175ce839ffc1132f445f7cf9736e4e2587 | |
parent | 8a74c20345e80c28cd92a0eec86fb3f593da6ed3 (diff) | |
download | kafka-python-7dec992a4ebd4d98008aaa1e65a32f46db1b064a.tar.gz |
client.reinit() can raise an exception; catch in async producer
-rw-r--r-- | kafka/producer/base.py | 12 |
1 files changed, 10 insertions, 2 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 8774c66..f2c7cfe 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -78,9 +78,17 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, retrying messages after stop_event is set, defaults to 30. """ request_tries = {} - client.reinit() - stop_at = None + while not stop_event.is_set(): + try: + client.reinit() + except Exception as e: + log.warn('Async producer failed to connect to brokers; backoff for %s(ms) before retrying', retry_options.backoff_ms) + time.sleep(float(retry_options.backoff_ms) / 1000) + else: + break + + stop_at = None while not (stop_event.is_set() and queue.empty() and not request_tries): # Handle stop_timeout |