summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
authorTim Evens <tievens@cisco.com>2016-03-30 15:32:05 -0700
committerTim Evens <tievens@cisco.com>2016-03-30 15:32:25 -0700
commit0f78d57c604e864fab51f7cfb8fa69c9c4e623c7 (patch)
tree649c0953a56f39719761751901da5da11b9589f0 /kafka/client_async.py
parentc6c862ad29ec5d0ae61d635c2020fb925b405c44 (diff)
downloadkafka-python-0f78d57c604e864fab51f7cfb8fa69c9c4e623c7.tar.gz
Kafka IPv6 Support.
IPv6 address without port can be defined as the IPv6 address. If the address is a hostname or if a port is included, then the address MUST be wrapped in brackets [] (E.g. [somehost]:1234 or [fd00:1001::2]:1234).
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py10
1 files changed, 6 insertions, 4 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index ae9dbb4..5a1d624 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -14,7 +14,7 @@ import six
import kafka.common as Errors # TODO: make Errors a separate class
from .cluster import ClusterMetadata
-from .conn import BrokerConnection, ConnectionStates, collect_hosts
+from .conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi
from .future import Future
from .protocol.metadata import MetadataRequest
from .protocol.produce import ProduceRequest
@@ -115,9 +115,9 @@ class KafkaClient(object):
self._last_bootstrap = time.time()
metadata_request = MetadataRequest([])
- for host, port in hosts:
+ for host, port, afi in hosts:
log.debug("Attempting to bootstrap via node at %s:%s", host, port)
- bootstrap = BrokerConnection(host, port, **self.config)
+ bootstrap = BrokerConnection(host, port, afi, **self.config)
bootstrap.connect()
while bootstrap.state is ConnectionStates.CONNECTING:
bootstrap.connect()
@@ -160,7 +160,9 @@ class KafkaClient(object):
log.debug("Initiating connection to node %s at %s:%s",
node_id, broker.host, broker.port)
- self._conns[node_id] = BrokerConnection(broker.host, broker.port,
+
+ host, port, afi = get_ip_port_afi(broker.host)
+ self._conns[node_id] = BrokerConnection(host, broker.port, afi,
**self.config)
return self._finish_connect(node_id)