diff options
| author | Dana Powers <dana.powers@gmail.com> | 2017-12-06 14:40:44 -0800 |
|---|---|---|
| committer | Dana Powers <dana.powers@gmail.com> | 2017-12-06 14:40:44 -0800 |
| commit | f747738009abb1f03a804be4be952d84494ad39d (patch) | |
| tree | 72dd30b83185b067414babf39d646eafd3390592 /kafka | |
| parent | 01c34c5194c37c96fc2b74751265cba550161a57 (diff) | |
| download | kafka-python-f747738009abb1f03a804be4be952d84494ad39d.tar.gz | |
Refactor dns lookup into separate functions
Diffstat (limited to 'kafka')
| -rw-r--r-- | kafka/conn.py | 104 |
1 files changed, 49 insertions, 55 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index f899644..7a2abfe 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -203,6 +203,7 @@ class BrokerConnection(object): self.afi = afi self._init_host = host self._init_port = port + self._init_afi = afi self.in_flight_requests = collections.deque() self._api_versions = None @@ -250,59 +251,35 @@ class BrokerConnection(object): self._sasl_auth_future = None self.last_attempt = 0 self._gai = None - self._gai_index = 0 self._sensors = None if self.config['metrics']: self._sensors = BrokerConnectionMetrics(self.config['metrics'], self.config['metric_group_prefix'], self.node_id) + def _next_afi_host_port(self): + if not self._gai: + self._gai = dns_lookup(self._init_host, self._init_port, self._init_afi) + if not self._gai: + log.error('DNS lookup failed for {0}:{1} ({2})' + .format(self._init_host, self._init_port, self._init_afi)) + return + + afi, _, __, ___, sockaddr = self._gai.pop(0) + host, port = sockaddr[:2] + return (afi, host, port) + def connect(self): """Attempt to connect and return ConnectionState""" if self.state is ConnectionStates.DISCONNECTED: - log.debug('%s: creating new socket', self) - # if self.afi is set to AF_UNSPEC, then we need to do a name - # resolution and try all available address families - if self.afi == socket.AF_UNSPEC: - if self._gai is None: - # XXX: all DNS functions in Python are blocking. If we really - # want to be non-blocking here, we need to use a 3rd-party - # library like python-adns, or move resolution onto its - # own thread. This will be subject to the default libc - # name resolution timeout (5s on most Linux boxes) - try: - self._gai = socket.getaddrinfo(self._init_host, - self._init_port, - socket.AF_UNSPEC, - socket.SOCK_STREAM) - except socket.gaierror as ex: - log.warning('DNS lookup failed for %s:%d,' - ' exception was %s. Is your' - ' advertised.listeners (called' - ' advertised.host.name before Kafka 9)' - ' correct and resolvable?', - self._init_host, self._init_port, ex) - self._gai = [] - self._gai_index = 0 - else: - # if self._gai already exists, then we should try the next - # name - self._gai_index += 1 - while True: - if self._gai_index >= len(self._gai): - log.error('Unable to connect to any of the names for {0}:{1}' - .format(self._init_host, self._init_port)) - self._gai = None - self._gai_index = 0 - return - afi, _, __, ___, sockaddr = self._gai[self._gai_index] - if afi not in (socket.AF_INET, socket.AF_INET6): - self._gai_index += 1 - continue - break - self.host, self.port = sockaddr[:2] - self._sock = socket.socket(afi, socket.SOCK_STREAM) + self.last_attempt = time.time() + next_lookup = self._next_afi_host_port() + if not next_lookup: + self.close(Errors.ConnectionError('DNS failure')) + return else: + log.debug('%s: creating new socket', self) + self.afi, self.host, self.port = next_lookup self._sock = socket.socket(self.afi, socket.SOCK_STREAM) for option in self.config['socket_options']: @@ -310,7 +287,6 @@ class BrokerConnection(object): self._sock.setsockopt(*option) self._sock.setblocking(False) - self.last_attempt = time.time() self.state = ConnectionStates.CONNECTING if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): self._wrap_ssl() @@ -643,23 +619,15 @@ class BrokerConnection(object): will be failed with this exception. Default: kafka.errors.ConnectionError. """ - if self.state is ConnectionStates.DISCONNECTED: - if error is not None: - if sys.version_info >= (3, 2): - log.warning('%s: close() called on disconnected connection with error: %s', self, error, stack_info=True) - else: - log.warning('%s: close() called on disconnected connection with error: %s', self, error) - return - log.info('%s: Closing connection. %s', self, error or '') - self.state = ConnectionStates.DISCONNECTING - self.config['state_change_callback'](self) + if self.state is not ConnectionStates.DISCONNECTED: + self.state = ConnectionStates.DISCONNECTING + self.config['state_change_callback'](self) self._update_reconnect_backoff() if self._sock: self._sock.close() self._sock = None self.state = ConnectionStates.DISCONNECTED - self.last_attempt = time.time() self._sasl_auth_future = None self._protocol = KafkaProtocol( client_id=self.config['client_id'], @@ -1169,3 +1137,29 @@ def collect_hosts(hosts, randomize=True): shuffle(result) return result + + +def is_inet_4_or_6(gai): + """Given a getaddrinfo struct, return True iff ipv4 or ipv6""" + return gai[0] in (socket.AF_INET, socket.AF_INET6) + + +def dns_lookup(host, port, afi=socket.AF_UNSPEC): + """Returns a list of getaddrinfo structs, optionally filtered to an afi (ipv4 / ipv6)""" + # XXX: all DNS functions in Python are blocking. If we really + # want to be non-blocking here, we need to use a 3rd-party + # library like python-adns, or move resolution onto its + # own thread. This will be subject to the default libc + # name resolution timeout (5s on most Linux boxes) + try: + return list(filter(is_inet_4_or_6, + socket.getaddrinfo(host, port, afi, + socket.SOCK_STREAM))) + except socket.gaierror as ex: + log.warning('DNS lookup failed for %s:%d,' + ' exception was %s. Is your' + ' advertised.listeners (called' + ' advertised.host.name before Kafka 9)' + ' correct and resolvable?', + host, port, ex) + return [] |
