summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py49
1 files changed, 43 insertions, 6 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 65451f9..0ce469d 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -52,9 +52,10 @@ class BrokerConnection(object):
'api_version': (0, 8, 2), # default to most restrictive
}
- def __init__(self, host, port, **configs):
+ def __init__(self, host, port, afi, **configs):
self.host = host
self.port = port
+ self.afi = afi
self.in_flight_requests = collections.deque()
self.config = copy.copy(self.DEFAULT_CONFIG)
@@ -76,7 +77,7 @@ class BrokerConnection(object):
"""Attempt to connect and return ConnectionState"""
if self.state is ConnectionStates.DISCONNECTED:
self.close()
- self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._sock = socket.socket(self.afi, socket.SOCK_STREAM)
if self.config['receive_buffer_bytes'] is not None:
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
self.config['receive_buffer_bytes'])
@@ -356,6 +357,39 @@ class BrokerConnection(object):
return "<BrokerConnection host=%s port=%d>" % (self.host, self.port)
+def get_ip_port_afi(host_and_port_str):
+ """
+ Parse the IP and port from a string in the format of:
+
+ * host_or_ip <- Can be either IPv4 or IPv6 address or hostname/fqdn
+ * host_or_ip:port <- This is only for IPv4
+ * [host_or_ip]:port. <- This is only for IPv6
+
+ .. note:: If the port is not specified, default will be returned.
+
+ :return: tuple (host, port, afi), afi will be socket.AF_INET or socket.AF_INET6
+ """
+ afi = socket.AF_INET
+
+ if host_and_port_str.strip()[0] == '[':
+ afi = socket.AF_INET6
+ res = host_and_port_str.split("]:")
+ res[0] = res[0].replace("[", "")
+ res[0] = res[0].replace("]", "")
+
+ elif host_and_port_str.count(":") > 1:
+ afi = socket.AF_INET6
+ res = [host_and_port_str]
+
+ else:
+ res = host_and_port_str.split(':')
+
+ host = res[0]
+ port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT
+
+ return host.strip(), port, afi
+
+
def collect_hosts(hosts, randomize=True):
"""
Collects a comma-separated set of hosts (host:port) and optionally
@@ -366,12 +400,15 @@ def collect_hosts(hosts, randomize=True):
hosts = hosts.strip().split(',')
result = []
+ afi = socket.AF_INET
for host_port in hosts:
- res = host_port.split(':')
- host = res[0]
- port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT
- result.append((host.strip(), port))
+ host, port, afi = get_ip_port_afi(host_port)
+
+ if port < 0:
+ port = DEFAULT_KAFKA_PORT
+
+ result.append((host, port, afi))
if randomize:
shuffle(result)