diff options
author | Balthasar Schopman <b.schopman@tech.leaseweb.com> | 2015-10-22 17:12:06 +0200 |
---|---|---|
committer | Balthasar Schopman <b.schopman@tech.leaseweb.com> | 2015-10-22 17:12:06 +0200 |
commit | 64cda819d9e87634b075725321b729aeede8471a (patch) | |
tree | 9767c0a3ba59e4a70fce0901050919bc87d60662 /kafka/producer | |
parent | e99a934bab1d551d07dd0c6365f6a730028489f3 (diff) | |
download | kafka-python-64cda819d9e87634b075725321b729aeede8471a.tar.gz |
Prevents crashing communication thread of async producer
If an uncaught exception occurs in _send_messages() the thread sending
data to Kafka (asynchronously) will crash and the queue will never be
emptied. To reproduce:
1) Run an Async producer.
2) Kill the Kafka server.
3) Restart the Kafka server.
The communication thread dies shortly after step 2. After step 3 the
communication does not resume without this commit.
The changes in both files prevent an Exception from being thrown through
to do main communication process, which could cause the crash.
Diffstat (limited to 'kafka/producer')
-rw-r--r-- | kafka/producer/base.py | 5 |
1 files changed, 4 insertions, 1 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 3c826cd..2281547 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -185,7 +185,10 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, # refresh topic metadata before next retry if retry_state['do_refresh']: log.warn('Async producer forcing metadata refresh metadata before retrying') - client.load_metadata_for_topics() + try: + client.load_metadata_for_topics() + except Exception as e: + log.error("Async producer couldn't reload topic metadata. Error: `%s`", e.message) # Apply retry limit, dropping messages that are over request_tries = dict( |