diff options
author | Marc Labbe <mrlabbe@gmail.com> | 2013-11-14 09:26:49 -0500 |
---|---|---|
committer | Marc Labbe <mrlabbe@gmail.com> | 2013-11-14 09:26:49 -0500 |
commit | 0bdff4e833f73518a7219fca04dfbc3ed201b06e (patch) | |
tree | fe7b43720946aa9d86c92f4a8a5a9c9f4244a683 /kafka | |
parent | af3a57edb2c83c35b832e759b4c24ec72149841a (diff) | |
download | kafka-python-0bdff4e833f73518a7219fca04dfbc3ed201b06e.tar.gz |
Allow KafkaClient to take in a list of brokers for bootstrapping
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/NOTES.md | 2 | ||||
-rw-r--r-- | kafka/client.py | 35 | ||||
-rw-r--r-- | kafka/conn.py | 33 |
3 files changed, 48 insertions, 22 deletions
diff --git a/kafka/NOTES.md b/kafka/NOTES.md index 540cdad..8fb0f47 100644 --- a/kafka/NOTES.md +++ b/kafka/NOTES.md @@ -18,7 +18,7 @@ There are a few levels of abstraction: # Possible API - client = KafkaClient("localhost", 9092) + client = KafkaClient("localhost:9092") producer = KafkaProducer(client, "topic") producer.send_string("hello") diff --git a/kafka/client.py b/kafka/client.py index 71ededa..81eec7d 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -8,7 +8,7 @@ import time from kafka.common import ErrorMapping, TopicAndPartition from kafka.common import ConnectionError, FailedPayloadsException -from kafka.conn import KafkaConnection +from kafka.conn import collect_hosts, KafkaConnection from kafka.protocol import KafkaProtocol log = logging.getLogger("kafka") @@ -19,13 +19,15 @@ class KafkaClient(object): CLIENT_ID = "kafka-python" ID_GEN = count() - def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID): + def __init__(self, hosts, bufsize=4096, client_id=CLIENT_ID): # We need one connection to bootstrap self.bufsize = bufsize self.client_id = client_id - self.conns = { # (host, port) -> KafkaConnection - (host, port): KafkaConnection(host, port, bufsize) - } + + self.hosts = collect_hosts(hosts) + + # create connections only when we need them + self.conns = {} self.brokers = {} # broker_id -> BrokerMetadata self.topics_to_brokers = {} # topic_id -> broker_id self.topic_partitions = defaultdict(list) # topic_id -> [0, 1, 2, ...] @@ -35,15 +37,19 @@ class KafkaClient(object): # Private API # ################## + def _get_conn(self, host, port): + "Get or create a connection to a broker using host and port" + + host_key = (host, port) + if host_key not in self.conns: + self.conns[host_key] = KafkaConnection(host, port, self.bufsize) + + return self.conns[host_key] + def _get_conn_for_broker(self, broker): - """ - Get or create a connection to a broker - """ - if (broker.host, broker.port) not in self.conns: - self.conns[(broker.host, broker.port)] = \ - KafkaConnection(broker.host, broker.port, self.bufsize) + "Get or create a connection to a broker" - return self.conns[(broker.host, broker.port)] + return self._get_conn(broker.host, broker.port) def _get_leader_for_partition(self, topic, partition): key = TopicAndPartition(topic, partition) @@ -108,7 +114,8 @@ class KafkaClient(object): Attempt to send a broker-agnostic request to one of the available brokers. Keep trying until you succeed. """ - for conn in self.conns.values(): + for (host, port) in self.hosts: + conn = self._get_conn(host, port) try: conn.send(requestId, request) response = conn.recv(requestId) @@ -174,7 +181,7 @@ class KafkaClient(object): except ConnectionError, e: # ignore BufferUnderflow for now log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e)) failed_payloads += payloads - self.topics_to_brokers = {} # reset metadata + self.topics_to_brokers = {} # reset metadata continue for response in decoder_fn(response): diff --git a/kafka/conn.py b/kafka/conn.py index 14aebc6..614b1bb 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -2,6 +2,7 @@ import copy import logging import socket import struct +from random import shuffle from threading import local from kafka.common import BufferUnderflowError @@ -10,6 +11,26 @@ from kafka.common import ConnectionError log = logging.getLogger("kafka") +def collect_hosts(hosts, randomize=True): + """ + Collects a comma-separated set of hosts (host:port) and optionnaly + randomize the returned list. + """ + + result = [] + for host_port in hosts.split(","): + + res = host_port.split(':') + host = res[0] + port = int(res[1]) if len(res) > 1 else 9092 + result.append((host.strip(), port)) + + if randomize: + shuffle(result) + + return result + + class KafkaConnection(local): """ A socket connection to a single Kafka broker @@ -19,14 +40,14 @@ class KafkaConnection(local): we can do something in here to facilitate multiplexed requests/responses since the Kafka API includes a correlation id. """ - def __init__(self, host, port, bufsize=4096): + def __init__(self, host, port, bufsize=4096, timeout=10): super(KafkaConnection, self).__init__() self.host = host self.port = port self.bufsize = bufsize - self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._sock.connect((host, port)) - self._sock.settimeout(10) + self.timeout = timeout + + self._sock = socket.create_connection((host, port), timeout=timeout) self._dirty = False def __str__(self): @@ -125,7 +146,5 @@ class KafkaConnection(local): Re-initialize the socket connection """ self.close() - self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._sock.connect((self.host, self.port)) - self._sock.settimeout(10) + self._sock = socket.create_connection((self.host, self.port), timeout=self.timeout) self._dirty = False |