author    Marc Labbe <mrlabbe@gmail.com>  2013-11-14 09:26:49 -0500
committer Marc Labbe <mrlabbe@gmail.com>  2013-11-14 09:26:49 -0500
commit    0bdff4e833f73518a7219fca04dfbc3ed201b06e (patch)
tree      fe7b43720946aa9d86c92f4a8a5a9c9f4244a683 /kafka
parent    af3a57edb2c83c35b832e759b4c24ec72149841a (diff)
download  kafka-python-0bdff4e833f73518a7219fca04dfbc3ed201b06e.tar.gz
Allow KafkaClient to take in a list of brokers for bootstrapping
Diffstat (limited to 'kafka')
-rw-r--r--  kafka/NOTES.md  |  2
-rw-r--r--  kafka/client.py | 35
-rw-r--r--  kafka/conn.py   | 33
3 files changed, 48 insertions, 22 deletions
diff --git a/kafka/NOTES.md b/kafka/NOTES.md
index 540cdad..8fb0f47 100644
--- a/kafka/NOTES.md
+++ b/kafka/NOTES.md
@@ -18,7 +18,7 @@ There are a few levels of abstraction:
# Possible API
- client = KafkaClient("localhost", 9092)
+ client = KafkaClient("localhost:9092")
producer = KafkaProducer(client, "topic")
producer.send_string("hello")
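
The NOTES.md hunk above tracks the new constructor: instead of a separate host and port, KafkaClient now takes a single string naming one or more bootstrap brokers. A minimal usage sketch, assuming this patch is applied (broker names are placeholders; an entry without a port falls back to 9092, per collect_hosts in conn.py below):

    # any one reachable broker in the list is enough to bootstrap metadata
    client = KafkaClient("kafka1:9092,kafka2:9092,kafka3")  # "kafka3" defaults to port 9092
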
diff --git a/kafka/client.py b/kafka/client.py
index 71ededa..81eec7d 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -8,7 +8,7 @@ import time
from kafka.common import ErrorMapping, TopicAndPartition
from kafka.common import ConnectionError, FailedPayloadsException
-from kafka.conn import KafkaConnection
+from kafka.conn import collect_hosts, KafkaConnection
from kafka.protocol import KafkaProtocol
log = logging.getLogger("kafka")
@@ -19,13 +19,15 @@ class KafkaClient(object):
CLIENT_ID = "kafka-python"
ID_GEN = count()
- def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID):
+ def __init__(self, hosts, bufsize=4096, client_id=CLIENT_ID):
# We need one connection to bootstrap
self.bufsize = bufsize
self.client_id = client_id
- self.conns = { # (host, port) -> KafkaConnection
- (host, port): KafkaConnection(host, port, bufsize)
- }
+
+ self.hosts = collect_hosts(hosts)
+
+ # create connections only when we need them
+ self.conns = {}
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # topic_id -> broker_id
self.topic_partitions = defaultdict(list) # topic_id -> [0, 1, 2, ...]
@@ -35,15 +37,19 @@ class KafkaClient(object):
# Private API #
##################
+ def _get_conn(self, host, port):
+ "Get or create a connection to a broker using host and port"
+
+ host_key = (host, port)
+ if host_key not in self.conns:
+ self.conns[host_key] = KafkaConnection(host, port, self.bufsize)
+
+ return self.conns[host_key]
+
def _get_conn_for_broker(self, broker):
- """
- Get or create a connection to a broker
- """
- if (broker.host, broker.port) not in self.conns:
- self.conns[(broker.host, broker.port)] = \
- KafkaConnection(broker.host, broker.port, self.bufsize)
+ "Get or create a connection to a broker"
- return self.conns[(broker.host, broker.port)]
+ return self._get_conn(broker.host, broker.port)
def _get_leader_for_partition(self, topic, partition):
key = TopicAndPartition(topic, partition)
@@ -108,7 +114,8 @@ class KafkaClient(object):
Attempt to send a broker-agnostic request to one of the available
brokers. Keep trying until you succeed.
"""
- for conn in self.conns.values():
+ for (host, port) in self.hosts:
+ conn = self._get_conn(host, port)
try:
conn.send(requestId, request)
response = conn.recv(requestId)
@@ -174,7 +181,7 @@ class KafkaClient(object):
except ConnectionError, e: # ignore BufferUnderflow for now
log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
failed_payloads += payloads
- self.topics_to_brokers = {} # reset metadata
+ self.topics_to_brokers = {} # reset metadata
continue
for response in decoder_fn(response):
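
Taken together, the client.py hunks replace the eagerly created bootstrap connection with a lazy cache: _get_conn keys KafkaConnection objects by (host, port) and only opens a socket the first time a broker is contacted, and the broker-agnostic request path now walks self.hosts instead of the pre-built connection map. A standalone sketch of that caching pattern, assuming only the KafkaConnection class from kafka.conn (the wrapper class and its names are illustrative, not part of the patch):

    from kafka.conn import KafkaConnection

    class LazyConnectionCache(object):
        # Illustrative only: open a KafkaConnection per (host, port) on first
        # use and reuse it afterwards, mirroring KafkaClient._get_conn.
        def __init__(self, hosts, bufsize=4096):
            self.hosts = hosts      # list of (host, port) tuples
            self.bufsize = bufsize
            self.conns = {}         # (host, port) -> KafkaConnection, filled on demand

        def get(self, host, port):
            key = (host, port)
            if key not in self.conns:
                self.conns[key] = KafkaConnection(host, port, self.bufsize)
            return self.conns[key]
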
diff --git a/kafka/conn.py b/kafka/conn.py
index 14aebc6..614b1bb 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -2,6 +2,7 @@ import copy
import logging
import socket
import struct
+from random import shuffle
from threading import local
from kafka.common import BufferUnderflowError
@@ -10,6 +11,26 @@ from kafka.common import ConnectionError
log = logging.getLogger("kafka")
+def collect_hosts(hosts, randomize=True):
+ """
+ Collects a comma-separated set of hosts (host:port) and optionally
+ randomizes the returned list.
+ """
+
+ result = []
+ for host_port in hosts.split(","):
+
+ res = host_port.split(':')
+ host = res[0]
+ port = int(res[1]) if len(res) > 1 else 9092
+ result.append((host.strip(), port))
+
+ if randomize:
+ shuffle(result)
+
+ return result
+
+
class KafkaConnection(local):
"""
A socket connection to a single Kafka broker
@@ -19,14 +40,14 @@ class KafkaConnection(local):
we can do something in here to facilitate multiplexed requests/responses
since the Kafka API includes a correlation id.
"""
- def __init__(self, host, port, bufsize=4096):
+ def __init__(self, host, port, bufsize=4096, timeout=10):
super(KafkaConnection, self).__init__()
self.host = host
self.port = port
self.bufsize = bufsize
- self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._sock.connect((host, port))
- self._sock.settimeout(10)
+ self.timeout = timeout
+
+ self._sock = socket.create_connection((host, port), timeout=timeout)
self._dirty = False
def __str__(self):
@@ -125,7 +146,5 @@ class KafkaConnection(local):
Re-initialize the socket connection
"""
self.close()
- self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._sock.connect((self.host, self.port))
- self._sock.settimeout(10)
+ self._sock = socket.create_connection((self.host, self.port), timeout=self.timeout)
self._dirty = False
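
Two changes round out conn.py. collect_hosts() parses a comma-separated "host:port" string, defaults a missing port to 9092, and (unless randomize=False) shuffles the result so bootstrapping does not always start with the same broker. Separately, the manual socket()/connect()/settimeout() sequence is replaced by socket.create_connection(), which applies the timeout to the connect attempt itself and keeps it around for reinit(). A small interactive sketch of the parsing, assuming the function behaves as written in the hunk above (host names are placeholders):

    >>> from kafka.conn import collect_hosts
    >>> collect_hosts("kafka1:9093, kafka2", randomize=False)
    [('kafka1', 9093), ('kafka2', 9092)]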