diff options
author | mrtheb <mrlabbe@gmail.com> | 2014-03-17 15:43:00 -0400 |
---|---|---|
committer | mrtheb <mrlabbe@gmail.com> | 2014-03-17 15:43:00 -0400 |
commit | 19646b1f1f9fae38c3075a9a56b8e7a9d395ff8e (patch) | |
tree | 421d3e2d628e8b564eecde6a4efcd4edac31d1ff /kafka/client.py | |
parent | 828133cff064f4f8fba753183ac21619355ac005 (diff) | |
parent | 32edabdaaff6746e4926cc897b4bba917a80cb54 (diff) | |
download | kafka-python-19646b1f1f9fae38c3075a9a56b8e7a9d395ff8e.tar.gz |
Merge branch 'master' into develop
Conflicts:
test/test_unit.py
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 29 |
1 files changed, 20 insertions, 9 deletions
diff --git a/kafka/client.py b/kafka/client.py index c3606e4..ab0eb8d 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -11,7 +11,7 @@ from kafka.common import (ErrorMapping, TopicAndPartition, LeaderUnavailableError, KafkaUnavailableError) -from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS +from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS from kafka.protocol import KafkaProtocol log = logging.getLogger("kafka") @@ -25,14 +25,15 @@ class KafkaClient(object): # NOTE: The timeout given to the client should always be greater than the # one passed to SimpleConsumer.get_message(), otherwise you can get a # socket timeout. - def __init__(self, host, port, client_id=CLIENT_ID, + def __init__(self, hosts, client_id=CLIENT_ID, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): # We need one connection to bootstrap self.client_id = client_id self.timeout = timeout - self.conns = { # (host, port) -> KafkaConnection - (host, port): KafkaConnection(host, port, timeout=timeout) - } + self.hosts = collect_hosts(hosts) + + # create connections only when we need them + self.conns = {} self.brokers = {} # broker_id -> BrokerMetadata self.topics_to_brokers = {} # topic_id -> broker_id self.topic_partitions = {} # topic_id -> [0, 1, 2, ...] @@ -42,6 +43,15 @@ class KafkaClient(object): # Private API # ################## + def _get_conn(self, host, port): + "Get or create a connection to a broker using host and port" + + host_key = (host, port) + if host_key not in self.conns: + self.conns[host_key] = KafkaConnection(host, port) + + return self.conns[host_key] + def _get_conn_for_broker(self, broker): """ Get or create a connection to a broker @@ -50,7 +60,7 @@ class KafkaClient(object): self.conns[(broker.host, broker.port)] = \ KafkaConnection(broker.host, broker.port, timeout=self.timeout) - return self.conns[(broker.host, broker.port)] + return self._get_conn(broker.host, broker.port) def _get_leader_for_partition(self, topic, partition): """ @@ -83,14 +93,15 @@ class KafkaClient(object): Attempt to send a broker-agnostic request to one of the available brokers. Keep trying until you succeed. """ - for conn in self.conns.values(): + for (host, port) in self.hosts: try: + conn = self._get_conn(host, port) conn.send(requestId, request) response = conn.recv(requestId) return response except Exception, e: - log.warning("Could not send request [%r] to server %s, " - "trying next server: %s" % (request, conn, e)) + log.warning("Could not send request [%r] to server %s:%i, " + "trying next server: %s" % (request, host, port, e)) continue raise KafkaUnavailableError("All servers failed to process request") |