diff options
author | Balthasar Schopman <b.schopman@tech.leaseweb.com> | 2015-10-22 17:12:06 +0200 |
---|---|---|
committer | Balthasar Schopman <b.schopman@tech.leaseweb.com> | 2015-10-22 17:12:06 +0200 |
commit | 64cda819d9e87634b075725321b729aeede8471a (patch) | |
tree | 9767c0a3ba59e4a70fce0901050919bc87d60662 /kafka/client.py | |
parent | e99a934bab1d551d07dd0c6365f6a730028489f3 (diff) | |
download | kafka-python-64cda819d9e87634b075725321b729aeede8471a.tar.gz |
Prevents crashing communication thread of async producer
If an uncaught exception occurs in _send_messages() the thread sending
data to Kafka (asynchronously) will crash and the queue will never be
emptied. To reproduce:
1) Run an Async producer.
2) Kill the Kafka server.
3) Restart the Kafka server.
The communication thread dies shortly after step 2. After step 3 the
communication does not resume without this commit.
The changes in both files prevent an Exception from being thrown through
to do main communication process, which could cause the crash.
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 6 |
1 files changed, 5 insertions, 1 deletions
diff --git a/kafka/client.py b/kafka/client.py index 13777a4..c05e142 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -478,7 +478,11 @@ class KafkaClient(object): else: decoder = KafkaProtocol.decode_produce_response - resps = self._send_broker_aware_request(payloads, encoder, decoder) + try: + resps = self._send_broker_aware_request(payloads, encoder, decoder) + except Exception: + if fail_on_error: + raise return [resp if not callback else callback(resp) for resp in resps if resp is not None and |