diff options
Diffstat (limited to 'kafka/selectors34.py')
-rw-r--r-- | kafka/selectors34.py | 635 |
1 files changed, 0 insertions, 635 deletions
diff --git a/kafka/selectors34.py b/kafka/selectors34.py deleted file mode 100644 index 541c29c..0000000 --- a/kafka/selectors34.py +++ /dev/null @@ -1,635 +0,0 @@ -# pylint: skip-file -# vendored from https://github.com/berkerpeksag/selectors34 -# at commit 5195dd2cbe598047ad0a2e446a829546f6ffc9eb (v1.1) -# -# Original author: Charles-Francois Natali (c.f.natali[at]gmail.com) -# Maintainer: Berker Peksag (berker.peksag[at]gmail.com) -# Also see https://pypi.python.org/pypi/selectors34 -"""Selectors module. - -This module allows high-level and efficient I/O multiplexing, built upon the -`select` module primitives. - -The following code adapted from trollius.selectors. -""" - - -from abc import ABCMeta, abstractmethod -from collections import namedtuple, Mapping -from errno import EINTR -import math -import select -import sys - -import six - - -def _wrap_error(exc, mapping, key): - if key not in mapping: - return - new_err_cls = mapping[key] - new_err = new_err_cls(*exc.args) - - # raise a new exception with the original traceback - if hasattr(exc, '__traceback__'): - traceback = exc.__traceback__ - else: - traceback = sys.exc_info()[2] - six.reraise(new_err_cls, new_err, traceback) - - -# generic events, that must be mapped to implementation-specific ones -EVENT_READ = (1 << 0) -EVENT_WRITE = (1 << 1) - - -def _fileobj_to_fd(fileobj): - """Return a file descriptor from a file object. - - Parameters: - fileobj -- file object or file descriptor - - Returns: - corresponding file descriptor - - Raises: - ValueError if the object is invalid - """ - if isinstance(fileobj, six.integer_types): - fd = fileobj - else: - try: - fd = int(fileobj.fileno()) - except (AttributeError, TypeError, ValueError): - raise ValueError("Invalid file object: " - "{0!r}".format(fileobj)) - if fd < 0: - raise ValueError("Invalid file descriptor: {0}".format(fd)) - return fd - - -SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data']) -"""Object used to associate a file object to its backing file descriptor, -selected event mask and attached data.""" - - -class _SelectorMapping(Mapping): - """Mapping of file objects to selector keys.""" - - def __init__(self, selector): - self._selector = selector - - def __len__(self): - return len(self._selector._fd_to_key) - - def __getitem__(self, fileobj): - try: - fd = self._selector._fileobj_lookup(fileobj) - return self._selector._fd_to_key[fd] - except KeyError: - raise KeyError("{0!r} is not registered".format(fileobj)) - - def __iter__(self): - return iter(self._selector._fd_to_key) - - -class BaseSelector(six.with_metaclass(ABCMeta)): - """Selector abstract base class. - - A selector supports registering file objects to be monitored for specific - I/O events. - - A file object is a file descriptor or any object with a `fileno()` method. - An arbitrary object can be attached to the file object, which can be used - for example to store context information, a callback, etc. - - A selector can use various implementations (select(), poll(), epoll()...) - depending on the platform. The default `Selector` class uses the most - efficient implementation on the current platform. - """ - - @abstractmethod - def register(self, fileobj, events, data=None): - """Register a file object. - - Parameters: - fileobj -- file object or file descriptor - events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE) - data -- attached data - - Returns: - SelectorKey instance - - Raises: - ValueError if events is invalid - KeyError if fileobj is already registered - OSError if fileobj is closed or otherwise is unacceptable to - the underlying system call (if a system call is made) - - Note: - OSError may or may not be raised - """ - raise NotImplementedError - - @abstractmethod - def unregister(self, fileobj): - """Unregister a file object. - - Parameters: - fileobj -- file object or file descriptor - - Returns: - SelectorKey instance - - Raises: - KeyError if fileobj is not registered - - Note: - If fileobj is registered but has since been closed this does - *not* raise OSError (even if the wrapped syscall does) - """ - raise NotImplementedError - - def modify(self, fileobj, events, data=None): - """Change a registered file object monitored events or attached data. - - Parameters: - fileobj -- file object or file descriptor - events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE) - data -- attached data - - Returns: - SelectorKey instance - - Raises: - Anything that unregister() or register() raises - """ - self.unregister(fileobj) - return self.register(fileobj, events, data) - - @abstractmethod - def select(self, timeout=None): - """Perform the actual selection, until some monitored file objects are - ready or a timeout expires. - - Parameters: - timeout -- if timeout > 0, this specifies the maximum wait time, in - seconds - if timeout <= 0, the select() call won't block, and will - report the currently ready file objects - if timeout is None, select() will block until a monitored - file object becomes ready - - Returns: - list of (key, events) for ready file objects - `events` is a bitwise mask of EVENT_READ|EVENT_WRITE - """ - raise NotImplementedError - - def close(self): - """Close the selector. - - This must be called to make sure that any underlying resource is freed. - """ - pass - - def get_key(self, fileobj): - """Return the key associated to a registered file object. - - Returns: - SelectorKey for this file object - """ - mapping = self.get_map() - if mapping is None: - raise RuntimeError('Selector is closed') - try: - return mapping[fileobj] - except KeyError: - raise KeyError("{0!r} is not registered".format(fileobj)) - - @abstractmethod - def get_map(self): - """Return a mapping of file objects to selector keys.""" - raise NotImplementedError - - def __enter__(self): - return self - - def __exit__(self, *args): - self.close() - - -class _BaseSelectorImpl(BaseSelector): - """Base selector implementation.""" - - def __init__(self): - # this maps file descriptors to keys - self._fd_to_key = {} - # read-only mapping returned by get_map() - self._map = _SelectorMapping(self) - - def _fileobj_lookup(self, fileobj): - """Return a file descriptor from a file object. - - This wraps _fileobj_to_fd() to do an exhaustive search in case - the object is invalid but we still have it in our map. This - is used by unregister() so we can unregister an object that - was previously registered even if it is closed. It is also - used by _SelectorMapping. - """ - try: - return _fileobj_to_fd(fileobj) - except ValueError: - # Do an exhaustive search. - for key in self._fd_to_key.values(): - if key.fileobj is fileobj: - return key.fd - # Raise ValueError after all. - raise - - def register(self, fileobj, events, data=None): - if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)): - raise ValueError("Invalid events: {0!r}".format(events)) - - key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data) - - if key.fd in self._fd_to_key: - raise KeyError("{0!r} (FD {1}) is already registered" - .format(fileobj, key.fd)) - - self._fd_to_key[key.fd] = key - return key - - def unregister(self, fileobj): - try: - key = self._fd_to_key.pop(self._fileobj_lookup(fileobj)) - except KeyError: - raise KeyError("{0!r} is not registered".format(fileobj)) - return key - - def modify(self, fileobj, events, data=None): - # TODO: Subclasses can probably optimize this even further. - try: - key = self._fd_to_key[self._fileobj_lookup(fileobj)] - except KeyError: - raise KeyError("{0!r} is not registered".format(fileobj)) - if events != key.events: - self.unregister(fileobj) - key = self.register(fileobj, events, data) - elif data != key.data: - # Use a shortcut to update the data. - key = key._replace(data=data) - self._fd_to_key[key.fd] = key - return key - - def close(self): - self._fd_to_key.clear() - self._map = None - - def get_map(self): - return self._map - - def _key_from_fd(self, fd): - """Return the key associated to a given file descriptor. - - Parameters: - fd -- file descriptor - - Returns: - corresponding key, or None if not found - """ - try: - return self._fd_to_key[fd] - except KeyError: - return None - - -class SelectSelector(_BaseSelectorImpl): - """Select-based selector.""" - - def __init__(self): - super(SelectSelector, self).__init__() - self._readers = set() - self._writers = set() - - def register(self, fileobj, events, data=None): - key = super(SelectSelector, self).register(fileobj, events, data) - if events & EVENT_READ: - self._readers.add(key.fd) - if events & EVENT_WRITE: - self._writers.add(key.fd) - return key - - def unregister(self, fileobj): - key = super(SelectSelector, self).unregister(fileobj) - self._readers.discard(key.fd) - self._writers.discard(key.fd) - return key - - if sys.platform == 'win32': - def _select(self, r, w, _, timeout=None): - r, w, x = select.select(r, w, w, timeout) - return r, w + x, [] - else: - _select = select.select - - def select(self, timeout=None): - timeout = None if timeout is None else max(timeout, 0) - ready = [] - try: - r, w, _ = self._select(self._readers, self._writers, [], timeout) - except select.error as exc: - if exc.args[0] == EINTR: - return ready - else: - raise - r = set(r) - w = set(w) - for fd in r | w: - events = 0 - if fd in r: - events |= EVENT_READ - if fd in w: - events |= EVENT_WRITE - - key = self._key_from_fd(fd) - if key: - ready.append((key, events & key.events)) - return ready - - -if hasattr(select, 'poll'): - - class PollSelector(_BaseSelectorImpl): - """Poll-based selector.""" - - def __init__(self): - super(PollSelector, self).__init__() - self._poll = select.poll() - - def register(self, fileobj, events, data=None): - key = super(PollSelector, self).register(fileobj, events, data) - poll_events = 0 - if events & EVENT_READ: - poll_events |= select.POLLIN - if events & EVENT_WRITE: - poll_events |= select.POLLOUT - self._poll.register(key.fd, poll_events) - return key - - def unregister(self, fileobj): - key = super(PollSelector, self).unregister(fileobj) - self._poll.unregister(key.fd) - return key - - def select(self, timeout=None): - if timeout is None: - timeout = None - elif timeout <= 0: - timeout = 0 - else: - # poll() has a resolution of 1 millisecond, round away from - # zero to wait *at least* timeout seconds. - timeout = int(math.ceil(timeout * 1e3)) - ready = [] - try: - fd_event_list = self._poll.poll(timeout) - except select.error as exc: - if exc.args[0] == EINTR: - return ready - else: - raise - for fd, event in fd_event_list: - events = 0 - if event & ~select.POLLIN: - events |= EVENT_WRITE - if event & ~select.POLLOUT: - events |= EVENT_READ - - key = self._key_from_fd(fd) - if key: - ready.append((key, events & key.events)) - return ready - - -if hasattr(select, 'epoll'): - - class EpollSelector(_BaseSelectorImpl): - """Epoll-based selector.""" - - def __init__(self): - super(EpollSelector, self).__init__() - self._epoll = select.epoll() - - def fileno(self): - return self._epoll.fileno() - - def register(self, fileobj, events, data=None): - key = super(EpollSelector, self).register(fileobj, events, data) - epoll_events = 0 - if events & EVENT_READ: - epoll_events |= select.EPOLLIN - if events & EVENT_WRITE: - epoll_events |= select.EPOLLOUT - self._epoll.register(key.fd, epoll_events) - return key - - def unregister(self, fileobj): - key = super(EpollSelector, self).unregister(fileobj) - try: - self._epoll.unregister(key.fd) - except IOError: - # This can happen if the FD was closed since it - # was registered. - pass - return key - - def select(self, timeout=None): - if timeout is None: - timeout = -1 - elif timeout <= 0: - timeout = 0 - else: - # epoll_wait() has a resolution of 1 millisecond, round away - # from zero to wait *at least* timeout seconds. - timeout = math.ceil(timeout * 1e3) * 1e-3 - - # epoll_wait() expects `maxevents` to be greater than zero; - # we want to make sure that `select()` can be called when no - # FD is registered. - max_ev = max(len(self._fd_to_key), 1) - - ready = [] - try: - fd_event_list = self._epoll.poll(timeout, max_ev) - except IOError as exc: - if exc.errno == EINTR: - return ready - else: - raise - for fd, event in fd_event_list: - events = 0 - if event & ~select.EPOLLIN: - events |= EVENT_WRITE - if event & ~select.EPOLLOUT: - events |= EVENT_READ - - key = self._key_from_fd(fd) - if key: - ready.append((key, events & key.events)) - return ready - - def close(self): - self._epoll.close() - super(EpollSelector, self).close() - - -if hasattr(select, 'devpoll'): - - class DevpollSelector(_BaseSelectorImpl): - """Solaris /dev/poll selector.""" - - def __init__(self): - super(DevpollSelector, self).__init__() - self._devpoll = select.devpoll() - - def fileno(self): - return self._devpoll.fileno() - - def register(self, fileobj, events, data=None): - key = super(DevpollSelector, self).register(fileobj, events, data) - poll_events = 0 - if events & EVENT_READ: - poll_events |= select.POLLIN - if events & EVENT_WRITE: - poll_events |= select.POLLOUT - self._devpoll.register(key.fd, poll_events) - return key - - def unregister(self, fileobj): - key = super(DevpollSelector, self).unregister(fileobj) - self._devpoll.unregister(key.fd) - return key - - def select(self, timeout=None): - if timeout is None: - timeout = None - elif timeout <= 0: - timeout = 0 - else: - # devpoll() has a resolution of 1 millisecond, round away from - # zero to wait *at least* timeout seconds. - timeout = math.ceil(timeout * 1e3) - ready = [] - try: - fd_event_list = self._devpoll.poll(timeout) - except OSError as exc: - if exc.errno == EINTR: - return ready - else: - raise - for fd, event in fd_event_list: - events = 0 - if event & ~select.POLLIN: - events |= EVENT_WRITE - if event & ~select.POLLOUT: - events |= EVENT_READ - - key = self._key_from_fd(fd) - if key: - ready.append((key, events & key.events)) - return ready - - def close(self): - self._devpoll.close() - super(DevpollSelector, self).close() - - -if hasattr(select, 'kqueue'): - - class KqueueSelector(_BaseSelectorImpl): - """Kqueue-based selector.""" - - def __init__(self): - super(KqueueSelector, self).__init__() - self._kqueue = select.kqueue() - - def fileno(self): - return self._kqueue.fileno() - - def register(self, fileobj, events, data=None): - key = super(KqueueSelector, self).register(fileobj, events, data) - if events & EVENT_READ: - kev = select.kevent(key.fd, select.KQ_FILTER_READ, - select.KQ_EV_ADD) - self._kqueue.control([kev], 0, 0) - if events & EVENT_WRITE: - kev = select.kevent(key.fd, select.KQ_FILTER_WRITE, - select.KQ_EV_ADD) - self._kqueue.control([kev], 0, 0) - return key - - def unregister(self, fileobj): - key = super(KqueueSelector, self).unregister(fileobj) - if key.events & EVENT_READ: - kev = select.kevent(key.fd, select.KQ_FILTER_READ, - select.KQ_EV_DELETE) - try: - self._kqueue.control([kev], 0, 0) - except OSError: - # This can happen if the FD was closed since it - # was registered. - pass - if key.events & EVENT_WRITE: - kev = select.kevent(key.fd, select.KQ_FILTER_WRITE, - select.KQ_EV_DELETE) - try: - self._kqueue.control([kev], 0, 0) - except OSError: - # See comment above. - pass - return key - - def select(self, timeout=None): - timeout = None if timeout is None else max(timeout, 0) - max_ev = len(self._fd_to_key) - ready = [] - try: - kev_list = self._kqueue.control(None, max_ev, timeout) - except OSError as exc: - if exc.errno == EINTR: - return ready - else: - raise - for kev in kev_list: - fd = kev.ident - flag = kev.filter - events = 0 - if flag == select.KQ_FILTER_READ: - events |= EVENT_READ - if flag == select.KQ_FILTER_WRITE: - events |= EVENT_WRITE - - key = self._key_from_fd(fd) - if key: - ready.append((key, events & key.events)) - return ready - - def close(self): - self._kqueue.close() - super(KqueueSelector, self).close() - - -# Choose the best implementation, roughly: -# epoll|kqueue|devpoll > poll > select. -# select() also can't accept a FD > FD_SETSIZE (usually around 1024) -if 'KqueueSelector' in globals(): - DefaultSelector = KqueueSelector -elif 'EpollSelector' in globals(): - DefaultSelector = EpollSelector -elif 'DevpollSelector' in globals(): - DefaultSelector = DevpollSelector -elif 'PollSelector' in globals(): - DefaultSelector = PollSelector -else: - DefaultSelector = SelectSelector |