From 0f78d57c604e864fab51f7cfb8fa69c9c4e623c7 Mon Sep 17 00:00:00 2001 From: Tim Evens Date: Wed, 30 Mar 2016 15:32:05 -0700 Subject: Kafka IPv6 Support. IPv6 address without port can be defined as the IPv6 address. If the address is a hostname or if a port is included, then the address MUST be wrapped in brackets [] (E.g. [somehost]:1234 or [fd00:1001::2]:1234). --- kafka/client_async.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'kafka/client_async.py') diff --git a/kafka/client_async.py b/kafka/client_async.py index ae9dbb4..5a1d624 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -14,7 +14,7 @@ import six import kafka.common as Errors # TODO: make Errors a separate class from .cluster import ClusterMetadata -from .conn import BrokerConnection, ConnectionStates, collect_hosts +from .conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi from .future import Future from .protocol.metadata import MetadataRequest from .protocol.produce import ProduceRequest @@ -115,9 +115,9 @@ class KafkaClient(object): self._last_bootstrap = time.time() metadata_request = MetadataRequest([]) - for host, port in hosts: + for host, port, afi in hosts: log.debug("Attempting to bootstrap via node at %s:%s", host, port) - bootstrap = BrokerConnection(host, port, **self.config) + bootstrap = BrokerConnection(host, port, afi, **self.config) bootstrap.connect() while bootstrap.state is ConnectionStates.CONNECTING: bootstrap.connect() @@ -160,7 +160,9 @@ class KafkaClient(object): log.debug("Initiating connection to node %s at %s:%s", node_id, broker.host, broker.port) - self._conns[node_id] = BrokerConnection(broker.host, broker.port, + + host, port, afi = get_ip_port_afi(broker.host) + self._conns[node_id] = BrokerConnection(host, broker.port, afi, **self.config) return self._finish_connect(node_id) -- cgit v1.2.1