diff options
author | Balthasar Schopman <b.schopman@tech.leaseweb.com> | 2015-10-22 17:12:06 +0200 |
---|---|---|
committer | Balthasar Schopman <b.schopman@tech.leaseweb.com> | 2015-10-22 17:12:06 +0200 |
commit | 64cda819d9e87634b075725321b729aeede8471a (patch) | |
tree | 9767c0a3ba59e4a70fce0901050919bc87d60662 /kafka | |
parent | e99a934bab1d551d07dd0c6365f6a730028489f3 (diff) | |
download | kafka-python-64cda819d9e87634b075725321b729aeede8471a.tar.gz |
Prevents crashing communication thread of async producer
If an uncaught exception occurs in _send_messages() the thread sending
data to Kafka (asynchronously) will crash and the queue will never be
emptied. To reproduce:
1) Run an Async producer.
2) Kill the Kafka server.
3) Restart the Kafka server.
The communication thread dies shortly after step 2. After step 3 the
communication does not resume without this commit.
The changes in both files prevent an Exception from being thrown through
to do main communication process, which could cause the crash.
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/client.py | 6 | ||||
-rw-r--r-- | kafka/producer/base.py | 5 |
2 files changed, 9 insertions, 2 deletions
diff --git a/kafka/client.py b/kafka/client.py index 13777a4..c05e142 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -478,7 +478,11 @@ class KafkaClient(object): else: decoder = KafkaProtocol.decode_produce_response - resps = self._send_broker_aware_request(payloads, encoder, decoder) + try: + resps = self._send_broker_aware_request(payloads, encoder, decoder) + except Exception: + if fail_on_error: + raise return [resp if not callback else callback(resp) for resp in resps if resp is not None and diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 3c826cd..2281547 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -185,7 +185,10 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, # refresh topic metadata before next retry if retry_state['do_refresh']: log.warn('Async producer forcing metadata refresh metadata before retrying') - client.load_metadata_for_topics() + try: + client.load_metadata_for_topics() + except Exception as e: + log.error("Async producer couldn't reload topic metadata. Error: `%s`", e.message) # Apply retry limit, dropping messages that are over request_tries = dict( |