diff options
author | Omar Ghishan <omar.ghishan@rd.io> | 2014-01-03 15:29:04 -0800 |
---|---|---|
committer | Omar Ghishan <omar.ghishan@rd.io> | 2014-01-06 15:14:51 -0800 |
commit | 8540f1f3b6b07f9ddb28d3ade78679a0ac2d4355 (patch) | |
tree | 67ed86ebbb69a0e4278462f82161830e1fde6dca /kafka | |
parent | d1e4fd25c66f9fa7d955694c55e8b51c5da3a565 (diff) | |
download | kafka-python-8540f1f3b6b07f9ddb28d3ade78679a0ac2d4355.tar.gz |
Fix client error handling
This differentiates between errors that occur when sending the request
and receiving the response, and adds BufferUnderflowError handling.
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/client.py | 22 |
1 files changed, 17 insertions, 5 deletions
diff --git a/kafka/client.py b/kafka/client.py index bd3a214..821904c 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -6,8 +6,10 @@ import logging import socket import time -from kafka.common import ErrorMapping, TopicAndPartition -from kafka.common import ConnectionError, FailedPayloadsException +from kafka.common import ( + ErrorMapping, TopicAndPartition, BufferUnderflowError, ConnectionError, + FailedPayloadsException +) from kafka.conn import KafkaConnection from kafka.protocol import KafkaProtocol @@ -165,14 +167,24 @@ class KafkaClient(object): request = encoder_fn(client_id=self.client_id, correlation_id=requestId, payloads=payloads) + failed = False # Send the request, recv the response try: conn.send(requestId, request) if decoder_fn is None: continue - response = conn.recv(requestId) - except ConnectionError, e: # ignore BufferUnderflow for now - log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e)) + try: + response = conn.recv(requestId) + except (ConnectionError, BufferUnderflowError), e: + log.warning("Could not receive response to request [%s] " + "from server %s: %s", request, conn, e) + failed = True + except ConnectionError, e: + log.warning("Could not send request [%s] to server %s: %s", + request, conn, e) + failed = True + + if failed: failed_payloads += payloads self.topics_to_brokers = {} # reset metadata continue |