diff options
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 23 |
1 files changed, 15 insertions, 8 deletions
diff --git a/kafka/client.py b/kafka/client.py index 11f54eb..99d6fec 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -16,7 +16,7 @@ from kafka.common import (TopicPartition, BrokerMetadata, UnknownError, from kafka.conn import ( collect_hosts, BrokerConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS, - ConnectionStates) + ConnectionStates, get_ip_port_afi) from kafka.protocol import KafkaProtocol # New KafkaClient @@ -56,12 +56,12 @@ class SimpleClient(object): # Private API # ################## - def _get_conn(self, host, port): + def _get_conn(self, host, port, afi): """Get or create a connection to a broker using host and port""" host_key = (host, port) if host_key not in self._conns: self._conns[host_key] = BrokerConnection( - host, port, + host, port, afi, request_timeout_ms=self.timeout * 1000, client_id=self.client_id ) @@ -139,13 +139,17 @@ class SimpleClient(object): Attempt to send a broker-agnostic request to one of the available brokers. Keep trying until you succeed. """ - hosts = set([(broker.host, broker.port) for broker in self.brokers.values()]) + hosts = set() + for broker in self.brokers.values(): + host, port, afi = get_ip_port_afi(broker.host) + hosts.add((host, broker.port, afi)) + hosts.update(self.hosts) hosts = list(hosts) random.shuffle(hosts) - for (host, port) in hosts: - conn = self._get_conn(host, port) + for (host, port, afi) in hosts: + conn = self._get_conn(host, port, afi) if not conn.connected(): log.warning("Skipping unconnected connection: %s", conn) continue @@ -227,7 +231,9 @@ class SimpleClient(object): failed_payloads(broker_payloads) continue - conn = self._get_conn(broker.host, broker.port) + + host, port, afi = get_ip_port_afi(broker.host) + conn = self._get_conn(host, broker.port, afi) conn.connect() if not conn.connected(): refresh_metadata = True @@ -323,7 +329,8 @@ class SimpleClient(object): # Send the request, recv the response try: - conn = self._get_conn(broker.host, broker.port) + host, port, afi = get_ip_port_afi(broker.host) + conn = self._get_conn(host, broker.port, afi) conn.send(requestId, request) except ConnectionError as e: |