diff options
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 10 |
1 files changed, 6 insertions, 4 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index ae9dbb4..5a1d624 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -14,7 +14,7 @@ import six import kafka.common as Errors # TODO: make Errors a separate class from .cluster import ClusterMetadata -from .conn import BrokerConnection, ConnectionStates, collect_hosts +from .conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi from .future import Future from .protocol.metadata import MetadataRequest from .protocol.produce import ProduceRequest @@ -115,9 +115,9 @@ class KafkaClient(object): self._last_bootstrap = time.time() metadata_request = MetadataRequest([]) - for host, port in hosts: + for host, port, afi in hosts: log.debug("Attempting to bootstrap via node at %s:%s", host, port) - bootstrap = BrokerConnection(host, port, **self.config) + bootstrap = BrokerConnection(host, port, afi, **self.config) bootstrap.connect() while bootstrap.state is ConnectionStates.CONNECTING: bootstrap.connect() @@ -160,7 +160,9 @@ class KafkaClient(object): log.debug("Initiating connection to node %s at %s:%s", node_id, broker.host, broker.port) - self._conns[node_id] = BrokerConnection(broker.host, broker.port, + + host, port, afi = get_ip_port_afi(broker.host) + self._conns[node_id] = BrokerConnection(host, broker.port, afi, **self.config) return self._finish_connect(node_id) |