summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py23
1 files changed, 15 insertions, 8 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 11f54eb..99d6fec 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -16,7 +16,7 @@ from kafka.common import (TopicPartition, BrokerMetadata, UnknownError,
from kafka.conn import (
collect_hosts, BrokerConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS,
- ConnectionStates)
+ ConnectionStates, get_ip_port_afi)
from kafka.protocol import KafkaProtocol
# New KafkaClient
@@ -56,12 +56,12 @@ class SimpleClient(object):
# Private API #
##################
- def _get_conn(self, host, port):
+ def _get_conn(self, host, port, afi):
"""Get or create a connection to a broker using host and port"""
host_key = (host, port)
if host_key not in self._conns:
self._conns[host_key] = BrokerConnection(
- host, port,
+ host, port, afi,
request_timeout_ms=self.timeout * 1000,
client_id=self.client_id
)
@@ -139,13 +139,17 @@ class SimpleClient(object):
Attempt to send a broker-agnostic request to one of the available
brokers. Keep trying until you succeed.
"""
- hosts = set([(broker.host, broker.port) for broker in self.brokers.values()])
+ hosts = set()
+ for broker in self.brokers.values():
+ host, port, afi = get_ip_port_afi(broker.host)
+ hosts.add((host, broker.port, afi))
+
hosts.update(self.hosts)
hosts = list(hosts)
random.shuffle(hosts)
- for (host, port) in hosts:
- conn = self._get_conn(host, port)
+ for (host, port, afi) in hosts:
+ conn = self._get_conn(host, port, afi)
if not conn.connected():
log.warning("Skipping unconnected connection: %s", conn)
continue
@@ -227,7 +231,9 @@ class SimpleClient(object):
failed_payloads(broker_payloads)
continue
- conn = self._get_conn(broker.host, broker.port)
+
+ host, port, afi = get_ip_port_afi(broker.host)
+ conn = self._get_conn(host, broker.port, afi)
conn.connect()
if not conn.connected():
refresh_metadata = True
@@ -323,7 +329,8 @@ class SimpleClient(object):
# Send the request, recv the response
try:
- conn = self._get_conn(broker.host, broker.port)
+ host, port, afi = get_ip_port_afi(broker.host)
+ conn = self._get_conn(host, broker.port, afi)
conn.send(requestId, request)
except ConnectionError as e: