diff options
author | Marc Labbe <mrlabbe@gmail.com> | 2013-11-14 09:26:49 -0500 |
---|---|---|
committer | Marc Labbe <mrlabbe@gmail.com> | 2013-11-14 09:26:49 -0500 |
commit | 0bdff4e833f73518a7219fca04dfbc3ed201b06e (patch) | |
tree | fe7b43720946aa9d86c92f4a8a5a9c9f4244a683 /kafka/conn.py | |
parent | af3a57edb2c83c35b832e759b4c24ec72149841a (diff) | |
download | kafka-python-0bdff4e833f73518a7219fca04dfbc3ed201b06e.tar.gz |
Allow KafkaClient to take in a list of brokers for bootstrapping
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 33 |
1 files changed, 26 insertions, 7 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 14aebc6..614b1bb 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -2,6 +2,7 @@ import copy import logging import socket import struct +from random import shuffle from threading import local from kafka.common import BufferUnderflowError @@ -10,6 +11,26 @@ from kafka.common import ConnectionError log = logging.getLogger("kafka") +def collect_hosts(hosts, randomize=True): + """ + Collects a comma-separated set of hosts (host:port) and optionnaly + randomize the returned list. + """ + + result = [] + for host_port in hosts.split(","): + + res = host_port.split(':') + host = res[0] + port = int(res[1]) if len(res) > 1 else 9092 + result.append((host.strip(), port)) + + if randomize: + shuffle(result) + + return result + + class KafkaConnection(local): """ A socket connection to a single Kafka broker @@ -19,14 +40,14 @@ class KafkaConnection(local): we can do something in here to facilitate multiplexed requests/responses since the Kafka API includes a correlation id. """ - def __init__(self, host, port, bufsize=4096): + def __init__(self, host, port, bufsize=4096, timeout=10): super(KafkaConnection, self).__init__() self.host = host self.port = port self.bufsize = bufsize - self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._sock.connect((host, port)) - self._sock.settimeout(10) + self.timeout = timeout + + self._sock = socket.create_connection((host, port), timeout=timeout) self._dirty = False def __str__(self): @@ -125,7 +146,5 @@ class KafkaConnection(local): Re-initialize the socket connection """ self.close() - self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._sock.connect((self.host, self.port)) - self._sock.settimeout(10) + self._sock = socket.create_connection((self.host, self.port), timeout=self.timeout) self._dirty = False |