summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorOmar Ghishan <omar.ghishan@rd.io>2013-12-18 16:53:03 -0800
committerOmar Ghishan <omar.ghishan@rd.io>2014-01-06 15:14:08 -0800
commit60ccb4dd025ec3e3da6feb77f9797aa1da723bfa (patch)
tree39c6edbd85397aab6163a88a0c629516836fe817 /kafka
parent8f076df94c0d06d67d4ab39c991d8f83995431b5 (diff)
downloadkafka-python-60ccb4dd025ec3e3da6feb77f9797aa1da723bfa.tar.gz
Allow customizing socket timeouts.
Previously, if you try to consume a message with a timeout greater than 10 seconds, but you don't receive data in those 10 seconds, a socket.timeout exception is raised. This allows a higher socket timeout to be set, or even None for no timeout.
Diffstat (limited to 'kafka')
-rw-r--r--kafka/client.py7
-rw-r--r--kafka/conn.py4
2 files changed, 6 insertions, 5 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 71ededa..9659364 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -19,12 +19,13 @@ class KafkaClient(object):
CLIENT_ID = "kafka-python"
ID_GEN = count()
- def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID):
+ def __init__(self, host, port, bufsize=4098, client_id=CLIENT_ID, timeout=10):
# We need one connection to bootstrap
self.bufsize = bufsize
self.client_id = client_id
+ self.timeout = timeout
self.conns = { # (host, port) -> KafkaConnection
- (host, port): KafkaConnection(host, port, bufsize)
+ (host, port): KafkaConnection(host, port, bufsize, timeout=timeout)
}
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # topic_id -> broker_id
@@ -41,7 +42,7 @@ class KafkaClient(object):
"""
if (broker.host, broker.port) not in self.conns:
self.conns[(broker.host, broker.port)] = \
- KafkaConnection(broker.host, broker.port, self.bufsize)
+ KafkaConnection(broker.host, broker.port, self.bufsize, timeout=self.timeout)
return self.conns[(broker.host, broker.port)]
diff --git a/kafka/conn.py b/kafka/conn.py
index 1a3e260..6dd61cc 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -19,14 +19,14 @@ class KafkaConnection(local):
we can do something in here to facilitate multiplexed requests/responses
since the Kafka API includes a correlation id.
"""
- def __init__(self, host, port, bufsize=4096):
+ def __init__(self, host, port, bufsize=4098, timeout=10):
super(KafkaConnection, self).__init__()
self.host = host
self.port = port
self.bufsize = bufsize
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.connect((host, port))
- self._sock.settimeout(10)
+ self._sock.settimeout(timeout)
self._dirty = False
def __str__(self):