diff options
-rw-r--r-- | kafka/client.py | 7 | ||||
-rw-r--r-- | kafka/conn.py | 4 |
2 files changed, 6 insertions, 5 deletions
diff --git a/kafka/client.py b/kafka/client.py index 71ededa..9659364 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -19,12 +19,13 @@ class KafkaClient(object): CLIENT_ID = "kafka-python" ID_GEN = count() - def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID): + def __init__(self, host, port, bufsize=4098, client_id=CLIENT_ID, timeout=10): # We need one connection to bootstrap self.bufsize = bufsize self.client_id = client_id + self.timeout = timeout self.conns = { # (host, port) -> KafkaConnection - (host, port): KafkaConnection(host, port, bufsize) + (host, port): KafkaConnection(host, port, bufsize, timeout=timeout) } self.brokers = {} # broker_id -> BrokerMetadata self.topics_to_brokers = {} # topic_id -> broker_id @@ -41,7 +42,7 @@ class KafkaClient(object): """ if (broker.host, broker.port) not in self.conns: self.conns[(broker.host, broker.port)] = \ - KafkaConnection(broker.host, broker.port, self.bufsize) + KafkaConnection(broker.host, broker.port, self.bufsize, timeout=self.timeout) return self.conns[(broker.host, broker.port)] diff --git a/kafka/conn.py b/kafka/conn.py index 1a3e260..6dd61cc 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -19,14 +19,14 @@ class KafkaConnection(local): we can do something in here to facilitate multiplexed requests/responses since the Kafka API includes a correlation id. """ - def __init__(self, host, port, bufsize=4096): + def __init__(self, host, port, bufsize=4098, timeout=10): super(KafkaConnection, self).__init__() self.host = host self.port = port self.bufsize = bufsize self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.connect((host, port)) - self._sock.settimeout(10) + self._sock.settimeout(timeout) self._dirty = False def __str__(self): |