summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorOmar Ghishan <omar.ghishan@rd.io>2014-01-03 15:29:04 -0800
committerOmar Ghishan <omar.ghishan@rd.io>2014-01-06 15:14:51 -0800
commit8540f1f3b6b07f9ddb28d3ade78679a0ac2d4355 (patch)
tree67ed86ebbb69a0e4278462f82161830e1fde6dca /kafka
parentd1e4fd25c66f9fa7d955694c55e8b51c5da3a565 (diff)
downloadkafka-python-8540f1f3b6b07f9ddb28d3ade78679a0ac2d4355.tar.gz
Fix client error handling
This differentiates between errors that occur when sending the request and receiving the response, and adds BufferUnderflowError handling.
Diffstat (limited to 'kafka')
-rw-r--r--kafka/client.py22
1 files changed, 17 insertions, 5 deletions
diff --git a/kafka/client.py b/kafka/client.py
index bd3a214..821904c 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -6,8 +6,10 @@ import logging
import socket
import time
-from kafka.common import ErrorMapping, TopicAndPartition
-from kafka.common import ConnectionError, FailedPayloadsException
+from kafka.common import (
+ ErrorMapping, TopicAndPartition, BufferUnderflowError, ConnectionError,
+ FailedPayloadsException
+)
from kafka.conn import KafkaConnection
from kafka.protocol import KafkaProtocol
@@ -165,14 +167,24 @@ class KafkaClient(object):
request = encoder_fn(client_id=self.client_id,
correlation_id=requestId, payloads=payloads)
+ failed = False
# Send the request, recv the response
try:
conn.send(requestId, request)
if decoder_fn is None:
continue
- response = conn.recv(requestId)
- except ConnectionError, e: # ignore BufferUnderflow for now
- log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
+ try:
+ response = conn.recv(requestId)
+ except (ConnectionError, BufferUnderflowError), e:
+ log.warning("Could not receive response to request [%s] "
+ "from server %s: %s", request, conn, e)
+ failed = True
+ except ConnectionError, e:
+ log.warning("Could not send request [%s] to server %s: %s",
+ request, conn, e)
+ failed = True
+
+ if failed:
failed_payloads += payloads
self.topics_to_brokers = {} # reset metadata
continue