diff options
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 20 |
1 files changed, 10 insertions, 10 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 7079f01..62b0095 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -133,16 +133,11 @@ class KafkaClient(object): self._delayed_tasks = DelayedTaskQueue() self._last_bootstrap = 0 self._bootstrap_fails = 0 - self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) self._wake_r, self._wake_w = socket.socketpair() self._wake_r.setblocking(False) self._selector.register(self._wake_r, selectors.EVENT_READ) - - def __del__(self): - if hasattr(self, '_wake_r'): - self._wake_r.close() - if hasattr(self, '_wake_w'): - self._wake_w.close() + self._closed = False + self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) def _bootstrap(self, hosts): # Exponential backoff if bootstrap fails @@ -180,6 +175,8 @@ class KafkaClient(object): # in that case, we should keep the bootstrap connection if not len(self.cluster.brokers()): self._conns['bootstrap'] = bootstrap + else: + bootstrap.close() self._bootstrap_fails = 0 break # No bootstrap found... @@ -230,7 +227,7 @@ class KafkaClient(object): self._selector.unregister(conn._sock) except KeyError: pass - if self._refresh_on_disconnects: + if self._refresh_on_disconnects and not self._closed: log.warning("Node %s connection failed -- refreshing metadata", node_id) self.cluster.request_update() @@ -272,14 +269,17 @@ class KafkaClient(object): return self._conns[node_id].connected() def close(self, node_id=None): - """Closes the connection to a particular node (if there is one). + """Closes one or all broker connections. Arguments: - node_id (int): the id of the node to close + node_id (int, optional): the id of the node to close """ if node_id is None: + self._closed = True for conn in self._conns.values(): conn.close() + self._wake_r.close() + self._wake_w.close() elif node_id in self._conns: self._conns[node_id].close() else: |