summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py31
1 files changed, 21 insertions, 10 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 71ededa..33c6d77 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -3,11 +3,12 @@ from collections import defaultdict
from functools import partial
from itertools import count
import logging
-import socket
import time
-from kafka.common import ErrorMapping, TopicAndPartition
-from kafka.common import ConnectionError, FailedPayloadsException
+from kafka.common import (
+ ErrorMapping, TopicAndPartition, ConnectionError,
+ FailedPayloadsException
+)
from kafka.conn import KafkaConnection
from kafka.protocol import KafkaProtocol
@@ -19,12 +20,12 @@ class KafkaClient(object):
CLIENT_ID = "kafka-python"
ID_GEN = count()
- def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID):
+ def __init__(self, host, port, client_id=CLIENT_ID, timeout=10):
# We need one connection to bootstrap
- self.bufsize = bufsize
self.client_id = client_id
+ self.timeout = timeout
self.conns = { # (host, port) -> KafkaConnection
- (host, port): KafkaConnection(host, port, bufsize)
+ (host, port): KafkaConnection(host, port, timeout=timeout)
}
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # topic_id -> broker_id
@@ -41,7 +42,7 @@ class KafkaClient(object):
"""
if (broker.host, broker.port) not in self.conns:
self.conns[(broker.host, broker.port)] = \
- KafkaConnection(broker.host, broker.port, self.bufsize)
+ KafkaConnection(broker.host, broker.port, timeout=self.timeout)
return self.conns[(broker.host, broker.port)]
@@ -165,14 +166,24 @@ class KafkaClient(object):
request = encoder_fn(client_id=self.client_id,
correlation_id=requestId, payloads=payloads)
+ failed = False
# Send the request, recv the response
try:
conn.send(requestId, request)
if decoder_fn is None:
continue
- response = conn.recv(requestId)
- except ConnectionError, e: # ignore BufferUnderflow for now
- log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
+ try:
+ response = conn.recv(requestId)
+ except ConnectionError, e:
+ log.warning("Could not receive response to request [%s] "
+ "from server %s: %s", request, conn, e)
+ failed = True
+ except ConnectionError, e:
+ log.warning("Could not send request [%s] to server %s: %s",
+ request, conn, e)
+ failed = True
+
+ if failed:
failed_payloads += payloads
self.topics_to_brokers = {} # reset metadata
continue