summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/client_async.py65
-rw-r--r--kafka/selectors34.py635
-rw-r--r--test/test_client_async.py15
3 files changed, 682 insertions, 33 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 0c22f90..36e808c 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -6,7 +6,14 @@ import heapq
import itertools
import logging
import random
-import select
+
+# selectors in stdlib as of py3.4
+try:
+ import selectors # pylint: disable=import-error
+except ImportError:
+ # vendored backport module
+ from . import selectors34 as selectors
+
import socket
import time
@@ -92,6 +99,7 @@ class KafkaClient(object):
self.cluster = ClusterMetadata(**self.config)
self._topics = set() # empty set will fetch all topic metadata
self._metadata_refresh_in_progress = False
+ self._selector = selectors.DefaultSelector()
self._conns = {}
self._connecting = set()
self._refresh_on_disconnects = True
@@ -101,6 +109,7 @@ class KafkaClient(object):
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
self._wake_r, self._wake_w = socket.socketpair()
self._wake_r.setblocking(False)
+ self._selector.register(self._wake_r, selectors.EVENT_READ)
def __del__(self):
self._wake_r.close()
@@ -160,11 +169,19 @@ class KafkaClient(object):
def _conn_state_change(self, node_id, conn):
if conn.connecting():
self._connecting.add(node_id)
+ self._selector.register(conn._sock, selectors.EVENT_WRITE)
elif conn.connected():
log.debug("Node %s connected", node_id)
if node_id in self._connecting:
self._connecting.remove(node_id)
+
+ try:
+ self._selector.unregister(conn._sock)
+ except KeyError:
+ pass
+ self._selector.register(conn._sock, selectors.EVENT_READ, conn)
+
if 'bootstrap' in self._conns and node_id != 'bootstrap':
bootstrap = self._conns.pop('bootstrap')
# XXX: make conn.close() require error to cause refresh
@@ -176,6 +193,10 @@ class KafkaClient(object):
elif conn.state is ConnectionStates.DISCONNECTING:
if node_id in self._connecting:
self._connecting.remove(node_id)
+ try:
+ self._selector.unregister(conn._sock)
+ except KeyError:
+ pass
if self._refresh_on_disconnects:
log.warning("Node %s connect failed -- refreshing metadata", node_id)
self.cluster.request_update()
@@ -388,45 +409,25 @@ class KafkaClient(object):
return responses
- def _poll(self, timeout, sleep=False):
+ def _poll(self, timeout, sleep=True):
# select on reads across all connected sockets, blocking up to timeout
- sockets = dict([(conn._sock, conn)
- for conn in six.itervalues(self._conns)
- if conn.state is ConnectionStates.CONNECTED
- and conn.in_flight_requests])
- if not sockets:
- # if sockets are connecting, we can wake when they are writeable
- if self._connecting:
- sockets = [self._conns[node]._sock for node in self._connecting]
- select.select([self._wake_r], sockets, [], timeout)
- elif timeout:
- if sleep:
- log.debug('Sleeping at %s for %s', time.time(), timeout)
- select.select([self._wake_r], [], [], timeout)
- log.debug('Woke up at %s', time.time())
- else:
- log.warning('_poll called with a non-zero timeout and'
- ' sleep=False -- but there was nothing to do.'
- ' This can cause high CPU usage during idle.')
- self._clear_wake_fd()
- return []
-
- # Add a private pipe fd to allow external wakeups
- fds = list(sockets.keys())
- fds.append(self._wake_r)
- ready, _, _ = select.select(fds, [], [], timeout)
-
+ assert self.in_flight_request_count() > 0 or self._connecting or sleep
responses = []
- for sock in ready:
- if sock == self._wake_r:
+ for key, events in self._selector.select(timeout):
+ if key.fileobj is self._wake_r:
+ self._clear_wake_fd()
+ continue
+ elif not (events & selectors.EVENT_READ):
continue
- conn = sockets[sock]
+ conn = key.data
while conn.in_flight_requests:
response = conn.recv() # Note: conn.recv runs callbacks / errbacks
+
+ # Incomplete responses are buffered internally
+ # while conn.in_flight_requests retains the request
if not response:
break
responses.append(response)
- self._clear_wake_fd()
return responses
def in_flight_request_count(self, node_id=None):
diff --git a/kafka/selectors34.py b/kafka/selectors34.py
new file mode 100644
index 0000000..541c29c
--- /dev/null
+++ b/kafka/selectors34.py
@@ -0,0 +1,635 @@
+# pylint: skip-file
+# vendored from https://github.com/berkerpeksag/selectors34
+# at commit 5195dd2cbe598047ad0a2e446a829546f6ffc9eb (v1.1)
+#
+# Original author: Charles-Francois Natali (c.f.natali[at]gmail.com)
+# Maintainer: Berker Peksag (berker.peksag[at]gmail.com)
+# Also see https://pypi.python.org/pypi/selectors34
+"""Selectors module.
+
+This module allows high-level and efficient I/O multiplexing, built upon the
+`select` module primitives.
+
+The following code adapted from trollius.selectors.
+"""
+
+
+from abc import ABCMeta, abstractmethod
+from collections import namedtuple, Mapping
+from errno import EINTR
+import math
+import select
+import sys
+
+import six
+
+
+def _wrap_error(exc, mapping, key):
+ if key not in mapping:
+ return
+ new_err_cls = mapping[key]
+ new_err = new_err_cls(*exc.args)
+
+ # raise a new exception with the original traceback
+ if hasattr(exc, '__traceback__'):
+ traceback = exc.__traceback__
+ else:
+ traceback = sys.exc_info()[2]
+ six.reraise(new_err_cls, new_err, traceback)
+
+
+# generic events, that must be mapped to implementation-specific ones
+EVENT_READ = (1 << 0)
+EVENT_WRITE = (1 << 1)
+
+
+def _fileobj_to_fd(fileobj):
+ """Return a file descriptor from a file object.
+
+ Parameters:
+ fileobj -- file object or file descriptor
+
+ Returns:
+ corresponding file descriptor
+
+ Raises:
+ ValueError if the object is invalid
+ """
+ if isinstance(fileobj, six.integer_types):
+ fd = fileobj
+ else:
+ try:
+ fd = int(fileobj.fileno())
+ except (AttributeError, TypeError, ValueError):
+ raise ValueError("Invalid file object: "
+ "{0!r}".format(fileobj))
+ if fd < 0:
+ raise ValueError("Invalid file descriptor: {0}".format(fd))
+ return fd
+
+
+SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])
+"""Object used to associate a file object to its backing file descriptor,
+selected event mask and attached data."""
+
+
+class _SelectorMapping(Mapping):
+ """Mapping of file objects to selector keys."""
+
+ def __init__(self, selector):
+ self._selector = selector
+
+ def __len__(self):
+ return len(self._selector._fd_to_key)
+
+ def __getitem__(self, fileobj):
+ try:
+ fd = self._selector._fileobj_lookup(fileobj)
+ return self._selector._fd_to_key[fd]
+ except KeyError:
+ raise KeyError("{0!r} is not registered".format(fileobj))
+
+ def __iter__(self):
+ return iter(self._selector._fd_to_key)
+
+
+class BaseSelector(six.with_metaclass(ABCMeta)):
+ """Selector abstract base class.
+
+ A selector supports registering file objects to be monitored for specific
+ I/O events.
+
+ A file object is a file descriptor or any object with a `fileno()` method.
+ An arbitrary object can be attached to the file object, which can be used
+ for example to store context information, a callback, etc.
+
+ A selector can use various implementations (select(), poll(), epoll()...)
+ depending on the platform. The default `Selector` class uses the most
+ efficient implementation on the current platform.
+ """
+
+ @abstractmethod
+ def register(self, fileobj, events, data=None):
+ """Register a file object.
+
+ Parameters:
+ fileobj -- file object or file descriptor
+ events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
+ data -- attached data
+
+ Returns:
+ SelectorKey instance
+
+ Raises:
+ ValueError if events is invalid
+ KeyError if fileobj is already registered
+ OSError if fileobj is closed or otherwise is unacceptable to
+ the underlying system call (if a system call is made)
+
+ Note:
+ OSError may or may not be raised
+ """
+ raise NotImplementedError
+
+ @abstractmethod
+ def unregister(self, fileobj):
+ """Unregister a file object.
+
+ Parameters:
+ fileobj -- file object or file descriptor
+
+ Returns:
+ SelectorKey instance
+
+ Raises:
+ KeyError if fileobj is not registered
+
+ Note:
+ If fileobj is registered but has since been closed this does
+ *not* raise OSError (even if the wrapped syscall does)
+ """
+ raise NotImplementedError
+
+ def modify(self, fileobj, events, data=None):
+ """Change a registered file object monitored events or attached data.
+
+ Parameters:
+ fileobj -- file object or file descriptor
+ events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
+ data -- attached data
+
+ Returns:
+ SelectorKey instance
+
+ Raises:
+ Anything that unregister() or register() raises
+ """
+ self.unregister(fileobj)
+ return self.register(fileobj, events, data)
+
+ @abstractmethod
+ def select(self, timeout=None):
+ """Perform the actual selection, until some monitored file objects are
+ ready or a timeout expires.
+
+ Parameters:
+ timeout -- if timeout > 0, this specifies the maximum wait time, in
+ seconds
+ if timeout <= 0, the select() call won't block, and will
+ report the currently ready file objects
+ if timeout is None, select() will block until a monitored
+ file object becomes ready
+
+ Returns:
+ list of (key, events) for ready file objects
+ `events` is a bitwise mask of EVENT_READ|EVENT_WRITE
+ """
+ raise NotImplementedError
+
+ def close(self):
+ """Close the selector.
+
+ This must be called to make sure that any underlying resource is freed.
+ """
+ pass
+
+ def get_key(self, fileobj):
+ """Return the key associated to a registered file object.
+
+ Returns:
+ SelectorKey for this file object
+ """
+ mapping = self.get_map()
+ if mapping is None:
+ raise RuntimeError('Selector is closed')
+ try:
+ return mapping[fileobj]
+ except KeyError:
+ raise KeyError("{0!r} is not registered".format(fileobj))
+
+ @abstractmethod
+ def get_map(self):
+ """Return a mapping of file objects to selector keys."""
+ raise NotImplementedError
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args):
+ self.close()
+
+
+class _BaseSelectorImpl(BaseSelector):
+ """Base selector implementation."""
+
+ def __init__(self):
+ # this maps file descriptors to keys
+ self._fd_to_key = {}
+ # read-only mapping returned by get_map()
+ self._map = _SelectorMapping(self)
+
+ def _fileobj_lookup(self, fileobj):
+ """Return a file descriptor from a file object.
+
+ This wraps _fileobj_to_fd() to do an exhaustive search in case
+ the object is invalid but we still have it in our map. This
+ is used by unregister() so we can unregister an object that
+ was previously registered even if it is closed. It is also
+ used by _SelectorMapping.
+ """
+ try:
+ return _fileobj_to_fd(fileobj)
+ except ValueError:
+ # Do an exhaustive search.
+ for key in self._fd_to_key.values():
+ if key.fileobj is fileobj:
+ return key.fd
+ # Raise ValueError after all.
+ raise
+
+ def register(self, fileobj, events, data=None):
+ if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
+ raise ValueError("Invalid events: {0!r}".format(events))
+
+ key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)
+
+ if key.fd in self._fd_to_key:
+ raise KeyError("{0!r} (FD {1}) is already registered"
+ .format(fileobj, key.fd))
+
+ self._fd_to_key[key.fd] = key
+ return key
+
+ def unregister(self, fileobj):
+ try:
+ key = self._fd_to_key.pop(self._fileobj_lookup(fileobj))
+ except KeyError:
+ raise KeyError("{0!r} is not registered".format(fileobj))
+ return key
+
+ def modify(self, fileobj, events, data=None):
+ # TODO: Subclasses can probably optimize this even further.
+ try:
+ key = self._fd_to_key[self._fileobj_lookup(fileobj)]
+ except KeyError:
+ raise KeyError("{0!r} is not registered".format(fileobj))
+ if events != key.events:
+ self.unregister(fileobj)
+ key = self.register(fileobj, events, data)
+ elif data != key.data:
+ # Use a shortcut to update the data.
+ key = key._replace(data=data)
+ self._fd_to_key[key.fd] = key
+ return key
+
+ def close(self):
+ self._fd_to_key.clear()
+ self._map = None
+
+ def get_map(self):
+ return self._map
+
+ def _key_from_fd(self, fd):
+ """Return the key associated to a given file descriptor.
+
+ Parameters:
+ fd -- file descriptor
+
+ Returns:
+ corresponding key, or None if not found
+ """
+ try:
+ return self._fd_to_key[fd]
+ except KeyError:
+ return None
+
+
+class SelectSelector(_BaseSelectorImpl):
+ """Select-based selector."""
+
+ def __init__(self):
+ super(SelectSelector, self).__init__()
+ self._readers = set()
+ self._writers = set()
+
+ def register(self, fileobj, events, data=None):
+ key = super(SelectSelector, self).register(fileobj, events, data)
+ if events & EVENT_READ:
+ self._readers.add(key.fd)
+ if events & EVENT_WRITE:
+ self._writers.add(key.fd)
+ return key
+
+ def unregister(self, fileobj):
+ key = super(SelectSelector, self).unregister(fileobj)
+ self._readers.discard(key.fd)
+ self._writers.discard(key.fd)
+ return key
+
+ if sys.platform == 'win32':
+ def _select(self, r, w, _, timeout=None):
+ r, w, x = select.select(r, w, w, timeout)
+ return r, w + x, []
+ else:
+ _select = select.select
+
+ def select(self, timeout=None):
+ timeout = None if timeout is None else max(timeout, 0)
+ ready = []
+ try:
+ r, w, _ = self._select(self._readers, self._writers, [], timeout)
+ except select.error as exc:
+ if exc.args[0] == EINTR:
+ return ready
+ else:
+ raise
+ r = set(r)
+ w = set(w)
+ for fd in r | w:
+ events = 0
+ if fd in r:
+ events |= EVENT_READ
+ if fd in w:
+ events |= EVENT_WRITE
+
+ key = self._key_from_fd(fd)
+ if key:
+ ready.append((key, events & key.events))
+ return ready
+
+
+if hasattr(select, 'poll'):
+
+ class PollSelector(_BaseSelectorImpl):
+ """Poll-based selector."""
+
+ def __init__(self):
+ super(PollSelector, self).__init__()
+ self._poll = select.poll()
+
+ def register(self, fileobj, events, data=None):
+ key = super(PollSelector, self).register(fileobj, events, data)
+ poll_events = 0
+ if events & EVENT_READ:
+ poll_events |= select.POLLIN
+ if events & EVENT_WRITE:
+ poll_events |= select.POLLOUT
+ self._poll.register(key.fd, poll_events)
+ return key
+
+ def unregister(self, fileobj):
+ key = super(PollSelector, self).unregister(fileobj)
+ self._poll.unregister(key.fd)
+ return key
+
+ def select(self, timeout=None):
+ if timeout is None:
+ timeout = None
+ elif timeout <= 0:
+ timeout = 0
+ else:
+ # poll() has a resolution of 1 millisecond, round away from
+ # zero to wait *at least* timeout seconds.
+ timeout = int(math.ceil(timeout * 1e3))
+ ready = []
+ try:
+ fd_event_list = self._poll.poll(timeout)
+ except select.error as exc:
+ if exc.args[0] == EINTR:
+ return ready
+ else:
+ raise
+ for fd, event in fd_event_list:
+ events = 0
+ if event & ~select.POLLIN:
+ events |= EVENT_WRITE
+ if event & ~select.POLLOUT:
+ events |= EVENT_READ
+
+ key = self._key_from_fd(fd)
+ if key:
+ ready.append((key, events & key.events))
+ return ready
+
+
+if hasattr(select, 'epoll'):
+
+ class EpollSelector(_BaseSelectorImpl):
+ """Epoll-based selector."""
+
+ def __init__(self):
+ super(EpollSelector, self).__init__()
+ self._epoll = select.epoll()
+
+ def fileno(self):
+ return self._epoll.fileno()
+
+ def register(self, fileobj, events, data=None):
+ key = super(EpollSelector, self).register(fileobj, events, data)
+ epoll_events = 0
+ if events & EVENT_READ:
+ epoll_events |= select.EPOLLIN
+ if events & EVENT_WRITE:
+ epoll_events |= select.EPOLLOUT
+ self._epoll.register(key.fd, epoll_events)
+ return key
+
+ def unregister(self, fileobj):
+ key = super(EpollSelector, self).unregister(fileobj)
+ try:
+ self._epoll.unregister(key.fd)
+ except IOError:
+ # This can happen if the FD was closed since it
+ # was registered.
+ pass
+ return key
+
+ def select(self, timeout=None):
+ if timeout is None:
+ timeout = -1
+ elif timeout <= 0:
+ timeout = 0
+ else:
+ # epoll_wait() has a resolution of 1 millisecond, round away
+ # from zero to wait *at least* timeout seconds.
+ timeout = math.ceil(timeout * 1e3) * 1e-3
+
+ # epoll_wait() expects `maxevents` to be greater than zero;
+ # we want to make sure that `select()` can be called when no
+ # FD is registered.
+ max_ev = max(len(self._fd_to_key), 1)
+
+ ready = []
+ try:
+ fd_event_list = self._epoll.poll(timeout, max_ev)
+ except IOError as exc:
+ if exc.errno == EINTR:
+ return ready
+ else:
+ raise
+ for fd, event in fd_event_list:
+ events = 0
+ if event & ~select.EPOLLIN:
+ events |= EVENT_WRITE
+ if event & ~select.EPOLLOUT:
+ events |= EVENT_READ
+
+ key = self._key_from_fd(fd)
+ if key:
+ ready.append((key, events & key.events))
+ return ready
+
+ def close(self):
+ self._epoll.close()
+ super(EpollSelector, self).close()
+
+
+if hasattr(select, 'devpoll'):
+
+ class DevpollSelector(_BaseSelectorImpl):
+ """Solaris /dev/poll selector."""
+
+ def __init__(self):
+ super(DevpollSelector, self).__init__()
+ self._devpoll = select.devpoll()
+
+ def fileno(self):
+ return self._devpoll.fileno()
+
+ def register(self, fileobj, events, data=None):
+ key = super(DevpollSelector, self).register(fileobj, events, data)
+ poll_events = 0
+ if events & EVENT_READ:
+ poll_events |= select.POLLIN
+ if events & EVENT_WRITE:
+ poll_events |= select.POLLOUT
+ self._devpoll.register(key.fd, poll_events)
+ return key
+
+ def unregister(self, fileobj):
+ key = super(DevpollSelector, self).unregister(fileobj)
+ self._devpoll.unregister(key.fd)
+ return key
+
+ def select(self, timeout=None):
+ if timeout is None:
+ timeout = None
+ elif timeout <= 0:
+ timeout = 0
+ else:
+ # devpoll() has a resolution of 1 millisecond, round away from
+ # zero to wait *at least* timeout seconds.
+ timeout = math.ceil(timeout * 1e3)
+ ready = []
+ try:
+ fd_event_list = self._devpoll.poll(timeout)
+ except OSError as exc:
+ if exc.errno == EINTR:
+ return ready
+ else:
+ raise
+ for fd, event in fd_event_list:
+ events = 0
+ if event & ~select.POLLIN:
+ events |= EVENT_WRITE
+ if event & ~select.POLLOUT:
+ events |= EVENT_READ
+
+ key = self._key_from_fd(fd)
+ if key:
+ ready.append((key, events & key.events))
+ return ready
+
+ def close(self):
+ self._devpoll.close()
+ super(DevpollSelector, self).close()
+
+
+if hasattr(select, 'kqueue'):
+
+ class KqueueSelector(_BaseSelectorImpl):
+ """Kqueue-based selector."""
+
+ def __init__(self):
+ super(KqueueSelector, self).__init__()
+ self._kqueue = select.kqueue()
+
+ def fileno(self):
+ return self._kqueue.fileno()
+
+ def register(self, fileobj, events, data=None):
+ key = super(KqueueSelector, self).register(fileobj, events, data)
+ if events & EVENT_READ:
+ kev = select.kevent(key.fd, select.KQ_FILTER_READ,
+ select.KQ_EV_ADD)
+ self._kqueue.control([kev], 0, 0)
+ if events & EVENT_WRITE:
+ kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
+ select.KQ_EV_ADD)
+ self._kqueue.control([kev], 0, 0)
+ return key
+
+ def unregister(self, fileobj):
+ key = super(KqueueSelector, self).unregister(fileobj)
+ if key.events & EVENT_READ:
+ kev = select.kevent(key.fd, select.KQ_FILTER_READ,
+ select.KQ_EV_DELETE)
+ try:
+ self._kqueue.control([kev], 0, 0)
+ except OSError:
+ # This can happen if the FD was closed since it
+ # was registered.
+ pass
+ if key.events & EVENT_WRITE:
+ kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
+ select.KQ_EV_DELETE)
+ try:
+ self._kqueue.control([kev], 0, 0)
+ except OSError:
+ # See comment above.
+ pass
+ return key
+
+ def select(self, timeout=None):
+ timeout = None if timeout is None else max(timeout, 0)
+ max_ev = len(self._fd_to_key)
+ ready = []
+ try:
+ kev_list = self._kqueue.control(None, max_ev, timeout)
+ except OSError as exc:
+ if exc.errno == EINTR:
+ return ready
+ else:
+ raise
+ for kev in kev_list:
+ fd = kev.ident
+ flag = kev.filter
+ events = 0
+ if flag == select.KQ_FILTER_READ:
+ events |= EVENT_READ
+ if flag == select.KQ_FILTER_WRITE:
+ events |= EVENT_WRITE
+
+ key = self._key_from_fd(fd)
+ if key:
+ ready.append((key, events & key.events))
+ return ready
+
+ def close(self):
+ self._kqueue.close()
+ super(KqueueSelector, self).close()
+
+
+# Choose the best implementation, roughly:
+# epoll|kqueue|devpoll > poll > select.
+# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
+if 'KqueueSelector' in globals():
+ DefaultSelector = KqueueSelector
+elif 'EpollSelector' in globals():
+ DefaultSelector = EpollSelector
+elif 'DevpollSelector' in globals():
+ DefaultSelector = DevpollSelector
+elif 'PollSelector' in globals():
+ DefaultSelector = PollSelector
+else:
+ DefaultSelector = SelectSelector
diff --git a/test/test_client_async.py b/test/test_client_async.py
index ad76aad..922e43c 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -1,3 +1,10 @@
+# selectors in stdlib as of py3.4
+try:
+ import selectors # pylint: disable=import-error
+except ImportError:
+ # vendored backport module
+ import kafka.selectors34 as selectors
+
import socket
import time
@@ -99,15 +106,19 @@ def test_maybe_connect(conn):
def test_conn_state_change(mocker, conn):
cli = KafkaClient()
+ sel = mocker.patch.object(cli, '_selector')
node_id = 0
conn.state = ConnectionStates.CONNECTING
cli._conn_state_change(node_id, conn)
assert node_id in cli._connecting
+ sel.register.assert_called_with(conn._sock, selectors.EVENT_WRITE)
conn.state = ConnectionStates.CONNECTED
cli._conn_state_change(node_id, conn)
assert node_id not in cli._connecting
+ sel.unregister.assert_called_with(conn._sock)
+ sel.register.assert_called_with(conn._sock, selectors.EVENT_READ, conn)
# Failure to connect should trigger metadata update
assert cli.cluster._need_update is False
@@ -115,6 +126,7 @@ def test_conn_state_change(mocker, conn):
cli._conn_state_change(node_id, conn)
assert node_id not in cli._connecting
assert cli.cluster._need_update is True
+ sel.unregister.assert_called_with(conn._sock)
conn.state = ConnectionStates.CONNECTING
cli._conn_state_change(node_id, conn)
@@ -167,8 +179,9 @@ def test_is_ready(mocker, conn):
assert not cli.is_ready(0)
-def test_close(conn):
+def test_close(mocker, conn):
cli = KafkaClient()
+ mocker.patch.object(cli, '_selector')
# Unknown node - silent
cli.close(2)