summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorOmar <omar.ghishan@rd.io>2014-02-26 16:55:28 -0800
committerOmar <omar.ghishan@rd.io>2014-02-26 16:55:28 -0800
commitab89a44ecf9c93b116fcc8516cfc21749df74507 (patch)
tree5e318c4c7541bade1da687d2cf5fd145c594dc2b /kafka/client.py
parente5fdc1c7b22c8ad2aaa66a486871d0ed65977e3d (diff)
parent51910f981843dfa967d24659cdb46117210c832d (diff)
downloadkafka-python-ab89a44ecf9c93b116fcc8516cfc21749df74507.tar.gz
Merge pull request #122 from mrtheb/multihosts
Support for multiple hosts on KafkaClient boostrap (improves on #70)
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py25
1 files changed, 18 insertions, 7 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 155f658..96cc1df 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -10,7 +10,7 @@ from kafka.common import (ErrorMapping, TopicAndPartition,
BrokerResponseError, PartitionUnavailableError,
KafkaUnavailableError, KafkaRequestError)
-from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
+from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
log = logging.getLogger("kafka")
@@ -24,14 +24,15 @@ class KafkaClient(object):
# NOTE: The timeout given to the client should always be greater than the
# one passed to SimpleConsumer.get_message(), otherwise you can get a
# socket timeout.
- def __init__(self, host, port, client_id=CLIENT_ID,
+ def __init__(self, hosts, client_id=CLIENT_ID,
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
# We need one connection to bootstrap
self.client_id = client_id
self.timeout = timeout
- self.conns = { # (host, port) -> KafkaConnection
- (host, port): KafkaConnection(host, port, timeout=timeout)
- }
+ self.hosts = collect_hosts(hosts)
+
+ # create connections only when we need them
+ self.conns = {}
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # topic_id -> broker_id
self.topic_partitions = {} # topic_id -> [0, 1, 2, ...]
@@ -41,6 +42,15 @@ class KafkaClient(object):
# Private API #
##################
+ def _get_conn(self, host, port):
+ "Get or create a connection to a broker using host and port"
+
+ host_key = (host, port)
+ if host_key not in self.conns:
+ self.conns[host_key] = KafkaConnection(host, port)
+
+ return self.conns[host_key]
+
def _get_conn_for_broker(self, broker):
"""
Get or create a connection to a broker
@@ -49,7 +59,7 @@ class KafkaClient(object):
self.conns[(broker.host, broker.port)] = \
KafkaConnection(broker.host, broker.port, timeout=self.timeout)
- return self.conns[(broker.host, broker.port)]
+ return self._get_conn(broker.host, broker.port)
def _get_leader_for_partition(self, topic, partition):
key = TopicAndPartition(topic, partition)
@@ -72,7 +82,8 @@ class KafkaClient(object):
Attempt to send a broker-agnostic request to one of the available
brokers. Keep trying until you succeed.
"""
- for conn in self.conns.values():
+ for (host, port) in self.hosts:
+ conn = self._get_conn(host, port)
try:
conn.send(requestId, request)
response = conn.recv(requestId)