summaryrefslogtreecommitdiff
path: root/kafka/producer
diff options
context:
space:
mode:
authorBalthasar Schopman <b.schopman@tech.leaseweb.com>2015-10-22 17:12:06 +0200
committerBalthasar Schopman <b.schopman@tech.leaseweb.com>2015-10-22 17:12:06 +0200
commit64cda819d9e87634b075725321b729aeede8471a (patch)
tree9767c0a3ba59e4a70fce0901050919bc87d60662 /kafka/producer
parente99a934bab1d551d07dd0c6365f6a730028489f3 (diff)
downloadkafka-python-64cda819d9e87634b075725321b729aeede8471a.tar.gz
Prevents crashing communication thread of async producer
If an uncaught exception occurs in _send_messages() the thread sending data to Kafka (asynchronously) will crash and the queue will never be emptied. To reproduce: 1) Run an Async producer. 2) Kill the Kafka server. 3) Restart the Kafka server. The communication thread dies shortly after step 2. After step 3 the communication does not resume without this commit. The changes in both files prevent an Exception from being thrown through to do main communication process, which could cause the crash.
Diffstat (limited to 'kafka/producer')
-rw-r--r--kafka/producer/base.py5
1 files changed, 4 insertions, 1 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 3c826cd..2281547 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -185,7 +185,10 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
# refresh topic metadata before next retry
if retry_state['do_refresh']:
log.warn('Async producer forcing metadata refresh metadata before retrying')
- client.load_metadata_for_topics()
+ try:
+ client.load_metadata_for_topics()
+ except Exception as e:
+ log.error("Async producer couldn't reload topic metadata. Error: `%s`", e.message)
# Apply retry limit, dropping messages that are over
request_tries = dict(