diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-03-30 17:21:20 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-03-30 17:21:20 -0700 |
commit | b96f4ccf070109a022deb98b569e61d23e4e75b9 (patch) | |
tree | b899ac53627c6b169ddc6e41072502e3cb30185f /kafka/conn.py | |
parent | c6c862ad29ec5d0ae61d635c2020fb925b405c44 (diff) | |
parent | f456ffc8d95d04b0381dc07cf2ae113043f3c887 (diff) | |
download | kafka-python-b96f4ccf070109a022deb98b569e61d23e4e75b9.tar.gz |
Merge pull request #615 from TimEvens/master
Kafka IPv6 Support.
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 49 |
1 files changed, 43 insertions, 6 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 65451f9..0ce469d 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -52,9 +52,10 @@ class BrokerConnection(object): 'api_version': (0, 8, 2), # default to most restrictive } - def __init__(self, host, port, **configs): + def __init__(self, host, port, afi, **configs): self.host = host self.port = port + self.afi = afi self.in_flight_requests = collections.deque() self.config = copy.copy(self.DEFAULT_CONFIG) @@ -76,7 +77,7 @@ class BrokerConnection(object): """Attempt to connect and return ConnectionState""" if self.state is ConnectionStates.DISCONNECTED: self.close() - self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._sock = socket.socket(self.afi, socket.SOCK_STREAM) if self.config['receive_buffer_bytes'] is not None: self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self.config['receive_buffer_bytes']) @@ -356,6 +357,39 @@ class BrokerConnection(object): return "<BrokerConnection host=%s port=%d>" % (self.host, self.port) +def get_ip_port_afi(host_and_port_str): + """ + Parse the IP and port from a string in the format of: + + * host_or_ip <- Can be either IPv4 or IPv6 address or hostname/fqdn + * host_or_ip:port <- This is only for IPv4 + * [host_or_ip]:port. <- This is only for IPv6 + + .. note:: If the port is not specified, default will be returned. + + :return: tuple (host, port, afi), afi will be socket.AF_INET or socket.AF_INET6 + """ + afi = socket.AF_INET + + if host_and_port_str.strip()[0] == '[': + afi = socket.AF_INET6 + res = host_and_port_str.split("]:") + res[0] = res[0].replace("[", "") + res[0] = res[0].replace("]", "") + + elif host_and_port_str.count(":") > 1: + afi = socket.AF_INET6 + res = [host_and_port_str] + + else: + res = host_and_port_str.split(':') + + host = res[0] + port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT + + return host.strip(), port, afi + + def collect_hosts(hosts, randomize=True): """ Collects a comma-separated set of hosts (host:port) and optionally @@ -366,12 +400,15 @@ def collect_hosts(hosts, randomize=True): hosts = hosts.strip().split(',') result = [] + afi = socket.AF_INET for host_port in hosts: - res = host_port.split(':') - host = res[0] - port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT - result.append((host.strip(), port)) + host, port, afi = get_ip_port_afi(host_port) + + if port < 0: + port = DEFAULT_KAFKA_PORT + + result.append((host, port, afi)) if randomize: shuffle(result) |